
azad.compression.strategies.truncation Module

azad.compression.strategies.truncation

Truncation compression strategy.

This module provides a deterministic compression strategy that preserves the first human message and the most recent message pairs, removing older messages to fit within context limits.

A dynamically updating sliding window is not an option here: it would change the start of the prompt on every turn and break the prompt cache each time. To keep the benefits of caching, the conversation history must stay static between truncations, so this operation should be performed as infrequently as possible.
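To illustrate the caching argument, the sketch below compares how much of the previous prompt survives as a shared prefix under a per-turn sliding window versus a one-off truncation. It assumes the cache matches on an exact leading prefix of the prompt. The helper names and the toy message strings are hypothetical and not part of azad.

```python
from typing import List


def shared_prefix_len(prev: List[str], curr: List[str]) -> int:
    """Count leading messages that are identical in both prompts."""
    n = 0
    for a, b in zip(prev, curr):
        if a != b:
            break
        n += 1
    return n


def sliding_window(history: List[str], size: int) -> List[str]:
    """Hypothetical per-turn window: always send only the last `size` messages."""
    return history[-size:]


history = [f"m{i}" for i in range(6)]  # m0 .. m5

# Sliding window: each new message shifts the window, so the prefix changes.
prev_window = sliding_window(history, 4)            # m2, m3, m4, m5
curr_window = sliding_window(history + ["m6"], 4)   # m3, m4, m5, m6
window_reuse = shared_prefix_len(prev_window, curr_window)

# One-off truncation: the history is cut once, then only appended to.
truncated = [history[0]] + history[-2:]             # m0, m4, m5
append_reuse = shared_prefix_len(truncated, truncated + ["m6"])

print(window_reuse, append_reuse)  # 0 3
```

Under this assumption the sliding window reuses nothing from the previous turn, while the truncated history keeps its entire prefix reusable until the next truncation.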

Classes

TruncationStrategy

TruncationStrategy()

Bases: CompressionStrategy

Strategy that removes a subset of messages to reduce context size.

This strategy preserves the first human message and the most recent message pairs, while removing others to fit within limits. It is deterministic and replayable, producing the same output given the same input messages and configuration.

When a conversation reaches a large context, the first half is assumed to be largely irrelevant to the user's current task, which is why the older messages are the ones removed. Because truncation discards history, this strategy should only be invoked when absolutely necessary to fit within context limits, not as a continuous process.

This strategy respects task boundaries and only operates on messages within the current task level.
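The selection rule described above can be sketched in isolation. This is a simplified illustration, not the strategy's actual implementation: `Msg` is a hypothetical stand-in for azad's message type, and the pairing rule (an assistant message immediately followed by a tool message) is an assumption about what the source calls a message pair.

```python
from dataclasses import dataclass
from typing import List, Set


@dataclass
class Msg:
    """Hypothetical stand-in for a message; not azad's Message class."""
    id: str
    role: str  # "user", "assistant", or "tool"


def select_kept_ids(messages: List[Msg], keep_pairs: int) -> Set[str]:
    """Return the IDs that survive truncation under the assumed rules."""
    kept: Set[str] = set()

    # Preserve the first human message.
    first_user = next((m for m in messages if m.role == "user"), None)
    if first_user is not None:
        kept.add(first_user.id)

    # Assumed pairing rule: assistant message directly followed by a tool message.
    pairs = [
        (a, b)
        for a, b in zip(messages, messages[1:])
        if a.role == "assistant" and b.role == "tool"
    ]

    # Keep the most recent pairs; pairs[-0:] would be the whole list, so guard it.
    recent = pairs[-keep_pairs:] if keep_pairs > 0 else []
    for a, b in recent:
        kept.update((a.id, b.id))
    return kept


msgs = [
    Msg("u1", "user"),
    Msg("a1", "assistant"), Msg("t1", "tool"),
    Msg("a2", "assistant"), Msg("t2", "tool"),
    Msg("a3", "assistant"), Msg("t3", "tool"),
]
kept = select_kept_ids(msgs, keep_pairs=2)
survivors = [m.id for m in msgs if m.id in kept]
print(survivors)  # ['u1', 'a2', 't2', 'a3', 't3']
```

Given the same messages and the same `keep_pairs`, the function always returns the same set, which is the property that makes the strategy replayable.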

Initialize the truncation strategy.

Source code in azad/compression/strategies/truncation.py
def __init__(self):
    """Initialize the truncation strategy."""
    self.logger = logging.getLogger(__name__)
Attributes
logger instance-attribute
logger = getLogger(__name__)
strategy_type property
strategy_type: CompressionStrategyType

Get the type of this compression strategy.

Functions
compress
compress(task: Task, new_checkpoint: Optional[CompressionCheckpoint], config: CompressionConfig) -> List[Message]

Compress messages using the truncation strategy.

This method identifies which messages to keep and which to compress based on the truncation configuration. It preserves the first human message and the most recent message pairs based on the configuration.

This method respects task boundaries and only operates on messages within the current task level.

Parameters:

  • task (Task) –

    The task containing messages to compress

  • new_checkpoint (Optional[CompressionCheckpoint]) –

    The checkpoint to populate with kept and compressed message IDs, or None to only apply existing checkpoints (transform only)

  • config (CompressionConfig) –

    Configuration for compression

Returns:

  • List[Message]

    The compressed messages list for the current task level
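When new_checkpoint is None, the method only replays what earlier checkpoints recorded instead of choosing new messages to drop. A minimal sketch of that replay follows. It assumes checkpoints are available as plain dictionaries with a `compressed_message_ids` list, whereas the actual source reads this data from the metadata of compression messages. `replay_kept_ids` is a hypothetical helper, not part of azad.

```python
from typing import Dict, List, Set


def replay_kept_ids(
    message_ids: List[str],
    checkpoints: List[Dict[str, List[str]]],
) -> List[str]:
    """Drop every ID that any recorded checkpoint marked as compressed."""
    compressed: Set[str] = set()
    for checkpoint in checkpoints:
        compressed.update(checkpoint.get("compressed_message_ids", []))
    # Preserve the original message order for the survivors.
    return [mid for mid in message_ids if mid not in compressed]


ids = ["u1", "a1", "t1", "a2", "t2"]
recorded = [{"compressed_message_ids": ["a1", "t1"]}]

replayed = replay_kept_ids(ids, recorded)
untouched = replay_kept_ids(ids, [])
print(replayed)   # ['u1', 'a2', 't2']
print(untouched)  # ['u1', 'a1', 't1', 'a2', 't2']
```

Because the replay depends only on the stored IDs, applying it repeatedly yields the same message list, so the history stays static between truncations.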

Source code in azad/compression/strategies/truncation.py
def compress(self, task: Task, new_checkpoint: Optional[CompressionCheckpoint], config: CompressionConfig) -> List[Message]:
    """Compress messages using the truncation strategy.

    This method identifies which messages to keep and which to compress
    based on the truncation configuration. It preserves the first human message
    and the most recent message pairs based on the configuration.

    This method respects task boundaries and only operates on messages within
    the current task level.

    Args:
        task: The task containing messages to compress
        new_checkpoint: The checkpoint to update (None for transform only)
        config: Configuration for compression

    Returns:
        The compressed messages list for the current task level
    """
    # Cast config to TruncationConfig
    trunc_config = cast(TruncationConfig, config)

    # Get all messages for the current task level only
    all_messages = task.current_task_messages()

    # If no messages, nothing to do
    if not all_messages:
        return []

    # Initialize sets for tracking message IDs
    kept_message_ids = set()
    compressed_message_ids = set()

    # Check if we're creating a new checkpoint or using existing ones
    if new_checkpoint is None:
        # Transformation mode: Use existing checkpoints 
        # Look for any existing compression checkpoints in the current task level
        compression_messages = [msg for msg in all_messages if msg.role == MessageRole.compression]

        if compression_messages:
            # Use the existing checkpoints to determine which messages are compressed
            for msg in all_messages:
                # Skip compression messages
                if msg.role == MessageRole.compression:
                    continue

                # Check if this message was compressed in any checkpoint
                message_compressed = False
                for comp_msg in compression_messages:
                    for part in comp_msg.content:
                        metadata = getattr(part, "metadata", {}) or {}
                        if "checkpoint" in metadata:
                            checkpoint_data = metadata["checkpoint"]
                            if msg.id in checkpoint_data.get("compressed_message_ids", []):
                                message_compressed = True
                                compressed_message_ids.add(msg.id)
                                break
                    if message_compressed:
                        break

                # If not compressed, keep it
                if not message_compressed:
                    kept_message_ids.add(msg.id)
        else:
            # No checkpoints, keep all messages
            for msg in all_messages:
                if msg.role != MessageRole.compression:
                    kept_message_ids.add(msg.id)
    else:
        # Creating a new checkpoint: Apply the truncation strategy

        # STEP 1: Find the first human message to preserve
        first_user_message = self._find_first_user_message(all_messages)
        if first_user_message:
            kept_message_ids.add(first_user_message.id)

        # STEP 2: Find all message pairs (assistant-tool)
        all_pairs = self._find_message_pairs(all_messages)

        # STEP 3: Keep the most recent message pairs based on config
        pairs_to_keep_count = min(trunc_config.preserve_recent_pairs_count, len(all_pairs))
        recent_pairs = all_pairs[-pairs_to_keep_count:] if pairs_to_keep_count > 0 else []

        # Add message IDs from recent pairs to kept list
        for pair in recent_pairs:
            for msg in pair:
                kept_message_ids.add(msg.id)

        # STEP 4: Mark all other non-compression messages as compressed
        for msg in all_messages:
            if msg.role == MessageRole.compression:
                continue  # Skip compression messages

            # Keep any TaskEntry or TaskExit messages to preserve task boundaries
            if msg.role in (MessageRole.taskentry, MessageRole.taskexit):
                kept_message_ids.add(msg.id)
                continue

            if msg.id not in kept_message_ids:
                compressed_message_ids.add(msg.id)
                if new_checkpoint.metadata is not None:
                    new_checkpoint.metadata[msg.id] = {"reason": "truncated"}

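        # STEP 5: Update checkpoint with kept and compressed message IDs.
        # Extend in message order rather than set order so the stored lists are reproducible.
        new_checkpoint.kept_message_ids.extend(
            msg.id for msg in all_messages if msg.id in kept_message_ids)
        new_checkpoint.compressed_message_ids.extend(
            msg.id for msg in all_messages if msg.id in compressed_message_ids)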

    # Log compression statistics
    total_count = len(all_messages) - sum(1 for msg in all_messages if msg.role == MessageRole.compression)
    kept_count = len(kept_message_ids)
    compressed_count = len(compressed_message_ids)

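    # Guard the ratio: total_count is 0 when every message is a compression message
    kept_ratio = kept_count / total_count if total_count else 0.0
    self.logger.info(f"Truncation strategy kept {kept_count}/{total_count} messages "
                     f"({kept_ratio:.1%}) "
                     f"and compressed {compressed_count} messages")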

    # Return list of messages to keep (excluding compression messages)
    return [msg for msg in all_messages if msg.id in kept_message_ids and msg.role != MessageRole.compression]