Embedding Service Example
A production-ready embedding service using sentence-transformers with ETS-backed caching for high-throughput semantic search.
Project Structure
embedding_service/
├── app.py              # FastAPI application
├── embeddings.py       # Embedding logic
├── search.py           # Search implementation
├── config.py           # Configuration
└── requirements.txt
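The walkthrough below keeps everything in a single app.py for readability. In the split layout above, the Config class would live in config.py and be imported by the other modules; a minimal sketch, assuming the same settings app.py defines inline:
# config.py (sketch) - the same settings that app.py defines inline below
class Config:
    MODEL_NAME = 'all-MiniLM-L6-v2'      # sentence-transformers model
    CACHE_PREFIX = 'emb'                 # prefix for ETS cache keys
    MAX_BATCH_SIZE = 100                 # per-request size guard
    SIMILARITY_THRESHOLD = 0.5           # default search cutoff

config = Config()

# app.py would then replace its inline Config with:
# from config import config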
Application Code
# app.py
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from hornbeam_erlang import state_get, state_set, state_incr
from hornbeam_ml import cached_inference, cache_stats
import numpy as np
from typing import Optional
# ============================================================
# Configuration
# ============================================================
class Config:
    MODEL_NAME = 'all-MiniLM-L6-v2'
    CACHE_PREFIX = 'emb'
    MAX_BATCH_SIZE = 100
    SIMILARITY_THRESHOLD = 0.5

config = Config()
# ============================================================
# Models
# ============================================================
class EmbedRequest(BaseModel):
    texts: list[str]
    cache: bool = True

class EmbedResponse(BaseModel):
    embeddings: list[list[float]]
    dimensions: int
    cached_count: int

class IndexRequest(BaseModel):
    id: str
    text: str
    metadata: Optional[dict] = None

class SearchRequest(BaseModel):
    query: str
    top_k: int = 10
    threshold: float = 0.5

class SearchResult(BaseModel):
    id: str
    text: str
    score: float
    metadata: Optional[dict]
# ============================================================
# ML Model
# ============================================================
model = None
@asynccontextmanager
async def lifespan(app):
    global model
    print(f"Loading model: {config.MODEL_NAME}")
    from sentence_transformers import SentenceTransformer
    model = SentenceTransformer(config.MODEL_NAME)
    print(f"Model loaded. Dimensions: {model.get_sentence_embedding_dimension()}")
    # Warm up the model with a throwaway encode
    _ = model.encode("warmup")
    yield
    model = None
app = FastAPI(
    title="Embedding Service",
    description="High-performance embedding service with ETS caching",
    lifespan=lifespan
)
# ============================================================
# Embedding Endpoints
# ============================================================
@app.post("/embed", response_model=EmbedResponse)
async def embed(request: EmbedRequest):
    """Generate embeddings for texts."""
    if len(request.texts) > config.MAX_BATCH_SIZE:
        raise HTTPException(
            status_code=400,
            detail=f"Maximum batch size is {config.MAX_BATCH_SIZE}"
        )
    state_incr('metrics:embed_requests')
    state_incr('metrics:embed_texts', len(request.texts))
    embeddings = []
    cached_count = 0
    for text in request.texts:
        if request.cache:
            # Use cached inference
            emb = cached_inference(
                model.encode,
                text,
                cache_prefix=config.CACHE_PREFIX
            )
            # Check if it was a cache hit
            if state_get('_cache_hit'):
                cached_count += 1
        else:
            emb = model.encode(text)
        embeddings.append(emb.tolist())
    return EmbedResponse(
        embeddings=embeddings,
        dimensions=len(embeddings[0]) if embeddings else 0,
        cached_count=cached_count
    )
@app.post("/embed/batch")
async def embed_batch(request: EmbedRequest):
    """Batch embed for better throughput (bypasses the per-item cache)."""
    if len(request.texts) > config.MAX_BATCH_SIZE:
        raise HTTPException(status_code=400, detail="Batch too large")
    state_incr('metrics:batch_requests')
    # Batch encode is more efficient but doesn't use the per-item cache
    embeddings = model.encode(request.texts)
    return {
        'embeddings': embeddings.tolist(),
        'dimensions': embeddings.shape[1]
    }
# ============================================================
# Index & Search
# ============================================================
@app.post("/index")
async def index_document(request: IndexRequest):
    """Index a document for search."""
    state_incr('metrics:index_requests')
    # Generate embedding
    embedding = cached_inference(model.encode, request.text)
    # Store document
    doc = {
        'id': request.id,
        'text': request.text,
        'embedding': embedding.tolist(),
        'metadata': request.metadata or {}
    }
    state_set(f'doc:{request.id}', doc)
    # Update index stats
    state_incr('index:doc_count')
    return {'indexed': request.id}
@app.delete("/index/{doc_id}")
async def delete_document(doc_id: str):
    """Remove document from index."""
    from hornbeam_erlang import state_delete
    if not state_get(f'doc:{doc_id}'):
        raise HTTPException(status_code=404, detail="Document not found")
    state_delete(f'doc:{doc_id}')
    state_incr('index:doc_count', -1)
    return {'deleted': doc_id}
@app.post("/search", response_model=list[SearchResult])
async def search(request: SearchRequest):
    """Semantic search across indexed documents."""
    state_incr('metrics:search_requests')
    # Get query embedding
    query_emb = cached_inference(model.encode, request.query)
    query_emb = np.array(query_emb)
    # Search all documents
    from hornbeam_erlang import state_keys
    results = []
    for key in state_keys('doc:'):
        doc = state_get(key)
        if not doc:
            continue
        doc_emb = np.array(doc['embedding'])
        # Cosine similarity
        similarity = np.dot(query_emb, doc_emb) / (
            np.linalg.norm(query_emb) * np.linalg.norm(doc_emb)
        )
        if similarity >= request.threshold:
            results.append(SearchResult(
                id=doc['id'],
                text=doc['text'],
                score=float(similarity),
                metadata=doc.get('metadata')
            ))
    # Sort by score
    results.sort(key=lambda x: x.score, reverse=True)
    return results[:request.top_k]
# ============================================================
# Stats & Health
# ============================================================
@app.get("/stats")
async def get_stats():
    """Get service statistics."""
    ml_cache = cache_stats()
    return {
        'model': config.MODEL_NAME,
        'index': {
            'documents': state_get('index:doc_count') or 0
        },
        'requests': {
            'embed': state_get('metrics:embed_requests') or 0,
            'batch': state_get('metrics:batch_requests') or 0,
            'search': state_get('metrics:search_requests') or 0,
            'index': state_get('metrics:index_requests') or 0
        },
        'cache': {
            'hits': ml_cache.get('hits', 0),
            'misses': ml_cache.get('misses', 0),
            'hit_rate': ml_cache.get('hit_rate', 0)
        }
    }
@app.get("/health")
async def health():
    return {
        'status': 'healthy',
        'model_loaded': model is not None
    }
Running
rebar3 shell
hornbeam:start("app:app", #{
    worker_class => asgi,
    lifespan => on,
    pythonpath => ["embedding_service"],
    workers => 4,
    timeout => 60000
}).
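Once the shell reports the app is running, the /health endpoint defined above confirms the model loaded (assuming the default port 8000 used in the examples below):
curl http://localhost:8000/health
# expected: {"status":"healthy","model_loaded":true}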
Usage
# Index documents
curl -X POST http://localhost:8000/index \
  -H "Content-Type: application/json" \
  -d '{"id": "1", "text": "Python is a programming language", "metadata": {"category": "tech"}}'

curl -X POST http://localhost:8000/index \
  -H "Content-Type: application/json" \
  -d '{"id": "2", "text": "Erlang is great for concurrent systems"}'

curl -X POST http://localhost:8000/index \
  -H "Content-Type: application/json" \
  -d '{"id": "3", "text": "Machine learning uses neural networks"}'

# Search
curl -X POST http://localhost:8000/search \
  -H "Content-Type: application/json" \
  -d '{"query": "programming languages", "top_k": 5}'

# Direct embedding
curl -X POST http://localhost:8000/embed \
  -H "Content-Type: application/json" \
  -d '{"texts": ["Hello world", "How are you?"]}'
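
# Batch embedding (the /embed/batch endpoint above; skips the per-item cache)
curl -X POST http://localhost:8000/embed/batch \
  -H "Content-Type: application/json" \
  -d '{"texts": ["Hello world", "How are you?"]}'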
# Check stats
curl http://localhost:8000/stats
Requirements
# requirements.txt
fastapi>=0.100
sentence-transformers>=2.0
numpy>=1.20
Performance Tips
- Use batch endpoints for bulk operations
- Enable caching for repeated queries
- Increase workers for more parallelism
- Use GPU for faster inference (install torch with CUDA); see the sketch below
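A minimal sketch of the batching and GPU tips, assuming a CUDA-capable torch build is installed; device and batch_size are standard sentence-transformers arguments, and the model name mirrors app.py:
# Sketch: load the model on GPU when available and encode in larger batches
import torch
from sentence_transformers import SentenceTransformer

device = "cuda" if torch.cuda.is_available() else "cpu"
model = SentenceTransformer('all-MiniLM-L6-v2', device=device)

# Batch encoding amortizes tokenization and host-to-GPU transfer costs
embeddings = model.encode(
    ["Hello world", "How are you?"],
    batch_size=64,             # tune to available GPU memory
    convert_to_numpy=True
)
print(embeddings.shape)        # (2, 384) for all-MiniLM-L6-v2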
Next Steps
- Distributed ML - Scale across the cluster
- ML Integration Guide - More patterns