Mind Map Module

The Mind Map module defines the data structures that represent tasks, messages, and message parts in the Azad system. It is one of the most critical components because it is the core data model for the entire agent system.

Overview

The Mind Map is a data structure that represents the conversation and tool interactions within a task. It consists of:

  • A Task class that contains a list of messages and the current state
  • Message classes, one per role (system, user, assistant, tool, informational, compression, taskconfig, taskentry, taskexit)
  • Various Part classes for different content types (text, image, tool call, tool result)

This structure allows the agent to maintain a complete history of the conversation and tool interactions, which is essential for context management and reasoning.

Key Concepts

Task Structure

The Task class is the root of the Mind Map data structure. It represents a specific task being executed by the agent and contains:

  • A list of messages (system, user, assistant, tool)
  • The current state of the task (running, completed, failed, etc.)
  • Methods for adding and retrieving messages

Message Flow

The Mind Map is designed to support a specific message flow:

  1. The task starts with a TaskConfigMessage that defines the configuration for the task
  2. The user sends a UserMessage with the task description
  3. The assistant responds with an AssistantMessage that may contain text and/or tool calls
  4. If the assistant calls a tool, a ToolMessage is added with the tool result
  5. This cycle continues until the task is completed
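
The flow above can be sketched as a list of role-tagged messages. This is a minimal illustration only: Message and Task here are hypothetical dataclass stand-ins, not the Pydantic models from azad.mind_map, and the summary strings are invented for the example. Only the role values come from MessageRole.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Message:
    """Hypothetical stand-in for the Pydantic message classes in azad.mind_map."""
    role: str
    summary: str


@dataclass
class Task:
    """Hypothetical stand-in for azad.mind_map.Task (messages only)."""
    messages: List[Message] = field(default_factory=list)


def build_example_history() -> Task:
    """Append one message per step of the message flow."""
    task = Task()
    task.messages.append(Message("taskconfig", "model and tool settings"))  # step 1
    task.messages.append(Message("user", "List the files in /tmp"))         # step 2
    task.messages.append(Message("assistant", "requests a tool call"))      # step 3
    task.messages.append(Message("tool", "result of the tool call"))        # step 4
    task.messages.append(Message("assistant", "final answer"))              # step 5
    return task


example_roles = [m.role for m in build_example_history().messages]
print(example_roles)
```

The assistant/tool pair in steps 3 and 4 repeats as many times as the task needs before the final assistant message.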

Nested Tasks

The Mind Map supports nested tasks, which allow the agent to work on subtasks within a main task. This is implemented through:

  • TaskEntryMessage: Marks the start of a nested task
  • TaskExitMessage: Marks the end of a nested task
  • current_task_level(): Calculates the current nesting level
  • current_task_messages(): Gets messages within the current task level
  • current_task_config(): Gets the configuration for the current task level

This nesting capability is essential for complex tasks that require multiple steps or subtasks.
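
As a worked example, the nesting level can be traced message by message. The sketch below mirrors the counting rule documented for current_task_level() but operates on plain role strings rather than real message objects; levels_after_each and the sample history are illustrative names, not part of the module.

```python
from typing import List


def levels_after_each(roles: List[str]) -> List[int]:
    """Return the nesting level in effect after each message role."""
    level = 0
    trace = []
    for role in roles:
        if role == "taskentry":
            level += 1
        elif role == "taskexit" and level > 0:
            # A stray exit at the root level is ignored, so the level never goes negative.
            level -= 1
        trace.append(level)
    return trace


history = [
    "taskconfig", "user", "assistant",  # root task
    "taskentry", "user", "assistant",   # nested sub-task
    "taskexit", "tool",                 # back in the root task
]
trace = levels_after_each(history)
print(trace)  # [0, 0, 0, 1, 1, 1, 0, 0]
```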

Tool Call Flow

Tool calls follow a specific flow:

  1. Assistant adds a ToolCallPart to an AssistantMessage
  2. Environment approves or rejects the tool call
  3. Tool is executed if approved
  4. Result is added as a ToolResultPart in a ToolMessage
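
A minimal sketch of these four steps follows. ToolCall and ToolResult are simplified dataclass stand-ins for ToolCallPart and ToolResultPart, and resolve_tool_call, the tools registry, and the handling of a rejected call are assumptions made for illustration; the real environment may behave differently.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional


@dataclass
class ToolCall:
    """Stand-in for ToolCallPart (subset of fields)."""
    tool_call_id: str
    tool_name: str
    args: Dict[str, Any]
    is_loading: bool = True
    is_approval_pending: bool = True


@dataclass
class ToolResult:
    """Stand-in for ToolResultPart (subset of fields)."""
    tool_call_id: str
    tool_name: str
    result: Dict[str, Any]
    is_approved: bool
    is_error: bool = False
    error: Optional[str] = None


def resolve_tool_call(call: ToolCall, approved: bool,
                      tools: Dict[str, Callable[..., Any]]) -> ToolResult:
    """Hypothetical helper: apply the approval decision, run the tool, build the result."""
    call.is_approval_pending = False  # step 2: a decision has been made
    if not approved:
        call.is_loading = False
        return ToolResult(call.tool_call_id, call.tool_name, {}, is_approved=False)
    try:
        output = tools[call.tool_name](**call.args)  # step 3: execute the tool
        result = ToolResult(call.tool_call_id, call.tool_name,
                            {"output": output}, is_approved=True)
    except Exception as exc:  # an unknown tool or a failing tool becomes an error result
        result = ToolResult(call.tool_call_id, call.tool_name, {},
                            is_approved=True, is_error=True, error=str(exc))
    call.is_loading = False  # step 4: the result is now available
    return result


call = ToolCall("call_1", "add", {"a": 1, "b": 2})
result = resolve_tool_call(call, approved=True, tools={"add": lambda a, b: a + b})
print(result.tool_call_id == call.tool_call_id, result.result)
```

The shared tool_call_id is what lets a reader of the history pair each result with the call that produced it.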

Compression Support

The Mind Map supports compression through CompressionMessage, which records which messages were compressed and how.

Important Considerations

When working with the Mind Map:

  1. Task Boundaries: Always respect task boundaries when processing messages. Use current_task_messages() instead of accessing task.messages directly.

  2. Message Order: Messages are ordered chronologically. The started_ts and finished_ts fields track when each message was created and completed.

  3. Tool Call Flow: Every ToolCallPart should be answered by a ToolResultPart with the same tool_call_id, following the Tool Call Flow described above.

  4. Nested Tasks: Nested tasks allow for complex workflows. Each nested task has its own configuration and message history.

API Reference

azad.mind_map

Attributes

task_logger module-attribute

task_logger = getLogger(__name__)

ANYJSON module-attribute

ANYJSON = Dict[str, Any]

SystemContent module-attribute

SystemContent = List[TextPart]

Type alias for the content of a SystemMessage (list of TextParts).

UserContent module-attribute

UserContent = List[Union[TextPart, ImagePart, FilePart]]

Type alias for the content of a UserMessage (list of Text, Image, or FileParts).

AssistantContent module-attribute

AssistantContent = List[Union[TextPart, ToolCallPart, ReasoningPart, RedactedThinkingPart]]

Type alias for the content of an AssistantMessage (list of Text, ToolCall, Reasoning, or RedactedThinkingParts).
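
Because every part carries a type discriminator, mixed content lists can be filtered by that field. The sketch below uses plain dictionaries shaped like serialized parts rather than the real Pydantic models; parts_of_type and the sample values are hypothetical.

```python
from typing import Any, Dict, List

# Dictionaries shaped like serialized ReasoningPart, TextPart, and ToolCallPart objects.
assistant_content: List[Dict[str, Any]] = [
    {"type": "reasoning", "reasoning": "I should check the directory first."},
    {"type": "text", "text": "Let me list the files."},
    {"type": "toolCall", "tool_call_id": "call_1",
     "tool_name": "list_files", "args": {"path": "/tmp"}},
]


def parts_of_type(content: List[Dict[str, Any]], part_type: str) -> List[Dict[str, Any]]:
    """Select the parts whose 'type' discriminator matches part_type."""
    return [part for part in content if part.get("type") == part_type]


tool_calls = parts_of_type(assistant_content, "toolCall")
visible_text = " ".join(part["text"] for part in parts_of_type(assistant_content, "text"))
print(len(tool_calls), visible_text)
```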

ToolContent module-attribute

ToolContent = List[ToolResultPart]

Type alias for the content of a ToolMessage (list of ToolResultParts).

InformationalContent module-attribute

InformationalContent = List[InformationalPart]

Type alias for the content of an InformationalMessage (list of InformationalParts).

CompressionContent module-attribute

CompressionContent = List[CompressionPart]

Type alias for the content of a CompressionMessage (list of CompressionParts).

TaskConfigContent module-attribute

TaskConfigContent = List[TaskConfigPart]

Type alias for the content of a TaskConfigMessage (list of TaskConfigParts).

TaskEntryContent module-attribute

TaskEntryContent = List[TaskEntryPart]

Type alias for the content of a TaskEntryMessage (list of TaskEntryParts).

TaskExitContent module-attribute

TaskExitContent = List[TaskExitPart]

Type alias for the content of a TaskExitMessage (list of TaskExitParts).

current_config module-attribute

current_config = getattr(model_class, 'model_config', {})

Classes

TaskState

Bases: str, Enum

Enum representing the possible states of a task.

Attributes:

  • running

    The task is currently executing or awaiting input/action.

  • completed

    The task has finished successfully without errors.

  • failed

    The task encountered an error and could not complete.

  • paused

    The task execution has been temporarily suspended.

  • aborted

    The task was explicitly stopped before completion.

  • resume_awaited

    The task is paused and waiting for a trigger to resume.
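
Because TaskState derives from both str and Enum, its members compare equal to plain strings and serialize to JSON without a custom encoder. The sketch below redefines the enum locally so it runs on its own; the member values are an assumption (each value is taken to equal its name), since this page lists only the names.

```python
import json
from enum import Enum


class TaskState(str, Enum):
    # Assumption: each member's value equals its name.
    running = "running"
    completed = "completed"
    failed = "failed"
    paused = "paused"
    aborted = "aborted"
    resume_awaited = "resume_awaited"


state = TaskState("paused")                       # look up a member by its value
compares_to_str = TaskState.running == "running"  # True thanks to the str mixin
encoded = json.dumps({"state": TaskState.completed})
print(encoded)  # {"state": "completed"}
```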

ObserverResult

Bases: BaseModel

Represents the outcome of a single observer monitoring a tool execution.

Observers can perform checks or actions related to a tool's execution (e.g., safety checks, logging, validation).

Attributes:

  • is_success (bool) –

    Indicates whether the observer's check passed or action succeeded.

  • observer_name (str) –

    The unique identifier for the observer that produced this result.

  • message (Optional[str]) –

    An optional human-readable message from the observer (e.g., reason for failure).

  • data (Optional[ANYJSON]) –

    Optional structured data provided by the observer (e.g., validation details).
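
One way a caller might use these fields is to collect the observers that reported a failure. In this sketch ObserverResult is a dataclass stand-in for the Pydantic model, the None defaults are assumed, and failure_summary and the observer names are hypothetical.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class ObserverResult:
    """Stand-in for azad.mind_map.ObserverResult; the None defaults are assumed."""
    is_success: bool
    observer_name: str
    message: Optional[str] = None
    data: Optional[Dict[str, Any]] = None


def failure_summary(results: List[ObserverResult]) -> List[str]:
    """Describe each observer whose check did not pass."""
    return [
        f"{r.observer_name}: {r.message or 'no message'}"
        for r in results
        if not r.is_success
    ]


observer_results = [
    ObserverResult(True, "audit_log"),
    ObserverResult(False, "path_guard", message="write outside workspace",
                   data={"path": "/etc/hosts"}),
]
failures = failure_summary(observer_results)
print(failures)  # ['path_guard: write outside workspace']
```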

TextPart

Bases: BaseModel

Represents a plain text segment within a message's content.

Attributes:

  • type (Literal['text']) –

    Always 'text'. Discriminator field for message part unions.

  • text (str) –

    The actual text content.

ImagePart

Bases: BaseModel

Represents an image within a message's content.

Attributes:

  • type (Literal['image']) –

    Always 'image'. Discriminator field for message part unions.

  • image (str) –

    The image data, typically base64-encoded string.

  • mime_type (Optional[str]) –

    Optional MIME type of the image (e.g., 'image/jpeg', 'image/png'). Helps the receiving end interpret the image data.

FilePart

Bases: BaseModel

Represents a file attachment within a message's content (typically User messages).

Attributes:

  • type (Literal['file']) –

    Always 'file'. Discriminator field for message part unions.

  • data (str) –

    The file content, typically base64-encoded string.

  • mime_type (str) –

    The MIME type of the file (e.g., 'application/pdf', 'text/csv'). Essential for interpretation.

ReasoningPart

Bases: BaseModel

Represents the reasoning or thinking process of an Assistant, often before generating output or calling a tool.

Attributes:

  • type (Literal['reasoning']) –

    Always 'reasoning'. Discriminator field for message part unions.

  • reasoning (str) –

    The textual description of the Assistant's thought process.

  • signature (str | None) –

    An optional cryptographic signature to verify the origin or integrity of the reasoning, if applicable.

  • started_ts (float | None) –

    Optional timestamp (Unix epoch seconds) when the reasoning process started.

  • finished_ts (float | None) –

    Optional timestamp (Unix epoch seconds) when the reasoning process finished.

RedactedThinkingPart

Bases: BaseModel

Represents a placeholder for thinking/reasoning that has been redacted or omitted, possibly for brevity or privacy.

Attributes:

  • type (Literal['redacted_thinking']) –

    Always 'redacted_thinking'. Discriminator field for message part unions.

  • data (str) –

    A string indicating that thinking occurred but was redacted (content may vary).

Attributes
type class-attribute instance-attribute
type: Literal['redacted_thinking'] = 'redacted_thinking'

ToolCallPart

Bases: BaseModel

Represents a request from the Assistant to execute a specific tool with given arguments.

This part initiates a tool execution flow. It will typically be followed later by a ToolMessage containing a ToolResultPart with the same tool_call_id.

Attributes:

  • type (Literal['toolCall']) –

    Always 'toolCall'. Discriminator field for message part unions.

  • tool_call_id (str) –

    A unique identifier for this specific tool invocation attempt. Used to link this call to its corresponding ToolResultPart.

  • tool_name (str) –

    The name of the tool the Assistant wants to execute.

  • args (ANYJSON) –

    A JSON object (dictionary) containing the arguments for the tool, as generated by the Assistant.

  • is_loading (bool) –

    Indicates if the tool call is currently being processed (result pending). Starts True, becomes False when the result is added.

  • is_approval_pending (bool) –

    Indicates if the tool call requires explicit user/system approval before execution. Starts True if auto-approval is off.

ToolResultContent

Bases: BaseModel

Represents a discrete piece of content generated by a tool, intended for display or further processing.

A single ToolResultPart can contain multiple ToolResultContent parts (e.g., text and an image).

Attributes:

  • type (Literal['text'] | Literal['image']) –

    The type of content, currently 'text' or 'image'.

  • text (Optional[str]) –

    The textual content generated by the tool (used if type is 'text').

  • image_data (Optional[str]) –

    The image data generated by the tool, typically base64 (used if type is 'image').

  • mime_type (Optional[str]) –

    The MIME type if the content is an image (e.g., 'image/png').

Attributes
type class-attribute instance-attribute
type: Literal['text'] | Literal['image'] = 'text'
image_data class-attribute instance-attribute
image_data: Optional[str] = None
mime_type class-attribute instance-attribute
mime_type: Optional[str] = None

ToolResultPart

Bases: BaseModel

Represents the result of executing a tool that was previously requested via a ToolCallPart.

This part concludes a tool execution flow initiated by a ToolCallPart.

Attributes:

  • type (Literal['toolResult']) –

    Always 'toolResult'. Discriminator field for message part unions.

  • tool_call_id (str) –

    The unique identifier matching the tool_call_id of the ToolCallPart that initiated this execution. Links the result back to the request.

  • tool_name (str) –

    The name of the tool that was executed.

  • result (ANYJSON) –

    The primary result data from the tool execution, typically a JSON object. The structure depends on the specific tool.

  • observer_results (List[ObserverResult]) –

    A list of ObserverResult objects, capturing outcomes from any observers that monitored this tool execution.

  • is_approved (bool) –

    Indicates whether the execution of this tool call was approved (either automatically or manually).

  • is_error (bool) –

    Indicates whether the tool execution resulted in an error.

  • error (str | None) –

    A description of the error if is_error is True.

  • is_loading (bool) –

    Indicates if the result itself is still loading (rarely True, usually False when part is added).

  • has_feedback (bool) –

    Indicates if feedback has been provided for this tool result (e.g., by the user).

  • tool_result_contents (List[ToolResultContent]) –

    A list of ToolResultContent parts representing displayable output generated by the tool (e.g., text summaries, images).

InformationalPart

Bases: BaseModel

Represents metadata or non-core conversational events within a message.

Used for conveying system status, errors unrelated to specific tool calls, or other contextual information that isn't direct user/assistant dialogue.

Attributes:

  • type (Literal['informational']) –

    Always 'informational'. Discriminator field for message part unions.

  • is_visible_ai (bool) –

    Whether this information should be considered by the AI/Assistant.

  • is_visible_ui (bool) –

    Whether this information should typically be displayed to the end-user.

  • informational_type (Optional[str]) –

    A category for the information (e.g., 'system_status', 'context_update', 'error').

  • error_type (Optional[str]) –

    If it's an error, a more specific error category.

  • details (Optional[str]) –

    A human-readable description of the information or event.

  • additional_data (Optional[ANYJSON] | Optional[List[ANYJSON]]) –

    Optional structured data associated with the event.

Attributes
type class-attribute instance-attribute
type: Literal['informational'] = 'informational'
informational_type class-attribute instance-attribute
informational_type: Optional[str] = None
error_type class-attribute instance-attribute
error_type: Optional[str] = None
details class-attribute instance-attribute
details: Optional[str] = None
additional_data class-attribute instance-attribute
additional_data: Optional[ANYJSON] | Optional[List[ANYJSON]] = None

CompressionPart

Bases: BaseModel

Contains metadata about a message compression event that occurred in the conversation history.

This part doesn't represent conversational content itself, but rather describes an operation performed on previous messages.

Attributes:

  • type (Literal['compression']) –

    Always 'compression'. Discriminator field for message part unions.

  • start_message_idx (int) –

    The starting index (in the original message list) of the compressed range.

  • end_message_idx (int) –

    The ending index (inclusive) of the compressed range.

  • reason (str) –

    The reason why compression was applied (e.g., 'token_limit', 'summarization').

  • strategies (List[str]) –

    A list of strategies used for compression (e.g., 'summarize', 'drop_intermediate').

  • compressed_message_ids (List[str]) –

    List of message IDs that were removed or replaced during compression.

  • kept_message_ids (List[str]) –

    List of message IDs within the range that were retained (if any).

  • cache (Dict[str, Any]) –

    A dictionary possibly containing cached data related to the compression (e.g., the summary).

  • metadata (Optional[Dict[str, Any]]) –

    Optional dictionary for any other relevant metadata about the compression event.

Attributes
type class-attribute instance-attribute
type: Literal['compression'] = 'compression'
strategies instance-attribute
strategies: List[str]
compressed_message_ids instance-attribute
compressed_message_ids: List[str]
kept_message_ids instance-attribute
kept_message_ids: List[str]
cache class-attribute instance-attribute
cache: Dict[str, Any] = Field(default_factory=dict)
metadata class-attribute instance-attribute
metadata: Optional[Dict[str, Any]] = None
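
Since end_message_idx is inclusive, a range from 1 to 3 covers three messages. The sketch below shows how the two ID lists could be derived from such a range. It assumes that every message in the range ends up in exactly one of the two lists, which this page does not state explicitly, and split_compressed_range is a hypothetical helper.

```python
from typing import Dict, List, Set


def split_compressed_range(message_ids: List[str], start_message_idx: int,
                           end_message_idx: int, keep: Set[str]) -> Dict[str, List[str]]:
    """Partition the inclusive index range into compressed and kept message IDs."""
    if not 0 <= start_message_idx <= end_message_idx < len(message_ids):
        raise ValueError("range must lie inside the message list")
    # The end index is inclusive, hence the + 1 in the slice.
    in_range = message_ids[start_message_idx:end_message_idx + 1]
    return {
        "compressed_message_ids": [m for m in in_range if m not in keep],
        "kept_message_ids": [m for m in in_range if m in keep],
    }


ids = ["m0", "m1", "m2", "m3", "m4"]
split = split_compressed_range(ids, 1, 3, keep={"m2"})
print(split)
# {'compressed_message_ids': ['m1', 'm3'], 'kept_message_ids': ['m2']}
```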

TaskConfigPart

Bases: BaseModel

Represents the configuration settings for a task or sub-task.

This defines parameters like the AI model, tools available, prompt segments, etc.

Attributes:

  • type (Literal['taskconfig']) –

    Always 'taskconfig'. Discriminator field for message part unions.

  • model_name (str) –

    Identifier for the primary language model to be used.

  • max_tokens (int | None) –

    Optional maximum allowed number of tokens to generate.

  • dialect_name (str | None) –

    Optional name of the prompt dialect/formatter to use.

  • dialect_options (dict[str, Any] | None) –

    Optional dictionary of options specific to the chosen dialect.

  • user_agent_prompt (str | None) –

    Optional prompt segment defining the user agent's behavior or persona.

  • user_prompt (str | None) –

    Optional prompt segment providing user-specific instructions or context.

  • thinking_budget_tokens (int | None) –

    Optional limit on tokens the Assistant can use for internal 'thinking' steps.

  • tool_metadata (List[ToolMetadata]) –

    List of ToolMetadata objects describing the tools available in this task context.

  • enable_parallel_tools (bool) –

    Whether the Assistant is allowed to request multiple tool calls concurrently.

  • enable_auto_tools (bool) –

    Whether tool calls should be executed automatically without explicit approval.

  • compression_config (CompressionConfig) –

    Configuration for message history compression behavior.

Attributes
type class-attribute instance-attribute
type: Literal['taskconfig'] = 'taskconfig'
model_name class-attribute instance-attribute
model_name: str = Field(description="LITELLM Model Id with provider name, e.g., 'openai/gpt-4o'")
max_tokens class-attribute instance-attribute
max_tokens: int | None = Field(None, description='Optional maximum allowed number of tokens to generate')
api_base class-attribute instance-attribute
api_base: str | None = Field(None, description='Optional API base URL for the model')
dialect_name class-attribute instance-attribute
dialect_name: str | None = Field(None)
dialect_options class-attribute instance-attribute
dialect_options: dict[str, Any] | None = Field(None)
user_agent_prompt class-attribute instance-attribute
user_agent_prompt: str | None = Field(None, description='Optional prompt section for user agent configuration')
user_prompt class-attribute instance-attribute
user_prompt: str | None = Field(None, description='Optional prompt section for user configuration')
thinking_budget_tokens class-attribute instance-attribute
thinking_budget_tokens: int | None = Field(None, description='Optional thinking token budget')
reasoning_effort class-attribute instance-attribute
reasoning_effort: str | None = Field(None, description='Optional reasoning effort level')
tool_metadata class-attribute instance-attribute
tool_metadata: List[ToolMetadata] = Field(default_factory=list)
enable_parallel_tools class-attribute instance-attribute
enable_parallel_tools: bool = Field(default=True, description='Whether to enable parallel tool calls')
enable_auto_tools class-attribute instance-attribute
enable_auto_tools: bool = Field(default=False, description='Whether tools should be automatically approved')
compression_config class-attribute instance-attribute
compression_config: CompressionConfig = Field(default=CompressionConfig(enabled=True, token_limit=200000), description='Configuration for message history compression')
enable_search class-attribute instance-attribute
enable_search: bool = Field(False, description='Whether to enable search/grounding via Gemini built-in tools.')

TaskEntryPart

Bases: BaseModel

Marks the beginning of a nested sub-task within the main task flow.

Typically created when a tool call indicates the start of a sub-problem.

Attributes:

TaskExitPart

Bases: BaseModel

Marks the end of a nested sub-task, providing results back to the parent task.

This usually corresponds to the completion of a sub-task initiated by a TaskEntryPart.

Attributes:

  • type (Literal['taskexit']) –

    Always 'taskexit'. Discriminator field for message part unions.

  • handoff_description (List[TextPart | ImagePart]) –

    A list of TextPart or ImagePart describing the results or outcome of the sub-task, intended for the parent task's context (often used to populate the result of the tool call that triggered the sub-task).

Attributes
type class-attribute instance-attribute
type: Literal['taskexit'] = 'taskexit'
handoff_description class-attribute instance-attribute
handoff_description: List[TextPart | ImagePart] = Field(default_factory=list)

MessageRole

Bases: str, Enum

Enumeration of the possible roles a message can have in the conversation.

Attributes:

  • system

    Instructions or context provided by the system, not a direct participant.

  • user

    Input provided by the end-user.

  • assistant

    Responses or actions generated by the AI model.

  • tool

    Results returned from executing a tool requested by the assistant.

  • informational

    Metadata, status updates, or non-dialogue events.

  • compression

    Metadata about message history compression events.

  • taskconfig

    Contains configuration settings for the current task scope.

  • taskentry

    Marks the beginning of a nested sub-task.

  • taskexit

    Marks the end of a nested sub-task.

MessageBase

Bases: BaseModel

Base model for all message types, containing common metadata fields.

Attributes:

  • id (str) –

    A unique identifier for this specific message instance (default: generated nanoid).

  • task_id (str) –

    The identifier of the parent Task this message belongs to.

  • started_ts (int) –

    Timestamp (Unix epoch seconds) when the message was created or started processing.

  • finished_ts (int | None) –

    Optional timestamp (Unix epoch seconds) when the message processing completed (e.g., Assistant response finished, tool result received).

  • world_state_id (str | None) –

    Optional identifier for the world state snapshot associated with this message.

  • world_state_hash (str | None) –

    Optional hash representing the world state at the time of this message.

  • error_message (str | None) –

    Optional error message if the creation or processing of this message failed.

Attributes
id class-attribute instance-attribute
id: str = Field(default_factory=generate)
started_ts class-attribute instance-attribute
started_ts: int = Field(default_factory=lambda: int(time()))
world_state_hash class-attribute instance-attribute
world_state_hash: str | None = None

SystemMessage

Bases: MessageBase

A message containing system-level instructions or context.

Attributes:

Attributes
content class-attribute instance-attribute
content: SystemContent = Field(default_factory=list)
id class-attribute instance-attribute
id: str = Field(default_factory=generate)
started_ts class-attribute instance-attribute
started_ts: int = Field(default_factory=lambda: int(time()))
world_state_hash class-attribute instance-attribute
world_state_hash: str | None = None

UserMessage

Bases: MessageBase

A message representing input from the end-user.

Attributes:

  • role (Literal['user']) –

    Always 'user'.

  • content (UserContent) –

    A list containing TextPart, ImagePart, or FilePart elements.

  • is_ui_hidden (Optional[bool]) –

    If True, suggests this message might not be directly displayed in a standard chat UI (e.g., could be programmatically added context).

Attributes
content class-attribute instance-attribute
content: UserContent = Field(default_factory=list)
is_ui_hidden class-attribute instance-attribute
is_ui_hidden: Optional[bool] = Field(default=False)
id class-attribute instance-attribute
id: str = Field(default_factory=generate)
started_ts class-attribute instance-attribute
started_ts: int = Field(default_factory=lambda: int(time()))
world_state_hash class-attribute instance-attribute
world_state_hash: str | None = None

AssistantMessage

Bases: MessageBase

A message representing output or actions from the AI Assistant.

Attributes:

  • role (Literal['assistant']) –

    Always 'assistant'.

  • content (AssistantContent) –

    A list containing TextPart, ToolCallPart, ReasoningPart, or RedactedThinkingPart elements.

  • status (Literal['loading', 'completed', 'error', 'interrupted']) –

    The current status of the Assistant's turn generation ('loading', 'completed', 'error', 'interrupted').

  • cost (float | None) –

    Optional cost associated with generating this message (e.g., API cost).

  • cache_writes (int | None) –

    Optional count of cache writes during generation.

  • cache_reads (int | None) –

    Optional count of cache reads during generation.

  • tokens_in (int | None) –

    Optional count of input tokens processed to generate this message.

  • tokens_out (int | None) –

    Optional count of output tokens generated in this message.

  • model_id (str | None) –

    Optional identifier of the language model used to generate this message.

Attributes
status class-attribute instance-attribute
status: Literal['loading', 'completed', 'error', 'interrupted'] = Field(default='loading')
cost class-attribute instance-attribute
cost: float | None = Field(default=0)
cache_writes class-attribute instance-attribute
cache_writes: int | None = Field(default=0)
cache_reads class-attribute instance-attribute
cache_reads: int | None = Field(default=0)
tokens_in class-attribute instance-attribute
tokens_in: int | None = Field(default=0)
tokens_out class-attribute instance-attribute
tokens_out: int | None = Field(default=0)
model_id class-attribute instance-attribute
model_id: str | None = Field(default=None)
id class-attribute instance-attribute
id: str = Field(default_factory=generate)
started_ts class-attribute instance-attribute
started_ts: int = Field(default_factory=lambda: int(time()))
world_state_hash class-attribute instance-attribute
world_state_hash: str | None = None

ToolMessage

Bases: MessageBase

A message containing the results of tool executions requested by the Assistant.

Attributes:

  • role (Literal['tool']) –

    Always 'tool'.

  • content (ToolContent) –

    A list containing only ToolResultPart elements, each corresponding to a previously issued ToolCallPart.

Attributes
content class-attribute instance-attribute
content: ToolContent = Field(default_factory=list)
id class-attribute instance-attribute
id: str = Field(default_factory=generate)
started_ts class-attribute instance-attribute
started_ts: int = Field(default_factory=lambda: int(time()))
world_state_hash class-attribute instance-attribute
world_state_hash: str | None = None

InformationalMessage

Bases: MessageBase

A message conveying metadata, status, or non-dialogue events.

Attributes:

Attributes
id class-attribute instance-attribute
id: str = Field(default_factory=generate)
started_ts class-attribute instance-attribute
started_ts: int = Field(default_factory=lambda: int(time()))
world_state_hash class-attribute instance-attribute
world_state_hash: str | None = None

CompressionMessage

Bases: MessageBase

A message containing metadata about a message history compression event.

Attributes:

Attributes
id class-attribute instance-attribute
id: str = Field(default_factory=generate)
started_ts class-attribute instance-attribute
started_ts: int = Field(default_factory=lambda: int(time()))
world_state_hash class-attribute instance-attribute
world_state_hash: str | None = None

TaskConfigMessage

Bases: MessageBase

A message containing configuration settings applicable to the current task scope.

Attributes:

Attributes
id class-attribute instance-attribute
id: str = Field(default_factory=generate)
started_ts class-attribute instance-attribute
started_ts: int = Field(default_factory=lambda: int(time()))
world_state_hash class-attribute instance-attribute
world_state_hash: str | None = None

TaskEntryMessage

Bases: MessageBase

A message marking the beginning of a nested sub-task.

Attributes:

Attributes
id class-attribute instance-attribute
id: str = Field(default_factory=generate)
started_ts class-attribute instance-attribute
started_ts: int = Field(default_factory=lambda: int(time()))
world_state_hash class-attribute instance-attribute
world_state_hash: str | None = None

TaskExitMessage

Bases: MessageBase

A message marking the end of a nested sub-task.

Attributes:

Attributes
content class-attribute instance-attribute
content: TaskExitContent = Field(default_factory=list)
id class-attribute instance-attribute
id: str = Field(default_factory=generate)
started_ts class-attribute instance-attribute
started_ts: int = Field(default_factory=lambda: int(time()))
world_state_hash class-attribute instance-attribute
world_state_hash: str | None = None

Task

Bases: BaseModel

Represents a single, self-contained task or conversation.

Manages the state, overall content (like a title or summary), and the chronological list of messages that make up the task's history. Includes methods for manipulating and querying the message list and task state.

Attributes:

  • id (str) –

    Unique identifier for the task (default: generated nanoid).

  • state (TaskState) –

    The current TaskState of the task.

  • content (List[ImagePart | TextPart]) –

    High-level content describing the task itself (e.g., title, summary), represented as a list of Text or Image parts.

  • messages (List[Message]) –

    The chronological list of Message objects comprising the task history.

Attributes
id class-attribute instance-attribute
id: str = Field(default_factory=generate)
content class-attribute instance-attribute
content: List[ImagePart | TextPart] = Field(default_factory=list)
messages class-attribute instance-attribute
messages: List[Message] = Field(default_factory=list)
Functions
validate_messages classmethod
validate_messages(data: Dict[str, Any]) -> Dict[str, Any]

Pydantic validator (before initialization) to ensure messages in the input data are converted to their appropriate Message subclasses based on 'role'.

Parameters:

  • data (Dict[str, Any]) –

    The raw dictionary data attempting to be validated into a Task model.

Returns:

  • Dict[str, Any]

    The dictionary data with the 'messages' list potentially transformed to contain proper Message model instances.

Raises:

  • ValueError

    If a message dictionary has an unknown 'role' or fails validation. Note: the current implementation logs a warning or error and skips the message instead of raising.

Source code in azad/mind_map.py
@model_validator(mode='before')
@classmethod
def validate_messages(cls, data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Pydantic validator (before initialization) to ensure messages in the input
    data are converted to their appropriate `Message` subclasses based on 'role'.

    Args:
        data: The raw dictionary data attempting to be validated into a Task model.

    Returns:
        The dictionary data with the 'messages' list potentially transformed
        to contain proper Message model instances.

    Raises:
        ValueError: If a message dictionary has an unknown 'role' or fails validation.
        (Note: Current implementation logs warnings/errors instead of raising)
    """
    # Only process if messages are present
    if "messages" not in data:
        return data

    # Transform each message in the list to the proper type
    messages = data.get("messages", []) # Use .get for safety
    if not isinstance(messages, list): # Ensure messages is a list
        task_logger.warning(f"Task messages field is not a list: {type(messages)}. Resetting to empty list.")
        data["messages"] = []
        return data

    processed_messages = []
    message_classes = {
        MessageRole.system.value: SystemMessage,
        MessageRole.user.value: UserMessage,
        MessageRole.assistant.value: AssistantMessage,
        MessageRole.tool.value: ToolMessage,
        MessageRole.informational.value: InformationalMessage,
        MessageRole.compression.value: CompressionMessage,
        MessageRole.taskconfig.value: TaskConfigMessage,
        MessageRole.taskentry.value: TaskEntryMessage,
        MessageRole.taskexit.value: TaskExitMessage,
    }

    for i, msg_data in enumerate(messages):
        # Skip if already a proper message object
        if isinstance(msg_data, MessageBase):
            processed_messages.append(msg_data)
            continue

        # Process dict messages by role
        if isinstance(msg_data, dict) and "role" in msg_data:
            role = msg_data.get("role")
            if role is not None and role in message_classes:
                try:
                    model_class = message_classes[role]
                    processed_messages.append(model_class.model_validate(msg_data))
                except Exception as e:
                    task_logger.error(f"Failed to parse message at index {i} with role {role}: {str(e)}. Message data: {msg_data}")
                    # Optionally: raise ValueError(...) or append raw dict
            else:
                role_str = str(role) if role is not None else "None"
                task_logger.warning(f"Unknown message role encountered during validation: {role_str} in message at index {i}. Skipping.")
        else:
            # Neither a message object nor a dict with a role: log it and skip it
            task_logger.warning(f"Message at index {i} is not a valid Pydantic model or dict with role: {type(msg_data)}. Skipping.")

    # Replace the messages with the processed ones
    data["messages"] = processed_messages
    return data
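The role-based dispatch in this validator can be tried in isolation. The following is a minimal sketch, not the Azad implementation: `UserMsg`, `ToolMsg`, `MESSAGE_CLASSES` and `coerce_messages` are hypothetical stand-ins built on dataclasses rather than Pydantic models, and skipped entries are dropped without logging.

```python
from dataclasses import dataclass
from typing import Any, List


@dataclass
class UserMsg:  # hypothetical stand-in for UserMessage
    role: str
    text: str


@dataclass
class ToolMsg:  # hypothetical stand-in for ToolMessage
    role: str
    text: str


# Map each known role string to the class that should represent it.
MESSAGE_CLASSES = {"user": UserMsg, "tool": ToolMsg}


def coerce_messages(raw: List[Any]) -> List[Any]:
    """Convert role-tagged dicts to typed objects; skip anything unrecognised."""
    out: List[Any] = []
    for item in raw:
        if isinstance(item, (UserMsg, ToolMsg)):
            out.append(item)  # already typed, keep as is
        elif isinstance(item, dict) and item.get("role") in MESSAGE_CLASSES:
            try:
                out.append(MESSAGE_CLASSES[item["role"]](**item))
            except TypeError:
                pass  # malformed payload: skipped, like the log-and-continue branch
        # unknown roles and non-dict entries fall through and are dropped
    return out


result = coerce_messages([
    {"role": "user", "text": "hi"},
    {"role": "alien", "text": "??"},
    "not a message",
    ToolMsg(role="tool", text="ok"),
])
```

One consequence of this policy is visible in the sketch: entries that cannot be converted disappear from the list, so the output may be shorter than the input.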
current_task_level
current_task_level() -> int

Calculates the current nesting level based on TaskEntry and TaskExit messages.

Level 0 represents the root task. Each TaskEntry increments the level, and each TaskExit decrements it.

Returns:

  • int

    The current nesting depth (0 for root, 1 for the first sub-task, etc.).

Source code in azad/mind_map.py
def current_task_level(self) -> int:
    """Calculates the current nesting level based on TaskEntry and TaskExit messages.

    Level 0 represents the root task. Each TaskEntry increments the level,
    and each TaskExit decrements it.

    Returns:
        The current nesting depth (0 for root, 1 for the first sub-task, etc.).
    """
    level = 0
    for message in self.messages:
        if isinstance(message, TaskEntryMessage):
            level += 1
        elif isinstance(message, TaskExitMessage) and level > 0:
            level -= 1
    return level
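The counting rule can be illustrated without the message classes. This is a small sketch under the assumption that each message is reduced to a marker string; `nesting_level` and the marker names are hypothetical.

```python
from typing import Iterable


def nesting_level(events: Iterable[str]) -> int:
    """Return the depth after replaying 'entry'/'exit' markers in order."""
    level = 0
    for event in events:
        if event == "entry":
            level += 1
        elif event == "exit" and level > 0:
            level -= 1  # an unmatched exit at the root is ignored
    return level


# Two sub-tasks opened and none closed: depth 2.
depth_open = nesting_level(["msg", "entry", "msg", "entry", "msg"])
# One sub-task closed, plus a stray exit that is ignored: depth 0.
depth_closed = nesting_level(["entry", "msg", "exit", "exit"])
```

The `level > 0` guard means a stray exit marker never drives the depth below the root.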
current_task_config
current_task_config() -> TaskConfigPart

Gets the TaskConfigPart that applies to the current task nesting level.

For nested tasks (level > 0), it retrieves the config from the corresponding TaskEntryMessage. For the root level (level 0), it finds the most recent TaskConfigMessage at that level.

Returns:

  • TaskConfigPart

    The applicable TaskConfigPart.

Raises:

  • ValueError

    If no applicable configuration can be found (should not happen in a well-formed task).

Source code in azad/mind_map.py
def current_task_config(self) -> TaskConfigPart:
    """Gets the `TaskConfigPart` that applies to the current task nesting level.

    For nested tasks (level > 0), it retrieves the config from the corresponding
    `TaskEntryMessage`. For the root level (level 0), it finds the most recent
    `TaskConfigMessage` at that level.

    Returns:
        The applicable `TaskConfigPart`.

    Raises:
        ValueError: If no applicable configuration can be found (should not happen
                    in a well-formed task).
    """
    current_level = self.current_task_level()

    if current_level == 0:
        # At root level, find the most recent root-level config
        root_configs = []
        level = 0
        for message in self.messages:
            if isinstance(message, TaskEntryMessage):
                level += 1
            elif isinstance(message, TaskExitMessage) and level > 0:
                level -= 1
            elif level == 0 and isinstance(message, TaskConfigMessage):
                if message.content:
                    root_configs.append(message.content[0])
                else:
                    task_logger.warning(f"TaskConfigMessage {message.id} has empty content.")
        if root_configs:
            return root_configs[-1]
        # Fallback: if no root configs, search entire history (should have one)
        return self.get_current_config() # Uses reversed search
    else:
        # For nested levels, find the config in the entry message
        level = 0
        for message in self.messages:
            if isinstance(message, TaskEntryMessage):
                level += 1
                if level == current_level:
                    if message.content and isinstance(message.content[0], TaskEntryPart):
                        return message.content[0].initial_task_config
                    else:
                        task_logger.warning(f"TaskEntryMessage {message.id} has invalid or empty content.")
                        # Fallback if content is bad
                        return self.get_current_config()
            elif isinstance(message, TaskExitMessage) and level > 0:
                level -= 1

        task_logger.warning(f"Could not find TaskEntryMessage for level {current_level}. Falling back to most recent config.")
        # Should ideally not get here, fallback to last overall config
        return self.get_current_config()
current_task_messages
current_task_messages() -> List[Message]

Retrieves all messages belonging to the current active task nesting level.

Includes the TaskEntry and TaskExit messages that define the boundaries of the current level.

Returns:

  • List[Message]

    A list of Message objects within the current task scope.

Source code in azad/mind_map.py
def current_task_messages(self) -> List[Message]:
    """Retrieves all messages belonging to the current active task nesting level.

    Includes the TaskEntry and TaskExit messages that define the boundaries
    of the current level.

    Returns:
        A list of `Message` objects within the current task scope.
    """
    current_level = self.current_task_level()
    result_messages = []
    active_level = 0
    collecting = (current_level == 0) # Start collecting if at root

    for message in self.messages:
        is_entry = isinstance(message, TaskEntryMessage)
        is_exit = isinstance(message, TaskExitMessage)

        if is_entry:
            if active_level == current_level: # Entering level below current, stop collecting
                collecting = False
            active_level += 1
            if active_level == current_level: # Entering the target level, start collecting
                collecting = True
                result_messages.append(message) # Include the entry message for this level

        elif is_exit:
            if active_level == current_level: # Exiting the target level
                result_messages.append(message) # Include the exit message
                collecting = False # Stop collecting for deeper levels
            if active_level > 0:
                active_level -= 1
            if active_level == current_level: # Back to the current level after exiting a deeper level
                collecting = True # Resume collecting

        # Collect message if we are inside the target level or at the target level after an exit
        elif collecting:
            result_messages.append(message)

    # At root with no nesting at all, every message belongs to the root task: return the full list
    if current_level == 0 and not any(isinstance(m, (TaskEntryMessage, TaskExitMessage)) for m in self.messages):
        return self.messages

    return result_messages
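The scoping loop is easier to follow on a toy history. The sketch below mirrors the same control flow on `(kind, label)` tuples; `Event`, `level_of` and `scoped` are hypothetical names, and the tuples stand in for real message objects.

```python
from typing import List, Tuple

Event = Tuple[str, str]  # (kind, label); kind is "entry", "exit" or "msg"


def level_of(events: List[Event]) -> int:
    """Current nesting depth, counted as in current_task_level."""
    level = 0
    for kind, _ in events:
        if kind == "entry":
            level += 1
        elif kind == "exit" and level > 0:
            level -= 1
    return level


def scoped(events: List[Event]) -> List[str]:
    """Labels of the events that belong to the current nesting level."""
    target = level_of(events)
    active = 0
    collecting = target == 0  # at root, collect from the start
    picked: List[str] = []
    for kind, label in events:
        if kind == "entry":
            if active == target:
                collecting = False  # descending below the target level
            active += 1
            if active == target:
                collecting = True
                picked.append(label)  # boundary entry is included
        elif kind == "exit":
            if active == target:
                picked.append(label)  # boundary exit is included
                collecting = False
            if active > 0:
                active -= 1
            if active == target:
                collecting = True  # back at the target level
        elif collecting:
            picked.append(label)
    return picked


open_subtask = scoped([("msg", "u1"), ("entry", "E1"), ("msg", "s1"), ("msg", "s2")])
back_at_root = scoped([("msg", "u1"), ("entry", "E1"), ("msg", "s1"), ("exit", "X1"), ("msg", "u2")])
```

With a sub-task still open, only the entry marker and the messages after it are returned; once the sub-task has exited, the root view skips everything between its entry and exit.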
find_current_task_boundaries
find_current_task_boundaries() -> Tuple[Optional[int], Optional[int]]

Finds the indices of the TaskEntry and TaskExit messages for the current level.

Returns:

  • Tuple[Optional[int], Optional[int]]

    A tuple (start_index, end_index):

    • start_index: Index of the TaskEntryMessage for the current level (0 at root level), or None if level > 0 and no entry is found.
    • end_index: Index of the corresponding TaskExitMessage, or None if the task level has not yet exited.

    Returns (0, None) for root level.

Source code in azad/mind_map.py
def find_current_task_boundaries(self) -> Tuple[Optional[int], Optional[int]]:
    """Finds the indices of the TaskEntry and TaskExit messages for the current level.

    Returns:
        A tuple `(start_index, end_index)`:
        - `start_index`: Index of the `TaskEntryMessage` for the current level (or 0 if at root level)
          None if level > 0 and not found.
        - `end_index`: Index of the corresponding `TaskExitMessage`, or `None` if the task level
                       has not yet exited.
        Returns `(0, None)` for root level.
    """
    current_level = self.current_task_level()

    if current_level == 0:
        # Root level always starts at index 0
        return 0, None

    # Nested levels: find the specific entry and exit
    level = 0
    entry_index: Optional[int] = None
    for i, message in enumerate(self.messages):
        if isinstance(message, TaskEntryMessage):
            level += 1
            if level == current_level:
                entry_index = i # Found the entry for the current level
        elif isinstance(message, TaskExitMessage):
            if level == current_level and entry_index is not None:
                # Found the exit corresponding to the entry of the current level
                return entry_index, i
            if level > 0:
                level -= 1 # Decrement level after processing the exit

    # If loop finishes, the task hasn't exited yet
    return entry_index, None
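A reduced version of the boundary search, operating on marker strings instead of messages, shows what the returned indices look like. `boundaries` is a hypothetical helper that copies the loop structure above; it is a sketch for experimentation, not the library function.

```python
from typing import List, Optional, Tuple


def boundaries(kinds: List[str]) -> Tuple[Optional[int], Optional[int]]:
    """Return (entry_index, exit_index) for the current level of a marker list."""
    # First pass: current depth, counted as in current_task_level.
    depth = 0
    for kind in kinds:
        if kind == "entry":
            depth += 1
        elif kind == "exit" and depth > 0:
            depth -= 1
    if depth == 0:
        return 0, None  # root level always starts at index 0

    # Second pass: locate the entry (and exit, if any) at that depth.
    level = 0
    entry_index: Optional[int] = None
    for i, kind in enumerate(kinds):
        if kind == "entry":
            level += 1
            if level == depth:
                entry_index = i
        elif kind == "exit":
            if level == depth and entry_index is not None:
                return entry_index, i
            if level > 0:
                level -= 1
    return entry_index, None


root = boundaries(["msg", "msg"])
open_child = boundaries(["msg", "entry", "msg"])
open_grandchild = boundaries(["msg", "entry", "msg", "entry", "msg"])
after_closed_sibling = boundaries(["entry", "exit", "entry"])
```

In this sketch, a history that already contains a closed sub-task at the same depth yields that earlier entry/exit pair, as the last call shows; callers that need the open sub-task may want to check for this case.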
add_task_entry
add_task_entry(task_description: Sequence[Union[TextPart, ImagePart, InformationalPart]], initial_task_config: TaskConfigPart) -> TaskEntryMessage

Initiates a new nested task level by adding required messages.

Sequence of messages added:

  1. TaskEntryMessage: Contains the initial context and config reference.
  2. TaskConfigMessage: The actual configuration for the new task level.
  3. UserMessage (hidden): Contains text/image parts from the initial context.
  4. InformationalMessage(s): One for each informational part in the initial context.

Parameters:

  • task_description (Sequence[Union[TextPart, ImagePart, InformationalPart]]) –

    A sequence of parts describing the sub-task's goal or initial state.

  • initial_task_config (TaskConfigPart) –

    The configuration (TaskConfigPart) for this new sub-task.

Returns:

  • TaskEntryMessage

    The created TaskEntryMessage.

Source code in azad/mind_map.py
def add_task_entry(
    self,
    task_description: Sequence[Union[TextPart, ImagePart, InformationalPart]],
    initial_task_config: TaskConfigPart
) -> TaskEntryMessage:
    """Initiates a new nested task level by adding required messages.

    Sequence of messages added:
    1. `TaskEntryMessage`: Contains the initial context and config reference.
    2. `TaskConfigMessage`: The actual configuration for the new task level.
    3. `UserMessage` (hidden): Contains text/image parts from the initial context.
    4. `InformationalMessage`(s): One for each informational part in the initial context.

    Args:
        task_description: A sequence of parts describing the sub-task's goal or initial state.
        initial_task_config: The configuration (`TaskConfigPart`) for this new sub-task.

    Returns:
        The created `TaskEntryMessage`.
    """
    task_description_list = list(task_description)
    entry_ts = time.time() # Consistent timestamp for related messages

    # 1. Create and add the task entry message FIRST
    task_entry_part = TaskEntryPart(
        initial_task_config=initial_task_config,
        initial_context=task_description_list
    )
    entry_message = TaskEntryMessage(
        task_id=self.id,
        started_ts=int(entry_ts),
        finished_ts=int(entry_ts), # Entry is instantaneous
        content=[task_entry_part]
    )
    self.add_message(entry_message) # add_message handles finished_ts of previous
    task_logger.debug(f"Added TaskEntryMessage: {entry_message.id}")

    # 2. Add the task config AFTER the entry
    config_message = self.add_task_config(initial_task_config, timestamp=entry_ts + 0.001)
    task_logger.debug(f"Added TaskConfigMessage: {config_message.id}")

    # 3. Process initial context parts
    text_parts = [part for part in task_description_list if isinstance(part, TextPart)]
    image_parts = [part for part in task_description_list if isinstance(part, ImagePart)]
    informational_parts = [part for part in task_description_list if isinstance(part, InformationalPart)]

    # Add a single hidden UserMessage containing text and images
    if text_parts or image_parts:
        user_msg = self.add_human_message(
            text=[part.text for part in text_parts],
            images=image_parts,
            is_ui_hidden=True,
            timestamp=entry_ts + 0.002 # Ensure order
        )
        if user_msg: # Check if message was actually added (had content)
            task_logger.debug(f"Added hidden UserMessage for context: {user_msg.id}")

    # Add each informational part as an individual message
    for i, part in enumerate(informational_parts):
        info_msg = self.add_informational_message(
            informational_type=part.informational_type or "context",
            details=part.details or "",
            is_visible_ai=part.is_visible_ai,
            is_visible_ui=part.is_visible_ui,
            additional_data=part.additional_data,
            timestamp=entry_ts + 0.003 + (i * 0.001) # Ensure order
        )
        task_logger.debug(f"Added InformationalMessage for context: {info_msg.id}")

    return entry_message
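The millisecond offsets passed to `add_task_config`, `add_human_message` and `add_informational_message` are converted with `int(...)` inside those helpers, as their source listings on this page show. A short sketch of what that truncation does to the offsets, using a made-up timestamp:

```python
# Example epoch timestamp with a fractional part (an arbitrary value).
entry_ts = 1_700_000_000.25

# Offsets used for the entry, config, user and first informational message.
offsets = [0.0, 0.001, 0.002, 0.003]

# Each helper stores int(timestamp), so the sub-second offsets are dropped.
stored = [int(entry_ts + offset) for offset in offsets]
```

All four stored values are the same whole second here, and would differ only if an offset happened to cross a second boundary. Under this reading, the order of the messages comes from the order in which `add_message` appends them rather than from the stored timestamps.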
add_task_exit
add_task_exit(handoff_description: Sequence[Union[TextPart, ImagePart]], payloadKey='task_result') -> Tuple[TaskExitMessage, Optional[ToolMessage]]

Completes the current nested task level and potentially updates the triggering tool result.

Adds a TaskExitMessage and attempts to find the ToolCallPart (usually in the parent level's AssistantMessage) that initiated this sub-task. If found, it updates the corresponding ToolResultPart (or adds a new ToolMessage) with the handoff_description.

Parameters:

  • handoff_description (Sequence[Union[TextPart, ImagePart]]) –

    Sequence of Text/Image parts describing the sub-task's outcome.

  • payloadKey (default: 'task_result' ) –

    The key within the tool result's result dictionary where the text handoff description will be stored.

Returns:

  • Tuple[TaskExitMessage, Optional[ToolMessage]]

    A tuple containing:

    • The newly created TaskExitMessage.
    • The ToolMessage that was updated or created with the result (or None if the triggering tool call could not be reliably identified).

Source code in azad/mind_map.py
def add_task_exit(self, handoff_description: Sequence[Union[TextPart, ImagePart]], payloadKey="task_result") -> Tuple[TaskExitMessage, Optional[ToolMessage]]:
    """Completes the current nested task level and potentially updates the triggering tool result.

    Adds a `TaskExitMessage` and attempts to find the `ToolCallPart` (usually in the
    parent level's `AssistantMessage`) that initiated this sub-task. If found, it
    updates the corresponding `ToolResultPart` (or adds a new `ToolMessage`) with
    the `handoff_description`.

    Args:
        handoff_description: Sequence of Text/Image parts describing the sub-task's outcome.
        payloadKey: The key within the tool result's `result` dictionary where the text
                    handoff description will be stored.

    Returns:
        A tuple containing:
        - The newly created `TaskExitMessage`.
        - The `ToolMessage` that was updated or created with the result (or `None` if
          the triggering tool call could not be reliably identified).
    """
    handoff_description_list = list(handoff_description)
    exit_ts = time.time() # Consistent timestamp

    # Find the entry point of the current task level
    entry_index, _ = self.find_current_task_boundaries()
    triggering_tool_call_part: Optional[ToolCallPart] = None
    triggering_assistant_msg_idx: Optional[int] = None

    # Identify tools marked as task entry/exit in the *current* config
    try:
        current_config = self.current_task_config()
        tool_metadata = current_config.tool_metadata
        task_entry_tool_names = {tool.name for tool in tool_metadata if tool.is_task_entry}
    except ValueError: # Handle case where config might not be found (shouldn't happen)
        task_logger.error("Cannot find current task config during add_task_exit. Cannot identify entry tools.")
        task_entry_tool_names = set()

    if entry_index is None and self.current_task_level() > 0:
        task_logger.warning("Cannot find task entry point for add_task_exit at level > 0. Cannot reliably link to triggering tool.")
         # Proceed without linking
    elif entry_index is not None:
        # Search backwards from just *before* the entry message index
        search_start_index = entry_index - 1
        for i in range(search_start_index, -1, -1):
            message = self.messages[i]
            if isinstance(message, AssistantMessage):
                # First priority: Find a tool call that matches a known task entry tool name
                if task_entry_tool_names:
                    # Find the *last* ToolCallPart in this message whose name matches a task entry tool
                    for part in reversed(message.content):
                        if isinstance(part, ToolCallPart) and part.tool_name in task_entry_tool_names:
                            # Check if this tool call has already been answered
                            answered = False
                            for subsequent_msg in self.messages[i+1:]: # Search messages *after* the assistant message
                                if isinstance(subsequent_msg, ToolMessage):
                                    for res_part in subsequent_msg.content:
                                        if isinstance(res_part, ToolResultPart) and res_part.tool_call_id == part.tool_call_id:
                                            answered = True
                                            break
                                if answered: break

                            if not answered:
                                triggering_tool_call_part = part
                                triggering_assistant_msg_idx = i
                                task_logger.info(f"Found potential triggering tool call (by name): {part.tool_call_id} ({part.tool_name}) in message {message.id}")
                                break # Found the most likely candidate

                # Second priority (fallback): Find the most recent unanswered tool call that's followed immediately by our task entry
                if triggering_tool_call_part is None:
                    # Check if this message is right before the task entry
                    if i == search_start_index:
                        # This message comes right before our task entry, so check all tool calls
                        for part in reversed(message.content):
                            if isinstance(part, ToolCallPart):
                                # Check if this tool call has already been answered
                                answered = False
                                for subsequent_msg in self.messages[i+1:entry_index]: # Search messages *after* this message but *before* the entry
                                    if isinstance(subsequent_msg, ToolMessage):
                                        for res_part in subsequent_msg.content:
                                            if isinstance(res_part, ToolResultPart) and res_part.tool_call_id == part.tool_call_id:
                                                answered = True
                                                break
                                    if answered: break

                                if not answered:
                                    triggering_tool_call_part = part
                                    triggering_assistant_msg_idx = i
                                    task_logger.info(f"Found potential triggering tool call (by sequence): {part.tool_call_id} ({part.tool_name}) in message {message.id}")
                                    break # Found the most likely candidate

            if triggering_tool_call_part:
                break # Stop searching assistant messages once a candidate is found

    # Create and add the task exit message
    exit_message = TaskExitMessage(
        task_id=self.id,
        started_ts=int(exit_ts),
        finished_ts=int(exit_ts), # Exit is instantaneous
        content=[TaskExitPart(handoff_description=handoff_description_list)]
    )
    self.add_message(exit_message)
    task_logger.debug(f"Added TaskExitMessage: {exit_message.id}")

    # Update or create the tool result message if a trigger was found
    updated_tool_message: Optional[ToolMessage] = None
    if triggering_tool_call_part:
        tool_call_id_to_answer = triggering_tool_call_part.tool_call_id
        tool_name_to_answer = triggering_tool_call_part.tool_name

        # Combine text parts for the result payload
        text_result = "".join([p.text for p in handoff_description_list if isinstance(p, TextPart)])
        image_parts_result = [p for p in handoff_description_list if isinstance(p, ImagePart)]

        # Create a standard result structure with status
        result_payload = {
            payloadKey: text_result,
            "status": "completed",  # Add standard status field
            "message": text_result  # Add standard message field
        }
        result_contents = []
        if image_parts_result:
            for img in image_parts_result:
                result_contents.append(ToolResultContent(
                    type="image",
                    image_data=img.image,
                    mime_type=img.mime_type
                ))
        # Add text result as content too if desired
        if text_result:
            result_contents.append(ToolResultContent(type="text", text=text_result))

        # Use answer_tool_call to handle adding the result and updating the call part status
        updated_tool_message = self.answer_tool_call(
            tool_call=triggering_tool_call_part,
            result=result_payload,
            observer_results=[], # No observers in this context
            is_error=False,
            is_approved=True, # Implicitly approved by task completion
            is_loading=False,
            has_feedback=True, # Indicates result came from task exit
            timestamp=exit_ts + 0.001 # Ensure it's after exit msg
        )
        # Update the ToolResultPart's contents list specifically
        if updated_tool_message and updated_tool_message.content:
            tool_result_part = updated_tool_message.content[0]
            if isinstance(tool_result_part, ToolResultPart):
                tool_result_part.tool_result_contents = result_contents

        task_logger.info(f"Linked TaskExit to ToolCall {tool_call_id_to_answer} via ToolMessage {updated_tool_message.id if updated_tool_message else 'N/A'}.")

    else:
        task_logger.warning("No unambiguous triggering tool call found for task exit. Cannot add corresponding ToolResult message.")

    return exit_message, updated_tool_message
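The way the handoff description becomes a tool result can be sketched separately from the search for the triggering call. `Text`, `Image`, `build_result` and `build_contents` are hypothetical stand-ins, and plain dicts replace `ToolResultContent`; only the shaping of the payload follows the listing above.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Union


@dataclass
class Text:  # hypothetical stand-in for TextPart
    text: str


@dataclass
class Image:  # hypothetical stand-in for ImagePart
    image: str
    mime_type: str


Part = Union[Text, Image]


def build_result(parts: List[Part], payload_key: str = "task_result") -> Dict[str, Any]:
    """Join all text parts (no separator) into the result dictionary."""
    text = "".join(p.text for p in parts if isinstance(p, Text))
    return {payload_key: text, "status": "completed", "message": text}


def build_contents(parts: List[Part]) -> List[Dict[str, Any]]:
    """Images first, then the joined text as a single text entry if non-empty."""
    contents: List[Dict[str, Any]] = [
        {"type": "image", "image_data": p.image, "mime_type": p.mime_type}
        for p in parts
        if isinstance(p, Image)
    ]
    text = "".join(p.text for p in parts if isinstance(p, Text))
    if text:
        contents.append({"type": "text", "text": text})
    return contents


handoff = [Text("Done. "), Image("aGk=", "image/png"), Text("See image.")]
payload = build_result(handoff)
contents = build_contents(handoff)
```

Because the text parts are concatenated without a separator, callers that pass several `TextPart` objects may want to include their own spacing or newlines.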
get_message
get_message(message_id: str) -> Optional[Message]

Retrieves a message from the task history by its unique ID.

Parameters:

  • message_id (str) –

    The ID of the message to retrieve.

Returns:

  • Optional[Message]

    The Message object if found, otherwise None.

Source code in azad/mind_map.py
def get_message(self, message_id: str) -> Optional[Message]:
    """Retrieves a message from the task history by its unique ID.

    Args:
        message_id: The ID of the message to retrieve.

    Returns:
        The `Message` object if found, otherwise `None`.
    """
    for m in self.messages:
        if m.id == message_id:
            return m
    task_logger.warning(f"No message found with ID {message_id}")
    return None
get_current_config
get_current_config() -> TaskConfigPart

Gets the most recently added TaskConfigPart from the entire message history.

Searches backwards through all messages. This is a fallback or general-purpose config retrieval method. Use current_task_config for level-specific config.

Returns:

  • TaskConfigPart

    The last found TaskConfigPart.

Raises:

  • ValueError

    If no TaskConfigMessage exists anywhere in the history.

Source code in azad/mind_map.py
def get_current_config(self) -> TaskConfigPart:
    """Gets the most recently added `TaskConfigPart` from the entire message history.

    Searches backwards through all messages. This is a fallback or general-purpose
    config retrieval method. Use `current_task_config` for level-specific config.

    Returns:
        The last found `TaskConfigPart`.

    Raises:
        ValueError: If no `TaskConfigMessage` exists anywhere in the history.
    """
    for message in reversed(self.messages):
        if isinstance(message, TaskConfigMessage) and message.content:
            # Ensure content is not empty and contains the expected part
            if isinstance(message.content[0], TaskConfigPart):
                return message.content[0]
            else:
                task_logger.warning(f"TaskConfigMessage {message.id} has invalid content type: {type(message.content[0])}")

    task_logger.error("No valid TaskConfigMessage found in the entire task history.")
    raise ValueError("No task configuration message found in history.")
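The backwards search can be reduced to a few lines for illustration. In this sketch the history is a list of `(kind, payload)` tuples and `latest_config` is a hypothetical helper; an empty payload plays the role of a config message with no usable content.

```python
from typing import List, Optional, Tuple


def latest_config(history: List[Tuple[str, Optional[str]]]) -> str:
    """Return the payload of the most recent non-empty config entry."""
    for kind, payload in reversed(history):
        if kind == "config" and payload:
            return payload
    raise ValueError("No task configuration message found in history.")


found = latest_config([
    ("config", "a"),
    ("msg", None),
    ("config", "b"),
    ("config", None),  # empty config entries are skipped
])

try:
    latest_config([("msg", None)])
    raised = False
except ValueError:
    raised = True
```

The search ignores nesting entirely, which is why `current_task_config` is the better choice when the level matters.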
add_task_config
add_task_config(part: TaskConfigPart, timestamp: Optional[float] = None) -> TaskConfigMessage

Adds a new TaskConfigMessage to the history.

Parameters:

  • part (TaskConfigPart) –

    The TaskConfigPart containing the configuration settings.

  • timestamp (Optional[float], default: None ) –

    Optional timestamp (Unix epoch float) for the message creation. Defaults to now.

Returns:

  • TaskConfigMessage

    The newly created TaskConfigMessage.

Source code in azad/mind_map.py
def add_task_config(self, part: TaskConfigPart, timestamp: Optional[float] = None) -> TaskConfigMessage:
    """Adds a new `TaskConfigMessage` to the history.

    Args:
        part: The `TaskConfigPart` containing the configuration settings.
        timestamp: Optional timestamp (Unix epoch float) for the message creation. Defaults to now.

    Returns:
        The newly created `TaskConfigMessage`.
    """
    ts = int(timestamp if timestamp is not None else time.time())
    new_message = TaskConfigMessage(
        task_id=self.id,
        started_ts=ts,
        finished_ts=ts, # Config addition is instantaneous
        content=[part]
    )
    self.add_message(new_message)
    return new_message
add_human_message
add_human_message(text: Optional[Union[str, List[str]]] = None, images: Optional[List[ImagePart]] = None, is_ui_hidden: bool = False, timestamp: Optional[float] = None) -> Optional[UserMessage]

Adds a new UserMessage to the history.

Parameters:

  • text (Optional[Union[str, List[str]]], default: None ) –

    A single string or list of strings for text content.

  • images (Optional[List[ImagePart]], default: None ) –

    A list of ImagePart objects for image content.

  • is_ui_hidden (bool, default: False ) –

    Whether to mark the message as potentially hidden in the UI.

  • timestamp (Optional[float], default: None ) –

    Optional timestamp (Unix epoch float) for message creation. Defaults to now.

Returns:

  • Optional[UserMessage]

    The newly created UserMessage if it has content, otherwise None.

Source code in azad/mind_map.py
def add_human_message(self,
                      text: Optional[Union[str, List[str]]] = None,
                      images: Optional[List[ImagePart]] = None,
                      is_ui_hidden: bool = False,
                      timestamp: Optional[float] = None
                      ) -> Optional[UserMessage]: # Return Optional UserMessage
    """Adds a new `UserMessage` to the history.

    Args:
        text: A single string or list of strings for text content.
        images: A list of `ImagePart` objects for image content.
        is_ui_hidden: Whether to mark the message as potentially hidden in the UI.
        timestamp: Optional timestamp (Unix epoch float) for message creation. Defaults to now.

    Returns:
        The newly created `UserMessage` if it has content, otherwise `None`.
    """
    content: UserContent = []
    if isinstance(text, list):
        for t in text:
            if t is not None and isinstance(t, str): # Ensure text item is not None and str
                content.append(TextPart(text=t))
    elif isinstance(text, str):
        content.append(TextPart(text=text))

    if images:
        valid_images = [img for img in images if img is not None and isinstance(img, ImagePart)]
        content.extend(valid_images)

    if not content:
        task_logger.warning("Attempted to add a human message with no valid text or image content. Skipping.")
        return None # Return None if no content was added

    ts = int(timestamp if timestamp is not None else time.time())
    new_message = UserMessage(
        task_id=self.id,
        content=content,
        started_ts=ts,
        is_ui_hidden=is_ui_hidden
        # finished_ts will be set by the next call to add_message
    )
    self.add_message(new_message)
    return new_message
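The handling of the `text` argument (a single string, a list of strings, or nothing) can be shown on its own. `normalise_text` is a hypothetical helper that returns plain strings where the method would create `TextPart` objects.

```python
from typing import Any, List, Optional, Union


def normalise_text(text: Optional[Union[str, List[Any]]]) -> List[str]:
    """Return the strings that would each become one text part."""
    if isinstance(text, list):
        # Non-string items (including None) are dropped silently.
        return [item for item in text if isinstance(item, str)]
    if isinstance(text, str):
        return [text]
    return []


single = normalise_text("hello")
mixed = normalise_text(["a", None, 3, "b"])
nothing = normalise_text(None)
```

When the resulting list is empty and no valid images are supplied, the method logs a warning and returns `None` instead of adding a message.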
add_message
add_message(message: Message)

Appends a message to the task's history list and updates timestamps.

If the previous message has no finished_ts yet, sets it to the started_ts of the new message or the current time, whichever is earlier, but never earlier than the previous message's own started_ts. Also ensures the new message has task_id and started_ts set.

Parameters:

  • message (Message) –

    The Message object to add.

Source code in azad/mind_map.py
def add_message(self, message: Message):
    """Appends a message to the task's history list and updates timestamps.

    Sets the `finished_ts` of the *previous* message to the `started_ts`
    of the new message (or current time, whichever is earlier, but not before
    the previous message's start time). Also ensures the new message has
    `task_id` and `started_ts` set.

    Args:
        message: The `Message` object to add.
    """
    current_time = int(time.time())

    # Ensure message has required fields before potentially using them
    if not hasattr(message, 'task_id') or not message.task_id:
        message.task_id = self.id
    if not hasattr(message, 'started_ts') or not message.started_ts:
        message.started_ts = current_time # Use current time if missing

    # Add finished_ts to the previous message
    if self.messages:
        last_message = self.messages[-1]
        if last_message.finished_ts is None:
            # Use the start time of the new message, clamped by current time and last message start
            finish_time = min(message.started_ts, current_time)
            last_message.finished_ts = max(last_message.started_ts, finish_time)

    self.messages.append(message)
    task_logger.debug(f"Added {message.role.value} message: {message.id} at index {len(self.messages)-1}")
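The clamping applied to the previous message's `finished_ts` amounts to a `max` over a `min`. A tiny sketch with a hypothetical helper name, `clamp_finish`, and made-up integer timestamps:

```python
def clamp_finish(prev_started: int, new_started: int, now: int) -> int:
    """Finish time for the previous message, as computed in add_message."""
    # Never later than "now", never earlier than the previous message's start.
    return max(prev_started, min(new_started, now))


normal = clamp_finish(prev_started=100, new_started=105, now=110)
future_start = clamp_finish(prev_started=100, new_started=120, now=110)
backdated_start = clamp_finish(prev_started=100, new_started=90, now=110)
```

The three calls cover the ordinary case, a new message stamped in the future, and a new message stamped before the previous one started.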
add_informational_message
add_informational_message(informational_type: str, details: str, is_visible_ai: bool = False, is_visible_ui: bool = False, additional_data: Optional[ANYJSON] | Optional[List[ANYJSON]] = None, timestamp: Optional[float] = None) -> InformationalMessage

Adds a new InformationalMessage to the history.

Parameters:

  • informational_type (str) –

    Category of the information.

  • details (str) –

    Human-readable details.

  • is_visible_ai (bool, default: False ) –

    Should the AI see this?

  • is_visible_ui (bool, default: False ) –

    Should the UI display this?

  • additional_data (Optional[ANYJSON] | Optional[List[ANYJSON]], default: None ) –

    Optional structured data.

  • timestamp (Optional[float], default: None ) –

    Optional creation timestamp (Unix epoch float). Defaults to now.

Returns:

  • InformationalMessage

    The newly created InformationalMessage.

Source code in azad/mind_map.py
def add_informational_message(
    self,
    informational_type: str,
    details: str,
    is_visible_ai: bool = False,
    is_visible_ui: bool = False,
    additional_data: Optional[ANYJSON] | Optional[List[ANYJSON]] = None,
    timestamp: Optional[float] = None
) -> InformationalMessage:
    """Adds a new `InformationalMessage` to the history.

    Args:
        informational_type: Category of the information.
        details: Human-readable details.
        is_visible_ai: Should the AI see this?
        is_visible_ui: Should the UI display this?
        additional_data: Optional structured data.
        timestamp: Optional creation timestamp (Unix epoch float). Defaults to now.

    Returns:
        The newly created `InformationalMessage`.
    """
    informational_part = InformationalPart(
        is_visible_ai=is_visible_ai,
        is_visible_ui=is_visible_ui,
        informational_type=informational_type,
        details=details,
        additional_data=additional_data
    )

    ts = int(timestamp if timestamp is not None else time.time())
    new_message = InformationalMessage(
        task_id=self.id,
        started_ts=ts,
        finished_ts=ts, # Informational is instantaneous
        content=[informational_part]
    )

    self.add_message(new_message)
    return new_message
add_text_part
add_text_part(message_id: str, text: str) -> bool

Appends a TextPart to the content of an existing message.

Only applicable to messages whose content list supports TextPart (System, User, Assistant).

Parameters:

  • message_id (str) –

    The ID of the message to modify.

  • text (str) –

    The text content to add.

Returns:

  • bool

    True if the part was added successfully, False otherwise (e.g., message not found, empty text, wrong role, content not a list).

Source code in azad/mind_map.py
def add_text_part(self, message_id: str, text: str) -> bool:
    """Appends a `TextPart` to the content of an existing message.

    Only applicable to messages whose content list supports `TextPart`
    (System, User, Assistant).

    Args:
        message_id: The ID of the message to modify.
        text: The text content to add.

    Returns:
        `True` if the part was added successfully, `False` otherwise (e.g., message
        not found, wrong role, content not a list).
    """
    message = self.get_message(message_id)
    if message is None: return False
    if not text: # Don't add empty text parts
         task_logger.warning(f"Attempted to add empty text part to message {message_id}.")
         return False

    text_part = TextPart(text=text)

    # Check if the message type allows TextPart in its content list definition
    # This relies on the Content Type aliases defined earlier
    allowed_types = (SystemMessage, UserMessage, AssistantMessage)

    if isinstance(message, allowed_types):
        # Ensure content is a mutable list
        if isinstance(message.content, list):
             message.content.append(text_part)
             # If adding to Assistant, ensure status reflects ongoing generation if needed
             if isinstance(message, AssistantMessage) and message.status == "completed":
                 message.status = "loading" # Or handle based on context
             return True
        else:
             task_logger.error(f"Cannot add text part: Message {message_id} content is not a list ({type(message.content)}).")
             return False
    else:
        task_logger.warning(f"Cannot add text part to message {message_id} with role {message.role}")
        return False
add_reasoning_part
add_reasoning_part(message_id: str, reasoning: str, signature: Optional[str]) -> bool

Appends a ReasoningPart to the content of an existing AssistantMessage.

Parameters:

  • message_id (str) –

    The ID of the AssistantMessage to modify.

  • reasoning (str) –

    The reasoning text.

  • signature (Optional[str]) –

    Optional signature for the reasoning.

Returns:

  • bool

    True if the part was added successfully, False otherwise.

Source code in azad/mind_map.py
def add_reasoning_part(self, message_id: str, reasoning: str, signature: Optional[str]) -> bool:
    """Appends a `ReasoningPart` to the content of an existing `AssistantMessage`.

    Args:
        message_id: The ID of the `AssistantMessage` to modify.
        reasoning: The reasoning text.
        signature: Optional signature for the reasoning.

    Returns:
        `True` if the part was added successfully, `False` otherwise.
    """
    message = self.get_message(message_id)
    if message is None: return False
    if not reasoning: # Don't add empty reasoning
         task_logger.warning(f"Attempted to add empty reasoning part to message {message_id}.")
         return False

    if isinstance(message, AssistantMessage):
         reasoning_part = ReasoningPart(
             reasoning=reasoning,
             signature=signature,
             started_ts=time.time() # Mark start time when added
         )
         if isinstance(message.content, list):
              # Optionally finish previous reasoning part's timestamp
              for part in reversed(message.content):
                   if isinstance(part, ReasoningPart) and part.finished_ts is None:
                        part.finished_ts = reasoning_part.started_ts
                        break
              message.content.append(reasoning_part)
              return True
         else:
              task_logger.error(f"Cannot add reasoning part: Message {message_id} content is not a list ({type(message.content)}).")
              return False
    else:
        task_logger.warning(f"Cannot add reasoning part to message {message_id} with role {message.role}")
        return False
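The timestamp handling in `add_reasoning_part` (close the most recent open `ReasoningPart`, then append the new one) can be sketched on its own. The dataclasses below are simplified stand-ins, not the Azad classes, and `append_reasoning` is a hypothetical helper name; the time is passed in explicitly so the result is deterministic.

```python
from dataclasses import dataclass
from typing import List, Optional, Union


@dataclass
class TextPart:          # stand-in, not the azad class
    text: str


@dataclass
class ReasoningPart:     # stand-in, not the azad class
    reasoning: str
    started_ts: float
    finished_ts: Optional[float] = None


Part = Union[TextPart, ReasoningPart]


def append_reasoning(content: List[Part], reasoning: str, now: float) -> ReasoningPart:
    """Append a reasoning part, closing the most recent still-open one first."""
    new_part = ReasoningPart(reasoning=reasoning, started_ts=now)
    for part in reversed(content):
        if isinstance(part, ReasoningPart) and part.finished_ts is None:
            part.finished_ts = new_part.started_ts
            break  # only the latest open part is closed
    content.append(new_part)
    return new_part


content: List[Part] = []
first = append_reasoning(content, "Consider the inputs.", now=10.0)
second = append_reasoning(content, "Pick a tool.", now=12.5)
print(first.finished_ts)   # 12.5
print(second.finished_ts)  # None
```

The newest reasoning part stays open until another reasoning part or a tool call is appended.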
add_tool_call_part
add_tool_call_part(message_id: str, tool_call: ToolCallPart) -> bool

Appends a ToolCallPart to the content of an existing AssistantMessage.

Also sets the AssistantMessage status to 'loading' whenever it is not already 'loading'.

Parameters:

  • message_id (str) –

    The ID of the AssistantMessage to modify.

  • tool_call (ToolCallPart) –

    The ToolCallPart object to add.

Returns:

  • bool

    True if the part was added successfully, False otherwise.

Source code in azad/mind_map.py
def add_tool_call_part(self, message_id: str, tool_call: ToolCallPart) -> bool:
    """Appends a `ToolCallPart` to the content of an existing `AssistantMessage`.

    Also sets the AssistantMessage status to 'loading' whenever it is not already 'loading'.

    Args:
        message_id: The ID of the `AssistantMessage` to modify.
        tool_call: The `ToolCallPart` object to add.

    Returns:
        `True` if the part was added successfully, `False` otherwise.
    """
    message = self.get_message(message_id)
    if message is None: return False
    if not isinstance(tool_call, ToolCallPart):
         task_logger.error(f"Invalid object provided to add_tool_call_part: expected ToolCallPart, got {type(tool_call)}")
         return False

    if isinstance(message, AssistantMessage):
        if isinstance(message.content, list):
            # Optionally finish previous reasoning part's timestamp
            for part in reversed(message.content):
                if isinstance(part, ReasoningPart) and part.finished_ts is None:
                     part.finished_ts = time.time()
                     break
            message.content.append(tool_call)
            # Ensure assistant message status reflects pending tool call
            if message.status != "loading":
                 message.status = "loading"
            return True
        else:
            task_logger.error(f"Cannot add tool call part: Message {message_id} content is not a list ({type(message.content)}).")
            return False
    else:
        task_logger.warning(f"Cannot add tool call part to message {message_id} with role {message.role}")
        return False
add_assistant_message
add_assistant_message(id: Optional[str] = None, timestamp: Optional[float] = None) -> AssistantMessage

Adds a new, empty AssistantMessage to the history, ready to be populated.

Sets the initial status to 'loading' and fetches the model name from the current task configuration.

Parameters:

  • id (Optional[str], default: None ) –

    Optional specific ID for the message. If None, a nanoid is generated.

  • timestamp (Optional[float], default: None ) –

    Optional creation timestamp (Unix epoch float). Defaults to now.

Returns:

  • AssistantMessage

    The newly created AssistantMessage.

Raises:

  • ValueError

    If the current task configuration cannot be determined.

Source code in azad/mind_map.py
def add_assistant_message(self, id: Optional[str] = None, timestamp: Optional[float] = None) -> AssistantMessage:
    """Adds a new, empty `AssistantMessage` to the history, ready to be populated.

    Sets the initial status to 'loading' and fetches the model name from the
    current task configuration.

    Args:
        id: Optional specific ID for the message. If None, a nanoid is generated.
        timestamp: Optional creation timestamp (Unix epoch float). Defaults to now.

    Returns:
        The newly created `AssistantMessage`.

    Raises:
        ValueError: If the current task configuration cannot be determined.
    """
    ts = int(timestamp if timestamp is not None else time.time())
    message_id = id if id else nanoid.generate()

    try:
        current_config = self.current_task_config() # Get config relevant to current level
        model_name = current_config.model_name
    except ValueError as e:
         task_logger.error(f"Cannot determine current model config to add assistant message: {e}")
         # Handle error appropriately - perhaps raise, or use a default model name?
         raise ValueError("Could not determine model configuration for new Assistant message.") from e


    new_message = AssistantMessage(
        task_id=self.id,
        id=message_id,
        started_ts=ts,
        status="loading", # Explicitly start as loading
        model_id=model_name,
    )
    self.add_message(new_message)
    return new_message
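Several of these methods resolve their `timestamp` argument with the same expression. A small sketch of that expression follows, assuming nothing beyond the standard library; `resolve_ts` is a hypothetical name used only for illustration.

```python
import time
from typing import Optional


def resolve_ts(timestamp: Optional[float] = None) -> int:
    """Same expression as in the source: explicit value or now, truncated to int."""
    return int(timestamp if timestamp is not None else time.time())


print(resolve_ts(1700000000.9))  # 1700000000 (fraction dropped, not rounded)
print(resolve_ts(0.0))           # 0 (0.0 is kept because the check is `is not None`)
print(resolve_ts())              # current time in whole seconds
```

Although the parameter is documented as a Unix epoch float, the stored value is a whole number of seconds, so sub-second precision passed by the caller is lost.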
add_system_message
add_system_message(text: str, id: Optional[str] = None, timestamp: Optional[float] = None) -> SystemMessage

Adds a new SystemMessage with the given text content.

Parameters:

  • text (str) –

    The text content for the system message.

  • id (Optional[str], default: None ) –

    Optional specific ID for the message. If None, a nanoid is generated.

  • timestamp (Optional[float], default: None ) –

    Optional creation timestamp (Unix epoch float). Defaults to now.

Returns:

  • SystemMessage

    The newly created SystemMessage.

Source code in azad/mind_map.py
def add_system_message(self, text: str, id: Optional[str] = None, timestamp: Optional[float] = None) -> SystemMessage:
    """Adds a new `SystemMessage` with the given text content.

    Args:
        text: The text content for the system message.
        id: Optional specific ID for the message. If None, a nanoid is generated.
        timestamp: Optional creation timestamp (Unix epoch float). Defaults to now.

    Returns:
        The newly created `SystemMessage`.
    """
    ts = int(timestamp if timestamp is not None else time.time())
    message_id = id if id else nanoid.generate()
    if not text:
         task_logger.warning("Attempted to add SystemMessage with empty text.")
         # Decide if this is allowed or should raise an error/return None

    new_message = SystemMessage(
        task_id=self.id,
        id=message_id,
        started_ts=ts,
        finished_ts=ts, # System message is instantaneous
        content=[TextPart(text=text)]
    )
    self.add_message(new_message)
    return new_message
add_tool_result_message
add_tool_result_message(tool_call_id: str, tool_name: str, result: ANYJSON, observer_results: List[ObserverResult], is_error: bool, is_approved: bool, tool_result_contents: List[ToolResultContent] = [], error_message: Optional[str] = None, is_loading: bool = False, has_feedback: bool = False, timestamp: Optional[float] = None) -> ToolMessage

Creates and adds a new ToolMessage containing a single ToolResultPart.

This is the standard way to record a tool's execution result.

Parameters:

  • tool_call_id (str) –

    The ID of the ToolCallPart this result corresponds to.

  • tool_name (str) –

    The name of the tool that was executed.

  • result (ANYJSON) –

    The primary JSON result data from the tool.

  • observer_results (List[ObserverResult]) –

    List of results from observers monitoring the execution.

  • is_error (bool) –

    Boolean indicating if the tool execution failed.

  • is_approved (bool) –

    Boolean indicating if the tool execution was approved.

  • tool_result_contents (List[ToolResultContent], default: [] ) –

    Optional list of displayable content parts (text/image) generated by the tool.

  • error_message (Optional[str], default: None ) –

    Optional error description if is_error is True.

  • is_loading (bool, default: False ) –

    Should typically be False when adding the result.

  • has_feedback (bool, default: False ) –

    Optional flag indicating if user feedback exists for this result.

  • timestamp (Optional[float], default: None ) –

    Optional creation timestamp (Unix epoch float). Defaults to now.

Returns:

  • ToolMessage

    The newly created ToolMessage.

Source code in azad/mind_map.py
def add_tool_result_message(
    self,
    tool_call_id: str,
    tool_name: str,
    result: ANYJSON,
    observer_results: List[ObserverResult],
    is_error: bool,
    is_approved: bool,
    tool_result_contents: List[ToolResultContent] = [], # Added parameter
    error_message: Optional[str] = None, # Added parameter
    is_loading: bool = False,
    has_feedback: bool = False,
    timestamp: Optional[float] = None
) -> ToolMessage:
    """Creates and adds a *new* `ToolMessage` containing a single `ToolResultPart`.

    This is the standard way to record a tool's execution result.

    Args:
        tool_call_id: The ID of the `ToolCallPart` this result corresponds to.
        tool_name: The name of the tool that was executed.
        result: The primary JSON result data from the tool.
        observer_results: List of results from observers monitoring the execution.
        is_error: Boolean indicating if the tool execution failed.
        is_approved: Boolean indicating if the tool execution was approved.
        tool_result_contents: Optional list of displayable content parts (text/image)
                              generated by the tool.
        error_message: Optional error description if `is_error` is True.
        is_loading: Should typically be False when adding the result.
        has_feedback: Optional flag indicating if user feedback exists for this result.
        timestamp: Optional creation timestamp (Unix epoch float). Defaults to now.

    Returns:
        The newly created `ToolMessage`.
    """
    tool_result_part = ToolResultPart(
        tool_call_id=tool_call_id,
        tool_name=tool_name,
        result=result,
        observer_results=observer_results or [],
        is_error=is_error,
        is_approved=is_approved,
        is_loading=is_loading, # Typically False
        has_feedback=has_feedback,
        tool_result_contents=tool_result_contents or [], # Use provided contents
        error=error_message if is_error else None # Use provided error message
    )

    ts = int(timestamp if timestamp is not None else time.time())
    new_message = ToolMessage(
        task_id=self.id,
        started_ts=ts,
        finished_ts=ts, # Tool result addition is instantaneous
        content=[tool_result_part]
    )

    self.add_message(new_message)
    return new_message
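The signature above declares `tool_result_contents` with a `[]` default and the body normalizes it with `tool_result_contents or []`. In Python, a default value is created once when the function is defined, so a function that mutates a list default shares state across calls. The sketch below illustrates that general behaviour and the common `None`-sentinel alternative; `record_shared` and `record_fresh` are hypothetical names and are not part of the module.

```python
from typing import List, Optional


def record_shared(item: str, bucket: List[str] = []) -> List[str]:
    """Mutates its default list, so state leaks between calls."""
    bucket.append(item)
    return bucket


def record_fresh(item: str, bucket: Optional[List[str]] = None) -> List[str]:
    """Uses None as the sentinel and builds a new list per call."""
    bucket = [] if bucket is None else bucket
    bucket.append(item)
    return bucket


print(record_shared("a"))  # ['a']
print(record_shared("b"))  # ['a', 'b']  (same list object as the first call)
print(record_fresh("a"))   # ['a']
print(record_fresh("b"))   # ['b']
```

As written, `add_tool_result_message` never appends to its default, and `[] or []` evaluates to the new right-hand list, so the shared default is not passed on there.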
add_compression_message
add_compression_message(start_idx: int, end_idx: int, reason: str, strategies: List[str], compressed_message_ids: List[str], kept_message_ids: List[str], cache: Dict[str, Any] = {}, metadata: Optional[Dict[str, Any]] = None, timestamp: Optional[float] = None) -> CompressionMessage

Adds a CompressionMessage to record a history compression event.

Parameters:

  • start_idx (int) –

    Start index of the compressed message range (original list).

  • end_idx (int) –

    End index of the compressed message range (original list, inclusive).

  • reason (str) –

    Reason for compression (e.g., 'token_limit').

  • strategies (List[str]) –

    List of compression strategies applied.

  • compressed_message_ids (List[str]) –

    IDs of messages removed/replaced.

  • kept_message_ids (List[str]) –

    IDs of messages within the range that were kept.

  • cache (Dict[str, Any], default: {} ) –

    Optional cached data related to the compression (e.g., summary).

  • metadata (Optional[Dict[str, Any]], default: None ) –

    Optional additional metadata.

  • timestamp (Optional[float], default: None ) –

    Optional creation timestamp (Unix epoch float). Defaults to now.

Returns:

  • CompressionMessage

    The newly created CompressionMessage.

Source code in azad/mind_map.py
def add_compression_message(
    self,
    start_idx: int,
    end_idx: int,
    reason: str,
    strategies: List[str],
    compressed_message_ids: List[str],
    kept_message_ids: List[str],
    cache: Dict[str, Any] = {},
    metadata: Optional[Dict[str, Any]] = None,
    timestamp: Optional[float] = None
) -> CompressionMessage:
    """Adds a `CompressionMessage` to record a history compression event.

    Args:
        start_idx: Start index of the compressed message range (original list).
        end_idx: End index of the compressed message range (original list, inclusive).
        reason: Reason for compression (e.g., 'token_limit').
        strategies: List of compression strategies applied.
        compressed_message_ids: IDs of messages removed/replaced.
        kept_message_ids: IDs of messages within the range that were kept.
        cache: Optional cached data related to the compression (e.g., summary).
        metadata: Optional additional metadata.
        timestamp: Optional creation timestamp (Unix epoch float). Defaults to now.

    Returns:
        The newly created `CompressionMessage`.
    """
    compression_part = CompressionPart(
        start_message_idx=start_idx,
        end_message_idx=end_idx,
        reason=reason,
        strategies=strategies or [],
        compressed_message_ids=compressed_message_ids or [],
        kept_message_ids=kept_message_ids or [],
        cache=cache if cache is not None else {},
        metadata=metadata
    )

    ts = int(timestamp if timestamp is not None else time.time())
    new_message = CompressionMessage(
        task_id=self.id,
        started_ts=ts,
        finished_ts=ts, # Compression message is instantaneous
        content=[compression_part]
    )

    self.add_message(new_message)
    return new_message
answer_tool_call
answer_tool_call(tool_call: ToolCallPart, result: ANYJSON, observer_results: List[ObserverResult], is_error: bool, is_approved: bool, tool_result_contents: List[ToolResultContent] = [], error_message: Optional[str] = None, is_loading: bool = False, has_feedback: bool = False, timestamp: Optional[float] = None) -> ToolMessage

Convenience method to add a tool result and update the original call part's status.

Finds the AssistantMessage containing the tool_call and updates the is_loading and is_approval_pending flags on the ToolCallPart within it. Then, calls add_tool_result_message to add the actual result.

Parameters:

  • tool_call (ToolCallPart) –

    The original ToolCallPart object that is being answered.

  • result (ANYJSON) –

    The primary JSON result data from the tool.

  • observer_results (List[ObserverResult]) –

    List of results from observers.

  • is_error (bool) –

    Boolean indicating if the execution failed.

  • is_approved (bool) –

    Boolean indicating if the execution was approved.

  • tool_result_contents (List[ToolResultContent], default: [] ) –

    Optional list of displayable content parts from the tool.

  • error_message (Optional[str], default: None ) –

    Optional error description if is_error is True.

  • is_loading (bool, default: False ) –

    Status for the result itself (usually False).

  • has_feedback (bool, default: False ) –

    Optional flag indicating if user feedback exists.

  • timestamp (Optional[float], default: None ) –

    Optional creation timestamp (Unix epoch float). Defaults to now.

Returns:

  • ToolMessage

    The newly created ToolMessage containing the result.

Raises:

  • TypeError

    If the tool_call argument is not a ToolCallPart instance.

  • ValueError

    If the original ToolCallPart cannot be found in the history.

Source code in azad/mind_map.py
def answer_tool_call(
    self,
    tool_call: ToolCallPart,
    result: ANYJSON,
    observer_results: List[ObserverResult],
    is_error: bool,
    is_approved: bool,
    tool_result_contents: List[ToolResultContent] = [], # Added parameter
    error_message: Optional[str] = None, # Added parameter
    is_loading: bool = False,
    has_feedback: bool = False,
    # sent_to_llm: bool = True, # This arg wasn't used, removed for clarity
    timestamp: Optional[float] = None
) -> ToolMessage:
    """Convenience method to add a tool result and update the original call part's status.

    Finds the `AssistantMessage` containing the `tool_call` and updates the
    `is_loading` and `is_approval_pending` flags on the `ToolCallPart` within it.
    Then, calls `add_tool_result_message` to add the actual result.

    Args:
        tool_call: The original `ToolCallPart` object that is being answered.
        result: The primary JSON result data from the tool.
        observer_results: List of results from observers.
        is_error: Boolean indicating if the execution failed.
        is_approved: Boolean indicating if the execution was approved.
        tool_result_contents: Optional list of displayable content parts from the tool.
        error_message: Optional error description if `is_error` is True.
        is_loading: Status for the result itself (usually False).
        has_feedback: Optional flag indicating if user feedback exists.
        timestamp: Optional creation timestamp (Unix epoch float). Defaults to now.

    Returns:
        The newly created `ToolMessage` containing the result.

    Raises:
        TypeError: If the `tool_call` argument is not a `ToolCallPart` instance.
        ValueError: If the original `ToolCallPart` cannot be found in the history.
    """
    if not isinstance(tool_call, ToolCallPart):
         raise TypeError(f"answer_tool_call expects a ToolCallPart instance for 'tool_call', got {type(tool_call)}")

    # Update the status of the original ToolCallPart in the AssistantMessage
    updated_call_part_status = False
    for msg in reversed(self.messages): # Search backwards is efficient
        if isinstance(msg, AssistantMessage):
             # Check if message content is list before iterating
             if isinstance(msg.content, list):
                  for i, part in enumerate(msg.content):
                       if isinstance(part, ToolCallPart) and part.tool_call_id == tool_call.tool_call_id:
                            # Update status flags in place
                            part.is_loading = is_loading # Mirror the result's loading flag (normally False) onto the call part
                            part.is_approval_pending = False # Approval (or denial) has happened
                            updated_call_part_status = True
                            task_logger.debug(f"Updated ToolCallPart {tool_call.tool_call_id} status in message {msg.id}")
                            break # Stop searching parts in this message
             if updated_call_part_status:
                  break # Stop searching messages once updated

    if not updated_call_part_status:
         # Raise an error because failing to update the status indicates a potential logic issue
         task_logger.error(f"Could not find original ToolCallPart with ID {tool_call.tool_call_id} to update its status.")
         raise ValueError(f"Original ToolCallPart with ID {tool_call.tool_call_id} not found in task history.")


    # Add the new ToolMessage with the result
    return self.add_tool_result_message(
        tool_call_id=tool_call.tool_call_id,
        tool_name=tool_call.tool_name,
        result=result,
        observer_results=observer_results,
        is_error=is_error,
        is_approved=is_approved,
        tool_result_contents=tool_result_contents, # Pass through
        error_message=error_message,             # Pass through
        is_loading=is_loading, # Pass through loading status for the result itself
        has_feedback=has_feedback,
        timestamp=timestamp
    )
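The status update in `answer_tool_call` is a newest-first search for the matching call part, followed by an in-place flag change. Below is a simplified sketch with stand-in dataclasses (not the Azad classes) and a hypothetical helper `mark_answered`. Unlike the real method, it always sets `is_loading` to `False` rather than mirroring an argument, and it does not create the result message.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class ToolCallPart:            # stand-in, not the azad class
    tool_call_id: str
    is_loading: bool = True
    is_approval_pending: bool = True


@dataclass
class AssistantMessage:        # stand-in, not the azad class
    id: str
    content: List[ToolCallPart] = field(default_factory=list)


def mark_answered(messages: List[AssistantMessage], tool_call_id: str) -> str:
    """Clear the pending flags on the matching call; return the owning message ID."""
    for msg in reversed(messages):          # newest messages first
        for part in msg.content:
            if part.tool_call_id == tool_call_id:
                part.is_loading = False
                part.is_approval_pending = False
                return msg.id
    raise ValueError(f"ToolCallPart {tool_call_id} not found in history")


history = [
    AssistantMessage("m1", [ToolCallPart("call-1")]),
    AssistantMessage("m2", [ToolCallPart("call-2")]),
]
print(mark_answered(history, "call-2"))           # m2
print(history[1].content[0].is_loading)           # False
print(history[0].content[0].is_approval_pending)  # True (untouched)
```

Raising when no call matches, as the real method does, surfaces a result that has no recorded call instead of silently appending it.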
get_updates
get_updates(since_message_id: Optional[str]) -> List[Message]

Retrieves all messages added to the history after a specified message ID.

Parameters:

  • since_message_id (Optional[str]) –

    The ID of the message after which to retrieve updates. If None or empty, returns all messages in the task.

Returns:

  • List[Message]

    A list of Message objects added after the message with since_message_id. Returns all messages if since_message_id is None, empty, or not found.

Source code in azad/mind_map.py
def get_updates(self, since_message_id: Optional[str]) -> List[Message]:
    """Retrieves all messages added to the history *after* a specified message ID.

    Args:
        since_message_id: The ID of the message after which to retrieve updates.
                          If `None` or empty, returns all messages in the task.

    Returns:
        A list of `Message` objects added after the message with `since_message_id`.
        Returns all messages if `since_message_id` is None or not found.
    """
    if not since_message_id:
        return list(self.messages) # Return a copy

    try:
        # Find index + 1 of the message with since_message_id
        idx = next(i for i, msg in enumerate(self.messages) if msg.id == since_message_id)
        return list(self.messages[idx+1:]) # Return slice (copy)
    except StopIteration:
         # If since_message_id is not found, return all messages (as a copy)
         task_logger.warning(f"get_updates: since_message_id {since_message_id} not found. Returning all messages.")
         return list(self.messages)
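The slicing behaviour of `get_updates`, including the fallback for an unknown ID, can be sketched without the rest of the module. `Msg` is a stand-in carrying only an `id`, and `updates_since` is a hypothetical free function that mirrors the method's logic.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Msg:                     # stand-in for a Message with only an ID
    id: str


def updates_since(messages: List[Msg], since_id: Optional[str]) -> List[Msg]:
    """Return a copy of the messages after `since_id`, or all if it is missing."""
    if not since_id:
        return list(messages)
    for i, msg in enumerate(messages):
        if msg.id == since_id:
            return list(messages[i + 1:])
    return list(messages)      # unknown ID: fall back to the full history


history = [Msg("a"), Msg("b"), Msg("c")]
print([m.id for m in updates_since(history, "a")])     # ['b', 'c']
print([m.id for m in updates_since(history, "c")])     # []
print([m.id for m in updates_since(history, None)])    # ['a', 'b', 'c']
print([m.id for m in updates_since(history, "zzz")])   # ['a', 'b', 'c']
```

Because an unknown ID returns the full history, a consumer that polls with a stale cursor may receive messages it has already seen and may need to deduplicate them by ID.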

Functions

create_message_from_dict

create_message_from_dict(data: Dict[str, Any]) -> Message

Creates the appropriate Message subclass instance from a dictionary.

Uses the 'role' field in the dictionary to determine which message class (e.g., UserMessage, AssistantMessage) to instantiate and validate against.

Parameters:

  • data (Dict[str, Any]) –

    A dictionary containing message data, including a 'role' key.

Returns:

  • Message

    An instance of the corresponding Message subclass.

Raises:

  • ValueError

    If the 'role' key is missing, unknown, or if the data fails validation against the determined message class.

Source code in azad/mind_map.py
def create_message_from_dict(data: Dict[str, Any]) -> Message:
    """Creates the appropriate `Message` subclass instance from a dictionary.

    Uses the 'role' field in the dictionary to determine which message class
    (e.g., `UserMessage`, `AssistantMessage`) to instantiate and validate against.

    Args:
        data: A dictionary containing message data, including a 'role' key.

    Returns:
        An instance of the corresponding `Message` subclass.

    Raises:
        ValueError: If the 'role' key is missing, unknown, or if the data fails
                    validation against the determined message class.
    """
    role = data.get("role")
    if not role:
        raise ValueError(f"Message data missing 'role' field: {data}")

    # Map roles to their respective classes
    message_classes = {
        MessageRole.system.value: SystemMessage,
        MessageRole.user.value: UserMessage,
        MessageRole.assistant.value: AssistantMessage,
        MessageRole.tool.value: ToolMessage,
        MessageRole.informational.value: InformationalMessage,
        MessageRole.compression.value: CompressionMessage,
        MessageRole.taskconfig.value: TaskConfigMessage,
        MessageRole.taskentry.value: TaskEntryMessage,
        MessageRole.taskexit.value: TaskExitMessage,
    }

    model_class = message_classes.get(role)

    if model_class:
        try:
            # Validate the dictionary against the determined Pydantic model
            return model_class.model_validate(data)
        except Exception as e:
            task_logger.error(f"Failed to create message from dict with role {role}: {str(e)}. Data: {data}")
            # Re-raise to indicate failure during creation
            raise ValueError(f"Failed to validate message data for role {role}: {str(e)}") from e
    else:
        task_logger.warning(f"Unknown message role '{role}' encountered during dict creation.")
        raise ValueError(f"Unknown message role: {role}")
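The role-based dispatch can be sketched with standard-library dataclasses in place of the Pydantic models. The classes, the role strings `"user"` and `"system"`, and the name `message_from_dict` are all assumptions for illustration; the real function maps `MessageRole` values to the module's message classes and calls `model_validate`.

```python
from dataclasses import dataclass
from typing import Any, Dict, Type


@dataclass
class UserMessage:             # stand-in, not the azad class
    role: str
    text: str


@dataclass
class SystemMessage:           # stand-in, not the azad class
    role: str
    text: str


# Assumed role strings; the real mapping is keyed by MessageRole values.
MESSAGE_CLASSES: Dict[str, Type[Any]] = {
    "user": UserMessage,
    "system": SystemMessage,
}


def message_from_dict(data: Dict[str, Any]):
    """Pick the class by 'role' and build it, reporting every failure as ValueError."""
    role = data.get("role")
    if not role:
        raise ValueError("Message data missing 'role' field")
    cls = MESSAGE_CLASSES.get(role)
    if cls is None:
        raise ValueError(f"Unknown message role: {role}")
    try:
        return cls(**data)     # the real function calls model_validate(data)
    except TypeError as exc:   # dataclass construction failed (bad or missing keys)
        raise ValueError(f"Failed to validate message data for role {role}: {exc}") from exc


print(message_from_dict({"role": "user", "text": "hi"}))
# UserMessage(role='user', text='hi')
```

Funnelling all three failure modes (missing role, unknown role, invalid payload) into `ValueError` means a caller deserializing stored history needs only one `except` clause.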