Skip to content

azad.introspect Module

azad.introspect

Tool for introspecting Azad's server interface methods and generating TypeScript client.

Attributes

CYAN module-attribute

CYAN = '\x1b[96m'

GREEN module-attribute

GREEN = '\x1b[92m'

YELLOW module-attribute

YELLOW = '\x1b[93m'

BLUE module-attribute

BLUE = '\x1b[94m'

RESET module-attribute

RESET = '\x1b[0m'

Classes

Config dataclass

Config(verbose: bool, output_file: str)

Configuration for the introspection tool.

Functions

get_all_subclasses

get_all_subclasses(cls: Type) -> Set[Type]

Recursively get all subclasses of a class.

Source code in azad/introspect.py
def get_all_subclasses(cls: Type) -> Set[Type]:
    """Recursively get all subclasses of a class."""
    subclasses = set()
    for subclass in cls.__subclasses__():
        subclasses.add(subclass)
        subclasses.update(get_all_subclasses(subclass))
    return subclasses

get_type_str

get_type_str(typ: Any, config: Config, indent: int = 0) -> str

Get a string representation of a type.

Source code in azad/introspect.py
def get_type_str(typ: Any, config: Config, indent: int = 0) -> str:
    """Get a string representation of a type."""
    if typ == Any:
        return "Any"

    # Handle enum types
    if isinstance(typ, type) and issubclass(typ, enum.Enum):
        enum_literals = []
        for member in typ:
            value = member.value
            if isinstance(value, str):
                enum_literals.append(f"'{value}'")
            elif isinstance(value, (int, float)):
                enum_literals.append(str(value))
            else:
                enum_literals.append("any")
        return " | ".join(enum_literals)

    origin = get_origin(typ)
    if origin:
        args = get_args(typ)
        if origin == typing.Union:
            # Handle Optional[X] which is Union[X, None]
            if len(args) == 2 and args[1] == type(None):
                return f"Optional[{get_type_str(args[0], config, indent)}]"
            return f"Union[{', '.join(get_type_str(arg, config, indent) for arg in args)}]"
        elif origin == list:
            return f"List[{get_type_str(args[0], config, indent)}]"
        elif origin == dict:
            return f"Dict[{', '.join(get_type_str(arg, config, indent) for arg in args)}]"
        elif origin == tuple:
            return f"Tuple[{', '.join(get_type_str(arg, config, indent) for arg in args)}]"
        return f"{origin.__name__}[{', '.join(get_type_str(arg, config, indent) for arg in args)}]"

    if isinstance(typ, type):
        if issubclass(typ, BaseModel):
            return f"{YELLOW}{typ.__name__}{RESET}" if config.verbose else typ.__name__
        return typ.__name__

    return str(typ)

get_generic_info

get_generic_info(typ: Any) -> tuple[Optional[str], List[Any]]

Get the base name and type arguments of a generic type.

Source code in azad/introspect.py
def get_generic_info(typ: Any) -> tuple[Optional[str], List[Any]]:
    """Get the base name and type arguments of a generic type."""
    origin = get_origin(typ)
    if origin:
        return origin.__name__, list(get_args(typ))
    if hasattr(typ, "__origin__"):
        return typ.__origin__.__name__, list(typ.__args__)
    return None, []

simplify_union_type

simplify_union_type(args: tuple) -> Optional[str]

Try to simplify a union type to a simpler TypeScript type.

Source code in azad/introspect.py
def simplify_union_type(args: tuple) -> Optional[str]:
    """Try to simplify a union type to a simpler TypeScript type."""
    if len(args) != 2:
        return None

    # Handle Optional[X] which is Union[X, None]
    if args[1] in (type(None), None):
        return f"{python_type_to_ts(args[0])} | null"

    # Handle Union[X, Any] -> X for basic types
    if args[1] == Any:
        base_type = python_type_to_ts(args[0])
        if base_type in ("string", "number", "boolean"):
            return base_type

    # Handle Union[Any, X] -> X for basic types
    if args[0] == Any:
        base_type = python_type_to_ts(args[1])
        if base_type in ("string", "number", "boolean"):
            return base_type

    return None

handle_union_type

handle_union_type(args: tuple) -> str

Handle Union type conversion to TypeScript.

Source code in azad/introspect.py
def handle_union_type(args: tuple) -> str:
    """Handle Union type conversion to TypeScript."""
    # Try to simplify the union type
    simple_type = simplify_union_type(args)
    if simple_type:
        return simple_type

    # Regular union type
    return " | ".join(python_type_to_ts(arg) for arg in args)

is_union_type

is_union_type(typ: Any) -> bool

Check if a type is a union type (either from typing or Python 3.10+ syntax).

Source code in azad/introspect.py
def is_union_type(typ: Any) -> bool:
    """Check if a type is a union type (either from typing or Python 3.10+ syntax)."""
    origin = get_origin(typ)
    if origin is None and hasattr(typ, "__origin__"):
        origin = typ.__origin__
    result = (
        origin == Union or
        (isinstance(origin, str) and origin == "Union") or
        str(type(typ)) == "<class 'types.UnionType'>"  # Python 3.10+ X | Y syntax
    )
    return result

get_union_args

get_union_args(typ: Any) -> tuple

Get the arguments of a union type.

Source code in azad/introspect.py
def get_union_args(typ: Any) -> tuple:
    """Get the arguments of a union type."""
    if hasattr(typ, "__args__"):
        return typ.__args__
    return get_args(typ)

get_all_parent_classes

get_all_parent_classes(cls: Type) -> Set[Type]

Recursively get all parent classes that are BaseModel subclasses.

Source code in azad/introspect.py
def get_all_parent_classes(cls: Type) -> Set[Type]:
    """Recursively get all parent classes that are BaseModel subclasses."""
    parents = set()
    for base in cls.__bases__:
        if base != BaseModel and issubclass(base, BaseModel):
            parents.add(base)
            parents.update(get_all_parent_classes(base))
    return parents

get_exported_types

get_exported_types() -> Set[Type]

Get all types that will be exported to network_api_types.ts.

Source code in azad/introspect.py
def get_exported_types() -> Set[Type]:
    """Get all types that will be exported to network_api_types.ts."""
    exported = set()

    # Add all BaseModel subclasses
    subclasses = get_all_subclasses(BaseModel)
    exported.update(subclasses)
    exported.add(BaseModel)

    # Add response types and their parent classes recursively
    response_types = [BaseResponse, ErrorResponse, MessageResponse, DataResponse]
    for typ in response_types:
        exported.add(typ)
        exported.update(get_all_parent_classes(typ))

    # Add other known types and their parent classes recursively
    other_types = [EventData, LogData, TaskStatusResponse]
    for typ in other_types:
        exported.add(typ)
        exported.update(get_all_parent_classes(typ))

    # Add models directly defined in type_definitions
    for name, obj in inspect.getmembers(type_definitions):
        if inspect.isclass(obj) and issubclass(obj, BaseModel) and obj != BaseModel:
            # Ensure the model itself is added
            exported.add(obj)
            # Also add its BaseModel parents recursively
            exported.update(get_all_parent_classes(obj))

    # Import Message union type from mind_map
    try:
        from .mind_map import Message
        exported.add(Message)  # Add Message union type
    except ImportError:
        print("Could not import Message type from mind_map")

    return exported

process_type

process_type(typ: Any, exported_types: Set[Type], depth: int = 0) -> str

Process a type, recursively handling generics and namespacing.

Source code in azad/introspect.py
def process_type(typ: Any, exported_types: Set[Type], depth: int = 0) -> str:
    """Process a type, recursively handling generics and namespacing."""

    # Utility types that should never be namespaced
    UTILITY_TYPES = {
        "Record", "Array", "AsyncGenerator", "Promise", "AsyncIterator",
        "Map", "Set", "Partial", "Pick", "Omit", "Required"
    }

    # Basic type mapping
    BASIC_TYPES = {
        str: "string",
        int: "number",
        float: "number",
        bool: "boolean",
        Any: "any",
        type(None): "null",
    }

    # Handle basic types
    if typ in BASIC_TYPES:
        return BASIC_TYPES[typ]
    elif isinstance(typ, TypeVar):
        return typ.__name__ if hasattr(typ, "__name__") else "any"
    elif hasattr(typ, "__name__") and typ.__name__ == "datetime":
        return "string"

    # Handle enum types
    if isinstance(typ, type) and issubclass(typ, enum.Enum):
        enum_literals = []
        for member in typ:
            value = member.value
            if isinstance(value, str):
                enum_literals.append(f"'{value}'")
            elif isinstance(value, (int, float)):
                enum_literals.append(str(value))
            else:
                enum_literals.append("any")
        return " | ".join(enum_literals)

    # Handle Union types
    if is_union_type(typ):
        args = get_union_args(typ)
        if len(args) == 2 and (args[1] is None or args[1] == type(None)):
            return f"{process_type(args[0], exported_types)} | null"
        return " | ".join(process_type(arg, exported_types) for arg in args)

    # Handle Pydantic models (including generics)
    if isinstance(typ, type) and issubclass(typ, BaseModel):
        type_name = typ.__name__
        # Check if it's a generic model with parameters
        if hasattr(typ, "__pydantic_generic_metadata__"):
            generic_args = typ.__pydantic_generic_metadata__.get('args', ())
            if generic_args:
                # Recursively process generic arguments
                processed_args = [process_type(arg, exported_types, depth + 1) for arg in generic_args]
                return f"network_api_types.{type_name}<{', '.join(processed_args)}>"
        # Non-generic Pydantic model
        if typ in exported_types:
            return f"network_api_types.{type_name}"
        return type_name

    # Handle standard generics
    origin = get_origin(typ)
    if origin is not None:
        args = get_args(typ)
        type_name = origin.__name__

        # Handle Python container types
        if origin == dict:
            value_type = process_type(args[1], exported_types, depth + 1)
            result = f"Record<string, {value_type}>"
            return result
        elif origin == list:
            return f"Array<{process_type(args[0], exported_types)}>"

        # Process type arguments recursively
        processed_args = [process_type(arg, exported_types, depth + 1) for arg in args]

        # Don't namespace utility types
        if type_name in UTILITY_TYPES:
            return f"{type_name}<{', '.join(processed_args)}>"

        # Namespace exported types
        if origin in exported_types:
            return f"network_api_types.{type_name}<{', '.join(processed_args)}>"

        return f"{type_name}<{', '.join(processed_args)}>"

    # Handle non-generic types
    if isinstance(typ, type):
        if typ == str:
            return "string"
        elif typ == int or typ == float:
            return "number"
        elif typ == bool:
            return "boolean"
        elif typ.__name__ == "datetime":
            return "string"
        elif typ in exported_types:
            return f"network_api_types.{typ.__name__}"

    return "any"

analyze_pydantic_model

analyze_pydantic_model(model_class: type, seen_models: Set[type], config: Config, indent: int = 0) -> None

Recursively analyze a Pydantic model's fields.

Source code in azad/introspect.py
def analyze_pydantic_model(model_class: type, seen_models: Set[type], config: Config, indent: int = 0) -> None:
    """Recursively analyze a Pydantic model's fields."""
    if model_class in seen_models:
        return

    seen_models.add(model_class)
    indent_str = "  " * indent

    if config.verbose:
        # Print class name and inheritance info
        bases = [b for b in model_class.__bases__ if b != BaseModel and issubclass(b, BaseModel)]
        if bases:
            base_names = [f"{BLUE}{b.__name__}{RESET}" for b in bases]
            print(f"\n{indent_str}{YELLOW}{model_class.__name__}{RESET} ({' + '.join(base_names)})")
        else:
            print(f"\n{indent_str}{YELLOW}{model_class.__name__}{RESET}")

        # Get subclasses if this is a base class
        subclasses = get_all_subclasses(model_class)
        if subclasses:
            subclass_names = [f"{BLUE}{sc.__name__}{RESET}" for sc in subclasses]
            print(f"{indent_str}  {CYAN}Subclasses:{RESET} {', '.join(subclass_names)}")

        # Print fields
        for name, field in model_class.model_fields.items():
            field_type = field.annotation
            print(f"{indent_str}  {CYAN}{RESET} {name}: {get_type_str(field_type, config, indent + 1)}")

    # Recursively analyze nested Pydantic models in fields
    for name, field in model_class.model_fields.items():
        field_type = field.annotation
        origin = get_origin(field_type)
        if origin:
            args = get_args(field_type)
            for arg in args:
                if isinstance(arg, type) and issubclass(arg, BaseModel):
                    analyze_pydantic_model(arg, seen_models, config, indent + 2)
        elif isinstance(field_type, type) and issubclass(field_type, BaseModel):
            analyze_pydantic_model(field_type, seen_models, config, indent + 2)

    # Recursively analyze subclasses
    for subclass in get_all_subclasses(model_class):
        analyze_pydantic_model(subclass, seen_models, config, indent + 1)

analyze_method

analyze_method(method, config: Config) -> None

Analyze a method's signature and type information.

Source code in azad/introspect.py
def analyze_method(method, config: Config) -> None:
    """Analyze a method's signature and type information."""
    sig = inspect.signature(method)
    endpoint_info = method._slipstream_endpoint

    if config.verbose:
        # Print method signature with colors
        params = []
        for name, param in sig.parameters.items():
            if name == 'self':
                continue
            annotation = param.annotation if param.annotation != inspect.Parameter.empty else Any
            default = "" if param.default == inspect.Parameter.empty else f"={param.default}"
            params.append(f"{name}: {get_type_str(annotation, config)}{default}")

        return_annotation = sig.return_annotation if sig.return_annotation != inspect.Parameter.empty else Any
        print(f"\n{GREEN}╭─ Method: {method.__name__}{RESET}")
        print(f"│ {CYAN}Signature:{RESET} {method.__name__}({', '.join(params)}) -> {get_type_str(return_annotation, config)}")
        print(f"│ {CYAN}Decorator:{RESET} @{endpoint_info['type'].value}")
        print(f"│")
        print(f"│ {CYAN}Type Analysis:{RESET}")

    # Analyze parameter types
    seen_models = set()
    for param in sig.parameters.values():
        if param.name == 'self':
            continue

        annotation = param.annotation
        if annotation == inspect.Parameter.empty:
            continue

        # Check if it's a Pydantic model
        origin = get_origin(annotation)
        if origin:
            args = get_args(annotation)
            for arg in args:
                if isinstance(arg, type) and issubclass(arg, BaseModel):
                    analyze_pydantic_model(arg, seen_models, config, 1)
        elif isinstance(annotation, type) and issubclass(annotation, BaseModel):
            analyze_pydantic_model(annotation, seen_models, config, 1)

    # Analyze return type
    return_annotation = sig.return_annotation
    if return_annotation != inspect.Parameter.empty:
        origin = get_origin(return_annotation)
        if origin:
            args = get_args(return_annotation)
            for arg in args:
                if isinstance(arg, type) and issubclass(arg, BaseModel):
                    analyze_pydantic_model(arg, seen_models, config, 1)
        elif isinstance(return_annotation, type) and issubclass(return_annotation, BaseModel):
            analyze_pydantic_model(return_annotation, seen_models, config, 1)

    if config.verbose:
        print(f"╰{'─' * 50}")

process_type_for_message

process_type_for_message(typ: Any, exported_types: Set[Type]) -> str

Process a type for Message interfaces without namespace prefixing.

Source code in azad/introspect.py
def process_type_for_message(typ: Any, exported_types: Set[Type]) -> str:
    """Process a type for Message interfaces without namespace prefixing."""
    # Utility types that should never be namespaced
    UTILITY_TYPES = {
        "Record", "Array", "AsyncGenerator", "Promise", "AsyncIterator",
        "Map", "Set", "Partial", "Pick", "Omit", "Required"
    }

    # Basic type mapping
    BASIC_TYPES = {
        str: "string",
        int: "number",
        float: "number",
        bool: "boolean",
        Any: "any",
        type(None): "null",
    }

    # Handle basic types
    if typ in BASIC_TYPES:
        return BASIC_TYPES[typ]
    elif isinstance(typ, TypeVar):
        return typ.__name__ if hasattr(typ, "__name__") else "any"
    elif hasattr(typ, "__name__") and typ.__name__ == "datetime":
        return "string"

    # NEW: enum types here as well
    if isinstance(typ, type) and issubclass(typ, enum.Enum):
        ts_values = []
        for member in typ:
            val = member.value
            if isinstance(val, str):
                ts_values.append(f"'{val}'")
            elif isinstance(val, (int, float)):
                ts_values.append(str(val))
            else:
                ts_values.append("any")
        return " | ".join(ts_values)

    # Handle Union types (including Optional)
    if is_union_type(typ):
        args = get_union_args(typ)
        # Handle Optional[T] as T | null
        if len(args) == 2 and (args[1] is None or args[1] == type(None)):
            return f"{process_type_for_message(args[0], exported_types)} | null"
        # Handle other unions
        return " | ".join(process_type_for_message(arg, exported_types) for arg in args)

    # Handle generics and Literal
    origin = get_origin(typ)
    if origin is not None:
        args = get_args(typ)
        # Handle Literal types
        if origin == Literal:
            print(f"\n=== Processing Literal type with args: {args} ===")
            ts_args = []
            for arg in args:
                # For enum objects, use the underlying value directly
                if isinstance(arg, enum.Enum):
                    value = arg.value
                    enum_class = arg.__class__.__name__
                    print(f"  Literal arg: <{enum_class}.{arg.name}: '{value}'> (type: <enum '{enum_class}'>)")
                    print(f"    *** ENUM: {enum_class}.{arg.name} with value: {value} ***")

                    if isinstance(value, str):
                        ts_args.append(f"'{value}'")
                        print(f"      String enum value: '{value}'")
                    elif isinstance(value, (int, float)):
                        ts_args.append(str(value))
                        print(f"      Numeric enum value: {value}")
                    else:
                        ts_args.append("any")
                        print(f"      Complex enum value (using 'any')")
                else:
                    arg_repr = repr(arg)
                    print(f"  Literal arg: {arg_repr} (type: {type(arg)})")

                    if isinstance(arg, str):
                        # Check if this is a string representation of an enum (e.g., 'MessageRole.system')
                        if '.' in arg and arg.split('.')[0] in ['MessageRole', 'TaskState']:
                            enum_name, value = arg.split('.')
                            print(f"    *** FOUND STRING ENUM REFERENCE: {enum_name}.{value} ***")
                            ts_args.append(f"'{value}'")  # Just use the value part
                            print(f"      Extracting enum value: '{value}'")
                        else:
                            ts_args.append(f"'{arg}'")
                            print(f"    Regular string: '{arg}'")

                    elif isinstance(arg, (int, float)):
                        ts_args.append(str(arg))
                        print(f"    Numeric: {arg}")

                    elif arg is True:
                        ts_args.append("true")
                        print(f"    Boolean: true")

                    elif arg is False:
                        ts_args.append("false")
                        print(f"    Boolean: false")

                    elif arg is None:
                        ts_args.append("null")
                        print(f"    Null")
                    else:
                        print(f"    Unknown type: {type(arg)}, using 'any'")
                        ts_args.append("any")

            result = " | ".join(ts_args)
            print(f"  => Final TypeScript: {result}")
            return result
        # Handle other generics
        type_name = origin.__name__
        if origin == dict:
            value_type = process_type_for_message(args[1], exported_types)
            return f"Record<string, {value_type}>"
        elif origin == list:
            return f"Array<{process_type_for_message(args[0], exported_types)}>"
        elif type_name == "AsyncIterator":
            inner = process_type_for_message(args[0], exported_types)
            return f"AsyncGenerator<{inner}, void>"
        processed_args = [process_type_for_message(arg, exported_types) for arg in args]
        if type_name in UTILITY_TYPES:
            return f"{type_name}<{', '.join(processed_args)}>"
        return f"{type_name}<{', '.join(processed_args)}>"

    # Handle non-generic types
    if isinstance(typ, type):
        if issubclass(typ, BaseModel):
            if hasattr(typ, "__parameters__") and typ.__parameters__: # type: ignore
                params = [param.__name__ for param in typ.__parameters__] # type: ignore
                return f"{typ.__name__}<{', '.join(params)}>"
            return typ.__name__
        elif typ == str:
            return "string"
        elif typ == int or typ == float:
            return "number"
        elif typ == bool:
            return "boolean"
        elif typ.__name__ == "datetime":
            return "string"
        return typ.__name__

    return "any"

is_discriminator_field

is_discriminator_field(model_class: Type[BaseModel], field_name: str) -> bool

Check if a field is used as a discriminator in a model.

Source code in azad/introspect.py
def is_discriminator_field(model_class: Type[BaseModel], field_name: str) -> bool:
    """Check if a field is used as a discriminator in a model."""
    # Check for explicit discriminator in Config
    if hasattr(model_class, "Config") and hasattr(model_class.Config, "discriminator"): # type: ignore
        return field_name == getattr(model_class.Config, "discriminator") # type: ignore

    # Standard convention fields
    if field_name in ("type", "event"):
        return True

    return False

has_literal_type

has_literal_type(field_type: Any) -> bool

Check if a field type is a Literal or contains Literals in a union.

Source code in azad/introspect.py
def has_literal_type(field_type: Any) -> bool:
    """Check if a field type is a Literal or contains Literals in a union."""
    # Direct Literal
    if get_origin(field_type) == Literal:
        return True

    # Union of Literals
    if is_union_type(field_type):
        return any(get_origin(arg) == Literal for arg in get_union_args(field_type))

    return False

format_jsdoc

format_jsdoc(description: Optional[str], indent: str = '') -> List[str]

Formats a description string into JSDoc comment lines.

Source code in azad/introspect.py
def format_jsdoc(description: Optional[str], indent: str = "") -> List[str]:
    """Formats a description string into JSDoc comment lines."""
    if not description:
        return []

    lines = []
    description_lines = description.strip().split('\n')
    if len(description_lines) == 1:
        lines.append(f"{indent}/** {description_lines[0].strip()} */")
    else:
        lines.append(f"{indent}/**")
        for line in description_lines:
            lines.append(f"{indent} * {line.strip()}")
        lines.append(f"{indent} */")
    return lines

generate_typescript_interfaces

generate_typescript_interfaces(models: Set[Type[BaseModel]], exported_types: Set[Type], agent_class: Type[AzadAgent] = AzadAgent, methods_to_generate: Optional[Set[str]] = None) -> str
Source code in azad/introspect.py
def generate_typescript_interfaces(models: Set[Type[BaseModel]], exported_types: Set[Type], agent_class: Type[AzadAgent] = AzadAgent, methods_to_generate: Optional[Set[str]] = None) -> str:
    output = []
    output.append('// Auto-generated TypeScript interfaces for Azad types\n')
    # Add imports for any base types used in MessageType or similar enums if needed
    # Example: import { SomeEnum } from './enums'; 
    output.append('import { MessageType } from "./network_types"; // Assuming network_types defines MessageType enum\n')

    all_models = set()
    for model in models:
        all_models.add(model)
        all_models.update(get_all_parent_classes(model))

    output.append('export type ServerResponse<T = any> = ErrorResponse | DataResponse<T> | MessageResponse;\n')

    # Sort models for consistent output
    sorted_models = sorted(
        [m for m in all_models if '[' not in m.__name__], # Filter out generic placeholders like Model[T]
        key=lambda m: m.__name__
    )

    for model in sorted_models:
        # Generate class-level JSDoc ONLY from the model's OWN docstring
        # Access the raw __doc__ attribute which doesn't inherit automatically
        raw_doc = getattr(model, '__doc__', None) 
        # Use inspect.cleandoc to remove indentation etc., similar to getdoc
        class_doc = inspect.cleandoc(raw_doc) if raw_doc else None 

        # Only add the comment if the class *itself* has a non-empty docstring
        if class_doc: 
            output.extend(format_jsdoc(class_doc)) # No indent for top-level interface

        # Generate interface definition
        type_params = [param.__name__ for param in getattr(model, "__parameters__", [])]
        type_params_str = f"<{', '.join(type_params)}>" if type_params else ""
        bases = [b for b in model.__bases__ if b != BaseModel and issubclass(b, BaseModel)]
        # Ensure base names are processed correctly (e.g., handle generics if necessary)
        extends_list = [process_type_for_message(base, exported_types) for base in bases]
        extends = f" extends {', '.join(extends_list)}" if extends_list else ""
        output.append(f"export interface {model.__name__}{type_params_str}{extends} {{")

        field_indent = "  " # Indentation for fields and their docs inside the interface

        # Sort fields for consistent output (optional but good practice)
        sorted_fields = sorted(model.model_fields.items())

        for name, field in sorted_fields: # Iterate through sorted fields
            field_type = field.annotation
            # The field object itself (FieldInfo) holds the description
            description = field.description 

            # Generate field-level JSDoc
            output.extend(format_jsdoc(description, indent=field_indent))

            # Generate field type definition
            ts_type = process_type_for_message(field_type, exported_types)

            is_optional = not field.is_required()
            if field.default_factory is not None or field.default is not None:
                 if not (is_union_type(field_type) and type(None) in get_union_args(field_type)):
                    is_optional = False
            if is_discriminator_field(model, name):
                is_optional = False

            optional_marker = "?" if is_optional else ""
            output.append(f"{field_indent}{name}{optional_marker}: {ts_type};")

        output.append("}\n") # Close interface block and add newline

    # Generate Message union type (if applicable)
    try:
        from .mind_map import Message # Assuming Message is defined here
        if get_origin(Message) == Union:
            message_types = get_args(Message)
            if message_types:
                message_type_names = [
                    mt.__name__ for mt in message_types if hasattr(mt, '__name__')
                ]
                if message_type_names:
                    output.append('// Message union type, combining all specific message structures')
                    # Add docstring for the Message union itself if available
                    # class_doc = inspect.getdoc(Message) # This might not work directly for Union aliases
                    # output.extend(format_jsdoc(class_doc))
                    output.append(f'export type Message = {" | ".join(sorted(message_type_names))};\n') # Sort for consistency
    except ImportError:
        print("Could not import Message type from mind_map for union generation.")
    except TypeError:
         print(f"Warning: Message type from mind_map could not be processed as a Union.")


    # Generate Message interfaces for decorated methods
    # Sort methods for consistent output
    agent_methods = []
    for name, method in inspect.getmembers(agent_class):
         if hasattr(method, '_slipstream_endpoint') and (methods_to_generate is None or name in methods_to_generate):
             agent_methods.append((name, method))

    agent_methods.sort(key=lambda item: item[0]) # Sort by method name

    for name, method in agent_methods:
        sig = inspect.signature(method)
        endpoint_info = method._slipstream_endpoint
        messageType = endpoint_info["type"].name # SHOULD BE UPPERCASE ideally
        method_name_camel = to_camel_case(name)
        interface_name = capitalize_first_letter(method_name_camel) + 'Message'

        # Add method docstring as JSDoc for the message interface
        method_doc = inspect.getdoc(method)
        output.extend(format_jsdoc(method_doc))

        output.append(f"export interface {interface_name} {{")
        # Use the actual enum member name if MessageType is an Enum
        output.append(f"  type: MessageType.{messageType.upper()}; // Adjusted to use uppercase enum member")
        output.append("  data: {")
        output.append(f"    operation: '{name}';")

        # Sort parameters for consistency
        params_sorted = sorted(
            [(p_name, param) for p_name, param in sig.parameters.items() if p_name != 'self'],
            key=lambda item: item[0]
        )

        for param_name, param in params_sorted:
            param_type = param.annotation if param.annotation != inspect.Parameter.empty else Any
            # Get parameter description if available (less common than field descriptions)
            # Note: Standard function parameters don't have built-in description metadata
            # like Pydantic fields. Docstrings are the usual place.
            # We could potentially parse the method's docstring for param descriptions,
            # but that's significantly more complex (e.g., requires parsing numpydoc/googledoc format).
            # For now, we'll stick to field/class descriptions.

            ts_type = process_type_for_message(param_type, exported_types)
            is_optional = param.default != inspect.Parameter.empty
            optional_marker = "?" if is_optional else ""
            output.append(f"    {param_name}{optional_marker}: {ts_type};")

        output.append("  };")
        output.append("  requestId?: [string, number]; // Optional request identifier")
        output.append("  peerID?: string; // Optional sender peer ID")
        output.append("}\n") # Close interface and add newline

    return "\n".join(output)

convert_brackets_to_angles

convert_brackets_to_angles(ts_type: str) -> str

Convert square bracket generic notation to angle bracket notation.

Source code in azad/introspect.py
def convert_brackets_to_angles(ts_type: str) -> str:
    """Convert square bracket generic notation to angle bracket notation."""
    # Handle DataResponse[T] -> DataResponse<T>
    while '[' in ts_type and ']' in ts_type:
        start = ts_type.rindex('[')
        end = ts_type.index(']', start)
        name = ts_type[:start]
        inner = ts_type[start + 1:end]
        rest = ts_type[end + 1:]
        ts_type = f"{name}<{inner}>{rest}"
    return ts_type

to_snake_case

to_snake_case(camel_str: str) -> str

Convert camelCase to snake_case.

Source code in azad/introspect.py
def to_snake_case(camel_str: str) -> str:
    """Convert camelCase to snake_case."""
    result = [camel_str[0].lower()]
    for c in camel_str[1:]:
        if c.isupper():
            result.append('_')
            result.append(c.lower())
        else:
            result.append(c)
    return ''.join(result)

to_camel_case

to_camel_case(snake_str: str) -> str

Convert snake_case to camelCase.

Source code in azad/introspect.py
def to_camel_case(snake_str: str) -> str:
    """Convert snake_case to camelCase."""
    components = snake_str.split('_')
    return components[0] + ''.join(x.title() for x in components[1:])

capitalize_first_letter

capitalize_first_letter(s: str) -> str

Capitalize the first letter of a string.

Source code in azad/introspect.py
def capitalize_first_letter(s: str) -> str:
    """Capitalize the first letter of a string."""
    return s[0].upper() + s[1:] if s else s

generate_client_method

generate_client_method(method, exported_types: Set[Type]) -> Tuple[str, str]

Generate TypeScript client method from a decorated Python method.

Source code in azad/introspect.py
def generate_client_method(method, exported_types: Set[Type]) -> Tuple[str, str]:
    """Generate TypeScript client method from a decorated Python method."""
    sig = inspect.signature(method)
    endpoint_info = method._slipstream_endpoint
    python_name = method.__name__
    method_name = to_camel_case(python_name)
    message_type = capitalize_first_letter(method_name) + 'Message'

    # Instead of generating a parameter for each function argument, we now
    # use a single "args" parameter.
    args_param = f'args: Omit<network_api_types.{message_type}["data"], "operation">'

    # (Keep the get_return_type logic as before)
    def get_return_type(method) -> str:
        sig = inspect.signature(method)
        return_annotation = sig.return_annotation if sig.return_annotation != inspect.Parameter.empty else Any
        def process_type_inner(typ: Any) -> str:
            from pydantic import BaseModel  # Ensure BaseModel is imported

            if is_union_type(typ):
                args = get_union_args(typ)
                processed_args = [process_type_inner(arg) for arg in args]
                return " | ".join(processed_args)

            # Handle Pydantic models (including generics)
            if isinstance(typ, type) and issubclass(typ, BaseModel):
                type_name = typ.__name__
                # Check for generic metadata
                if hasattr(typ, "__pydantic_generic_metadata__"):
                    generic_args = typ.__pydantic_generic_metadata__.get('args', ())
                    if generic_args:
                        processed_args = [process_type_inner(arg) for arg in generic_args]
                        parent_name = typ.__name__.split('[')[0]
                        return f"network_api_types.{parent_name}<{', '.join(processed_args)}>"
                # Non-generic Pydantic model
                if typ in exported_types:
                    return f"network_api_types.{type_name}"
                return type_name

            origin = get_origin(typ)
            if origin:
                args = get_args(typ)
                type_name = origin.__name__
                if type_name == "AsyncIterator":
                    inner = process_type_inner(args[0])
                    return f"AsyncGenerator<{inner}, void>"
                if origin == dict:
                    value_type = process_type_inner(args[1])
                    return f"Record<string, {value_type}>"
                # Process type arguments, directly namespacing Pydantic models
                processed_args = []
                for arg in args:
                    if isinstance(arg, type) and issubclass(arg, BaseModel) and arg in exported_types:
                        processed_args.append(f"network_api_types.{arg.__name__}")
                    else:
                        processed_args.append(process_type_inner(arg))
                if origin in exported_types:
                    return f"network_api_types.{type_name}<{', '.join(processed_args)}>"
                return f"{type_name}<{', '.join(processed_args)}>"

            if typ == Any:
                return "any"
            elif typ == type(None):
                return "null"
            elif typ == str:
                return "string"
            elif typ == int or typ == float:
                return "number"
            elif typ == bool:
                return "boolean"
            elif hasattr(typ, "__name__"):
                if typ in exported_types:
                    return f"network_api_types.{typ.__name__}"
                return typ.__name__
            return "any"

        if method._slipstream_endpoint['type'].value == 'fire_and_forget':
            return "void"
        if is_union_type(return_annotation):
            args = get_union_args(return_annotation)
            processed_args = [process_type_inner(arg) for arg in args]
            return " | ".join(processed_args)
        return process_type_inner(return_annotation)

    raw_return_type = get_return_type(method)
    processed_return_type = raw_return_type.replace('dict[str, Any]', 'Record<string, any>')
    ts_return_type = convert_brackets_to_angles(processed_return_type)

    decorator_type = endpoint_info['type'].value  # e.g. 'request', 'stream_request', 'fire_and_forget'

    if decorator_type == 'request':  # request_response
        return method_name, f"""
  async {method_name}({args_param}): Promise<{ts_return_type}> {{
    const message: network_api_types.{message_type} = {{
      type: MessageType.REQUEST,
      data: {{
        operation: '{python_name}',
        ...args,
      }},
      requestId: this.getNextRequestId()
    }};

    return new Promise((resolve, reject) => {{
      const handlerKey = this.getHandlerKey(message.requestId!);
      this.responseHandlers.set(handlerKey, (response: {ts_return_type}) => {{
        if (response.success) {{
          resolve(response);
        }} else {{
          reject(response);
        }}
      }});
      this.sendMessage(message);
    }});
  }}"""

    elif decorator_type == 'stream_request':  # request_stream
        # Extract inner type from AsyncGenerator<T, void>
        inner_type = ts_return_type[len("AsyncGenerator<"):-len(", void>")]
        return method_name, f"""
  async *{method_name}({args_param}): {ts_return_type} {{
    const requestId = this.getNextRequestId();
    const message: network_api_types.{message_type} = {{
      type: MessageType.STREAM_REQUEST,
      data: {{
        operation: '{python_name}',
        ...args,
      }},
      requestId
    }};

    const handlerKey = this.getHandlerKey(requestId);

    // Create an event queue to buffer incoming events
    const eventQueue: Array<any> = [];

    // This will hold a reference to the resolve function when we're waiting for events
    let queueSignal: (() => void) | null = null;

    // Function that returns a promise that resolves when new data is available
    const waitForQueueData = (): Promise<void> => {{
      return new Promise<void>(resolve => {{
        if (eventQueue.length > 0) {{
          // Data already available, resolve immediately
          resolve();
        }} else {{
          // No data yet, store the resolver for later
          queueSignal = resolve;
        }}
      }});
    }};

    // Set up the response handler to add events to our queue
    this.responseHandlers.set(handlerKey, (data: any) => {{
      // Add the new event to our queue
      eventQueue.push(data);

      // If there's a pending wait operation, signal it to continue
      if (queueSignal) {{
        queueSignal();
        queueSignal = null;
      }}
    }});

    // Send the request to start the stream
    this.sendMessage(message);

    try {{
      while (true) {{
        // If the queue is empty, wait for data to arrive
        if (eventQueue.length === 0) {{
          await waitForQueueData();
        }}

        // Get the next event from the queue (FIFO)
        const data = eventQueue.shift();

        // Check for stream end or invalid data
        if (!data || data.type === 'stream_end') {{
          break;
        }}

        // Yield the event with proper type casting
        yield data as {inner_type};
      }}
    }} finally {{
      // Always clean up the handler when done
      this.responseHandlers.delete(handlerKey);
    }}
  }}"""

    elif decorator_type == 'fire_and_forget':  # fire_and_forget
        return method_name, f"""
  async {method_name}({args_param}): Promise<{ts_return_type}> {{
    const message: network_api_types.{message_type} = {{
      type: MessageType.FIRE_AND_FORGET,
      data: {{
        operation: '{python_name}',
        ...args,
      }},
      requestId: this.getNextRequestId()
    }};

    return this.sendMessage(message);
  }}"""
    else:
        raise ValueError(f"Unknown decorator type: {decorator_type}")

collect_models

collect_models(method) -> Set[Type[BaseModel]]

Collect all Pydantic models used in a method's signature.

Source code in azad/introspect.py
def collect_models(method) -> Set[Type[BaseModel]]:
    """Collect all Pydantic models used in a method's signature."""
    models = set()
    sig = inspect.signature(method)

    def add_model(typ: Any) -> None:
        if isinstance(typ, type):
            if issubclass(typ, BaseModel):
                models.add(typ)
                # Add base classes
                for base in typ.__bases__:
                    if base != BaseModel and issubclass(base, BaseModel):
                        models.add(base)
                # Add subclasses and their fields recursively
                for subclass in get_all_subclasses(typ):
                    models.add(subclass)
                    # Add fields from subclasses
                    for name, field in subclass.model_fields.items():
                        add_type(field.annotation)
                # Add fields from this model
                for name, field in typ.model_fields.items():
                    add_type(field.annotation)

    def add_type(typ: Any) -> None:
        origin = get_origin(typ)
        if origin:
            for arg in get_args(typ):
                add_type(arg)
        else:
            add_model(typ)

    # Analyze parameters
    for param in sig.parameters.values():
        if param.annotation != inspect.Parameter.empty:
            add_type(param.annotation)

    # Analyze return type
    if sig.return_annotation != inspect.Parameter.empty:
        add_type(sig.return_annotation)

    return models

copy_template_directory

copy_template_directory(template_dir: str, output_dir: str) -> None

Copy template directory to output directory.

Source code in azad/introspect.py
def copy_template_directory(template_dir: str, output_dir: str) -> None:
    """Copy template directory to output directory."""
    import shutil
    # Ensure output directory exists
    os.makedirs(output_dir, exist_ok=True)

    # Copy from template/package to output/package
    template_package = os.path.join(template_dir, 'package')
    output_package = os.path.join(output_dir, 'package')

    if os.path.exists(output_package):
        shutil.rmtree(output_package)
    shutil.copytree(template_package, output_package)

main

main(args: Optional[Namespace] = None)

Main entry point for the introspection tool. Can accept pre-parsed args.

Source code in azad/introspect.py
def main(args: Optional[argparse.Namespace] = None):
    """Main entry point for the introspection tool. Can accept pre-parsed args."""
    # If args are not passed directly, parse them from command line
    if args is None:
        parser = argparse.ArgumentParser(description="Introspect Azad's server interface")
        parser.add_argument('--verbose', '-v', action='store_true', help='Enable verbose output')
        parser.add_argument('--output', '-o', default='azad-ts-client', help='Output directory')
        parser.add_argument('--methods', '-m', help='Comma-separated list of methods to generate')
        args = parser.parse_args()

    # Generate WebSocket client
    generate_websocket_client(args)

    # Generate HTTP client
    generate_http_client(args)

generate_http_client

generate_http_client(args: Namespace)

Generates the TypeScript client for the HTTP server.

Source code in azad/introspect.py
def generate_http_client(args: argparse.Namespace):
    """Generates the TypeScript client for the HTTP server."""
    server_process = None
    try:
        # Start the server
        server_command = ["python", "-m", "azad.http_server.server"]
        server_process = subprocess.Popen(server_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        print("HTTP server started...")
        time.sleep(2)  # Give the server a moment to start

        # Generate the types
        output_dir = os.path.join(args.output, 'package', 'src')
        os.makedirs(output_dir, exist_ok=True)
        output_file = os.path.join(output_dir, 'network_http_types.ts')

        codegen_command = [
            "npx", "--yes", "openapi-typescript",
            "http://localhost:3333/openapi.json",
            "--output", output_file
        ]
        subprocess.run(codegen_command, check=True)
        print(f"HTTP client types generated at {output_file}")

        # Fetch the OpenAPI schema
        response = requests.get("http://localhost:3333/openapi.json")
        response.raise_for_status()
        schema = response.json()

        # Generate the client methods
        client_code = generate_ts_client_from_schema(schema)
        client_output_file = os.path.join(output_dir, 'http-client.ts')
        with open(client_output_file, 'w') as f:
            f.write(client_code)
        print(f"HTTP client generated at {client_output_file}")


    except (requests.exceptions.RequestException, subprocess.CalledProcessError) as e:
        print(f"Error generating HTTP client: {e}")
        if server_process:
            stdout, stderr = server_process.communicate()
            print("Server STDOUT:")
            print(stdout.decode())
            print("Server STDERR:")
            print(stderr.decode())

    finally:
        if server_process:
            os.kill(server_process.pid, signal.SIGTERM)
            print("HTTP server stopped.")

generate_websocket_client

generate_websocket_client(args: Namespace)

Generates the TypeScript client for the WebSocket server.

Source code in azad/introspect.py
def generate_websocket_client(args: argparse.Namespace):
    """Generates the TypeScript client for the WebSocket server."""
    # Parse methods to generate from the provided or parsed args
    methods_to_generate = set(args.methods.split(',')) if hasattr(args, 'methods') and args.methods else None

    # Use args.output and args.verbose directly
    # Ensure args has the necessary attributes, provide defaults if needed
    verbose = getattr(args, 'verbose', False)
    output_dir = getattr(args, 'output', 'azad-ts-client')

    config = Config(verbose=verbose, output_file=output_dir)

    if config.verbose:
        print(f"{GREEN}{'─' * 60}")
        print(f"│ {CYAN}Azad Server Interface Analysis{RESET}")
        print(f"├{'─' * 60}{RESET}")

    # Collect models only from specified methods
    all_models = set()
    for name, method in inspect.getmembers(AzadAgent):
        if hasattr(method, '_slipstream_endpoint'):
            # Skip if not in methods_to_generate
            if methods_to_generate and name not in methods_to_generate:
                continue
            analyze_method(method, config)
            all_models.update(collect_models(method))

    if config.verbose:
        print(f"{GREEN}{'─' * 60}{RESET}")

    # template client is located at azad/template_clients/ts
    ts_client_dir = os.path.join(os.path.dirname(__file__), 'template_clients', 'ts')
    # Copy template directory to output
    copy_template_directory(ts_client_dir, args.output)

    # Get exported types
    exported_types = get_exported_types()

    # Generate TypeScript interfaces in network_api_types.ts
    type_definitions_content = generate_typescript_interfaces(all_models, exported_types, AzadAgent, methods_to_generate)
    try:
        from .predefined_tools import PREDEFINED_TOOL_SCHEMAS, PREDEFINED_OUTPUT_SCHEMAS, PREDEFINED_TOOL_METADATA

        type_definitions_content += "\n// Predefined tool schemas - strict Pydantic schemas for pre-defined server tools\n"

        for tool_name, schema_class in PREDEFINED_TOOL_SCHEMAS.items():
            output_schema = PREDEFINED_OUTPUT_SCHEMAS[tool_name]

            type_definitions_content += f"\nexport interface {schema_class.__name__} {{\n"
            for name, field in schema_class.model_fields.items():
                field_type = field.annotation
                ts_type = python_type_to_ts(field_type, exported_types)
                is_optional = not field.is_required()
                optional_marker = "?" if is_optional else ""
                type_definitions_content += f"  {name}{optional_marker}: {ts_type};\n"
            type_definitions_content += "}\n"

            type_definitions_content += f"\nexport interface {output_schema.__name__} {{\n"
            for name, field in output_schema.model_fields.items():
                field_type = field.annotation
                ts_type = python_type_to_ts(field_type, exported_types)
                is_optional = not field.is_required()
                optional_marker = "?" if is_optional else ""
                type_definitions_content += f"  {name}{optional_marker}: {ts_type};\n"
            type_definitions_content += "}\n"

        type_definitions_content += "\n// Predefined tool metadata\n"

        metadata_dict = {metadata["name"]: metadata for metadata in PREDEFINED_TOOL_METADATA}

        import json
        metadata_json = json.dumps(metadata_dict, indent=2)

        type_definitions_content += f"export const PREDEFINED_TOOL_METADATA = {metadata_json} satisfies Record<string, ToolMetadata>;\n"
    except ImportError:
        print("Could not import predefined tools schemas for TypeScript generation")
    except Exception as e:
        print(f"Error processing predefined tools for TypeScript: {e}")
    with open(os.path.join(args.output, 'package', 'src', 'network_api_types.ts'), 'w') as f:
        f.write(type_definitions_content)

    from .introspect_environment import main as env_main
    env_main(verbose=args.verbose, output_dir=args.output, 
             template=ts_client_dir, exported_types=all_models)

    client_path = os.path.join(ts_client_dir, 'package', 'src', 'client.ts')
    with open(client_path, 'r') as f:
        template = f.read()

    methods = []
    for name, method in inspect.getmembers(AzadAgent):
        if hasattr(method, '_slipstream_endpoint'):
            if methods_to_generate and name not in methods_to_generate:
                continue
            _, method_code = generate_client_method(method, exported_types)
            methods.append(method_code)

    client_code = template.replace(
        "// Auto-generated method declarations will be inserted here",
        "\n".join(methods)
    )

    if (config.verbose):
        print("\nGenerated methods:")
        print("\n".join(methods))

    with open(os.path.join(args.output, 'package', 'src', 'client.ts'), 'w') as f:
        f.write(client_code)

    # Append the http types export to the main index.ts
    index_path = os.path.join(args.output, 'package', 'src', 'index.ts')
    with open(index_path, 'a') as f:
        f.write("\n// Export HTTP client types\n")
        f.write("export * from './network_http_types';\n")
        f.write("export * from './http-client';\n")

generate_ts_client_from_schema

generate_ts_client_from_schema(schema: Dict[str, Any]) -> str

Generates a TypeScript client from an OpenAPI schema.

Source code in azad/introspect.py
def generate_ts_client_from_schema(schema: Dict[str, Any]) -> str:
    """Generates a TypeScript client from an OpenAPI schema."""

    paths = schema.get("paths", {})
    client_methods = []

    for path, path_item in paths.items():
        for method, operation in path_item.items():
            operation_id = operation.get("operationId")
            if not operation_id:
                continue

            # Clean up the operationId to be a valid function name
            # e.g., "upload_failed_edit_upload_failed_edit_post" -> "uploadFailedEdit"
            clean_op_id = to_camel_case(operation_id.split('_')[0])

            # Determine the payload type
            payload_type = 'never'
            if "requestBody" in operation:
                try:
                    payload_type = f'paths["{path}"]["{method}"]["requestBody"]["content"]["application/json"]'
                except KeyError:
                    pass

            ts_method = method.upper()

            # Generate the method
            client_methods.append(f"""
  {clean_op_id}: (
    payload: {payload_type}
  ) => {{
    return client.{ts_method}("{path}", {{
      body: payload,
    }});
  }},""")

    # Assemble the full client code
    client_code = f"""
import createClient from "openapi-fetch";
import type {{ paths }} from "./network_http_types";

export const createHttpClient = (baseUrl: string) => {{
    const client = createClient<paths>({{ baseUrl }});

    return {{
        {"".join(client_methods)}
    }};
}};
"""
    return client_code

Modules