Skip to content

azad.ainetwork.compression_handler Module

azad.ainetwork.compression_handler

Compression handler for AINetwork.

This module provides a simple interface for compressing task message history when context window limits are exceeded.

Attributes

logger module-attribute

logger = getLogger(__name__)

Classes

CompressionHandler

CompressionHandler(compressor: Compressor, max_attempts: int = 2)

Handles compression operations for AINetwork.

This class provides a simple interface to handle context window exceeded errors by compressing the conversation history up to a configurable number of attempts.

Initialize the compression handler.

Parameters:

  • max_attempts (int, default: 2 ) –

    Maximum number of compression attempts

Source code in azad/ainetwork/compression_handler.py
def __init__(self,compressor: Compressor, max_attempts: int = 2):
    """
    Initialize the compression handler.

    Args:
        max_attempts: Maximum number of compression attempts
    """
    self.max_attempts = max_attempts
    self.attempts = 0
    self.compressor = compressor
Attributes
max_attempts instance-attribute
max_attempts = max_attempts
compressor instance-attribute
compressor = compressor
Functions
handle_context_exceeded async
handle_context_exceeded(task: Task, error_message: str) -> AsyncGenerator[AINetworkEventUnion, None]

Handle a context window exceeded error by applying compression.

Parameters:

  • task (Task) –

    The task to compress

  • error_message (str) –

    The error message from the context window exceeded error

  • strategy_type

    The compression strategy to use

Yields:

Source code in azad/ainetwork/compression_handler.py
async def handle_context_exceeded(
    self, 
    task: Task, 
    error_message: str,
) -> AsyncGenerator[ainetwork_types.AINetworkEventUnion, None]:
    """
    Handle a context window exceeded error by applying compression.

    Args:
        task: The task to compress
        error_message: The error message from the context window exceeded error
        strategy_type: The compression strategy to use

    Yields:
        Network events related to compression
    """
    # Check if we've reached the maximum number of compression attempts
    if self.attempts >= self.max_attempts:
        logger.warning(f"Maximum compression attempts reached ({self.max_attempts}), failing request")
        yield ainetwork_types.NetworkConnectionFailed(
            message=f"Context window exceeded after {self.attempts} compression attempts: {error_message}",
            errName="ContextWindowExceededError",
            origErrName="ContextWindowExceededError"
        )
        return

    # Increment compression attempt counter
    self.attempts += 1

    logger.info(f"Context window exceeded, attempting compression (attempt {self.attempts}/{self.max_attempts})")

    try:
        self.compressor.load(task)

        # Apply compression - this adds a compression message to the task
        await self.compressor.compress()

        # Find the compression message that was just added
        compression_messages = [msg for msg in task.messages if msg.role == "compression"]
        if not compression_messages:
            logger.warning("No compression message found after compression")
            raise ValueError("Compression failed: No compression message found")

        # Get the latest compression message
        compression_msg = compression_messages[-1]

        # Emit compression event
        yield ainetwork_types.AINetworkEventCompressionTriggered(
            msg=compression_msg
        )

        logger.info(f"Compression applied successfully (attempt {self.attempts}/{self.max_attempts})")

    except Exception as compression_error:
        logger.error(f"Error during compression: {str(compression_error)}")

        # Create an error network event
        yield ainetwork_types.NetworkConnectionFailed(
            message=f"Compression failed: {str(compression_error)}",
            errName="CompressionError",
            origErrName=compression_error.__class__.__name__
        )

        # Reraise the exception
        raise
reset
reset()

Reset the compression attempt counter.

Source code in azad/ainetwork/compression_handler.py
def reset(self):
    """Reset the compression attempt counter."""
    self.attempts = 0

Modules