Skip to content

azad.prompts.dialects.json.parser Module

azad.prompts.dialects.json.parser

JSON dialect parser for real-time streaming content parsing.

Attributes

Classes

JSONDialectConfig

Bases: DialectConfig

Configuration for JSON dialect parser.

Attributes
pretty_print class-attribute instance-attribute
pretty_print: bool = Field(default=True, description='Whether to format JSON with indentation')
indentation class-attribute instance-attribute
indentation: int = Field(default=2, description='Indentation level for pretty printing')

JSONDialectParser

JSONDialectParser(schema: Dict, prompt_data: PromptData, config: JSONDialectConfig)

Bases: DialectParser

JSON dialect parser for real-time content parsing.

Handles streaming content and emits events for text and tool calls. Uses the streamingjson library for incremental parsing of incomplete JSON.

Source code in azad/prompts/dialects/json/parser.py
def __init__(self, schema: Dict, prompt_data: PromptData, config: JSONDialectConfig):
    self.schema = schema
    self.prompt_data = prompt_data
    self.config = config
    self.logger = logging.getLogger(__name__)

    # Parser state
    self.reset_state()
Attributes
prompt_data instance-attribute
prompt_data = prompt_data
logger instance-attribute
logger = getLogger(__name__)
Functions
reset_state
reset_state()

Reset the parser state to initial values.

Source code in azad/prompts/dialects/json/parser.py
def reset_state(self):
    """Reset the parser state to initial values."""
    # Text tracking
    self.text_buffer = ""
    self.sent_text_start = False
    self.is_in_code_block = False

    # JSON tracking
    self.json_buffer = ""
    self.is_in_json = False
    self.json_depth = 0
    self.in_string = False
    self.escape_next = False

    # Tool call tracking
    self.in_tool_call = False
    self.current_tool_name = ""
    self.tool_call_id = ""

    # Parameter tracking
    self.parameter_buffer: Dict[str, str] = {}
    self.parameter_chunk_buffer: Dict[str, str] = {}
    self.parameters_complete: Set[str] = set()
    self.current_params: Dict[str, Any] = {}
    self.last_known_params: Dict[str, Any] = {}
    self.params_started: Set[str] = set()

    # Chunk emission timing
    self.last_emit_time: Dict[str, float] = {}  # Last time a chunk was emitted for each parameter

    # streamingjson lexer for incremental parsing
    self.lexer = streamingjson.Lexer()

    # Raw JSON fragments before completion
    self.raw_json_buffer = ""

    # State flags
    self.tool_ready_emitted = False
    self.last_processed_json_length = 0
feed
feed(data: bytes) -> List[AINetworkEventUnion]

Process incoming data and emit appropriate events.

Parameters:

  • data (bytes) –

    Bytes of incoming data

Returns:

Source code in azad/prompts/dialects/json/parser.py
def feed(self, data: bytes) -> List[AINetworkEventUnion]:
    """Process incoming data and emit appropriate events.

    Args:
        data: Bytes of incoming data

    Returns:
        List of network events
    """
    text = data.decode('utf-8')
    events: List[AINetworkEventUnion] = []

    # Process character by character
    i = 0
    while i < len(text):
        # Handle code blocks first (highest priority)
        if text[i] == '`' and not self.in_string:
            self.is_in_code_block = not self.is_in_code_block
            self.text_buffer += text[i]

            if not self.sent_text_start:
                events.append(AINetworkEventTextStart())
                self.sent_text_start = True

            i += 1
            continue

        # Inside a code block, everything is treated as text
        if self.is_in_code_block:
            self.text_buffer += text[i]

            if not self.sent_text_start:
                events.append(AINetworkEventTextStart())
                self.sent_text_start = True

            i += 1
            continue

        # Check for the start of a JSON-like structure
        if text[i] == '{' and not self.is_in_json:
            # Potential start of JSON
            # Emit any pending text
            if self.text_buffer:
                if not self.sent_text_start:
                    events.append(AINetworkEventTextStart())
                    self.sent_text_start = True

                events.append(AINetworkEventTextChunk(content=self.text_buffer))
                self.text_buffer = ""

            # Start JSON tracking
            self.is_in_json = True
            self.json_depth = 1
            self.json_buffer = '{'
            self.raw_json_buffer = '{'
            self.lexer.append_string('{')
            i += 1
            continue

        # If we're tracking a JSON object, continue accumulating
        if self.is_in_json:
            # Update string and escape tracking
            if text[i] == '"' and not self.escape_next:
                self.in_string = not self.in_string
            elif text[i] == '\\' and self.in_string:
                self.escape_next = True
            elif self.escape_next:
                self.escape_next = False

            # Update depth tracking (only when not in a string)
            if not self.in_string:
                if text[i] == '{':
                    self.json_depth += 1
                elif text[i] == '}':
                    self.json_depth -= 1

                    # If we've closed the outermost object
                    if self.json_depth == 0:
                        # Add the closing brace
                        self.json_buffer += text[i]
                        self.raw_json_buffer += text[i]
                        self.lexer.append_string(text[i])

                        # Process the complete JSON object
                        json_events = self._process_complete_json()
                        events.extend(json_events)

                        # Reset JSON state
                        self.is_in_json = False
                        self.json_buffer = ""
                        self.raw_json_buffer = ""
                        self.lexer = streamingjson.Lexer()  # Reset lexer for next JSON
                        i += 1
                        continue

            # Add character to JSON buffer
            self.json_buffer += text[i]
            self.raw_json_buffer += text[i]
            self.lexer.append_string(text[i])

            # Try to extract incremental updates
            if len(self.json_buffer) % 4 == 0:  # Check periodically
                json_events = self._process_incremental_json()
                events.extend(json_events)

            i += 1
            continue

        # Regular text
        self.text_buffer += text[i]

        # Emit text chunks at reasonable intervals
        if len(self.text_buffer) >= self.TEXT_BUFFER_THRESHOLD:
            if not self.sent_text_start:
                events.append(AINetworkEventTextStart())
                self.sent_text_start = True

            events.append(AINetworkEventTextChunk(content=self.text_buffer))
            self.text_buffer = ""

        i += 1

    # Process any remaining JSON if we're still in JSON mode
    if self.is_in_json:
        json_events = self._process_incremental_json()
        events.extend(json_events)

        # After processing incremental JSON, flush any remaining parameter chunk buffers
        # that might not have reached the threshold for emission
        if self.in_tool_call:
            for param_name in list(self.parameter_chunk_buffer.keys()):
                if param_name in self.params_started and param_name not in self.parameters_complete and self.parameter_chunk_buffer[param_name]:
                    events.append(AINetworkEventParameterChunk(
                        parameter=param_name,
                        content=self.parameter_chunk_buffer[param_name],
                        tool_call_id=self.tool_call_id
                    ))
                    self.parameter_chunk_buffer[param_name] = ""

    # Flush any remaining text buffer
    if self.text_buffer:
        if not self.sent_text_start:
            events.append(AINetworkEventTextStart())
            self.sent_text_start = True

        events.append(AINetworkEventTextChunk(content=self.text_buffer))
        self.text_buffer = ""

    return events
feed_tool_call_delta
feed_tool_call_delta(tool_call: ChatCompletionDeltaToolCall) -> list[AINetworkEventUnion]
Source code in azad/prompts/base_dialect.py
def feed_tool_call_delta(self, tool_call: litellm.types.utils.ChatCompletionDeltaToolCall) -> list[AINetworkEventUnion]:
    return NotImplemented