Skip to content

azad.slipstream.websocket Module

azad.slipstream.websocket

WebSocket-based network protocol implementation.

Attributes

logger module-attribute

logger = getLogger(__name__)

websockets_logger module-attribute

websockets_logger = getLogger('websockets')

Classes

ProtocolCallbacks

Bases: Protocol

Protocol interface defining required callback methods.

Functions
handle_request async
handle_request(request_id: str, data: dict) -> dict

Handle an incoming request.

Source code in azad/slipstream/websocket.py
async def handle_request(self, request_id: str, data: dict) -> dict:
    """Handle an incoming request."""
    ...
handle_stream_request async
handle_stream_request(request_id: str, data: dict) -> None

Handle an incoming stream request.

Source code in azad/slipstream/websocket.py
async def handle_stream_request(self, request_id: str, data: dict) -> None:
    """Handle an incoming stream request."""
    ...
handle_fire_and_forget async
handle_fire_and_forget(data: dict) -> None

Handle a fire-and-forget message.

Source code in azad/slipstream/websocket.py
async def handle_fire_and_forget(self, data: dict) -> None:
    """Handle a fire-and-forget message."""
    ...

NetworkProtocol

Bases: Protocol

Network protocol interface.

Functions
start_server async
start_server(port: int, callbacks: ProtocolCallbacks) -> None

Start the server.

Source code in azad/slipstream/websocket.py
async def start_server(self, port: int, callbacks: ProtocolCallbacks) -> None:
    """Start the server."""
    ...
stop_server async
stop_server(grace_period: float = 5.0) -> None

Stop the server.

Source code in azad/slipstream/websocket.py
async def stop_server(self, grace_period: float = 5.0) -> None:
    """Stop the server."""
    ...

WebSocketNetworkProtocol

WebSocketNetworkProtocol(handler_class: Optional[Callable[[Any], ProtocolHandler]] = None)

Bases: NetworkProtocol

WebSocket-based implementation of the NetworkProtocol interface.

Initialize the protocol.

Parameters:

  • handler_class (Optional[Callable[[Any], ProtocolHandler]], default: None ) –

    Optional factory function to create protocol handlers. If not provided, a default handler will be used.

Source code in azad/slipstream/websocket.py
def __init__(self, handler_class: Optional[Callable[[Any], ProtocolHandler]] = None):
    """Initialize the protocol.

    Args:
        handler_class: Optional factory function to create protocol handlers.
                     If not provided, a default handler will be used.
    """
    self._server: Optional[websockets.WebSocketServer] = None
    self._handler: Optional[ProtocolHandler] = None
    self._handler_class = handler_class
    self._state = ConnectionState.DISCONNECTED
Functions
start_server async
start_server(port: int, host: Optional[str]) -> None

Start the WebSocket server.

Parameters:

  • port (int) –

    Port to listen on

  • host (Optional[str]) –

    Host to bind to

Source code in azad/slipstream/websocket.py
async def start_server(self, port: int, host: Optional[str]) -> None:
    """Start the WebSocket server.

    Args:
        port: Port to listen on
        host: Host to bind to
    """
    if self._server:
        raise RuntimeError("Server already running")

    self._state = ConnectionState.CONNECTING

    # Maximum number of port attempts
    max_attempts = 10
    current_port = port

    for attempt in range(max_attempts):
        try:
            self._server = await server.WebSocketServer.create(
                handler_class=self._handler_class,
                port=current_port,
                host=host if host else "localhost"
            )
            self._state = ConnectionState.CONNECTED

            # If we're using a different port than the one requested, log it prominently
            if current_port != port:
                logger.warning(f"Original port {port} was in use, server started on port {current_port} instead", extra={
                    "original_port": port,
                    "actual_port": current_port,
                    "state": self._state.value
                })
            else:
                logger.info("WebSocket server started successfully", extra={
                    "port": current_port,
                    "state": self._state.value
                })
            break  # Successfully started the server, exit the loop
        except OSError as e:
            if e.errno == 48 and "address already in use" in str(e).lower():
                # Port is in use, try the next port
                current_port += 1
                logger.info(f"Port {current_port-1} already in use, trying port {current_port}", extra={
                    "original_port": port,
                    "attempt": attempt + 1,
                    "max_attempts": max_attempts
                })

                # If this was our last attempt, raise a more helpful error
                if attempt == max_attempts - 1:
                    self._state = ConnectionState.DISCONNECTED
                    msg = f"All ports from {port} to {current_port-1} are in use. Please specify a different port."
                    logger.error(msg, extra={
                        "error": str(e),
                        "original_port": port,
                        "state": self._state.value
                    })
                    raise ConnectionLostError(reason=msg)
            else:
                # A different error occurred, report it
                self._state = ConnectionState.DISCONNECTED
                logger.error("Failed to start WebSocket server", extra={
                    "error": str(e),
                    "port": current_port,
                    "state": self._state.value
                }, exc_info=True)
                raise ConnectionLostError(reason=f"Server startup failed: {str(e)}")
        except Exception as e:
            self._state = ConnectionState.DISCONNECTED
            logger.error("Failed to start WebSocket server", extra={
                "error": str(e),
                "port": current_port,
                "state": self._state.value
            }, exc_info=True)
            raise ConnectionLostError(reason=f"Server startup failed: {str(e)}")

    # Keep server running until explicitly stopped
    self._running = asyncio.Event()

    # Add signal handlers for graceful shutdown
    loop = asyncio.get_running_loop()
    try:
        for sig in (signal.SIGINT, signal.SIGTERM):
            loop.add_signal_handler(sig, lambda: asyncio.create_task(self.stop_server()))
    except NotImplementedError:
        # Windows compatibility fallback (signal handlers not supported)
        logger.warning("Signal handlers not supported on this platform")

    # Wait until stopped
    try:
        await self._running.wait()
    except asyncio.CancelledError:
        logger.info("Server interrupted, shutting down")
        await self.stop_server()
stop_server async
stop_server(grace_period: float = 5.0) -> None

Stop the WebSocket server.

Parameters:

  • grace_period (float, default: 5.0 ) –

    Shutdown grace period in seconds

Source code in azad/slipstream/websocket.py
async def stop_server(self, grace_period: float = 5.0) -> None:
    """Stop the WebSocket server.

    Args:
        grace_period: Shutdown grace period in seconds
    """
    if not self._server:
        return

    logger.info("Initiating server shutdown", extra={
        "grace_period": grace_period,
        "state": self._state.value
    })

    self._state = ConnectionState.DISCONNECTING

    try:
        # Signal server to stop
        if hasattr(self, '_running'):
            self._running.set()

        # Set shutdown flag on handler
        if self._handler:
            self._handler._shutdown = True

        # Give ongoing operations time to complete
        if grace_period > 0:
            await asyncio.sleep(grace_period)

        # Stop server
        try:
            await self._server.stop()
            logger.info("Server stopped gracefully")
        except Exception as e:
            logger.warning("Error during server stop", extra={
                "error": str(e)
            }, exc_info=True)

    except Exception as e:
        logger.error("Error during server shutdown", extra={
            "error": str(e),
            "state": self._state.value
        }, exc_info=True)
        raise
    finally:
        self._server = None
        self._handler = None
        self._state = ConnectionState.DISCONNECTED
        logger.info("Server shutdown complete", extra={
            "state": self._state.value
        })

Modules