Loop Stepper Module

The Loop Stepper module is a core component of the Azad system that executes individual steps of the agent loop. Each call runs exactly one step, so your own code must call it repeatedly in a loop to complete a task.

Overview

The loop_stepper.py module provides the core functionality for executing a single step of the agent's reasoning and action cycle. It:

  1. Processes streaming responses from the language model
  2. Executes tool calls
  3. Manages the task state
  4. Emits events to the client

The main function in this module is step(), which takes a task, task configuration, environment, and other parameters, and executes a single step of the agent loop.

Key Concepts

Single-Step Execution

The most important concept to understand about the Loop Stepper is that it executes only one step of the agent loop. This means:

  • It processes one response from the language model
  • It executes any tool calls in that response
  • It updates the task state
  • It returns control to the caller

To implement a complete agent loop, you need to call step() repeatedly until the task is completed or an error occurs.

Event-Driven Architecture

The Loop Stepper uses an event-driven architecture to communicate with the client. It:

  1. Emits events as they occur (e.g., tool calls, tool results, errors)
  2. Allows the client to react to these events in real-time
  3. Provides a streaming interface for the language model response
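
As a rough illustration of the event flow above, the sketch below dispatches on event type inside a callback of the shape that step() expects for emit_event. The classes TextChunk, ToolReady, and StepEnd are hypothetical stand-ins, not the real ainetwork_types event classes, and EventCollector is an illustrative name.

```python
from dataclasses import dataclass
from typing import List, Optional, Union


# Hypothetical stand-ins for the library's event classes (assumption).
@dataclass
class TextChunk:
    content: str


@dataclass
class ToolReady:
    tool_name: str
    tool_call_id: str


@dataclass
class StepEnd:
    updated_state: str


Event = Union[TextChunk, ToolReady, StepEnd]


class EventCollector:
    """Synchronous callback that records what each event carried."""

    def __init__(self) -> None:
        self.text: List[str] = []
        self.tools: List[str] = []
        self.final_state: Optional[str] = None

    def __call__(self, event: Event) -> None:
        if isinstance(event, TextChunk):
            self.text.append(event.content)      # streamed model text
        elif isinstance(event, ToolReady):
            self.tools.append(event.tool_name)   # a tool call is ready
        elif isinstance(event, StepEnd):
            self.final_state = event.updated_state


collector = EventCollector()
for ev in [TextChunk("Hel"), TextChunk("lo"), ToolReady("read_file", "t1"), StepEnd("running")]:
    collector(ev)
```

A real client would typically forward these events to a UI or a log instead of collecting them in lists.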

Tool Execution Flow

When the language model calls a tool, the Loop Stepper:

  1. Validates the tool call
  2. Requests approval from the user (if required)
  3. Executes the tool
  4. Returns the result to the language model in the next step
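
The validation and approval steps above can be sketched as follows, assuming the decision depends on a task-level auto_approve flag and a per-tool requires_approval flag, as the step() source in the API reference suggests. ToolMeta, find_tool, and needs_user_approval are hypothetical names used only for illustration.

```python
from dataclasses import dataclass
from typing import Optional, Sequence


@dataclass
class ToolMeta:
    """Hypothetical stand-in for one entry of the tool metadata list."""
    name: str
    requires_approval: bool = False


def find_tool(metadata: Sequence[ToolMeta], tool_name: str) -> Optional[ToolMeta]:
    # Validation: the called tool must appear in the configured metadata.
    return next((t for t in metadata if t.name == tool_name), None)


def needs_user_approval(auto_approve: bool, tool: ToolMeta) -> bool:
    # Auto-approval applies only when it is enabled for the task
    # AND the tool does not insist on explicit approval.
    return not (auto_approve and not tool.requires_approval)


tools = [ToolMeta("read_file"), ToolMeta("delete_file", requires_approval=True)]
```

With this rule, a tool marked requires_approval always goes to the user, even when auto-approval is enabled for the task.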

Error Handling

The Loop Stepper catches exceptions raised during a step instead of propagating them, so the caller can inspect the task state and decide how to proceed. It:

  1. Catches and logs exceptions
  2. Sets the task state to failed, or to aborted when the step was cancelled
  3. Emits error events to the client
  4. Closes the model's event stream and emits a final step-end event
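
The pattern can be sketched in miniature as a try/except/finally wrapper that always emits a closing event. This is a simplified illustration, not the module's implementation: guarded_step, StepCancelled, and the dictionary-shaped events are all hypothetical.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List


class StepCancelled(Exception):
    """Hypothetical stand-in for the cancellation error used inside a step."""


async def guarded_step(
    work: Callable[[], Awaitable[None]],
    state: Dict[str, str],
    emit: Callable[[dict], None],
) -> None:
    state["value"] = "running"
    try:
        await work()
    except StepCancelled as exc:
        state["value"] = "aborted"
        emit({"type": "interrupted", "reason": str(exc) or "Cancelled"})
    except Exception as exc:
        state["value"] = "failed"
        emit({"type": "interrupted", "reason": f"Internal error: {exc}"})
    finally:
        # Always tell the client how the step ended, whatever happened above.
        emit({"type": "step_end", "state": state["value"]})


async def failing_work() -> None:
    raise ValueError("bad tool output")


events: List[dict] = []
state: Dict[str, str] = {}
asyncio.run(guarded_step(failing_work, state, events.append))
```

Because the closing event is emitted in the finally block, the client receives it on success, failure, and cancellation alike.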

Cancellation Support

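The Loop Stepper supports cancellation through a cancellation token, an optional asyncio.Event passed to step(). The token is checked before the model request, while the response streams, after streaming finishes, and before each tool call; once it is set, the step stops and the task state becomes aborted. This lets the client cancel a step that is taking too long or is no longer needed.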
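
A minimal sketch of cooperative cancellation, assuming the token is an asyncio.Event as the step() signature in the API reference indicates. The names consume, demo, and StepCancelled are hypothetical and exist only for this example.

```python
import asyncio
from typing import List, Union


class StepCancelled(Exception):
    """Hypothetical error raised when the token is found set."""


async def consume(chunks: List[str], token: asyncio.Event) -> List[str]:
    seen: List[str] = []
    for chunk in chunks:
        # Poll the token between units of work.
        if token.is_set():
            raise StepCancelled("cancelled during stream processing")
        seen.append(chunk)
        await asyncio.sleep(0)  # yield so another coroutine could set the token
    return seen


async def demo(cancel_first: bool) -> Union[List[str], str]:
    token = asyncio.Event()
    if cancel_first:
        token.set()  # the client requests cancellation
    try:
        return await consume(["a", "b"], token)
    except StepCancelled:
        return "cancelled"
```

Cancellation of this kind is cooperative: setting the token has no effect until the running code reaches its next check.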

Implementation Details

The step() function is the main entry point for the Loop Stepper. It:

  1. Initializes the task state and environment
  2. Creates a dialect based on the task configuration
  3. Sends the current task state to the language model
  4. Processes the language model's response
  5. Executes any tool calls
  6. Updates the task state
  7. Emits events to the client

The read_streaming_tool_details() function processes the streaming response from the language model and extracts tool calls and other content.
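
To illustrate the idea, the following simplified sketch accumulates streamed parameter chunks into complete tool calls. It uses plain tuples instead of the library's event classes, and collect_tool_calls is a hypothetical name; the real function also handles text, reasoning, and cancellation.

```python
from typing import Dict, Iterable, List, Optional, Tuple


def collect_tool_calls(events: Iterable[Tuple[str, ...]]) -> List[dict]:
    tools: List[dict] = []
    name: Optional[str] = None
    params: Dict[str, str] = {}
    for event in events:
        kind = event[0]
        if kind == "tool_name":
            # A new tool name finalizes the tool that was being built.
            if name is not None:
                tools.append({"name": name, "params": params})
            name, params = event[1], {}
        elif kind == "param_chunk" and name is not None:
            _, param, content = event
            # Parameter values arrive in pieces; append each piece in order.
            params[param] = params.get(param, "") + content
        elif kind == "end":
            break
    if name is not None:
        tools.append({"name": name, "params": params})
    return tools


stream = [
    ("tool_name", "read_file"),
    ("param_chunk", "path", "/tmp/"),
    ("param_chunk", "path", "a.txt"),
    ("tool_name", "list_dir"),
    ("param_chunk", "path", "/tmp"),
    ("end",),
]
calls = collect_tool_calls(stream)
```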

Usage Example

Here's a simplified example of how to use the Loop Stepper:

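async def run_agent_loop(task, task_config, environment):
    iteration = 0
    # Keep stepping while the task is running; step() changes the state
    # on completion, failure, or cancellation.
    while task.state == TaskState.running:
        iteration += 1
        await loop_stepper.step(
            task=task,
            task_config=task_config,
            environment=environment,
            emit_event=handle_event,  # your own callback, called synchronously for every event
            iteration=iteration,
            cancellation_token=None,  # or an asyncio.Event you can set to cancel the step
        )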
        # Process events, update UI, etc.
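
The loop's control flow can be tried without the Azad package by substituting stubs. In the sketch below, State, FakeTask, and fake_step are hypothetical stand-ins for TaskState, the task object, and loop_stepper.step(); the max_iterations guard is an addition for illustration rather than something the module provides.

```python
import asyncio
from enum import Enum


class State(Enum):
    running = "running"
    completed = "completed"


class FakeTask:
    def __init__(self, steps_needed: int) -> None:
        self.state = State.running
        self.steps_left = steps_needed


async def fake_step(task: FakeTask, iteration: int) -> None:
    # Stand-in for one step: do one unit of work, finish when none is left.
    task.steps_left -= 1
    if task.steps_left == 0:
        task.state = State.completed


async def run_until_done(task: FakeTask, max_iterations: int = 10) -> int:
    iteration = 0
    # Stop when the task leaves the running state or the guard is hit.
    while task.state == State.running and iteration < max_iterations:
        iteration += 1
        await fake_step(task, iteration)
    return iteration


task = FakeTask(steps_needed=3)
steps_taken = asyncio.run(run_until_done(task))
```

A cap on iterations is one way to keep a misbehaving task from looping indefinitely; whether it suits your application is a design choice.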

Important Considerations

When working with the Loop Stepper:

  1. Single Step: Remember that each call to step() is only one step of the agent loop. You need to call it repeatedly to complete a task.

  2. Event Handling: Implement proper event handling to process events emitted by the Loop Stepper.

  3. Cancellation: Use the cancellation token to cancel a step if needed.

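  4. Error Handling: Check task.state after each call. step() signals a problem by setting the state to failed or aborted and emitting an event rather than raising, so your loop must decide whether to stop or retry.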

  5. Resource Cleanup: Ensure that resources are properly cleaned up, especially when cancelling a step.

API Reference

azad.loop_stepper

Attributes

SEPERATOR_TOP module-attribute

SEPERATOR_TOP = '=' * 80

SEPERATOR_BOTTOM module-attribute

SEPERATOR_BOTTOM = '-' * 80

logger module-attribute

logger = getLogger(__name__)

Functions

step async

step(task: Task, task_config: TaskConfig, environment: Environment, emit_event: Callable[[AINetworkEventUnion], None], iteration=0, cancellation_token: Optional[Event] = None)
Source code in azad/loop_stepper.py
async def step(task:Task, task_config: TaskConfig, environment:Environment, emit_event: Callable[[AINetworkEventUnion], None],
               iteration=0, cancellation_token: Optional[asyncio.Event] = None):

    # Add logging for cancellation token to help with debugging
    token_id = id(cancellation_token) if cancellation_token else None
    logger.log(LogLevel.PROD, SEPERATOR_TOP, extra={"event": "separator"})
    logger.log(LogLevel.PROD, f"Starting iteration {iteration} with cancellation token id {token_id}",
               extra={"event": "task", "iteration": iteration, "token_id": token_id})

    assistant_id = nanoid.generate()
    last_message_id = task.messages[-1].id if task.messages else None

    try:
        current_mindmap_config: TaskConfigPart = task.current_task_config()
    except ValueError as e:
        logger.error(f"Failed to get current task config: {e}. Aborting step.")
        task.state = TaskState.failed
        emit_event(ainetwork_types.NetworkConnectionInterrupted(message=f"Configuration error: {e}", errName="ConfigError", origErrName="ValueError"))
        emit_event(ainetwork_types.StepEndEvent(new_messages=[], updated_state=task.state,log=
                                                f"Failed to get current task config: {e}. Aborting step."))
        return

    dialect_name = current_mindmap_config.dialect_name
    dialect_options = current_mindmap_config.dialect_options or {}

    try:
        reasoning_effort: Optional[Literal['low', 'medium', 'high']] = None
        if current_mindmap_config.reasoning_effort is not None and current_mindmap_config.reasoning_effort in ['low', 'medium', 'high']:
            reasoning_effort = current_mindmap_config.reasoning_effort # type: ignore
        dialect: Dialect = PromptDialectRegistry.create(dialect_name or "xml", **dialect_options)
        ai_network: AINetwork = AINetwork()
        network_config: ainetwork_types.NetworkConfig = ainetwork_types.NetworkConfig(
            model=current_mindmap_config.model_name,
            max_tokens=current_mindmap_config.max_tokens,
            api_key=task_config.model_api_key,
            thinking_budget_tokens=current_mindmap_config.thinking_budget_tokens,
            reasoning_effort=reasoning_effort,
            enable_parallel_tools=getattr(current_mindmap_config, "enable_parallel_tools", True),
            api_base=current_mindmap_config.api_base,
            enable_search=current_mindmap_config.enable_search,
        )
    except Exception as e:
        logger.error(f"Failed to initialize dialect or network config: {e}. Aborting step.")
        task.state = TaskState.failed
        emit_event(ainetwork_types.NetworkConnectionInterrupted(message=f"Initialization error: {e}", errName="InitError", origErrName=type(e).__name__))
        emit_event(ainetwork_types.StepEndEvent(new_messages=[], updated_state=task.state,log=f"Failed to initialize dialect or network config: {e}. Aborting step."))
        return

    prompt_data: PromptData = PromptData(
        messages=task.messages,
        dyanmic_task_config=task_config,
        task_config=current_mindmap_config,
        tool_metadata=current_mindmap_config.tool_metadata,
        current_assistant_id=assistant_id
    )

    task.state = TaskState.running
    assistant_response: Optional[StepResponse] = None
    event_stream: Optional[AsyncIterator[AINetworkEventUnion]] = None
    assistant_message: Optional[AssistantMessage] = None

    try:
        # Check cancellation before making request
        if cancellation_token and cancellation_token.is_set():
            logger.info(f"Cancellation detected before AINetwork request (token id: {token_id})")
            raise CancelledError("Operation cancelled before AINetwork request")

        logger.debug(f"Making AINetwork request with cancellation token id {token_id}")
        event_stream = ai_network.make_request(
            prompt_data=prompt_data, task=task, dialect=dialect,
            config=network_config, assistant_id=assistant_id,
        )

        logger.debug("Waiting for tool call details from stream...")
        assistant_response = await read_streaming_tool_details(
            event_stream, emit_event, assistant_id, cancellation_token, current_mindmap_config
        )
        logger.debug(f"Raw assistant_response from read_streaming: {assistant_response.model_dump_json(indent=2)}")

        assistant_message = task.add_assistant_message(assistant_id)
        if assistant_response.reasoning:
            # Ensure signature is present if reasoning exists (might be needed by model)
            if not assistant_response.reasoning.signature:
                logger.warning("Reasoning part missing signature, adding empty string.")
                assistant_response.reasoning.signature = "" # Add empty string if None
            task.add_reasoning_part(
                message_id=assistant_message.id,
                reasoning=assistant_response.reasoning.reasoning,
                signature=assistant_response.reasoning.signature
            )
        if assistant_response.text:
            task.add_text_part(message_id=assistant_message.id, text=assistant_response.text)

        # Check cancellation after stream processing
        if cancellation_token and cancellation_token.is_set():
            logger.info(f"Cancellation detected after stream processing (token id: {token_id})")
            raise CancelledError("Operation cancelled after stream processing")

        tools_to_process: List[ToolCallPart] = []
        is_multi_call = False # Track if it was originally a multi-call request
        if assistant_response.multi_tool_calls:
            tools_to_process = assistant_response.multi_tool_calls
            is_multi_call = True # Set flag
            logger.debug(f"[SEQ-MULTI-TOOL] Received {len(tools_to_process)} tool calls to process sequentially.")
        elif assistant_response.tool_call:
            tools_to_process = [assistant_response.tool_call]
            logger.debug("[SINGLE-TOOL] Received 1 tool call to process.")
        else:
            logger.warning("Assistant response did not contain any tool calls.")
            # Ensure assistant message exists before trying to add informational message or update status
            if assistant_message:
                 task.add_informational_message("warning", "The assistant did not request any tool calls.", is_visible_ai=True, is_visible_ui=False, timestamp=assistant_message.started_ts + 0.001) # Add timestamp offset
                 assistant_message.status = "completed"
            task.state = TaskState.running

        all_tools_succeeded = True
        executed_tool_names = []
        executed_tool_ids = []
        failed_tool_details = []

        for index, tool_call in enumerate(tools_to_process):
            # --- Start of loop processing a single tool_call ---
            # Check for cancellation before processing each tool
            if cancellation_token and cancellation_token.is_set():
                logger.info(f"Cancellation detected before processing tool {index+1}: {tool_call.tool_name} (token id: {token_id})")
                raise CancelledError(f"Operation cancelled before processing tool {index+1}: {tool_call.tool_name}")

            tool_name = tool_call.tool_name
            execution_id = tool_call.tool_call_id
            tool_args = tool_call.args or {} # Ensure args is a dict, default to empty

            try: args_str = json.dumps(tool_args, indent=2); logger.debug(f"Processing ToolCallPart:\n Name: {tool_name}\n ID: {execution_id}\n Args:\n{args_str}")
            except Exception: logger.debug(f"Processing ToolCallPart: Name={tool_name}, ID={execution_id}, Args={tool_args} (JSON dump failed)")

            log_prefix = f"[SEQ-TOOL {index+1}/{len(tools_to_process)}]"
            logger.debug(f"{log_prefix} Processing tool: {tool_name} (ID: {execution_id})")

            # Add ToolCallPart to mind map
            tool_call.is_loading = True
            task.add_tool_call_part(tool_call=tool_call, message_id=assistant_message.id)

            logger.log(LogLevel.PROD, "", extra={"event": "tool_request", "tool_name": tool_name, "parameters": tool_args, "tool_id": execution_id})

            # Emit ToolReady
            emit_event(ainetwork_types.AINetworkEventToolReady(tool_name=tool_name, tool_call_id=execution_id, args=tool_args))
            await asyncio.sleep(0.01) # Small delay to ensure ToolReady is processed

            # Get tool metadata early to check if it requires explicit approval
            tool_metadata = next((t for t in current_mindmap_config.tool_metadata if t.name == tool_name), None)
            if not tool_metadata:
                logger.error(f"{log_prefix} Tool '{tool_name}' metadata not found in config.")
                raise ValueError(f"Tool '{tool_name}' metadata not found in config.")

            # Check both auto_approve setting AND if tool allows auto-approval
            if getattr(task_config, "auto_approve", False) and not getattr(tool_metadata, "requires_approval", False):
                approval_data = ApprovalResponse(approved=True, feedback=None, images=[])
                logger.debug(f"{log_prefix} Auto-approving tool {tool_name} (ID: {execution_id})")
            else:
                # If either auto_approve is False OR tool requires approval, request approval
                approval_reason = "auto_approve disabled" if not getattr(task_config, "auto_approve", False) else "tool requires explicit approval"
                logger.debug(f"{log_prefix} Requesting approval for {tool_name} (ID: {execution_id}) - Reason: {approval_reason}")
                approval_response: EnvironmentResponse[ApprovalResponse] = await environment.approve_tool_use(tool_call)
                approval_data = approval_response.data

            # Find the part in the task again (safer in case of async issues)
            call_part_in_task: Optional[ToolCallPart] = None
            assistant_msg = task.get_message(assistant_message.id) # Re-fetch in case of updates
            if assistant_msg and isinstance(assistant_msg, AssistantMessage):
                 for part in assistant_msg.content:
                     if isinstance(part, ToolCallPart) and part.tool_call_id == execution_id:
                         call_part_in_task = part; break

            if not call_part_in_task:
                logger.error(f"{log_prefix} Could not find ToolCallPart {execution_id} in task to update status post-approval!")
                all_tools_succeeded = False; failed_tool_details.append({"name": tool_name, "id": execution_id, "reason": "Internal error: ToolCallPart vanished"}); continue

            # Handle Approval Result
            is_approved_by_env = approval_data and approval_data.approved
            feedback = approval_data.feedback if approval_data else "No approval feedback."
            images = approval_data.images if approval_data else []
            call_part_in_task.is_approval_pending = False # Update status in task

            if not is_approved_by_env:
                logger.debug(f"{log_prefix} Tool '{tool_name}' rejected. Feedback: {feedback}")
                call_part_in_task.is_loading = False # Update status
                task.answer_tool_call(tool_call=call_part_in_task, result={"message": feedback or "Tool use rejected."}, is_error=False, is_approved=False, observer_results=[])
                all_tools_succeeded = False; failed_tool_details.append({"name": tool_name, "id": execution_id, "reason": f"Rejected: {feedback}"})
                if feedback or images: task.add_human_message(text=feedback, images=images or None)
                continue # Move to next tool in sequence

            # --- Approved: Execute ---
            logger.debug(f"{log_prefix} Tool '{tool_name}' approved. Executing...")
            try:
                env_response = await environment.execute_tool(call_part_in_task)
                tool_result_data = env_response.data

                # Handle execution result (success, env error, reported error)
                if tool_result_data is None or not env_response.success:
                    error_msg = env_response.message or "Tool execution failed in environment"
                    logger.error(f"{log_prefix} Tool '{tool_name}' execution failed: {error_msg}")
                    call_part_in_task.is_loading = False # Update status
                    task.answer_tool_call(tool_call=call_part_in_task, result={"message": error_msg}, is_error=True, is_approved=True, observer_results=[])
                    all_tools_succeeded = False; failed_tool_details.append({"name": tool_name, "id": execution_id, "reason": f"Execution failed: {error_msg}"}); continue

                # --- Execution Succeeded (or tool reported error) ---
                logger.debug(f"{log_prefix} Tool '{tool_name}' execution completed.")
                call_part_in_task.is_loading = False # Update status

                # --- Add ToolResultPart via answer_tool_call ---
                # This is where the individual result is added to the mind map
                tool_msg = task.answer_tool_call(
                    tool_call=call_part_in_task,
                    result=tool_result_data.result,
                    is_error=tool_result_data.is_error,
                    is_approved=True, # It was approved
                    observer_results=tool_result_data.observer_results or []
                )
                # ----------------------------------------------------

                executed_tool_names.append(tool_name); executed_tool_ids.append(execution_id)

                # Handle reported errors
                if tool_result_data.is_error:
                     all_tools_succeeded = False
                     error_detail = tool_result_data.error or str(tool_result_data.result.get("message", "Unknown error"))
                     failed_tool_details.append({"name": tool_name, "id": execution_id, "reason": f"Reported error: {error_detail}"})
                     logger.warning(f"{log_prefix} Tool '{tool_name}' reported an error: {error_detail}")

                # Handle Task Entry/Exit (unchanged)
                is_task_entry = tool_metadata.is_task_entry
                is_task_exit = tool_metadata.is_task_exit
                tool_data = tool_result_data.result
                if is_task_entry and not tool_result_data.is_error:
                    try:
                        config_values = extract_parameters_by_type(tool_result=tool_data, tool_metadata=tool_metadata, param_type=TaskParameterType.TASK_CONFIG)
                        description_values = extract_parameters_by_type(tool_result=tool_data, tool_metadata=tool_metadata, param_type=TaskParameterType.TASK_DESCRIPTION)
                        # print(config_values)
                        print(description_values)
                        if description_values:
                            # Apply to task using generic schema
                            entry_schema = TaskEntryToolSchema(
                                task_description=description_values[0],
                                initial_task_config=config_values[0]
                            )
                            entry_schema.apply_to_task(task)
                    except Exception as e: logger.error(f"{log_prefix} TaskEntry schema failed: {e}"); task.add_informational_message("error", f"Task Entry '{tool_name}' failed: {e}.", is_visible_ai=True, is_visible_ui=True); all_tools_succeeded=False; failed_tool_details.append({"name": tool_name, "id": execution_id, "reason": f"TaskEntry failed: {e}"})
                if is_task_exit and not tool_result_data.is_error:
                    level = task.current_task_level(); logger.info(f"{log_prefix} TaskExit tool {tool_name} at level {level}.")
                    if level == 0: task.state = TaskState.completed; logger.info("Task completed by exit tool.")
                    elif level > 0:
                        try:
                            handoff_values = extract_parameters_by_type(tool_data, tool_metadata, TaskParameterType.HANDOFF_DESCRIPTION)
                            param_name = get_parameter_key_name(tool_metadata, TaskParameterType.HANDOFF_DESCRIPTION)
                            if handoff_values:
                                exit_schema = TaskExitToolSchema(
                                    handoff_description=handoff_values[0]['handoff_description']
                                )
                                exit_message, tool_message = exit_schema.apply_to_task(task,result_key=param_name if param_name else "task_result")
                                # emit the update to tool message
                                if tool_message:
                                    emit_event(ainetwork_types.TaskMessageEvent(
                                        message=tool_message
                                    ))
                                logger.debug("Exiting subtask via parameter types")
                        except Exception as e:
                            logger.error(f"Failed to exit nested task with predefined schema: {str(e)}")
                            # Fallback to direct method call if schema validation fails
                            _, tool_message = task.add_task_exit(
                                handoff_description=tool_data.get("handoff_description", []),
                            )

            except CancelledError: raise
            except Exception as e:
                # Handle unexpected errors during execution/result processing
                logger.exception(f"{log_prefix} Unexpected error processing tool '{tool_name}': {e}")
                if call_part_in_task: call_part_in_task.is_loading = False
                task.answer_tool_call(tool_call=call_part_in_task if call_part_in_task else tool_call, result={"message": f"Unexpected error: {e}"}, is_error=True, is_approved=True, observer_results=[])
                all_tools_succeeded = False; failed_tool_details.append({"name": tool_name, "id": execution_id, "reason": f"Unexpected error: {e}"}); continue

            # --- End of loop processing a single tool_call ---

        # --- After processing all tools in the sequence ---
        if tools_to_process:
             if not is_multi_call:
                  logger.debug(f"[SINGLE-TOOL] Processing complete for: {executed_tool_names[0] if executed_tool_names else 'None'}")

             # Mark the assistant message as completed now that all its tools are processed
             if assistant_message:
                 assistant_message.status = "completed"
                 assistant_message.finished_ts = int(time.time())

             # Keep task running unless a tool explicitly completed it
             if task.state != TaskState.completed:
                 task.state = TaskState.running # Stay running even if some tools failed

             # Emit ONE ContentComplete event after the sequence (or single tool)
             logger.debug("Emitting final ContentComplete for the step.")
             emit_event(ainetwork_types.AINetworkEventContentComplete())


    except CancelledError as e:
        logger.info(f"Operation cancelled: {e} (token id: {token_id})")
        task.state = TaskState.aborted
        if assistant_message: assistant_message.status = "interrupted"
        emit_event(ainetwork_types.NetworkConnectionInterrupted(message=str(e) or "Cancelled", errName="UserAborted", origErrName="CancelledError"))
        if assistant_message:
            for part in assistant_message.content:
                 if isinstance(part, ToolCallPart) and part.is_loading: part.is_loading = False

    except Exception as e:
        logger.exception(f"Unhandled exception during step execution: {e}")
        task.state = TaskState.failed
        if assistant_message: assistant_message.status = "error"
        emit_event(ainetwork_types.NetworkConnectionInterrupted(message=f"Internal error: {e}", errName="StepExecutionError", origErrName=type(e).__name__))

    finally:
        try:
             new_messages = task.get_updates(last_message_id)
             logger.debug(f"Found {len(new_messages)} new messages to emit.")
             emit_event(ainetwork_types.StepEndEvent(new_messages=new_messages, updated_state=task.state,log="Reached the end of the stream correctly"))
        except Exception as e:
             logger.error(f"Failed to get or emit mindmap updates: {e}")
             emit_event(ainetwork_types.StepEndEvent(new_messages=[], updated_state=TaskState.failed,log=f"Failed to get or emit mindmap updates: {e}"))
             if task.state not in [TaskState.failed, TaskState.aborted]: task.state = TaskState.failed

        # --- Clean up (unchanged) ---
        if event_stream:
            try: await event_stream.aclose(); logger.debug("AINetwork event stream closed.")
            except Exception as e: logger.error(f"Error closing event stream: {e}")

        logger.log(LogLevel.PROD, f"Iteration {iteration} finished with task state: {task.state.value}", extra={"event": "task", "iteration": iteration, "final_state": task.state.value})
        logger.log(LogLevel.PROD, SEPERATOR_BOTTOM, extra={"event": "separator"})
        return

read_streaming_tool_details async

read_streaming_tool_details(event_stream: AsyncIterator[AINetworkEventUnion], emit_event: Callable[[AINetworkEventUnion], None], assistant_response_id: str, cancellation_token: Optional[Event], config: TaskConfigPart) -> StepResponse
Source code in azad/loop_stepper.py
async def read_streaming_tool_details(
    event_stream: AsyncIterator[ainetwork_types.AINetworkEventUnion],
    emit_event: Callable[[AINetworkEventUnion], None],
    assistant_response_id: str,
    cancellation_token: Optional[asyncio.Event],
    config: TaskConfigPart
) -> StepResponse:
    # Add token_id tracking for this function too
    token_id = id(cancellation_token) if cancellation_token else None

    thinking_chunks: list[str] = []
    reasoning_part = ReasoningPart(reasoning="", signature=None)
    parsed_tools_in_stream: List[Dict[str, Any]] = []
    current_tool_name: Optional[str] = None
    current_tool_params: Dict[str, str] = {}
    current_parser_tool_id: Optional[str] = None # Store the ID from ToolName event


    try:
        async for event in event_stream:
            # Check cancellation during event stream processing
            if cancellation_token and cancellation_token.is_set():
                logger.info(f"Cancellation detected during event stream processing (token id: {token_id})")
                raise CancelledError("Operation cancelled during event stream processing")
            emit_event(event) # Emit all events downstream
            # --- DEBUGGING ---
            if isinstance(event, (ainetwork_types.AINetworkEventToolName, ainetwork_types.AINetworkEventParameterStart, ainetwork_types.AINetworkEventParameterChunk, ainetwork_types.AINetworkEventParameterEnd, ainetwork_types.AINetworkEventMultiToolStart, ainetwork_types.AINetworkEventMultiToolEnd)): logger.debug(f"--> Received Parser Event: {event}")
            # -----------------
            match event:
                case ainetwork_types.AINetworkEventMultiToolStart(): logger.debug("MultiToolStart received."); is_multi_tool_mode = True; parsed_tools_in_stream = []; current_tool_name = None; current_tool_params = {}; current_parser_tool_id = None
                case ainetwork_types.AINetworkEventToolName(tool_name=name, tool_call_id=parser_id):
                    logger.debug(f"ToolName: Name='{name}', ParserID='{parser_id}'")
                    if current_tool_name: logger.debug(f"Finalizing previous '{current_tool_name}'."); parsed_tools_in_stream.append({"name": current_tool_name, "params": current_tool_params.copy(), "parser_id": current_parser_tool_id})
                    current_tool_name = name; current_tool_params = {}; current_parser_tool_id = parser_id
                case ainetwork_types.AINetworkEventParameterStart(parameter=param):
                    if current_tool_name: logger.debug(f"ParamStart: Tool='{current_tool_name}', Param='{param}'"); current_tool_params.setdefault(param, "")
                    else: logger.warning(f"ParamStart for '{param}' outside tool.")
                case ainetwork_types.AINetworkEventParameterChunk(parameter=param, content=content):
                    if current_tool_name:
                        current_tool_params.setdefault(param, "") # Ensure key exists
                        current_tool_params[param] += content
                    else: logger.warning(f"ParamChunk for '{param}' outside tool.")
                case ainetwork_types.AINetworkEventParameterEnd(parameter=param):
                    if current_tool_name: logger.debug(f"ParamEnd: Tool='{current_tool_name}', Param='{param}'. Final value start: '{current_tool_params.get(param, '')[:50]}...'")
                    else: logger.warning(f"ParamEnd for '{param}' outside tool.")
                case ainetwork_types.AINetworkEventToolReady(): logger.debug("Ignoring ToolReady event.") # Ignore ToolReady
                case ainetwork_types.AINetworkEventMultiToolEnd():
                    logger.debug("MultiToolEnd received.")
                    if current_tool_name: logger.debug(f"Finalizing last tool '{current_tool_name}' at MultiToolEnd."); parsed_tools_in_stream.append({"name": current_tool_name, "params": current_tool_params.copy(), "parser_id": current_parser_tool_id}); current_tool_name = None; current_tool_params = {}; current_parser_tool_id = None
                case ainetwork_types.AINetworkEventTextChunk(content=content): thinking_chunks.append(content)
                case ainetwork_types.AIEventReasoningChunk(content=content, signature=signature): reasoning_part.reasoning += content; reasoning_part.signature = signature if signature else reasoning_part.signature
                case ainetwork_types.AINetworkConnectionEnded():
                    logger.debug("ConnectionEnded received.")
                    if current_tool_name: logger.debug(f"Finalizing tool '{current_tool_name}' due to ConnectionEnded."); parsed_tools_in_stream.append({"name": current_tool_name, "params": current_tool_params.copy(), "parser_id": current_parser_tool_id})
                    break
    except CancelledError: logger.info("Stream processing cancelled."); raise

    final_thinking = "".join(thinking_chunks) if thinking_chunks else None
    final_reasoning = reasoning_part if reasoning_part.reasoning else None
    final_multi_tool_calls: List[ToolCallPart] = []
    final_single_tool_call: Optional[ToolCallPart] = None

    if parsed_tools_in_stream:
        logger.debug(f"Generating final ToolCallParts for {len(parsed_tools_in_stream)} parsed tools.")
        original_multi_mode = len(parsed_tools_in_stream) > 1 # Remember if it was multi based on count
        for i, parsed_tool in enumerate(parsed_tools_in_stream):
             execution_id = parsed_tool.get("parser_id") or str(uuid.uuid4())
             final_params = parsed_tool["params"]
             try: logger.debug(f"Tool {i+1} ('{parsed_tool['name']}'): Final Params:\n{json.dumps(final_params, indent=2)}")
             except Exception: logger.debug(f"Tool {i+1} ('{parsed_tool['name']}'): Final Params: {final_params}")
             tool_part = ToolCallPart(tool_call_id=execution_id, tool_name=parsed_tool["name"], args=final_params, is_loading=True, is_approval_pending=True)
             final_multi_tool_calls.append(tool_part)
             logger.info(f"Generated ToolCallPart: Name='{tool_part.tool_name}', ExecutionID='{tool_part.tool_call_id}', ParserID='{parsed_tool.get('parser_id')}'")

        if len(final_multi_tool_calls) == 1 and not original_multi_mode: # Check original intent
             final_single_tool_call = final_multi_tool_calls[0]; final_multi_tool_calls = []
             logger.debug(f"Single tool detected ('{final_single_tool_call.tool_name}'), setting final_single_tool_call.")
        elif len(final_multi_tool_calls) > 1:
             logger.debug(f"Multiple tools ({len(final_multi_tool_calls)}) detected, setting final_multi_tool_calls.")

    return StepResponse(
        text=final_thinking,
        reasoning=final_reasoning,
        assistant_response_id=assistant_response_id,
        tool_call=final_single_tool_call,
        multi_tool_calls=final_multi_tool_calls if final_multi_tool_calls else None
    )

Modules