ML Integration
Hornbeam makes Python ML/AI workloads production-ready with ETS caching, distributed inference, and Erlang’s fault tolerance.
Cached Inference
Use ETS to cache ML inference results:
from hornbeam_ml import cached_inference, cache_stats

# Load model at startup
from sentence_transformers import SentenceTransformer
model = SentenceTransformer('all-MiniLM-L6-v2')

def get_embedding(text):
    # Automatically cached by input hash
    return cached_inference(model.encode, text)

# Check cache performance
stats = cache_stats()
# {'hits': 1000, 'misses': 100, 'hit_rate': 0.91}
Custom Cache Keys
from hornbeam_ml import cached_inference
import hashlib

def get_embedding(text, model_name='default'):
    # Custom cache key includes the model name. Use hashlib rather than the
    # built-in hash(), which is randomized per process and would break
    # cache sharing across workers.
    return cached_inference(
        model.encode,
        text,
        cache_key=f'{model_name}:{hashlib.md5(text.encode()).hexdigest()}',
        cache_prefix='embedding'
    )
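Illustrative usage; in this sketch model_name only namespaces the cache key, while a real service would also use it to select which model runs:

emb_a = get_embedding('hello world', model_name='minilm')
emb_b = get_embedding('hello world', model_name='mpnet')  # separate cache entry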
Cache with TTL
from hornbeam_erlang import state_get, state_set
import hashlib
import time
def cached_with_ttl(fn, value, ttl_seconds=3600):
    """Cache result with time-to-live."""
    key = f'cache:{hashlib.md5(str(value).encode()).hexdigest()}'
    cached = state_get(key)
    if cached and time.time() - cached['timestamp'] < ttl_seconds:
        return cached['result']
    # Cache miss or expired entry: recompute and store
    result = fn(value)
    state_set(key, {
        'result': result,
        'timestamp': time.time()
    })
    return result
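A sketch of how you might call it (fetch_rates is a hypothetical expensive function, not part of Hornbeam):

def fetch_rates(currency):
    ...  # expensive external API call

rates = cached_with_ttl(fetch_rates, 'EUR', ttl_seconds=900)  # recompute at most every 15 minutes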
Embedding Service
Complete embedding service with caching:
# embedding_service.py
from fastapi import FastAPI
from pydantic import BaseModel
from hornbeam_ml import cached_inference, cache_stats
from hornbeam_erlang import state_incr
from sentence_transformers import SentenceTransformer

app = FastAPI()
model = None

@app.on_event("startup")
async def load_model():
    global model
    model = SentenceTransformer('all-MiniLM-L6-v2')

class EmbedRequest(BaseModel):
    texts: list[str]

class EmbedResponse(BaseModel):
    embeddings: list[list[float]]
    cache_hits: int
    cache_misses: int

@app.post("/embed", response_model=EmbedResponse)
async def embed(request: EmbedRequest):
    state_incr('metrics:embed_requests')
    embeddings = []
    for text in request.texts:
        emb = cached_inference(model.encode, text)
        embeddings.append(emb.tolist())
    stats = cache_stats()
    return EmbedResponse(
        embeddings=embeddings,
        cache_hits=stats['hits'],
        cache_misses=stats['misses']
    )

@app.get("/stats")
async def get_stats():
    return cache_stats()
Start it from Erlang:

hornbeam:start("embedding_service:app", #{
    worker_class => asgi,
    lifespan => on,
    workers => 4
}).
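A hypothetical client call, assuming the service listens on port 8000:

import requests

resp = requests.post(
    'http://localhost:8000/embed',
    json={'texts': ['hello world', 'semantic search']}
)
print(resp.json()['cache_hits'])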
Semantic Search
Build semantic search with ETS-cached embeddings:
from hornbeam_erlang import state_get, state_set, state_keys
from hornbeam_ml import cached_inference
import numpy as np

def index_document(doc_id, text):
    """Index a document for semantic search."""
    embedding = cached_inference(model.encode, text)
    state_set(f'doc:{doc_id}', {
        'text': text,
        'embedding': embedding.tolist()
    })

def search(query, top_k=10):
    """Search documents by semantic similarity."""
    query_emb = cached_inference(model.encode, query)
    query_emb = np.array(query_emb)
    # Get all indexed documents
    doc_keys = state_keys('doc:')
    results = []
    for key in doc_keys:
        doc = state_get(key)
        if doc:
            doc_emb = np.array(doc['embedding'])
            similarity = cosine_similarity(query_emb, doc_emb)
            results.append({
                'id': key.replace('doc:', '', 1),
                'text': doc['text'],
                'score': float(similarity)
            })
    # Sort by similarity, best first
    results.sort(key=lambda x: x['score'], reverse=True)
    return results[:top_k]

def cosine_similarity(a, b):
    return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
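End-to-end usage (document IDs and texts are illustrative):

index_document('1', 'Erlang processes are lightweight and isolated.')
index_document('2', 'Sentence transformers produce dense text embeddings.')

for hit in search('how do text embeddings work?', top_k=2):
    print(hit['id'], round(hit['score'], 3), hit['text'])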
Distributed Inference
Spread ML workloads across a cluster:
from hornbeam_erlang import rpc_call, nodes

class DistributedInference:
    def __init__(self, node_filter='gpu'):
        self.node_filter = node_filter

    def get_worker_nodes(self):
        """Get available GPU nodes."""
        return [n for n in nodes() if self.node_filter in n]

    async def predict_batch(self, inputs):
        """Distribute predictions across nodes."""
        worker_nodes = self.get_worker_nodes()
        if not worker_nodes:
            # No GPU nodes available; run locally
            return self.local_predict(inputs)
        # Split inputs across workers
        chunks = self.split_inputs(inputs, len(worker_nodes))
        results = []
        for node, chunk in zip(worker_nodes, chunks):
            try:
                result = rpc_call(
                    node,
                    'ml_worker',
                    'predict',
                    [chunk],
                    timeout_ms=60000
                )
                results.extend(result)
            except Exception:
                # Fall back to local inference on failure
                results.extend(self.local_predict(chunk))
        return results

    def split_inputs(self, inputs, n):
        """Split list into n roughly equal chunks."""
        k, m = divmod(len(inputs), n)
        return [inputs[i*k + min(i, m):(i+1)*k + min(i+1, m)] for i in range(n)]

    def local_predict(self, inputs):
        return model.predict(inputs)
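Hypothetical usage from a synchronous entry point (inputs are illustrative; local_predict assumes a model global as in the snippet above):

import asyncio

inference = DistributedInference(node_filter='gpu')
predictions = asyncio.run(inference.predict_batch(['input-1', 'input-2']))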
GPU Node Setup
On each GPU node:
%% ml_worker.erl (the file name must match the module name)
-module(ml_worker).
-export([predict/1, encode/1]).

predict(Inputs) ->
    py:call('model', 'predict', [Inputs]).

encode(Texts) ->
    py:call('model', 'encode', [Texts]).
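py:call('model', ...) dispatches into a Python module named model. A minimal sketch of that module, assuming a sentence-transformers encoder and a pre-loaded classifier (both placeholders for your real models):

# model.py -- hypothetical Python side of the GPU node
from sentence_transformers import SentenceTransformer

_encoder = SentenceTransformer('all-MiniLM-L6-v2')
_classifier = None  # load your real model here, e.g. via joblib or torch

def predict(inputs):
    return _classifier.predict(inputs).tolist()

def encode(texts):
    return _encoder.encode(texts).tolist()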
LLM Integration
Integrate with LLM APIs:
from hornbeam_erlang import state_get, state_set
import hashlib
import openai

def cached_llm_call(prompt, model="gpt-4", temperature=0):
    """Cache LLM responses for identical prompts."""
    # Only cache deterministic (temperature=0) responses
    if temperature == 0:
        cache_key = f'llm:{model}:{hashlib.md5(prompt.encode()).hexdigest()}'
        cached = state_get(cache_key)
        if cached:
            return cached
    response = openai.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        temperature=temperature
    )
    result = response.choices[0].message.content
    if temperature == 0:
        state_set(cache_key, result)
    return result
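Repeated identical prompts are then served from ETS instead of the API:

answer = cached_llm_call('Summarize BEAM supervision trees in one sentence.')
cached = cached_llm_call('Summarize BEAM supervision trees in one sentence.')  # cache hit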
Streaming LLM Responses
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import openai

app = FastAPI()

async def stream_llm(prompt):
    stream = openai.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    for chunk in stream:
        if chunk.choices[0].delta.content:
            yield f"data: {chunk.choices[0].delta.content}\n\n"
    yield "data: [DONE]\n\n"

@app.post("/chat/stream")
async def chat_stream(prompt: str):
    return StreamingResponse(
        stream_llm(prompt),
        media_type="text/event-stream"
    )
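A hypothetical client consuming the SSE stream (assumes port 8000; prompt is passed as a query parameter, matching the endpoint signature):

import requests

with requests.post(
    'http://localhost:8000/chat/stream',
    params={'prompt': 'Hello!'},
    stream=True
) as resp:
    for line in resp.iter_lines():
        if line:
            print(line.decode())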
RAG (Retrieval-Augmented Generation)
Combine semantic search with an LLM:

import openai

# search() is defined in the semantic search example above

def rag_query(question, top_k=5):
    """Answer question using retrieved context."""
    # 1. Retrieve relevant documents
    docs = search(question, top_k=top_k)
    # 2. Build context
    context = "\n\n".join(
        f"Document {i+1}:\n{doc['text']}"
        for i, doc in enumerate(docs)
    )
    # 3. Generate answer with LLM
    prompt = f"""Answer the question based on the following context.

Context:
{context}

Question: {question}

Answer:"""
    response = openai.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}]
    )
    return {
        'answer': response.choices[0].message.content,
        'sources': [doc['id'] for doc in docs]
    }
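Illustrative usage, assuming documents were indexed as in the semantic search section:

result = rag_query('What makes Erlang processes cheap to spawn?')
print(result['answer'])
print('Sources:', result['sources'])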
Model Loading with Lifespan
Load models at startup, not per-request:
from contextlib import asynccontextmanager
from fastapi import FastAPI
from sentence_transformers import SentenceTransformer

models = {}

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: load all models once
    models['embedding'] = SentenceTransformer('all-MiniLM-L6-v2')
    models['classifier'] = load_classifier('model.pkl')
    print("Models loaded!")
    yield
    # Shutdown: cleanup
    models.clear()

app = FastAPI(lifespan=lifespan)

@app.post("/embed")
async def embed(text: str):
    return models['embedding'].encode(text).tolist()
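load_classifier is not defined above; a minimal sketch using joblib (an assumption, substitute your own loader):

import joblib

def load_classifier(path):
    # Hypothetical loader for a pickled scikit-learn model
    return joblib.load(path)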
Batch Processing
Process large batches in parallel with a thread pool:

import concurrent.futures

def process_batch(items, batch_size=100):
    """Process items in parallel batches."""
    # Split into batches
    batches = [items[i:i+batch_size] for i in range(0, len(items), batch_size)]
    results = []
    # Process batches in parallel worker threads
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        futures = [
            executor.submit(process_single_batch, batch)
            for batch in batches
        ]
        # Note: results arrive in completion order, not input order
        for future in concurrent.futures.as_completed(futures):
            results.extend(future.result())
    return results

def process_single_batch(batch):
    # Each batch is encoded in a separate worker thread
    embeddings = model.encode(batch)
    return embeddings.tolist()
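Usage (inputs illustrative):

texts = [f'document {i}' for i in range(1000)]
vectors = process_batch(texts, batch_size=100)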
Monitoring ML Performance
from hornbeam_erlang import state_incr, state_get
import time

class MLMetrics:
    @staticmethod
    def track_inference(model_name, latency_ms):
        state_incr(f'ml:{model_name}:calls')
        state_incr(f'ml:{model_name}:latency_total', int(latency_ms))
        # Track latency in 100ms buckets
        bucket = int(latency_ms // 100) * 100
        state_incr(f'ml:{model_name}:latency:{bucket}')

    @staticmethod
    def get_stats(model_name):
        calls = state_get(f'ml:{model_name}:calls') or 0
        latency_total = state_get(f'ml:{model_name}:latency_total') or 0
        return {
            'calls': calls,
            'avg_latency_ms': latency_total / calls if calls > 0 else 0
        }

# Usage
def timed_inference(model, data):
    start = time.time()
    result = model.predict(data)
    latency_ms = (time.time() - start) * 1000
    MLMetrics.track_inference('classifier', latency_ms)
    return result
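These counters can also be exposed over HTTP; a hypothetical endpoint added to the FastAPI app from the embedding service example:

@app.get("/metrics/{model_name}")
async def model_metrics(model_name: str):
    return MLMetrics.get_stats(model_name)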
Next Steps
- Embedding Service Example - Complete service
- Distributed ML Example - Cluster setup
- Erlang Python AI Guide - Low-level AI integration