Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
0029f81
wip restrcuturing agent executor and liteagent
lorenzejay Jan 9, 2026
dc3ae93
fix: handle None task in AgentExecutor to prevent errors
lorenzejay Jan 10, 2026
5cef85c
refactor: streamline AgentExecutor initialization by removing redunda…
lorenzejay Jan 10, 2026
13dc7e2
ensure executors work inside a flow due to flow in flow async structure
lorenzejay Jan 14, 2026
b7a13e1
refactor: enhance agent kickoff preparation by separating common logic
lorenzejay Jan 14, 2026
ae17178
linting and tests
lorenzejay Jan 14, 2026
5048d54
Merge branch 'main' of github.com:crewAIInc/crewAI into lorenze/enh-d…
lorenzejay Jan 14, 2026
38db734
fix test
lorenzejay Jan 14, 2026
341812d
refactor: improve test for Agent kickoff parameters
lorenzejay Jan 14, 2026
e9b8610
refactor: update test task guardrail process output for improved vali…
lorenzejay Jan 15, 2026
842a1db
test fix cassette
lorenzejay Jan 15, 2026
e4bd788
test fix cassette
lorenzejay Jan 15, 2026
3a6702e
working
lorenzejay Jan 15, 2026
6541f01
working cassette
lorenzejay Jan 15, 2026
09185ac
refactor: streamline agent execution and enhance flow compatibility
lorenzejay Jan 15, 2026
3a1deb1
fixed cassette
lorenzejay Jan 15, 2026
83c62a6
Merge branch 'main' of github.com:crewAIInc/crewAI into lorenze/enh-d…
lorenzejay Jan 15, 2026
601eda9
Enhance Flow Execution Logic
lorenzejay Jan 15, 2026
ad83e8a
Merge branch 'main' of github.com:crewAIInc/crewAI into lorenze/enh-d…
lorenzejay Jan 15, 2026
7f7b509
Enhance Agent and Flow Execution Logic
lorenzejay Jan 15, 2026
6405274
Enhance Flow Listener Logic and Agent Imports
lorenzejay Jan 16, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
470 changes: 420 additions & 50 deletions lib/crewai/src/crewai/agent/core.py

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@


class CrewAgentExecutorMixin:
crew: Crew
crew: Crew | None
agent: Agent
task: Task
task: Task | None
iterations: int
max_iter: int
messages: list[LLMMessage]
Expand Down
5 changes: 3 additions & 2 deletions lib/crewai/src/crewai/experimental/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from crewai.experimental.crew_agent_executor_flow import CrewAgentExecutorFlow
from crewai.experimental.agent_executor import AgentExecutor, CrewAgentExecutorFlow
from crewai.experimental.evaluation import (
AgentEvaluationResult,
AgentEvaluator,
Expand All @@ -23,8 +23,9 @@
__all__ = [
"AgentEvaluationResult",
"AgentEvaluator",
"AgentExecutor",
"BaseEvaluator",
"CrewAgentExecutorFlow",
"CrewAgentExecutorFlow", # Deprecated alias for AgentExecutor
"EvaluationScore",
"EvaluationTraceCallback",
"ExperimentResult",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

from collections.abc import Callable
from collections.abc import Callable, Coroutine
import threading
from typing import TYPE_CHECKING, Any, Literal, cast
from uuid import uuid4
Expand Down Expand Up @@ -37,6 +37,7 @@
handle_unknown_error,
has_reached_max_iterations,
is_context_length_exceeded,
is_inside_event_loop,
process_llm_response,
)
from crewai.utilities.constants import TRAINING_DATA_FILE
Expand Down Expand Up @@ -73,13 +74,17 @@ class AgentReActState(BaseModel):
ask_for_human_input: bool = Field(default=False)


class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
"""Flow-based executor matching CrewAgentExecutor interface.
class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
"""Flow-based agent executor for both standalone and crew-bound execution.

Inherits from:
- Flow[AgentReActState]: Provides flow orchestration capabilities
- CrewAgentExecutorMixin: Provides memory methods (short/long/external term)

This executor can operate in two modes:
- Standalone mode: When crew and task are None (used by Agent.kickoff())
- Crew mode: When crew and task are provided (used by Agent.execute_task())

Note: Multiple instances may be created during agent initialization
(cache setup, RPM controller setup, etc.) but only the final instance
should execute tasks via invoke().
Expand All @@ -88,8 +93,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
def __init__(
self,
llm: BaseLLM,
task: Task,
crew: Crew,
agent: Agent,
prompt: SystemPromptResult | StandardPromptResult,
max_iter: int,
Expand All @@ -98,6 +101,8 @@ def __init__(
stop_words: list[str],
tools_description: str,
tools_handler: ToolsHandler,
task: Task | None = None,
crew: Crew | None = None,
step_callback: Any = None,
original_tools: list[BaseTool] | None = None,
function_calling_llm: BaseLLM | Any | None = None,
Expand All @@ -111,8 +116,6 @@ def __init__(

Args:
llm: Language model instance.
task: Task to execute.
crew: Crew instance.
agent: Agent to execute.
prompt: Prompt templates.
max_iter: Maximum iterations.
Expand All @@ -121,6 +124,8 @@ def __init__(
stop_words: Stop word list.
tools_description: Tool descriptions.
tools_handler: Tool handler instance.
task: Optional task to execute (None for standalone agent execution).
crew: Optional crew instance (None for standalone agent execution).
step_callback: Optional step callback.
original_tools: Original tool list.
function_calling_llm: Optional function calling LLM.
Expand All @@ -131,9 +136,9 @@ def __init__(
"""
self._i18n: I18N = i18n or get_i18n()
self.llm = llm
self.task = task
self.task: Task | None = task
self.agent = agent
self.crew = crew
self.crew: Crew | None = crew
self.prompt = prompt
self.tools = tools
self.tools_names = tools_names
Expand Down Expand Up @@ -178,7 +183,6 @@ def __init__(
else self.stop
)
)

self._state = AgentReActState()

def _ensure_flow_initialized(self) -> None:
Expand Down Expand Up @@ -264,7 +268,7 @@ def call_llm_and_parse(self) -> Literal["parsed", "parser_error", "context_error
printer=self._printer,
from_task=self.task,
from_agent=self.agent,
response_model=self.response_model,
response_model=None,
executor_context=self,
)

Expand Down Expand Up @@ -449,15 +453,25 @@ def recover_from_context_length(self) -> Literal["initialized"]:

return "initialized"

def invoke(self, inputs: dict[str, Any]) -> dict[str, Any]:
def invoke(
self, inputs: dict[str, Any]
) -> dict[str, Any] | Coroutine[Any, Any, dict[str, Any]]:
"""Execute agent with given inputs.

When called from within an existing event loop (e.g., inside a Flow),
this method returns a coroutine that should be awaited. The Flow
framework handles this automatically.

Args:
inputs: Input dictionary containing prompt variables.

Returns:
Dictionary with agent output.
Dictionary with agent output, or a coroutine if inside an event loop.
"""
# Magic auto-async: if inside event loop, return coroutine for Flow to await
if is_inside_event_loop():
return self.invoke_async(inputs)

self._ensure_flow_initialized()

with self._execution_lock:
Expand Down Expand Up @@ -525,6 +539,87 @@ def invoke(self, inputs: dict[str, Any]) -> dict[str, Any]:
finally:
self._is_executing = False

async def invoke_async(self, inputs: dict[str, Any]) -> dict[str, Any]:
"""Execute agent asynchronously with given inputs.

This method is designed for use within async contexts, such as when
the agent is called from within an async Flow method. It uses
kickoff_async() directly instead of running in a separate thread.

Args:
inputs: Input dictionary containing prompt variables.

Returns:
Dictionary with agent output.
"""
self._ensure_flow_initialized()

with self._execution_lock:
if self._is_executing:
raise RuntimeError(
"Executor is already running. "
"Cannot invoke the same executor instance concurrently."
)
self._is_executing = True
self._has_been_invoked = True

try:
# Reset state for fresh execution
self.state.messages.clear()
self.state.iterations = 0
self.state.current_answer = None
self.state.is_finished = False

if "system" in self.prompt:
prompt = cast("SystemPromptResult", self.prompt)
system_prompt = self._format_prompt(prompt["system"], inputs)
user_prompt = self._format_prompt(prompt["user"], inputs)
self.state.messages.append(
format_message_for_llm(system_prompt, role="system")
)
self.state.messages.append(format_message_for_llm(user_prompt))
else:
user_prompt = self._format_prompt(self.prompt["prompt"], inputs)
self.state.messages.append(format_message_for_llm(user_prompt))

self.state.ask_for_human_input = bool(
inputs.get("ask_for_human_input", False)
)

# Use async kickoff directly since we're already in an async context
await self.kickoff_async()

formatted_answer = self.state.current_answer

if not isinstance(formatted_answer, AgentFinish):
raise RuntimeError(
"Agent execution ended without reaching a final answer."
)

if self.state.ask_for_human_input:
formatted_answer = self._handle_human_feedback(formatted_answer)

self._create_short_term_memory(formatted_answer)
self._create_long_term_memory(formatted_answer)
self._create_external_memory(formatted_answer)

return {"output": formatted_answer.output}

except AssertionError:
fail_text = Text()
fail_text.append("❌ ", style="red bold")
fail_text.append(
"Agent failed to reach a final answer. This is likely a bug - please report it.",
style="red",
)
self._console.print(fail_text)
raise
except Exception as e:
handle_unknown_error(self._printer, e)
raise
finally:
self._is_executing = False

def _handle_agent_action(
self, formatted_answer: AgentAction, tool_result: ToolResult
) -> AgentAction | AgentFinish:
Expand Down Expand Up @@ -583,11 +678,14 @@ def _show_start_logs(self) -> None:
if self.agent is None:
raise ValueError("Agent cannot be None")

if self.task is None:
return

crewai_event_bus.emit(
self.agent,
AgentLogsStartedEvent(
agent_role=self.agent.role,
task_description=(self.task.description if self.task else "Not Found"),
task_description=self.task.description,
verbose=self.agent.verbose
or (hasattr(self, "crew") and getattr(self.crew, "verbose", False)),
),
Expand Down Expand Up @@ -621,10 +719,12 @@ def _handle_crew_training_output(
result: Agent's final output.
human_feedback: Optional feedback from human.
"""
# Early return if no crew (standalone mode)
if self.crew is None:
return

agent_id = str(self.agent.id)
train_iteration = (
getattr(self.crew, "_train_iteration", None) if self.crew else None
)
train_iteration = getattr(self.crew, "_train_iteration", None)

if train_iteration is None or not isinstance(train_iteration, int):
train_error = Text()
Expand Down Expand Up @@ -806,3 +906,7 @@ def __get_pydantic_core_schema__(
requiring arbitrary_types_allowed=True.
"""
return core_schema.any_schema()


# Backward compatibility alias (deprecated)
CrewAgentExecutorFlow = AgentExecutor
Loading
Loading