
Storage Snapshot System

TOS implements an atomic snapshot mechanism that ensures database consistency across multi-step operations. This system provides transactional semantics for both RocksDB persistence and in-memory caches.

Implementation Status

| Feature | Status |
| --- | --- |
| Registry snapshot (A2A agents) | Implemented |
| Atomic commit/rollback | Implemented |
| RAII-based SnapshotGuard | Implemented |
| Read-your-writes consistency | Implemented |
| Health-check batch atomicity | Implemented |

The snapshot system is production-ready and handles all registry mutations atomically.

Why Snapshots?

Problem: Partial Updates

Without snapshots, multi-step operations can leave inconsistent state:

Operation: Register Agent
  Step 1: Write to RocksDB      [SUCCESS]
  Step 2: Update memory cache   [FAILURE - crash!]
Result: Database has agent, but memory doesn't
        Next restart loads inconsistent state

Solution: Atomic Snapshots

With snapshots, all changes succeed or fail together:

Operation: Register Agent (with snapshot)
  Start Snapshot: Clone current state
  Step 1: Stage RocksDB write   [PENDING]
  Step 2: Stage memory update   [PENDING]
  Commit: Apply all changes     [ATOMIC]
Result: Either ALL changes apply, or NONE do

Architecture

Core Components

RegistryStore
+-- db: Arc<DB>                        (RocksDB)
+-- cache: RegistryCache               (In-memory state)
+-- snapshot: Option<RegistrySnapshot>
    +-- batch: WriteBatch              (Pending DB writes)
    +-- cache: RegistryCache           (Pending memory state)
    +-- dirty_skills: HashSet          (Modified index keys)
    +-- dirty_accounts: HashSet        (Modified account keys)

RegistryCache Structure

pub struct RegistryCache {
    /// Agent records by ID
    pub agents: HashMap<Hash, RegisteredAgent>,
    /// Skill-based index for filtering
    pub index_by_skill: HashMap<String, HashSet<Hash>>,
    /// Account-to-agent mapping
    pub index_by_account: HashMap<Hash, Hash>,
}

SnapshotGuard (RAII Pattern)

pub struct SnapshotGuard<'a> {
    store: &'a mut RegistryStore,
    committed: bool,
}

impl Drop for SnapshotGuard<'_> {
    fn drop(&mut self) {
        if !self.committed {
            // Auto-rollback on failure or early return.
            // Drop cannot propagate errors, so the result is ignored explicitly.
            let _ = self.store.end_snapshot(false);
        }
    }
}
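The guard pattern above can be tried in isolation. The following is a minimal sketch, not the TOS implementation: `MiniStore` and `MiniGuard` are hypothetical stand-ins, the store holds a plain `HashMap`, and the guard implements `DerefMut` (an assumption of this sketch) so that mutations can go through the guard while it holds the exclusive borrow.

```rust
use std::collections::HashMap;
use std::ops::{Deref, DerefMut};

/// Hypothetical stand-in for RegistryStore: live state plus an optional staged copy.
#[derive(Default)]
pub struct MiniStore {
    live: HashMap<u64, String>,
    staged: Option<HashMap<u64, String>>,
}

impl MiniStore {
    /// Clone the live state into a staged copy and hand back a guard.
    pub fn start_snapshot(&mut self) -> Result<MiniGuard<'_>, &'static str> {
        if self.staged.is_some() {
            return Err("snapshot already active");
        }
        self.staged = Some(self.live.clone());
        Ok(MiniGuard { store: self, committed: false })
    }

    /// Stage an insert; only valid while a snapshot is active.
    pub fn insert(&mut self, id: u64, name: &str) -> Result<(), &'static str> {
        let staged = self.staged.as_mut().ok_or("snapshot not active")?;
        staged.insert(id, name.to_string());
        Ok(())
    }

    /// Publish the staged copy on commit, discard it on rollback.
    fn end_snapshot(&mut self, commit: bool) {
        if let Some(staged) = self.staged.take() {
            if commit {
                self.live = staged;
            }
        }
    }

    pub fn len(&self) -> usize {
        self.live.len()
    }
}

pub struct MiniGuard<'a> {
    store: &'a mut MiniStore,
    committed: bool,
}

impl MiniGuard<'_> {
    /// Consume the guard and publish the staged changes.
    pub fn commit(mut self) {
        self.store.end_snapshot(true);
        self.committed = true; // Drop runs next and sees the flag
    }
}

impl Deref for MiniGuard<'_> {
    type Target = MiniStore;
    fn deref(&self) -> &MiniStore {
        &*self.store
    }
}

impl DerefMut for MiniGuard<'_> {
    fn deref_mut(&mut self) -> &mut MiniStore {
        &mut *self.store
    }
}

impl Drop for MiniGuard<'_> {
    fn drop(&mut self) {
        if !self.committed {
            self.store.end_snapshot(false); // auto-rollback
        }
    }
}
```

With this shape, `guard.insert(1, "a")` followed by an early return leaves the store untouched, while `guard.commit()` publishes the staged map.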

Snapshot Lifecycle

1. Start Snapshot

let mut guard = store.start_snapshot()?;
// Clones current cache into snapshot
// Initializes empty WriteBatch

2. Apply Mutations

All mutations during snapshot:

  • Write to snapshot cache (not main cache)
  • Queue DB operations in WriteBatch
  • Track dirty index keys

store.insert_agent(agent)?;    // Writes to snapshot
store.update_agent(updated)?;  // Writes to snapshot
store.remove_agent(&id)?;      // Writes to snapshot

3. Commit or Rollback

Commit (success):

guard.commit()?;
// 1. Write RocksDB batch atomically
// 2. Replace main cache with snapshot cache

Rollback (failure):

drop(guard); // Automatic via RAII
// 1. Discard WriteBatch
// 2. Discard snapshot cache
// 3. Main cache unchanged
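The ordering of the commit steps matters: the database batch is written first, and the main cache is replaced only once that write has succeeded. A minimal sketch of that ordering follows, assuming a hypothetical in-memory `FakeDb` in place of RocksDB and string keys and values in place of agent records. None of these names come from the TOS codebase.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for RocksDB; `fail_next` simulates one write error.
#[derive(Default)]
struct FakeDb {
    rows: HashMap<String, String>,
    fail_next: bool,
}

impl FakeDb {
    fn write_batch(&mut self, batch: &[(String, String)]) -> Result<(), String> {
        if self.fail_next {
            self.fail_next = false;
            return Err("simulated write failure".to_string());
        }
        for (k, v) in batch {
            self.rows.insert(k.clone(), v.clone());
        }
        Ok(())
    }
}

/// Pending state: queued DB writes plus the staged copy of the cache.
struct Pending {
    batch: Vec<(String, String)>,
    cache: HashMap<String, String>,
}

#[derive(Default)]
struct Store {
    db: FakeDb,
    cache: HashMap<String, String>,
    snapshot: Option<Pending>,
}

impl Store {
    fn start_snapshot(&mut self) -> Result<(), String> {
        if self.snapshot.is_some() {
            return Err("snapshot already active".to_string());
        }
        self.snapshot = Some(Pending { batch: Vec::new(), cache: self.cache.clone() });
        Ok(())
    }

    /// Stage a write in both the batch and the snapshot cache.
    fn put(&mut self, key: &str, value: &str) -> Result<(), String> {
        let snap = self
            .snapshot
            .as_mut()
            .ok_or_else(|| "snapshot not active".to_string())?;
        snap.batch.push((key.to_string(), value.to_string()));
        snap.cache.insert(key.to_string(), value.to_string());
        Ok(())
    }

    fn end_snapshot(&mut self, commit: bool) -> Result<(), String> {
        let snap = self
            .snapshot
            .take()
            .ok_or_else(|| "snapshot not active".to_string())?;
        if !commit {
            return Ok(()); // rollback: batch and staged cache are dropped
        }
        // 1. Write the batch; on failure put the snapshot back so it can be retried.
        if let Err(e) = self.db.write_batch(&snap.batch) {
            self.snapshot = Some(snap);
            return Err(e);
        }
        // 2. Only after the DB write succeeds, replace the main cache.
        self.cache = snap.cache;
        Ok(())
    }
}
```

Because the cache swap is a plain move that cannot fail, the only fallible step is the batch write, which keeps the two stores from diverging in this sketch.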

Read-Your-Writes Consistency

The snapshot system solves the “read-your-writes” problem:

Problem Without Snapshots

Batch Operation:
  1. Insert Agent A with skill "AI"
  2. Insert Agent B with skill "AI"
  3. Query: "get all agents with skill AI"
Without snapshot: Query may miss Agent A!
  (RocksDB batch hasn't been committed yet)

Solution With Snapshots

Batch Operation (with snapshot):
  1. Insert Agent A -> Updates snapshot.cache
  2. Insert Agent B -> Updates snapshot.cache
  3. Query reads from snapshot.cache
With snapshot: Query sees BOTH agents!
  (Reads from pending cache state)
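One way to obtain this behavior is to route every read through a helper that prefers the pending cache when a snapshot is active. The sketch below assumes that design. `Store`, `Cache`, and `active_cache` are illustrative names, agent IDs are plain `u64` values, and writes fall back to the main cache when no snapshot is active, which may differ from the real store.

```rust
use std::collections::{HashMap, HashSet};

#[derive(Clone, Default)]
struct Cache {
    index_by_skill: HashMap<String, HashSet<u64>>,
}

#[derive(Default)]
struct Store {
    cache: Cache,
    snapshot_cache: Option<Cache>,
}

impl Store {
    fn start_snapshot(&mut self) {
        self.snapshot_cache = Some(self.cache.clone());
    }

    /// Reads see the pending cache while a snapshot is active, else the main cache.
    fn active_cache(&self) -> &Cache {
        self.snapshot_cache.as_ref().unwrap_or(&self.cache)
    }

    /// Writes land in the pending cache while a snapshot is active (assumed fallback: main cache).
    fn insert_agent(&mut self, id: u64, skill: &str) {
        let target = self.snapshot_cache.as_mut().unwrap_or(&mut self.cache);
        target
            .index_by_skill
            .entry(skill.to_string())
            .or_default()
            .insert(id);
    }

    /// Returns matching agent IDs in ascending order.
    fn filter_by_skill(&self, skill: &str) -> Vec<u64> {
        let mut ids: Vec<u64> = self
            .active_cache()
            .index_by_skill
            .get(skill)
            .map(|set| set.iter().copied().collect())
            .unwrap_or_default();
        ids.sort_unstable();
        ids
    }
}
```

Here a query issued mid-batch returns both staged agents, while the main cache stays empty until commit.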

Usage Example

Agent Registration

pub async fn register_agent(
    &self,
    card: AgentCard,
    endpoint: String,
) -> Result<RegisteredAgent, RegistryError> {
    let mut store = self.store.write().await;

    // Start atomic snapshot
    let mut guard = store.start_snapshot()?;

    // Perform registration (writes to snapshot)
    let agent = RegisteredAgent::new(card, endpoint);
    store.insert_agent(agent.clone())?;

    // Commit all changes atomically
    guard.commit()?;
    Ok(agent)
}

Batch Health Checks

pub async fn run_health_checks(&self) -> Result<(), RegistryError> {
    let mut store = self.store.write().await;
    let mut guard = store.start_snapshot()?;

    // Collect the IDs first so the cache is not borrowed while it is mutated
    let agent_ids: Vec<Hash> = store.cache().agents.keys().cloned().collect();

    // Check all agents (multiple updates)
    for agent_id in &agent_ids {
        let status = check_health(agent_id).await;
        store.update_agent_status(agent_id, status)?;
    }

    // All updates commit together
    guard.commit()?;
    Ok(())
}

Error Handling

Snapshot Errors

pub enum RegistryError {
    /// Attempted to start snapshot while one is active
    SnapshotAlreadyActive,
    /// Attempted operation requiring active snapshot
    SnapshotNotActive,
    /// Database write failure
    DatabaseError(String),
}

Commit Failure Recovery

If commit fails, the snapshot is preserved for retry:

let mut guard = store.start_snapshot()?;
store.insert_agent(agent)?;

match guard.commit() {
    Ok(()) => { /* Success */ }
    Err(e) => {
        // Snapshot still active - can retry
        guard.commit()?; // Try again
    }
}
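A single retry, as above, can be generalized to a bounded loop. The helper below is a hypothetical sketch and not part of the TOS API: it takes any closure that attempts the commit and gives up after `max_attempts` tries, returning the last error.

```rust
/// Retry a fallible commit-like operation up to `max_attempts` times.
/// Returns the number of the attempt that succeeded, or the last error.
pub fn commit_with_retry<E>(
    max_attempts: u32,
    mut try_commit: impl FnMut() -> Result<(), E>,
) -> Result<u32, E> {
    let mut attempt = 1;
    loop {
        match try_commit() {
            Ok(()) => return Ok(attempt),
            // Out of attempts: surface the most recent error to the caller.
            Err(e) if attempt >= max_attempts => return Err(e),
            // Otherwise count the failure and try again.
            Err(_) => attempt += 1,
        }
    }
}
```

A bound matters here because a persistent database failure would otherwise keep the store's write lock held indefinitely. Whether to add a delay between attempts depends on the failure modes expected from the database.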

Security Hardening

SSRF Protection

The registry validates all agent endpoint URLs:

fn validate_endpoint_url(url: &str) -> Result<(), RegistryError> {
    // Block private IP ranges
    // - 127.x.x.x (loopback)
    // - 10.x.x.x (private)
    // - 172.16-31.x.x (private)
    // - 192.168.x.x (private)
    // - Link-local addresses
}
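The address checks listed in the comments can be expressed with the standard library alone. The following sketch covers only the IP classification step. `is_blocked_ip` is a hypothetical helper, and the IPv6 handling (loopback, `fe80::/10` link-local, and IPv4-mapped addresses) is an assumption that goes beyond the ranges named above. Parsing the URL and resolving its host are left out.

```rust
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};

/// True for the IPv4 ranges listed above: loopback, RFC 1918 private, link-local.
fn is_blocked_v4(ip: Ipv4Addr) -> bool {
    ip.is_loopback() || ip.is_private() || ip.is_link_local()
}

/// IPv6 counterpart (an assumption; the source lists IPv4 ranges only).
fn is_blocked_v6(ip: Ipv6Addr) -> bool {
    // Treat ::ffff:a.b.c.d like the embedded IPv4 address.
    if let Some(v4) = ip.to_ipv4_mapped() {
        return is_blocked_v4(v4);
    }
    // fe80::/10 is the IPv6 link-local range.
    ip.is_loopback() || (ip.segments()[0] & 0xffc0) == 0xfe80
}

/// Returns true when an endpoint address must be rejected.
pub fn is_blocked_ip(ip: IpAddr) -> bool {
    match ip {
        IpAddr::V4(v4) => is_blocked_v4(v4),
        IpAddr::V6(v6) => is_blocked_v6(v6),
    }
}
```

For hostnames, the check only helps if it is applied to the addresses the host actually resolves to, and ideally to the address the connection is finally made to, since a name can resolve differently between validation and use.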

Response Size Limits

const MAX_RESPONSE_BODY_SIZE: usize = 1_048_576; // 1 MB
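How the limit is enforced is not shown here. One common approach, sketched below under that caveat, is to read at most one byte more than the limit and reject the body if the extra byte arrives. `read_capped` and `read_response_body` are hypothetical helpers that work on any `std::io::Read` source.

```rust
use std::io::{self, Read};

const MAX_RESPONSE_BODY_SIZE: usize = 1_048_576; // 1 MB

/// Read a body into memory, failing if it is larger than `limit` bytes.
pub fn read_capped<R: Read>(reader: R, limit: usize) -> io::Result<Vec<u8>> {
    let mut body = Vec::new();
    // Read at most limit + 1 bytes so an oversized body is detectable
    // without buffering all of it.
    reader.take(limit as u64 + 1).read_to_end(&mut body)?;
    if body.len() > limit {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "response body exceeds limit",
        ));
    }
    Ok(body)
}

/// Convenience wrapper that applies the registry-wide limit.
pub fn read_response_body<R: Read>(reader: R) -> io::Result<Vec<u8>> {
    read_capped(reader, MAX_RESPONSE_BODY_SIZE)
}
```

Capping the read itself, rather than trusting a `Content-Length` header, bounds memory use even when the remote agent misreports the size.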

Filter Input Limits

const MAX_FILTER_SKILLS: usize = 32;
const MAX_FILTER_INPUT_MODES: usize = 16;
const MAX_FILTER_OUTPUT_MODES: usize = 16;
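These limits would typically be checked before a filter is evaluated. A minimal sketch follows, assuming a hypothetical `AgentFilter` struct with one list per limited field. The real filter type and error type may differ.

```rust
const MAX_FILTER_SKILLS: usize = 32;
const MAX_FILTER_INPUT_MODES: usize = 16;
const MAX_FILTER_OUTPUT_MODES: usize = 16;

/// Hypothetical filter shape; field names are illustrative only.
#[derive(Default)]
pub struct AgentFilter {
    pub skills: Vec<String>,
    pub input_modes: Vec<String>,
    pub output_modes: Vec<String>,
}

/// Reject a list that has more entries than its limit allows.
fn check_len(field: &str, len: usize, max: usize) -> Result<(), String> {
    if len > max {
        Err(format!("{} has {} entries, limit is {}", field, len, max))
    } else {
        Ok(())
    }
}

/// Validate every limited field; the first violation is reported.
pub fn validate_filter(filter: &AgentFilter) -> Result<(), String> {
    check_len("skills", filter.skills.len(), MAX_FILTER_SKILLS)?;
    check_len("input_modes", filter.input_modes.len(), MAX_FILTER_INPUT_MODES)?;
    check_len("output_modes", filter.output_modes.len(), MAX_FILTER_OUTPUT_MODES)?;
    Ok(())
}
```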

Data Layout

RocksDB Keys

agent:<hash>   -> RegisteredAgent JSON
skill:<id>     -> Vec<String> (agent_id hex)
account:<pub>  -> String (agent_id hex)

Index Rebuild Strategy

At commit time, indexes are rebuilt from cache (no DB reads):

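fn rebuild_indexes(&mut self) -> Result<(), RegistryError> {
    // The snapshot is optional, so fail cleanly when none is active
    let snapshot = self.snapshot.as_mut().ok_or(RegistryError::SnapshotNotActive)?;

    for skill in &snapshot.dirty_skills {
        let agent_ids: Vec<String> = snapshot.cache
            .index_by_skill
            .get(skill)
            .map(|set| set.iter().map(|h| h.to_hex()).collect())
            .unwrap_or_default();

        // Mapping serialization errors to DatabaseError is an assumption
        let value = serde_json::to_vec(&agent_ids)
            .map_err(|e| RegistryError::DatabaseError(e.to_string()))?;
        snapshot.batch.put(format!("skill:{}", skill), value);
    }
    Ok(())
}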
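The same idea can be exercised without RocksDB or serde. In the sketch below, which is an illustration and not the TOS code, the batch is a plain `Vec` of key/value pairs, agent IDs are `u64` values rendered as hex, and the JSON array is assembled by hand. Hand-assembly is safe here only because hex strings need no escaping. `dirty_skills` is a `BTreeSet` instead of a `HashSet` so that the output order is deterministic.

```rust
use std::collections::{BTreeSet, HashMap, HashSet};

/// Build the index key for a skill, following the `skill:<id>` layout.
fn skill_key(skill: &str) -> String {
    format!("skill:{}", skill)
}

/// Encode hex IDs as a JSON array of strings (hex needs no escaping).
fn encode_ids(ids: &[String]) -> Vec<u8> {
    let quoted: Vec<String> = ids.iter().map(|id| format!("\"{}\"", id)).collect();
    format!("[{}]", quoted.join(",")).into_bytes()
}

/// Stage one write per dirty skill, reading only from the in-memory index.
pub fn stage_dirty_skills(
    index_by_skill: &HashMap<String, HashSet<u64>>,
    dirty_skills: &BTreeSet<String>,
) -> Vec<(String, Vec<u8>)> {
    let mut batch = Vec::new();
    for skill in dirty_skills {
        let mut ids: Vec<String> = index_by_skill
            .get(skill)
            .map(|set| set.iter().map(|id| format!("{:016x}", id)).collect())
            .unwrap_or_default();
        ids.sort(); // stable output regardless of HashSet iteration order
        batch.push((skill_key(skill), encode_ids(&ids)));
    }
    batch
}
```

A skill that is dirty but no longer has any agents is written as an empty array, mirroring the `unwrap_or_default()` in the code above. Deleting the key instead would be an alternative design.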

Testing

Read-Your-Writes Test

#[test]
fn snapshot_read_your_writes() {
    let mut store = RegistryStore::new_memory();
    store.start_snapshot().unwrap();

    // Insert two agents with same skill
    store.insert_agent(agent_a_with_skill("AI")).unwrap();
    store.insert_agent(agent_b_with_skill("AI")).unwrap();

    // Query should see both in snapshot
    let agents = store.filter_by_skill("AI");
    assert_eq!(agents.len(), 2);

    store.end_snapshot(true).unwrap();
}

Rollback Test

#[test]
fn snapshot_rollback_discards_changes() {
    let mut store = RegistryStore::new_memory();
    store.start_snapshot().unwrap();

    // Reuses the test helper from the read-your-writes test
    store.insert_agent(agent_a_with_skill("AI")).unwrap();

    // Rollback (don't commit)
    store.end_snapshot(false).unwrap();

    // Agent should not exist
    assert!(store.cache().agents.is_empty());
}

Performance Considerations

| Operation | Without Snapshot | With Snapshot |
| --- | --- | --- |
| Single insert | ~100us | ~120us |
| Batch 100 inserts | ~10ms | ~5ms |
| Index query (mid-batch) | Inconsistent | Consistent |
| Rollback | N/A | ~50us |

Snapshots add a small overhead to single operations (about 20us per insert in the figures above) but roughly halve the time for a 100-insert batch, because disk writes are deferred to a single commit.

The snapshot pattern is used across TOS:

  • A2A Registry - Agent registration and health checks
  • Storage Cache - Block and transaction caching
  • Chain Sync - Atomic block acceptance
