Event Protocol

⚠️

This is a v2 design — not yet implemented. See Architecture v2 for context.

The problem

The framework must deliver real-time feedback to several consumers at once: the TUI renders streaming text, Jan Desktop renders rich tool-call cards, API servers stream SSE, and logging systems record audit trails. These consumers need structured events, session correlation, completeness, and safety (emitting to a disconnected consumer must not crash the agent).

Event types

Every session, turn, and tool lifecycle event carries a SessionId for correlation; the high-frequency streaming variants do not:


// Assumed imports: `Value` from serde_json, `Uuid` from the uuid crate.
use serde_json::Value;
use uuid::Uuid;

pub type SessionId = Uuid;

// Clone is required: a broadcast channel hands each receiver its own copy.
// PlanStep, TokenUsage, and MemoryOp are not shown here and must also be Clone.
#[derive(Debug, Clone)]
pub enum AgentEvent {
    // Session lifecycle
    SessionStarted { session_id: SessionId },
    SessionEnded { session_id: SessionId, error: Option<String> },
    // Turn lifecycle
    TurnStarted { session_id: SessionId, turn_index: usize, max_turns: usize },
    TurnCompleted { session_id: SessionId, turn_index: usize, final_turn: bool },
    // Tool lifecycle
    ToolCallStart { session_id: SessionId, call_id: String, name: String, args: Value },
    ToolCallEnd { session_id: SessionId, call_id: String, name: String, success: bool,
                  output: Value, duration_ms: u64 },
    // Streaming (high-frequency)
    TextDelta(String),
    Thinking(String),
    PlanUpdate(Vec<PlanStep>),
    // Resource tracking
    Usage(TokenUsage),
    ContextCompacted { session_id: SessionId, removed_turns: usize, summary_length: usize },
    MemoryAccess { operation: MemoryOp, plugin_name: String, entry_count: usize },
    // Diagnostic
    Warning(String),
}
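
Because only some variants carry a session_id, a consumer that correlates by session may want one accessor instead of repeating the match everywhere. The following is a minimal sketch, not part of the design: `session_id()` is a hypothetical helper, the enum is trimmed to four variants, and `SessionId` is a `u128` stand-in for `Uuid` so the snippet compiles without external crates.

```rust
// Stand-in for `Uuid` so the sketch needs no external crates (assumption).
pub type SessionId = u128;

// Trimmed copy of the enum: two variants with a session id, two without.
#[derive(Debug, Clone, PartialEq)]
pub enum AgentEvent {
    SessionStarted { session_id: SessionId },
    TurnStarted { session_id: SessionId, turn_index: usize, max_turns: usize },
    TextDelta(String),
    Warning(String),
}

impl AgentEvent {
    /// Hypothetical helper: the session this event belongs to, if it carries one.
    pub fn session_id(&self) -> Option<SessionId> {
        match self {
            AgentEvent::SessionStarted { session_id }
            | AgentEvent::TurnStarted { session_id, .. } => Some(*session_id),
            // Streaming and diagnostic events carry no id of their own.
            AgentEvent::TextDelta(_) | AgentEvent::Warning(_) => None,
        }
    }
}
```

With this shape, a consumer that needs to attribute a TextDelta to a session has to track that itself, for example by remembering the id from the most recent TurnStarted it saw.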

EventSender and EventReceiver


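pub struct EventSender {
    tx: tokio::sync::broadcast::Sender<AgentEvent>,
}
// Assumed shape: the design does not spell out EventReceiver's fields.
pub struct EventReceiver {
    rx: tokio::sync::broadcast::Receiver<AgentEvent>,
}
impl EventSender {
    /// Create a channel that buffers up to `capacity` events.
    pub fn new(capacity: usize) -> (Self, EventReceiver) {
        let (tx, rx) = tokio::sync::broadcast::channel(capacity);
        (Self { tx }, EventReceiver { rx })
    }
    /// Send an event. Silently drops it if there are no receivers. Never fails, never blocks.
    pub fn send(&self, event: AgentEvent) { let _ = self.tx.send(event); }
}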

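Why broadcast, not mpsc: An mpsc channel has a single receiver, so it cannot fan one event out to several consumers. With broadcast, multiple consumers (TUI, logger, API) subscribe independently and each gets its own copy of every event. A slow consumer misses events rather than blocking the agent.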

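Capacity: Default 256. If a consumer falls more than that many events behind, its next receive returns a Lagged error reporting how many events it missed, and it resumes from the oldest event still in the buffer.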
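
To make the lag behavior concrete, here is a toy, single-threaded model of a bounded broadcast buffer. It is an illustration only and not how tokio implements its channel: `Ring`, `Lagged`, and the explicit reader `cursor` are hypothetical names introduced for this sketch.

```rust
use std::collections::VecDeque;

/// Toy model of a bounded broadcast buffer (not tokio's implementation).
pub struct Ring<T> {
    buf: VecDeque<T>,
    capacity: usize,
    /// Sequence number of the oldest event still in `buf`.
    head: u64,
}

/// Returned to a reader that fell behind; carries the number of skipped events.
#[derive(Debug, PartialEq)]
pub struct Lagged(pub u64);

impl<T: Clone> Ring<T> {
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be non-zero");
        Ring { buf: VecDeque::new(), capacity, head: 0 }
    }

    /// Never blocks: when the buffer is full, the oldest event is discarded.
    pub fn send(&mut self, event: T) {
        if self.buf.len() == self.capacity {
            self.buf.pop_front();
            self.head += 1;
        }
        self.buf.push_back(event);
    }

    /// `cursor` is the reader's next sequence number.
    /// Returns `Ok(None)` when the reader has caught up.
    pub fn recv(&self, cursor: &mut u64) -> Result<Option<T>, Lagged> {
        if *cursor < self.head {
            let missed = self.head - *cursor;
            *cursor = self.head; // resume from the oldest retained event
            return Err(Lagged(missed));
        }
        let idx = (*cursor - self.head) as usize;
        match self.buf.get(idx) {
            Some(event) => {
                *cursor += 1;
                Ok(Some(event.clone()))
            }
            None => Ok(None),
        }
    }
}
```

With a capacity of 2 and five events (0 through 4) sent before the reader's first receive, the reader first gets `Lagged(3)`, then events 3 and 4. The sender was never slowed down; only the slow reader lost data.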

Who emits what

| Component | Events |
| --- | --- |
| AgentRuntime | SessionStarted, SessionEnded |
| TurnEngine | TurnStarted, TurnCompleted, TextDelta, Thinking, Usage, ContextCompacted |
| ToolDispatchEngine | ToolCallStart, ToolCallEnd |
| Memory plugins | MemoryAccess |
| Agent cores | PlanUpdate, Warning |

Event timeline example

A ReAct turn with one tool call:


SessionStarted { session_id: "abc" }
TurnStarted { session_id: "abc", turn_index: 0, max_turns: 10 }
TextDelta("Let me search")
TextDelta(" for that...")
ToolCallStart { session_id: "abc", call_id: "tc_1", name: "web.search", args: {...} }
ToolCallEnd { session_id: "abc", call_id: "tc_1", name: "web.search", success: true, output: {...}, duration_ms: 450 }
TurnCompleted { session_id: "abc", turn_index: 0, final_turn: false }
TurnStarted { session_id: "abc", turn_index: 1, max_turns: 10 }
TextDelta("Based on the search results,")
TextDelta(" here is what I found...")
Usage(TokenUsage { prompt_tokens: 1200, completion_tokens: 150 })
TurnCompleted { session_id: "abc", turn_index: 1, final_turn: true }
SessionEnded { session_id: "abc", error: None }
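
In the timeline above, the ToolCallStart is matched by a ToolCallEnd with the same call_id. A consumer such as a tool-card renderer could rely on that pairing to find calls that are still in flight. The sketch below assumes this pairing holds in general; `unfinished_calls` is a hypothetical helper and the enum is trimmed to the fields the example needs.

```rust
// Trimmed copy of the tool lifecycle events; other fields omitted for brevity.
#[derive(Debug, Clone)]
pub enum AgentEvent {
    ToolCallStart { call_id: String, name: String },
    ToolCallEnd { call_id: String, name: String, success: bool, duration_ms: u64 },
    TextDelta(String),
}

/// Hypothetical helper: the call_ids that started but have not ended yet,
/// in the order they started.
pub fn unfinished_calls(events: &[AgentEvent]) -> Vec<String> {
    let mut open: Vec<String> = Vec::new();
    for event in events {
        match event {
            AgentEvent::ToolCallStart { call_id, .. } => open.push(call_id.clone()),
            // An end event closes the call with the matching id.
            AgentEvent::ToolCallEnd { call_id, .. } => open.retain(|id| id != call_id),
            _ => {}
        }
    }
    open
}
```

A renderer could keep a spinner visible for every id this function returns and replace it with a result card once the matching ToolCallEnd arrives.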

Consumer patterns

TUI rendering


loop {
    match events.recv().await {
        Some(AgentEvent::TextDelta(text)) => tui.append_text(&text),
        Some(AgentEvent::ToolCallStart { name, .. }) => tui.show_tool_spinner(&name),
        Some(AgentEvent::ToolCallEnd { name, success, duration_ms, .. }) => {
            tui.show_tool_result(&name, success, duration_ms)
        }
        Some(AgentEvent::TurnCompleted { final_turn: true, .. }) => break,
        // None is assumed to mean the channel closed; stop instead of spinning.
        None => break,
        Some(_) => {}
    }
}

Audit logger


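loop {
    match events.recv().await {
        // Record only tool lifecycle events.
        Some(event @ (AgentEvent::ToolCallStart { .. } | AgentEvent::ToolCallEnd { .. })) => {
            audit_log.write(&event).await
        }
        Some(AgentEvent::SessionEnded { .. }) => break,
        // None is assumed to mean the channel closed; stop instead of spinning.
        None => break,
        Some(_) => {}
    }
}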

Metrics collector


loop {
    match events.recv().await {
        Some(AgentEvent::Usage(usage)) => metrics.record_tokens(usage),
        Some(AgentEvent::ToolCallEnd { duration_ms, name, .. }) => {
            metrics.record_tool_latency(&name, duration_ms)
        }
        Some(AgentEvent::SessionEnded { .. }) => break,
        // None is assumed to mean the channel closed; stop instead of spinning.
        None => break,
        Some(_) => {}
    }
}
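
The `metrics` object in the loop above is not defined by this design. As one possible shape, here is a minimal in-memory collector. Everything in it is an assumption made for illustration: the `Metrics` struct, the `mean_latency_ms` helper, and the `TokenUsage` fields, which are taken from the field names in the timeline example.

```rust
use std::collections::HashMap;

// Assumed shape of TokenUsage, based on the fields shown in the timeline example.
#[derive(Debug, Clone, Copy, Default)]
pub struct TokenUsage {
    pub prompt_tokens: u64,
    pub completion_tokens: u64,
}

/// Hypothetical in-memory collector behind the `metrics` calls.
#[derive(Debug, Default)]
pub struct Metrics {
    pub total_tokens: u64,
    /// Per tool: (call count, summed duration in ms).
    pub tool_latency: HashMap<String, (u64, u64)>,
}

impl Metrics {
    pub fn record_tokens(&mut self, usage: TokenUsage) {
        self.total_tokens += usage.prompt_tokens + usage.completion_tokens;
    }

    pub fn record_tool_latency(&mut self, name: &str, duration_ms: u64) {
        let entry = self.tool_latency.entry(name.to_string()).or_insert((0, 0));
        entry.0 += 1;
        entry.1 += duration_ms;
    }

    /// Mean latency in ms for a tool, or None if it was never called.
    pub fn mean_latency_ms(&self, name: &str) -> Option<u64> {
        // The count is at least 1 for any stored entry, so the division is safe.
        self.tool_latency.get(name).map(|(count, total)| total / count)
    }
}
```

Keeping the collector free of any channel logic means it can be unit-tested by calling the two record methods directly, without running an agent session.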