WebSocket Guide

Hornbeam provides full WebSocket support via ASGI, enabling real-time bidirectional communication for chat apps, live updates, multiplayer games, and more.

Basic WebSocket Handler

# app.py
async def application(scope, receive, send):
    if scope['type'] == 'websocket':
        # Accept the connection
        await send({'type': 'websocket.accept'})

        while True:
            message = await receive()

            if message['type'] == 'websocket.disconnect':
                break

            if message['type'] == 'websocket.receive':
                # Echo the message back
                text = message.get('text', '')
                await send({
                    'type': 'websocket.send',
                    'text': f'Echo: {text}'
                })
hornbeam:start("app:application", #{worker_class => asgi}).

WebSocket Scope

KeyTypeDescription
typestr"websocket"
pathstrURL path
query_stringbytesQuery string
headerslistRequest headers
subprotocolslistRequested subprotocols
clienttuple(host, port)
servertuple(host, port)

Message Types

Receive Messages

TypeFieldsDescription
websocket.connect-Client requesting connection
websocket.receivetext or bytesMessage from client
websocket.disconnectcodeClient disconnected

Send Messages

TypeFieldsDescription
websocket.acceptsubprotocolAccept connection
websocket.sendtext or bytesSend to client
websocket.closecode, reasonClose connection

Chat Room with Pub/Sub

Use Hornbeam’s Erlang pub/sub for multi-user chat:

# chat.py
from hornbeam_erlang import publish, subscribe, unsubscribe
import asyncio
import json

async def application(scope, receive, send):
    if scope['type'] == 'websocket':
        await send({'type': 'websocket.accept'})

        # Get room from path: /chat/room-name
        path = scope.get('path', '/chat/default')
        room = path.split('/')[-1] or 'default'
        topic = f'chat:{room}'

        # Subscribe to room
        subscribe(topic)

        try:
            # Create tasks for receiving from client and pub/sub
            await handle_chat(scope, receive, send, topic)
        finally:
            unsubscribe(topic)

async def handle_chat(scope, receive, send, topic):
    while True:
        message = await receive()

        if message['type'] == 'websocket.disconnect':
            break

        if message['type'] == 'websocket.receive':
            text = message.get('text', '')
            data = json.loads(text)

            # Broadcast to all subscribers
            publish(topic, {
                'type': 'message',
                'user': data.get('user', 'anonymous'),
                'text': data.get('text', '')
            })

        # Check for pub/sub messages
        pubsub_msg = check_pubsub()
        if pubsub_msg:
            await send({
                'type': 'websocket.send',
                'text': json.dumps(pubsub_msg)
            })

Binary Data

Send and receive binary data:

async def application(scope, receive, send):
    if scope['type'] == 'websocket':
        await send({'type': 'websocket.accept'})

        while True:
            message = await receive()

            if message['type'] == 'websocket.disconnect':
                break

            if message['type'] == 'websocket.receive':
                # Handle binary data
                if 'bytes' in message:
                    data = message['bytes']
                    # Process binary (e.g., image, protobuf)
                    result = process_binary(data)
                    await send({
                        'type': 'websocket.send',
                        'bytes': result
                    })
                # Handle text
                elif 'text' in message:
                    await send({
                        'type': 'websocket.send',
                        'text': message['text']
                    })

Subprotocols

Handle WebSocket subprotocols:

async def application(scope, receive, send):
    if scope['type'] == 'websocket':
        # Check requested subprotocols
        requested = scope.get('subprotocols', [])

        # Choose one we support
        if 'graphql-ws' in requested:
            subprotocol = 'graphql-ws'
        elif 'json' in requested:
            subprotocol = 'json'
        else:
            subprotocol = None

        await send({
            'type': 'websocket.accept',
            'subprotocol': subprotocol
        })

        # Handle based on protocol
        if subprotocol == 'graphql-ws':
            await handle_graphql(scope, receive, send)
        else:
            await handle_generic(scope, receive, send)

Connection with ETS State

Track connected clients using ETS:

from hornbeam_erlang import state_set, state_get, state_delete, state_incr
import uuid

async def application(scope, receive, send):
    if scope['type'] == 'websocket':
        # Generate unique client ID
        client_id = str(uuid.uuid4())

        await send({'type': 'websocket.accept'})

        # Register client
        state_set(f'ws:client:{client_id}', {
            'connected_at': time.time(),
            'path': scope.get('path', '/')
        })
        state_incr('ws:active_connections')

        try:
            await handle_connection(scope, receive, send, client_id)
        finally:
            # Cleanup on disconnect
            state_delete(f'ws:client:{client_id}')
            state_incr('ws:active_connections', -1)

async def handle_connection(scope, receive, send, client_id):
    while True:
        message = await receive()
        if message['type'] == 'websocket.disconnect':
            break
        # Handle messages...

Live Dashboard Example

Push real-time updates to dashboard:

from hornbeam_erlang import state_get
import asyncio
import json

async def application(scope, receive, send):
    if scope['type'] == 'websocket':
        await send({'type': 'websocket.accept'})

        # Start background task to push updates
        push_task = asyncio.create_task(
            push_metrics(send)
        )

        try:
            while True:
                message = await receive()
                if message['type'] == 'websocket.disconnect':
                    break
        finally:
            push_task.cancel()

async def push_metrics(send):
    while True:
        # Get metrics from ETS
        metrics = {
            'requests': state_get('metrics:requests') or 0,
            'errors': state_get('metrics:errors') or 0,
            'latency_p99': state_get('metrics:latency_p99') or 0,
        }

        await send({
            'type': 'websocket.send',
            'text': json.dumps(metrics)
        })

        await asyncio.sleep(1)  # Update every second

Error Handling

async def application(scope, receive, send):
    if scope['type'] == 'websocket':
        try:
            await send({'type': 'websocket.accept'})
            await handle_websocket(scope, receive, send)
        except Exception as e:
            # Close with error code
            await send({
                'type': 'websocket.close',
                'code': 1011,  # Internal error
                'reason': 'Internal server error'
            })

Close Codes

CodeMeaning
1000Normal closure
1001Going away
1002Protocol error
1003Unsupported data
1008Policy violation
1011Internal error

Configuration

hornbeam:start("app:application", #{
    worker_class => asgi,

    %% WebSocket settings
    websocket_timeout => 60000,        % Idle timeout (ms)
    websocket_max_frame_size => 16777216  % Max frame size (16MB)
}).

FastAPI WebSocket

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from hornbeam_erlang import state_incr

app = FastAPI()

@app.websocket("/ws/{room}")
async def websocket_endpoint(websocket: WebSocket, room: str):
    await websocket.accept()
    state_incr(f'room:{room}:users')

    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Room {room}: {data}")
    except WebSocketDisconnect:
        state_incr(f'room:{room}:users', -1)

Starlette WebSocket

from starlette.applications import Starlette
from starlette.routing import WebSocketRoute
from starlette.websockets import WebSocket

async def websocket_handler(websocket: WebSocket):
    await websocket.accept()
    while True:
        text = await websocket.receive_text()
        await websocket.send_text(f'Echo: {text}')

app = Starlette(routes=[
    WebSocketRoute('/ws', websocket_handler),
])

Testing WebSockets

Using websocat

websocat ws://localhost:8000/ws

Using Python

import asyncio
import websockets

async def test():
    async with websockets.connect('ws://localhost:8000/ws') as ws:
        await ws.send('Hello!')
        response = await ws.recv()
        print(response)

asyncio.run(test())

Next Steps