

Writing Data

append_for_topic

Append a single entry to a topic. This is the fundamental write operation in Walrus.
pub fn append_for_topic(&self, col_name: &str, raw_bytes: &[u8]) -> std::io::Result<()>
Parameters
  • col_name (&str, required): The topic name to write to. Topics are created automatically on first write.
  • raw_bytes (&[u8], required): The raw byte data to append. Maximum size is limited by the block size (10MB).

Example

use walrus_rust::Walrus;

let wal = Walrus::new()?;

// Append a simple message
wal.append_for_topic("my-topic", b"Hello, Walrus!")?;

// Append serialized data
let data = serde_json::json!({
    "event": "user_signup",
    "user_id": 12345,
    "timestamp": 1234567890
});
wal.append_for_topic("events", data.to_string().as_bytes())?;

// Append binary data
let binary_data = vec![0x01, 0x02, 0x03, 0x04];
wal.append_for_topic("binary", &binary_data)?;

How It Works

When you call append_for_topic:
  1. Writer Creation: If this is the first write to the topic, a Writer is created and allocated an initial 10MB block (src/wal/runtime/walrus.rs:206-238)
  2. Block Check: The writer checks if the entry fits in the current block (src/wal/runtime/writer.rs:70-78)
  3. Write Entry: Entry is written with a 256-byte metadata header containing:
    • Entry size
    • Topic name
    • Checksum (FNV-1a 64-bit)
    • Next block pointer
  4. Fsync Handling: Depending on FsyncSchedule, the write is either:
    • Immediately synced to disk (SyncEach)
    • Queued for background fsync (Milliseconds)
    • Not synced (NoFsync)
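The dispatch on FsyncSchedule can be modeled with a small sketch. The enum shape and helper here are illustrative, not the crate's actual definitions:

```rust
// Illustrative model of the three fsync strategies; not the walrus_rust API.
enum FsyncSchedule {
    SyncEach,          // fsync inline on every append
    Milliseconds(u64), // a background thread flushes on this interval
    NoFsync,           // rely entirely on the OS page cache
}

/// Whether an append should block on fsync before returning.
fn syncs_inline(s: &FsyncSchedule) -> bool {
    matches!(s, FsyncSchedule::SyncEach)
}

fn main() {
    assert!(syncs_inline(&FsyncSchedule::SyncEach));
    assert!(!syncs_inline(&FsyncSchedule::Milliseconds(200)));
    assert!(!syncs_inline(&FsyncSchedule::NoFsync));
}
```

Only SyncEach pays the fsync cost on the write path; the other two trade durability for throughput.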

Block Sealing

If an entry doesn’t fit in the current block, the block is automatically sealed:
// From src/wal/runtime/writer.rs:70-96
let need = (PREFIX_META_SIZE as u64) + (data.len() as u64);
if *cur + need > block.limit {
    // Seal current block
    FileStateTracker::set_block_unlocked(block.id as usize);
    let mut sealed = block.clone();
    sealed.used = *cur;
    sealed.mmap.flush()?;
    
    // Append to reader chain for consumption
    let _ = self.reader.append_block_to_chain(&self.col, sealed);
    
    // Allocate new block
    let new_block = unsafe { self.allocator.alloc_block(need) }?;
    *block = new_block;
    *cur = 0;
}
Sealed blocks become available for reading immediately.
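The sealing condition above boils down to a capacity check: header plus payload must fit in the remaining space. A self-contained sketch of that check, using the 256-byte header and 10MB block sizes from this page:

```rust
const PREFIX_META_SIZE: u64 = 256; // per-entry metadata header
const BLOCK_LIMIT: u64 = 10 * 1024 * 1024; // 10MB block

/// True if an entry of `data_len` payload bytes fits at offset `cur`
/// within a block of capacity `limit` (mirrors `*cur + need > block.limit`).
fn fits(cur: u64, data_len: u64, limit: u64) -> bool {
    cur + PREFIX_META_SIZE + data_len <= limit
}

fn main() {
    // A small entry fits in a fresh block...
    assert!(fits(0, 1024, BLOCK_LIMIT));
    // ...but not in a nearly full one: that block is sealed and a new one allocated.
    assert!(!fits(BLOCK_LIMIT - 100, 1024, BLOCK_LIMIT));
}
```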

Reading Data

read_next

Read the next entry from a topic, optionally checkpointing the read position.
pub fn read_next(&self, col_name: &str, checkpoint: bool) -> io::Result<Option<Entry>>
Parameters
  • col_name (&str, required): The topic name to read from.
  • checkpoint (bool, required):
    • true: Consume the entry and advance the read cursor (persisted based on the consistency model)
    • false: Peek at the entry without consuming it (the cursor stays at its current position)

Return Value

Returns Option<Entry> where:
  • Some(Entry) if an entry is available
  • None if no more entries are available (caught up with writes)
pub struct Entry {
    pub data: Vec<u8>,
}

Example: Checkpointing

use walrus_rust::Walrus;

let wal = Walrus::new()?;

// Write some data
wal.append_for_topic("events", b"event-1")?;
wal.append_for_topic("events", b"event-2")?;
wal.append_for_topic("events", b"event-3")?;

// Checkpoint=true: consume and advance cursor
if let Some(entry) = wal.read_next("events", true)? {
    println!("Consumed: {:?}", String::from_utf8_lossy(&entry.data));
    // Cursor is now at event-2
}

// Next read gets event-2
if let Some(entry) = wal.read_next("events", true)? {
    println!("Consumed: {:?}", String::from_utf8_lossy(&entry.data));
    // Cursor is now at event-3
}

Example: Peeking

use walrus_rust::Walrus;

let wal = Walrus::new()?;
wal.append_for_topic("events", b"event-1")?;

// Checkpoint=false: peek without consuming
if let Some(entry) = wal.read_next("events", false)? {
    println!("Peeking: {:?}", String::from_utf8_lossy(&entry.data));
    // Cursor still at event-1
}

// Reading again returns the same entry
if let Some(entry) = wal.read_next("events", false)? {
    println!("Still peeking: {:?}", String::from_utf8_lossy(&entry.data));
    // Still at event-1
}

// Now consume it
if let Some(entry) = wal.read_next("events", true)? {
    println!("Consumed: {:?}", String::from_utf8_lossy(&entry.data));
    // Cursor advanced past event-1
}

// No more entries
assert!(wal.read_next("events", true)?.is_none());

How It Works

read_next operates in two phases.

1. Sealed Block Path (src/wal/runtime/walrus_read.rs:124-188): reads from sealed, immutable blocks in the reader chain:
if info.cur_block_idx < info.chain.len() {
    let idx = info.cur_block_idx;
    let off = info.cur_block_offset;
    let block = info.chain[idx].clone();
    
    if off >= block.used {
        // Block exhausted, move to next
        BlockStateTracker::set_checkpointed_true(block.id as usize);
        info.cur_block_idx += 1;
        info.cur_block_offset = 0;
        continue;
    }
    
    // Read entry from block
    match block.read(off) {
        Ok((entry, consumed)) => {
            let new_off = off + consumed as u64;
            if checkpoint {
                info.cur_block_offset = new_off;
                // Persist position based on consistency model
                maybe_persist = if self.should_persist(&mut info, false) {
                    Some((info.cur_block_idx as u64, new_off))
                } else { None };
            }
            return Ok(Some(entry));
        }
        Err(_) => return Ok(None),
    }
}
2. Tail Path (src/wal/runtime/walrus_read.rs:191-343): reads from the active writer block (the “tail”):
// Get active writer's block and offset
let (active_block, written) = writer_arc.snapshot_block()?;

if tail_off < written {
    match active_block.read(tail_off) {
        Ok((entry, consumed)) => {
            let new_off = tail_off + consumed as u64;
            if checkpoint {
                info.tail_block_id = active_block.id;
                info.tail_offset = new_off;
                // Persist with tail sentinel flag
                maybe_persist = if self.should_persist(&mut info, false) {
                    Some((tail_block_id | TAIL_FLAG, new_off))
                } else { None };
            }
            return Ok(Some(entry));
        }
        Err(_) => return Ok(None),
    }
}
The tail path uses a special sentinel flag (1u64 << 63) in the persisted block ID to indicate that the position is in the active writer block, not a sealed block.
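The sentinel encoding can be demonstrated in isolation. The constant matches the snippet above; the helper functions are illustrative, not crate API:

```rust
/// High bit of the persisted block ID marks a position in the active (tail) block.
const TAIL_FLAG: u64 = 1u64 << 63;

/// Tag a block ID as a tail position before persisting it.
fn encode_tail(block_id: u64) -> u64 {
    block_id | TAIL_FLAG
}

/// Was the persisted position in the active writer block?
fn is_tail(persisted: u64) -> bool {
    persisted & TAIL_FLAG != 0
}

/// Recover the raw block ID regardless of the flag.
fn block_id(persisted: u64) -> u64 {
    persisted & !TAIL_FLAG
}

fn main() {
    let p = encode_tail(42);
    assert!(is_tail(p));
    assert_eq!(block_id(p), 42);
    // A sealed-block position has the high bit clear.
    assert!(!is_tail(42));
}
```

Reserving the high bit works because block IDs never approach 2^63.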

Persistence Behavior

When checkpoint=true, the read position is persisted according to the consistency model:

StrictlyAtOnce
// Every checkpoint is immediately persisted
let wal = Walrus::with_consistency(ReadConsistency::StrictlyAtOnce)?;
wal.read_next("topic", true)?;  // Persists to disk
AtLeastOnce
// Persists every N reads
let wal = Walrus::with_consistency(
    ReadConsistency::AtLeastOnce { persist_every: 100 }
)?;

for _ in 0..100 {
    wal.read_next("topic", true)?;  // Only the 100th read persists
}
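AtLeastOnce batching amounts to a modular counter over checkpointed reads. A minimal model of the idea (not the crate's internal state):

```rust
/// Illustrative model of AtLeastOnce { persist_every } checkpoint batching.
struct AtLeastOnce {
    persist_every: u32,
    reads_since_persist: u32,
}

impl AtLeastOnce {
    fn new(persist_every: u32) -> Self {
        Self { persist_every, reads_since_persist: 0 }
    }

    /// Called on each checkpointed read; true when the position should hit disk.
    fn should_persist(&mut self) -> bool {
        self.reads_since_persist += 1;
        if self.reads_since_persist >= self.persist_every {
            self.reads_since_persist = 0;
            true
        } else {
            false
        }
    }
}

fn main() {
    let mut c = AtLeastOnce::new(3);
    let hits: Vec<bool> = (0..6).map(|_| c.should_persist()).collect();
    // Only every 3rd checkpoint persists; a crash loses at most 2 checkpoints,
    // so those entries may be re-read (hence "at least once").
    assert_eq!(hits, vec![false, false, true, false, false, true]);
}
```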
See Configuration for more details on consistency models.

Entry Metadata

Each entry is stored with a 256-byte metadata header (src/wal/block.rs:12-19):
struct Metadata {
    read_size: usize,        // Payload size
    owned_by: String,        // Topic name
    next_block_start: u64,   // Pointer to next block
    checksum: u64,           // FNV-1a 64-bit checksum
}
The header format (src/wal/block.rs:63-68):
[2 bytes: metadata length] [metadata bytes] [padding to 256 bytes]
This allows forward compatibility: if the metadata structure grows, older readers can skip unknown fields.
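A sketch of that layout: a 2-byte length prefix, the serialized metadata, then zero padding out to 256 bytes. The byte order of the prefix is assumed little-endian here; the metadata serialization is a stand-in, not the crate's actual format:

```rust
const HEADER_SIZE: usize = 256;

/// Pack already-serialized metadata into the fixed-size header:
/// [2 bytes: metadata length] [metadata bytes] [zero padding to 256 bytes].
fn encode_header(meta_bytes: &[u8]) -> Option<[u8; HEADER_SIZE]> {
    if meta_bytes.len() > HEADER_SIZE - 2 {
        return None; // metadata too large for the fixed header
    }
    let mut header = [0u8; HEADER_SIZE];
    header[..2].copy_from_slice(&(meta_bytes.len() as u16).to_le_bytes());
    header[2..2 + meta_bytes.len()].copy_from_slice(meta_bytes);
    Some(header) // remaining bytes stay zero (padding)
}

fn main() {
    let header = encode_header(b"meta").unwrap();
    assert_eq!(header.len(), 256);
    // The length prefix decodes back to the metadata size,
    // so a reader can skip any trailing fields it does not understand.
    assert_eq!(u16::from_le_bytes([header[0], header[1]]), 4);
    assert_eq!(&header[2..6], b"meta");
}
```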

Checksum Verification

Every read verifies the data integrity using FNV-1a 64-bit checksums (src/wal/block.rs:116-129):
// Verify checksum
let expected = meta.checksum;
if checksum64(&ret_buffer) != expected {
    return Err(io::Error::new(
        io::ErrorKind::InvalidData,
        "checksum mismatch, data corruption detected",
    ));
}
Corrupted entries return an error instead of silently returning bad data.
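FNV-1a 64-bit is only a few lines; a self-contained version for reference (the crate's checksum64 may differ in details, but these are the standard constants and test vectors):

```rust
/// FNV-1a 64-bit hash (offset basis 0xcbf29ce484222325, prime 0x100000001b3).
fn checksum64(data: &[u8]) -> u64 {
    let mut hash: u64 = 0xcbf29ce484222325;
    for &byte in data {
        hash ^= byte as u64; // XOR first, then multiply: that order is the "1a" variant
        hash = hash.wrapping_mul(0x100000001b3);
    }
    hash
}

fn main() {
    // Standard FNV-1a 64 test vectors.
    assert_eq!(checksum64(b""), 0xcbf29ce484222325);
    assert_eq!(checksum64(b"a"), 0xaf63dc4c8601ec8c);
    // Any bit flip changes the checksum, which is how corruption is detected on read.
    assert_ne!(checksum64(b"hello"), checksum64(b"hellp"));
}
```

FNV-1a is fast and simple but not cryptographic: it catches accidental corruption, not adversarial tampering.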

Topic Entry Counting

Walrus tracks the number of unconsumed entries per topic in memory (src/wal/runtime/walrus.rs:129-143):
let wal = Walrus::new()?;

wal.append_for_topic("events", b"e1")?;
wal.append_for_topic("events", b"e2")?;
assert_eq!(wal.get_topic_entry_count("events"), 2);

// Checkpoint decrements count
wal.read_next("events", true)?;
assert_eq!(wal.get_topic_entry_count("events"), 1);

// Peek doesn't decrement
wal.read_next("events", false)?;
assert_eq!(wal.get_topic_entry_count("events"), 1);
Counts survive restarts by reconstructing from the persisted read offset and WAL files.

Error Handling

Both operations can return std::io::Error:
use walrus_rust::Walrus;
use std::io::ErrorKind;

let wal = Walrus::new()?;

// Handle write errors
match wal.append_for_topic("topic", b"data") {
    Ok(()) => println!("Write successful"),
    Err(e) if e.kind() == ErrorKind::OutOfMemory => {
        eprintln!("No space available for allocation");
    }
    Err(e) => eprintln!("Write failed: {}", e),
}

// Handle read errors
match wal.read_next("topic", true) {
    Ok(Some(entry)) => println!("Read: {:?}", entry.data),
    Ok(None) => println!("No more entries"),
    Err(e) if e.kind() == ErrorKind::InvalidData => {
        eprintln!("Checksum mismatch - data corruption");
    }
    Err(e) => eprintln!("Read failed: {}", e),
}

Performance Tips

  1. Batch When Possible: For multiple writes, use batch_append_for_topic for better throughput
  2. Adjust Fsync Schedule: Lower fsync frequency increases throughput at the cost of durability
  3. Use AtLeastOnce: For high-throughput reads, AtLeastOnce mode reduces persistence overhead
  4. Peek Strategically: Use checkpoint=false when you need to inspect data before committing to consumption

Next Steps