Overview
The NodeController is the core orchestration component in Walrus that glues together metadata (cluster state), bucket storage (Walrus I/O), and RPC surfaces (Kafka facade + internal forwarding). It coordinates all interactions between these systems without parsing Kafka protocol details or managing retention policies.
Location: distributed-walrus/src/controller/mod.rs:31
Core Responsibilities
Request Routing
Routes append and read operations to the correct leader nodes based on metadata
Lease Management
Synchronizes write leases with metadata to ensure only leaders can write
Request Forwarding
Forwards requests to remote nodes when the current node is not the leader
Structure
Fields
| Field | Type | Purpose |
|---|---|---|
| node_id | NodeId | Unique identifier for this node in the cluster |
| bucket | Arc<Storage> | Handle to the underlying Walrus storage engine |
| metadata | Arc<Metadata> | Cluster metadata including topic/segment/leader mappings |
| raft | Arc<OctopiiNode> | Raft consensus node for metadata replication |
| offsets | Arc<RwLock<HashMap<String, u64>>> | Tracks logical offsets per WAL key |
| read_cursors | Arc<Mutex<HashMap<String, ReadCursor>>> | Per-topic read cursors for shared consumption |
| test_fail_* | AtomicBool | Fault injection flags for testing |
Key Operations
Append Operations
append_for_topic
Appends data to a topic. Routes the request to the current leader node.
Location: mod.rs:165
- Looks up topic state from metadata to find the current leader and segment
- If this node is the leader, performs local append via forward_append
- Otherwise, forwards the request to the remote leader via forward_append_remote
- Returns an error if the topic doesn’t exist
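The routing decision above can be sketched in isolation. This is a minimal, synchronous illustration and not the controller's actual code: TopicState, Route, and route_append are hypothetical names, NodeId is assumed to be a u64, and the metadata lookup is replaced by a plain HashMap.

```rust
use std::collections::HashMap;

type NodeId = u64; // assumption: the real NodeId type may differ

/// Hypothetical snapshot of what metadata knows about one topic.
#[derive(Debug, Clone, PartialEq)]
struct TopicState {
    leader: NodeId,
    segment: u64,
}

/// Where an append should be sent (hypothetical type).
#[derive(Debug, PartialEq)]
enum Route {
    Local { wal_key: String },
    Remote { leader: NodeId, wal_key: String },
}

/// Decides whether an append is handled locally or forwarded.
fn route_append(
    topics: &HashMap<String, TopicState>,
    this_node: NodeId,
    topic: &str,
) -> Result<Route, String> {
    // Unknown topics are an error rather than being created implicitly.
    let state = topics
        .get(topic)
        .ok_or_else(|| format!("unknown topic: {}", topic))?;
    // WAL keys follow the t_<topic>_s_<segment> convention.
    let wal_key = format!("t_{}_s_{}", topic, state.segment);
    if state.leader == this_node {
        Ok(Route::Local { wal_key })
    } else {
        Ok(Route::Remote { leader: state.leader, wal_key })
    }
}
```

In this sketch a Local result corresponds to calling forward_append and a Remote result to calling forward_append_remote.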
forward_append (Internal)
Handles local append operations with lease verification and retry logic.
Location: internal.rs:6
- Updates leases to ensure this node has write permission
- Attempts append with retry via append_with_retry (2 attempts)
- Records the append in offset tracking
- Checks if segment rollover is needed via maybe_rollover
- Returns InternalResp::Ok or InternalResp::Error
append_with_retry
Retry logic for appends with lease synchronization.
Location: mod.rs:540
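A retry loop of this kind might look like the sketch below. It assumes leases are re-synced before each retry (the source does not say exactly when the sync happens), and retry_append with its closure parameters is a hypothetical stand-in for the real async function.

```rust
/// Calls `append` up to `attempts` times, refreshing leases before each retry.
/// Hypothetical sketch: the real append_with_retry is async and talks to storage.
fn retry_append<A, S>(attempts: u32, mut append: A, mut sync_leases: S) -> Result<(), String>
where
    A: FnMut() -> Result<(), String>,
    S: FnMut(),
{
    let mut last_err = String::from("no attempts made");
    for attempt in 0..attempts {
        if attempt > 0 {
            // Assumption: a failed append usually means a stale lease,
            // so leases are refreshed before trying again.
            sync_leases();
        }
        match append() {
            Ok(()) => return Ok(()),
            Err(e) => last_err = e,
        }
    }
    // Every attempt failed; report the most recent error.
    Err(last_err)
}
```

With attempts set to 2, as in forward_append, a single stale-lease failure is absorbed and a second failure is returned to the caller.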
Read Operations
read_one_for_topic
Reads a single entry for a topic using the provided cursor, automatically advancing across sealed segments.
Location: mod.rs:199
- Initializes cursor to segment 1 if unset
- Advances to next segment when current segment is fully consumed
- Uses metadata’s sealed_count to determine when segments are complete
- Routes reads to appropriate leader (current or historical segment leader)
Returns:
- Ok(Some(data)) - Successfully read an entry
- Ok(None) - No data available (reached end of current segment)
- Err(_) - Topic doesn’t exist or read error
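The cursor advancement can be illustrated with a small sketch. The ReadCursor fields match the names documented for the type, but their integer types, the advance_cursor helper, and the shape of the sealed-count map are assumptions made for this example.

```rust
use std::collections::HashMap;

/// Field names follow the documented ReadCursor; the u64 types are assumed.
#[derive(Debug, Default, Clone, PartialEq)]
struct ReadCursor {
    segment: u64,
    delivered_in_segment: u64,
}

/// Moves the cursor past every sealed segment it has fully consumed.
/// `sealed_counts` maps a sealed segment number to its final entry count
/// (a hypothetical stand-in for the metadata lookup).
fn advance_cursor(cursor: &mut ReadCursor, sealed_counts: &HashMap<u64, u64>) {
    if cursor.segment == 0 {
        cursor.segment = 1; // unset cursors start at segment 1
    }
    while let Some(&sealed) = sealed_counts.get(&cursor.segment) {
        if cursor.delivered_in_segment < sealed {
            break; // entries remain in this sealed segment
        }
        // Segment fully consumed: continue at the start of the next one.
        cursor.segment += 1;
        cursor.delivered_in_segment = 0;
    }
}
```

A segment with no entry in the map is treated as the active, unsealed segment, so the cursor stops there and a read that finds nothing would yield Ok(None).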
read_one_for_topic_shared
Shared read path that maintains a per-topic cursor across client connections.
Location: mod.rs:271
Keeps the cursor in read_cursors to enable consumer groups or shared consumption patterns.
forward_read (Internal)
Performs local read from storage.
Location: mod.rs:304
Returns InternalResp::ReadResult with data and high watermark (tracked entry count).
Topic Management
ensure_topic
Creates a topic if it doesn’t already exist, selecting an initial leader.
Location: mod.rs:124
- If no nodes exist, uses current node as leader
- Otherwise, hashes topic name to deterministically select a node
- Proposes MetadataCmd::CreateTopic to Raft
- Refreshes leases after creation
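Deterministic leader selection by hashing the topic name could be sketched as follows. The hash function Walrus actually uses is not stated here, so DefaultHasher is an assumption, as are the pick_initial_leader name and the u64 node IDs.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Picks the initial leader for a new topic (hypothetical helper).
/// `nodes` is assumed to be in the same order on every node, e.g. sorted by ID.
fn pick_initial_leader(topic: &str, nodes: &[u64], this_node: u64) -> u64 {
    if nodes.is_empty() {
        // No known nodes yet: this node leads the topic.
        return this_node;
    }
    // DefaultHasher::new() always starts from the same keys, so the result is
    // repeatable within one build. The algorithm is not guaranteed to stay the
    // same across Rust releases, so a real cluster would need a pinned hash.
    let mut hasher = DefaultHasher::new();
    topic.hash(&mut hasher);
    let idx = (hasher.finish() % nodes.len() as u64) as usize;
    nodes[idx]
}
```

Because the choice depends only on the topic name and the node list, two nodes that see the same membership pick the same leader without coordinating.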
maybe_rollover (Internal)
Checks if a segment has reached capacity and triggers rollover to a new segment.
Location: mod.rs:457
- Checks if tracked entry count exceeds max_segment_entries() threshold
- Retrieves current Raft membership to select next leader
- Selects the next leader round-robin among cluster members
- Proposes MetadataCmd::RolloverTopic with sealed segment count
Segment rollover is triggered automatically after appends. The sealed segment count is recorded in metadata for cursor advancement during reads.
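The rollover check and round-robin selection can be sketched as a pure function. plan_rollover is a hypothetical name, the threshold is treated as inclusive (an assumption; the real comparison may be strict), and members stands in for the Raft membership list.

```rust
/// Returns the next leader if the segment should roll over, otherwise None.
/// Hypothetical sketch; the real code proposes MetadataCmd::RolloverTopic.
fn plan_rollover(
    tracked_entries: u64,
    max_segment_entries: u64,
    members: &[u64],
    current_leader: u64,
) -> Option<u64> {
    // Assumption: rollover happens once the count reaches the threshold.
    if tracked_entries < max_segment_entries || members.is_empty() {
        return None;
    }
    // Round-robin: take the member after the current leader, wrapping around.
    // If the current leader is no longer a member, fall back to the first one.
    let next = match members.iter().position(|&m| m == current_leader) {
        Some(i) => members[(i + 1) % members.len()],
        None => members[0],
    };
    Some(next)
}
```

For members [1, 2, 3], leadership of successive segments cycles 1, 2, 3, 1, which spreads write load across the cluster.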
Lease Management
update_leases
Synchronizes storage leases with current metadata state.
Location: mod.rs:91
- Queries metadata for all topics where node_id is the leader
- Generates set of expected WAL keys (t_<topic>_s_<segment>)
- Syncs peer addresses from metadata to Raft layer
- Updates bucket storage with new lease set
update_leases runs:
- In the lease update loop (every 100ms)
- After metadata changes
- During retry operations
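Computing the expected lease set reduces to filtering topics by leader and formatting WAL keys. In the sketch below, expected_leases is a hypothetical helper and the slice of (topic, current segment, leader) tuples is a simplified stand-in for the metadata query.

```rust
use std::collections::HashSet;

/// Builds the set of WAL keys this node should hold leases for.
/// Each tuple is (topic, current segment, leader); this shape is an assumption.
fn expected_leases(topics: &[(&str, u64, u64)], node_id: u64) -> HashSet<String> {
    topics
        .iter()
        // Keep only topics whose current segment is led by this node.
        .filter(|(_, _, leader)| *leader == node_id)
        // Format each one as t_<topic>_s_<segment>.
        .map(|(topic, segment, _)| format!("t_{}_s_{}", topic, segment))
        .collect()
}
```

Handing the full set to storage on every sync means leases for segments this node no longer leads disappear simply by being absent from the new set.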
run_lease_update_loop
Background task that periodically refreshes leases.
Location: mod.rs:284
Calls update_leases() every 100ms.
Request Forwarding
forward_append_remote
Forwards append requests to remote leader nodes.
Location: mod.rs:321
- Resolves leader node address from metadata
- Serializes InternalOp::ForwardAppend payload
- Sends RPC request via Raft handler with 15s timeout
- Deserializes response and returns result
forward_read_remote
Forwards read requests to remote nodes.
Location: mod.rs:494
Metadata Operations
propose_metadata
Proposes metadata changes through Raft consensus.
Location: mod.rs:373
- If this node is Raft leader, proposes directly with 10s timeout
- Otherwise, waits up to 5s to discover current leader
- Forwards proposal to remote leader if necessary
- Retries with 100ms intervals if leader is unknown
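The leader-discovery wait can be sketched as a bounded polling loop. This version is synchronous and uses std::thread::sleep, whereas the controller is async; wait_for_leader and its closure parameter are hypothetical.

```rust
use std::thread::sleep;
use std::time::{Duration, Instant};

/// Polls `current_leader` until it returns Some or `max_wait` elapses.
/// Hypothetical sketch of the "wait up to 5s, retry every 100ms" behaviour.
fn wait_for_leader<F>(mut current_leader: F, max_wait: Duration, interval: Duration) -> Option<u64>
where
    F: FnMut() -> Option<u64>,
{
    let deadline = Instant::now() + max_wait;
    loop {
        if let Some(leader) = current_leader() {
            return Some(leader);
        }
        if Instant::now() >= deadline {
            return None; // leader still unknown after the full wait
        }
        sleep(interval);
    }
}
```

With the values from the list above, the call would be wait_for_leader(poll, Duration::from_secs(5), Duration::from_millis(100)), after which the proposal is forwarded to the returned node or fails.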
upsert_node
Registers or updates a node in cluster metadata.
Location: mod.rs:158
Proposes MetadataCmd::UpsertNode to Raft.
handle_join_cluster
Handles cluster join requests from new nodes.
Location: mod.rs:567
- Resolves node address (with DNS lookup fallback)
- Adds node as Raft learner
- Records node in metadata via upsert_node
- Spawns background task to promote learner to voter once caught up (60s timeout, 120 attempts)
RPC Handler
handle_rpc
Main RPC dispatch handler for internal operations.
Location: mod.rs:44
- ForwardAppend - Handles forwarded append requests
- ForwardRead - Handles forwarded read requests
- ForwardMetadata - Handles forwarded metadata proposals
- JoinCluster - Handles cluster join requests
- TestControl - Fault injection for testing
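The dispatch itself is a match over the operation variants. The sketch below omits every payload and returns the name of the handler rather than calling it, so the enum shape and the dispatch function are simplifications, not the real InternalOp definition.

```rust
/// Variant names follow the list above; payloads are deliberately omitted.
#[derive(Debug, Clone, Copy, PartialEq)]
enum InternalOp {
    ForwardAppend,
    ForwardRead,
    ForwardMetadata,
    JoinCluster,
    TestControl,
}

/// Maps each operation to the controller method that would serve it.
/// Hypothetical: the real handler deserializes a payload and awaits the call.
fn dispatch(op: InternalOp) -> &'static str {
    match op {
        InternalOp::ForwardAppend => "forward_append",
        InternalOp::ForwardRead => "forward_read",
        InternalOp::ForwardMetadata => "propose_metadata",
        InternalOp::JoinCluster => "handle_join_cluster",
        InternalOp::TestControl => "set test_fail_* flag",
    }
}
```

An exhaustive match has the advantage that adding a new operation variant fails to compile until the handler covers it.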
Helper Functions
wal_key
Generates a WAL key from topic name and segment number.
Location: types.rs:3
Format: t_<topic>_s_<segment> (e.g., t_events_s_1)
parse_wal_key
Parses a WAL key back into topic and segment.
Location: types.rs:8
Example: t_events_s_1 → ("events", 1)
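Both helpers are small enough to sketch in full. The signatures below are assumptions (the real functions in types.rs may use different argument and return types), and splitting on the last "_s_" is one possible way to handle topic names that contain that sequence.

```rust
/// Builds a WAL key of the form t_<topic>_s_<segment> (assumed signature).
fn wal_key(topic: &str, segment: u64) -> String {
    format!("t_{}_s_{}", topic, segment)
}

/// Parses a WAL key back into (topic, segment); returns None if malformed.
fn parse_wal_key(key: &str) -> Option<(String, u64)> {
    let rest = key.strip_prefix("t_")?;
    // Split on the last "_s_" so topic names containing "_s_" still round-trip.
    let (topic, segment) = rest.rsplit_once("_s_")?;
    let segment = segment.parse::<u64>().ok()?;
    Some((topic.to_string(), segment))
}
```

For any topic and segment, parse_wal_key(&wal_key(topic, segment)) returns the original pair, since the segment suffix is all digits and the last "_s_" is therefore always the separator.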
Read Cursor
Location: types.rs:19
- segment - Current segment being read
- delivered_in_segment - Number of entries delivered from this segment
Lifecycle
- Initialization - Controller is created with node ID, storage, metadata, and Raft instances
- Lease Loop - Background task continuously syncs leases with metadata
- Request Processing - Handles Kafka API calls and internal RPC operations
- Leader Changes - Automatically adapts to leadership changes via metadata updates
- Segment Rollover - Automatically creates new segments when capacity reached
Thread Safety
All shared state is protected by:
- RwLock for offsets (read-heavy, write-occasional)
- Mutex for read cursors (exclusive access needed)
- Atomic bools for test flags (lock-free)
- Arc for shared ownership across async tasks
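The Arc plus RwLock pattern for the offsets map can be demonstrated with plain threads. This sketch uses std::sync::RwLock and std::thread as an assumption; the controller runs on async tasks and may use an async-aware lock instead. record_appends and concurrent_appends are hypothetical helpers.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::thread;

/// Adds `n` to the tracked offset for `key` under the write lock.
fn record_appends(offsets: &Arc<RwLock<HashMap<String, u64>>>, key: &str, n: u64) {
    let mut map = offsets.write().unwrap();
    *map.entry(key.to_string()).or_insert(0) += n;
}

/// Spawns `workers` threads that each record one append, then reads the total.
fn concurrent_appends(workers: u64) -> u64 {
    let offsets = Arc::new(RwLock::new(HashMap::new()));
    let handles: Vec<_> = (0..workers)
        .map(|_| {
            // Each worker gets its own Arc handle to the same map.
            let offsets = Arc::clone(&offsets);
            thread::spawn(move || record_appends(&offsets, "t_events_s_1", 1))
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    // Readers take the shared lock and do not block one another.
    let map = offsets.read().unwrap();
    *map.get("t_events_s_1").unwrap_or(&0)
}
```

Each increment happens under the write lock, so no updates are lost and concurrent_appends(8) observes all eight appends.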
Error Handling
NotLeaderForPartition
Returned when attempting to append to a WAL key without a valid lease. Clients should retry against the correct leader.
Raft Propose Timeout
Metadata proposals time out after 10 seconds. A timeout indicates Raft consensus issues or cluster unavailability.
Unknown Topic
Returned when operating on a topic that hasn’t been created. Use ensure_topic first.
Forward Failures
Remote forwarding can fail due to network issues, leader unavailability, or lease mismatches. Includes 15s RPC timeout.
Testing Hooks
The controller includes fault injection capabilities for testing:
- test_fail_forward_read - Forces read forwarding to fail
- test_fail_monitor - Forces monitoring operations to fail
- test_fail_dir_size - Forces directory size checks to fail
These flags are toggled through TestControl RPC operations.
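A fault injection flag of this kind is typically an AtomicBool checked at the top of the guarded operation. In the sketch below, TestFlags and this forward_read body are hypothetical, and only one of the three flags is shown.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical holder for the test_fail_* flags; only one is modelled here.
#[derive(Default)]
struct TestFlags {
    fail_forward_read: AtomicBool,
}

/// Simulated read path that fails when the injection flag is set.
fn forward_read(flags: &TestFlags) -> Result<Vec<u8>, String> {
    // Relaxed ordering is enough for an independent on/off switch.
    if flags.fail_forward_read.load(Ordering::Relaxed) {
        return Err("injected forward_read failure".to_string());
    }
    Ok(b"entry".to_vec())
}
```

Because the flag is atomic, a test can flip it through a shared reference while other tasks are reading, without taking any lock.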
Related Components
- Metadata - Cluster state management
- Bucket Storage - Lease-based storage operations