async def step(
    task: Task,
    task_config: TaskConfig,
    environment: Environment,
    emit_event: Callable[[AINetworkEventUnion], None],
    iteration: int = 0,
    cancellation_token: Optional[asyncio.Event] = None,
):
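    """Run one agent step: request a model response, stream it into the task,
    and execute any requested tool calls sequentially with approval gating.

    A StepEndEvent carrying the new messages and the updated task state is
    always emitted from the ``finally`` block, even on failure or cancellation.

    Illustrative usage (a sketch; assumes a configured Task and Environment):
        await step(task, task_config, environment, emit_event,
                   iteration=0, cancellation_token=asyncio.Event())
    """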
    # Log the cancellation token id to help debug cancellation handling
token_id = id(cancellation_token) if cancellation_token else None
logger.log(LogLevel.PROD, SEPERATOR_TOP, extra={"event": "separator"})
logger.log(LogLevel.PROD, f"Starting iteration {iteration} with cancellation token id {token_id}",
extra={"event": "task", "iteration": iteration, "token_id": token_id})
assistant_id = nanoid.generate()
last_message_id = task.messages[-1].id if task.messages else None
try:
current_mindmap_config: TaskConfigPart = task.current_task_config()
except ValueError as e:
logger.error(f"Failed to get current task config: {e}. Aborting step.")
task.state = TaskState.failed
emit_event(ainetwork_types.NetworkConnectionInterrupted(message=f"Configuration error: {e}", errName="ConfigError", origErrName="ValueError"))
emit_event(ainetwork_types.StepEndEvent(new_messages=[], updated_state=task.state,log=
f"Failed to get current task config: {e}. Aborting step."))
return
dialect_name = current_mindmap_config.dialect_name
dialect_options = current_mindmap_config.dialect_options or {}
try:
        reasoning_effort: Optional[Literal['low', 'medium', 'high']] = None
        if current_mindmap_config.reasoning_effort in ('low', 'medium', 'high'):
            reasoning_effort = current_mindmap_config.reasoning_effort  # type: ignore
dialect: Dialect = PromptDialectRegistry.create(dialect_name or "xml", **dialect_options)
ai_network: AINetwork = AINetwork()
network_config: ainetwork_types.NetworkConfig = ainetwork_types.NetworkConfig(
model=current_mindmap_config.model_name,
max_tokens=current_mindmap_config.max_tokens,
api_key=task_config.model_api_key,
thinking_budget_tokens=current_mindmap_config.thinking_budget_tokens,
reasoning_effort=reasoning_effort,
enable_parallel_tools=getattr(current_mindmap_config, "enable_parallel_tools", True),
api_base=current_mindmap_config.api_base,
enable_search=current_mindmap_config.enable_search,
)
except Exception as e:
logger.error(f"Failed to initialize dialect or network config: {e}. Aborting step.")
task.state = TaskState.failed
emit_event(ainetwork_types.NetworkConnectionInterrupted(message=f"Initialization error: {e}", errName="InitError", origErrName=type(e).__name__))
emit_event(ainetwork_types.StepEndEvent(new_messages=[], updated_state=task.state,log=f"Failed to initialize dialect or network config: {e}. Aborting step."))
return
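    # Assemble the prompt payload: the full message history plus the static and
    # dynamic config and tool metadata that the dialect will render.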
prompt_data: PromptData = PromptData(
messages=task.messages,
dyanmic_task_config=task_config,
task_config=current_mindmap_config,
tool_metadata=current_mindmap_config.tool_metadata,
current_assistant_id=assistant_id
)
task.state = TaskState.running
assistant_response: Optional[StepResponse] = None
event_stream: Optional[AsyncIterator[AINetworkEventUnion]] = None
assistant_message: Optional[AssistantMessage] = None
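    # Main flow: stream the model response, record its parts on the task, then
    # run each requested tool call sequentially.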
try:
# Check cancellation before making request
if cancellation_token and cancellation_token.is_set():
logger.info(f"Cancellation detected before AINetwork request (token id: {token_id})")
raise CancelledError("Operation cancelled before AINetwork request")
logger.debug(f"Making AINetwork request with cancellation token id {token_id}")
event_stream = ai_network.make_request(
prompt_data=prompt_data, task=task, dialect=dialect,
config=network_config, assistant_id=assistant_id,
)
logger.debug("Waiting for tool call details from stream...")
assistant_response = await read_streaming_tool_details(
event_stream, emit_event, assistant_id, cancellation_token, current_mindmap_config
)
logger.debug(f"Raw assistant_response from read_streaming: {assistant_response.model_dump_json(indent=2)}")
assistant_message = task.add_assistant_message(assistant_id)
if assistant_response.reasoning:
# Ensure signature is present if reasoning exists (might be needed by model)
if not assistant_response.reasoning.signature:
logger.warning("Reasoning part missing signature, adding empty string.")
assistant_response.reasoning.signature = "" # Add empty string if None
task.add_reasoning_part(
message_id=assistant_message.id,
reasoning=assistant_response.reasoning.reasoning,
signature=assistant_response.reasoning.signature
)
if assistant_response.text:
task.add_text_part(message_id=assistant_message.id, text=assistant_response.text)
# Check cancellation after stream processing
if cancellation_token and cancellation_token.is_set():
logger.info(f"Cancellation detected after stream processing (token id: {token_id})")
raise CancelledError("Operation cancelled after stream processing")
tools_to_process: List[ToolCallPart] = []
is_multi_call = False # Track if it was originally a multi-call request
if assistant_response.multi_tool_calls:
tools_to_process = assistant_response.multi_tool_calls
is_multi_call = True # Set flag
logger.debug(f"[SEQ-MULTI-TOOL] Received {len(tools_to_process)} tool calls to process sequentially.")
elif assistant_response.tool_call:
tools_to_process = [assistant_response.tool_call]
logger.debug("[SINGLE-TOOL] Received 1 tool call to process.")
else:
logger.warning("Assistant response did not contain any tool calls.")
# Ensure assistant message exists before trying to add informational message or update status
if assistant_message:
task.add_informational_message("warning", "The assistant did not request any tool calls.", is_visible_ai=True, is_visible_ui=False, timestamp=assistant_message.started_ts + 0.001) # Add timestamp offset
assistant_message.status = "completed"
task.state = TaskState.running
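        # Track per-tool outcomes so the step can distinguish full success from
        # partial failure across the sequential loop.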
all_tools_succeeded = True
executed_tool_names = []
executed_tool_ids = []
failed_tool_details = []
for index, tool_call in enumerate(tools_to_process):
# --- Start of loop processing a single tool_call ---
# Check for cancellation before processing each tool
if cancellation_token and cancellation_token.is_set():
logger.info(f"Cancellation detected before processing tool {index+1}: {tool_call.tool_name} (token id: {token_id})")
raise CancelledError(f"Operation cancelled before processing tool {index+1}: {tool_call.tool_name}")
tool_name = tool_call.tool_name
execution_id = tool_call.tool_call_id
tool_args = tool_call.args or {} # Ensure args is a dict, default to empty
            try:
                args_str = json.dumps(tool_args, indent=2)
                logger.debug(f"Processing ToolCallPart:\n Name: {tool_name}\n ID: {execution_id}\n Args:\n{args_str}")
            except Exception:
                logger.debug(f"Processing ToolCallPart: Name={tool_name}, ID={execution_id}, Args={tool_args} (JSON dump failed)")
log_prefix = f"[SEQ-TOOL {index+1}/{len(tools_to_process)}]"
logger.debug(f"{log_prefix} Processing tool: {tool_name} (ID: {execution_id})")
# Add ToolCallPart to mind map
tool_call.is_loading = True
task.add_tool_call_part(tool_call=tool_call, message_id=assistant_message.id)
logger.log(LogLevel.PROD, "", extra={"event": "tool_request", "tool_name": tool_name, "parameters": tool_args, "tool_id": execution_id})
# Emit ToolReady
emit_event(ainetwork_types.AINetworkEventToolReady(tool_name=tool_name, tool_call_id=execution_id, args=tool_args))
await asyncio.sleep(0.01) # Small delay to ensure ToolReady is processed
# Get tool metadata early to check if it requires explicit approval
tool_metadata = next((t for t in current_mindmap_config.tool_metadata if t.name == tool_name), None)
if not tool_metadata:
logger.error(f"{log_prefix} Tool '{tool_name}' metadata not found in config.")
raise ValueError(f"Tool '{tool_name}' metadata not found in config.")
# Check both auto_approve setting AND if tool allows auto-approval
if getattr(task_config, "auto_approve", False) and not getattr(tool_metadata, "requires_approval", False):
approval_data = ApprovalResponse(approved=True, feedback=None, images=[])
logger.debug(f"{log_prefix} Auto-approving tool {tool_name} (ID: {execution_id})")
else:
# If either auto_approve is False OR tool requires approval, request approval
approval_reason = "auto_approve disabled" if not getattr(task_config, "auto_approve", False) else "tool requires explicit approval"
logger.debug(f"{log_prefix} Requesting approval for {tool_name} (ID: {execution_id}) - Reason: {approval_reason}")
approval_response: EnvironmentResponse[ApprovalResponse] = await environment.approve_tool_use(tool_call)
approval_data = approval_response.data
            # Re-fetch the ToolCallPart from the task (safer in case of async updates)
            call_part_in_task: Optional[ToolCallPart] = None
            assistant_msg = task.get_message(assistant_message.id)
            if assistant_msg and isinstance(assistant_msg, AssistantMessage):
                for part in assistant_msg.content:
                    if isinstance(part, ToolCallPart) and part.tool_call_id == execution_id:
                        call_part_in_task = part
                        break
            if not call_part_in_task:
                logger.error(f"{log_prefix} Could not find ToolCallPart {execution_id} in task to update status post-approval!")
                all_tools_succeeded = False
                failed_tool_details.append({"name": tool_name, "id": execution_id, "reason": "Internal error: ToolCallPart vanished"})
                continue
# Handle Approval Result
is_approved_by_env = approval_data and approval_data.approved
feedback = approval_data.feedback if approval_data else "No approval feedback."
images = approval_data.images if approval_data else []
call_part_in_task.is_approval_pending = False # Update status in task
            if not is_approved_by_env:
                logger.debug(f"{log_prefix} Tool '{tool_name}' rejected. Feedback: {feedback}")
                call_part_in_task.is_loading = False  # Update status
                task.answer_tool_call(tool_call=call_part_in_task, result={"message": feedback or "Tool use rejected."}, is_error=False, is_approved=False, observer_results=[])
                all_tools_succeeded = False
                failed_tool_details.append({"name": tool_name, "id": execution_id, "reason": f"Rejected: {feedback}"})
                if feedback or images:
                    task.add_human_message(text=feedback, images=images or None)
                continue  # Move on to the next tool in the sequence
# --- Approved: Execute ---
logger.debug(f"{log_prefix} Tool '{tool_name}' approved. Executing...")
try:
env_response = await environment.execute_tool(call_part_in_task)
tool_result_data = env_response.data
# Handle execution result (success, env error, reported error)
if tool_result_data is None or not env_response.success:
error_msg = env_response.message or "Tool execution failed in environment"
logger.error(f"{log_prefix} Tool '{tool_name}' execution failed: {error_msg}")
call_part_in_task.is_loading = False # Update status
task.answer_tool_call(tool_call=call_part_in_task, result={"message": error_msg}, is_error=True, is_approved=True, observer_results=[])
all_tools_succeeded = False; failed_tool_details.append({"name": tool_name, "id": execution_id, "reason": f"Execution failed: {error_msg}"}); continue
# --- Execution Succeeded (or tool reported error) ---
logger.debug(f"{log_prefix} Tool '{tool_name}' execution completed.")
call_part_in_task.is_loading = False # Update status
# --- Add ToolResultPart via answer_tool_call ---
# This is where the individual result is added to the mind map
tool_msg = task.answer_tool_call(
tool_call=call_part_in_task,
result=tool_result_data.result,
is_error=tool_result_data.is_error,
is_approved=True, # It was approved
observer_results=tool_result_data.observer_results or []
)
# ----------------------------------------------------
                executed_tool_names.append(tool_name)
                executed_tool_ids.append(execution_id)
# Handle reported errors
if tool_result_data.is_error:
all_tools_succeeded = False
error_detail = tool_result_data.error or str(tool_result_data.result.get("message", "Unknown error"))
failed_tool_details.append({"name": tool_name, "id": execution_id, "reason": f"Reported error: {error_detail}"})
logger.warning(f"{log_prefix} Tool '{tool_name}' reported an error: {error_detail}")
                # Handle Task Entry/Exit tools
is_task_entry = tool_metadata.is_task_entry
is_task_exit = tool_metadata.is_task_exit
tool_data = tool_result_data.result
if is_task_entry and not tool_result_data.is_error:
try:
config_values = extract_parameters_by_type(tool_result=tool_data, tool_metadata=tool_metadata, param_type=TaskParameterType.TASK_CONFIG)
description_values = extract_parameters_by_type(tool_result=tool_data, tool_metadata=tool_metadata, param_type=TaskParameterType.TASK_DESCRIPTION)
                        logger.debug(f"{log_prefix} TaskEntry extracted config={config_values}, description={description_values}")
if description_values:
# Apply to task using generic schema
                            entry_schema = TaskEntryToolSchema(
                                task_description=description_values[0],
                                # Guard: an empty config_values list previously raised IndexError here
                                initial_task_config=config_values[0] if config_values else None,
                            )
entry_schema.apply_to_task(task)
                    except Exception as e:
                        logger.error(f"{log_prefix} TaskEntry schema failed: {e}")
                        task.add_informational_message("error", f"Task Entry '{tool_name}' failed: {e}.", is_visible_ai=True, is_visible_ui=True)
                        all_tools_succeeded = False
                        failed_tool_details.append({"name": tool_name, "id": execution_id, "reason": f"TaskEntry failed: {e}"})
if is_task_exit and not tool_result_data.is_error:
                    level = task.current_task_level()
                    logger.info(f"{log_prefix} TaskExit tool {tool_name} at level {level}.")
                    if level == 0:
                        task.state = TaskState.completed
                        logger.info("Task completed by exit tool.")
elif level > 0:
try:
handoff_values = extract_parameters_by_type(tool_data, tool_metadata, TaskParameterType.HANDOFF_DESCRIPTION)
param_name = get_parameter_key_name(tool_metadata, TaskParameterType.HANDOFF_DESCRIPTION)
if handoff_values:
exit_schema = TaskExitToolSchema(
handoff_description=handoff_values[0]['handoff_description']
)
exit_message, tool_message = exit_schema.apply_to_task(task,result_key=param_name if param_name else "task_result")
                            # Emit the updated tool message to the client
if tool_message:
emit_event(ainetwork_types.TaskMessageEvent(
message=tool_message
))
logger.debug("Exiting subtask via parameter types")
except Exception as e:
logger.error(f"Failed to exit nested task with predefined schema: {str(e)}")
# Fallback to direct method call if schema validation fails
_, tool_message = task.add_task_exit(
handoff_description=tool_data.get("handoff_description", []),
)
            except CancelledError:
                raise
except Exception as e:
# Handle unexpected errors during execution/result processing
logger.exception(f"{log_prefix} Unexpected error processing tool '{tool_name}': {e}")
                if call_part_in_task:
                    call_part_in_task.is_loading = False
                task.answer_tool_call(tool_call=call_part_in_task if call_part_in_task else tool_call, result={"message": f"Unexpected error: {e}"}, is_error=True, is_approved=True, observer_results=[])
                all_tools_succeeded = False
                failed_tool_details.append({"name": tool_name, "id": execution_id, "reason": f"Unexpected error: {e}"})
                continue
# --- End of loop processing a single tool_call ---
# --- After processing all tools in the sequence ---
if tools_to_process:
if not is_multi_call:
logger.debug(f"[SINGLE-TOOL] Processing complete for: {executed_tool_names[0] if executed_tool_names else 'None'}")
# Mark the assistant message as completed now that all its tools are processed
if assistant_message:
assistant_message.status = "completed"
assistant_message.finished_ts = int(time.time())
# Keep task running unless a tool explicitly completed it
if task.state != TaskState.completed:
task.state = TaskState.running # Stay running even if some tools failed
# Emit ONE ContentComplete event after the sequence (or single tool)
logger.debug("Emitting final ContentComplete for the step.")
emit_event(ainetwork_types.AINetworkEventContentComplete())
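    # Cancellation surfaces as an aborted task; any other exception fails the step.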
except CancelledError as e:
logger.info(f"Operation cancelled: {e} (token id: {token_id})")
task.state = TaskState.aborted
if assistant_message: assistant_message.status = "interrupted"
emit_event(ainetwork_types.NetworkConnectionInterrupted(message=str(e) or "Cancelled", errName="UserAborted", origErrName="CancelledError"))
if assistant_message:
for part in assistant_message.content:
if isinstance(part, ToolCallPart) and part.is_loading: part.is_loading = False
except Exception as e:
logger.exception(f"Unhandled exception during step execution: {e}")
task.state = TaskState.failed
if assistant_message: assistant_message.status = "error"
emit_event(ainetwork_types.NetworkConnectionInterrupted(message=f"Internal error: {e}", errName="StepExecutionError", origErrName=type(e).__name__))
finally:
try:
new_messages = task.get_updates(last_message_id)
logger.debug(f"Found {len(new_messages)} new messages to emit.")
            emit_event(ainetwork_types.StepEndEvent(
                new_messages=new_messages, updated_state=task.state,
                log="Reached the end of the stream correctly"))
except Exception as e:
logger.error(f"Failed to get or emit mindmap updates: {e}")
            emit_event(ainetwork_types.StepEndEvent(
                new_messages=[], updated_state=TaskState.failed,
                log=f"Failed to get or emit mindmap updates: {e}"))
            if task.state not in (TaskState.failed, TaskState.aborted):
                task.state = TaskState.failed
        # --- Clean up ---
        if event_stream:
            try:
                await event_stream.aclose()
                logger.debug("AINetwork event stream closed.")
            except Exception as e:
                logger.error(f"Error closing event stream: {e}")
logger.log(LogLevel.PROD, f"Iteration {iteration} finished with task state: {task.state.value}", extra={"event": "task", "iteration": iteration, "final_state": task.state.value})
logger.log(LogLevel.PROD, SEPERATOR_BOTTOM, extra={"event": "separator"})
return