Channels & Presence

Hornbeam provides channels for building real-time applications. Channels multiplex many topics over a single WebSocket connection, with built-in support for presence tracking, broadcasting, and event handling.

How Channels Work

Channels are multiplexed topics over a single WebSocket connection:

  • Socket: The WebSocket connection to the server
  • Channel: A topic-based connection (e.g., room:lobby)
  • Events: Messages sent/received on a channel
  • Presence: Track who’s online in a channel

The wire protocol uses JSON arrays: [join_ref, ref, topic, event, payload]

Python API

Defining Channels

Create a channel with a topic pattern. Patterns can include wildcards (*) to match any segment:

from hornbeam_channels import channel, broadcast, broadcast_from

# Create a channel with wildcard pattern
room = channel("room:*")

Join Handler

Handle join requests with @channel.on_join:

@room.on_join
def join_room(topic, topic_params, params, socket):
    """Handle joining a chat room.

    Args:
        topic: The full topic (e.g., "room:lobby")
        topic_params: Extracted params (e.g., {"room_id": "lobby"})
        params: Client-provided join params (e.g., {"username": "Alice"})
        socket: The Socket object

    Returns:
        ('ok', response, socket) - Successful join
        ('error', reason) - Rejected join
    """
    room_id = topic_params.get("room_id", "unknown")
    username = params.get("username", "anonymous")

    if not username or username == "anonymous":
        return ('error', {"reason": "username required"})

    # Store user info in socket assigns
    socket = socket.assign("room_id", room_id)
    socket = socket.assign("username", username)

    return ('ok', {
        "room_id": room_id,
        "message": f"Welcome to {room_id}!"
    }, socket)

Event Handlers

Handle events with @channel.on("event_name"):

@room.on("new_msg")
def handle_new_message(payload, socket):
    """Handle a new chat message.

    Returns:
        ('reply', response, socket) - Reply to the client
        ('noreply', socket) - No reply needed
        ('stop', reason, socket) - Stop the channel
    """
    body = payload.get("body", "")
    username = socket.assigns.get("username", "anonymous")

    if not body:
        return ('reply', {"status": "error", "reason": "empty message"}, socket)

    # Broadcast to all users except sender
    broadcast_from(socket, "new_msg", {
        "username": username,
        "body": body
    })

    return ('reply', {"status": "ok"}, socket)

Leave Handler

Handle cleanup when a user leaves:

@room.on_leave
def leave_room(topic, socket):
    """Handle leaving a chat room."""
    username = socket.assigns.get("username", "unknown")
    print(f"User {username} left {topic}")

Socket Assigns

Store per-connection state using socket.assign():

# Store a single value
socket = socket.assign("username", "Alice")

# Access stored values
username = socket.assigns.get("username")

# Merge multiple values
socket = socket.merge_assigns({"role": "admin", "joined_at": "now"})

Broadcasting

broadcast(topic, event, payload)

Send a message to all subscribers of a topic:

from hornbeam_channels import broadcast

# Broadcast to all users in a room
broadcast("room:lobby", "new_msg", {
    "username": "System",
    "body": "Welcome!"
})

broadcast_from(socket, event, payload)

Send a message to all subscribers except the sender:

from hornbeam_channels import broadcast_from

@room.on("new_msg")
def handle_message(payload, socket):
    # Broadcast to others (not back to sender)
    broadcast_from(socket, "new_msg", {
        "username": socket.assigns["username"],
        "body": payload["body"]
    })
    return ('noreply', socket)

Presence

Track who’s online in a channel with the Presence API. Presence is backed by CRDT (Conflict-free Replicated Data Type) for cluster-wide consistency.

Tracking Users

from hornbeam_presence import Presence

@room.on_join
def join_room(topic, topic_params, params, socket):
    username = params.get("username")
    socket = socket.assign("username", username)

    # Track this user's presence
    presence = Presence(topic)
    presence.track(socket, f"user:{username}", {
        "username": username,
        "joined_at": "now",
        "status": "online"
    })

    return ('ok', {"message": "Joined!"}, socket)

Updating Presence

@room.on("update_status")
def handle_status(payload, socket):
    username = socket.assigns["username"]
    status = payload.get("status", "online")

    presence = Presence(socket.topic)
    presence.update(socket, f"user:{username}", {
        "username": username,
        "status": status
    })

    return ('noreply', socket)

Listing Presences

from hornbeam_presence import Presence

# Get all presences for a topic
presence = Presence("room:lobby")
users = presence.list()
# Returns: {"user:alice": {"metas": [{"username": "Alice", ...}]}}

# Get a specific presence
alice = presence.get("user:alice")
# Returns: {"metas": [{"username": "Alice", "status": "online"}]}

Presence Events

The client receives automatic events when presence changes:

  • presence_state - Full state on join
  • presence_diff - Changes (joins/leaves)

JavaScript Client

Socket Connection

// Connect to the WebSocket endpoint
const socket = new Hornbeam.Socket('/ws', {
    params: { token: 'user_token' },
    logger: (kind, msg, data) => console.log(`${kind}: ${msg}`, data)
});

socket.onOpen(() => console.log('Connected'));
socket.onClose(() => console.log('Disconnected'));
socket.onError((error) => console.error('Error:', error));

socket.connect();

Joining Channels

// Create a channel
const channel = socket.channel('room:lobby', { username: 'Alice' });

// Join with callbacks
channel.join()
    .receive('ok', (resp) => {
        console.log('Joined!', resp);
    })
    .receive('error', (resp) => {
        console.error('Join failed:', resp.reason);
    })
    .receive('timeout', () => {
        console.error('Timeout');
    });

Sending and Receiving Events

// Send an event
channel.push('new_msg', { body: 'Hello!' })
    .receive('ok', () => console.log('Sent'))
    .receive('error', (e) => console.error('Error:', e));

// Receive events
channel.on('new_msg', (payload) => {
    console.log(`${payload.username}: ${payload.body}`);
});

Presence Sync

// Set up presence tracking
const presence = new Hornbeam.Presence(channel);

presence.onJoin((id, current, newPres) => {
    if (!current) {
        console.log(`${newPres.metas[0].username} joined`);
    }
});

presence.onLeave((id, current, leftPres) => {
    if (current.metas.length === 0) {
        console.log(`${leftPres.metas[0].username} left`);
    }
});

presence.onSync(() => {
    // Render updated presence list
    const users = presence.list((id, {metas}) => metas[0]);
    console.log('Online:', users.map(u => u.username));
});

Leaving Channels

channel.leave()
    .receive('ok', () => console.log('Left channel'));

// Disconnect socket entirely
socket.disconnect();

Topic Patterns

Channels support wildcard patterns for flexible routing:

PatternMatchestopic_params
room:lobbyroom:lobby only{}
room:*room:lobby, room:123{"room_id": "lobby"}
chat:*:*chat:team:general{"chat_id": "team", "channel_id": "general"}

Error Handling

@room.on("risky_action")
def handle_risky(payload, socket):
    try:
        result = do_something_risky(payload)
        return ('reply', {"result": result}, socket)
    except ValueError as e:
        return ('reply', {"error": str(e)}, socket)
    except Exception as e:
        # Stop the channel on fatal errors
        return ('stop', str(e), socket)

Running

Start your channel application:

hornbeam run your_module:app

Or from Erlang:

hornbeam:start("your_module:app", #{
    worker_class => asgi,
    channel_modules => ["your_module"]
}).

Next Steps