Skip to content

azad.predefined_tools.browser_use_tool Module

azad.predefined_tools.browser_use_tool

Attributes

logger module-attribute

logger = getLogger(__name__)

available_providers module-attribute

available_providers = ['gemini', 'openai', 'anthropic', 'openrouter', 'litellm_proxy']

Classes

BrowserUseTool

BrowserUseTool(config: Dict[str, Any], protocol: Protocol)

Bases: ServerToolBase

Server-side tool wrapper around browser-use library. Initiated by the client, runs on the server, and emits step updates.

Initialize the BrowserUseTool.

Parameters:

  • config (Dict[str, Any]) –

    Configuration dictionary containing optional settings: headless – whether to run the browser in headless mode (defaults to True); screenshot_dir – directory in which to save screenshots (defaults to "screenshots"); timeout – timeout for LLM requests, in seconds (defaults to 60); cdp_endpoint – CDP endpoint for the browser connection (no default).

  • protocol (Protocol) –

    The slipstream Protocol instance for communication.

Source code in azad/predefined_tools/browser_use_tool.py
def __init__(self, config: Dict[str, Any], protocol: Protocol):
    """
    Initialize the BrowserUseTool.

    Args:
        config: Configuration dictionary containing optional settings such as:
            - headless: Whether to run browser in headless mode (optional, defaults to True)
            - screenshot_dir: Directory to save screenshots (optional, defaults to "screenshots").
              Created on construction, including any missing parent directories.
            - timeout: Timeout for LLM requests in seconds (optional, defaults to 60)
            - cdp_endpoint: CDP endpoint for browser connection (optional)
        protocol: The slipstream Protocol instance for communication.
    """
    # NOTE(review): self.config is read below, so the base class is presumed
    # to store `config` on the instance -- confirm against ServerToolBase.
    super().__init__(config, protocol)

    # Extract general config with defaults
    headless = self.config.get("headless", True)
    screenshot_dir = self.config.get("screenshot_dir", "screenshots")
    self.timeout = self.config.get("timeout", 60)
    self.cdp_endpoint = self.config.get("cdp_endpoint")

    # Set up directory for screenshots. parents=True lets a nested path such
    # as "out/run1/screenshots" be created in one go instead of raising
    # FileNotFoundError when an intermediate directory is missing.
    self.screenshot_dir = Path(screenshot_dir)
    self.screenshot_dir.mkdir(parents=True, exist_ok=True)

    # Configure browser
    self.browser_config = BrowserConfig(
        headless=headless,
        disable_security=True,  # Consider security implications
        cdp_url=self.cdp_endpoint,
    )

    # Track steps within a run
    self.current_step = 0
    self.screenshot_paths = []
Attributes
timeout instance-attribute
timeout = get('timeout', 60)
cdp_endpoint instance-attribute
cdp_endpoint = get('cdp_endpoint', None)
screenshot_dir instance-attribute
screenshot_dir = Path(screenshot_dir)
browser_config instance-attribute
browser_config = browser_config
protocol instance-attribute
protocol = protocol
Functions
run async
run(args: Dict[str, Any], step_callback: Callable[[Dict[str, Any]], Awaitable[None]], server_tool_run_id: str) -> Dict[str, Any]

Run the browser agent with the given task.

Parameters:

  • args (Dict[str, Any]) –

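    Dictionary containing arguments. Required: 'task' (str), the task description; 'model_id'; 'user_api_key'; and 'provider_id', which must be one of available_providers. Optional: 'max_steps' (defaults to 25) and 'api_base' (used as the base URL for the 'litellm_proxy' provider).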

  • step_callback (Callable[[Dict[str, Any]], Awaitable[None]]) –

    Async function provided by AzadAgent to emit ServerToolUpdateEvent.

  • server_tool_run_id (str) –

    Unique ID for this tool run.

Returns:

  • Dict[str, Any]

    Dictionary containing the final result.

Source code in azad/predefined_tools/browser_use_tool.py
async def run(
    self,
    args: Dict[str, Any],
    step_callback: Callable[[Dict[str, Any]], Awaitable[None]],
    server_tool_run_id: str
) -> Dict[str, Any]:
    """
    Run the browser agent with the given task.

    Args:
        args: Dictionary containing arguments:
              - 'task' (str, required): The task description.
              - 'model_id' (required): Model identifier handed to the chat model.
              - 'user_api_key' (required): API key for the selected provider.
              - 'provider_id' (required): Must be one of ``available_providers``.
              - 'max_steps' (optional, defaults to 25): Step limit passed to the agent.
              - 'api_base' (optional): Base URL, used only for 'litellm_proxy'.
        step_callback: Async function provided by AzadAgent to emit ServerToolUpdateEvent.
        server_tool_run_id: Unique ID for this tool run.

    Returns:
        Dictionary containing the final result. On success it holds
        'success', 'done', 'steps_taken', 'errors', 'screenshot_paths' and
        'final_output'. On failure it holds 'success' (False) and 'error';
        failures raised while the agent was running additionally carry
        'steps_taken' and 'screenshot_paths'. Every outcome is also
        reported through ``step_callback``.
    """
    async def fail(error_msg: str) -> Dict[str, Any]:
        # Emit an error event and build the matching return value.
        await step_callback({
            "server_tool_run_id": server_tool_run_id,
            "tool_name": self.tool_name,
            "status": "error",
            "data": {"error": error_msg}
        })
        return {"success": False, "error": error_msg}

    task_description = args.get("task")
    if not task_description:
        return await fail("Missing required argument: 'task'")

    max_steps = args.get("max_steps", 25)
    user_api_key = args.get("user_api_key")
    provider_id = args.get("provider_id")
    model_id = args.get("model_id")
    api_base = args.get("api_base")

    if not model_id:
        return await fail("Missing required argument: 'model_id'")
    if not user_api_key:
        return await fail("Missing required argument: 'user_api_key'")

    # The value being validated is the provider, so the message names it as
    # such (it previously claimed the model_id was unsupported).
    if provider_id not in available_providers:
        return await fail(
            f"Unsupported provider_id: {provider_id}. "
            f"Supported providers are: {available_providers}"
        )

    api_key = utils.convert_to_secret_str(user_api_key)

    # NOTE: self.timeout is only forwarded to the Anthropic client; the other
    # providers are constructed without an explicit timeout.
    if provider_id == "openrouter":
        llm = ChatOpenAI(
            model=model_id,
            api_key=api_key,
            base_url="https://openrouter.ai/api/v1",
        )
    elif provider_id == "anthropic":
        llm = ChatAnthropic(
            model_name=model_id,
            api_key=api_key,
            stop=None,
            timeout=self.timeout
        )
    elif provider_id == "gemini":
        llm = ChatGoogleGenerativeAI(
            model=model_id,
            api_key=api_key,
        )
    elif provider_id == "openai":
        llm = ChatOpenAI(
            model=model_id,
            api_key=api_key,
        )
    elif provider_id == "litellm_proxy":
        # api_base may be None here, in which case ChatOpenAI falls back to
        # its own default base URL resolution.
        llm = ChatOpenAI(
            model=model_id,
            api_key=api_key,
            base_url=api_base,
        )
    else:
        # Defensive: only reachable if available_providers lists a provider
        # that has no branch above.
        return await fail(f"Unsupported provider_id: {provider_id}")

    model_name = model_id
    browser = None

    try:
        # Emit starting event
        await step_callback({
            "server_tool_run_id": server_tool_run_id,
            "tool_name": self.tool_name,
            "status": "starting",
            "data": {"task": task_description, "max_steps": max_steps, "model_name": model_name}
        })

        # Reset step counter and screenshot paths for this run.
        # NOTE: this method never appends to screenshot_paths, so the list it
        # reports is empty; screenshots are forwarded inline as base64.
        self.current_step = 0
        self.screenshot_paths = []

        # Create browser instance for this run
        browser = Browser(config=self.browser_config)

        # Step handler registered with the agent: translates each browser-use
        # step into a 'progress' event on our callback.
        async def browser_step_handler(state: BrowserState, output: AgentOutput, step_number: int):
            self.current_step = step_number

            step_info = {}

            if state:
                step_info = {
                    "url": state.url,
                    "title": state.title,
                    "tabs": state.tabs,
                }
                # Forward the screenshot payload as-is when one is present.
                if state.screenshot:
                    step_info["screenshot_base64"] = state.screenshot

            if output:
                step_info["next_goal"] = output.current_state.next_goal if output.current_state else ""
                step_info["actions"] = [action.model_dump() for action in output.action] if output.action else []

            await step_callback({
                "server_tool_run_id": server_tool_run_id,
                "tool_name": self.tool_name,
                "step_number": step_number,
                "status": "progress",
                "data": step_info
            })

        # Create and run the agent
        agent = Agent(
            task=task_description,
            llm=llm,
            browser=browser,
            register_new_step_callback=browser_step_handler
        )
        result = await agent.run(max_steps=max_steps)

        # Prepare final result
        final_result_data = {
            "success": result.is_successful(),
            "done": result.is_done(),
            "steps_taken": self.current_step,
            "errors": result.errors(),
            "screenshot_paths": self.screenshot_paths,
            "final_output": result.final_result()
        }

        # Emit final completion event
        await step_callback({
            "server_tool_run_id": server_tool_run_id,
            "tool_name": self.tool_name,
            "status": "completed",
            "data": final_result_data
        })

        return final_result_data

    except Exception as e:
        # Top-level boundary for the run: log with traceback, report the
        # failure to the client, and return it instead of raising.
        logger.exception(f"Error during browser automation run {server_tool_run_id}: {str(e)}")
        error_data = {
            "success": False,
            "error": str(e),
            "steps_taken": self.current_step,
            "screenshot_paths": self.screenshot_paths
        }

        # Emit final error event
        await step_callback({
            "server_tool_run_id": server_tool_run_id,
            "tool_name": self.tool_name,
            "status": "error",
            "data": error_data
        })

        return error_data

    finally:
        # Ensure browser is closed. A failure during cleanup is logged rather
        # than raised so it cannot replace the result being returned above.
        if browser is not None:
            try:
                await browser.close()
                logger.info(f"Browser closed for run {server_tool_run_id}")
            except Exception:
                logger.exception(f"Failed to close browser for run {server_tool_run_id}")