Bases: DialectParser
JSON dialect parser for real-time content parsing.
Handles streaming content and emits events for text and tool calls.
Uses the streamingjson library for incremental parsing of incomplete JSON.
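A minimal usage sketch: the class name `JSONDialectParser` and the placeholder arguments below are assumed from context rather than taken from this section; `stream` stands in for any iterable of byte chunks from the model.

```python
# Hypothetical driver loop; JSONDialectParser, my_schema, my_prompt_data,
# and my_config are assumed names, not part of this section's source.
parser = JSONDialectParser(schema=my_schema, prompt_data=my_prompt_data, config=my_config)
for chunk in stream:                  # chunks of bytes from the model
    for event in parser.feed(chunk):  # text start/chunk and tool-call events
        print(type(event).__name__)
```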
Source code in azad/prompts/dialects/json/parser.py
```python
def __init__(self, schema: Dict, prompt_data: PromptData, config: JSONDialectConfig):
    self.schema = schema
    self.prompt_data = prompt_data
    self.config = config
    self.logger = logging.getLogger(__name__)

    # Parser state
    self.reset_state()
```
Class-level constants:

- `PARAMETER_BUFFER_THRESHOLD = 10`
- `TEXT_BUFFER_THRESHOLD = 50` (characters buffered before a text chunk is emitted; see `feed` below)
- `MIN_EMIT_INTERVAL_MS = 20`

Instance attributes:

- `prompt_data`: the `PromptData` passed to the constructor
- `logger = logging.getLogger(__name__)`
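The two parameter-related constants are presumably consumed inside `_process_incremental_json`, which is not shown in this section. A sketch of what that gating could look like, using the state fields initialized in `reset_state()` below; the method name `should_emit_chunk` is hypothetical:

```python
import time

def should_emit_chunk(self, param_name: str) -> bool:
    # Hypothetical gating: emit only when enough characters are buffered
    # AND the per-parameter emit interval has elapsed.
    now_ms = time.monotonic() * 1000
    last_ms = self.last_emit_time.get(param_name, 0.0)
    buffered = self.parameter_chunk_buffer.get(param_name, "")
    return (len(buffered) >= self.PARAMETER_BUFFER_THRESHOLD
            and now_ms - last_ms >= self.MIN_EMIT_INTERVAL_MS)
```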
Reset the parser state to initial values.
Source code in azad/prompts/dialects/json/parser.py
```python
def reset_state(self):
    """Reset the parser state to initial values."""
    # Text tracking
    self.text_buffer = ""
    self.sent_text_start = False
    self.is_in_code_block = False

    # JSON tracking
    self.json_buffer = ""
    self.is_in_json = False
    self.json_depth = 0
    self.in_string = False
    self.escape_next = False

    # Tool call tracking
    self.in_tool_call = False
    self.current_tool_name = ""
    self.tool_call_id = ""

    # Parameter tracking
    self.parameter_buffer: Dict[str, str] = {}
    self.parameter_chunk_buffer: Dict[str, str] = {}
    self.parameters_complete: Set[str] = set()
    self.current_params: Dict[str, Any] = {}
    self.last_known_params: Dict[str, Any] = {}
    self.params_started: Set[str] = set()

    # Chunk emission timing
    self.last_emit_time: Dict[str, float] = {}  # Last time a chunk was emitted for each parameter

    # streamingjson lexer for incremental parsing
    self.lexer = streamingjson.Lexer()

    # Raw JSON fragments before completion
    self.raw_json_buffer = ""

    # State flags
    self.tool_ready_emitted = False
    self.last_processed_json_length = 0
```
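Because `feed()` (below) accumulates state across calls, a caller can reuse one parser instance across responses by resetting between them. A minimal sketch, assuming `responses` yields iterables of byte chunks and `handle` is a placeholder:

```python
for response in responses:
    parser.reset_state()            # clear text/JSON/tool-call state
    for chunk in response:
        handle(parser.feed(chunk))  # dispatch the emitted events
```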
Process incoming data and emit appropriate events.

Parameters:

- `data` (`bytes`): bytes of incoming data

Returns:

- `List[AINetworkEventUnion]`: list of network events
Source code in azad/prompts/dialects/json/parser.py
```python
def feed(self, data: bytes) -> List[AINetworkEventUnion]:
    """Process incoming data and emit appropriate events.

    Args:
        data: Bytes of incoming data

    Returns:
        List of network events
    """
    text = data.decode('utf-8')
    events: List[AINetworkEventUnion] = []

    # Process character by character
    i = 0
    while i < len(text):
        # Handle code blocks first (highest priority)
        if text[i] == '`' and not self.in_string:
            self.is_in_code_block = not self.is_in_code_block
            self.text_buffer += text[i]
            if not self.sent_text_start:
                events.append(AINetworkEventTextStart())
                self.sent_text_start = True
            i += 1
            continue

        # Inside a code block, everything is treated as text
        if self.is_in_code_block:
            self.text_buffer += text[i]
            if not self.sent_text_start:
                events.append(AINetworkEventTextStart())
                self.sent_text_start = True
            i += 1
            continue

        # Check for the start of a JSON-like structure
        if text[i] == '{' and not self.is_in_json:
            # Potential start of JSON
            # Emit any pending text
            if self.text_buffer:
                if not self.sent_text_start:
                    events.append(AINetworkEventTextStart())
                    self.sent_text_start = True
                events.append(AINetworkEventTextChunk(content=self.text_buffer))
                self.text_buffer = ""

            # Start JSON tracking
            self.is_in_json = True
            self.json_depth = 1
            self.json_buffer = '{'
            self.raw_json_buffer = '{'
            self.lexer.append_string('{')
            i += 1
            continue

        # If we're tracking a JSON object, continue accumulating
        if self.is_in_json:
            # Update string and escape tracking
            if text[i] == '"' and not self.escape_next:
                self.in_string = not self.in_string
            elif text[i] == '\\' and self.in_string:
                self.escape_next = True
            elif self.escape_next:
                self.escape_next = False

            # Update depth tracking (only when not in a string)
            if not self.in_string:
                if text[i] == '{':
                    self.json_depth += 1
                elif text[i] == '}':
                    self.json_depth -= 1

                # If we've closed the outermost object
                if self.json_depth == 0:
                    # Add the closing brace
                    self.json_buffer += text[i]
                    self.raw_json_buffer += text[i]
                    self.lexer.append_string(text[i])

                    # Process the complete JSON object
                    json_events = self._process_complete_json()
                    events.extend(json_events)

                    # Reset JSON state
                    self.is_in_json = False
                    self.json_buffer = ""
                    self.raw_json_buffer = ""
                    self.lexer = streamingjson.Lexer()  # Reset lexer for next JSON
                    i += 1
                    continue

            # Add character to JSON buffer
            self.json_buffer += text[i]
            self.raw_json_buffer += text[i]
            self.lexer.append_string(text[i])

            # Try to extract incremental updates
            if len(self.json_buffer) % 4 == 0:  # Check periodically
                json_events = self._process_incremental_json()
                events.extend(json_events)
            i += 1
            continue

        # Regular text
        self.text_buffer += text[i]

        # Emit text chunks at reasonable intervals
        if len(self.text_buffer) >= self.TEXT_BUFFER_THRESHOLD:
            if not self.sent_text_start:
                events.append(AINetworkEventTextStart())
                self.sent_text_start = True
            events.append(AINetworkEventTextChunk(content=self.text_buffer))
            self.text_buffer = ""
        i += 1

    # Process any remaining JSON if we're still in JSON mode
    if self.is_in_json:
        json_events = self._process_incremental_json()
        events.extend(json_events)

        # After processing incremental JSON, flush any remaining parameter chunk
        # buffers that might not have reached the threshold for emission
        if self.in_tool_call:
            for param_name in list(self.parameter_chunk_buffer.keys()):
                if (param_name in self.params_started
                        and param_name not in self.parameters_complete
                        and self.parameter_chunk_buffer[param_name]):
                    events.append(AINetworkEventParameterChunk(
                        parameter=param_name,
                        content=self.parameter_chunk_buffer[param_name],
                        tool_call_id=self.tool_call_id
                    ))
                    self.parameter_chunk_buffer[param_name] = ""

    # Flush any remaining text buffer
    if self.text_buffer:
        if not self.sent_text_start:
            events.append(AINetworkEventTextStart())
            self.sent_text_start = True
        events.append(AINetworkEventTextChunk(content=self.text_buffer))
        self.text_buffer = ""

    return events
```
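A rough illustration of the event flow, assuming the model interleaves prose with a tool-call object. The JSON payload shape shown is hypothetical; the tool-call events themselves come from `_process_complete_json`, which is not shown in this section:

```python
payload = b'Let me look that up. {"tool": "search", "parameters": {"query": "weather"}}'
events = parser.feed(payload)
# The buffered prose is flushed as soon as '{' is seen:
#   AINetworkEventTextStart
#   AINetworkEventTextChunk(content='Let me look that up. ')
# ...followed by whatever _process_complete_json emits for the object.
```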
This method falls back to the base dialect's stub, which returns `NotImplemented`: the JSON dialect parses tool calls out of the text stream rather than consuming native tool-call deltas.

Source code in azad/prompts/base_dialect.py

```python
def feed_tool_call_delta(self, tool_call: litellm.types.utils.ChatCompletionDeltaToolCall) -> list[AINetworkEventUnion]:
    return NotImplemented
```
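A dialect that does consume native deltas would override this stub. Purely as a hypothetical sketch (the field access follows litellm's `ChatCompletionDeltaToolCall` shape; the event construction mirrors the parameter events used by `feed` above and is not this library's actual behavior):

```python
def feed_tool_call_delta(self, tool_call) -> list[AINetworkEventUnion]:
    events: list[AINetworkEventUnion] = []
    # Argument text arrives incrementally; forward it as a parameter chunk.
    if tool_call.function and tool_call.function.arguments:
        events.append(AINetworkEventParameterChunk(
            parameter="arguments",                # placeholder label
            content=tool_call.function.arguments,
            tool_call_id=tool_call.id or self.tool_call_id,
        ))
    return events
```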