Skip to content

azad.slipstream.base Module

azad.slipstream.base

Attributes

logger module-attribute

logger = getLogger(__name__)

Classes

SlipstreamError

Bases: Exception

Base exception class for all slipstream errors.

ConnectionClosedError

ConnectionClosedError(code: Optional[int] = None, reason: Optional[str] = None)

Bases: SlipstreamError

Raised when connection is closed normally (e.g. server shutdown).

Source code in azad/slipstream/base.py
def __init__(self, code: Optional[int] = None, reason: Optional[str] = None):
    """Store the close code and reason and build the exception message.

    Args:
        code: Close code, or None when none is available.
        reason: Close reason; when empty or None the message uses
            'no reason given' instead.
    """
    message = f"Connection closed: {reason or 'no reason given'} (code={code})"
    super().__init__(message)
    # Kept as attributes so callers can inspect them without parsing the text.
    self.reason = reason
    self.code = code

ConnectionLostError

ConnectionLostError(code: Optional[int] = None, reason: Optional[str] = None)

Bases: SlipstreamError

Raised when connection is lost unexpectedly.

Source code in azad/slipstream/base.py
def __init__(self, code: Optional[int] = None, reason: Optional[str] = None):
    """Store the close code and reason and build the exception message.

    Args:
        code: Close code, or None when none is available.
        reason: Loss reason; when empty or None the message uses
            'unknown reason' instead.
    """
    message = f"Connection lost: {reason or 'unknown reason'} (code={code})"
    super().__init__(message)
    # Kept as attributes so callers can inspect them without parsing the text.
    self.reason = reason
    self.code = code

ProtocolError

Bases: SlipstreamError

Raised when there is a protocol-level error.

ConnectionState

Bases: Enum

Connection states for state tracking.

MessageType

Bases: Enum

Message types supported by the protocol.

FrameModel

Bases: BaseModel

Attributes
peerID class-attribute instance-attribute
peerID: Optional[str] = None
requestId class-attribute instance-attribute
requestId: Optional[Tuple[str, int]] = Field(None, alias='requestId')
Functions
validate_peer_id
validate_peer_id(v, values)
Source code in azad/slipstream/base.py
@validator('peerID')
def validate_peer_id(cls, v, values):
    """Reject an empty peerID on frames whose type is a request kind.

    Args:
        v: The peerID value being validated.
        values: Mapping of other field values handed in by the validator
            machinery; only the 'type' entry is consulted.

    Returns:
        v, unchanged.

    Raises:
        ValueError: if 'type' is one of the request kinds and v is falsy.
    """
    # NOTE(review): if this is a pydantic v1 validator, it is presumably not
    # invoked when peerID is omitted entirely (no always=True) -- confirm
    # whether frames that leave peerID out are meant to be rejected too.
    frame_type = values.get('type')
    request_kinds = (
        MessageType.REQUEST,
        MessageType.STREAM_REQUEST,
        MessageType.STREAM_STREAM_REQUEST,
        MessageType.FIRE_AND_FORGET,
    )
    # A missing/falsy type means there is nothing to enforce.
    peer_required = bool(frame_type) and frame_type in request_kinds
    if peer_required and not v:
        raise ValueError("peerID required for request messages")
    return v

BiStreamHandle

BiStreamHandle(protocol: Protocol, request_id: Tuple[str, int])

Manages a bi-directional stream with a particular request_id.

Source code in azad/slipstream/base.py
def __init__(self, protocol: 'Protocol', request_id: Tuple[str, int]):
    """Bind the handle to a protocol and a request id.

    Args:
        protocol: Protocol whose `_send_frame` is used by send()/aclose().
        request_id: Identifier attached to every frame sent via this handle.
    """
    # Neither direction has been closed yet.
    self._local_closed = False
    self._remote_closed = False
    # Queue for inbound items; its consumers/producers are not visible here.
    self._inbound_queue = asyncio.Queue()
    self.request_id = request_id
    self._protocol = protocol
Attributes
request_id instance-attribute
request_id = request_id
Functions
send async
send(data: Any)

Send data to remote side as STREAM_DATA.

Source code in azad/slipstream/base.py
async def send(self, data: Any):
    """Send data to the remote side as a STREAM_DATA frame.

    Args:
        data: Payload passed through to the protocol's `_send_frame`.

    Raises:
        ConnectionError: if the local side of the stream is already closed.
    """
    if self._local_closed:
        raise ConnectionError("Cannot send on a closed BiStreamHandle")

    log_fields = {
        "request_id": self.request_id,
        "data": data,
    }
    logger.debug("BiStream sending data", extra=log_fields)

    frame_kind = MessageType.STREAM_DATA
    await self._protocol._send_frame(frame_kind, data, self.request_id)
aclose async
aclose()

Close our side of the stream (send STREAM_END).

Source code in azad/slipstream/base.py
async def aclose(self):
    """Close our side of the stream by sending STREAM_END.

    Calling this again after the local side is closed does nothing.
    """
    if self._local_closed:
        return

    # Mark closed before sending, so the flag is set even while the send
    # is still in flight.
    self._local_closed = True
    frame_kind = MessageType.STREAM_END
    await self._protocol._send_frame(frame_kind, None, self.request_id)
is_closed
is_closed() -> bool

True if both sides have ended.

Source code in azad/slipstream/base.py
def is_closed(self) -> bool:
    """Report whether both the local and the remote side have ended."""
    local_done, remote_done = self._local_closed, self._remote_closed
    return local_done and remote_done

Protocol

Protocol(websocket)

Complete, decorator-based protocol implementation.

Source code in azad/slipstream/base.py
def __init__(self, websocket):
    """Initialise per-connection state around the given websocket.

    Args:
        websocket: Transport object; it is only stored here, not used.
    """
    import uuid

    self._websocket = websocket
    # NOTE(review): other methods read `self.websocket` (no underscore) --
    # presumably a property defined outside this view; confirm.
    self.handler = None  # assigned later through set_handler()

    # Connection lifecycle bookkeeping.
    self._state = ConnectionState.CONNECTING
    self._close_reason: Optional[str] = None
    self._close_code: Optional[int] = None

    # Identity: a random id for this side; the remote id is not known yet.
    self._peer_id = str(uuid.uuid4())
    self._remote_peer_id = None
    self._request_counter = 0

    # Registries of in-flight work.
    self._bistreams: Dict[str, BiStreamHandle] = {}
    self._stream_tasks: Dict[str, asyncio.Task] = {}
    self._request_tasks: Dict[str, asyncio.Task] = {}
    self._pending_requests: Dict[str, asyncio.Future] = {}

    # Units are not shown here; presumably seconds -- confirm.
    self._request_timeout = 600.0
    self._send_lock = asyncio.Lock()

    debug_fields = {
        "websocket_id": id(websocket),
        "request_counter": self._request_counter,
    }
    logger.debug(">>> LIFECYCLE: Creating new protocol instance", extra=debug_fields)
Attributes
Functions
set_handler
set_handler(handler: ProtocolHandler)

Attach a user-defined handler that has @request_response, etc. methods.

Source code in azad/slipstream/base.py
def set_handler(self, handler: 'ProtocolHandler'):
    """Attach a user-defined handler (one with @request_response etc. methods).

    The link is made in both directions: the handler gets a `protocol`
    attribute pointing at this object, and this object keeps the handler.

    Args:
        handler: Object to register as this protocol's handler.
    """
    handler.protocol = self
    self.handler = handler
listen async
listen()

Reads messages from the websocket and dispatches them.

Source code in azad/slipstream/base.py
async def listen(self):
    """Read messages from the websocket and dispatch them until it ends.

    Each message is validated, checked for a peer-id collision and handed
    to `_handle_incoming_frame`. A ProtocolError on a single message is
    logged and the loop continues with the next message.

    Whatever ends the loop, `_cleanup_pending_requests` is called once with
    a reason string describing why ("listen complete", "connection closed",
    "listener cancelled" or "listener error"). A failure inside that cleanup
    is logged and does not mask the original outcome.

    Returns:
        None, immediately if there is no websocket, or once the websocket
        iterator is exhausted.

    Raises:
        asyncio.CancelledError: the original cancellation, re-raised.
        ConnectionClosedError: when the websocket reports ConnectionClosed;
            carries its code and string form, chained to the original.
        Exception: any other error that ended the loop, re-raised as is.
    """
    if not self.websocket:
        return

    failure = None
    reason = "listen complete"
    closed_by_transport = False
    try:
        async for message in self.websocket:
            try:
                frame = validate_incoming_frame(message)
                self._check_peer_id_collision(frame)
                await self._handle_incoming_frame(frame)
            except ProtocolError as pe:
                # One bad frame must not take the whole listener down.
                logger.error(f"Protocol error: {pe}", exc_info=True)
                continue
    except asyncio.CancelledError as exc:
        # Keep the original exception so its message/traceback survive.
        failure, reason = exc, "listener cancelled"
    except websockets.exceptions.ConnectionClosed as exc:
        failure, reason = exc, "connection closed"
        closed_by_transport = True
    except Exception as exc:
        failure, reason = exc, "listener error"
    finally:
        try:
            await self._cleanup_pending_requests(reason)
        except Exception:
            # Best effort: report the problem but still surface `failure`.
            logger.exception(
                "Failed to clean up pending requests (%s)", reason
            )

        if closed_by_transport:
            raise ConnectionClosedError(failure.code, str(failure)) from failure
        if failure is not None:
            raise failure
request_response async
request_response(data: dict) -> dict

Send a single request, wait for single response.

Source code in azad/slipstream/base.py
async def request_response(self, data: dict) -> dict:
    """Send a single request and wait for its single response.

    A future is registered under a fresh request id before the REQUEST
    frame is sent; the result of that future is returned. The registration
    is always removed again, including when sending the frame fails, so a
    failed send does not leave a stale pending entry behind.

    Args:
        data: Request payload; its "operation" entry (if any) is attached
            to the future as `operation`.

    Returns:
        Whatever result is set on the pending future.

    Raises:
        Any exception raised by `_send_frame` or set on the future.
    """
    request_id = (self._peer_id, self._request_counter)
    self._request_counter += 1

    fut = asyncio.Future()
    fut.operation = data.get("operation")
    self._add_pending_request(request_id, fut)

    # NOTE(review): `_request_timeout` is not applied here; this waits for
    # as long as the future stays unresolved -- confirm that is intended.
    try:
        await self._send_frame(MessageType.REQUEST, data, request_id)
        return await fut
    finally:
        self._remove_pending_request(request_id)
request_stream async
request_stream(data: dict) -> AsyncIterator[Any]

Send a request, get an async iterator of incoming items.

Source code in azad/slipstream/base.py
async def request_stream(self, data: dict) -> AsyncIterator[Any]:
    """Send a stream request and return an async iterator of incoming items.

    A future carrying a `stream_queue` is registered under a fresh request
    id, then the STREAM_REQUEST frame is sent. The returned generator
    yields items taken from that queue until it receives None.

    Args:
        data: Request payload; its "operation" entry (if any) is attached
            to the future as `operation`.

    Returns:
        An async generator over the queued items.

    Raises:
        ConnectionError: if there is no websocket or it reports CLOSED.
        Any exception raised by `_send_frame`; in that case the pending
        registration is removed before the exception propagates.
    """
    if not self.websocket or self.websocket.state.name == "CLOSED":
        raise ConnectionError("WebSocket is closed")

    request_id = (self._peer_id, self._request_counter)
    self._request_counter += 1

    fut = asyncio.Future()
    fut.stream_queue = asyncio.Queue()
    fut.operation = data.get("operation")
    self._add_pending_request(request_id, fut)

    try:
        await self._send_frame(MessageType.STREAM_REQUEST, data, request_id)
    except BaseException:
        # The request never went out, so nothing will ever complete it.
        self._remove_pending_request(request_id)
        raise

    async def gen():
        # None on the queue is the end-of-stream marker.
        while True:
            item = await fut.stream_queue.get()
            if item is None:
                break
            yield item

    return gen()
stream_stream async
stream_stream(data: dict) -> BiStreamHandle

Initiate a bi-directional stream, returning a handle.

Source code in azad/slipstream/base.py
async def stream_stream(self, data: dict) -> BiStreamHandle:
    """Initiate a bi-directional stream and return its handle.

    The handle is registered in `_bistreams` under the key derived from a
    fresh request id, then the STREAM_STREAM_REQUEST frame is sent.

    Args:
        data: Payload of the opening frame.

    Returns:
        The BiStreamHandle bound to the new request id.

    Raises:
        ConnectionError: if there is no websocket or it reports CLOSED.
        Any exception raised by `_send_frame`; in that case the handle is
        unregistered before the exception propagates.
    """
    if not self.websocket or self.websocket.state.name == "CLOSED":
        raise ConnectionError("WebSocket is closed")

    request_id = (self._peer_id, self._request_counter)
    self._request_counter += 1

    handle = BiStreamHandle(self, request_id)
    key = self._get_handler_key(request_id)
    self._bistreams[key] = handle

    try:
        await self._send_frame(MessageType.STREAM_STREAM_REQUEST, data, request_id)
    except BaseException:
        # The stream was never opened remotely; drop the stale registration.
        self._bistreams.pop(key, None)
        raise
    return handle
fire_and_forget async
fire_and_forget(data: dict) -> None

Send a one-way message with no response.

Source code in azad/slipstream/base.py
async def fire_and_forget(self, data: dict) -> None:
    """Send a one-way FIRE_AND_FORGET frame; no response is awaited.

    Args:
        data: Payload passed through to `_send_frame`. No request id is
            supplied for this frame.
    """
    frame_kind = MessageType.FIRE_AND_FORGET
    await self._send_frame(frame_kind, data)

ProtocolHandler

ProtocolHandler()

A base class for handlers that need to keep a reference to the protocol or implement custom logic. Typically you just define your methods with the decorators directly.

Source code in azad/slipstream/base.py
def __init__(self):
    """Create a handler that is not yet attached to any protocol."""
    # Filled in by Protocol.set_handler(), which assigns handler.protocol.
    self.protocol: Optional[Protocol] = None

Functions

validate_incoming_frame

validate_incoming_frame(frame_data: str) -> FrameModel

Convert raw JSON → dict → FrameModel, raising if invalid.

Source code in azad/slipstream/base.py
def validate_incoming_frame(frame_data: str) -> FrameModel:
    """Convert raw JSON -> dict -> FrameModel, raising if invalid.

    Args:
        frame_data: Raw frame text as received from the peer.

    Returns:
        The FrameModel built from the decoded JSON object.

    Raises:
        ProtocolError: if the text is not decodable JSON, if the JSON value
            is not an object, or if FrameModel validation fails. The
            underlying exception, when there is one, is chained as the cause.
    """
    try:
        parsed_json = json.loads(frame_data)
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        raise ProtocolError(f"Invalid frame or data: {e}") from e

    # Valid JSON such as `[]` or `1` cannot be unpacked into keyword
    # arguments; report it as a protocol error rather than a TypeError.
    if not isinstance(parsed_json, dict):
        raise ProtocolError(
            "Invalid frame or data: expected a JSON object, "
            f"got {type(parsed_json).__name__}"
        )

    try:
        return FrameModel(**parsed_json)
    except ValidationError as e:
        raise ProtocolError(f"Invalid frame or data: {e}") from e

request_response

request_response(func)
Source code in azad/slipstream/base.py
def request_response(func):
    """Tag func as a REQUEST endpoint with a single response and return it.

    The metadata is stored on the function as `_slipstream_endpoint`; how
    it is consumed is not visible here.
    """
    endpoint_info = {
        "type": MessageType.REQUEST,
        "returns": "single",  # single response
    }
    setattr(func, "_slipstream_endpoint", endpoint_info)
    return func

request_stream

request_stream(func)
Source code in azad/slipstream/base.py
def request_stream(func):
    """Tag func as a STREAM_REQUEST endpoint that returns a stream.

    The metadata is stored on the function as `_slipstream_endpoint`; the
    function itself is returned unchanged.
    """
    endpoint_info = {
        "type": MessageType.STREAM_REQUEST,
        "returns": "stream",
    }
    setattr(func, "_slipstream_endpoint", endpoint_info)
    return func

stream_stream

stream_stream(func)
Source code in azad/slipstream/base.py
def stream_stream(func):
    """Tag func as a STREAM_STREAM_REQUEST endpoint returning a bistream.

    The metadata is stored on the function as `_slipstream_endpoint`; the
    function itself is returned unchanged.
    """
    endpoint_info = {
        "type": MessageType.STREAM_STREAM_REQUEST,
        "returns": "bistream",
    }
    setattr(func, "_slipstream_endpoint", endpoint_info)
    return func

fire_and_forget

fire_and_forget(func)
Source code in azad/slipstream/base.py
def fire_and_forget(func):
    """Tag func as a FIRE_AND_FORGET endpoint with no return value.

    The metadata is stored on the function as `_slipstream_endpoint`; the
    function itself is returned unchanged.
    """
    endpoint_info = {
        "type": MessageType.FIRE_AND_FORGET,
        "returns": None,
    }
    setattr(func, "_slipstream_endpoint", endpoint_info)
    return func