Skip to content

azad.introspect_environment Module

azad.introspect_environment

Tool for introspecting Azad's environment interface methods and types.

Attributes

CYAN module-attribute

CYAN = '\x1b[96m'

GREEN module-attribute

GREEN = '\x1b[92m'

YELLOW module-attribute

YELLOW = '\x1b[93m'

BLUE module-attribute

BLUE = '\x1b[94m'

RESET module-attribute

RESET = '\x1b[0m'

parser module-attribute

parser = ArgumentParser(description="Introspect Azad's environment interface")

args module-attribute

args = parse_args()

Classes

Config dataclass

Config(verbose: bool)

Configuration for the introspection tool.

DiscoveredType dataclass

DiscoveredType(name: str, origin: Optional[Type], args: List[Any], module: Optional[str], is_generic: bool, parent_types: Set[str], child_types: Set[str], is_enum: bool = False)

Information about a discovered type.

TypeRegistry

TypeRegistry()

Registry of discovered types.

Source code in azad/introspect_environment.py
def __init__(self):
    self.types: Dict[str, DiscoveredType] = {}
    self.environment_methods: List[Tuple[str, inspect.Signature]] = []
    self.streaming_methods: List[Tuple[str, inspect.Signature]] = []
Attributes
environment_methods instance-attribute
environment_methods: List[Tuple[str, Signature]] = []
streaming_methods instance-attribute
streaming_methods: List[Tuple[str, Signature]] = []
Functions
add_type
add_type(typ: Any, parent: Optional[str] = None) -> None

Add a type to the registry.

Source code in azad/introspect_environment.py
def add_type(self, typ: Any, parent: Optional[str] = None) -> None:
    """Add a type to the registry."""
    print(f"\n{CYAN}Examining type:{RESET}")
    print(f"  • Raw type: {typ}")

    if typ == Any:
        print("  ✗ Skipping Any type")
        return

    # Get type name and module
    type_name = getattr(typ, '__name__', str(typ))
    type_module = getattr(typ, '__module__', None)
    print(f"  • Name: {type_name}")
    print(f"  • Module: {type_module}")

    # Skip if already processed
    if type_name in self.types:
        print(f"  ↻ Already processed, adding parent: {parent}")
        if parent:
            self.types[type_name].parent_types.add(parent)
        return

    # Get origin and args for generic types
    origin = get_origin(typ)
    args = get_args(typ)
    # Check if this is an enum

    is_enum = isinstance(typ, type) and issubclass(typ, enum.Enum)
    print(f"  • Origin: {origin}")
    print(f"  • Args: {args}")

    # Create type info
    type_info = DiscoveredType(
        name=type_name,
        origin=origin,
        args=args, # type: ignore
        module=type_module,
        is_generic=bool(origin or getattr(typ, '__origin__', None)),
        parent_types=set([parent]) if parent else set(),
        child_types=set(),
        is_enum=is_enum
    )

    print(f"  ✓ Adding new type: {type_name}")
    if type_info.is_generic:
        print(f"    (Generic type with origin {origin})")

    self.types[type_name] = type_info

    # Process generic arguments
    if args:
        print(f"  → Processing generic arguments:")
        for arg in args:
            print(f"    • Argument: {arg}")
            if hasattr(arg, '__name__'):
                print(f"      ↓ Recursing into {arg.__name__}")
                self.add_type(arg, type_name)
                type_info.child_types.add(arg.__name__)
            else:
                print(f"      ✗ Skipping argument without __name__")
get_all_types
get_all_types() -> List[DiscoveredType]

Get all discovered types.

Source code in azad/introspect_environment.py
def get_all_types(self) -> List[DiscoveredType]:
    """Get all discovered types."""
    return list(self.types.values())

Functions

convert_brackets_to_angles

convert_brackets_to_angles(type_str: str) -> str

Recursively convert all [] to <> in type strings.

Source code in azad/introspect_environment.py
def convert_brackets_to_angles(type_str: str) -> str:
    """Recursively convert all [] to <> in type strings."""
    print(f"\n{CYAN}Converting brackets in: {type_str}{RESET}")
    result = ""
    i = 0
    while i < len(type_str):
        if type_str[i] == '[':
            print(f"  • Found opening bracket at position {i}")
            # Find matching closing bracket
            depth = 1
            j = i + 1
            while j < len(type_str) and depth > 0:
                if type_str[j] == '[':
                    depth += 1
                elif type_str[j] == ']':
                    depth -= 1
                j += 1
            # Convert inner part recursively
            inner_part = type_str[i+1:j-1]
            print(f"  • Processing inner part: {inner_part}")
            inner = convert_brackets_to_angles(inner_part)
            print(f"  • Converted inner part to: {inner}")
            new_part = f"<{inner}>"
            print(f"  • Adding to result: {new_part}")
            result += new_part
            i = j
        else:
            result += type_str[i]
            i += 1
    print(f"  • Final result: {result}")
    return result

analyze_method_signature

analyze_method_signature(method, registry: TypeRegistry, config: Config) -> None

Analyze a method's signature and record discovered types.

Source code in azad/introspect_environment.py
def analyze_method_signature(method, registry: TypeRegistry, config: Config) -> None:
    """Analyze a method's signature and record discovered types."""
    sig = inspect.signature(method)

    print(f"\n{GREEN}╭─ Analyzing method: {method.__name__}{RESET}")

    # Print signature info
    params = []
    for name, param in sig.parameters.items():
        if name == 'self':
            continue
        annotation = param.annotation if param.annotation != inspect.Parameter.empty else Any
        default = "" if param.default == inspect.Parameter.empty else f"={param.default}"
        params.append(f"{name}: {annotation}{default}")

    return_annotation = sig.return_annotation if sig.return_annotation != inspect.Parameter.empty else Any
    print(f"│ {CYAN}Signature:{RESET} {method.__name__}({', '.join(params)}) -> {return_annotation}")

    # Analyze parameter types
    print(f"│\n{CYAN}Parameter Types:{RESET}")
    for param in sig.parameters.values():
        if param.name == 'self':
            continue
        if param.annotation != inspect.Parameter.empty:
            print(f"│ • Processing parameter {param.name}: {param.annotation}")
            registry.add_type(param.annotation)

    # Analyze return type
    print(f"│\n{CYAN}Return Type:{RESET}")
    if sig.return_annotation != inspect.Parameter.empty:
        print(f"│ • Processing return type: {sig.return_annotation}")
        registry.add_type(sig.return_annotation)

    print(f"╰{'─' * 50}")

get_all_subclasses

get_all_subclasses(cls: Type) -> Set[Type]

Recursively get all subclasses of a class.

Source code in azad/introspect_environment.py
def get_all_subclasses(cls: Type) -> Set[Type]:
    """Recursively get all subclasses of a class."""
    subclasses = set()
    for subclass in cls.__subclasses__():
        subclasses.add(subclass)
        subclasses.update(get_all_subclasses(subclass))
    return subclasses

get_all_union_types

get_all_union_types(typ: Any) -> Set[Type]

Recursively get all types in a Union.

Source code in azad/introspect_environment.py
def get_all_union_types(typ: Any) -> Set[Type]:
    """Recursively get all types in a Union."""
    types = set()
    origin = get_origin(typ)
    if origin == Union:
        for arg in get_args(typ):
            types.update(get_all_union_types(arg))
    else:
        types.add(typ)
    return types

get_all_parent_classes

get_all_parent_classes(cls: Type) -> Set[Type]

Recursively get all parent classes that are BaseModel subclasses.

Source code in azad/introspect_environment.py
def get_all_parent_classes(cls: Type) -> Set[Type]:
    """Recursively get all parent classes that are BaseModel subclasses."""
    parents = set()
    for base in cls.__bases__:
        if base != BaseModel and issubclass(base, BaseModel):
            parents.add(base)
            parents.update(get_all_parent_classes(base))
    return parents

collect_types_from_annotation

collect_types_from_annotation(annotation: Any, discovered_types: Set[Type]) -> None

Collect BaseModel types from a type annotation.

Source code in azad/introspect_environment.py
def collect_types_from_annotation(annotation: Any, discovered_types: Set[Type]) -> None:
    """Collect BaseModel types from a type annotation."""
    # Handle basic types
    if isinstance(annotation, type):
        if issubclass(annotation, BaseModel):
            discovered_types.add(annotation)
            # Add parent classes
            discovered_types.update(get_all_parent_classes(annotation))
            # Add subclasses
            discovered_types.update(get_all_subclasses(annotation))
            # Add types from fields
            for field in annotation.model_fields.values():
                collect_types_from_annotation(field.annotation, discovered_types) # type: ignore

    # Handle generic types
    origin = get_origin(annotation)
    if origin:
        # Add the origin if it's a BaseModel
        if isinstance(origin, type) and issubclass(origin, BaseModel):
            discovered_types.add(origin)
        # Process generic arguments
        for arg in get_args(annotation):
            collect_types_from_annotation(arg, discovered_types) # type: ignore

print_type_info

print_type_info(typ: Any, indent: str = '') -> None

Print detailed information about a type.

Source code in azad/introspect_environment.py
def print_type_info(typ: Any, indent: str = "") -> None:
    """Print detailed information about a type."""
    print(f"{indent}Type: {typ}")
    print(f"{indent}Type of type: {type(typ)}")
    print(f"{indent}Has __origin__: {hasattr(typ, '__origin__')}")
    if hasattr(typ, '__origin__'):
        print(f"{indent}  __origin__: {typ.__origin__}")
    print(f"{indent}Has __args__: {hasattr(typ, '__args__')}")
    if hasattr(typ, '__args__'):
        print(f"{indent}  __args__: {typ.__args__}")
    print(f"{indent}Has __parameters__: {hasattr(typ, '__parameters__')}")
    if hasattr(typ, '__parameters__'):
        print(f"{indent}  __parameters__: {typ.__parameters__}")
    print(f"{indent}get_origin(): {get_origin(typ)}")
    print(f"{indent}get_args(): {get_args(typ)}")
    if isinstance(typ, type):
        print(f"{indent}Is class: True")
        print(f"{indent}Module: {typ.__module__}")
        print(f"{indent}Name: {typ.__name__}")
        if hasattr(typ, '__bases__'):
            print(f"{indent}Bases: {typ.__bases__}")
    print()

analyze_environment_methods

analyze_environment_methods(config: Config) -> TypeRegistry

Analyze environment methods and discover types.

Source code in azad/introspect_environment.py
def analyze_environment_methods(config: Config) -> TypeRegistry:
    """Analyze environment methods and discover types."""
    registry = TypeRegistry()

    print(f"\n{GREEN}=== Environment Method Analysis ==={RESET}\n")

    # Get all type hints from Environment
    try:
        print(f"\n{CYAN}Scanning Environment methods:{RESET}")

        for name, method in inspect.getmembers(Environment, predicate=inspect.isfunction):
            if name.startswith('_'):
                continue
            # Check for streaming methods
            source = inspect.getsource(method)
            sig = inspect.signature(method)
            is_async_iterator = (
                hasattr(sig.return_annotation, '__origin__') and
                sig.return_annotation.__origin__ in (AsyncIterator, AsyncIteratorABC)
            )
            if 'self._protocol.stream_stream' in source and is_async_iterator:
                registry.streaming_methods.append((name, sig))

            print(f"\n{YELLOW}Analyzing method: {name}{RESET}")

            # Get method signature and type hints
            sig = inspect.signature(method)
            method_hints = typing.get_type_hints(method)

            # Save method info for later, excluding streaming methods
            is_streaming = (
                'self._protocol.stream_stream' in source and
                hasattr(sig.return_annotation, '__origin__') and
                sig.return_annotation.__origin__ in (AsyncIterator, AsyncIteratorABC)
            )
            if not is_streaming:
                registry.environment_methods.append((name, sig))

            # Analyze parameter types
            for param_name, param_type in method_hints.items():
                if param_name == 'self' or param_name == 'return':
                    continue

                print(f"Processing parameter {param_name}: {param_type}")
                print_type_info(param_type, "  ")

                # Handle the type and its relationships
                process_type(param_type, registry)

            # Analyze return type
            if 'return' in method_hints:
                return_type = method_hints['return']
                print(f"Processing return type: {return_type}")
                print_type_info(return_type, "  ")

                # Handle the return type and its relationships
                process_type(return_type, registry)

    except Exception as e:
        print(f"Error getting type hints: {e}")

    # Explicitly add all AINetworkEvent subclasses to the registry
    for subclass in get_all_subclasses(AINetworkEvent):
        registry.add_type(subclass)

    # Explicitly add StreamingDict and StreamingPromptResponse and StreamingContent
    registry.add_type(StreamingDict)
    registry.add_type(StreamingPromptResponse)
    registry.add_type(StreamingContent)
    registry.add_type(Message)

    return registry

process_type

process_type(typ: Any, registry: TypeRegistry) -> None
Source code in azad/introspect_environment.py
def process_type(typ: Any, registry: TypeRegistry) -> None:
    origin = get_origin(typ)
    if origin:
        if origin == Union:
            for arg in get_args(typ):
                process_type(arg, registry)
        elif isinstance(origin, type) and issubclass(origin, BaseModel):
            registry.add_type(origin)
            for field in origin.model_fields.values():
                process_type(field.annotation, registry)
        for arg in get_args(typ):
            process_type(arg, registry)
    elif isinstance(typ, type):
        if issubclass(typ, BaseModel):
            registry.add_type(typ)
            for field in typ.model_fields.values():
                process_type(field.annotation, registry)

print_discovered_types

print_discovered_types(registry: TypeRegistry) -> None

Print all discovered types in a structured format.

Source code in azad/introspect_environment.py
def print_discovered_types(registry: TypeRegistry) -> None:
    """Print all discovered types in a structured format."""
    print(f"\n{GREEN}=== Discovered Types ==={RESET}\n")

    # Group types by category
    response_types = []
    generic_types = []
    container_types = []
    other_types = []

    for typ in registry.get_all_types():
        if typ.name.endswith('Response'):
            response_types.append(typ)
        elif typ.is_generic:
            if typ.origin and typ.origin.__module__ == 'typing':
                container_types.append(typ)
            else:
                generic_types.append(typ)
        else:
            other_types.append(typ)

    def print_type_group(title: str, types: List[DiscoveredType]) -> None:
        if types:
            print(f"{CYAN}{title}:{RESET}")
            for t in sorted(types, key=lambda x: x.name):
                # Print type name and module
                type_str = f"  • {YELLOW}{t.name}{RESET}"
                if t.module:
                    type_str += f" ({t.module})"

                # Print generic info if applicable
                if t.args:
                    args_str = ', '.join(str(arg) for arg in t.args)
                    type_str += f" [{args_str}]"

                # Print relationships
                if t.parent_types:
                    type_str += f"\n    ↑ Used by: {', '.join(sorted(t.parent_types))}"
                if t.child_types:
                    type_str += f"\n    ↓ Uses: {', '.join(sorted(t.child_types))}"

                print(type_str)
            print()

    print_type_group("Response Types", response_types)
    print_type_group("Generic Types", generic_types)
    print_type_group("Container Types", container_types)
    print_type_group("Other Types", other_types)

python_type_to_ts

python_type_to_ts(typ: Any, exported_types: Optional[Set[Type]] = None, seen: Optional[Set[Any]] = None) -> str

Convert a Python type to TypeScript.

Source code in azad/introspect_environment.py
def python_type_to_ts(typ: Any, exported_types: Optional[Set[Type]] = None, seen: Optional[Set[Any]] = None) -> str:

    """Convert a Python type to TypeScript."""
    if seen is None:
        seen = set()

    # Prevent infinite recursion
    if id(typ) in seen:
        return "any"
    seen.add(id(typ))

    try:
        print(f"\n{CYAN}DEBUG: Type mapping for: {typ}{RESET}")
        print(f"  • Type info:")
        print(f"    - Type: {type(typ)}")
        print(f"    - Has origin: {hasattr(typ, '__origin__')}")
        if hasattr(typ, '__origin__'):
            print(f"    - Origin: {typ.__origin__}")
        print(f"    - Has args: {hasattr(typ, '__args__')}")
        if hasattr(typ, '__args__'):
            print(f"    - Args: {typ.__args__}")

        # Get origin and args early to preserve type information
        origin = get_origin(typ)
        args = get_args(typ)
        print(f"    - get_origin(): {origin}")
        print(f"    - get_args(): {args}")

        # Handle container types first to preserve type information
        if origin in (AsyncIterator, AsyncIteratorABC):
            print(f"  • Processing AsyncIterator type")
            if args:
                print(f"    - Processing inner type: {args[0]}")
                inner_type = python_type_to_ts(args[0], exported_types, seen)
                print(f"    - Inner type mapped to: {inner_type}")
                result = f"AsyncGenerator<{inner_type}>"
                print(f"    - Final AsyncGenerator type: {result}")
                return convert_brackets_to_angles(str(result))
            print("    - No args found, defaulting to any")
            return "AsyncGenerator<any>"

        # Handle literal types
        if origin == Literal:
            args = get_args(typ)
            ts_args = []
            for arg in args:
                if isinstance(arg, str):
                    ts_args.append(f"'{arg}'")
                elif isinstance(arg, (int, float)):
                    ts_args.append(str(arg))
                elif arg is True:
                    ts_args.append("true")
                elif arg is False:
                    ts_args.append("false")
                elif arg is None:
                    ts_args.append("null")
                elif isinstance(arg, enum.Enum):  # Handle enum members in Literal types
                    value = arg.value
                    enum_class = arg.__class__.__name__
                    # Ensure consistent handling of all enum types, especially MessageRole
                    if isinstance(value, str):
                        ts_args.append(f"'{value}'")  # Just use the value part
                    elif isinstance(value, (int, float)):
                        ts_args.append(str(value))
                    else:
                        ts_args.append("any")
                else:
                    ts_args.append("any")
            return " | ".join(ts_args)

        # Basic type mapping
        BASIC_TYPES = {
            str: "string",
            int: "number",
            float: "number",
            bool: "boolean",
            Any: "any",
            type(None): "null",
        }

        # Handle basic types
        if typ in BASIC_TYPES:
            result = BASIC_TYPES[typ]
            print(f"  • Mapped basic type to: {result}")
            result = str(result)  # Ensure we have a string
            return convert_brackets_to_angles(result)
        elif isinstance(typ, TypeVar):
            result = typ.__name__ if hasattr(typ, "__name__") else "any"
            print(f"  • Mapped TypeVar to: {result}")
            return result
        elif hasattr(typ, "__name__") and typ.__name__ == "datetime":
            print(f"  • Mapped datetime to: string")
            return "string"
        # Handle built-in dict explicitly
        elif typ == dict:
            print(f"  • Mapped built-in dict to: Record<string, any>")
            return "Record<string, any>"

        # Handle Enum types
        if isinstance(typ, type) and issubclass(typ, enum.Enum):
            enum_values = []
            for member in typ:
                value = member.value
                if isinstance(value, str):
                    enum_values.append(f"'{value}'")
                elif isinstance(value, (int, float)):
                    enum_values.append(str(value))
                else:
                    enum_values.append("any")
            return " | ".join(enum_values)

        # Handle Union types
        origin = get_origin(typ)
        if origin == Union or str(type(typ)) == "<class 'types.UnionType'>":
            print(f"  • Processing Union type")
            args = get_args(typ)
            # Handle Optional[X] which is Union[X, None]
            if len(args) == 2 and args[1] in (type(None), None):
                print(f"    - Detected Optional type")
                inner = python_type_to_ts(args[0], exported_types, seen)
                print(f"    - Inner type: {inner}")
                result = f"{inner} | null"
                print(f"    - Final result: {result}")
                return result

            print(f"    - Processing Union args: {args}")
            mapped_types = []
            for arg in args:
                print(f"      • Processing arg: {arg}")
                mapped = python_type_to_ts(arg, exported_types, seen)
                print(f"      • Mapped to: {mapped}")
                mapped_types.append(mapped)
            result = " | ".join(mapped_types)
            print(f"    - Final Union result: {result}")
            return result

        # Handle container types
        if origin == dict:
            print(f"  • Processing Dict type")
            args = get_args(typ)
            key_type = python_type_to_ts(args[0], exported_types, seen)
            value_type = python_type_to_ts(args[1], exported_types, seen)
            result = f"Record<{key_type}, {value_type}>"
            print(f"    - Mapped to: {result}")
            return result
        elif origin == list:
            print(f"  • Processing List type")
            args = get_args(typ)
            result = f"Array<{python_type_to_ts(args[0], exported_types, seen)}>"
            print(f"    - Mapped to: {result}")
            return result
        elif origin in (AsyncIterator, AsyncIteratorABC):
            print(f"  • Processing AsyncIterator type")
            args = get_args(typ)
            print(f"    - Args: {args}")
            if args:
                print(f"    - Processing inner type: {args[0]}")
                inner_type = python_type_to_ts(args[0], exported_types, seen)
                print(f"    - Inner type mapped to: {inner_type}")
                result = f"AsyncGenerator<{inner_type}>"
                print(f"    - Final AsyncGenerator type: {result}")
                return result
            print("    - No args found, defaulting to any")
            return "AsyncGenerator<any>"
        elif origin == Callable:
            args = get_args(typ)
            if len(args) == 2:  # [args_tuple, return_type]
                param_types = args[0]
                return_type = args[1]
                params = ", ".join(python_type_to_ts(p, exported_types, seen) for p in param_types)
                return_ts = python_type_to_ts(return_type, exported_types, seen)
                return f"({params}) => {return_ts}"
            return "Function"
        elif origin == Awaitable:
            args = get_args(typ)
            return f"Promise<{python_type_to_ts(args[0], exported_types, seen)}>"

        # Handle class types
        if isinstance(typ, type):
            print(f"  • Processing class type: {typ.__name__}")
            # If this type is exported, use it from network_api_types
            if exported_types and typ in exported_types:
                print(f"    - Found in exported types")
                result = typ.__name__
            else:
                print(f"    - Using class name directly")
                result = typ.__name__
            print(f"    - Mapped to: {result}")
            return result

        return "any"
    finally:
        seen.remove(id(typ))

to_camel_case

to_camel_case(snake_str: str) -> str

Convert snake_case to camelCase.

Source code in azad/introspect_environment.py
def to_camel_case(snake_str: str) -> str:
    """Convert snake_case to camelCase."""
    components = snake_str.split('_')
    return components[0] + ''.join(x.title() for x in components[1:])

get_operation_from_source

get_operation_from_source(source: str) -> Optional[str]

Extract the operation name from a method's source code.

Source code in azad/introspect_environment.py
def get_operation_from_source(source: str) -> Optional[str]:
    """Extract the operation name from a method's source code."""
    for line in source.split('\n'):
        if '"operation":' in line:
            # Extract operation name from "operation": "tool_env_something"
            parts = line.split('"operation":')
            if len(parts) > 1:
                # Get the part after "operation":
                operation_part = parts[1].strip()
                # Remove any trailing comma and whitespace
                operation_part = operation_part.rstrip(',')
                # Extract the operation name from the quotes
                if '"' in operation_part:
                    return operation_part.split('"')[1]
    return None

generate_environment_interface

generate_environment_interface(registry: TypeRegistry, exported_types: Optional[Set[Type]] = None) -> List[str]

Generate the Environment interface from Environment.

Source code in azad/introspect_environment.py
def generate_environment_interface(registry: TypeRegistry, exported_types: Optional[Set[Type]] = None) -> List[str]:
    """Generate the Environment interface from Environment."""
    output = []
    output.append('// Environment interface that extends BaseEnvironment')
    output.append('export interface Environment extends BaseEnvironment {')

    # Use the saved environment methods
    for method_name, sig in registry.environment_methods:
        # Skip methods starting with underscore
        if method_name.startswith('_'):
            continue
        # Get method source
        method = getattr(Environment, method_name)
        source = inspect.getsource(method)

        # Extract operation name
        operation = get_operation_from_source(source)
        if not operation or not operation.startswith('tool_env_'):
            continue

        # Convert method name from operation
        ts_name = to_camel_case(operation.replace('tool_env_', ''))

        # Get type hints for the method
        method_hints = typing.get_type_hints(method)

        # Get parameters through introspection
        params = []
        print(f"\n{CYAN}Processing method parameters for {method_name}:{RESET}")
        for param_name, param in sig.parameters.items():
            if param_name == 'self':
                continue
            print(f"  • Parameter: {param_name}")
            # Skip parameters with default values
            if param.default == inspect.Parameter.empty:
                # Use type hints to get the actual type
                param_type = method_hints.get(param_name, Any)
                print(f"    - Type hint: {param_type}")
                ts_type = python_type_to_ts(param_type, exported_types)
                print(f"    - Mapped to TS: {ts_type}")
                params.append(f"{to_camel_case(param_name)}: {ts_type}")

        # Get return type through type hints
        print("\nReturn type:")
        return_type = method_hints.get('return', Any)
        print(f"  • Return type hint: {return_type}")
        print(f"  • Return origin: {get_origin(return_type)}")
        print(f"  • Return args: {get_args(return_type)}")
        ts_return_type = python_type_to_ts(return_type, exported_types)
        print(f"  • Mapped to TS: {return_type}")

        # Generate method signature with proper TypeScript return type
        print(f"\n{CYAN}DEBUG: Before Promise wrapping: {ts_return_type}{RESET}")
        ts_return_type = convert_brackets_to_angles(str(ts_return_type))
        print(f"{CYAN}DEBUG: After bracket conversion: {ts_return_type}{RESET}")
        method_signature = f'  {ts_name}({", ".join(params)}): Promise<{ts_return_type}>;'
        print(f"\nGenerated signature:\n{method_signature}")
        output.append(method_signature)

    # Process streaming methods using introspection
    print(f"\n{GREEN}Processing streaming methods:{RESET}")
    for method_name, sig in registry.streaming_methods:
        print(f"\n{CYAN}Processing streaming method: {method_name}{RESET}")
        ts_method_name = to_camel_case(method_name)
        params = []

        print("Parameters:")
        for param_name, param in sig.parameters.items():
            if param_name == 'self':
                continue
            print(f"  • Parameter: {param_name}")
            print(f"    - Annotation: {param.annotation}")
            # Use type hints to get the actual type
            param_type = param.annotation if param.annotation != inspect.Parameter.empty else Any
            print(f"    - Using type: {param_type}")
            ts_type = python_type_to_ts(param_type, exported_types)
            print(f"    - Mapped to TS: {ts_type}")
            params.append(f"{to_camel_case(param_name)}: {ts_type}")

        # Get return type through introspection
        print("\nReturn type:")
        return_annotation = sig.return_annotation if sig.return_annotation != inspect.Parameter.empty else Any
        print(f"  • Return annotation: {return_annotation}")
        print(f"  • Return origin: {get_origin(return_annotation)}")
        print(f"  • Return args: {get_args(return_annotation)}")
        return_type = python_type_to_ts(return_annotation, exported_types)
        print(f"  • Mapped to TS: {return_type}")

        # Get return type through introspection
        print("\nReturn type:")
        return_annotation = sig.return_annotation if sig.return_annotation != inspect.Parameter.empty else Any
        print(f"  • Return annotation: {return_annotation}")
        print(f"  • Return origin: {get_origin(return_annotation)}")
        print(f"  • Return args: {get_args(return_annotation)}")
        return_type = python_type_to_ts(return_annotation, exported_types)
        print(f"  • Mapped to TS: {return_type}")

        # Generate method signature with proper TypeScript return type
        print(f"\n{CYAN}DEBUG: Before Promise wrapping: {return_type}{RESET}")
        ts_return_type = convert_brackets_to_angles(str(return_type))
        print(f"{CYAN}DEBUG: After bracket conversion: {ts_return_type}{RESET}")
        method_signature = f'  {ts_method_name}({", ".join(params)}): Promise<{ts_return_type}>;'
        print(f"\nGenerated signature:\n{method_signature}")
        output.append(method_signature)

    # Add any additional streaming methods not already covered
    output.append('}')
    output.append('')
    return output

generate_operation_mapping

generate_operation_mapping(registry: TypeRegistry) -> List[str]

Generate the operation to method name mapping.

Source code in azad/introspect_environment.py
def generate_operation_mapping(registry: TypeRegistry) -> List[str]:
    """Generate the operation to method name mapping."""
    output = []
    output.append('/**')
    output.append(' * Maps operation strings to method names on the Environment interface')
    output.append(' */')
    output.append('export const operationToMethod = {')

    # Process each method to build the mapping
    for method_name, sig in registry.environment_methods:
        if method_name.startswith('_'):
            continue
        method = getattr(Environment, method_name)
        source = inspect.getsource(method)
        operation = get_operation_from_source(source)
        if operation and operation.startswith('tool_env_'):
            ts_name = to_camel_case(operation.replace('tool_env_', ''))
            output.append(f'    \'{operation}\': \'{ts_name}\',')

    output.append('} as const;')
    output.append('')
    output.append('type OperationType = keyof typeof operationToMethod;')
    output.append('')
    output.append('/**')
    output.append(' * Type guard to check if an operation exists in the mapping')
    output.append(' */')
    output.append('export function isKnownOperation(operation: string): operation is OperationType {')
    output.append('    return operation in operationToMethod;')
    output.append('}')
    output.append('')
    return output

generate_method_caller

generate_method_caller(registry: TypeRegistry, exported_types: Optional[Set[Type]] = None) -> List[str]

Generate the callEnvironmentMethod helper.

Source code in azad/introspect_environment.py
def generate_method_caller(registry: TypeRegistry, exported_types: Optional[Set[Type]] = None) -> List[str]:
    """Generate the callEnvironmentMethod helper."""
    output = []
    output.append('/**')
    output.append(' * Type-safe method caller for environment operations')
    output.append(' */')
    output.append('export async function callEnvironmentMethod(')
    output.append('    environment: Environment,')
    output.append('    operation: OperationType,')
    output.append('    data: Record<string, unknown>')
    output.append(') {')
    output.append('    switch (operation) {')

    # Generate case statements for each operation
    for method_name, sig in registry.environment_methods:
        if method_name.startswith('_'):
            continue
        method = getattr(Environment, method_name)
        source = inspect.getsource(method)
        operation = get_operation_from_source(source)
        if operation and operation.startswith('tool_env_'):
            ts_name = to_camel_case(operation.replace('tool_env_', ''))

            # Get parameters for this method
            params = []
            method_hints = typing.get_type_hints(method)
            for param_name, param in sig.parameters.items():
                if param_name == 'self':
                    continue
                # Skip parameters with default values
                if param.default != inspect.Parameter.empty:
                    continue
                param_type = method_hints.get(param_name, Any)
                params.append((param_name, param_type))

            output.append(f'        case \'{operation}\':')
            # Special case for getAllToolMetadata
            if method_name == 'get_all_tool_metadata':
                output.append('            return environment.getAllToolMetadata();')
                continue

            # Create parameter list based on number of parameters
            param_list = []
            for param_name, param_type in params:
                snake_param = param_name  # Original snake_case from data
                if param_type == str:
                    param_list.append(f'data.{snake_param} as string')
                elif param_type == Dict[str, Any]:
                    param_list.append(f'data.{snake_param} as Record<string, unknown>')
                elif hasattr(param_type, '__name__') and param_type.__name__ == 'ToolResult':
                    param_list.append(f'data.{snake_param} as ToolResult')
                elif hasattr(param_type, '__origin__') and param_type.__origin__ in (AsyncIterator, AsyncIteratorABC):
                    param_list.append(f'data.{snake_param} as AsyncGenerator<AINetworkEvent>')
                else:
                    param_list.append(f'data.{snake_param} as any')

            if param_list:
                output.append(f'            return environment.{ts_name}({", ".join(param_list)});')
            else:
                output.append(f'            return environment.{ts_name}();')

    output.append('    }')
    output.append('}')
    output.append('')
    return output

generate_typescript_interfaces

generate_typescript_interfaces(registry: TypeRegistry, exported_types: Optional[Set[Type]] = None, template_path: Optional[str] = None) -> str
Source code in azad/introspect_environment.py
def generate_typescript_interfaces(registry: TypeRegistry, exported_types: Optional[Set[Type]] = None, template_path: Optional[str] = None) -> str:
    template_content = ""
    if template_path:
        with open(template_path, 'r') as f:
            template_content = f.read()

    marker = "// AUTO-GENERATED BELOW HERE"
    before_content = template_content[:template_content.find(marker) + len(marker)] + "\n\n"

    output = []
    output.extend(generate_operation_mapping(registry))
    output.extend(generate_method_caller(registry, exported_types))

    print("\nDEBUG: Exported types:")
    if exported_types:
        for t in exported_types:
            print(f"  • {t.__name__} ({t.__module__})")
    else:
        print("  No exported types provided")

    imported_types = set()
    if exported_types:
        for typ in registry.get_all_types():
            actual_type = next((t for t in get_all_subclasses(BaseModel) 
                              if t.__name__ == typ.name), None)
            if actual_type and actual_type in exported_types:
                imported_types.add(actual_type.__name__)
                print(f"\nDEBUG: Will import {actual_type.__name__} from network_api_types")
            # else:
                print(f"\nDEBUG: Will generate {typ.name} locally")

        if imported_types:
            output.append('\n// Import types from network_api_types')
            output.append('import {')
            output.append('  ' + ',\n  '.join(sorted(imported_types)))
            output.append('} from "./network_api_types";\n')

    if 'EnvironmentResponse' not in imported_types:
        output.append("export interface EnvironmentResponse<T> {")
        output.append("  success: boolean;")
        output.append("  message: string | null;")
        output.append("  data: T | null;")
        output.append("}\n")

    # Generate interfaces for all BaseModel types except AINetworkEvent itself
    base_types = [t for t in registry.get_all_types() 
                  if not (t.name.startswith('EnvironmentResponse[') or t.name == 'EnvironmentResponse' or t.name == 'AINetworkEvent')]
    sorted_types = sorted(base_types, key=lambda t: (len(t.parent_types), t.name))

    for typ in sorted_types:
        actual_type = next((t for t in get_all_subclasses(BaseModel) 
                          if t.__name__ == typ.name), None)
        if not actual_type:
            continue

        if exported_types and actual_type in exported_types:
            continue

        output.append(f"export interface {actual_type.__name__} {{")
        for name, field in actual_type.model_fields.items():
            ts_type = python_type_to_ts(field.annotation, exported_types)
            # Start by checking if the field is not required
            is_optional = not field.is_required()
            # If there's a default_factory, only force required if the type is not Optional
            if field.default_factory is not None:
                union_args = get_args(field.annotation)
                if not (len(union_args) == 2 and type(None) in union_args):
                    is_optional = False
            # Special case: discriminator fields (e.g., 'type__', 'event') should not be optional
            if name in ("type__", "event"):
                is_optional = False
            optional_marker = "?" if is_optional else ""
            output.append(f"  {name}{optional_marker}: {ts_type};")

        output.append("}\n")

    # Generate type aliases for discovered Enums
    enum_types = [dt for dt in registry.get_all_types() if dt.is_enum]
    if enum_types:
        output.append("// Enums")
        import importlib
        for dt in sorted(enum_types, key=lambda x: x.name):
            try:
                mod = importlib.import_module(dt.module) if dt.module else None
                actual_enum = getattr(mod, dt.name) if mod else None
            except Exception:
                actual_enum = None
            if actual_enum and isinstance(actual_enum, type) and issubclass(actual_enum, enum.Enum):
                ts_enum = python_type_to_ts(actual_enum, exported_types)
            else:
                ts_enum = "any"
            output.append(f"export type {dt.name} = {ts_enum};\n")

    # Generate AINetworkEventUnion as a union type
    event_types = [t.__name__ for t in get_all_union_types(AINetworkEventUnion)]
    output.append("// Union type for all AINetworkEvent subclasses")
    output.append(f"export type AINetworkEvent = {' | '.join(event_types)};")
    output.append("")

    specialized_types = [t for t in registry.get_all_types() 
                        if t.name.startswith('EnvironmentResponse[')]
    for typ in specialized_types:
        inner_type = typ.name[len('EnvironmentResponse['):-1]
        type_alias = f"EnvironmentResponse{inner_type}"
        output.append(f"export type {type_alias} = EnvironmentResponse<{inner_type}>;\n")

    output.extend(generate_environment_interface(registry))

    return before_content + "\n".join(output) if template_path else "\n".join(output)

main

main(verbose: bool = True, output_dir: str = 'generated_client', template: Optional[str] = None, exported_types: Optional[Set[Type]] = None) -> None

Main entry point for environment introspection.

Source code in azad/introspect_environment.py
def main(verbose: bool = True, output_dir: str = "generated_client", 
         template: Optional[str] = None, exported_types: Optional[Set[Type]] = None) -> None:
    """Main entry point for environment introspection."""
    config = Config(verbose=verbose)

    print(f"{GREEN}{'─' * 60}")
    print(f"│ {CYAN}Azad Environment Interface Analysis{RESET}")
    print(f"├{'─' * 60}{RESET}")

    # Analyze methods and collect types
    registry = analyze_environment_methods(config)

    # Print discovered types
    print_discovered_types(registry)

    # Use template directory to find network_environment_types.ts
    template_path = os.path.join(template, "package", "src", "network_environment_types.ts") # type: ignore

    # Generate TypeScript interfaces
    ts_content = generate_typescript_interfaces(registry, exported_types, template_path)

    # Write to file
    output_file = os.path.join(output_dir, "package/src/network_environment_types.ts")
    os.makedirs(os.path.dirname(output_file), exist_ok=True)
    with open(output_file, "w") as f:
        f.write(ts_content)

    print(f"\nGenerated TypeScript interfaces in {output_file}")
    print(f"{GREEN}{'─' * 60}{RESET}")