
Overview

Walrus provides atomic batch operations for high-throughput workloads. On Linux with the FD backend (the default), batch operations automatically use io_uring for parallel I/O submission, significantly improving performance.

Batch Limits:
  • Maximum 2,000 entries per batch
  • Maximum ~10GB total payload per batch
io_uring Acceleration:
  • Automatically enabled on Linux with FD backend
  • Parallel I/O submission for reads and writes
  • Falls back to sequential operations on other platforms or with mmap backend
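
Both limits apply per call. A caller with a larger workload can split it into conforming chunks; a minimal sketch (append_chunked is our helper, not part of the Walrus API, and it does not guard the ~10GB byte limit):

use walrus_rust::Walrus;

// Split an arbitrarily large workload into chunks that respect the
// 2,000-entry-per-batch limit
fn append_chunked(wal: &Walrus, topic: &str, entries: &[Vec<u8>]) -> std::io::Result<()> {
    for chunk in entries.chunks(2_000) {
        let refs: Vec<&[u8]> = chunk.iter().map(|v| v.as_slice()).collect();
        wal.batch_append_for_topic(topic, &refs)?;
    }
    Ok(())
}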

Batch Writes

batch_append_for_topic

Atomically append multiple entries to a topic. All entries succeed or all fail (all-or-nothing semantics).
pub fn batch_append_for_topic(&self, col_name: &str, batch: &[&[u8]]) -> std::io::Result<()>

Parameters:
  • col_name (&str, required): The topic name to write to.
  • batch (&[&[u8]], required): A slice of byte slices, each representing one entry. Maximum 2,000 entries.

Example

use walrus_rust::Walrus;

let wal = Walrus::new()?;

// Batch append multiple entries
let batch = vec![
    b"entry 1".as_slice(),
    b"entry 2".as_slice(),
    b"entry 3".as_slice(),
];
wal.batch_append_for_topic("events", &batch)?;

// All three entries are now available for reading
for _ in 0..3 {
    if let Some(entry) = wal.read_next("events", true)? {
        println!("Read: {:?}", String::from_utf8_lossy(&entry.data));
    }
}

Atomic Behavior

Batch writes are atomic - either all entries are written or the operation fails:
use walrus_rust::Walrus;

let wal = Walrus::new()?;

// This batch will either write all 1000 entries or none
let mut batch = Vec::new();
for i in 0..1000 {
    batch.push(format!("event-{}", i).into_bytes());
}
let batch_refs: Vec<&[u8]> = batch.iter().map(|v| v.as_slice()).collect();

match wal.batch_append_for_topic("events", &batch_refs) {
    Ok(()) => println!("All 1000 entries written successfully"),
    Err(e) => println!("Batch failed, zero entries written: {}", e),
}

How It Works

The batch write implementation (src/wal/runtime/writer.rs:135-324) follows these phases:

Phase 1: Validation
// From src/wal/runtime/writer.rs:147-165
if batch.len() > MAX_BATCH_ENTRIES {
    return Err(std::io::Error::new(
        std::io::ErrorKind::InvalidInput,
        format!("batch exceeds {} entry limit", MAX_BATCH_ENTRIES),
    ));
}

let total_bytes: u64 = batch
    .iter()
    .map(|data| (PREFIX_META_SIZE as u64) + (data.len() as u64))
    .sum();

if total_bytes > MAX_BATCH_BYTES {
    return Err(std::io::Error::new(
        std::io::ErrorKind::InvalidInput,
        "batch exceeds 10GB limit",
    ));
}
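
A caller can mirror these checks to fail fast before submitting. A sketch, where the constants are illustrative stand-ins for the crate's internal values (PREFIX_META_SIZE in particular is an assumed size):

const MAX_BATCH_ENTRIES: usize = 2_000;
const MAX_BATCH_BYTES: u64 = 10 * 1024 * 1024 * 1024; // ~10 GiB
const PREFIX_META_SIZE: usize = 64; // assumed per-entry header size

// Pre-check a batch against the documented limits before submitting
fn batch_fits(batch: &[&[u8]]) -> bool {
    let total: u64 = batch
        .iter()
        .map(|d| (PREFIX_META_SIZE as u64) + (d.len() as u64))
        .sum();
    batch.len() <= MAX_BATCH_ENTRIES && total <= MAX_BATCH_BYTES
}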
Phase 2: Planning

Pre-allocates all required blocks and plans write locations:
// From src/wal/runtime/writer.rs:195-250
let mut write_plan: Vec<(Block, u64, usize)> = Vec::new();
let mut batch_idx = 0;
let mut planning_offset = *cur_offset;

while batch_idx < batch.len() {
    let data = batch[batch_idx];
    let need = (PREFIX_META_SIZE as u64) + (data.len() as u64);
    let available = block.limit - planning_offset;
    
    if available >= need {
        // Fits in current block
        write_plan.push((block.clone(), planning_offset, batch_idx));
        planning_offset += need;
        batch_idx += 1;
    } else {
        // Need to seal and allocate new block
        FileStateTracker::set_block_unlocked(block.id as usize);
        let mut sealed = block.clone();
        sealed.used = planning_offset;
        sealed.mmap.flush()?;
        let _ = self.reader.append_block_to_chain(&self.col, sealed);
        
        // Allocate new block
        let new_block = unsafe { 
            self.allocator.alloc_block(need.max(DEFAULT_BLOCK_SIZE))? 
        };
        
        revert_info.allocated_block_ids.push(new_block.id);
        *block = new_block;
        planning_offset = 0;
    }
}
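
For intuition, a worked sketch of the packing arithmetic with illustrative constants (the crate's real DEFAULT_BLOCK_SIZE and header size may differ; oversized entries get a dedicated block via need.max(DEFAULT_BLOCK_SIZE), which this sketch ignores):

const BLOCK_SIZE: u64 = 10 * 1024 * 1024; // assumed block size
const HEADER: u64 = 64;                   // assumed PREFIX_META_SIZE

// Count how many blocks the planner would touch, assuming every
// entry fits within a single block
fn blocks_needed(entry_sizes: &[u64]) -> u64 {
    let mut blocks: u64 = 1;
    let mut offset: u64 = 0;
    for &len in entry_sizes {
        let need = HEADER + len;
        if BLOCK_SIZE - offset < need {
            blocks += 1; // current block sealed, fresh block allocated
            offset = 0;
        }
        offset += need;
    }
    blocks
}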
Phase 3: io_uring Submission (Linux + FD Backend)

On Linux with the FD backend, batch writes use io_uring for parallel I/O:
// From src/wal/runtime/writer.rs:268-293
if USE_FD_BACKEND.load(Ordering::Relaxed) {
    match self.submit_batch_via_io_uring(
        &write_plan,
        batch,
        &mut revert_info,
        &mut *cur_offset,
        planning_offset,
        total_bytes_usize,
    ) {
        Ok(()) => return Ok(()),
        Err(e) => {
            // Fall back to sequential writes if io_uring fails
            if e.to_string().contains("io_uring init failed") {
                // Continue to fallback path
            } else {
                return Err(e);
            }
        }
    }
}
Phase 4: Fallback Sequential Path

If io_uring is unavailable or the mmap backend is in use, writes fall back to the sequential path:
// From src/wal/runtime/writer.rs:296-324
for (blk, offset, data_idx) in write_plan.iter() {
    let data = batch[*data_idx];
    let next_block_start = blk.offset + blk.limit;
    
    if let Err(e) = blk.write(*offset, data, &self.col, next_block_start) {
        // Rollback on error
        self.rollback_batch(revert_info)?;
        return Err(e);
    }
}

// Commit offset
*cur_offset = planning_offset;

// Flush based on schedule
match self.fsync_schedule {
    FsyncSchedule::SyncEach => block.mmap.flush()?,
    FsyncSchedule::Milliseconds(_) => {
        let _ = self.publisher.send(block.file_path.clone());
    }
    FsyncSchedule::NoFsync => {}
}

Rollback on Failure

If any write fails, the batch is rolled back: entry headers are zeroed to invalidate any partially written entries, and blocks allocated for the batch are released:
fn rollback_batch(&self, info: BatchRevertInfo) -> std::io::Result<()> {
    // Release blocks allocated for this batch so they can be reused
    // (entry headers are zeroed to invalidate partially written entries)
    for block_id in info.allocated_block_ids.iter() {
        unsafe { deallocate_block(*block_id)? };
    }
    Ok(())
}
This ensures atomicity - failed batches leave no partial data.
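
Because a failed batch writes nothing, the caller can safely retry the entire batch without risking duplicates from a partial first attempt. A sketch (append_with_retry is our helper, not part of the Walrus API):

use std::{thread, time::Duration};
use walrus_rust::Walrus;

// Retry an all-or-nothing batch append with a crude exponential backoff
fn append_with_retry(
    wal: &Walrus,
    topic: &str,
    batch: &[&[u8]],
    attempts: u32,
) -> std::io::Result<()> {
    let mut last_err = None;
    for n in 0..attempts {
        match wal.batch_append_for_topic(topic, batch) {
            Ok(()) => return Ok(()),
            Err(e) => {
                last_err = Some(e);
                thread::sleep(Duration::from_millis(50u64 << n));
            }
        }
    }
    Err(last_err.expect("attempts must be > 0"))
}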

Batch Reads

batch_read_for_topic

Read multiple entries from a topic in a single operation, up to a specified byte limit.
pub fn batch_read_for_topic(
    &self,
    col_name: &str,
    max_bytes: usize,
    checkpoint: bool,
    start_offset: Option<u64>,
) -> io::Result<Vec<Entry>>
Parameters:
  • col_name (&str, required): The topic name to read from.
  • max_bytes (usize, required): Maximum total payload bytes to read. At least 1 entry is always returned if available.
  • checkpoint (bool, required):
      • true: Consume entries and advance the cursor (persisted based on the consistency model)
      • false: Peek at entries without consuming
  • start_offset (Option<u64>, required):
      • None: Stateful read from the current cursor position
      • Some(offset): Stateless read from a specific byte offset (does not affect the cursor)

Example: Stateful Read

use walrus_rust::Walrus;

let wal = Walrus::new()?;

// Write 100 entries
for i in 0..100 {
    wal.append_for_topic("events", format!("event-{}", i).as_bytes())?;
}

// Read up to 1MB at a time, checkpointing
let max_bytes = 1024 * 1024; // 1MB
loop {
    let entries = wal.batch_read_for_topic("events", max_bytes, true, None)?;
    if entries.is_empty() {
        break; // No more entries
    }
    
    for entry in entries {
        println!("Processing: {} bytes", entry.data.len());
        // Process entry...
    }
}
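
Example: Peek Then Consume

The checkpoint flag controls whether a read consumes entries. Passing false peeks without moving the cursor, so a later call with true returns the same entries (a short sketch based on the parameter semantics above):
use walrus_rust::Walrus;

let wal = Walrus::new()?;
wal.batch_append_for_topic("events", &[b"a".as_slice(), b"b".as_slice()])?;

// Peek: cursor does not advance
let peeked = wal.batch_read_for_topic("events", 16 * 1024, false, None)?;
assert_eq!(peeked.len(), 2);

// Consume: the same entries come back, and the cursor advances past them
let consumed = wal.batch_read_for_topic("events", 16 * 1024, true, None)?;
assert_eq!(consumed.len(), 2);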

Example: Stateless Read

Stateless reads don't affect the shared cursor, which makes them useful for replays or parallel readers:
use walrus_rust::Walrus;

let wal = Walrus::new()?;

// Write some data
wal.batch_append_for_topic("events", &[b"a".as_slice(), b"b".as_slice(), b"c".as_slice()])?;

// Stateless read from offset 0 (doesn't affect cursor)
let entries = wal.batch_read_for_topic("events", 16 * 1024, true, Some(0))?;
assert_eq!(entries.len(), 3);

// Cursor unchanged - stateful read still starts from beginning
let entries = wal.batch_read_for_topic("events", 16 * 1024, true, None)?;
assert_eq!(entries.len(), 3);

How It Works

The batch read implementation (src/wal/runtime/walrus_read.rs:368-1199) is complex and highly optimized; it proceeds in the following phases.

Phase 1: State Preparation
// Stateful: load cursor from shared state
let (chain, cur_idx, cur_off, tail_block_id, tail_offset) = 
    if let Some(req_offset) = start_offset {
        // Stateless: find block containing req_offset
        let mut c_idx = 0;
        let mut rem = req_offset;
        for (i, b) in chain.iter().enumerate() {
            if rem < b.used {
                c_idx = i;
                break;
            }
            rem -= b.used;
        }
        // Scan block headers to align to entry boundary...
    } else {
        // Load from shared cursor
        let c_chain = info.chain.clone();
        let c_idx = info.cur_block_idx;
        let c_off = info.cur_block_offset;
        (c_chain, c_idx, c_off, ...)
    };
Phase 2: Build Read Plan

Plans which blocks to read and how many bytes from each:
// From src/wal/runtime/walrus_read.rs:676-781
let mut plan: Vec<ReadPlan> = Vec::new();
let mut planned_bytes: usize = 0;

while cur_idx < chain.len() && planned_bytes < max_bytes {
    let block = chain[cur_idx].clone();
    if cur_off >= block.used {
        BlockStateTracker::set_checkpointed_true(block.id as usize);
        cur_idx += 1;
        cur_off = 0;
        continue;
    }
    
    let mut want = (max_bytes - planned_bytes) as u64;
    
    // Peek at first entry to ensure we read at least one complete entry
    if planned_bytes == 0 && cur_off + (PREFIX_META_SIZE as u64) <= block.used {
        let mut meta_buf = [0u8; PREFIX_META_SIZE];
        block.mmap.read((block.offset + cur_off) as usize, &mut meta_buf);
        // Parse metadata to get entry size...
        let required = (PREFIX_META_SIZE + size) as u64;
        if required > want {
            want = required; // Ensure at least one entry
        }
    }
    
    let end = block.used.min(cur_off + want);
    if end > cur_off {
        plan.push(ReadPlan {
            blk: block.clone(),
            start: cur_off,
            end,
            is_tail: false,
            chain_idx: Some(cur_idx),
        });
        planned_bytes += (end - cur_off) as usize;
    }
    cur_idx += 1;
    cur_off = 0;
}
Phase 3: io_uring Batch Read (Linux + FD Backend)

On Linux with the FD backend, reads use io_uring for parallel I/O:
// From src/wal/runtime/walrus_read.rs:872-959
#[cfg(target_os = "linux")]
let buffers = if USE_FD_BACKEND.load(Ordering::Relaxed) {
    let ring_size = (plan.len() + 64).min(4096) as u32;
    let ring = match io_uring::IoUring::new(ring_size) {
        Ok(r) => Some(r),
        Err(_) => None, // Fall back to mmap
    };
    
    if let Some(mut ring) = ring {
        let mut temp_buffers: Vec<Vec<u8>> = vec![Vec::new(); plan.len()];
        
        // Submit all reads to io_uring
        for (plan_idx, read_plan) in plan.iter().enumerate() {
            let size = (read_plan.end - read_plan.start) as usize;
            let mut buffer = vec![0u8; size];
            let file_offset = (read_plan.blk.offset + read_plan.start) as usize;
            
            let fd = io_uring::types::Fd(
                fd_backend.file().as_raw_fd()
            );
            
            let read_op = io_uring::opcode::Read::new(
                fd, 
                buffer.as_mut_ptr(), 
                size as u32
            )
            .offset(file_offset as u64)
            .build()
            .user_data(plan_idx as u64);
            
            temp_buffers[plan_idx] = buffer;
            unsafe { ring.submission().push(&read_op)?; }
        }
        
        // Submit and wait for all reads
        ring.submit_and_wait(plan.len())?;
        
        // Collect results
        for _ in 0..plan.len() {
            if let Some(cqe) = ring.completion().next() {
                let plan_idx = cqe.user_data() as usize;
                let got = cqe.result();
                if got < 0 {
                    return Err(io::Error::new(
                        io::ErrorKind::Other,
                        format!("io_uring read failed: {}", got),
                    ));
                }
            }
        }
        
        temp_buffers
    } else {
        // Fall back to mmap reads
    }
}
Phase 4: Parse Entries

Entries are parsed from the buffers, with checksums verified:
// From src/wal/runtime/walrus_read.rs:984-1103
let mut entries = Vec::new();
let mut total_data_bytes = 0usize;

for (plan_idx, read_plan) in plan.iter().enumerate() {
    if entries.len() >= MAX_BATCH_ENTRIES {
        break; // Entry limit reached
    }
    
    let buffer = &buffers[plan_idx];
    let mut buf_offset = 0usize;
    
    while buf_offset < buffer.len() {
        if entries.len() >= MAX_BATCH_ENTRIES {
            break;
        }
        
        // Parse metadata
        let meta_len = (buffer[buf_offset] as usize) 
            | ((buffer[buf_offset + 1] as usize) << 8);
        
        if meta_len == 0 || meta_len > PREFIX_META_SIZE - 2 {
            break; // Invalid header
        }
        
        let mut aligned = AlignedVec::with_capacity(meta_len);
        aligned.extend_from_slice(&buffer[buf_offset + 2..buf_offset + 2 + meta_len]);
        let archived = unsafe { rkyv::archived_root::<Metadata>(&aligned[..]) };
        let meta: Metadata = archived.deserialize(&mut rkyv::Infallible)?;
        
        let data_size = meta.read_size;
        let entry_consumed = PREFIX_META_SIZE + data_size;
        
        // Check byte budget (but always allow at least one entry)
        let next_total = total_data_bytes.checked_add(data_size).unwrap_or(usize::MAX);
        if next_total > max_bytes && !entries.is_empty() {
            break;
        }
        
        // Extract and verify data
        let data_start = buf_offset + PREFIX_META_SIZE;
        let data_end = data_start + data_size;
        let data_slice = &buffer[data_start..data_end];
        
        if checksum64(data_slice) != meta.checksum {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "checksum mismatch in batch read",
            ));
        }
        
        entries.push(Entry { data: data_slice.to_vec() });
        total_data_bytes = next_total;
        buf_offset += entry_consumed;
    }
}
Phase 5: Commit Progress

If checkpointing, update the cursor position:
// From src/wal/runtime/walrus_read.rs:1106-1190
if checkpoint {
    if saw_tail {
        // Read from tail, persist with tail flag
        info.tail_block_id = final_tail_block_id;
        info.tail_offset = final_tail_offset;
        target = PersistTarget::Tail {
            blk_id: final_tail_block_id,
            off: final_tail_offset,
        };
    } else {
        // Read from sealed blocks
        info.cur_block_idx = final_block_idx;
        info.cur_block_offset = final_block_offset;
        target = PersistTarget::Sealed {
            idx: final_block_idx as u64,
            off: final_block_offset,
        };
    }
}

// Persist to index
if let Ok(mut idx_guard) = self.read_offset_index.write() {
    match target {
        PersistTarget::Tail { blk_id, off } => {
            let _ = idx_guard.set(col_name.to_string(), blk_id | TAIL_FLAG, off);
        }
        PersistTarget::Sealed { idx, off } => {
            let _ = idx_guard.set(col_name.to_string(), idx, off);
        }
        _ => {}
    }
}

Minimum Entry Guarantee

batch_read_for_topic always returns at least 1 entry if available, even if that entry exceeds max_bytes:
// From src/wal/runtime/walrus_read.rs:1039-1045
let next_total = total_data_bytes.checked_add(data_size).unwrap_or(usize::MAX);
if next_total > max_bytes && !entries.is_empty() {
    break; // Stop if over budget and we have at least one entry
}
This ensures forward progress - you can always read the next entry regardless of size.
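
For example, a single entry larger than max_bytes is still returned on its own (a sketch illustrating the guarantee above):
use walrus_rust::Walrus;

let wal = Walrus::new()?;

// Write one 1 MiB entry
let big = vec![0u8; 1024 * 1024];
wal.batch_append_for_topic("events", &[big.as_slice()])?;

// The 64-byte budget is far smaller than the entry, but one entry still comes back
let entries = wal.batch_read_for_topic("events", 64, true, None)?;
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].data.len(), 1024 * 1024);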

Performance Comparison

io_uring vs Sequential I/O

On Linux with the FD backend, io_uring provides significant performance benefits:

Batch Write (1000 entries)
  • Sequential: ~50,000 writes/sec
  • io_uring: ~200,000 writes/sec (4x improvement)
Batch Read (1000 entries)
  • Sequential: ~100,000 reads/sec
  • io_uring: ~500,000 reads/sec (5x improvement)
Performance benefits scale with batch size and number of blocks accessed.

Backend Comparison

| Operation          | FD + io_uring (Linux) | FD Sequential | Mmap     |
|--------------------|-----------------------|---------------|----------|
| Batch Write (1000) | 200k/sec              | 50k/sec       | 50k/sec  |
| Batch Read (1000)  | 500k/sec              | 100k/sec      | 100k/sec |
| Single Write       | 100k/sec              | 100k/sec      | 95k/sec  |
| Single Read        | 150k/sec              | 150k/sec      | 140k/sec |
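
These figures are indicative; actual throughput depends on hardware, entry size, and the fsync schedule. A minimal harness to estimate batch write throughput on your own machine (a rough sketch, not a rigorous benchmark):
use std::time::Instant;
use walrus_rust::Walrus;

let wal = Walrus::new()?;

// 100 batches of 1,000 small entries each
let batch: Vec<Vec<u8>> = (0..1000).map(|i| format!("event-{}", i).into_bytes()).collect();
let refs: Vec<&[u8]> = batch.iter().map(|v| v.as_slice()).collect();

let iterations: usize = 100;
let start = Instant::now();
for _ in 0..iterations {
    wal.batch_append_for_topic("bench", &refs)?;
}
let elapsed = start.elapsed();
println!("~{:.0} entries/sec", (iterations * refs.len()) as f64 / elapsed.as_secs_f64());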

Best Practices

Batch Sizing

use walrus_rust::Walrus;

let wal = Walrus::new()?;

// Good: batch size around 100-1000 entries
let batch: Vec<Vec<u8>> = (0..500)
    .map(|i| format!("event-{}", i).into_bytes())
    .collect();
let batch_refs: Vec<&[u8]> = batch.iter().map(|v| v.as_slice()).collect();
wal.batch_append_for_topic("events", &batch_refs)?;

// Avoid: too small batches (overhead dominates)
for i in 0..1000 {
    wal.batch_append_for_topic("events", &[format!("event-{}", i).as_bytes()])?;
}

// Avoid: batches near the 2,000-entry limit
let huge_batch: Vec<Vec<u8>> = (0..1999).map(|_| vec![0u8; 100]).collect();
// This works but leaves no margin for error

Read Buffer Sizing

use walrus_rust::Walrus;

let wal = Walrus::new()?;

// Good: read buffer sized to expected workload
let entries_per_batch = 100;
let avg_entry_size = 1024;
let max_bytes = entries_per_batch * avg_entry_size;

loop {
    let entries = wal.batch_read_for_topic("events", max_bytes, true, None)?;
    if entries.is_empty() {
        break;
    }
    // Process entries...
}

// Avoid: too small buffer (many iterations needed)
let max_bytes = 100; // Too small!

// Avoid: excessively large buffer (wasted memory)
let max_bytes = 100 * 1024 * 1024; // 100MB - probably too large

Error Handling

use walrus_rust::Walrus;
use std::io::ErrorKind;

let wal = Walrus::new()?;

// Handle batch write errors
let batch = vec![b"data".as_slice(); 2001]; // Too many entries
match wal.batch_append_for_topic("topic", &batch) {
    Ok(()) => println!("Success"),
    Err(e) if e.kind() == ErrorKind::InvalidInput => {
        eprintln!("Batch too large: {}", e);
    }
    Err(e) => eprintln!("Batch write failed: {}", e),
}

// Handle batch read errors
match wal.batch_read_for_topic("topic", 1024, true, None) {
    Ok(entries) => println!("Read {} entries", entries.len()),
    Err(e) if e.kind() == ErrorKind::InvalidData => {
        eprintln!("Checksum mismatch in batch read");
    }
    Err(e) => eprintln!("Batch read failed: {}", e),
}

Next Steps