Skip to content

azad.slipstream Package

azad.slipstream

Slipstream - A lightweight WebSocket-based network protocol implementation.

Classes

Protocol

Protocol(websocket)

Complete, decorator-based protocol implementation.

Source code in azad/slipstream/base.py
def __init__(self, websocket):
    """Create the protocol state for a single websocket connection.

    Args:
        websocket: The websocket object this protocol instance wraps.
    """
    import uuid

    # Transport and the user handler; the handler starts out unset and is
    # expected to be an object exposing decorated methods.
    self._websocket = websocket
    self.handler = None

    # Counter combined with the peer id to form outgoing request ids.
    self._request_counter = 0

    # Registries for in-flight requests, tasks and bi-directional streams.
    self._pending_requests: Dict[str, asyncio.Future] = {}
    self._request_tasks: Dict[str, asyncio.Task] = {}
    self._stream_tasks: Dict[str, asyncio.Task] = {}
    self._bistreams: Dict[str, BiStreamHandle] = {}

    # Identity: a random id for this side; the remote id is unknown so far.
    local_peer_id = str(uuid.uuid4())
    self._peer_id = local_peer_id
    self._remote_peer_id = None

    # Connection lifecycle tracking.
    self._state = ConnectionState.CONNECTING
    self._close_code: Optional[int] = None
    self._close_reason: Optional[str] = None

    # Send lock and the request timeout value (seconds, presumably).
    self._send_lock = asyncio.Lock()
    self._request_timeout = 600.0

    debug_fields = {
        "websocket_id": id(websocket),
        "request_counter": self._request_counter,
    }
    logger.debug(">>> LIFECYCLE: Creating new protocol instance", extra=debug_fields)
Attributes
Functions
set_handler
set_handler(handler: ProtocolHandler)

Attach a user-defined handler whose methods are decorated with @request_response and the related decorators.

Source code in azad/slipstream/base.py
def set_handler(self, handler: 'ProtocolHandler'):
    """Attach a user-defined handler and link it back to this protocol.

    Args:
        handler: Object carrying the decorated handler methods
            (@request_response, etc.). Its ``protocol`` attribute is set
            to this protocol instance.
    """
    # Forward link: protocol -> handler.
    self.handler = handler
    # Back link: handler -> protocol.
    handler.protocol = self
listen async
listen()

Reads messages from the websocket and dispatches them.

Source code in azad/slipstream/base.py
async def listen(self):
    """Read messages from the websocket and dispatch them until it ends.

    Each incoming message is validated, checked for a peer-id collision and
    passed to ``_handle_incoming_frame``. A ``ProtocolError`` raised while
    processing one message is logged and the loop continues with the next
    message.

    When the loop ends for any reason, ``_cleanup_pending_requests`` is
    called with a short reason string. A failure inside that cleanup is
    logged (not raised) so that it cannot mask the error that ended the loop.

    Raises:
        ConnectionClosedError: The websocket raised ``ConnectionClosed``;
            the original exception is attached as ``__cause__``.
        asyncio.CancelledError: The listener was cancelled (the original
            exception object is re-raised).
        Exception: Any other error raised while reading or dispatching.
    """
    if not self.websocket:
        return

    failure = None
    try:
        async for message in self.websocket:
            try:
                frame = validate_incoming_frame(message)
                self._check_peer_id_collision(frame)
                await self._handle_incoming_frame(frame)
            except ProtocolError as pe:
                # A bad frame must not kill the listener; log and move on.
                logger.error(f"Protocol error: {pe}", exc_info=True)
    except (asyncio.CancelledError, Exception) as e:
        # Remember why the loop ended; it is re-raised after cleanup below.
        failure = e
    finally:
        if failure is None:
            reason = "listen complete"
        elif isinstance(failure, websockets.exceptions.ConnectionClosed):
            reason = "connection closed"
        elif isinstance(failure, asyncio.CancelledError):
            reason = "listener cancelled"
        else:
            reason = "listener error"

        try:
            await self._cleanup_pending_requests(reason)
        except Exception:
            # Best effort: keep going so the original failure still
            # propagates, but leave a trace instead of swallowing silently.
            logger.warning("Failed to clean up pending requests", extra={
                "cleanup_reason": reason
            }, exc_info=True)

        if isinstance(failure, websockets.exceptions.ConnectionClosed):
            raise ConnectionClosedError(failure.code, str(failure)) from failure
        if failure is not None:
            raise failure
request_response async
request_response(data: dict) -> dict

Send a single request, wait for single response.

Source code in azad/slipstream/base.py
async def request_response(self, data: dict) -> dict:
    """Send a single request and wait for its single response.

    The request id is the pair ``(peer_id, counter)``; the counter is
    advanced for every call. A future is registered as pending before the
    frame is sent and is always unregistered again, whether the send fails,
    the wait is cancelled, or the response arrives.

    Args:
        data: Request payload. Its ``"operation"`` entry (if any) is stored
            on the pending future.

    Returns:
        The result set on the pending future.

    Raises:
        Exception: Whatever ``_send_frame`` raises, or whatever exception is
            set on the pending future.
    """
    request_id = (self._peer_id, self._request_counter)
    self._request_counter += 1

    fut = asyncio.Future()
    fut.operation = data.get("operation")
    self._add_pending_request(request_id, fut)

    try:
        # The send is inside the try so a failed send does not leave a
        # stale entry in the pending-request registry.
        await self._send_frame(MessageType.REQUEST, data, request_id)
        return await fut
    finally:
        self._remove_pending_request(request_id)
request_stream async
request_stream(data: dict) -> AsyncIterator[Any]

Send a request, get an async iterator of incoming items.

Source code in azad/slipstream/base.py
async def request_stream(self, data: dict) -> AsyncIterator[Any]:
    """Send a stream request and return an async iterator of incoming items.

    A future carrying an ``asyncio.Queue`` (``stream_queue``) is registered
    as pending before the frame is sent. If sending fails, the registration
    is removed again before the error propagates.

    Args:
        data: Request payload. Its ``"operation"`` entry (if any) is stored
            on the pending future.

    Returns:
        An async iterator that yields items taken from the stream queue and
        stops when a ``None`` sentinel is received.

    Raises:
        ConnectionError: If there is no websocket or its state is CLOSED.
        Exception: Whatever ``_send_frame`` raises.
    """
    if not self.websocket or self.websocket.state.name == "CLOSED":
        raise ConnectionError("WebSocket is closed")

    request_id = (self._peer_id, self._request_counter)
    self._request_counter += 1

    fut = asyncio.Future()
    fut.stream_queue = asyncio.Queue()
    fut.operation = data.get("operation")
    self._add_pending_request(request_id, fut)

    try:
        await self._send_frame(MessageType.STREAM_REQUEST, data, request_id)
    except BaseException:
        # Nothing will ever feed this stream; drop the registration so it
        # does not linger, then let the caller see the original error.
        self._remove_pending_request(request_id)
        raise

    async def gen():
        while True:
            item = await fut.stream_queue.get()
            if item is None:
                # None is the end-of-stream sentinel.
                break
            yield item

    return gen()
stream_stream async
stream_stream(data: dict) -> BiStreamHandle

Initiate a bi-directional stream, returning a handle.

Source code in azad/slipstream/base.py
async def stream_stream(self, data: dict) -> 'BiStreamHandle':
    """Initiate a bi-directional stream and return its handle.

    The handle is registered in ``_bistreams`` under the key derived from
    the request id before the opening frame is sent. If sending fails, the
    registration is removed again before the error propagates.

    Args:
        data: Payload sent with the STREAM_STREAM_REQUEST frame.

    Returns:
        The ``BiStreamHandle`` bound to the new request id.

    Raises:
        ConnectionError: If there is no websocket or its state is CLOSED.
        Exception: Whatever ``_send_frame`` raises.
    """
    if not self.websocket or self.websocket.state.name == "CLOSED":
        raise ConnectionError("WebSocket is closed")

    request_id = (self._peer_id, self._request_counter)
    self._request_counter += 1

    handle = BiStreamHandle(self, request_id)
    key = self._get_handler_key(request_id)
    self._bistreams[key] = handle

    try:
        await self._send_frame(MessageType.STREAM_STREAM_REQUEST, data, request_id)
    except BaseException:
        # The remote side never learned about this stream; do not keep a
        # handle for it in the registry.
        self._bistreams.pop(key, None)
        raise
    return handle
fire_and_forget async
fire_and_forget(data: dict) -> None

Send a one-way message with no response.

Source code in azad/slipstream/base.py
async def fire_and_forget(self, data: dict) -> None:
    """Send a one-way message; no response is awaited.

    Args:
        data: Payload sent with the FIRE_AND_FORGET frame.
    """
    # No request id is passed and nothing is registered as pending.
    await self._send_frame(
        MessageType.FIRE_AND_FORGET,
        data,
    )

ProtocolHandler

ProtocolHandler()

A base class to use if you want to store a reference to the protocol or add custom logic. Typically you just define your methods with the decorators directly.

Source code in azad/slipstream/base.py
def __init__(self):
    """Create a handler that is not yet attached to any protocol."""
    # Starts as None until a protocol is assigned to this attribute.
    self.protocol: Optional[Protocol] = None

MessageType

Bases: Enum

Message types supported by the protocol.

ConnectionState

Bases: Enum

Connection states for state tracking.

ConnectionClosedError

ConnectionClosedError(code: Optional[int] = None, reason: Optional[str] = None)

Bases: SlipstreamError

Raised when connection is closed normally (e.g. server shutdown).

Source code in azad/slipstream/base.py
def __init__(self, code: Optional[int] = None, reason: Optional[str] = None):
    """Record the close code and reason and build the error message.

    Args:
        code: Close code, if known.
        reason: Close reason; a placeholder text is used in the message
            when it is empty or None.
    """
    message = f"Connection closed: {reason or 'no reason given'} (code={code})"
    super().__init__(message)
    self.code = code
    self.reason = reason

ConnectionLostError

ConnectionLostError(code: Optional[int] = None, reason: Optional[str] = None)

Bases: SlipstreamError

Raised when connection is lost unexpectedly.

Source code in azad/slipstream/base.py
def __init__(self, code: Optional[int] = None, reason: Optional[str] = None):
    """Record the close code and reason and build the error message.

    Args:
        code: Close code, if known.
        reason: Why the connection was lost; a placeholder text is used in
            the message when it is empty or None.
    """
    message = f"Connection lost: {reason or 'unknown reason'} (code={code})"
    super().__init__(message)
    self.code = code
    self.reason = reason

BiStreamHandle

BiStreamHandle(protocol: Protocol, request_id: Tuple[str, int])

Manages a bi-directional stream with a particular request_id.

Source code in azad/slipstream/base.py
def __init__(self, protocol: 'Protocol', request_id: Tuple[str, int]):
    """Bind the handle to a protocol and a request id.

    Args:
        protocol: Protocol instance used to send this stream's frames.
        request_id: Identifier of the stream this handle manages.
    """
    self.request_id = request_id
    self._protocol = protocol

    # Queue for data arriving from the remote side.
    self._inbound_queue = asyncio.Queue()

    # Each direction is tracked separately; both start open.
    self._local_closed = False
    self._remote_closed = False
Attributes
request_id instance-attribute
request_id = request_id
Functions
send async
send(data: Any)

Send data to remote side as STREAM_DATA.

Source code in azad/slipstream/base.py
async def send(self, data: Any):
    """Send one item to the remote side as a STREAM_DATA frame.

    Args:
        data: Payload to send.

    Raises:
        ConnectionError: If the local side of the stream is already closed.
    """
    if self._local_closed:
        raise ConnectionError("Cannot send on a closed BiStreamHandle")

    stream_id = self.request_id
    logger.debug("BiStream sending data", extra={"request_id": stream_id, "data": data})
    await self._protocol._send_frame(MessageType.STREAM_DATA, data, stream_id)
aclose async
aclose()

Close our side of the stream (send STREAM_END).

Source code in azad/slipstream/base.py
async def aclose(self):
    """Close the local side of the stream by sending STREAM_END.

    Calling it again after the local side is closed does nothing.
    """
    if self._local_closed:
        return

    # Mark closed before sending so a repeated call does not send twice.
    self._local_closed = True
    await self._protocol._send_frame(MessageType.STREAM_END, None, self.request_id)
is_closed
is_closed() -> bool

True if both sides have ended.

Source code in azad/slipstream/base.py
def is_closed(self) -> bool:
    """Report whether both the local and the remote side have ended."""
    local_done = self._local_closed
    remote_done = self._remote_closed
    return local_done and remote_done

WebSocketClient

WebSocketClient(handler_factory: Callable[[Any], ProtocolHandler])

Initialize the WebSocket client.

Parameters:

  • handler_factory (Callable[[Any], ProtocolHandler]) –

    Function that creates a new ProtocolHandler instance

Source code in azad/slipstream/client.py
def __init__(self, handler_factory: Callable[[Any], ProtocolHandler]):
    """Initialize the WebSocket client in a disconnected state.

    Args:
        handler_factory: Function that creates a new ProtocolHandler instance
    """
    self.handler_factory = handler_factory

    # Connection objects; all unset until a connection is made.
    self.websocket: Optional[websockets.WebSocketClientProtocol] = None
    self.protocol: Optional[Protocol] = None
    self._listen_task: Optional[asyncio.Task] = None
Attributes
handler_factory instance-attribute
handler_factory = handler_factory
websocket instance-attribute
websocket: Optional[WebSocketClientProtocol] = None
protocol instance-attribute
protocol: Optional[Protocol] = None
Functions
connect async
connect(uri: str = 'ws://localhost:8765')

Connect to a WebSocket server.

Parameters:

  • uri (str, default: 'ws://localhost:8765' ) –

    The WebSocket URI to connect to

Source code in azad/slipstream/client.py
async def connect(self, uri: str = "ws://localhost:8765"):
    """Connect to a WebSocket server and start the background listener.

    Opens the websocket, wraps it in a ``Protocol``, attaches a handler built
    by ``handler_factory`` and starts the listen task. After a short settle
    delay the connection is checked once more before returning.

    Args:
        uri: The WebSocket URI to connect to

    Raises:
        RuntimeError: If this client already holds a websocket.
        ConnectionLostError: If connecting fails with a websocket or
            connection error, or the websocket is closed after the settle
            delay. The underlying exception is attached as ``__cause__``.
        Exception: Any other error is logged and re-raised after cleanup.
    """
    if self.websocket:
        raise RuntimeError("Already connected")

    try:
        # Connect with explicit close timeout
        self.websocket = await websockets.connect(uri, close_timeout=5.0)

        # Create protocol with websocket
        self.protocol = Protocol(self.websocket)

        # Create handler and link them together
        handler = self.handler_factory(None)
        self.protocol.set_handler(handler)

        # Start listening for messages
        self._listen_task = asyncio.create_task(self._listen())

        # Wait for connection to stabilize and listener to be ready
        await asyncio.sleep(0.2)

        # Verify connection is ready
        if not self.websocket or self.websocket.state.name == "CLOSED":
            raise ConnectionError("Connection failed to stabilize")

        if not self.protocol:
            raise RuntimeError("Protocol initialization failed")

        logger.debug("Client connected successfully", extra={
            "uri": uri,
            "websocket_state": self.websocket.state if self.websocket else None,
            "has_protocol": self.protocol is not None,
            "listen_task_state": self._listen_task.done() if self._listen_task else None
        })

    except (websockets.exceptions.WebSocketException, ConnectionError) as e:
        logger.error("Failed to connect", extra={
            "uri": uri,
            "error": str(e)
        }, exc_info=True)
        # Clean up any partial connection
        await self._cleanup_connection()
        # Chain explicitly so the root cause stays visible to callers.
        raise ConnectionLostError(reason=f"Connection failed: {str(e)}") from e
    except Exception as e:
        logger.error("Unexpected error during connect", extra={
            "uri": uri,
            "error": str(e),
            "error_type": type(e).__name__
        }, exc_info=True)
        # Clean up any partial connection
        await self._cleanup_connection()
        raise
disconnect async
disconnect()

Disconnect from the WebSocket server.

Source code in azad/slipstream/client.py
async def disconnect(self):
    """Disconnect from the WebSocket server.

    Returns immediately when there is no websocket, protocol or listen task;
    otherwise delegates to ``_cleanup_connection``.
    """
    if not (self.websocket or self.protocol or self._listen_task):
        return  # Already disconnected

    ws_state = self.websocket.state if self.websocket else None
    listener_done = self._listen_task.done() if self._listen_task else None
    logger.debug("Starting client disconnect", extra={
        "websocket_state": ws_state,
        "has_protocol": self.protocol is not None,
        "listen_task_state": listener_done
    })

    await self._cleanup_connection()
request_response async
request_response(data)

Send a request and wait for a response.

Parameters:

  • data

    The request data to send

Returns:

  • The response data

Raises:

Source code in azad/slipstream/client.py
async def request_response(self, data):
    """
    Send a request and wait for a response.

    Args:
        data: The request data to send

    Returns:
        The response data

    Raises:
        RuntimeError: Raised by the connection check or the protocol and
            passed through unchanged
        ConnectionClosedError: If the protocol raised it, or the websocket
            raised ``ConnectionClosed`` (attached as ``__cause__``)
        ConnectionLostError: If the protocol raised it, or any other error
            occurred (attached as ``__cause__``)
    """
    self._check_connection()

    try:
        return await self.protocol.request_response(data)
    except (ConnectionClosedError, ConnectionLostError, RuntimeError):
        # These are already logged appropriately
        raise
    except websockets.exceptions.ConnectionClosed as e:
        logger.debug("Request ended: Connection closed", extra={
            "error": str(e),
            "error_type": type(e).__name__,
            "websocket_state": self.websocket.state if self.websocket else None,
            "code": e.code
        })
        raise ConnectionClosedError(e.code, str(e)) from e
    except Exception as e:
        logger.error("Error in request_response", extra={
            "error": str(e),
            "error_type": type(e).__name__,
            "websocket_state": self.websocket.state if self.websocket else None
        }, exc_info=True)
        raise ConnectionLostError(reason=f"Request failed: {str(e)}") from e
request_stream async
request_stream(data)

Send a request and receive a stream of responses.

Parameters:

  • data

    The request data to send

Returns:

  • An async iterator of response items

Raises:

Source code in azad/slipstream/client.py
async def request_stream(self, data):
    """
    Send a request and receive a stream of responses.

    This is an async generator: iterate it with ``async for``. Nothing is
    sent until iteration starts.

    Args:
        data: The request data to send

    Yields:
        Each item produced by the protocol's stream

    Raises:
        RuntimeError: If not connected
        ConnectionClosedError: If the protocol raised it (passed through)
        ConnectionLostError: If the protocol raised it, or any other error
            occurred, including a websocket close raised while streaming
            (the original exception is attached as ``__cause__``)
    """
    if not self.protocol:
        raise RuntimeError("Not connected")

    try:
        stream = await self.protocol.request_stream(data)
        async for item in stream:
            yield item
    except (ConnectionClosedError, ConnectionLostError):
        # These are already logged appropriately
        raise
    except Exception as e:
        # For connection errors we handle, use debug level
        if isinstance(e, (websockets.exceptions.ConnectionClosedError, websockets.exceptions.ConnectionClosedOK)):
            logger.debug("Stream ended: Connection closed", extra={
                "error": str(e),
                "error_type": type(e).__name__,
                "websocket_state": self.websocket.state if self.websocket else None
            })
        else:
            logger.error("Error in request_stream", extra={
                "error": str(e),
                "error_type": type(e).__name__,
                "websocket_state": self.websocket.state if self.websocket else None
            }, exc_info=True)
        raise ConnectionLostError(reason=f"Stream failed: {str(e)}") from e
fire_and_forget async
fire_and_forget(data)

Send a one-way message with no response.

Parameters:

  • data

    The message data to send

Raises:

Source code in azad/slipstream/client.py
async def fire_and_forget(self, data):
    """
    Send a one-way message with no response.

    Args:
        data: The message data to send

    Raises:
        RuntimeError: If not connected
        ConnectionClosedError: If the protocol raised it (passed through)
        ConnectionLostError: If the protocol raised it, or any other error
            occurred, including a websocket close raised while sending
            (the original exception is attached as ``__cause__``)
    """
    if not self.protocol:
        raise RuntimeError("Not connected")

    try:
        await self.protocol.fire_and_forget(data)
    except (ConnectionClosedError, ConnectionLostError):
        # These are already logged appropriately
        raise
    except Exception as e:
        # For connection errors we handle, use debug level
        if isinstance(e, (websockets.exceptions.ConnectionClosedError, websockets.exceptions.ConnectionClosedOK)):
            logger.debug("Send ended: Connection closed", extra={
                "error": str(e),
                "error_type": type(e).__name__,
                "websocket_state": self.websocket.state if self.websocket else None
            })
        else:
            logger.error("Error in fire_and_forget", extra={
                "error": str(e),
                "error_type": type(e).__name__,
                "websocket_state": self.websocket.state if self.websocket else None
            }, exc_info=True)
        raise ConnectionLostError(reason=f"Send failed: {str(e)}") from e
create async classmethod
create(handler_factory: Callable[[Any], ProtocolHandler], uri: str = 'ws://localhost:8765')

Create and connect a new WebSocket client.

Parameters:

  • handler_factory (Callable[[Any], ProtocolHandler]) –

    Function that creates a new ProtocolHandler instance

  • uri (str, default: 'ws://localhost:8765' ) –

    The WebSocket URI to connect to

Returns:

  • A connected WebSocketClient instance

Source code in azad/slipstream/client.py
@classmethod
async def create(cls, handler_factory: Callable[[Any], ProtocolHandler], uri: str = "ws://localhost:8765"):
    """
    Build a client and connect it in one step.

    Args:
        handler_factory: Function that creates a new ProtocolHandler instance
        uri: The WebSocket URI to connect to

    Returns:
        A connected WebSocketClient instance
    """
    instance = cls(handler_factory)
    await instance.connect(uri)
    return instance

WebSocketServer

WebSocketServer(handler_class: Callable[[Any], ProtocolHandler], host: str = 'localhost', port: int = 8765)

Initialize the WebSocket server.

Parameters:

  • handler_class (Callable[[Any], ProtocolHandler]) –

    Factory function that takes callbacks and returns a ProtocolHandler

  • host (str, default: 'localhost' ) –

    The host to bind to

  • port (int, default: 8765 ) –

    The port to listen on

Source code in azad/slipstream/server.py
def __init__(self, handler_class: Callable[[Any], ProtocolHandler], host: str = "localhost", port: int = 8765):
    """Initialize the WebSocket server without starting it.

    Args:
        handler_class: Factory function that takes callbacks and returns a ProtocolHandler
        host: The host to bind to
        port: The port to listen on
    """
    # Where to listen.
    self.host = host
    self.port = port

    # Callable used to build a handler for connections.
    self.handler_class = handler_class

    # No underlying server exists until one is started.
    self._server: Optional[WSServer] = None
Attributes
handler_class instance-attribute
handler_class = handler_class
Functions
handle_connection async
handle_connection(websocket)

Handle an incoming WebSocket connection.

Parameters:

  • websocket

    The WebSocket connection to handle

Source code in azad/slipstream/server.py
async def handle_connection(self, websocket):
    """Serve one incoming WebSocket connection until its listener ends.

    A fresh handler and protocol are created for the connection. Slipstream
    connection-closed/lost errors end the connection quietly; any other
    exception is logged. The websocket is closed in every case.

    Args:
        websocket: The WebSocket connection to handle
    """
    logger.debug("New connection established", extra={
        "websocket_state": websocket.state
    })

    # The handler is built with None as its initial callbacks, then linked
    # to a protocol wrapping this websocket.
    conn_handler = self.handler_class(None)
    conn_protocol = Protocol(websocket)
    conn_protocol.set_handler(conn_handler)

    try:
        logger.debug("Starting protocol listener")
        await conn_protocol.listen()
    except (ConnectionClosedError, ConnectionLostError):
        # These are already logged appropriately
        pass
    except Exception as exc:
        logger.error("Unexpected connection error", extra={
            "error": str(exc),
            "error_type": type(exc).__name__,
            "websocket_state": websocket.state
        }, exc_info=True)
    finally:
        await websocket.close()
start async
start()

Start the WebSocket server.

Source code in azad/slipstream/server.py
async def start(self):
    """Start the WebSocket server on the configured host and port."""
    serve_options = dict(
        ws_handler=self.handle_connection,
        host=self.host,
        port=self.port,
        ping_interval=20,
        ping_timeout=20,
        compression=None,
        # 200 MB max message size
        max_size=200 * 2**20,
        read_limit=5 * 2**16,
        write_limit=5 * 2**16,
        create_protocol=WebSocketServerProtocol,
    )
    started = await serve(**serve_options)
    # cast() only informs the type checker; it returns its argument as-is.
    self._server = cast(WSServer, started)

    logger.debug("Server started", extra={
        "host": self.host,
        "port": self.port,
        "uri": f"ws://{self.host}:{self.port}"
    })
stop async
stop()

Stop the WebSocket server.

Source code in azad/slipstream/server.py
async def stop(self):
    """Stop the WebSocket server if one is running.

    Errors raised while closing are logged as warnings, not propagated; the
    server reference is cleared in every case.
    """
    if not self._server:
        return

    logger.debug("Stopping server")
    try:
        self._server.close()
        await self._server.wait_closed()
        logger.debug("Server stopped gracefully")
    except Exception as shutdown_err:
        logger.warning("Error during server shutdown", extra={
            "error": str(shutdown_err),
            "error_type": type(shutdown_err).__name__
        }, exc_info=True)
    finally:
        self._server = None
create async classmethod
create(handler_class: Callable[[Any], ProtocolHandler], host: str = 'localhost', port: int = 8765)

Create and start a new WebSocket server.

Parameters:

  • handler_class (Callable[[Any], ProtocolHandler]) –

    The ProtocolHandler class to use for new connections

  • host (str, default: 'localhost' ) –

    The host to bind to

  • port (int, default: 8765 ) –

    The port to listen on

Returns:

  • A running WebSocketServer instance

Source code in azad/slipstream/server.py
@classmethod
async def create(cls, handler_class: Callable[[Any], ProtocolHandler], host: str = "localhost", port: int = 8765):
    """Build a server and start it in one step.

    Args:
        handler_class: The ProtocolHandler class to use for new connections
        host: The host to bind to
        port: The port to listen on

    Returns:
        A running WebSocketServer instance
    """
    instance = cls(handler_class, host, port)
    await instance.start()
    return instance

WebSocketNetworkProtocol

WebSocketNetworkProtocol(handler_class: Optional[Callable[[Any], ProtocolHandler]] = None)

Bases: NetworkProtocol

WebSocket-based implementation of the NetworkProtocol interface.

Initialize the protocol.

Parameters:

  • handler_class (Optional[Callable[[Any], ProtocolHandler]], default: None ) –

    Optional factory function to create protocol handlers. If not provided, a default handler will be used.

Source code in azad/slipstream/websocket.py
def __init__(self, handler_class: Optional[Callable[[Any], ProtocolHandler]] = None):
    """Initialize the protocol in the DISCONNECTED state.

    Args:
        handler_class: Optional factory function to create protocol handlers.
                     If not provided, a default handler will be used.
    """
    self._handler_class = handler_class

    # Nothing is running yet: no server, no handler.
    self._server: Optional[websockets.WebSocketServer] = None
    self._handler: Optional[ProtocolHandler] = None
    self._state = ConnectionState.DISCONNECTED
Functions
start_server async
start_server(port: int, host: Optional[str]) -> None

Start the WebSocket server.

Parameters:

  • port (int) –

    Port to listen on

  • host (Optional[str]) –

    Host to bind to

Source code in azad/slipstream/websocket.py
async def start_server(self, port: int, host: Optional[str]) -> None:
    """Start the WebSocket server and wait until it is stopped.

    Tries ``port`` first and, while the address is reported as in use,
    retries on the next port number for up to ten attempts in total. Once
    the server is up, SIGINT/SIGTERM handlers that schedule ``stop_server``
    are installed where the event loop supports them, and this coroutine
    waits on an internal event that ``stop_server`` sets.

    Args:
        port: Port to listen on
        host: Host to bind to; ``"localhost"`` is used when falsy

    Raises:
        RuntimeError: If a server is already running.
        ConnectionLostError: If every attempted port is in use, or the
            server fails to start for any other reason. The underlying
            exception is attached as ``__cause__``.
    """
    # Local import: only needed for the portable EADDRINUSE comparison.
    import errno

    if self._server:
        raise RuntimeError("Server already running")

    self._state = ConnectionState.CONNECTING

    # Maximum number of port attempts
    max_attempts = 10
    current_port = port

    for attempt in range(max_attempts):
        try:
            self._server = await server.WebSocketServer.create(
                handler_class=self._handler_class,
                port=current_port,
                host=host if host else "localhost"
            )
            self._state = ConnectionState.CONNECTED

            # If we're using a different port than the one requested, log it prominently
            if current_port != port:
                logger.warning(f"Original port {port} was in use, server started on port {current_port} instead", extra={
                    "original_port": port,
                    "actual_port": current_port,
                    "state": self._state.value
                })
            else:
                logger.info("WebSocket server started successfully", extra={
                    "port": current_port,
                    "state": self._state.value
                })
            break  # Successfully started the server, exit the loop
        except OSError as e:
            # errno.EADDRINUSE is the platform's own value; a literal 48
            # only matches on macOS/BSD and would disable the retry elsewhere.
            if e.errno == errno.EADDRINUSE:
                # Port is in use, try the next port
                current_port += 1
                logger.info(f"Port {current_port-1} already in use, trying port {current_port}", extra={
                    "original_port": port,
                    "attempt": attempt + 1,
                    "max_attempts": max_attempts
                })

                # If this was our last attempt, raise a more helpful error
                if attempt == max_attempts - 1:
                    self._state = ConnectionState.DISCONNECTED
                    msg = f"All ports from {port} to {current_port-1} are in use. Please specify a different port."
                    logger.error(msg, extra={
                        "error": str(e),
                        "original_port": port,
                        "state": self._state.value
                    })
                    raise ConnectionLostError(reason=msg) from e
            else:
                # A different error occurred, report it
                self._state = ConnectionState.DISCONNECTED
                logger.error("Failed to start WebSocket server", extra={
                    "error": str(e),
                    "port": current_port,
                    "state": self._state.value
                }, exc_info=True)
                raise ConnectionLostError(reason=f"Server startup failed: {str(e)}") from e
        except Exception as e:
            self._state = ConnectionState.DISCONNECTED
            logger.error("Failed to start WebSocket server", extra={
                "error": str(e),
                "port": current_port,
                "state": self._state.value
            }, exc_info=True)
            raise ConnectionLostError(reason=f"Server startup failed: {str(e)}") from e

    # Keep server running until explicitly stopped
    self._running = asyncio.Event()

    # Add signal handlers for graceful shutdown
    loop = asyncio.get_running_loop()
    try:
        for sig in (signal.SIGINT, signal.SIGTERM):
            loop.add_signal_handler(sig, lambda: asyncio.create_task(self.stop_server()))
    except NotImplementedError:
        # Windows compatibility fallback (signal handlers not supported)
        logger.warning("Signal handlers not supported on this platform")

    # Wait until stopped
    try:
        await self._running.wait()
    except asyncio.CancelledError:
        logger.info("Server interrupted, shutting down")
        await self.stop_server()
stop_server async
stop_server(grace_period: float = 5.0) -> None

Stop the WebSocket server.

Parameters:

  • grace_period (float, default: 5.0 ) –

    Shutdown grace period in seconds

Source code in azad/slipstream/websocket.py
async def stop_server(self, grace_period: float = 5.0) -> None:
    """Stop the WebSocket server; does nothing when none is running.

    Sets the running event (if present), flags the handler (if any) for
    shutdown, waits for the grace period and then stops the server. The
    server and handler references are cleared and the state is reset to
    DISCONNECTED in every case.

    Args:
        grace_period: Shutdown grace period in seconds
    """
    if not self._server:
        return

    logger.info("Initiating server shutdown", extra={
        "grace_period": grace_period,
        "state": self._state.value
    })

    self._state = ConnectionState.DISCONNECTING

    try:
        # Release whoever is waiting on the running event.
        if hasattr(self, '_running'):
            self._running.set()

        # Tell the handler that shutdown has begun.
        active_handler = self._handler
        if active_handler:
            active_handler._shutdown = True

        # Let in-flight work finish before the server goes away.
        if grace_period > 0:
            await asyncio.sleep(grace_period)

        # A failure while stopping is reported as a warning only.
        try:
            await self._server.stop()
            logger.info("Server stopped gracefully")
        except Exception as stop_err:
            logger.warning("Error during server stop", extra={
                "error": str(stop_err)
            }, exc_info=True)

    except Exception as shutdown_err:
        logger.error("Error during server shutdown", extra={
            "error": str(shutdown_err),
            "state": self._state.value
        }, exc_info=True)
        raise
    finally:
        self._server = None
        self._handler = None
        self._state = ConnectionState.DISCONNECTED
        logger.info("Server shutdown complete", extra={
            "state": self._state.value
        })

ProtocolCallbacks

Bases: Protocol

Protocol interface defining required callback methods.

Functions
handle_request async
handle_request(request_id: str, data: dict) -> dict

Handle an incoming request.

Source code in azad/slipstream/websocket.py
async def handle_request(self, request_id: str, data: dict) -> dict:
    """Handle an incoming request.

    Interface stub: implementations receive the request id and payload and
    return the response payload.
    """
    ...
handle_stream_request async
handle_stream_request(request_id: str, data: dict) -> None

Handle an incoming stream request.

Source code in azad/slipstream/websocket.py
async def handle_stream_request(self, request_id: str, data: dict) -> None:
    """Handle an incoming stream request.

    Interface stub: implementations receive the request id and payload.
    """
    ...
handle_fire_and_forget async
handle_fire_and_forget(data: dict) -> None

Handle a fire-and-forget message.

Source code in azad/slipstream/websocket.py
async def handle_fire_and_forget(self, data: dict) -> None:
    """Handle a fire-and-forget message.

    Interface stub: implementations receive the payload and return nothing.
    """
    ...

NetworkProtocol

Bases: Protocol

Network protocol interface.

Functions
start_server async
start_server(port: int, callbacks: ProtocolCallbacks) -> None

Start the server.

Source code in azad/slipstream/websocket.py
async def start_server(self, port: int, callbacks: ProtocolCallbacks) -> None:
    """Start the server.

    Interface stub: implementations receive the port to listen on and the
    callbacks object.
    """
    ...
stop_server async
stop_server(grace_period: float = 5.0) -> None

Stop the server.

Source code in azad/slipstream/websocket.py
async def stop_server(self, grace_period: float = 5.0) -> None:
    """Stop the server.

    Interface stub: implementations receive the shutdown grace period.
    """
    ...

Modules

base

Attributes
logger module-attribute
logger = getLogger(__name__)
Classes
SlipstreamError

Bases: Exception

Base exception class for all slipstream errors.

ConnectionClosedError
ConnectionClosedError(code: Optional[int] = None, reason: Optional[str] = None)

Bases: SlipstreamError

Raised when connection is closed normally (e.g. server shutdown).

Source code in azad/slipstream/base.py
def __init__(self, code: Optional[int] = None, reason: Optional[str] = None):
    """Record the close code and reason and build the error message.

    Args:
        code: Close code, if known.
        reason: Close reason; a placeholder text is used in the message
            when it is empty or None.
    """
    message = f"Connection closed: {reason or 'no reason given'} (code={code})"
    super().__init__(message)
    self.code = code
    self.reason = reason
ConnectionLostError
ConnectionLostError(code: Optional[int] = None, reason: Optional[str] = None)

Bases: SlipstreamError

Raised when connection is lost unexpectedly.

Source code in azad/slipstream/base.py
def __init__(self, code: Optional[int] = None, reason: Optional[str] = None):
    """Store the close code and reason and build the exception message.

    Args:
        code: Close code of the connection, if known.
        reason: Description of the loss; the message shows
            'unknown reason' when this is empty or None.
    """
    self.code = code
    self.reason = reason
    shown_reason = reason or 'unknown reason'
    message = f"Connection lost: {shown_reason} (code={code})"
    super().__init__(message)
ProtocolError

Bases: SlipstreamError

Raised when there is a protocol-level error.

ConnectionState

Bases: Enum

Connection states for state tracking.

MessageType

Bases: Enum

Message types supported by the protocol.

FrameModel

Bases: BaseModel

Attributes
peerID class-attribute instance-attribute
peerID: Optional[str] = None
requestId class-attribute instance-attribute
requestId: Optional[Tuple[str, int]] = Field(None, alias='requestId')
Functions
validate_peer_id
validate_peer_id(v, values)
Source code in azad/slipstream/base.py
@validator('peerID')
def validate_peer_id(cls, v, values):
    """Require a peerID on request-type frames.

    Args:
        v: The peerID value being validated.
        values: Fields already available to the validator; the message
            type is read from its 'type' entry.

    Returns:
        ``v`` unchanged.

    Raises:
        ValueError: If the message type is one of the request kinds and
            ``v`` is empty or missing.
    """
    # Message kinds that start an exchange and therefore must name a peer.
    request_kinds = (
        MessageType.REQUEST,
        MessageType.STREAM_REQUEST,
        MessageType.STREAM_STREAM_REQUEST,
        MessageType.FIRE_AND_FORGET
    )
    msg_type = values.get('type')
    # With no usable type there is nothing to check the peerID against.
    if msg_type and msg_type in request_kinds and not v:
        raise ValueError("peerID required for request messages")
    return v
BiStreamHandle
BiStreamHandle(protocol: Protocol, request_id: Tuple[str, int])

Manages a bi-directional stream with a particular request_id.

Source code in azad/slipstream/base.py
def __init__(self, protocol: 'Protocol', request_id: Tuple[str, int]):
    """Create a handle for the bi-directional stream ``request_id``.

    Args:
        protocol: Protocol instance whose ``_send_frame`` is used for
            outgoing frames.
        request_id: Identifier of the stream this handle manages.
    """
    self.request_id = request_id
    self._protocol = protocol
    # Neither side has ended the stream yet.
    self._local_closed = self._remote_closed = False
    # Queue for inbound items (presumably filled by the protocol when data
    # arrives for this stream -- confirm against the dispatcher).
    self._inbound_queue = asyncio.Queue()
Attributes
request_id instance-attribute
request_id = request_id
Functions
send async
send(data: Any)

Send data to remote side as STREAM_DATA.

Source code in azad/slipstream/base.py
async def send(self, data: Any):
    """Send ``data`` to the remote side as a STREAM_DATA frame.

    Args:
        data: Payload to transmit on this stream.

    Raises:
        ConnectionError: If the local side of the stream is already closed.
    """
    if self._local_closed:
        raise ConnectionError("Cannot send on a closed BiStreamHandle")

    stream_id = self.request_id
    log_fields = {
        "request_id": stream_id,
        "data": data
    }
    logger.debug("BiStream sending data", extra=log_fields)
    await self._protocol._send_frame(MessageType.STREAM_DATA, data, stream_id)
aclose async
aclose()

Close our side of the stream (send STREAM_END).

Source code in azad/slipstream/base.py
async def aclose(self):
    """Close the local side of the stream by sending STREAM_END.

    Calling it again after the local side is closed does nothing.
    """
    if self._local_closed:
        return

    # The flag is set before the frame goes out, so the handle counts as
    # locally closed from this point on.
    self._local_closed = True
    await self._protocol._send_frame(MessageType.STREAM_END, None, self.request_id)
is_closed
is_closed() -> bool

True if both sides have ended.

Source code in azad/slipstream/base.py
def is_closed(self) -> bool:
    """Report whether the stream has ended on both the local and remote side."""
    local_done = self._local_closed
    remote_done = self._remote_closed
    return local_done and remote_done
Protocol
Protocol(websocket)

Complete, decorator-based protocol implementation.

Source code in azad/slipstream/base.py
def __init__(self, websocket):
    """Set up per-connection protocol state around ``websocket``.

    Args:
        websocket: The websocket connection this protocol reads from and
            writes to.
    """
    import uuid

    # Transport and user handler; the handler (an object with decorated
    # methods) is attached later.
    self._websocket = websocket
    self.handler = None

    # Peer identity: ours is a fresh random UUID, the remote one is not
    # known yet.
    self._peer_id = str(uuid.uuid4())
    self._remote_peer_id = None

    # Request numbering and bookkeeping for in-flight work.
    self._request_counter = 0
    self._pending_requests: Dict[str, asyncio.Future] = {}
    self._request_tasks: Dict[str, asyncio.Task] = {}
    self._stream_tasks: Dict[str, asyncio.Task] = {}
    self._bistreams: Dict[str, BiStreamHandle] = {}

    # Connection lifecycle tracking.
    self._state = ConnectionState.CONNECTING
    self._close_code: Optional[int] = None
    self._close_reason: Optional[str] = None

    self._send_lock = asyncio.Lock()
    # Presumably seconds -- confirm where the timeout is consumed.
    self._request_timeout = 600.0

    creation_fields = {
        "websocket_id": id(websocket),
        "request_counter": self._request_counter,
    }
    logger.debug(">>> LIFECYCLE: Creating new protocol instance", extra=creation_fields)
Attributes
Functions
set_handler
set_handler(handler: ProtocolHandler)

Attach a user-defined handler that has @request_response, etc. methods.

Source code in azad/slipstream/base.py
def set_handler(self, handler: 'ProtocolHandler'):
    """Link this protocol and a user-defined handler to each other.

    Args:
        handler: Handler object exposing @request_response, etc. methods.
    """
    # Protocol -> handler reference.
    self.handler = handler
    # Handler -> protocol back-reference.
    handler.protocol = self
listen async
listen()

Reads messages from the websocket and dispatches them.

Source code in azad/slipstream/base.py
async def listen(self):
    """Reads messages from the websocket and dispatches them.

    Each message is validated, checked for a peer-ID collision and then
    dispatched. A ProtocolError raised for a single frame is logged and the
    loop moves on to the next message. When the loop ends for any reason,
    pending requests are cleaned up with a reason string describing why,
    and only then is the terminating error (if any) raised.

    Returns immediately when no websocket is set.

    Raises:
        ConnectionClosedError: If the websocket was closed; the underlying
            websockets exception is attached as the cause.
        asyncio.CancelledError: If the listener task was cancelled (the
            original exception object is re-raised).
        Exception: Any other error that ended the loop, re-raised unchanged.
    """
    if not self.websocket:
        return

    cleanup_error = None
    try:
        async for message in self.websocket:
            try:
                frame = validate_incoming_frame(message)
                self._check_peer_id_collision(frame)
                await self._handle_incoming_frame(frame)
            except ProtocolError as pe:
                # A bad frame must not take the whole connection down.
                logger.error(f"Protocol error: {pe}", exc_info=True)
                continue
    except websockets.exceptions.ConnectionClosed as e:
        cleanup_error = e
    except asyncio.CancelledError as e:
        # Keep the original exception so its message/traceback survive.
        cleanup_error = e
    except Exception as e:
        cleanup_error = e
    finally:
        # Pick the cleanup reason from how the loop ended.
        if cleanup_error is None:
            reason = "listen complete"
        elif isinstance(cleanup_error, websockets.exceptions.ConnectionClosed):
            reason = "connection closed"
        elif isinstance(cleanup_error, asyncio.CancelledError):
            reason = "listener cancelled"
        else:
            reason = "listener error"

        try:
            await self._cleanup_pending_requests(reason)
        except Exception:
            # Cleanup is best-effort: it must not mask the error that ended
            # the loop, but a failure should at least be visible in the logs.
            logger.warning("Failed to clean up pending requests", exc_info=True)

        if cleanup_error is not None:
            if isinstance(cleanup_error, websockets.exceptions.ConnectionClosed):
                raise ConnectionClosedError(
                    cleanup_error.code, str(cleanup_error)
                ) from cleanup_error
            raise cleanup_error
request_response async
request_response(data: dict) -> dict

Send a single request, wait for single response.

Source code in azad/slipstream/base.py
async def request_response(self, data: dict) -> dict:
    """Send a single request, wait for single response.

    A future is registered under a fresh request id before the REQUEST
    frame is sent; the call then waits on that future. The pending entry is
    removed again however the call ends, including when sending fails.

    Args:
        data: Request payload; its "operation" entry (if any) is recorded
            on the pending future.

    Returns:
        The result the pending future is completed with.
    """
    request_id = (self._peer_id, self._request_counter)
    self._request_counter += 1

    fut = asyncio.Future()
    fut.operation = data.get("operation")
    self._add_pending_request(request_id, fut)

    try:
        # The send is inside the try so a failed send cannot leave a stale
        # entry behind in the pending-request table.
        await self._send_frame(MessageType.REQUEST, data, request_id)
        return await fut
    finally:
        self._remove_pending_request(request_id)
request_stream async
request_stream(data: dict) -> AsyncIterator[Any]

Send a request, get an async iterator of incoming items.

Source code in azad/slipstream/base.py
async def request_stream(self, data: dict) -> AsyncIterator[Any]:
    """Send a request, get an async iterator of incoming items.

    A future carrying a ``stream_queue`` is registered under a fresh
    request id, then the STREAM_REQUEST frame is sent. The returned
    iterator yields items taken from that queue until a ``None`` item
    marks the end of the stream.

    Args:
        data: Request payload; its "operation" entry (if any) is recorded
            on the pending future.

    Returns:
        An async generator over the items of the stream.

    Raises:
        ConnectionError: If there is no websocket or it is closed.
    """
    if not self.websocket or self.websocket.state.name == "CLOSED":
        raise ConnectionError("WebSocket is closed")

    request_id = (self._peer_id, self._request_counter)
    self._request_counter += 1

    fut = asyncio.Future()
    fut.stream_queue = asyncio.Queue()
    fut.operation = data.get("operation")
    self._add_pending_request(request_id, fut)

    try:
        await self._send_frame(MessageType.STREAM_REQUEST, data, request_id)
    except BaseException:
        # Nothing will ever feed this stream if the request never went out
        # (send error or cancellation), so drop the pending entry again.
        self._remove_pending_request(request_id)
        raise

    async def gen():
        while True:
            item = await fut.stream_queue.get()
            if item is None:  # end-of-stream sentinel
                break
            yield item

    return gen()
stream_stream async
stream_stream(data: dict) -> BiStreamHandle

Initiate a bi-directional stream, returning a handle.

Source code in azad/slipstream/base.py
async def stream_stream(self, data: dict) -> BiStreamHandle:
    """Initiate a bi-directional stream, returning a handle.

    A BiStreamHandle is registered under the handler key of a fresh request
    id, then the STREAM_STREAM_REQUEST frame is sent. If sending fails the
    handle is unregistered again before the error propagates.

    Args:
        data: Payload of the stream-opening request.

    Returns:
        The handle for the new stream.

    Raises:
        ConnectionError: If there is no websocket or it is closed.
    """
    if not self.websocket or self.websocket.state.name == "CLOSED":
        raise ConnectionError("WebSocket is closed")

    request_id = (self._peer_id, self._request_counter)
    self._request_counter += 1

    handle = BiStreamHandle(self, request_id)
    key = self._get_handler_key(request_id)
    self._bistreams[key] = handle

    try:
        await self._send_frame(MessageType.STREAM_STREAM_REQUEST, data, request_id)
    except BaseException:
        # The remote side never learned about this stream (send error or
        # cancellation); do not keep a dead handle registered.
        self._bistreams.pop(key, None)
        raise
    return handle
fire_and_forget async
fire_and_forget(data: dict) -> None

Send a one-way message with no response.

Source code in azad/slipstream/base.py
async def fire_and_forget(self, data: dict) -> None:
    """Send ``data`` as a one-way FIRE_AND_FORGET frame.

    No request id is attached and no response is awaited.

    Args:
        data: Payload of the message.
    """
    frame_type = MessageType.FIRE_AND_FORGET
    await self._send_frame(frame_type, data)
ProtocolHandler
ProtocolHandler()

A base class if you want to store reference to protocol, or do custom logic. Typically you just define your methods with the decorators directly.

Source code in azad/slipstream/base.py
def __init__(self):
    """Start with no protocol attached."""
    # Back-reference to the owning protocol; None until one is attached.
    self.protocol: Optional[Protocol] = None
Functions
validate_incoming_frame
validate_incoming_frame(frame_data: str) -> FrameModel

Convert raw JSON → dict → FrameModel, raising if invalid.

Source code in azad/slipstream/base.py
def validate_incoming_frame(frame_data: str) -> FrameModel:
    """Convert raw JSON → dict → FrameModel, raising if invalid.

    Args:
        frame_data: The raw JSON text of one incoming frame.

    Returns:
        The validated frame.

    Raises:
        ProtocolError: If the text is not valid JSON, the JSON document is
            not an object, or the object fails FrameModel validation. The
            underlying exception (when there is one) is attached as the
            cause.
    """
    try:
        parsed_json = json.loads(frame_data)
    except json.JSONDecodeError as e:
        raise ProtocolError(f"Invalid frame or data: {e}") from e

    # Valid JSON can still be a list, string, number, ...; unpacking those
    # with ** would raise TypeError instead of the ProtocolError callers
    # handle.
    if not isinstance(parsed_json, dict):
        raise ProtocolError(
            "Invalid frame or data: expected a JSON object, "
            f"got {type(parsed_json).__name__}"
        )

    try:
        return FrameModel(**parsed_json)
    except ValidationError as e:
        raise ProtocolError(f"Invalid frame or data: {e}") from e
request_response
request_response(func)
Source code in azad/slipstream/base.py
def request_response(func):
    """Decorator marking ``func`` as a request/response endpoint.

    The endpoint description is stored on the function as
    ``_slipstream_endpoint``; the function itself is returned unwrapped.
    """
    endpoint = {
        "type": MessageType.REQUEST,
        "returns": "single"  # single response
    }
    func._slipstream_endpoint = endpoint
    return func
request_stream
request_stream(func)
Source code in azad/slipstream/base.py
def request_stream(func):
    """Decorator marking ``func`` as a request/stream endpoint.

    The endpoint description is stored on the function as
    ``_slipstream_endpoint``; the function itself is returned unwrapped.
    """
    endpoint = {
        "type": MessageType.STREAM_REQUEST,
        "returns": "stream"
    }
    func._slipstream_endpoint = endpoint
    return func
stream_stream
stream_stream(func)
Source code in azad/slipstream/base.py
def stream_stream(func):
    """Decorator marking ``func`` as a bi-directional stream endpoint.

    The endpoint description is stored on the function as
    ``_slipstream_endpoint``; the function itself is returned unwrapped.
    """
    endpoint = {
        "type": MessageType.STREAM_STREAM_REQUEST,
        "returns": "bistream"
    }
    func._slipstream_endpoint = endpoint
    return func
fire_and_forget
fire_and_forget(func)
Source code in azad/slipstream/base.py
def fire_and_forget(func):
    """Decorator marking ``func`` as a fire-and-forget endpoint.

    The endpoint description is stored on the function as
    ``_slipstream_endpoint``; the function itself is returned unwrapped.
    """
    endpoint = {
        "type": MessageType.FIRE_AND_FORGET,
        "returns": None
    }
    func._slipstream_endpoint = endpoint
    return func

client

WebSocket client implementation.

Attributes
Classes
WebSocketClient
WebSocketClient(handler_factory: Callable[[Any], ProtocolHandler])

Initialize the WebSocket client.

Parameters:

  • handler_factory (Callable[[Any], ProtocolHandler]) –

    Function that creates a new ProtocolHandler instance

Source code in azad/slipstream/client.py
def __init__(self, handler_factory: Callable[[Any], ProtocolHandler]):
    """Create a client that is not yet connected.

    Args:
        handler_factory: Function that creates a new ProtocolHandler instance
    """
    self.handler_factory = handler_factory
    # Connection state; all three stay None until a connection is set up.
    self._listen_task: Optional[asyncio.Task] = None
    self.protocol: Optional[Protocol] = None
    self.websocket: Optional[websockets.WebSocketClientProtocol] = None
Attributes
handler_factory instance-attribute
handler_factory = handler_factory
websocket instance-attribute
websocket: Optional[WebSocketClientProtocol] = None
protocol instance-attribute
protocol: Optional[Protocol] = None
Functions
connect async
connect(uri: str = 'ws://localhost:8765')

Connect to a WebSocket server.

Parameters:

  • uri (str, default: 'ws://localhost:8765' ) –

    The WebSocket URI to connect to

Source code in azad/slipstream/client.py
async def connect(self, uri: str = "ws://localhost:8765"):
    """Connect to a WebSocket server.

    Opens the websocket, wraps it in a Protocol, attaches a handler built
    by ``handler_factory(None)`` and starts the background listener task.
    On any failure the partial connection is cleaned up before the error
    propagates.

    Args:
        uri: The WebSocket URI to connect to

    Raises:
        RuntimeError: If a websocket is already set, or the protocol is
            missing after start-up.
        ConnectionLostError: If connecting fails with a websockets or
            connection error; the original exception is attached as the
            cause.
    """
    if self.websocket:
        raise RuntimeError("Already connected")

    try:
        # Connect with explicit close timeout
        self.websocket = await websockets.connect(uri, close_timeout=5.0)

        # Create protocol with websocket
        self.protocol = Protocol(self.websocket)

        # Create handler and link them together
        handler = self.handler_factory(None)
        self.protocol.set_handler(handler)

        # Start listening for messages
        self._listen_task = asyncio.create_task(self._listen())

        # Wait for connection to stabilize and listener to be ready
        await asyncio.sleep(0.2)

        # Verify connection is ready
        if not self.websocket or self.websocket.state.name == "CLOSED":
            raise ConnectionError("Connection failed to stabilize")

        if not self.protocol:
            raise RuntimeError("Protocol initialization failed")

        logger.debug("Client connected successfully", extra={
            "uri": uri,
            "websocket_state": self.websocket.state if self.websocket else None,
            "has_protocol": self.protocol is not None,
            "listen_task_state": self._listen_task.done() if self._listen_task else None
        })

    except (websockets.exceptions.WebSocketException, ConnectionError) as e:
        logger.error("Failed to connect", extra={
            "uri": uri,
            "error": str(e)
        }, exc_info=True)
        # Clean up any partial connection
        await self._cleanup_connection()
        # Chain explicitly so the root cause stays attached to the wrapper.
        raise ConnectionLostError(reason=f"Connection failed: {str(e)}") from e
    except Exception as e:
        logger.error("Unexpected error during connect", extra={
            "uri": uri,
            "error": str(e),
            "error_type": type(e).__name__
        }, exc_info=True)
        # Clean up any partial connection
        await self._cleanup_connection()
        raise
disconnect async
disconnect()

Disconnect from the WebSocket server.

Source code in azad/slipstream/client.py
async def disconnect(self):
    """Disconnect from the WebSocket server.

    Does nothing when there is no websocket, protocol or listener task
    left; otherwise logs the current state and runs the connection cleanup.
    """
    has_connection_state = self.websocket or self.protocol or self._listen_task
    if not has_connection_state:
        return  # Already disconnected

    state_fields = {
        "websocket_state": self.websocket.state if self.websocket else None,
        "has_protocol": self.protocol is not None,
        "listen_task_state": self._listen_task.done() if self._listen_task else None
    }
    logger.debug("Starting client disconnect", extra=state_fields)

    await self._cleanup_connection()
request_response async
request_response(data)

Send a request and wait for a response.

Parameters:

  • data

    The request data to send

Returns:

  • The response data

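Raises:

  • RuntimeError –

    If not connected

  • ConnectionClosedError –

    If connection closed normally

  • ConnectionLostError –

    If connection lost unexpectedly
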
Source code in azad/slipstream/client.py
async def request_response(self, data):
    """
    Send a request and wait for a response.

    Slipstream errors and RuntimeError pass through unchanged. A raw
    websockets close is translated to ConnectionClosedError and any other
    failure to ConnectionLostError, each with the original exception
    attached as the cause.

    Args:
        data: The request data to send

    Returns:
        The response data

    Raises:
        RuntimeError: If not connected
        ConnectionClosedError: If connection closed normally
        ConnectionLostError: If connection lost unexpectedly
    """
    self._check_connection()

    try:
        return await self.protocol.request_response(data)
    except (ConnectionClosedError, ConnectionLostError, RuntimeError):
        # These are already logged appropriately
        raise
    except websockets.exceptions.ConnectionClosed as e:
        logger.debug("Request ended: Connection closed", extra={
            "error": str(e),
            "error_type": type(e).__name__,
            "websocket_state": self.websocket.state if self.websocket else None,
            "code": e.code
        })
        raise ConnectionClosedError(e.code, str(e)) from e
    except Exception as e:
        logger.error("Error in request_response", extra={
            "error": str(e),
            "error_type": type(e).__name__,
            "websocket_state": self.websocket.state if self.websocket else None
        }, exc_info=True)
        raise ConnectionLostError(reason=f"Request failed: {str(e)}") from e
request_stream async
request_stream(data)

Send a request and receive a stream of responses.

Parameters:

  • data

    The request data to send

Returns:

  • An async iterator of response items

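Raises:

  • RuntimeError –

    If not connected

  • ConnectionClosedError –

    If connection closed normally

  • ConnectionLostError –

    If connection lost unexpectedly
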
Source code in azad/slipstream/client.py
async def request_stream(self, data):
    """
    Send a request and receive a stream of responses.

    This is an async generator: the request is sent when iteration starts.
    Slipstream connection errors pass through unchanged; any other failure
    is logged (debug level for websockets close exceptions, error level
    otherwise) and re-raised as ConnectionLostError with the original
    exception attached as the cause.

    Args:
        data: The request data to send

    Returns:
        An async iterator of response items

    Raises:
        RuntimeError: If not connected
        ConnectionClosedError: If connection closed normally
        ConnectionLostError: If connection lost unexpectedly
    """
    if not self.protocol:
        raise RuntimeError("Not connected")

    try:
        stream = await self.protocol.request_stream(data)
        async for item in stream:
            yield item
    except (ConnectionClosedError, ConnectionLostError):
        # These are already logged appropriately
        raise
    except Exception as e:
        # For connection errors we handle, use debug level
        if isinstance(e, (websockets.exceptions.ConnectionClosedError, websockets.exceptions.ConnectionClosedOK)):
            logger.debug("Stream ended: Connection closed", extra={
                "error": str(e),
                "error_type": type(e).__name__,
                "websocket_state": self.websocket.state if self.websocket else None
            })
        else:
            logger.error("Error in request_stream", extra={
                "error": str(e),
                "error_type": type(e).__name__,
                "websocket_state": self.websocket.state if self.websocket else None
            }, exc_info=True)
        raise ConnectionLostError(reason=f"Stream failed: {str(e)}") from e
fire_and_forget async
fire_and_forget(data)

Send a one-way message with no response.

Parameters:

  • data

    The message data to send

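Raises:

  • RuntimeError –

    If not connected

  • ConnectionClosedError –

    If connection closed normally

  • ConnectionLostError –

    If connection lost unexpectedly
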
Source code in azad/slipstream/client.py
async def fire_and_forget(self, data):
    """
    Send a one-way message with no response.

    Slipstream connection errors pass through unchanged; any other failure
    is logged (debug level for websockets close exceptions, error level
    otherwise) and re-raised as ConnectionLostError with the original
    exception attached as the cause.

    Args:
        data: The message data to send

    Raises:
        RuntimeError: If not connected
        ConnectionClosedError: If connection closed normally
        ConnectionLostError: If connection lost unexpectedly
    """
    if not self.protocol:
        raise RuntimeError("Not connected")

    try:
        await self.protocol.fire_and_forget(data)
    except (ConnectionClosedError, ConnectionLostError):
        # These are already logged appropriately
        raise
    except Exception as e:
        # For connection errors we handle, use debug level
        if isinstance(e, (websockets.exceptions.ConnectionClosedError, websockets.exceptions.ConnectionClosedOK)):
            logger.debug("Send ended: Connection closed", extra={
                "error": str(e),
                "error_type": type(e).__name__,
                "websocket_state": self.websocket.state if self.websocket else None
            })
        else:
            logger.error("Error in fire_and_forget", extra={
                "error": str(e),
                "error_type": type(e).__name__,
                "websocket_state": self.websocket.state if self.websocket else None
            }, exc_info=True)
        raise ConnectionLostError(reason=f"Send failed: {str(e)}") from e
create async classmethod
create(handler_factory: Callable[[Any], ProtocolHandler], uri: str = 'ws://localhost:8765')

Create and connect a new WebSocket client.

Parameters:

  • handler_factory (Callable[[Any], ProtocolHandler]) –

    Function that creates a new ProtocolHandler instance

  • uri (str, default: 'ws://localhost:8765' ) –

    The WebSocket URI to connect to

Returns:

  • A connected WebSocketClient instance

Source code in azad/slipstream/client.py
@classmethod
async def create(cls, handler_factory: Callable[[Any], ProtocolHandler], uri: str = "ws://localhost:8765"):
    """
    Build a client and connect it in one step.

    Args:
        handler_factory: Function that creates a new ProtocolHandler instance
        uri: The WebSocket URI to connect to

    Returns:
        A connected WebSocketClient instance
    """
    instance = cls(handler_factory)
    await instance.connect(uri)
    return instance
Functions
run_client async
run_client(handler_factory: Callable[[Any], ProtocolHandler], uri: str = 'ws://localhost:8765')

Run a WebSocket client until interrupted.

Parameters:

  • handler_factory (Callable[[Any], ProtocolHandler]) –

    Function that creates a new ProtocolHandler instance

  • uri (str, default: 'ws://localhost:8765' ) –

    The WebSocket URI to connect to

Source code in azad/slipstream/client.py
async def run_client(handler_factory: Callable[[Any], ProtocolHandler], uri: str = "ws://localhost:8765"):
    """
    Connect a WebSocket client and keep it alive until interrupted.

    The coroutine waits on a future that never completes, so it only ends
    through an exception (typically task cancellation). Whatever ends it is
    logged and re-raised, and the client is always disconnected on the way
    out; errors raised by the disconnect itself are logged, not raised.

    Args:
        handler_factory: Function that creates a new ProtocolHandler instance
        uri: The WebSocket URI to connect to
    """
    def _task_name():
        # Name of the task running this coroutine, if there is one.
        task = asyncio.current_task()
        return task.get_name() if task else None

    client = await WebSocketClient.create(handler_factory, uri)

    # Exceptions that simply mean "the connection went away".
    closure_errors = (
        ConnectionClosedError,
        ConnectionLostError,
        websockets.exceptions.ConnectionClosedError,
        websockets.exceptions.ConnectionClosedOK,
    )

    try:
        # Keep the client running until interrupted
        forever = asyncio.Future()
        await forever  # run forever
    except asyncio.CancelledError:
        # Task cancellation is normal during shutdown
        logger.debug("Client task cancelled", extra={
            "uri": uri,
            "task": _task_name()
        })
        raise
    except closure_errors as e:
        # Connection closure is expected during shutdown
        logger.debug("Client stopped due to connection closure", extra={
            "error": str(e),
            "error_type": type(e).__name__,
            "uri": uri,
            "code": getattr(e, "code", None),
            "task": _task_name()
        })
        raise
    except Exception as e:
        # Only log unexpected errors as errors
        logger.error("Unexpected error in client run loop", extra={
            "error": str(e),
            "error_type": type(e).__name__,
            "uri": uri,
            "task": _task_name()
        }, exc_info=True)
        raise
    finally:
        try:
            await client.disconnect()
        except closure_errors as e:
            # Connection closure during cleanup is expected
            logger.debug("Connection closed during client shutdown", extra={
                "error": str(e),
                "error_type": type(e).__name__,
                "uri": uri,
                "code": getattr(e, "code", None),
                "task": _task_name()
            })
        except Exception as e:
            # Only log unexpected errors as warnings during cleanup
            logger.warning("Unexpected error during client shutdown", extra={
                "error": str(e),
                "error_type": type(e).__name__,
                "uri": uri,
                "task": _task_name()
            }, exc_info=True)

server

WebSocket server implementation.

Attributes
logger module-attribute
logger = getLogger(__name__)
Classes
WebSocketServer
WebSocketServer(handler_class: Callable[[Any], ProtocolHandler], host: str = 'localhost', port: int = 8765)

Initialize the WebSocket server.

Parameters:

  • handler_class (Callable[[Any], ProtocolHandler]) –

    Factory function that takes callbacks and returns a ProtocolHandler

  • host (str, default: 'localhost' ) –

    The host to bind to

  • port (int, default: 8765 ) –

    The port to listen on

Source code in azad/slipstream/server.py
def __init__(self, handler_class: Callable[[Any], ProtocolHandler], host: str = "localhost", port: int = 8765):
    """Store the server configuration; nothing is started yet.

    Args:
        handler_class: Factory function that takes callbacks and returns a ProtocolHandler
        host: The host to bind to
        port: The port to listen on
    """
    # No server object exists until one is started.
    self._server: Optional[WSServer] = None
    self.host, self.port = host, port
    self.handler_class = handler_class
Attributes
handler_class instance-attribute
handler_class = handler_class
Functions
handle_connection async
handle_connection(websocket)

Handle an incoming WebSocket connection.

Parameters:

  • websocket

    The WebSocket connection to handle

Source code in azad/slipstream/server.py
async def handle_connection(self, websocket):
    """Serve one incoming WebSocket connection until its listener ends.

    A fresh handler and Protocol are created for the connection and the
    protocol's listener is awaited. Slipstream connection errors end the
    connection quietly, any other error is logged, and the websocket is
    closed in every case.

    Args:
        websocket: The WebSocket connection to handle
    """
    logger.debug("New connection established", extra={
        "websocket_state": websocket.state
    })

    # Create protocol and handler (the handler gets None as its initial
    # callbacks) and link them together
    conn_handler = self.handler_class(None)
    conn_protocol = Protocol(websocket)
    conn_protocol.set_handler(conn_handler)

    try:
        logger.debug("Starting protocol listener")
        await conn_protocol.listen()
    except (ConnectionClosedError, ConnectionLostError):
        # These are already logged appropriately
        pass
    except Exception as e:
        failure_fields = {
            "error": str(e),
            "error_type": type(e).__name__,
            "websocket_state": websocket.state
        }
        logger.error("Unexpected connection error", extra=failure_fields, exc_info=True)
    finally:
        await websocket.close()
start async
start()

Start the WebSocket server.

Source code in azad/slipstream/server.py
async def start(self):
    """Start serving on the configured host and port.

    The running server object is stored on ``self._server``.
    """
    server_options = dict(
        ws_handler=self.handle_connection,
        host=self.host,
        port=self.port,
        ping_interval=20,
        ping_timeout=20,
        compression=None,
        # 200 MB max message size
        max_size=200 * 2**20,
        read_limit=5 * 2**16,
        write_limit=5 * 2**16,
        create_protocol=WebSocketServerProtocol,
    )
    started = await serve(**server_options)
    # cast() only informs the type checker; the object is stored as-is.
    self._server = cast(WSServer, started)

    startup_fields = {
        "host": self.host,
        "port": self.port,
        "uri": f"ws://{self.host}:{self.port}"
    }
    logger.debug("Server started", extra=startup_fields)
stop async
stop()

Stop the WebSocket server.

Source code in azad/slipstream/server.py
async def stop(self):
    """Stop the WebSocket server if one is running.

    Errors raised while closing are logged as warnings rather than raised,
    and the stored server reference is cleared in every case.
    """
    if not self._server:
        return

    logger.debug("Stopping server")
    try:
        self._server.close()
        await self._server.wait_closed()
        logger.debug("Server stopped gracefully")
    except Exception as e:
        failure_fields = {
            "error": str(e),
            "error_type": type(e).__name__
        }
        logger.warning("Error during server shutdown", extra=failure_fields, exc_info=True)
    finally:
        self._server = None
create async classmethod
create(handler_class: Callable[[Any], ProtocolHandler], host: str = 'localhost', port: int = 8765)

Create and start a new WebSocket server.

Parameters:

  • handler_class (Callable[[Any], ProtocolHandler]) –

    The ProtocolHandler class to use for new connections

  • host (str, default: 'localhost' ) –

    The host to bind to

  • port (int, default: 8765 ) –

    The port to listen on

Returns:

  • A running WebSocketServer instance

Source code in azad/slipstream/server.py
@classmethod
async def create(cls, handler_class: Callable[[Any], ProtocolHandler], host: str = "localhost", port: int = 8765):
    """Build a server and start it in one step.

    Args:
        handler_class: The ProtocolHandler class to use for new connections
        host: The host to bind to
        port: The port to listen on

    Returns:
        A running WebSocketServer instance
    """
    instance = cls(handler_class, host, port)
    await instance.start()
    return instance
Functions
run_server async
run_server(handler_class: Callable[[Any], ProtocolHandler], host: str = 'localhost', port: int = 8765)

Run a WebSocket server until interrupted.

Parameters:

  • handler_class (Callable[[Any], ProtocolHandler]) –

    The ProtocolHandler class to use for new connections

  • host (str, default: 'localhost' ) –

    The host to bind to

  • port (int, default: 8765 ) –

    The port to listen on

Source code in azad/slipstream/server.py
async def run_server(handler_class: Callable[[Any], ProtocolHandler], host: str = "localhost", port: int = 8765):
    """Start a WebSocket server and keep it alive until interrupted.

    The coroutine waits on a future that never completes, so it only ends
    through an exception (typically task cancellation); the server is
    stopped on the way out.

    Args:
        handler_class: The ProtocolHandler class to use for new connections
        host: The host to bind to
        port: The port to listen on
    """
    running = await WebSocketServer.create(handler_class, host, port)

    try:
        # Keep the server running until interrupted
        forever = asyncio.Future()
        await forever  # run forever
    finally:
        await running.stop()

websocket

WebSocket-based network protocol implementation.

Attributes
logger module-attribute
logger = getLogger(__name__)
websockets_logger module-attribute
websockets_logger = getLogger('websockets')
Classes
ProtocolCallbacks

Bases: Protocol

Protocol interface defining required callback methods.

Functions
handle_request async
handle_request(request_id: str, data: dict) -> dict

Handle an incoming request.

Source code in azad/slipstream/websocket.py
async def handle_request(self, request_id: str, data: dict) -> dict:
    """Handle an incoming request and produce its response.

    Interface stub: the body is empty here; implementers of the callback
    interface supply the behavior.

    Args:
        request_id: Identifier of the request being handled.
        data: Payload of the request.

    Returns:
        The response payload for the request.
    """
    ...
handle_stream_request async
handle_stream_request(request_id: str, data: dict) -> None

Handle an incoming stream request.

Source code in azad/slipstream/websocket.py
async def handle_stream_request(self, request_id: str, data: dict) -> None:
    """Handle an incoming stream request.

    Interface stub: the body is empty here; implementers of the callback
    interface supply the behavior.

    Args:
        request_id: Identifier of the stream request being handled.
        data: Payload of the stream request.
    """
    ...
handle_fire_and_forget async
handle_fire_and_forget(data: dict) -> None

Handle a fire-and-forget message.

Source code in azad/slipstream/websocket.py
async def handle_fire_and_forget(self, data: dict) -> None:
    """Handle a fire-and-forget message.

    Interface stub: the body is empty here; implementers of the callback
    interface supply the behavior. No request id is passed and nothing is
    returned.

    Args:
        data: Payload of the one-way message.
    """
    ...
NetworkProtocol

Bases: Protocol

Network protocol interface.

Functions
start_server async
start_server(port: int, callbacks: ProtocolCallbacks) -> None

Start the server.

Source code in azad/slipstream/websocket.py
async def start_server(self, port: int, callbacks: ProtocolCallbacks) -> None:
    """Start the server.

    Interface stub: the body is empty here; concrete network protocols
    supply the behavior.

    Args:
        port: Port to listen on.
        callbacks: Object implementing the ProtocolCallbacks interface.
    """
    ...
stop_server async
stop_server(grace_period: float = 5.0) -> None

Stop the server.

Source code in azad/slipstream/websocket.py
async def stop_server(self, grace_period: float = 5.0) -> None:
    """Stop the server.

    Interface stub: the body is empty here; concrete network protocols
    supply the behavior.

    Args:
        grace_period: Shutdown allowance, defaulting to 5.0
            (presumably seconds -- confirm against implementations).
    """
    ...
WebSocketNetworkProtocol
WebSocketNetworkProtocol(handler_class: Optional[Callable[[Any], ProtocolHandler]] = None)

Bases: NetworkProtocol

WebSocket-based implementation of the NetworkProtocol interface.

Initialize the protocol.

Parameters:

  • handler_class (Optional[Callable[[Any], ProtocolHandler]], default: None ) –

    Optional factory function to create protocol handlers. If not provided, a default handler will be used.

Source code in azad/slipstream/websocket.py
def __init__(self, handler_class: Optional[Callable[[Any], ProtocolHandler]] = None):
    """Create a protocol object with no server running.

    Args:
        handler_class: Optional factory function to create protocol handlers.
                     If not provided, a default handler will be used.
    """
    self._handler_class = handler_class
    # Nothing is running yet: no server, no handler, disconnected state.
    self._state = ConnectionState.DISCONNECTED
    self._handler: Optional[ProtocolHandler] = None
    self._server: Optional[websockets.WebSocketServer] = None
Functions
start_server async
start_server(port: int, host: Optional[str]) -> None

Start the WebSocket server.

Parameters:

  • port (int) –

    Port to listen on

  • host (Optional[str]) –

    Host to bind to

Source code in azad/slipstream/websocket.py
async def start_server(self, port: int, host: Optional[str]) -> None:
    """Start the WebSocket server and wait until it is stopped.

    ``port`` is tried first. While binding fails because the address is
    already in use, the next consecutive port is tried, for at most 10
    ports in total. After a successful start, SIGINT/SIGTERM handlers that
    schedule ``stop_server`` are installed (when the event loop supports
    them) and this coroutine waits on ``self._running``.

    Args:
        port: Port to listen on (the first port that is tried).
        host: Host to bind to; ``"localhost"`` is used when falsy.

    Raises:
        RuntimeError: If this instance already has a running server.
        ConnectionLostError: If every candidate port is in use, or if
            creating the server fails for any other reason. The original
            exception is chained as ``__cause__``.
    """
    # Function-scope import: only needed for the portable EADDRINUSE check.
    import errno

    if self._server:
        raise RuntimeError("Server already running")

    self._state = ConnectionState.CONNECTING

    # Maximum number of consecutive ports to try, starting at ``port``.
    max_attempts = 10
    current_port = port

    for attempt in range(max_attempts):
        try:
            self._server = await server.WebSocketServer.create(
                handler_class=self._handler_class,
                port=current_port,
                host=host if host else "localhost"
            )
            self._state = ConnectionState.CONNECTED

            # If we're using a different port than the one requested, log it prominently
            if current_port != port:
                logger.warning(f"Original port {port} was in use, server started on port {current_port} instead", extra={
                    "original_port": port,
                    "actual_port": current_port,
                    "state": self._state.value
                })
            else:
                logger.info("WebSocket server started successfully", extra={
                    "port": current_port,
                    "state": self._state.value
                })
            break  # Successfully started the server, exit the loop
        except OSError as e:
            # Compare against errno.EADDRINUSE rather than a literal: the
            # numeric value differs per platform (48 on macOS/BSD, 98 on
            # Linux, 10048 on Windows), so a hard-coded 48 disables the
            # port fallback everywhere except macOS.
            if e.errno == errno.EADDRINUSE:
                # Out of candidates: report the whole range that was tried.
                if attempt == max_attempts - 1:
                    self._state = ConnectionState.DISCONNECTED
                    msg = f"All ports from {port} to {current_port} are in use. Please specify a different port."
                    logger.error(msg, extra={
                        "error": str(e),
                        "original_port": port,
                        "state": self._state.value
                    })
                    raise ConnectionLostError(reason=msg) from e

                # Port is in use, try the next port
                current_port += 1
                logger.info(f"Port {current_port-1} already in use, trying port {current_port}", extra={
                    "original_port": port,
                    "attempt": attempt + 1,
                    "max_attempts": max_attempts
                })
            else:
                # A different error occurred, report it
                self._state = ConnectionState.DISCONNECTED
                logger.error("Failed to start WebSocket server", extra={
                    "error": str(e),
                    "port": current_port,
                    "state": self._state.value
                }, exc_info=True)
                raise ConnectionLostError(reason=f"Server startup failed: {str(e)}") from e
        except Exception as e:
            self._state = ConnectionState.DISCONNECTED
            logger.error("Failed to start WebSocket server", extra={
                "error": str(e),
                "port": current_port,
                "state": self._state.value
            }, exc_info=True)
            raise ConnectionLostError(reason=f"Server startup failed: {str(e)}") from e

    # Keep server running until explicitly stopped; stop_server() sets this event.
    self._running = asyncio.Event()

    # Add signal handlers for graceful shutdown
    loop = asyncio.get_running_loop()
    try:
        for sig in (signal.SIGINT, signal.SIGTERM):
            loop.add_signal_handler(sig, lambda: asyncio.create_task(self.stop_server()))
    except NotImplementedError:
        # Windows compatibility fallback (signal handlers not supported)
        logger.warning("Signal handlers not supported on this platform")

    # Wait until stopped
    try:
        await self._running.wait()
    except asyncio.CancelledError:
        # NOTE(review): the cancellation is absorbed here after shutting
        # down (it is not re-raised) — confirm callers rely on that.
        logger.info("Server interrupted, shutting down")
        await self.stop_server()
stop_server async
stop_server(grace_period: float = 5.0) -> None

Stop the WebSocket server.

Parameters:

  • grace_period (float, default: 5.0 ) –

    Shutdown grace period in seconds

Source code in azad/slipstream/websocket.py
async def stop_server(self, grace_period: float = 5.0) -> None:
    """Shut the WebSocket server down, optionally after a grace period.

    Does nothing when no server is attached. Otherwise it releases the
    waiter in ``start_server`` (via ``self._running``), flags the handler
    as shutting down, sleeps for ``grace_period`` seconds and then stops
    the server. Errors raised by the server's own ``stop()`` are logged
    as warnings and not propagated. Any other error is logged and
    re-raised. In every case the instance is reset to the disconnected
    state.

    Args:
        grace_period: Shutdown grace period in seconds; no wait happens
            when it is not greater than zero.
    """
    if not self._server:
        return  # nothing to stop

    start_details = {"grace_period": grace_period, "state": self._state.value}
    logger.info("Initiating server shutdown", extra=start_details)

    self._state = ConnectionState.DISCONNECTING

    try:
        # Wake up whoever is blocked waiting for the server to finish.
        if hasattr(self, "_running"):
            self._running.set()

        # Tell the handler (if any) that shutdown has begun.
        if self._handler:
            self._handler._shutdown = True

        # Let in-flight work finish before the server is torn down.
        if grace_period > 0:
            await asyncio.sleep(grace_period)

        # Best-effort stop: failures here are reported, not raised.
        try:
            await self._server.stop()
            logger.info("Server stopped gracefully")
        except Exception as stop_error:
            logger.warning(
                "Error during server stop",
                extra={"error": str(stop_error)},
                exc_info=True,
            )

    except Exception as shutdown_error:
        failure_details = {
            "error": str(shutdown_error),
            "state": self._state.value,
        }
        logger.error("Error during server shutdown", extra=failure_details, exc_info=True)
        raise
    finally:
        # Always return to a clean, restartable state.
        self._server = self._handler = None
        self._state = ConnectionState.DISCONNECTED
        logger.info("Server shutdown complete", extra={"state": self._state.value})
Modules