Hooks
Hooks provide a bidirectional callback system between Python and Erlang. They enable:
- Python calling Erlang: Execute Erlang functions from your Python web app
- Erlang calling Python: Invoke Python ML models or services from Erlang
- Streaming: Stream results from either side
- Async execution: Fire-and-forget or await results
Concepts
A hook is a registered handler identified by an app_path string. Handlers can be:
- Python functions, classes, or instances
- Erlang functions or module:function references
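When you call execute(app_path, action, args), Hornbeam looks up the handler registered under app_path and invokes it: a function handler receives the action name as its first argument, while a class or instance handler has the method whose name matches the action called with the remaining arguments.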
Python Handlers
Function Handler
The simplest handler is a function that dispatches on action:
from hornbeam_erlang import register_hook, execute
def embedding_handler(action, *args, **kwargs):
    """Dispatch an ``embeddings`` hook call on its action name.

    Args:
        action: Requested operation, ``'encode'`` or ``'similarity'``.
        *args: One text for ``'encode'``; two texts for ``'similarity'``.
        **kwargs: Accepted for interface compatibility; not used here.

    Returns:
        The result of ``model.encode`` for ``'encode'``, or the result of
        ``cosine_similarity`` over both encodings for ``'similarity'``.

    Raises:
        ValueError: If ``action`` is not a supported name.
    """
    if action == 'encode':
        text = args[0]
        return model.encode(text)
    elif action == 'similarity':
        a, b = args[0], args[1]
        return cosine_similarity(
            model.encode(a),
            model.encode(b),
        )
    else:
        raise ValueError(f"Unknown action: {action}")
# Register during app startup
register_hook('embeddings', embedding_handler)
# Call from anywhere
embedding = execute('embeddings', 'encode', "Hello world")
sim = execute('embeddings', 'similarity', "Hello", "Hi there")
Class Handler
For stateful handlers, use a class. Hornbeam instantiates it and calls methods matching action names:
from hornbeam_erlang import register_hook
class EmbeddingService:
    """Stateful ``embeddings`` handler; each public method is one action."""

    def __init__(self):
        # Called once when hook is registered
        self.model = SentenceTransformer('all-MiniLM-L6-v2')

    def encode(self, text):
        """Action: encode. Return the embedding of ``text`` via ``tolist()``."""
        return self.model.encode(text).tolist()

    def similarity(self, text_a, text_b):
        """Action: similarity. Return the cosine similarity as a float."""
        emb_a = self.model.encode(text_a)
        emb_b = self.model.encode(text_b)
        # cosine_similarity is given two one-row batches; [0][0] picks the
        # single pairwise score out of its result.
        return float(cosine_similarity([emb_a], [emb_b])[0][0])

    def batch_encode(self, texts):
        """Action: batch_encode. Encode ``texts`` in one model call."""
        return self.model.encode(texts).tolist()
# Register class (instantiated automatically)
register_hook('embeddings', EmbeddingService)
Instance Handler
If you need custom initialization, pass an instance:
from hornbeam_erlang import register_hook
class MLService:
    """Hook handler built with caller-supplied model name and device."""

    def __init__(self, model_name, device):
        """Load ``model_name`` and move it to ``device``."""
        self.model = load_model(model_name).to(device)

    def predict(self, input_data):
        """Action: predict. Return the model's output for ``input_data``."""
        return self.model(input_data)
# Custom initialization
service = MLService('bert-base', device='cuda:0')
register_hook('ml', service)
Erlang Handlers
Function Handler
Register an anonymous function:
%% Register an anonymous three-argument handler under <<"calculator">>.
hornbeam_hooks:reg(<<"calculator">>, fun(Action, Args, _Kwargs) ->
    %% Dispatch on the action name. Both arithmetic actions require Args to
    %% be a two-element list; anything else fails the match.
    case Action of
        <<"add">> ->
            [X, Y] = Args,
            X + Y;
        <<"multiply">> ->
            [X, Y] = Args,
            X * Y;
        _ ->
            {error, unknown_action}
    end
end).
Module Handler
Register a module:function reference:
%% In my_handler.erl
-module(my_handler).
-export([handle/3]).

%% handle(Action, Args, Kwargs): one clause per supported action.
%% A call whose action name or argument list matches no specific clause
%% falls through to the last clause and yields an error tuple.
handle(<<"get_user">>, [Id], _Opts) ->
    user_db:fetch(Id);
handle(<<"create_user">>, [Name, Email], _Opts) ->
    Fields = #{name => Name, email => Email},
    user_db:create(Fields);
handle(Other, _Args, _Opts) ->
    {error, {unknown_action, Other}}.
%% Register
hornbeam_hooks:reg(<<"users">>, my_handler, handle, 3).
Calling Hooks
From Python
from hornbeam_erlang import execute, execute_async, await_result, stream
# Synchronous call
result = execute('embeddings', 'encode', text)

# Call with kwargs: keyword arguments are passed after the positional ones
result = execute('service', 'process', data, option='fast')

# Async call: returns a task id immediately
task_id = execute_async('ml', 'train', large_dataset)
# ... do other work ...
result = await_result(task_id, timeout_ms=60000)  # 60000 ms = 60 s

# Streaming: iterate over chunks and print each without a trailing newline
for chunk in stream('llm', 'generate', prompt):
    print(chunk, end='', flush=True)
From Erlang
%% Synchronous
{ok, Embedding} = hornbeam_hooks:execute(<<"embeddings">>, <<"encode">>, [Text], #{}).
%% Async
{ok, TaskId} = hornbeam_hooks:execute_async(<<"ml">>, <<"train">>, [Data], #{}),
%% ... do other work ...
{ok, Result} = hornbeam_hooks:await_result(TaskId, 60000).
%% Streaming
{ok, Gen} = hornbeam_hooks:stream(<<"llm">>, <<"generate">>, [Prompt], #{}),
stream_loop(Gen).
%% Drain a stream generator: print every chunk, stop when it reports done.
stream_loop(Next) ->
    case Next() of
        done ->
            ok;
        {value, Chunk} ->
            io:format("~s", [Chunk]),
            stream_loop(Next)
    end.
Streaming Handlers
Python Streaming
Return a generator for streaming responses:
from hornbeam_erlang import register_hook
def llm_handler(action, *args, **kwargs):
    """Handle ``llm`` hook calls; ``'generate'`` streams its response.

    Args:
        action: Requested operation; only ``'generate'`` is supported.
        *args: For ``'generate'``, the prompt is the first positional value.
        **kwargs: Accepted for interface compatibility; not used here.

    Returns:
        The generator produced by ``generate_stream(prompt)``.

    Raises:
        ValueError: If ``action`` is not supported, instead of silently
            returning ``None``.
    """
    if action == 'generate':
        prompt = args[0]
        # Return generator for streaming
        return generate_stream(prompt)
    raise ValueError(f"Unknown action: {action}")
def generate_stream(prompt):
    """Yield the text fragments of a streamed chat completion for ``prompt``.

    Chunks without text content are skipped, so consumers only ever see
    non-empty strings.
    """
    response = openai.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    for chunk in response:
        # A chunk may carry an empty ``choices`` list; indexing [0] on it
        # would raise IndexError, so skip such chunks.
        if not chunk.choices:
            continue
        content = chunk.choices[0].delta.content
        if content:
            yield content
register_hook('llm', llm_handler)
Erlang Streaming
Use the hornbeam_hooks helper functions stream_from_list and stream_from_fun to create streams:
%% Stream from a list: the inclusive integer range Start..End is built
%% with lists:seq/2 and handed to the list-streaming helper.
hornbeam_hooks:reg(<<"numbers">>, fun(<<"range">>, [Start, End], _) ->
    hornbeam_hooks:stream_from_list(lists:seq(Start, End))
end).
%% Stream from a function: emits 0, 1, ... Max-1, then signals completion.
hornbeam_hooks:reg(<<"counter">>, fun(<<"count">>, [Max], _) ->
    %% A fresh reference keys this stream's counter in the process dictionary.
    %% NOTE(review): get/put are per-process, so this presumably relies on the
    %% stream fun being called from the same process as this handler - confirm.
    Key = {counter, make_ref()},
    put(Key, 0),
    hornbeam_hooks:stream_from_fun(fun() ->
        case get(Key) of
            N when N >= Max ->
                hornbeam_hooks:stream_done();
            N ->
                put(Key, N + 1),
                hornbeam_hooks:stream_chunk(N)
        end
    end)
end).
ASGI Lifespan Integration
Register hooks during ASGI lifespan startup:
from contextlib import asynccontextmanager
from fastapi import FastAPI
from hornbeam_erlang import register_hook, unregister_hook
@asynccontextmanager
async def lifespan(app):
    """Register the ``embeddings`` hook for the lifetime of the app.

    The hook is registered before the app starts serving and is always
    unregistered on exit, including when the app body raises.
    """
    # Startup: register hooks
    model = load_embedding_model()

    def handler(action, *args, **kwargs):
        if action == 'encode':
            return model.encode(args[0]).tolist()
        raise ValueError(f"Unknown action: {action}")

    register_hook('embeddings', handler)
    try:
        yield  # App runs here
    finally:
        # Shutdown: cleanup. In ``finally`` so an exception raised while the
        # app runs cannot leave the hook registered.
        unregister_hook('embeddings')
app = FastAPI(lifespan=lifespan)
@app.post("/embed")
async def embed(text: str):
    """Return the embedding of ``text`` produced by the ``embeddings`` hook."""
    # Use the registered hook
    from hornbeam_erlang import execute

    embedding = execute('embeddings', 'encode', text)
    return {"embedding": embedding}
Cross-Language Patterns
Python Service Called from Erlang
# Python: Register ML service
from hornbeam_erlang import register_hook
class ImageClassifier:
    """Hook handler exposing a single ``classify`` action."""

    def __init__(self):
        self.model = load_model('resnet50')

    def classify(self, image_url):
        """Action: classify. Download ``image_url`` and return the prediction."""
        image = download_image(image_url)
        return self.model.predict(image)
register_hook('classifier', ImageClassifier)
%% Erlang: Call from request handler
%% Extracts the image URL from the request, runs the Python "classifier"
%% hook on it and replies with the result as JSON. A non-{ok, _} reply
%% from the hook fails the match.
handle_request(Req) ->
    Url = get_image_url(Req),
    Args = [Url],
    {ok, Label} =
        hornbeam_hooks:execute(<<"classifier">>, <<"classify">>, Args, #{}),
    respond_json(Req, Label).
Erlang Service Called from Python
%% Erlang: Register authentication service
%% One fun clause per action; each expects a single-element argument list.
hornbeam_hooks:reg(<<"auth">>, fun
    (<<"verify_token">>, [Token], _) ->
        case auth_server:verify(Token) of
            {ok, _UserId} = Verified -> Verified;
            error -> {error, invalid_token}
        end;
    (<<"create_session">>, [UserId], _) ->
        {ok, auth_server:create_session(UserId)}
end).
# Python: Call from web handler
from hornbeam_erlang import execute
def login_required(f):
    """Decorate a view so it only runs for requests with a valid token.

    The token is read from the ``Authorization`` header and checked through
    the ``auth`` hook's ``verify_token`` action. On success the hook result
    is stored on ``request.user_id`` and the view is called; otherwise the
    wrapper returns ``({'error': 'Unauthorized'}, 401)``.

    Exceptions raised by the wrapped view itself propagate unchanged; they
    are no longer reported as 401.
    """
    from functools import wraps

    @wraps(f)
    def wrapper(request, *args, **kwargs):
        token = request.headers.get('Authorization')
        if not token:
            # No credentials supplied: reject without calling the hook.
            return {'error': 'Unauthorized'}, 401
        try:
            # NOTE(review): this relies on execute() raising when
            # verification fails - confirm how an Erlang {error, ...}
            # return surfaces in Python.
            user_id = execute('auth', 'verify_token', token)
        except Exception:
            return {'error': 'Unauthorized'}, 401
        request.user_id = user_id
        return f(request, *args, **kwargs)

    return wrapper
Best Practices
1. Use Descriptive App Paths
# Good
register_hook('embeddings.sentence_transformer', handler)
register_hook('auth.token_validator', handler)
# Avoid
register_hook('h1', handler)
2. Handle Errors Gracefully
def handler(action, *args, **kwargs):
    """Run ``action`` and report any failure as an ``{'error': ...}`` dict.

    Returns:
        The encoding for ``'encode'``; otherwise a dict with a single
        ``'error'`` key describing the problem. Unknown actions are reported
        the same way instead of silently returning ``None``.
    """
    try:
        if action == 'encode':
            return model.encode(args[0])
        return {'error': f"Unknown action: {action}"}
    except Exception as e:
        # Return an error dict instead of raising, for Erlang compatibility
        return {'error': str(e)}
3. Register During Lifespan
# Good: Register in lifespan
@asynccontextmanager
async def lifespan(app):
    """Keep the ``service`` hook registered exactly while the app runs."""
    register_hook('service', handler)
    try:
        yield
    finally:
        # Runs even if the app body raises, so the hook is never left behind.
        unregister_hook('service')
# Avoid: Register at module level (may cause issues)
register_hook('service', handler) # Don't do this
4. Use Streaming for Large Responses
# Good: Stream large responses
def handler(action, *args, **kwargs):
    """Return a generator of users for ``'get_all_users'``.

    Raises:
        ValueError: If ``action`` is not supported.
    """
    if action == 'get_all_users':
        return stream_users()  # Generator
    raise ValueError(f"Unknown action: {action}")
def stream_users():
    """Yield users one at a time, fetching them in batches of 100."""
    for batch in db.iter_users(batch_size=100):
        yield from batch
# Avoid: Load everything in memory
def handler(action, *args, **kwargs):
    """Anti-pattern kept for contrast: builds the full user list at once."""
    if action == 'get_all_users':
        return list(db.get_all_users())  # Memory hog!
See Also
- Python API Reference - Full function documentation
- Erlang API Reference - Erlang modules (hex.pm)
- Custom Apps Guide - Building full applications