Distributed RPC

Hornbeam leverages Erlang’s built-in distribution to call functions across cluster nodes. This enables:

  • Distributed ML inference: Send computations to GPU nodes
  • Load distribution: Spread work across workers
  • Service isolation: Keep heavy processing on dedicated nodes
  • Fault tolerance: Failover to backup nodes

Cluster Setup

Node Configuration

Each Erlang node needs a name and a cookie; nodes can only connect to each other when they share the same cookie:

# Start three named nodes (web, gpu, db), one per host, all using the cookie "mysecret".
# -name gives each node a name of the form name@ip-address.
erl -name web@192.168.1.10 -setcookie mysecret
erl -name gpu@192.168.1.20 -setcookie mysecret
erl -name db@192.168.1.30 -setcookie mysecret

Or via vm.args:

## vm.args
# Node name and cluster cookie (same flags as on the erl command line).
-name hornbeam@192.168.1.10
-setcookie mysecret
# NOTE(review): +K (kernel poll) is presumably only meaningful on OTP releases
# before 21 — confirm against the OTP version you deploy on.
+K true
# NOTE(review): +A sets the async thread pool size; confirm that 128 is still
# useful on the target OTP version.
+A 128

Connecting Nodes

Nodes connect automatically the first time they communicate with each other. You can also trigger the connection explicitly:

%% Run in the Erlang shell of the web node to connect to the gpu node.
net_adm:ping('gpu@192.168.1.20').
%% Returns pong when the node is reachable (connected), pang when the attempt failed.

Or use Hornbeam’s API:

from hornbeam_erlang import nodes, node
from hornbeam_dist import connect, ping

# Print the name of the node this Python code is running on
print(f"This node: {node()}")

# Explicitly connect to the GPU node
connect('gpu@192.168.1.20')

# Verify the connection: prints 'pong' when reachable, 'pang' otherwise
print(ping('gpu@192.168.1.20'))  # 'pong' or 'pang'

# Print the names of all nodes currently connected to this one
print(nodes())

Basic RPC

Synchronous Calls

from hornbeam_erlang import rpc_call

# Call ml_model:predict(input_data) on the remote node and keep its return value.
# Arguments are always passed as a list, one element per remote parameter.
result = rpc_call(
    'worker@gpu-server',    # Node
    'ml_model',             # Module
    'predict',              # Function
    [input_data],           # Args (as list)
    timeout_ms=30000        # Timeout
)

Asynchronous Calls

Fire and forget — don’t wait for a result:

from hornbeam_erlang import rpc_cast

# Log to remote server (non-blocking): asks logger@log-server to run
# event_logger:log with the three arguments below; no result is awaited.
rpc_cast(
    'logger@log-server',
    'event_logger',
    'log',
    ['info', 'User logged in', {'user_id': 123}]
)

Distributed ML Patterns

GPU Node for Inference

# Python app on web node
from hornbeam_erlang import rpc_call, nodes

def get_embedding(text):
    """Fetch the embedding for ``text`` from a GPU node.

    The first connected node whose name contains 'gpu' is asked to run
    embedding_service:encode with a 10 second timeout.

    Raises:
        RuntimeError: if no connected node name contains 'gpu'.
    """
    candidates = list(filter(lambda name: 'gpu' in name, nodes()))

    if len(candidates) == 0:
        raise RuntimeError("No GPU nodes available")

    target = candidates[0]
    return rpc_call(target, 'embedding_service', 'encode', [text], timeout_ms=10000)
%% embedding_service.erl on GPU node
-module(embedding_service).
-export([encode/1, batch_encode/1]).

%% Encode one text by calling the Python embedding_model's encode function.
encode(Input) ->
    call_model(encode, [Input]).

%% Encode a list of texts in one call to the Python model's batch_encode function.
batch_encode(Inputs) ->
    call_model(batch_encode, [Inputs]).

%% Single place where this module crosses into Python via the py module.
call_model(Function, Args) ->
    py:call(embedding_model, Function, Args).

Load Balancing

Distribute requests across workers:

from hornbeam_erlang import rpc_call, nodes
import random

def balanced_inference(data):
    """Run inference on a randomly chosen worker node, with failover.

    Workers are the connected nodes whose name contains 'worker'. They are
    tried in random order (simple load balancing, not round-robin) and the
    first successful result is returned.

    Args:
        data: Payload passed as the single argument to inference:run.

    Returns:
        The remote result, or local_inference(data) when there are no
        workers or every worker failed.
    """
    workers = [n for n in nodes() if 'worker' in n]

    # random.sample yields a shuffled copy, so each worker is tried at most
    # once and the first pick is uniformly random.
    for node in random.sample(workers, len(workers)):
        try:
            return rpc_call(node, 'inference', 'run', [data], 30000)
        except Exception:
            # Failover to another node
            continue

    # No workers connected, or all of them failed: handle the request here.
    return local_inference(data)

Parallel Distributed Processing

from hornbeam_erlang import rpc_call, nodes
from concurrent.futures import ThreadPoolExecutor, as_completed

def parallel_inference(items):
    """Process items in parallel across worker nodes.

    Items are split into one chunk per connected worker (nodes whose name
    contains 'worker'); each chunk is sent to batch_processor:process_batch
    on its node with a 60 second timeout. Without workers, every item is
    processed locally with process(item).

    Returns:
        A flat list of results in the same order as the chunks were
        submitted, so the output lines up with the input regardless of
        which node finishes first. The chunk of a failed node is reported
        with print and left out of the result.
    """
    workers = [n for n in nodes() if 'worker' in n]

    if not workers:
        # Local processing
        return [process(item) for item in items]

    # Distribute items across workers
    chunks = split_list(items, len(workers))
    results = []

    with ThreadPoolExecutor(max_workers=len(workers)) as executor:
        # All chunks are submitted up front, so they still run concurrently.
        submitted = [
            (
                node,
                executor.submit(
                    rpc_call,
                    node,
                    'batch_processor',
                    'process_batch',
                    [chunk],
                    60000
                ),
            )
            for node, chunk in zip(workers, chunks)
        ]

        # Collect in submission order (not completion order) to keep the
        # output aligned with the input.
        for node, future in submitted:
            try:
                results.extend(future.result())
            except Exception as e:
                print(f"Node {node} failed: {e}")

    return results

Erlang-Side Patterns

Service Discovery

%% Find nodes by role.
%% A node's role is taken from the prefix of its node name, e.g.
%% 'gpu@192.168.1.20' is a GPU node and 'worker1@host' is a worker node.
-module(cluster).
-export([gpu_nodes/0, worker_nodes/0, find_node/1]).
%% The role predicates are exported because other modules call them
%% remotely (task_queue uses cluster:is_gpu_node/1).
-export([is_gpu_node/1, is_worker_node/1]).

%% Connected nodes whose name starts with "gpu".
-spec gpu_nodes() -> [node()].
gpu_nodes() ->
    [N || N <- nodes(), is_gpu_node(N)].

%% Connected nodes whose name starts with "worker".
-spec worker_nodes() -> [node()].
worker_nodes() ->
    [N || N <- nodes(), is_worker_node(N)].

%% True when the node name starts with "gpu".
-spec is_gpu_node(node()) -> boolean().
is_gpu_node(Node) ->
    has_role_prefix("gpu", Node).

%% True when the node name starts with "worker".
-spec is_worker_node(node()) -> boolean().
is_worker_node(Node) ->
    has_role_prefix("worker", Node).

%% First connected node whose name starts with the given role atom.
-spec find_node(atom()) -> {ok, node()} | {error, no_nodes}.
find_node(Role) when is_atom(Role) ->
    Prefix = atom_to_list(Role),
    case [N || N <- nodes(), has_role_prefix(Prefix, N)] of
        [] -> {error, no_nodes};
        [First | _] -> {ok, First}
    end.

%% Shared prefix test used by all role lookups.
has_role_prefix(Prefix, Node) ->
    lists:prefix(Prefix, atom_to_list(Node)).

Distributed Task Queue

%% task_queue.erl
%% Submits tasks to worker nodes and tracks their status in hornbeam_state
%% under the key <<"task:", TaskId/binary>>.
-module(task_queue).
-export([submit/2, get_result/1]).

%% Dispatch Task to a worker node and return its id.
%% Opts is a map; #{prefer_gpu => true} asks for a GPU node when one is
%% connected. Returns {error, no_workers} when no worker node is connected
%% instead of crashing on the empty list.
submit(Task, Opts) ->
    case cluster:worker_nodes() of
        [] ->
            {error, no_workers};
        Workers ->
            Worker = select_worker(Workers, Opts),
            TaskId = make_task_id(),

            %% Store the pending record BEFORE dispatching: if the worker
            %% finishes quickly and writes its result, a later write of
            %% `pending` would overwrite it.
            hornbeam_state:set(<<"task:", TaskId/binary>>, #{
                status => pending,
                worker => Worker,
                submitted => erlang:system_time(second)
            }),

            %% Send to worker
            rpc:cast(Worker, task_worker, execute, [TaskId, Task]),

            {ok, TaskId}
    end.

%% Look up the stored state of a task.
%% The `undefined` clause handles a key with no stored value.
get_result(TaskId) ->
    case hornbeam_state:get(<<"task:", TaskId/binary>>) of
        #{status := completed, result := Result} ->
            {ok, Result};
        #{status := pending} ->
            {error, pending};
        #{status := failed, error := Error} ->
            {error, Error};
        undefined ->
            {error, not_found}
    end.

%% Pick the node for a task. Workers must be non-empty.
%% With prefer_gpu, GPU nodes come from cluster:gpu_nodes/0: worker node
%% names start with "worker" and GPU node names with "gpu", so filtering
%% the worker list for GPU nodes could never match.
select_worker(Workers, #{prefer_gpu := true}) ->
    case cluster:gpu_nodes() of
        [] -> hd(Workers);
        [Gpu | _] -> Gpu
    end;
select_worker(Workers, _) ->
    %% Round-robin; the cursor lives in the process dictionary, so it is
    %% tracked per calling process.
    Index = case get(worker_index) of
        undefined -> 0;
        I -> (I + 1) rem length(Workers)
    end,
    put(worker_index, Index),
    lists:nth(Index + 1, Workers).

Error Handling

Timeout Handling

from hornbeam_erlang import rpc_call

def safe_rpc(node, module, function, args, timeout=5000, retries=3):
    """RPC with retry logic.

    Args:
        node: Target node name.
        module: Remote module name.
        function: Remote function name.
        args: List of arguments for the remote function.
        timeout: Timeout in milliseconds for the first attempt; each
            timed-out attempt multiplies it by 1.5 for the next one.
        retries: Total number of attempts; must be at least 1.

    Returns:
        The result of the first successful rpc_call.

    Raises:
        ValueError: if retries is less than 1.
        TimeoutError: if the last attempt timed out.
        ConnectionError: if the last attempt failed to connect.
    """
    # Imported here because the snippet's import block does not provide it.
    import time

    if retries < 1:
        raise ValueError("retries must be at least 1")

    last_error = None

    for attempt in range(retries):
        try:
            return rpc_call(node, module, function, args, timeout)
        except TimeoutError:
            last_error = TimeoutError(f"Timeout after {timeout}ms")
            timeout = int(timeout * 1.5)  # Allow the next attempt more time
        except ConnectionError as e:
            last_error = e
            # Linear backoff before reconnecting; pointless after the
            # final attempt, so skip it there.
            if attempt < retries - 1:
                time.sleep(0.1 * (attempt + 1))

    raise last_error

Failover

from hornbeam_erlang import rpc_call, nodes, ping

def resilient_call(service, function, args, timeout=10000):
    """Call ``function`` on the first healthy node of a service.

    Candidate nodes are the connected nodes whose name contains
    ``service``; the same string is used as the remote module name. Nodes
    that do not answer ping with 'pong' are skipped, and a node whose call
    raises is reported with print before the next one is tried.

    Raises:
        RuntimeError: if no candidate node produced a result.
    """
    candidates = list(filter(lambda name: service in name, nodes()))

    for candidate in candidates:
        if ping(candidate) == 'pong':
            try:
                return rpc_call(candidate, service, function, args, timeout)
            except Exception as err:
                print(f"Node {candidate} failed: {err}")

    raise RuntimeError(f"All {service} nodes unavailable")

Circuit Breaker

from hornbeam_erlang import rpc_call, state_get, state_set, state_incr
import time

class CircuitBreaker:
    """Circuit breaker around rpc_call for a single node.

    State is kept through state_get/state_set/state_incr under two keys:
    ``circuit:<node>`` holds the status record and
    ``circuit:<node>:failures`` holds the failure counter. After
    ``threshold`` consecutive failures the circuit opens and ``call``
    raises RuntimeError without contacting the node. Once more than
    ``reset_timeout`` seconds have passed since it opened, the circuit
    closes again.
    """

    def __init__(self, node, threshold=5, reset_timeout=60):
        self.node = node
        self.threshold = threshold
        self.reset_timeout = reset_timeout
        self.key = f"circuit:{node}"
        # The counter lives under its own key because state_incr operates
        # on a plain number, not on the status record stored at self.key.
        self.failures_key = f"{self.key}:failures"

    def _close(self):
        """Close the circuit and zero the failure counter."""
        # Reset the counter as well as the status record; otherwise old
        # failures keep counting towards the threshold forever.
        state_set(self.failures_key, 0)
        state_set(self.key, {'failures': 0, 'open': False})

    def is_open(self):
        """Return True while calls to the node should be rejected."""
        state = state_get(self.key)
        if not state:
            return False

        if state.get('open'):
            # Check if reset timeout passed
            if time.time() - state.get('opened_at', 0) > self.reset_timeout:
                self._close()
                return False
            return True
        return False

    def record_failure(self):
        """Count one failure and open the circuit at the threshold."""
        failures = state_incr(self.failures_key)
        if failures >= self.threshold:
            state_set(self.key, {
                'open': True,
                'opened_at': time.time(),
                'failures': failures
            })

    def record_success(self):
        """Register a successful call: the failure streak starts over."""
        self._close()

    def call(self, module, function, args, timeout=5000):
        """Run rpc_call against the node unless the circuit is open.

        Raises:
            RuntimeError: if the circuit is open.
            Exception: whatever rpc_call raised, after recording the failure.
        """
        if self.is_open():
            raise RuntimeError(f"Circuit open for {self.node}")

        try:
            result = rpc_call(self.node, module, function, args, timeout)
            self.record_success()
            return result
        except Exception:
            self.record_failure()
            raise

Monitoring

Node Health

from hornbeam_erlang import nodes, rpc_call

def cluster_health():
    """Collect a health report for every connected node.

    Each node is asked to run health:check with a 5 second timeout.

    Returns:
        A dict keyed by node name. A node that answered maps to
        {'status': 'healthy'} merged with the fields it returned (returned
        fields win on key clashes); a timeout maps to
        {'status': 'timeout'}; any other failure maps to
        {'status': 'error', 'error': <message>}.
    """
    def probe(target):
        # One node's report; every failure is turned into a status entry.
        try:
            reply = rpc_call(target, 'health', 'check', [], 5000)
            return {'status': 'healthy', **reply}
        except TimeoutError:
            return {'status': 'timeout'}
        except Exception as err:
            return {'status': 'error', 'error': str(err)}

    return {target: probe(target) for target in nodes()}

Metrics Collection

%% Collect metrics from all connected nodes.
%% Returns a map from node name to whatever metrics:get_all/0 returned on
%% that node, or {error, Reason} when the RPC itself failed.
collect_cluster_metrics() ->
    maps:from_list([{Node, node_metrics(Node)} || Node <- nodes()]).

%% Metrics of one node, fetched with a 5 second RPC timeout.
node_metrics(Node) ->
    case rpc:call(Node, metrics, get_all, [], 5000) of
        {badrpc, Reason} -> {error, Reason};
        Metrics -> Metrics
    end.

Best Practices

1. Use Appropriate Timeouts

# Short timeout for health checks (1 second): a slow answer counts as a failure
ping_result = rpc_call(node, 'health', 'ping', [], timeout_ms=1000)

# Longer timeout for ML inference (30 seconds)
embedding = rpc_call(gpu_node, 'ml', 'encode', [text], timeout_ms=30000)

# Very long timeout for training (3,600,000 ms = 1 hour)
rpc_call(gpu_node, 'ml', 'train', [dataset], timeout_ms=3600000)

2. Batch Operations

# Bad: Many small calls (one remote call per text)
embeddings = []
for text in texts:
    emb = rpc_call(node, 'ml', 'encode', [text], 5000)
    embeddings.append(emb)

# Good: Single batch call (the whole list goes over in one remote call)
embeddings = rpc_call(node, 'ml', 'batch_encode', [texts], 30000)

3. Local Fallback

def get_embedding(text):
    """Return the embedding for ``text``, preferring a GPU node.

    The first connected node whose name contains 'gpu' is asked to run
    ml:encode with a 10 second timeout. If there is no such node, or the
    remote call raises, the embedding is computed with local_model.
    """
    gpu_candidates = list(filter(lambda name: 'gpu' in name, nodes()))

    if len(gpu_candidates) > 0:
        try:
            return rpc_call(gpu_candidates[0], 'ml', 'encode', [text], 10000)
        except Exception:
            # Best effort: any remote failure falls through to the local model.
            pass

    # Local fallback
    return local_model.encode(text)

4. Fire-and-Forget for Logging

from hornbeam_erlang import rpc_cast

# Don't wait for logging: rpc_cast sends the request to audit_log:record on
# logger@server and continues without a result.
rpc_cast('logger@server', 'audit_log', 'record', [
    'user_action',
    {'user_id': user_id, 'action': 'login', 'ip': ip}
])

See Also