Skip to content

azad.slipstream.server Module

azad.slipstream.server

WebSocket server implementation.

Attributes

logger module-attribute

logger = getLogger(__name__)

Classes

WebSocketServer

WebSocketServer(handler_class: Callable[[Any], ProtocolHandler], host: str = 'localhost', port: int = 8765)

Initialize the WebSocket server.

Parameters:

  • handler_class (Callable[[Any], ProtocolHandler]) –

    Factory function that takes callbacks and returns a ProtocolHandler

  • host (str, default: 'localhost' ) –

    The host to bind to

  • port (int, default: 8765 ) –

    The port to listen on

Source code in azad/slipstream/server.py
def __init__(self, handler_class: Callable[[Any], ProtocolHandler], host: str = "localhost", port: int = 8765):
    """Record the server configuration without starting anything.

    Only attributes are assigned here; the server object itself is
    created later by ``start()``.

    Args:
        handler_class: Factory callable that receives callbacks and returns
            the ProtocolHandler used for a connection
        host: The host to bind to
        port: The port to listen on
    """
    # No server exists until start() has been awaited.
    self._server: Optional[WSServer] = None
    self.host, self.port = host, port
    self.handler_class = handler_class
Attributes
handler_class instance-attribute
handler_class = handler_class
Functions
handle_connection async
handle_connection(websocket)

Handle an incoming WebSocket connection.

Parameters:

  • websocket

    The WebSocket connection to handle

Source code in azad/slipstream/server.py
async def handle_connection(self, websocket):
    """Serve a single WebSocket connection until its listener ends.

    Builds a handler through ``self.handler_class``, attaches it to a
    ``Protocol`` wrapping the socket, and awaits the protocol's listen
    call. The socket is closed afterwards regardless of how the listen
    call finished.

    Args:
        websocket: The WebSocket connection to handle
    """
    logger.debug(
        "New connection established",
        extra={"websocket_state": websocket.state},
    )

    # The factory is invoked with None standing in for the initial callbacks.
    connection_handler = self.handler_class(None)

    # Wrap the socket in a Protocol and hand it the freshly built handler.
    wire = Protocol(websocket)
    wire.set_handler(connection_handler)

    try:
        logger.debug("Starting protocol listener")
        await wire.listen()
    except (ConnectionClosedError, ConnectionLostError):
        # Deliberately ignored: per the original author's note these are
        # already logged appropriately elsewhere.
        pass
    except Exception as exc:
        failure_details = {
            "error": str(exc),
            "error_type": type(exc).__name__,
            "websocket_state": websocket.state,
        }
        logger.error("Unexpected connection error", extra=failure_details, exc_info=True)
    finally:
        # Runs on success, on the ignored errors, and on unexpected errors.
        await websocket.close()
start async
start()

Start the WebSocket server.

Source code in azad/slipstream/server.py
async def start(self):
    """Start the WebSocket server.

    Awaits ``serve`` with this instance's host and port plus a fixed set
    of options, stores the result in ``self._server``, and emits a debug
    log entry describing the address.
    """
    # Same value is used for both the read and the write limit.
    stream_limit = 5 * 2**16

    serve_options = dict(
        ws_handler=self.handle_connection,
        host=self.host,
        port=self.port,
        ping_interval=20,
        ping_timeout=20,
        compression=None,
        # 200 MiB (200 * 2**20) max message size
        max_size=200 * 2**20,
        read_limit=stream_limit,
        write_limit=stream_limit,
        create_protocol=WebSocketServerProtocol,
    )

    # cast() only informs the type checker; the object is stored unchanged.
    self._server = cast(WSServer, await serve(**serve_options))

    address_info = {
        "host": self.host,
        "port": self.port,
        "uri": f"ws://{self.host}:{self.port}",
    }
    logger.debug("Server started", extra=address_info)
stop async
stop()

Stop the WebSocket server.

Source code in azad/slipstream/server.py
async def stop(self):
    """Stop the WebSocket server.

    Does nothing when no server is stored. Otherwise closes the stored
    server and waits for it to finish closing; any exception raised while
    doing so is logged as a warning rather than propagated. The stored
    reference is cleared in every case.
    """
    active = self._server
    if not active:
        return

    logger.debug("Stopping server")
    try:
        active.close()
        await active.wait_closed()
        logger.debug("Server stopped gracefully")
    except Exception as exc:
        shutdown_failure = {
            "error": str(exc),
            "error_type": type(exc).__name__,
        }
        logger.warning("Error during server shutdown", extra=shutdown_failure, exc_info=True)
    finally:
        # Forget the server even if shutdown failed.
        self._server = None
create async classmethod
create(handler_class: Callable[[Any], ProtocolHandler], host: str = 'localhost', port: int = 8765)

Create and start a new WebSocket server.

Parameters:

  • handler_class (Callable[[Any], ProtocolHandler]) –

    Factory function that takes callbacks and returns a ProtocolHandler; it is called for each new connection

  • host (str, default: 'localhost' ) –

    The host to bind to

  • port (int, default: 8765 ) –

    The port to listen on

Returns:

  • A running WebSocketServer instance

Source code in azad/slipstream/server.py
@classmethod
async def create(cls, handler_class: Callable[[Any], ProtocolHandler], host: str = "localhost", port: int = 8765):
    """Build a server instance and start it in one step.

    Args:
        handler_class: Factory callable that receives callbacks and returns
            the ProtocolHandler used for a connection
        host: The host to bind to
        port: The port to listen on

    Returns:
        The newly constructed instance, after its ``start()`` has been awaited
    """
    instance = cls(handler_class, host, port)
    await instance.start()
    return instance

Functions

run_server async

run_server(handler_class: Callable[[Any], ProtocolHandler], host: str = 'localhost', port: int = 8765)

Run a WebSocket server until interrupted.

Parameters:

  • handler_class (Callable[[Any], ProtocolHandler]) –

    Factory function that takes callbacks and returns a ProtocolHandler; it is called for each new connection

  • host (str, default: 'localhost' ) –

    The host to bind to

  • port (int, default: 8765 ) –

    The port to listen on

Source code in azad/slipstream/server.py
async def run_server(handler_class: Callable[[Any], ProtocolHandler], host: str = "localhost", port: int = 8765):
    """Run a WebSocket server until interrupted.

    Creates and starts a server through ``WebSocketServer.create``, then
    waits on a future that is never resolved here. When that wait is
    interrupted, the server's ``stop()`` is awaited before leaving.

    Args:
        handler_class: Factory callable that receives callbacks and returns
            the ProtocolHandler used for a connection
        host: The host to bind to
        port: The port to listen on
    """
    running_server = await WebSocketServer.create(handler_class, host, port)

    try:
        # Nothing in this function ever completes this future, so awaiting
        # it keeps the coroutine alive until it is interrupted from outside.
        never_resolved = asyncio.Future()
        await never_resolved
    finally:
        await running_server.stop()