Handles compression operations for AINetwork.
This class provides a simple interface to handle context window exceeded errors
by compressing the conversation history up to a configurable number of attempts.
Initialize the compression handler.
Parameters:
- compressor (Compressor) – Compressor used to perform the compression
- max_attempts (int, default: 2) – Maximum number of compression attempts
Source code in azad/ainetwork/compression_handler.py
def __init__(self, compressor: Compressor, max_attempts: int = 2):
    """
    Set up the compression handler with a fresh attempt counter.

    Args:
        compressor: Compressor stored on the handler and used to perform
            the compression
        max_attempts: Maximum number of compression attempts
    """
    self.compressor = compressor
    # No attempt has been made yet.
    self.attempts = 0
    self.max_attempts = max_attempts
max_attempts = max_attempts
Handle a context window exceeded error by applying compression.
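Parameters:
- task (Task) – The task to compress
- error_message (str) – The error message from the context window exceeded error
- strategy_type – The compression strategy to use (listed in the docstring, but not a parameter of the method signature)
Yields:
- Network events related to compression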
Source code in azad/ainetwork/compression_handler.py
async def handle_context_exceeded(
    self,
    task: Task,
    error_message: str,
) -> AsyncGenerator[ainetwork_types.AINetworkEventUnion, None]:
    """
    React to a context window exceeded error by compressing the task.

    Every call that gets past the attempt check consumes one attempt from
    the budget of ``self.max_attempts``. Once the budget is used up, no
    compression is attempted and a single failure event is yielded instead.

    Args:
        task: The task to compress
        error_message: The error message from the context window exceeded error

    Yields:
        ``AINetworkEventCompressionTriggered`` carrying the last message
        with role ``"compression"`` on success, or
        ``NetworkConnectionFailed`` when the attempt budget is exhausted
        or compression raises.

    Raises:
        Exception: Any error raised while compressing (including the
            ``ValueError`` raised when no compression message is found)
            is re-raised once the consumer resumes the generator after
            the failure event.
    """
    # Budget exhausted: report the failure and stop without compressing.
    if self.attempts >= self.max_attempts:
        logger.warning(f"Maximum compression attempts reached ({self.max_attempts}), failing request")
        exhausted_text = f"Context window exceeded after {self.attempts} compression attempts: {error_message}"
        yield ainetwork_types.NetworkConnectionFailed(
            message=exhausted_text,
            errName="ContextWindowExceededError",
            origErrName="ContextWindowExceededError",
        )
        return

    # The attempt is counted up front, so a compression that fails below
    # still uses up one slot of the budget.
    self.attempts += 1
    logger.info(f"Context window exceeded, attempting compression (attempt {self.attempts}/{self.max_attempts})")

    try:
        self.compressor.load(task)
        # Compression is expected to add a compression message to the task.
        await self.compressor.compress()

        compression_entries = [entry for entry in task.messages if entry.role == "compression"]
        if not compression_entries:
            logger.warning("No compression message found after compression")
            raise ValueError("Compression failed: No compression message found")

        # The last matching message is treated as the one just added.
        latest_entry = compression_entries[-1]
        yield ainetwork_types.AINetworkEventCompressionTriggered(msg=latest_entry)
        logger.info(f"Compression applied successfully (attempt {self.attempts}/{self.max_attempts})")
    except Exception as compression_error:
        failure_reason = str(compression_error)
        logger.error(f"Error during compression: {failure_reason}")
        # Surface the failure as a network event before propagating it.
        yield ainetwork_types.NetworkConnectionFailed(
            message=f"Compression failed: {failure_reason}",
            errName="CompressionError",
            origErrName=type(compression_error).__name__,
        )
        raise
Reset the compression attempt counter.
Source code in azad/ainetwork/compression_handler.py
def reset(self):
    """Set the compression attempt counter back to zero."""
    self.attempts = 0