Erlang Integration

Hornbeam’s power comes from accessing Erlang features directly from Python. This guide covers shared state (ETS), distributed RPC, pub/sub messaging, registered functions, and request lifecycle hooks.

Shared State (ETS)

ETS (Erlang Term Storage) provides high-performance concurrent key-value storage accessible from Python.

Basic Operations

from hornbeam_erlang import (
    state_get, state_set, state_delete,
    state_incr, state_decr,
    state_get_multi, state_keys
)

# Get and set values
state_set('user:123', {'name': 'Alice', 'email': 'alice@example.com'})
user = state_get('user:123')  # Returns dict or None

# Delete
state_delete('user:123')

Atomic Counters

Perfect for metrics, rate limiting, and sequences:

# Increment (returns new value)
views = state_incr('page_views')           # +1
views = state_incr('page_views', 10)       # +10

# Decrement
remaining = state_decr('quota', 1)

# Use for rate limiting
def check_rate_limit(user_id, limit=100):
    """Return True while ``user_id`` has made at most ``limit`` calls this minute.

    Each call atomically increments a per-user, per-minute counter in shared
    state and compares the new value against ``limit``.

    Args:
        user_id: Identifier interpolated into the counter key.
        limit: Maximum number of calls allowed within one minute window.

    Returns:
        True if this call is still within the limit, False otherwise.
    """
    import time

    # Bucket by wall-clock minute so every minute starts a fresh counter key.
    window = int(time.time() // 60)
    key = f'rate:{user_id}:{window}'
    count = state_incr(key)
    if count == 1:
        # First hit in this window: record the intended lifetime (seconds)
        # under a companion key.
        # NOTE(review): this only stores the number 60; nothing shown here
        # expires `key` itself -- confirm something consumes `<key>:ttl`,
        # otherwise old window counters accumulate.
        state_set(f'{key}:ttl', 60)
    return count <= limit

Batch Operations

# Get multiple keys at once
users = state_get_multi(['user:1', 'user:2', 'user:3'])
# Returns: {'user:1': {...}, 'user:2': {...}, 'user:3': None}

# Find keys by prefix
user_keys = state_keys('user:')
# Returns: ['user:1', 'user:2', 'user:123', ...]

Use Cases

Caching:

def get_product(product_id):
    """Return a product, serving it from shared state when already cached.

    On a cache miss the product is loaded with ``fetch_from_database`` and
    stored under ``product:<product_id>`` before being returned.

    Args:
        product_id: Identifier interpolated into the cache key.

    Returns:
        The cached value if present, otherwise the freshly fetched product.
    """
    key = f'product:{product_id}'
    cached = state_get(key)
    # state_get returns None for a missing key, so test for None explicitly:
    # a falsy-but-valid cached value (e.g. {} or 0) must still count as a hit.
    if cached is not None:
        return cached

    product = fetch_from_database(product_id)
    state_set(key, product)
    return product

Session Storage:

def get_session(session_id):
    """Fetch the value stored under ``session:<session_id>`` in shared state."""
    session_key = f'session:{session_id}'
    return state_get(session_key)

def set_session(session_id, data):
    """Store ``data`` under ``session:<session_id>`` in shared state."""
    session_key = f'session:{session_id}'
    state_set(session_key, data)

Real-time Metrics:

def track_request(path, status, latency):
    """Bump the request, status and latency-histogram counters for one request.

    Args:
        path: Request path, used in the per-path counter key.
        status: Response status, used in the per-status counter key.
        latency: Request latency; bucketed in steps of 100 (milliseconds,
            per the 100ms bucket size used below).
    """
    state_incr(f'metrics:requests:{path}')
    state_incr(f'metrics:status:{status}')
    # Floor to the enclosing 100ms bucket. The int() keeps the key identical
    # for int and float latencies (e.g. 150 and 150.0 both land in
    # 'metrics:latency:100' rather than splitting off '...:100.0').
    bucket = int(latency // 100) * 100
    state_incr(f'metrics:latency:{bucket}')

Distributed RPC

Call functions on remote Erlang nodes in a cluster:

from hornbeam_erlang import rpc_call, rpc_cast, nodes, node

# Get cluster info
current = node()           # This node's name
connected = nodes()        # List of connected nodes

# Synchronous call
result = rpc_call(
    'worker@gpu-server',   # Remote node
    'ml_model',            # Module
    'predict',             # Function
    [input_data],          # Arguments
    timeout_ms=30000       # Timeout
)

# Async call (fire and forget)
rpc_cast('logger@log-server', 'logger', 'log', [
    'info', 'User logged in', {'user_id': 123}
])

Distributed ML Inference

Spread ML workloads across GPU nodes:

from hornbeam_erlang import rpc_call, nodes
import asyncio

async def distributed_embedding(texts):
    """Distribute embedding computation across GPU nodes.

    The input is split into one chunk per connected node whose name contains
    'gpu'. Each chunk is sent to that node's ``embedding_service:encode`` via
    ``rpc_call``, and the per-node results are concatenated in node order.
    The calls are issued concurrently so nodes work in parallel and the event
    loop is not blocked while waiting on them.

    Args:
        texts: List of inputs to embed.

    Returns:
        The combined results in chunk order, or ``local_embed(texts)`` when
        no GPU node is connected.

    Raises:
        Whatever ``rpc_call`` raises for the first failing node.
    """
    gpu_nodes = [n for n in nodes() if 'gpu' in n]

    if not gpu_nodes:
        # Fallback to local
        return local_embed(texts)

    # Split work across nodes
    chunks = split_list(texts, len(gpu_nodes))

    def encode_on(gpu_node, chunk):
        # Blocking RPC; run off the event loop (see run_in_executor below).
        return rpc_call(
            gpu_node,
            'embedding_service',
            'encode',
            [chunk],
            timeout_ms=60000
        )

    # rpc_call is synchronous, so awaiting it inline would both serialize the
    # nodes and stall the event loop for up to the timeout per node. Running
    # each call in the default executor lets them overlap.
    # NOTE(review): assumes rpc_call may be invoked from a worker thread --
    # confirm against the hornbeam_erlang threading guarantees.
    loop = asyncio.get_running_loop()
    per_node = await asyncio.gather(*(
        loop.run_in_executor(None, encode_on, gpu_node, chunk)
        for gpu_node, chunk in zip(gpu_nodes, chunks)
    ))

    # gather preserves submission order, so results line up with the chunks.
    results = []
    for encoded in per_node:
        results.extend(encoded)

    return results

Error Handling

from hornbeam_erlang import rpc_call

try:
    result = rpc_call('worker@remote', 'mod', 'func', [args])
except TimeoutError:
    # Node didn't respond in time
    result = fallback()
except ConnectionError:
    # Node not connected
    result = fallback()
except Exception as e:
    # Remote function raised an error
    log_error(f"RPC failed: {e}")
    result = fallback()

Pub/Sub Messaging

Publish/subscribe for real-time messaging, built on Erlang's pg (process groups) module:

from hornbeam_erlang import publish, subscribe, unsubscribe

# Subscribe current process to topic
subscribe('notifications')
subscribe('user:123:events')

# Publish message (returns subscriber count)
count = publish('notifications', {
    'type': 'alert',
    'message': 'Server restart in 5 minutes'
})

# Unsubscribe
unsubscribe('notifications')

WebSocket with Pub/Sub

from hornbeam_erlang import publish, subscribe, receive_pubsub

async def websocket_handler(scope, receive, send):
    """Accept a WebSocket and forward the user's pub/sub messages to it.

    The handler subscribes to ``user:<user_id>`` and then alternates between
    checking for client frames and draining pending pub/sub messages, until
    the client disconnects. The subscription is always removed on exit.

    Args:
        scope: Connection scope, passed to ``get_user_from_scope``.
        receive: Awaitable callable yielding client events.
        send: Awaitable callable used to emit events to the client.
    """
    import asyncio
    import json

    await send({'type': 'websocket.accept'})

    user_id = get_user_from_scope(scope)
    topic = f'user:{user_id}'
    subscribe(topic)

    # How long to wait for a client frame before polling pub/sub again.
    poll_interval = 0.05

    # Keep exactly one receive() in flight. Awaiting receive() directly would
    # park the loop until the client sends something, so pub/sub messages
    # would never reach an idle client.
    pending_receive = asyncio.ensure_future(receive())
    try:
        while True:
            # Check for client messages (bounded wait; the task is NOT
            # cancelled on timeout, so no client frame is lost).
            done, _ = await asyncio.wait({pending_receive}, timeout=poll_interval)
            if done:
                message = pending_receive.result()
                if message['type'] == 'websocket.disconnect':
                    break
                pending_receive = asyncio.ensure_future(receive())

            # Drain every pub/sub message that is currently queued.
            pubsub_msg = receive_pubsub(timeout=0)
            while pubsub_msg:
                await send({
                    'type': 'websocket.send',
                    'text': json.dumps(pubsub_msg)
                })
                pubsub_msg = receive_pubsub(timeout=0)
    finally:
        # No-op if the task already finished; otherwise stop waiting on it.
        pending_receive.cancel()
        unsubscribe(topic)

Broadcasting Updates

def update_product(product_id, data):
    """Write the product to shared state, then publish an 'updated' event.

    Both the stored key and the pub/sub topic are ``product:<product_id>``.
    """
    topic = f'product:{product_id}'

    # Persist first so subscribers reacting to the event see the new value.
    state_set(topic, data)

    event = {
        'type': 'updated',
        'product_id': product_id,
        'data': data
    }
    publish(topic, event)

Registered Functions

Call Erlang functions from Python:

Register in Erlang

%% Register a simple function
%% The fun takes a single argument -- a list -- and pattern-matches it as
%% [A, B], so a call with any other argument shape fails to match.
hornbeam:register_function(add, fun([A, B]) -> A + B end).

%% Register a module function
%% NOTE(review): arguments read as (Name, Module, Function), i.e. get_user
%% is served by user_db:get -- confirm against hornbeam:register_function/3.
hornbeam:register_function(get_user, user_db, get).

%% Register with validation
%% Passes {ok, UserId} from auth:verify/1 through unchanged and maps a bare
%% `error` to {error, invalid_token}; any other result has no matching clause.
hornbeam:register_function(validate_token, fun([Token]) ->
    case auth:verify(Token) of
        {ok, UserId} -> {ok, UserId};
        error -> {error, invalid_token}
    end
end).

Call from Python

from hornbeam_erlang import call, cast

# Synchronous call
result = call('add', 1, 2)  # Returns 3

# Get user from Erlang
user = call('get_user', user_id)

# Validate token (wrapped in a function: `return` is only legal inside one)
def authenticate(token):
    """Validate ``token`` through the registered ``validate_token`` function.

    Returns:
        ``{'user_id': ...}`` with the value returned by the call on success,
        or ``{'error': 'Invalid token'}`` if the call raises.
    """
    try:
        user_id = call('validate_token', token)
    except Exception:
        return {'error': 'Invalid token'}
    return {'user_id': user_id}

# Async call (fire and forget)
cast('log_event', 'user_login', {'user_id': 123})

Use Cases

Authentication:

%% Erlang side
%% Looks the session up with session_store:get/1. {ok, Session} is passed
%% through unchanged; not_found is mapped to {error, not_found}.
hornbeam:register_function(verify_session, fun([SessionId]) ->
    case session_store:get(SessionId) of
        {ok, Session} -> {ok, Session};
        not_found -> {error, not_found}
    end
end).
# Python side
def get_current_user(request):
    """Return the user stored in the caller's session, or None.

    Args:
        request: Object exposing a ``cookies`` mapping.

    Returns:
        ``session.get('user')`` for the session resolved by the registered
        ``verify_session`` function, or None when there is no ``session_id``
        cookie or the lookup fails for any reason.
    """
    session_id = request.cookies.get('session_id')
    if not session_id:
        return None

    try:
        session = call('verify_session', session_id)
        return session.get('user')
    except Exception:
        # Best-effort lookup: any failure means "not logged in". Catching
        # Exception (not a bare except) lets KeyboardInterrupt and SystemExit
        # propagate instead of being silently swallowed.
        return None

Feature Flags:

%% Erlang side
%% Unpacks [Feature, UserId] from the argument list and returns whatever
%% feature_flags:is_enabled/2 returns for them.
hornbeam:register_function(feature_enabled, fun([Feature, UserId]) ->
    feature_flags:is_enabled(Feature, UserId)
end).
# Python side
def get_features(user_id):
    """Return a dict mapping each known feature flag to its state for ``user_id``.

    Every flag is resolved with one ``feature_enabled`` call, in the order
    listed below.
    """
    flag_names = ('new_ui', 'beta')
    return {
        flag: call('feature_enabled', flag, user_id)
        for flag in flag_names
    }

Hooks

Execute Erlang code at key points in the request lifecycle:

%% Configure hooks
%% Hooks are funs stored under the `hooks` key of the options map given as
%% the second argument to hornbeam:start/2.
hornbeam:start("app:application", #{
    hooks => #{
        on_request => fun(Request) ->
            %% Log, authenticate, modify request
            %% This example returns the request unchanged.
            Request
        end,
        on_response => fun(Response) ->
            %% Modify response, log metrics
            %% This example returns the response unchanged.
            Response
        end,
        on_error => fun(Error, Request) ->
            %% Error handling, alerting
            %% Logs the error term, then returns a two-element tuple.
            %% NOTE(review): the tuple reads as {StatusCode, Body} -- confirm.
            %% NOTE(review): Request is unused here, so the compiler will
            %% warn about an unused variable.
            error_logger:error_msg("Error: ~p~n", [Error]),
            {500, "Internal Error"}
        end
    }
}).

Best Practices

1. Use ETS for Hot Data

# Good: Frequently accessed, changes often
state_set('rate_limit:user:123', count)
state_set('session:abc123', session_data)

# Bad: Large, rarely accessed (use database)
state_set('user:history', huge_list)

2. Atomic Operations

# Good: Atomic increment
count = state_incr('counter')

# Bad: Race condition
count = state_get('counter') or 0
state_set('counter', count + 1)

3. Key Naming Convention

# Use structured keys
state_set('user:123:profile', profile)
state_set('cache:product:456', product)
state_set('metric:requests:total', count)

4. Handle Missing Keys

# state_get returns None for missing keys
value = state_get('maybe_exists')
if value is None:
    value = compute_default()
    state_set('maybe_exists', value)

Next Steps