Skip to content

azad.slipstream.client Module

azad.slipstream.client

WebSocket client implementation.

Attributes

Classes

WebSocketClient

WebSocketClient(handler_factory: Callable[[Any], ProtocolHandler])

Initialize the WebSocket client.

Parameters:

  • handler_factory (Callable[[Any], ProtocolHandler]) –

    Function that creates a new ProtocolHandler instance

Source code in azad/slipstream/client.py
def __init__(self, handler_factory: Callable[[Any], ProtocolHandler]):
    """Create a client that is not yet connected.

    Args:
        handler_factory: Callable that builds the ProtocolHandler for a
            connection. It is only stored here.
    """
    # Connection state: all three stay None until a connection is set up.
    self.websocket: Optional[websockets.WebSocketClientProtocol] = None
    self.protocol: Optional[Protocol] = None
    self._listen_task: Optional[asyncio.Task] = None

    # Kept for later use; constructing the client performs no I/O.
    self.handler_factory = handler_factory
Attributes
handler_factory instance-attribute
handler_factory = handler_factory
websocket instance-attribute
websocket: Optional[WebSocketClientProtocol] = None
protocol instance-attribute
protocol: Optional[Protocol] = None
Functions
connect async
connect(uri: str = 'ws://localhost:8765')

Connect to a WebSocket server.

Parameters:

  • uri (str, default: 'ws://localhost:8765' ) –

    The WebSocket URI to connect to

Source code in azad/slipstream/client.py
async def connect(self, uri: str = "ws://localhost:8765"):
    """Connect to a WebSocket server and start the background listener.

    Opens the socket, wraps it in a ``Protocol``, attaches a handler built by
    ``handler_factory`` and schedules ``_listen`` as a task. On any failure,
    including cancellation, the partial connection is torn down through
    ``_cleanup_connection`` before the error propagates.

    Args:
        uri: The WebSocket URI to connect to

    Raises:
        RuntimeError: If this client already holds a websocket, or if the
            protocol is missing after setup.
        ConnectionLostError: If connecting fails with a websockets exception
            or a ``ConnectionError``; the original error is attached as the
            cause.
    """
    if self.websocket:
        raise RuntimeError("Already connected")

    try:
        # Connect with explicit close timeout
        self.websocket = await websockets.connect(uri, close_timeout=5.0)

        # Create protocol with websocket
        self.protocol = Protocol(self.websocket)

        # Create handler and link them together
        handler = self.handler_factory(None)
        self.protocol.set_handler(handler)

        # Start listening for messages
        self._listen_task = asyncio.create_task(self._listen())

        # Wait for connection to stabilize and listener to be ready
        await asyncio.sleep(0.2)

        # Verify connection is ready
        if not self.websocket or self.websocket.state.name == "CLOSED":
            raise ConnectionError("Connection failed to stabilize")

        if not self.protocol:
            raise RuntimeError("Protocol initialization failed")

        logger.debug("Client connected successfully", extra={
            "uri": uri,
            "websocket_state": self.websocket.state if self.websocket else None,
            "has_protocol": self.protocol is not None,
            "listen_task_state": self._listen_task.done() if self._listen_task else None
        })

    except asyncio.CancelledError:
        # CancelledError is a BaseException on Python 3.8+, so neither handler
        # below sees it. Without this clause a cancellation during the awaits
        # above would leave the socket and the listener task alive.
        await self._cleanup_connection()
        raise
    except (websockets.exceptions.WebSocketException, ConnectionError) as e:
        logger.error("Failed to connect", extra={
            "uri": uri,
            "error": str(e)
        }, exc_info=True)
        # Clean up any partial connection
        await self._cleanup_connection()
        # Chain explicitly so the traceback names the transport error as cause.
        raise ConnectionLostError(reason=f"Connection failed: {str(e)}") from e
    except Exception as e:
        logger.error("Unexpected error during connect", extra={
            "uri": uri,
            "error": str(e),
            "error_type": type(e).__name__
        }, exc_info=True)
        # Clean up any partial connection
        await self._cleanup_connection()
        raise
disconnect async
disconnect()

Disconnect from the WebSocket server.

Source code in azad/slipstream/client.py
async def disconnect(self):
    """Tear down the current connection, if there is one.

    Returns immediately when no websocket, protocol or listener task is set.
    Otherwise logs the current state and delegates the teardown to
    ``_cleanup_connection``.
    """
    if not (self.websocket or self.protocol or self._listen_task):
        # Nothing is held, so there is nothing to close.
        return

    socket_state = self.websocket.state if self.websocket else None
    listener_done = self._listen_task.done() if self._listen_task else None
    logger.debug("Starting client disconnect", extra={
        "websocket_state": socket_state,
        "has_protocol": self.protocol is not None,
        "listen_task_state": listener_done
    })

    await self._cleanup_connection()
request_response async
request_response(data)

Send a request and wait for a response.

Parameters:

  • data

    The request data to send

Returns:

  • The response data

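Raises:

  • RuntimeError –

    If not connected

  • ConnectionClosedError –

    If connection closed normally

  • ConnectionLostError –

    If connection lost unexpectedly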

Source code in azad/slipstream/client.py
async def request_response(self, data):
    """
    Send a request and wait for a response.

    Args:
        data: The request data to send

    Returns:
        The response data

    Raises:
        RuntimeError: Propagated unchanged from the connection check or from
            the protocol (presumably when not connected).
        ConnectionClosedError: If the protocol raises it, or if the websocket
            reports ``ConnectionClosed``; the close code is carried over.
        ConnectionLostError: If the protocol raises it, or for any other
            unexpected failure; the original error is attached as the cause.
    """
    self._check_connection()

    try:
        return await self.protocol.request_response(data)
    except (ConnectionClosedError, ConnectionLostError, RuntimeError):
        # These are already logged appropriately
        raise
    except websockets.exceptions.ConnectionClosed as e:
        logger.debug("Request ended: Connection closed", extra={
            "error": str(e),
            "error_type": type(e).__name__,
            "websocket_state": self.websocket.state if self.websocket else None,
            "code": e.code
        })
        # Translate to the project's exception, keeping the original as cause.
        raise ConnectionClosedError(e.code, str(e)) from e
    except Exception as e:
        logger.error("Error in request_response", extra={
            "error": str(e),
            "error_type": type(e).__name__,
            "websocket_state": self.websocket.state if self.websocket else None
        }, exc_info=True)
        raise ConnectionLostError(reason=f"Request failed: {str(e)}") from e
request_stream async
request_stream(data)

Send a request and receive a stream of responses.

Parameters:

  • data

    The request data to send

Returns:

  • An async iterator of response items

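Raises:

  • RuntimeError –

    If not connected

  • ConnectionClosedError –

    If connection closed normally

  • ConnectionLostError –

    If connection lost unexpectedly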

Source code in azad/slipstream/client.py
async def request_stream(self, data):
    """
    Send a request and receive a stream of responses.

    This is an async generator: nothing runs, including the "Not connected"
    check, until the caller starts iterating.

    Args:
        data: The request data to send

    Yields:
        Each item produced by the protocol's response stream

    Raises:
        RuntimeError: If not connected
        ConnectionClosedError: Propagated unchanged from the protocol
        ConnectionLostError: Propagated unchanged from the protocol, or
            raised for any other failure (including a websocket close) with
            the original error attached as the cause.
    """
    if not self.protocol:
        raise RuntimeError("Not connected")

    try:
        stream = await self.protocol.request_stream(data)
        async for item in stream:
            yield item
    except (ConnectionClosedError, ConnectionLostError):
        # These are already logged appropriately
        raise
    except Exception as e:
        # For connection errors we handle, use debug level
        if isinstance(e, (websockets.exceptions.ConnectionClosedError, websockets.exceptions.ConnectionClosedOK)):
            logger.debug("Stream ended: Connection closed", extra={
                "error": str(e),
                "error_type": type(e).__name__,
                "websocket_state": self.websocket.state if self.websocket else None
            })
        else:
            logger.error("Error in request_stream", extra={
                "error": str(e),
                "error_type": type(e).__name__,
                "websocket_state": self.websocket.state if self.websocket else None
            }, exc_info=True)
        # Both branches surface as ConnectionLostError, chained to the cause.
        raise ConnectionLostError(reason=f"Stream failed: {str(e)}") from e
fire_and_forget async
fire_and_forget(data)

Send a one-way message with no response.

Parameters:

  • data

    The message data to send

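Raises:

  • RuntimeError –

    If not connected

  • ConnectionClosedError –

    If connection closed normally

  • ConnectionLostError –

    If connection lost unexpectedly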

Source code in azad/slipstream/client.py
async def fire_and_forget(self, data):
    """
    Send a one-way message with no response.

    Args:
        data: The message data to send

    Raises:
        RuntimeError: If not connected
        ConnectionClosedError: Propagated unchanged from the protocol
        ConnectionLostError: Propagated unchanged from the protocol, or
            raised for any other failure (including a websocket close) with
            the original error attached as the cause.
    """
    if not self.protocol:
        raise RuntimeError("Not connected")

    try:
        await self.protocol.fire_and_forget(data)
    except (ConnectionClosedError, ConnectionLostError):
        # These are already logged appropriately
        raise
    except Exception as e:
        # For connection errors we handle, use debug level
        if isinstance(e, (websockets.exceptions.ConnectionClosedError, websockets.exceptions.ConnectionClosedOK)):
            logger.debug("Send ended: Connection closed", extra={
                "error": str(e),
                "error_type": type(e).__name__,
                "websocket_state": self.websocket.state if self.websocket else None
            })
        else:
            logger.error("Error in fire_and_forget", extra={
                "error": str(e),
                "error_type": type(e).__name__,
                "websocket_state": self.websocket.state if self.websocket else None
            }, exc_info=True)
        # Both branches surface as ConnectionLostError, chained to the cause.
        raise ConnectionLostError(reason=f"Send failed: {str(e)}") from e
create async classmethod
create(handler_factory: Callable[[Any], ProtocolHandler], uri: str = 'ws://localhost:8765')

Create and connect a new WebSocket client.

Parameters:

  • handler_factory (Callable[[Any], ProtocolHandler]) –

    Function that creates a new ProtocolHandler instance

  • uri (str, default: 'ws://localhost:8765' ) –

    The WebSocket URI to connect to

Returns:

  • A connected WebSocketClient instance

Source code in azad/slipstream/client.py
@classmethod
async def create(cls, handler_factory: Callable[[Any], ProtocolHandler], uri: str = "ws://localhost:8765"):
    """
    Build a client and connect it in one step.

    Args:
        handler_factory: Callable passed to the constructor to build the
            ProtocolHandler
        uri: WebSocket URI passed to ``connect``

    Returns:
        The new instance, once ``connect`` has completed. Any exception
        raised by ``connect`` propagates to the caller.
    """
    instance = cls(handler_factory)
    await instance.connect(uri)
    return instance

Functions

run_client async

run_client(handler_factory: Callable[[Any], ProtocolHandler], uri: str = 'ws://localhost:8765')

Run a WebSocket client until interrupted.

Parameters:

  • handler_factory (Callable[[Any], ProtocolHandler]) –

    Function that creates a new ProtocolHandler instance

  • uri (str, default: 'ws://localhost:8765' ) –

    The WebSocket URI to connect to

Source code in azad/slipstream/client.py
async def run_client(handler_factory: Callable[[Any], ProtocolHandler], uri: str = "ws://localhost:8765"):
    """
    Connect a client and keep it alive until the task is interrupted.

    The coroutine waits on a future that nothing here completes, so it only
    ends through cancellation or an exception. Either way the exception is
    logged and re-raised, and the client is disconnected on the way out.
    Failures during that final disconnect are logged and not raised.

    Args:
        handler_factory: Function that creates a new ProtocolHandler instance
        uri: The WebSocket URI to connect to
    """
    def _task_name():
        # Name of the running task for log context, or None outside a task.
        running = asyncio.current_task()
        return running.get_name() if running else None

    # Exceptions that mean "the connection went away", which is expected
    # during shutdown and therefore only logged at debug level.
    closure_errors = (
        ConnectionClosedError,
        ConnectionLostError,
        websockets.exceptions.ConnectionClosedError,
        websockets.exceptions.ConnectionClosedOK,
    )

    client = await WebSocketClient.create(handler_factory, uri)

    try:
        # Block indefinitely; the future is never resolved.
        await asyncio.Future()  # run forever
    except asyncio.CancelledError:
        # Cancellation is the normal way to stop the client.
        logger.debug("Client task cancelled", extra={
            "uri": uri,
            "task": _task_name()
        })
        raise
    except closure_errors as e:
        logger.debug("Client stopped due to connection closure", extra={
            "error": str(e),
            "error_type": type(e).__name__,
            "uri": uri,
            "code": getattr(e, "code", None),
            "task": _task_name()
        })
        raise
    except Exception as e:
        # Anything else is unexpected and logged as an error.
        logger.error("Unexpected error in client run loop", extra={
            "error": str(e),
            "error_type": type(e).__name__,
            "uri": uri,
            "task": _task_name()
        }, exc_info=True)
        raise
    finally:
        try:
            await client.disconnect()
        except closure_errors as e:
            # The connection may already be gone by the time we clean up.
            logger.debug("Connection closed during client shutdown", extra={
                "error": str(e),
                "error_type": type(e).__name__,
                "uri": uri,
                "code": getattr(e, "code", None),
                "task": _task_name()
            })
        except Exception as e:
            # Cleanup is best-effort: warn, but do not mask the original exit.
            logger.warning("Unexpected error during client shutdown", extra={
                "error": str(e),
                "error_type": type(e).__name__,
                "uri": uri,
                "task": _task_name()
            }, exc_info=True)