
azad.compression.core Module

Core compression framework.

This module defines the core interfaces and classes for the compression system, including type definitions, the CompressionStrategy interface, and the Compressor class for managing the compression process.

Attributes

Classes

CompressionStrategyType

Bases: str, Enum

Enum of supported compression strategy types.

Attributes
TOOL_COMPRESSION class-attribute instance-attribute
TOOL_COMPRESSION = 'tool_compression'
TRUNCATE_AND_SUMMARIZE class-attribute instance-attribute
TRUNCATE_AND_SUMMARIZE = 'truncate_and_summarize'
COMPACT_SUMMARIZATION class-attribute instance-attribute
COMPACT_SUMMARIZATION = 'compact_summarization'
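Because CompressionStrategyType mixes in str, a member can be looked up from a plain configuration string and compared directly against strings. The following is a minimal sketch that re-declares the enum locally instead of importing it from azad:

```python
from enum import Enum


class CompressionStrategyType(str, Enum):
    # Local re-declaration mirroring the documented members
    TOOL_COMPRESSION = "tool_compression"
    TRUNCATE_AND_SUMMARIZE = "truncate_and_summarize"
    COMPACT_SUMMARIZATION = "compact_summarization"


# Value lookup: turn a string from a config file into an enum member
strategy = CompressionStrategyType("truncate_and_summarize")
print(strategy is CompressionStrategyType.TRUNCATE_AND_SUMMARIZE)  # True

# The str mixin makes members compare equal to their raw values
print(strategy == "truncate_and_summarize")  # True

# Unknown values raise ValueError instead of passing through silently
try:
    CompressionStrategyType("gzip")
except ValueError as exc:
    print(f"rejected: {exc}")
```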

BaseCompressionConfig

Bases: BaseModel

Base configuration for all compression strategies.

CompressionCheckpoint

Bases: BaseModel

Represents a compression checkpoint. A checkpoint marks a point in the message history where compression has been applied. It contains metadata about what was compressed and how.

Attributes
timestamp class-attribute instance-attribute
timestamp: int = Field(default_factory=lambda: int(time()))
compressed_message_ids class-attribute instance-attribute
compressed_message_ids: List[str] = Field(default_factory=list)
kept_message_ids class-attribute instance-attribute
kept_message_ids: List[str] = Field(default_factory=list)
metadata class-attribute instance-attribute
metadata: Dict[str, Any] = Field(default_factory=dict)
Functions
is_message_compressed
is_message_compressed(message_id: str) -> bool

Check if a message was compressed in this checkpoint.

Source code in azad/compression/core.py
def is_message_compressed(self, message_id: str) -> bool:
    """Check if a message was compressed in this checkpoint."""
    return message_id in self.compressed_message_ids
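A checkpoint is a plain data record. The sketch below uses a stdlib dataclass as a stand-in for the pydantic model so it runs without dependencies. The field names match the documented attributes plus checkpoint_id, which Compressor.compress() and Compressor.load() both pass even though it is not listed above. to_metadata() is a hypothetical helper that produces the {"checkpoint": {...}} shape that load() reads back from a compression message part.

```python
from dataclasses import asdict, dataclass, field
from time import time
from typing import Any, Dict, List


@dataclass
class CompressionCheckpoint:
    # Stand-in for the pydantic model; defaults mirror the documented fields.
    # checkpoint_id is included because compress() and load() both use it.
    checkpoint_id: str = ""
    timestamp: int = field(default_factory=lambda: int(time()))
    compressed_message_ids: List[str] = field(default_factory=list)
    kept_message_ids: List[str] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)

    def is_message_compressed(self, message_id: str) -> bool:
        # Linear membership test on a list, as in the original method
        return message_id in self.compressed_message_ids

    def to_metadata(self) -> Dict[str, Any]:
        # Hypothetical helper: the nesting load() expects under part.metadata
        return {"checkpoint": asdict(self)}


cp = CompressionCheckpoint(checkpoint_id="msg-4")
cp.compressed_message_ids.extend(["msg-1", "msg-2"])
cp.kept_message_ids.extend(["msg-3", "msg-4"])

print(cp.is_message_compressed("msg-1"))  # True
print(cp.is_message_compressed("msg-3"))  # False
print(cp.to_metadata()["checkpoint"]["kept_message_ids"])  # ['msg-3', 'msg-4']
```

Because compressed_message_ids is a list, each is_message_compressed call is a linear scan. Callers that test many IDs against the same checkpoint may prefer to build a set once.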

CompressionStrategy

Bases: ABC

Interface for compression strategies.

A compression strategy is responsible for compressing a task's messages. All strategies must implement the strategy_type property and the compress method. Strategies must respect task boundaries and only operate on messages within the current task level.

Attributes
strategy_type abstractmethod property
strategy_type: CompressionStrategyType

Get the type of this compression strategy.

Functions
compress abstractmethod async
compress(task: Task, new_checkpoint: Optional[CompressionCheckpoint], config: CompressionConfig) -> List[Message]

Compress the messages in a task.

This method should implement the compression logic for the strategy. When a checkpoint is provided, it must record in that checkpoint which messages were compressed and which were kept. It must respect task boundaries and only compress messages within the current task level.

Important: Implementations MUST use task.current_task_messages() instead of task.messages directly to ensure proper handling of nested mindmaps.

Parameters:

  • task (Task) –

    The task containing messages to compress

  • new_checkpoint (Optional[CompressionCheckpoint]) –

    The checkpoint to update, or None when called from transform() (no new checkpoint is recorded)

  • config (CompressionConfig) –

    Configuration for compression

Returns:

  • List[Message]

    The compressed messages list for the current task level

Source code in azad/compression/core.py
@abstractmethod
async def compress(self, task: Task, new_checkpoint: Optional[CompressionCheckpoint], config: CompressionConfig) -> List[Message]:
    """Compress the messages in a task.

    This method should implement the compression logic for the strategy.
    It must update the checkpoint with which messages were compressed or kept.
    It must respect task boundaries and only compress messages within the current task level.

    Important: Implementations MUST use task.current_task_messages() instead of
    task.messages directly to ensure proper handling of nested mindmaps.

    Args:
        task: The task containing messages to compress
        new_checkpoint: The checkpoint to update (None for transform only)
        config: Configuration for compression

    Returns:
        The compressed messages list for the current task level
    """
    pass
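To illustrate the contract, here is a minimal sketch of a hypothetical strategy that keeps only the last keep_last messages of the current task level. Message, Checkpoint, FakeTask, KeepLastConfig, and keep_last are stand-ins invented for this example. azad's real Task, Message, and CompressionConfig types are not reproduced, and the strategy_type property is omitted. The sketch shows three things: reading from current_task_messages(), recording IDs only when a checkpoint is passed, and returning the resulting list.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Message:  # stand-in for azad's Message
    id: str
    text: str


@dataclass
class Checkpoint:  # stand-in for CompressionCheckpoint (ID lists only)
    compressed_message_ids: List[str] = field(default_factory=list)
    kept_message_ids: List[str] = field(default_factory=list)


@dataclass
class FakeTask:  # stand-in: holds only the current task level's messages
    level_messages: List[Message]

    def current_task_messages(self) -> List[Message]:
        return list(self.level_messages)


@dataclass
class KeepLastConfig:  # hypothetical config with a single setting
    keep_last: int = 2


class Strategy(ABC):  # reduced copy of the interface (strategy_type omitted)
    @abstractmethod
    async def compress(self, task: FakeTask, new_checkpoint: Optional[Checkpoint],
                       config: KeepLastConfig) -> List[Message]:
        ...


class KeepLastStrategy(Strategy):
    async def compress(self, task: FakeTask, new_checkpoint: Optional[Checkpoint],
                       config: KeepLastConfig) -> List[Message]:
        # Read the current task level, never task.messages directly
        messages = task.current_task_messages()
        split = max(len(messages) - config.keep_last, 0)
        dropped, kept = messages[:split], messages[split:]
        if new_checkpoint is not None:
            # None means transform-only, so nothing is recorded in that case
            new_checkpoint.compressed_message_ids.extend(m.id for m in dropped)
            new_checkpoint.kept_message_ids.extend(m.id for m in kept)
        return kept


task = FakeTask([Message(f"m{i}", f"text {i}") for i in range(5)])
cp = Checkpoint()
result = asyncio.run(KeepLastStrategy().compress(task, cp, KeepLastConfig(keep_last=2)))
print([m.id for m in result])     # ['m3', 'm4']
print(cp.compressed_message_ids)  # ['m0', 'm1', 'm2']
```

This strategy only truncates. A summarizing strategy, such as one behind TRUNCATE_AND_SUMMARIZE, would presumably substitute a summary for the dropped messages rather than discard them.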

Compressor

Compressor(strategy: CompressionStrategy, config: CompressionConfig)

Main class for compressing messages in a task.

This class manages the compression process, applying a specific strategy to compress messages in a task. It ensures that only one compression strategy is active at a time and tracks which messages were included in each compression.

It provides utility methods for working with task boundaries to ensure proper handling of nested mindmaps. All methods respect task boundaries and only operate on messages within the current task level.

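Initialize the compressor with a strategy and its configuration.

Parameters:

  • strategy (CompressionStrategy) –

    The compression strategy to use

  • config (CompressionConfig) –

    Configuration for the compression
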
Source code in azad/compression/core.py
def __init__(self, strategy: CompressionStrategy, config: CompressionConfig) -> None:
    """Initialize the compressor with a strategy.

    Args:
        strategy: The compression strategy to use
        config: Configuration for the compression
    """
    self.strategy = strategy
    self.checkpoints: List[CompressionCheckpoint] = []
    self.config = config
    self.task: Task  # Set by load(); calling compress() or transform() before load() raises AttributeError
    self.logger = logging.getLogger(__name__)
Attributes
strategy instance-attribute
strategy = strategy
logger instance-attribute
logger = getLogger(__name__)
Functions
get_task_messages
get_task_messages() -> List[Message]

Get all messages within the current task boundaries.

This utility method ensures that only messages within the current task level are considered for compression, respecting nested mindmap boundaries.

Returns:

  • List[Message]

    List of messages within the current task level

Source code in azad/compression/core.py
def get_task_messages(self) -> List[Message]:
    """Get all messages within the current task boundaries.

    This utility method ensures that only messages within the current task level
    are considered for compression, respecting nested mindmap boundaries.

    Returns:
        List of messages within the current task level
    """
    return self.task.current_task_messages()
get_task_boundaries
get_task_boundaries() -> Tuple[Optional[int], Optional[int]]

Get the start and end indices of the current task level.

This utility method is used to determine the proper message indices for the current task level, respecting nested mindmap boundaries.

Performance Note: Boundary calculation can be expensive for large message histories.
Consider caching this result if called frequently within the same compression cycle.

Returns:

  • Tuple[Optional[int], Optional[int]]

    Tuple of (start_index, end_index), where end_index is None if the task hasn't ended yet

Source code in azad/compression/core.py
def get_task_boundaries(self) -> Tuple[Optional[int], Optional[int]]:
    """Get the start and end indices of the current task level.

    This utility method is used to determine the proper message indices 
    for the current task level, respecting nested mindmap boundaries.

    # Performance Note: Boundary calculation can be expensive for large message histories.
    # Consider caching this result if called frequently within the same compression cycle.

    Returns:
        Tuple of (start_index, end_index), where end_index is None if the task hasn't ended yet
    """
    return self.task.find_current_task_boundaries()
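The performance note suggests caching boundaries within one compression cycle. One minimal way to do that is sketched below with a hypothetical BoundaryCache wrapper around any zero-argument callable, such as a bound compressor.get_task_boundaries. The cache must be invalidated whenever messages are added to the task, because the end index can change.

```python
from typing import Callable, Optional, Tuple

Bounds = Tuple[Optional[int], Optional[int]]


class BoundaryCache:
    """Hypothetical helper: compute task boundaries at most once per cycle."""

    def __init__(self, compute: Callable[[], Bounds]) -> None:
        self._compute = compute
        self._cached: Optional[Bounds] = None
        self.computations = 0  # counts real (uncached) calls

    def get(self) -> Bounds:
        # (None, None) is a tuple, not None, so it is cached like any result
        if self._cached is None:
            self._cached = self._compute()
            self.computations += 1
        return self._cached

    def invalidate(self) -> None:
        # Call after messages are appended, e.g. when a cycle finishes
        self._cached = None


# Demo with a stand-in for compressor.get_task_boundaries
def fake_boundaries() -> Bounds:
    return (4, None)  # task level starts at index 4 and has not ended


cache = BoundaryCache(fake_boundaries)
for _ in range(3):
    start, end = cache.get()
print(start, end, cache.computations)  # 4 None 1

cache.invalidate()
cache.get()
print(cache.computations)  # 2
```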
get_compression_messages_in_task
get_compression_messages_in_task() -> List[CompressionMessage]

Get all compression messages within the current task level.

This utility method filters compression messages to ensure only those relevant to the current task level are considered.

Returns:

  • List[CompressionMessage]

    List of compression messages within the current task level

Source code in azad/compression/core.py
def get_compression_messages_in_task(self) -> List[CompressionMessage]:
    """Get all compression messages within the current task level.

    This utility method filters compression messages to ensure only those
    relevant to the current task level are considered.

    Returns:
        List of compression messages within the current task level
    """
    current_messages = self.get_task_messages()
    return [msg for msg in current_messages if msg.role == MessageRole.compression]
load
load(task: Task) -> None

Load existing compression checkpoints from a task.

Only loads compression checkpoints that are relevant to the current task level, respecting nested mindmap boundaries.

Parameters:

  • task (Task) –

    The task to load checkpoints from

Source code in azad/compression/core.py
def load(self, task: Task) -> None:
    """Load existing compression checkpoints from a task.

    Only loads compression checkpoints that are relevant to the current task level,
    respecting nested mindmap boundaries.

    Args:
        task: The task to load checkpoints from
    """
    # Store the task
    self.task = task
    # Reset checkpoints
    self.checkpoints = []

    # Find compression messages in the current task level only
    compression_messages = self.get_compression_messages_in_task()

    if not compression_messages:
        return

    # Load checkpoints from compression messages, sorted by timestamp
    for msg in sorted(compression_messages, key=lambda m: m.started_ts):
        for part in msg.content:
            if part.type == "compression" and hasattr(part, "metadata"):
                metadata = part.metadata or {}
                if "checkpoint" in metadata:
                    checkpoint_data = metadata["checkpoint"]
                    checkpoint = CompressionCheckpoint(
                        checkpoint_id=checkpoint_data.get("checkpoint_id", ""),
                        timestamp=checkpoint_data.get("timestamp", 0),
                        compressed_message_ids=checkpoint_data.get("compressed_message_ids", []),
                        kept_message_ids=checkpoint_data.get("kept_message_ids", []),
                        metadata=checkpoint_data.get("metadata", {})
                    )
                    self.checkpoints.append(checkpoint)

    # Log loaded checkpoints
    self.logger.info(f"Loaded {len(self.checkpoints)} compression checkpoints from current task level")
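load() expects each compression message to carry content parts whose type is "compression" and whose metadata nests the checkpoint fields under a "checkpoint" key. The sketch below replays that parsing on plain dictionaries so the expected shape is visible. The dictionaries are a stand-in for azad's message objects, and the sample IDs and timestamps are invented. The sketch also applies the role filter from get_compression_messages_in_task() and the sort by started_ts.

```python
from typing import Any, Dict, List

# Hypothetical messages shaped like the attributes load() reads
messages: List[Dict[str, Any]] = [
    {"id": "m1", "role": "user", "started_ts": 100, "content": []},
    {"id": "c2", "role": "compression", "started_ts": 300, "content": [
        {"type": "compression", "metadata": {"checkpoint": {
            "checkpoint_id": "m5", "timestamp": 300,
            "compressed_message_ids": ["m1", "m2", "m3"],
            "kept_message_ids": ["m4", "m5"]}}}]},
    {"id": "c1", "role": "compression", "started_ts": 200, "content": [
        {"type": "compression", "metadata": {"checkpoint": {
            "checkpoint_id": "m3", "timestamp": 200,
            "compressed_message_ids": ["m1"],
            "kept_message_ids": ["m2", "m3"]}}},
        {"type": "text", "metadata": None}]},  # ignored: wrong part type
]


def load_checkpoints(msgs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    # Same filter as get_compression_messages_in_task()
    compression = [m for m in msgs if m["role"] == "compression"]
    checkpoints = []
    # Oldest first, as load() sorts by started_ts
    for msg in sorted(compression, key=lambda m: m["started_ts"]):
        for part in msg["content"]:
            metadata = part.get("metadata") or {}
            if part["type"] == "compression" and "checkpoint" in metadata:
                data = metadata["checkpoint"]
                # Missing keys fall back to the same defaults load() uses
                checkpoints.append({
                    "checkpoint_id": data.get("checkpoint_id", ""),
                    "timestamp": data.get("timestamp", 0),
                    "compressed_message_ids": data.get("compressed_message_ids", []),
                    "kept_message_ids": data.get("kept_message_ids", []),
                    "metadata": data.get("metadata", {}),
                })
    return checkpoints


loaded = load_checkpoints(messages)
print([cp["checkpoint_id"] for cp in loaded])  # ['m3', 'm5']
```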
compress async
compress() -> List[Message]

Create a new compression checkpoint.

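This method creates a new checkpoint whose checkpoint_id is the ID of the last message in the current task level (an empty string if there are none). It applies the compression strategy to fill in the checkpoint, stores it, and adds a compression message carrying the checkpoint information to the task. It respects task boundaries and only compresses messages within the current task level.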

Returns:

  • List[Message]

    The compressed list of messages for the current task level

Source code in azad/compression/core.py
async def compress(self) -> List[Message]:
    """Create a new compression checkpoint.

    This method applies the compression strategy to create a new checkpoint
    and adds a compression message to the task with the checkpoint information.
    It respects task boundaries and only compresses messages within the current task level.

    Returns:
        The compressed list of messages for the current task level
    """
    # Get the current task messages
    current_messages = self.get_task_messages()

    # Use the last message ID in the current task level for the checkpoint
    last_message_id = ""
    if current_messages:
        last_message_id = current_messages[-1].id

    # Create a new checkpoint
    new_checkpoint = CompressionCheckpoint(
        checkpoint_id=last_message_id,
    )

    # Apply compression strategy (which respects task boundaries)
    transformed_message_list = await self.strategy.compress(self.task, new_checkpoint, self.config)

    # Store the checkpoint
    self.checkpoints.append(new_checkpoint)

    # Add a compression message to the task
    self._add_compression_message(self.task, new_checkpoint, self.config)

    # Return transformed_message_list
    return transformed_message_list
transform async
transform() -> List[Message]

Transform task messages based on the current checkpoints.

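This method re-runs the current strategy's compress method with new_checkpoint=None. As a result, compression is re-applied up to the last existing checkpoint without recording a new one. If no checkpoints have been loaded or created, the current task messages are returned unchanged. If the current task level has no messages, an empty list is returned. It respects task boundaries and only operates on messages within the current task level.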

Returns:

  • List[Message]

    The transformed list of messages for the current task level only

Source code in azad/compression/core.py
async def transform(self) -> List[Message]:
    """Transform task messages based on the current checkpoints.

    This method runs the compress method using the current strategy and cache,
    forcing the compression to be applied up to the last checkpoint.
    It respects task boundaries and only operates on messages within the current task level.

    Returns:
        The transformed list of messages for the current task level only
    """
    if not self.checkpoints:
        # Return current task messages if no checkpoints exist
        return self.get_task_messages()

    # If task has no messages, return empty list
    if len(self.get_task_messages()) == 0:
        return []

    # Use the compress method but only for messages in the current task level
    return await self.strategy.compress(task=self.task, config=self.config, new_checkpoint=None)
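The usual call sequence is load(), then compress() when the context needs shrinking, then transform() on later turns to re-apply existing checkpoints. Running this against azad requires its Task and strategy classes. The sketch below therefore reproduces only the documented control flow of compress() and transform() using hypothetical stand-ins: MiniCompressor, DropOldestStrategy, FakeTask, and Checkpoint. It records checkpoints on the fake task in place of writing a compression message, and it assumes the strategy recovers earlier checkpoints from the task when new_checkpoint is None.

```python
import asyncio
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Checkpoint:  # stand-in for CompressionCheckpoint
    checkpoint_id: str
    compressed_message_ids: List[str] = field(default_factory=list)


@dataclass
class FakeTask:  # stand-in: message IDs plus checkpoints recorded on the task
    message_ids: List[str]
    recorded: List[Checkpoint] = field(default_factory=list)

    def current_task_messages(self) -> List[str]:
        return list(self.message_ids)


class DropOldestStrategy:  # hypothetical: hide all but the newest `keep` IDs
    def __init__(self, keep: int) -> None:
        self.keep = keep

    async def compress(self, task: FakeTask, new_checkpoint: Optional[Checkpoint]) -> List[str]:
        ids = task.current_task_messages()
        if new_checkpoint is not None:
            new_checkpoint.compressed_message_ids = ids[: max(len(ids) - self.keep, 0)]
            hidden = set(new_checkpoint.compressed_message_ids)
        else:
            # transform path: re-apply what the task already recorded
            hidden = {i for cp in task.recorded for i in cp.compressed_message_ids}
        return [i for i in ids if i not in hidden]


class MiniCompressor:  # mirrors only the control flow of compress()/transform()
    def __init__(self, strategy: DropOldestStrategy, task: FakeTask) -> None:
        self.strategy, self.task = strategy, task
        self.checkpoints: List[Checkpoint] = []

    async def compress(self) -> List[str]:
        ids = self.task.current_task_messages()
        cp = Checkpoint(checkpoint_id=ids[-1] if ids else "")
        result = await self.strategy.compress(self.task, cp)
        self.checkpoints.append(cp)
        self.task.recorded.append(cp)  # stands in for _add_compression_message
        return result

    async def transform(self) -> List[str]:
        if not self.checkpoints:
            return self.task.current_task_messages()
        if not self.task.current_task_messages():
            return []
        return await self.strategy.compress(self.task, None)


async def main() -> None:
    task = FakeTask(["m1", "m2", "m3", "m4"])
    comp = MiniCompressor(DropOldestStrategy(keep=2), task)
    print(await comp.transform())  # ['m1', 'm2', 'm3', 'm4']  (no checkpoints yet)
    print(await comp.compress())   # ['m3', 'm4']
    task.message_ids.append("m5")  # a new turn arrives
    print(await comp.transform())  # ['m3', 'm4', 'm5']


asyncio.run(main())
```

Note that the real transform() does not pass the compressor's own checkpoints list to the strategy. The strategy receives only the task, config, and None, so re-applying earlier compression depends on what the strategy can recover from the task itself.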