Architecture
Understanding Agenkit's layered architecture from the ground up.
Overview
Agenkit has a layered architecture where each layer is independent and composable. Every layer implements the same core Agent interface, enabling unlimited composability.
┌─────────────────────────────────────────────────────────────┐
│ YOUR APPLICATION │
│ (Your agents, business logic, AI models) │
└────────────────────┬────────────────────────────────────────┘
│
┌────────────────────┴────────────────────────────────────────┐
│ COMPOSITION LAYER │
│ Sequential • Parallel • Router • Fallback • Conditional │
│ (Orchestrate multiple agents into workflows) │
└────────────────────┬────────────────────────────────────────┘
│
┌────────────────────┴────────────────────────────────────────┐
│ MIDDLEWARE LAYER │
│ CircuitBreaker • Retry • Timeout • RateLimiter │
│ Caching • Batching • Tracing • Metrics │
│ (Add resilience, observability, performance) │
└────────────────────┬────────────────────────────────────────┘
│
┌────────────────────┴────────────────────────────────────────┐
│ TRANSPORT LAYER │
│ HTTP • gRPC • WebSocket • Local │
│ (Enable cross-process, cross-language communication) │
└────────────────────┬────────────────────────────────────────┘
│
┌────────────────────┴────────────────────────────────────────┐
│ CORE INTERFACES │
│ Agent • Message • Tool • ToolResult │
│ (The minimal contract everything implements) │
└─────────────────────────────────────────────────────────────┘
Layer 1: Core Interfaces
Location: agenkit/interfaces.py (Python), agenkit-go/agenkit/interfaces.go (Go)
This is the minimal contract that everything builds on. Only 4 primitives:
The Agent Interface
class Agent(ABC):
@property
@abstractmethod
def name(self) -> str:
"""Agent identifier"""
@abstractmethod
async def process(self, message: Message) -> Message:
"""Core method - process input, return output"""
That's it! An agent is just something with a name that can process messages.
The Message Data Class
@dataclass(frozen=True)
class Message:
role: str # "user", "agent", "system", "tool"
content: Any # Flexible - str, dict, list, anything
metadata: dict # Extension point for custom data
timestamp: datetime # For ordering and debugging
The Tool Interface
class Tool(ABC):
@property
@abstractmethod
def name(self) -> str:
"""Tool identifier"""
@abstractmethod
async def execute(self, **kwargs) -> ToolResult:
"""Execute the tool"""
The ToolResult Data Class
@dataclass(frozen=True)
class ToolResult:
success: bool # Did it work?
data: Any # The result
error: str | None # Error message if failed
Design Principle: Minimalism
These interfaces are minimal - just enough for interoperability, nothing more. No framework opinions leak in. You can implement them with any LLM library, any framework, any infrastructure.
Layer 2: Transport Layer
Location: agenkit/adapters/python/ (Python), agenkit-go/adapter/ (Go)
Enables agents to communicate across processes and across languages (Python ↔ Go).
Transport Types
1. Local Transport
Same process, no serialization:
from agenkit.adapters.python.local_agent import LocalAgent
local_agent = LocalAgent(my_agent)
response = await local_agent.process(message)
2. HTTP Transport
Remote agent over HTTP/1.1, HTTP/2, or HTTP/3:
from agenkit.adapters.python.remote_agent import RemoteAgent
remote_agent = RemoteAgent(
name="python-agent",
endpoint="http://localhost:8080"
)
response = await remote_agent.process(message)
3. gRPC Transport
High-performance binary protocol:
4. WebSocket Transport
Bidirectional streaming:
How Transports Work
The RemoteAgent class implements the same Agent interface but forwards calls to a remote server:
class RemoteAgent(Agent):
def __init__(self, name: str, endpoint: str):
self._transport = self._create_transport(endpoint)
async def process(self, message: Message) -> Message:
# 1. Serialize message
request_bytes = encode_message(message)
# 2. Send over transport (HTTP/gRPC/WebSocket)
response_bytes = await self._transport.send(request_bytes)
# 3. Deserialize response
return decode_message(response_bytes)
Key Point: From the caller's perspective, local and remote agents are identical - same interface!
Cross-Language Communication
Layer 3: Middleware Layer
Location: agenkit/middleware/ (Python), agenkit-go/middleware/ (Go)
Middleware wraps agents to add cross-cutting concerns like error handling, metrics, caching, etc.
The Middleware Pattern
Every middleware implements the Agent interface and wraps another agent:
class RetryDecorator(Agent):
def __init__(self, agent: Agent, config: RetryConfig):
self._agent = agent # Wrap the inner agent
self._config = config
@property
def name(self) -> str:
return self._agent.name # Delegate to inner agent
async def process(self, message: Message) -> Message:
# Add retry logic AROUND the inner agent
for attempt in range(self._config.max_attempts):
try:
return await self._agent.process(message)
except Exception as e:
if attempt == self._config.max_attempts - 1:
raise
await asyncio.sleep(backoff)
Available Middleware
| Middleware | Purpose | Example |
|---|---|---|
| RetryDecorator | Exponential backoff with jitter | RetryDecorator(agent, RetryConfig(max_attempts=3)) |
| CircuitBreaker | Fail-fast pattern | CircuitBreaker(agent, CircuitBreakerConfig()) |
| TimeoutDecorator | Request deadline enforcement | TimeoutDecorator(agent, timeout=10.0) |
| RateLimiter | Token bucket algorithm | RateLimiter(agent, requests_per_second=10) |
| CachingDecorator | LRU cache with TTL | CachingDecorator(agent, max_size=1000, ttl=300) |
| BatchingDecorator | Request aggregation | BatchingDecorator(agent, max_batch_size=10) |
| TracingMiddleware | OpenTelemetry tracing | TracingMiddleware(agent) |
| MetricsMiddleware | Prometheus metrics | MetricsMiddleware(agent) |
Stacking Middleware
Middleware is composable - you can stack them:
# Start with base agent
agent = MyAgent()
# Add caching
agent = CachingDecorator(agent, max_size=1000)
# Add retry logic
agent = RetryDecorator(agent, RetryConfig(max_attempts=3))
# Add circuit breaker
agent = CircuitBreaker(agent, CircuitBreakerConfig())
# Add timeout
agent = TimeoutDecorator(agent, timeout=30.0)
# Add metrics
agent = MetricsMiddleware(agent, service_name="my-service")
# Now use it - all middleware applies automatically!
response = await agent.process(message)
Request Flow Through Middleware Stack
Your Call
↓
MetricsMiddleware (records metrics)
↓
TimeoutDecorator (enforces deadline)
↓
CircuitBreaker (checks if circuit is open)
↓
RetryDecorator (retries on failure)
↓
CachingDecorator (checks cache first)
↓
MyAgent (your actual logic)
↓
Response flows back up
Layer 4: Composition Layer
Location: agenkit/composition/ (Python), agenkit-go/composition/ (Go)
Combine multiple agents into workflows. All composition patterns implement the Agent interface - so composed agents look like single agents!
1. Sequential Pattern (Pipeline)
Output of one agent becomes input of the next:
from agenkit.composition import SequentialAgent
# Output of agent1 → input of agent2 → input of agent3
pipeline = SequentialAgent([agent1, agent2, agent3])
result = await pipeline.process(message) # Flows through all 3
Use cases: Data pipelines, multi-stage processing, validation → processing → formatting
2. Parallel Pattern (Fan-out)
All agents process the SAME input concurrently:
from agenkit.composition import ParallelAgent
# All agents process the SAME input concurrently
parallel = ParallelAgent([sentiment_agent, summary_agent, translate_agent])
result = await parallel.process(message) # All 3 run in parallel
# result.content contains list of all responses
Use cases: Ensemble methods, multi-perspective analysis, redundant processing
3. Fallback Pattern (High Availability)
Try primary, fall back to secondary if it fails:
from agenkit.composition import FallbackAgent
# Try primary, fall back to secondary if it fails
ha_agent = FallbackAgent([primary_agent, backup_agent, last_resort_agent])
result = await ha_agent.process(message) # Uses first one that succeeds
Use cases: High availability, graceful degradation, multi-model fallback
4. Conditional Pattern (Router)
Route to different agents based on condition:
from agenkit.composition import ConditionalAgent
def route(message: Message) -> int:
if "weather" in message.content:
return 0 # weather_agent
elif "news" in message.content:
return 1 # news_agent
else:
return 2 # general_agent
router = ConditionalAgent(
agents=[weather_agent, news_agent, general_agent],
condition=route
)
Use cases: Intent routing, load balancing, A/B testing
Composing Compositions
Since compositions are agents, you can nest them:
# Parallel analysis, then sequential refinement
analysis_parallel = ParallelAgent([sentiment, topics, entities])
refinement_sequential = SequentialAgent([analysis_parallel, summarizer, formatter])
# Add retry + circuit breaker
production_agent = CircuitBreaker(
RetryDecorator(refinement_sequential)
)
Layer 5: Observability
Location: agenkit/observability/ (Python), agenkit-go/observability/ (Go)
Built-in distributed tracing, metrics, and logging using OpenTelemetry.
Distributed Tracing
from agenkit.observability import init_tracing, TracingMiddleware
# Initialize OpenTelemetry
init_tracing(
service_name="my-service",
otlp_endpoint="http://jaeger:4317"
)
# Wrap agent with tracing
traced_agent = TracingMiddleware(my_agent)
# Every call now creates spans!
response = await traced_agent.process(message)
Traces propagate across languages (Python → Go → Python) using W3C Trace Context!
Prometheus Metrics
from agenkit.observability import init_metrics, MetricsMiddleware
# Start Prometheus endpoint on :8001
init_metrics(service_name="my-service", port=8001)
# Wrap agent
metered_agent = MetricsMiddleware(my_agent)
# Automatic metrics:
# - agenkit_requests_total{agent="...", status="..."}
# - agenkit_request_duration_seconds{agent="..."}
# - agenkit_message_size_bytes{agent="..."}
Structured Logging
from agenkit.observability import configure_logging
configure_logging(
level=logging.INFO,
structured=True, # JSON logs
include_trace_context=True # Auto-inject trace_id, span_id
)
Key Design Principles
1. Everything is an Agent
- Middleware wraps agents and exposes Agent interface
- Compositions combine agents and expose Agent interface
- Remote agents proxy agents and expose Agent interface
- This means everything is composable!
2. Minimal Interfaces
- Agent only needs
name+process() - Message is just 4 fields
- No framework opinions, maximum flexibility
3. Decorator Pattern
- Middleware uses decorators (wrapping)
- Stack as many as you need
- Each adds one concern (Single Responsibility)
4. Language Agnostic
- Python and Go have identical APIs
- Full cross-language compatibility
- Same patterns, same behavior
5. Production Ready
- All 137 tests passing
- Comprehensive error handling
- Security hardened (non-root containers)
- Fully observable (traces, metrics, logs)
Complete Example
Putting it all together:
from agenkit import Agent, Message
from agenkit.adapters.python.remote_agent import RemoteAgent
from agenkit.middleware import RetryDecorator, CircuitBreaker, TimeoutDecorator
from agenkit.observability import TracingMiddleware, MetricsMiddleware
from agenkit.composition import ParallelAgent, SequentialAgent
# Your custom agent
class MyAgent(Agent):
@property
def name(self) -> str:
return "my-agent"
async def process(self, message: Message) -> Message:
# Your logic here
return Message(role="agent", content="Response")
# Remote Go agent (cross-language!)
go_agent = RemoteAgent(name="go-agent", endpoint="grpc://localhost:50051")
# Parallel analysis
analysis = ParallelAgent([MyAgent(), go_agent])
# Sequential pipeline
pipeline = SequentialAgent([analysis, MyAgent()])
# Add production middleware
production_agent = TracingMiddleware( # Distributed tracing
MetricsMiddleware( # Prometheus metrics
TimeoutDecorator( # 30s timeout
CircuitBreaker( # Fail-fast
RetryDecorator( # 3 retries
pipeline
)
),
timeout=30.0
)
)
)
# Use it!
response = await production_agent.process(
Message(role="user", content="Hello!")
)
This gives you:
- ✅ Cross-language communication (Python ↔ Go)
- ✅ Parallel + Sequential composition
- ✅ Automatic retries with exponential backoff
- ✅ Circuit breaker protection
- ✅ 30-second timeout enforcement
- ✅ Prometheus metrics collection
- ✅ Distributed tracing with Jaeger
All with one simple interface: await agent.process(message)!
Next Steps
- Interfaces - Deep dive into the core contracts
- Features - Explore each layer in detail
- Examples - See 28+ practical examples
- Guides - Build production systems