Walrus uses Raft consensus (via the Octopii implementation of OpenRaft) for metadata coordination only. This design separates the control plane from the data plane, ensuring high write throughput while maintaining strong consistency for cluster topology.
Key Insight: Raft is used only for metadata (topics, segments, leaders), not for data replication. This keeps the data path fast and scalable.
Raft vs Data Path Separation
What Goes Through Raft?
Metadata operations (infrequent, low throughput):
Creating topics: CreateTopic { name, initial_leader }
Rolling over segments: RolloverTopic { name, new_leader, sealed_count }
Adding nodes: UpsertNode { node_id, addr }
Cluster membership changes
```
// Example: Creating a topic (goes through Raft)
Controller::ensure_topic("logs")
└─► propose_metadata(CreateTopic { name: "logs", leader: 1 })
    └─► Raft::propose(bytes)              // Replicated to all nodes
        ├─► AppendEntries RPC to followers
        ├─► Wait for quorum (2 of 3 nodes)
        └─► Apply to all state machines
```
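Taken together, these operations suggest a small command set. The sketch below shows what the `MetadataCommand` enum used later on this page might look like; only the variant names and fields shown here are taken from the docs, and the concrete field types are assumptions.

```rust
use serde::{Deserialize, Serialize};

type NodeId = u64;

// Sketch of the metadata command set listed above. Commands are
// serialized (e.g. with bincode, as in Metadata::apply below) before
// being proposed to Raft. Field types are assumptions.
#[derive(Debug, Serialize, Deserialize)]
pub enum MetadataCommand {
    CreateTopic { name: String, initial_leader: NodeId },
    RolloverTopic { name: String, new_leader: NodeId, sealed_count: u64 },
    UpsertNode { node_id: NodeId, addr: String },
}
```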
What Bypasses Raft?
Data operations (frequent, high throughput):
Writing messages: Direct to Walrus storage engine
Reading messages: Direct from Walrus storage engine
No Raft replication of actual data
No consensus overhead in data path
```
// Example: Writing data (bypasses Raft)
PUT logs "hello world"
└─► Controller::append_for_topic("logs", data)
    └─► Storage::append_by_key("logs:1", data)   // Direct write
        └─► Walrus::batch_append_for_topic("logs:1", data)
            └─► Write to disk (no Raft involved)
```
| Operation | Path | Latency | Throughput |
| --- | --- | --- | --- |
| Create topic | Raft consensus | ~10-50ms | Low (rare) |
| Rollover segment | Raft consensus | ~10-50ms | ~1 per minute |
| Write data | Direct storage | ~1-2ms | ~1M writes/s |
| Read data | Direct storage | ~0.5-1ms | Scales with replicas |
By keeping Raft out of the data path, Walrus achieves Kafka-like throughput while maintaining strong metadata consistency.
Raft Architecture
Cluster Roles
At any given time, each node has a Raft role:
┌─────────────────────────────────────────────────────┐
│ 3-Node Cluster │
├─────────────────────────────────────────────────────┤
│ │
│ Node 1: LEADER (Raft) │
│ └─► Accepts metadata proposals │
│ Replicates to followers via AppendEntries │
│ │
│ Node 2: FOLLOWER (Raft) │
│ └─► Applies committed entries from leader │
│ Forwards proposals to leader │
│ │
│ Node 3: FOLLOWER (Raft) │
│ └─► Applies committed entries from leader │
│ Votes in elections │
│ │
└─────────────────────────────────────────────────────┘
Important: Raft leadership is separate from segment leadership:
Raft leader: Coordinates metadata changes
Segment leader: Handles writes for specific segments
Example:
Raft leader: Node 1
Segment leaders:
- "logs:1" → Node 2
- "logs:2" → Node 3
- "metrics:1" → Node 1
State Machine
Each node runs the same deterministic state machine that applies Raft-committed commands:
```rust
// metadata.rs - State machine implementation
pub struct Metadata {
    state: Arc<RwLock<ClusterState>>,
}

pub struct ClusterState {
    topics: HashMap<String, TopicState>,   // Topic → metadata
    nodes: HashMap<NodeId, RaftAddress>,   // Node → address
}

pub struct TopicState {
    current_segment: u64,
    leader_node: NodeId,
    sealed_segments: HashMap<u64, u64>,    // segment_id → entry_count
    segment_leaders: HashMap<u64, NodeId>, // segment_id → leader
}
```
Commands Applied:
CreateTopic
```
CreateTopic { name: "logs", initial_leader: 1 }
└─► state.topics.insert("logs", TopicState {
        current_segment: 1,
        leader_node: 1,
        sealed_segments: {},
        segment_leaders: { 1 → 1 },
    })
```
RolloverTopic
```
RolloverTopic { name: "logs", new_leader: 2, sealed_count: 1_000_000 }
└─► state.topics["logs"] = TopicState {
        current_segment: 2,                  // Increment
        leader_node: 2,                      // Transfer
        sealed_segments: { 1 → 1_000_000 },  // Seal old segment
        segment_leaders: { 1 → 1, 2 → 2 },   // Add new leader
    }
```
UpsertNode
```
UpsertNode { node_id: 4, addr: "10.0.0.4:6004" }
└─► state.nodes.insert(4, "10.0.0.4:6004")
```
Proposing Changes
Any node can propose metadata changes, but they must go through the Raft leader:
┌──────────────────────────────────────────────────────────┐
│ Metadata Proposal Flow │
├──────────────────────────────────────────────────────────┤
│ │
│ [1] Client → Node 2: REGISTER metrics │
│ │ │
│ └─► Controller::ensure_topic("metrics") │
│ │ │
│ └─► propose_metadata(CreateTopic{...}) │
│ │ │
│ ├─ Am I Raft leader? │
│ │ │
│ ┌──────┴──────┐ │
│ NO YES │
│ │ │ │
│ │ └─► [Skip to step 2] │
│ │ │
│ └─► Forward to Raft leader (Node 1) │
│ └─► InternalOp::ForwardMetadata │
│ │
│ [2] Node 1 (Raft Leader): │
│ │ │
│ └─► Raft::propose(serialized_command) │
│ │ │
│ ├─► Append to local Raft log │
│ │ Entry: [index=42, CreateTopic{...}] │
│ │ │
│ ├─► Send AppendEntries RPC to followers │
│ │ └─► Node 2: Append, ACK │
│ │ └─► Node 3: Append, ACK │
│ │ │
│ ├─► Wait for quorum (2 of 3) │
│ │ ✓ Received ACKs from 2 nodes │
│ │ │
│ └─► Commit entry (advance commit_index) │
│ │
│ [3] Apply to State Machine (ALL nodes): │
│ │ │
│ └─► Metadata::apply(command_bytes) │
│ │ │
│ ├─► Deserialize: CreateTopic{...} │
│ │ │
│ ├─► state.topics.insert("metrics", ...) │
│ │ │
│ └─► Return b"CREATED" │
│ │
│ Result: All nodes have consistent view of "metrics" │
│ │
└──────────────────────────────────────────────────────────┘
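In code, the same flow reduces to a leader check followed by either a direct proposal or a forward. A hedged sketch is below: propose_metadata, Raft::propose, and InternalOp::ForwardMetadata appear in the flow above, while is_leader, forward, and the error type are names assumed for illustration.

```rust
impl Controller {
    // Sketch of steps [1]-[2] of the diagram; illustrative only.
    async fn propose_metadata(&self, cmd: MetadataCommand) -> anyhow::Result<Vec<u8>> {
        let bytes = bincode::serialize(&cmd)?;
        if self.raft.is_leader() {
            // Raft leader: append to the local log, replicate via
            // AppendEntries, and wait for quorum commit.
            self.raft.propose(bytes).await
        } else {
            // Follower: hand the proposal to the current Raft leader.
            self.forward(InternalOp::ForwardMetadata(bytes)).await
        }
    }
}
```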
Committed Entry Application
Once a command is committed by Raft, it is applied to the state machine on every node:
```rust
// Simplified from metadata.rs
impl Metadata {
    pub fn apply(&self, cmd_bytes: &[u8]) -> Vec<u8> {
        // apply() returns raw bytes, so decode failures panic here
        // rather than propagate with `?`.
        let command: MetadataCommand =
            bincode::deserialize(cmd_bytes).expect("valid command bytes");
        match command {
            MetadataCommand::CreateTopic { name, initial_leader } => {
                let mut state = self.state.write();
                state.topics.insert(name.clone(), TopicState {
                    current_segment: 1,
                    leader_node: initial_leader,
                    sealed_segments: HashMap::new(),
                    segment_leaders: [(1, initial_leader)].into(),
                });
                b"CREATED".to_vec()
            }
            MetadataCommand::RolloverTopic { name, new_leader, sealed_count } => {
                let mut state = self.state.write();
                if let Some(ts) = state.topics.get_mut(&name) {
                    let old_segment = ts.current_segment;
                    ts.sealed_segments.insert(old_segment, sealed_count);
                    ts.current_segment += 1;
                    ts.leader_node = new_leader;
                    ts.segment_leaders.insert(ts.current_segment, new_leader);
                }
                b"ROLLED_OVER".to_vec()
            }
            MetadataCommand::UpsertNode { node_id, addr } => {
                let mut state = self.state.write();
                state.nodes.insert(node_id, addr);
                b"UPSERTED".to_vec()
            }
        }
    }
}
```
Lease Synchronization
Metadata changes trigger lease updates within 100ms:
┌────────────────────────────────────────────────────────┐
│ Lease Sync After Rollover │
├────────────────────────────────────────────────────────┤
│ │
│ T0: RolloverTopic committed via Raft │
│ └─► Metadata updated on all nodes: │
│ current_segment: 2 │
│ leader_node: 3 │
│ sealed_segments: {1 → 1_000_000} │
│ segment_leaders: {1 → 2, 2 → 3} │
│ │
│ T0+50ms: Node 2 lease sync tick │
│ └─► Controller::update_leases() │
│ ├─► Metadata::owned_topics(2) │
│ │ └─► Returns: [] (no longer owns logs:1) │
│ └─► Storage::update_leases({}) │
│ └─► Removes "logs:1" from lease set │
│ Future writes → NotLeaderError │
│ │
│ T0+100ms: Node 3 lease sync tick │
│ └─► Controller::update_leases() │
│ ├─► Metadata::owned_topics(3) │
│ │ └─► Returns: [("logs", 2)] │
│ └─► Storage::update_leases({"logs:2"}) │
│ └─► Grants "logs:2" lease │
│ Starts accepting writes │
│ │
│ Result: Leadership transferred in ≤100ms │
│ │
└────────────────────────────────────────────────────────┘
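The tick itself is simple. A minimal sketch, assuming a Tokio runtime and using the Controller::update_leases call from the timeline above (the 100ms interval matches the sync bound stated there):

```rust
use std::{sync::Arc, time::Duration};

// Periodic lease sync: each tick re-derives, from the locally applied
// Raft metadata, the set of segment leases this node should hold.
async fn lease_sync_loop(controller: Arc<Controller>) {
    let mut tick = tokio::time::interval(Duration::from_millis(100));
    loop {
        tick.tick().await;
        // owned_topics() → Storage::update_leases(), per the timeline above.
        controller.update_leases();
    }
}
```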
Leader Election
When Elections Happen
Raft elections occur when:
Cluster bootstrap (first node becomes leader)
Leader failure (heartbeat timeout)
Network partition heals
Manual leadership transfer
Election Process
┌──────────────────────────────────────────────────────┐
│ Raft Leader Election │
├──────────────────────────────────────────────────────┤
│ │
│ [1] Node 1 (Leader) fails │
│ └─► Stops sending heartbeats │
│ │
│ [2] Followers timeout (150-300ms random) │
│ Node 2: Timeout at 170ms │
│ Node 3: Timeout at 240ms │
│ │
│ [3] Node 2 starts election (first to timeout) │
│ ├─► Increment term: 5 → 6 │
│ ├─► Transition to Candidate │
│ ├─► Vote for self │
│ └─► Send RequestVote RPC to all nodes │
│ ├─► Node 3: Grant vote (term 6 valid) │
│ └─► Node 1: No response (failed) │
│ │
│ [4] Node 2 receives quorum (2 of 3 votes) │
│ └─► Transition to Leader │
│ ├─► Send heartbeat AppendEntries │
│ └─► Start accepting metadata proposals │
│ │
│ [5] Node 3 receives heartbeat from Node 2 │
│ └─► Recognize new leader, stay Follower │
│ │
│ Result: Node 2 is new Raft leader (term 6) │
│ Metadata proposals now route to Node 2 │
│ │
└──────────────────────────────────────────────────────┘
During leader election (typically 150-300ms), metadata operations are unavailable. However, data reads and writes continue uninterrupted since they bypass Raft.
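Clients issuing metadata operations during this window can simply retry with backoff. A hypothetical loop follows; Client, register, and Error::NoLeader are illustrative names, not the actual walrus API.

```rust
use std::time::Duration;

// Hypothetical client-side retry around a metadata operation while an
// election is in progress.
async fn register_with_retry(client: &Client, topic: &str) -> Result<(), Error> {
    for attempt in 0u64..10 {
        match client.register(topic).await {
            Ok(()) => return Ok(()),
            // No Raft leader yet: back off and try again.
            Err(Error::NoLeader) => {
                tokio::time::sleep(Duration::from_millis(100 * (attempt + 1))).await
            }
            Err(e) => return Err(e),
        }
    }
    Err(Error::NoLeader)
}
```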
Raft Persistence
What is Persisted?
Each node persists its Raft state to disk:
```
data_dir/
└── meta_wal/              # Raft log directory
    ├── 1234567890.log     # Raft log entries
    ├── 1234567891.log
    └── snapshot/          # State machine snapshots
        └── snap_1000.db
```
Persisted Data:
Raft log: All committed metadata commands
Snapshots: Periodic compactions of the state machine
Voted for: Last vote in the current term
Current term: Raft term number
Recovery After Crash
When a node restarts:
Node Startup:
│
├─► Load Raft log from meta_wal/
│ └─► Replay all committed entries
│ └─► Rebuild metadata state machine
│
├─► If snapshot exists:
│ └─► Load snapshot (faster than full replay)
│ └─► Apply remaining log entries since snapshot
│
├─► Rejoin Raft cluster
│ └─► Receive latest entries from leader
│ └─► Catch up to current state
│
└─► Resume normal operation
└─► Metadata now consistent with cluster
Consistency Guarantees
Linearizability
Raft provides linearizable consistency for metadata:
```
// Scenario: Two clients create topics concurrently
Client A → Node 1: REGISTER logs
Client B → Node 2: REGISTER metrics

// Raft ensures a total order:
[Index 100] CreateTopic("logs", leader=1)     // A's request
[Index 101] CreateTopic("metrics", leader=2)  // B's request

// All nodes apply in the same order → consistent view
```
Read Consistency
Metadata reads are immediately consistent (no stale reads):
```
// Metadata reads query the local state machine (already committed)
Controller::get_topic_state("logs")
└─► Metadata::get_topic_state("logs")   // Local read
    └─► Returns current_segment, leader, etc.
```
No round-trip to Raft leader needed because:
Followers have already applied committed entries
State machine reflects all committed commands
Local reads are always consistent
Walrus includes a TLA+ specification modeling the distributed data plane and its interaction with Raft-replicated metadata:
Specification: distributed-walrus/spec/DistributedWalrus.tla
Verified Invariants
Metadata-storage agreement: topic metadata, WAL entries, and reader cursors stay synchronized across all operations.
Single writer per segment: only the designated leader (from metadata) can write to each segment, enforced by leases.
No writes past the open segment: closed (sealed) segments remain immutable after rollover; no writes are accepted even if a lease exists.
Sealed-count accuracy: entry counts for sealed segments in metadata match the actual WAL contents on storage.
Cursor bounds: cursors never exceed segment boundaries or sealed entry counts, preventing out-of-bounds reads.
Segment ordering: entries within each segment maintain strict ordering (no gaps or reordering).
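For intuition, the single-writer and sealed-segment invariants translate into a guard like the following on the storage path. This is a Rust analogue of the spec's conditions, not actual walrus code.

```rust
// Would this append be allowed? A lease is necessary but not sufficient:
// the target segment must also be the open one and must not be sealed.
fn can_append(ts: &TopicState, segment_id: u64, holds_lease: bool) -> bool {
    holds_lease
        && segment_id == ts.current_segment
        && !ts.sealed_segments.contains_key(&segment_id)
}
```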
Liveness Properties
Eventual rollover: segments exceeding the entry threshold eventually trigger rollover via the monitor loop.
Eventual consumption: available entries are eventually consumed by readers (assuming readers continue polling).
The TLA+ specification abstracts Raft consensus as a single authoritative metadata source and models Walrus storage as per-segment entry sequences. Model checking with TLC verifies correctness under concurrent operations.
Monitoring Raft
Metrics Command
Query Raft cluster state via CLI:
```
> METRICS
{
  "state": "Leader",
  "current_term": 5,
  "current_leader": 1,
  "membership": {
    "voters": [1, 2, 3],
    "learners": []
  },
  "last_log_index": 1042,
  "last_applied": 1042,
  "snapshot": {
    "index": 1000,
    "term": 4
  }
}
```
Key Metrics
| Metric | Description |
| --- | --- |
| state | Current Raft role (Leader, Follower, Candidate) |
| current_term | Raft term number (increments on elections) |
| current_leader | Node ID of the current Raft leader |
| voters | Nodes participating in quorum |
| last_log_index | Index of the last Raft log entry |
| last_applied | Index of the last applied state machine command |
Best Practices
Odd Number of Nodes
Run 3, 5, or 7 nodes for proper quorum (the arithmetic is sketched below):
3 nodes: Tolerates 1 failure (requires 2 for quorum)
5 nodes: Tolerates 2 failures (requires 3 for quorum)
7 nodes: Tolerates 3 failures (requires 4 for quorum)
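A cluster of n voters needs floor(n/2) + 1 acknowledgements, so it tolerates floor((n-1)/2) failures:

```rust
// Quorum arithmetic behind the sizing guidance above.
fn quorum(voters: u64) -> u64 {
    voters / 2 + 1 // majority of voters
}

fn tolerated_failures(voters: u64) -> u64 {
    (voters - 1) / 2
}

fn main() {
    for n in [3u64, 5, 7] {
        println!("{n} nodes: quorum {}, tolerates {} failures",
                 quorum(n), tolerated_failures(n));
    }
}
```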
Separate Raft Port
Use dedicated ports for Raft traffic:
Client ports: :9091-:9093 (application traffic)
Raft ports: :6001-:6003 (consensus traffic)
This prevents client traffic from affecting consensus.
Monitor Elections
Track leader stability:
Frequent elections → network issues
Check the current_term growth rate
Ensure a stable leader for consistent performance
Snapshot Regularly
The Raft log grows with metadata changes:
Snapshots compact old entries
Faster node recovery
Reduced disk usage
Configuration
Raft Ports
```
# Node 1
--raft-port 6001
--raft-host 127.0.0.1
--raft-advertise-host 10.0.0.1   # External IP

# Node 2
--raft-port 6002
--join 10.0.0.1:6001             # Join existing cluster
```
Timeouts
Raft uses default OpenRaft timeouts:
Heartbeat interval: 500ms
Election timeout: 1.5-3s (randomized)
Replication lag tolerance: 1000 entries
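Expressed as an openraft::Config, these defaults would look roughly like the sketch below. Whether Octopii surfaces this struct directly is an assumption; the values are the ones listed above.

```rust
use openraft::Config;

// The defaults above as openraft configuration fields.
fn raft_config() -> Config {
    Config {
        heartbeat_interval: 500,           // ms
        election_timeout_min: 1_500,       // ms
        election_timeout_max: 3_000,       // ms (randomized within range)
        replication_lag_threshold: 1_000,  // entries
        ..Default::default()
    }
}
```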
Architecture Overview: Learn about the overall system design and components
Topics and Segments: Understand how metadata drives segment rollover