Storage Snapshot System
TOS implements an atomic snapshot mechanism that ensures database consistency across multi-step operations. This system provides transactional semantics for both RocksDB persistence and in-memory caches.
Implementation Status
| Feature | Status |
|---|---|
| Registry snapshot (A2A agents) | Implemented |
| Atomic commit/rollback | Implemented |
| RAII-based SnapshotGuard | Implemented |
| Read-your-writes consistency | Implemented |
| Health-check batch atomicity | Implemented |
The snapshot system is production-ready and handles all registry mutations atomically.
Why Snapshots?
Problem: Partial Updates
Without snapshots, multi-step operations can leave inconsistent state:
Operation: Register Agent
Step 1: Write to RocksDB [SUCCESS]
Step 2: Update memory cache [FAILURE - crash!]
Result: Database has agent, but memory doesn't
Next restart loads inconsistent state
Solution: Atomic Snapshots
With snapshots, all changes succeed or fail together:
Operation: Register Agent (with snapshot)
Start Snapshot: Clone current state
Step 1: Stage RocksDB write [PENDING]
Step 2: Stage memory update [PENDING]
Commit: Apply all changes [ATOMIC]
Result: Either ALL changes apply, or NONE do
Architecture
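The stage-then-commit flow in the "Solution: Atomic Snapshots" diagram can be sketched with in-memory stand-ins. This is a minimal illustration, not the TOS implementation: `State`, `Store`, and the string-keyed agent map are hypothetical, and the RocksDB `WriteBatch` is left out so the example runs with the standard library only.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory stand-in for the registry state.
#[derive(Clone, Default, Debug, PartialEq)]
pub struct State {
    pub agents: HashMap<String, String>, // agent id -> endpoint
}

/// Hypothetical store; `pending` plays the role of the snapshot.
#[derive(Default)]
pub struct Store {
    pub committed: State,
    pub pending: Option<State>,
}

impl Store {
    /// Start a snapshot by cloning the committed state.
    pub fn start_snapshot(&mut self) -> Result<(), &'static str> {
        if self.pending.is_some() {
            return Err("snapshot already active");
        }
        self.pending = Some(self.committed.clone());
        Ok(())
    }

    /// Stage a write; the committed state is not touched.
    pub fn insert_agent(&mut self, id: &str, endpoint: &str) -> Result<(), &'static str> {
        let pending = self.pending.as_mut().ok_or("snapshot not active")?;
        pending.agents.insert(id.to_string(), endpoint.to_string());
        Ok(())
    }

    /// Commit swaps the staged state in; rollback simply drops it.
    pub fn end_snapshot(&mut self, commit: bool) -> Result<(), &'static str> {
        let pending = self.pending.take().ok_or("snapshot not active")?;
        if commit {
            self.committed = pending;
        }
        Ok(())
    }
}
```

Because every staged change lives in `pending` until `end_snapshot(true)`, a failure between steps leaves `committed` exactly as it was.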
Core Components
RegistryStore
+-- db: Arc<DB> (RocksDB)
+-- cache: RegistryCache (In-memory state)
+-- snapshot: Option<RegistrySnapshot>
    +-- batch: WriteBatch (Pending DB writes)
    +-- cache: RegistryCache (Pending memory state)
    +-- dirty_skills: HashSet (Modified index keys)
    +-- dirty_accounts: HashSet (Modified account keys)
RegistryCache Structure
pub struct RegistryCache {
    /// Agent records by ID
    pub agents: HashMap<Hash, RegisteredAgent>,
    /// Skill-based index for filtering
    pub index_by_skill: HashMap<String, HashSet<Hash>>,
    /// Account-to-agent mapping
    pub index_by_account: HashMap<Hash, Hash>,
}
SnapshotGuard (RAII Pattern)
pub struct SnapshotGuard<'a> {
    store: &'a mut RegistryStore,
    committed: bool,
}
impl Drop for SnapshotGuard<'_> {
    fn drop(&mut self) {
        if !self.committed {
            // Auto-rollback on failure or early return.
            // Drop cannot propagate errors, so the result is explicitly ignored.
            let _ = self.store.end_snapshot(false);
        }
    }
}
Snapshot Lifecycle
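The RAII guard shown in the SnapshotGuard section holds a mutable borrow of the store, so mutations made while it is alive have to go through the guard. The sketch below shows one way this could work, assuming the guard implements `Deref`/`DerefMut` to the store. That is an assumption for illustration: `Store`, `Guard`, and `insert` are hypothetical names, and the source does not say how the real `SnapshotGuard` exposes the store.

```rust
use std::ops::{Deref, DerefMut};

/// Hypothetical stand-in for the registry store.
#[derive(Default)]
pub struct Store {
    pub committed: Vec<String>,
    pending: Option<Vec<String>>,
}

impl Store {
    pub fn start_snapshot(&mut self) -> Guard<'_> {
        self.pending = Some(self.committed.clone());
        Guard { store: self, committed: false }
    }

    /// Stages a write; ignored when no snapshot is active
    /// (a real store would likely return an error instead).
    pub fn insert(&mut self, item: &str) {
        if let Some(pending) = self.pending.as_mut() {
            pending.push(item.to_string());
        }
    }

    fn end_snapshot(&mut self, commit: bool) {
        if let Some(pending) = self.pending.take() {
            if commit {
                self.committed = pending;
            }
        }
    }
}

pub struct Guard<'a> {
    store: &'a mut Store,
    committed: bool,
}

impl Guard<'_> {
    /// Consumes the guard so it cannot be used after a commit.
    pub fn commit(mut self) {
        self.store.end_snapshot(true);
        self.committed = true;
        // `self` drops here; Drop sees `committed == true` and does nothing.
    }
}

impl Deref for Guard<'_> {
    type Target = Store;
    fn deref(&self) -> &Store {
        &*self.store
    }
}

impl DerefMut for Guard<'_> {
    fn deref_mut(&mut self) -> &mut Store {
        &mut *self.store
    }
}

impl Drop for Guard<'_> {
    fn drop(&mut self) {
        if !self.committed {
            // Roll back on early return, `?`, or panic unwinding.
            self.store.end_snapshot(false);
        }
    }
}
```

With this shape, `guard.insert("a")` compiles while the guard is alive, and letting the guard fall out of scope without calling `commit()` discards the staged writes.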
1. Start Snapshot
let mut guard = store.start_snapshot()?;
// Clones current cache into snapshot
// Initializes empty WriteBatch
2. Apply Mutations
All mutations during snapshot:
- Write to snapshot cache (not main cache)
- Queue DB operations in WriteBatch
- Track dirty index keys
store.insert_agent(agent)?; // Writes to snapshot
store.update_agent(updated)?; // Writes to snapshot
store.remove_agent(&id)?; // Writes to snapshot
3. Commit or Rollback
Commit (success):
guard.commit()?;
// 1. Write RocksDB batch atomically
// 2. Replace main cache with snapshot cache
Rollback (failure):
drop(guard); // Automatic via RAII
// 1. Discard WriteBatch
// 2. Discard snapshot cache
// 3. Main cache unchanged
Read-Your-Writes Consistency
The snapshot system solves the “read-your-writes” problem:
Problem Without Snapshots
Batch Operation:
1. Insert Agent A with skill "AI"
2. Insert Agent B with skill "AI"
3. Query: "get all agents with skill AI"
Without snapshot: Query may miss Agent A!
(RocksDB batch hasn't been committed yet)
Solution With Snapshots
Batch Operation (with snapshot):
1. Insert Agent A -> Updates snapshot.cache
2. Insert Agent B -> Updates snapshot.cache
3. Query reads from snapshot.cache
With snapshot: Query sees BOTH agents!
(Reads from pending cache state)
Usage Example
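The read-your-writes behaviour described above comes down to routing reads and writes to the same place while a snapshot is active. The following is a minimal sketch under that assumption. `Store`, `SkillIndex`, and `committed_by_skill` are hypothetical, and agent IDs are plain strings rather than `Hash` values.

```rust
use std::collections::{HashMap, HashSet};

type SkillIndex = HashMap<String, HashSet<String>>;

/// Hypothetical store holding only the skill index.
#[derive(Default)]
pub struct Store {
    main: SkillIndex,
    snapshot: Option<SkillIndex>,
}

impl Store {
    pub fn start_snapshot(&mut self) {
        self.snapshot = Some(self.main.clone());
    }

    /// Writes go to the snapshot when one is active, otherwise to the main index.
    pub fn insert_agent(&mut self, agent_id: &str, skill: &str) {
        let index = self.snapshot.as_mut().unwrap_or(&mut self.main);
        index
            .entry(skill.to_string())
            .or_default()
            .insert(agent_id.to_string());
    }

    /// Reads use the same routing, so staged inserts are visible mid-batch.
    pub fn filter_by_skill(&self, skill: &str) -> Vec<String> {
        let index = self.snapshot.as_ref().unwrap_or(&self.main);
        let mut ids: Vec<String> = index
            .get(skill)
            .map(|set| set.iter().cloned().collect())
            .unwrap_or_default();
        ids.sort(); // deterministic order for callers and tests
        ids
    }

    /// Size of the committed view, ignoring any active snapshot.
    pub fn committed_by_skill(&self, skill: &str) -> usize {
        self.main.get(skill).map_or(0, |set| set.len())
    }
}
```

After two staged inserts with skill "AI", `filter_by_skill("AI")` returns both agents while `committed_by_skill("AI")` still reports zero, which is the mid-batch consistency the section describes.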
Agent Registration
pub async fn register_agent(
    &self,
    card: AgentCard,
    endpoint: String,
) -> Result<RegisteredAgent, RegistryError> {
    let mut store = self.store.write().await;
    // Start atomic snapshot
    let mut guard = store.start_snapshot()?;
    // Perform registration (writes to snapshot)
    let agent = RegisteredAgent::new(card, endpoint);
    store.insert_agent(agent.clone())?;
    // Commit all changes atomically
    guard.commit()?;
    Ok(agent)
}
Batch Health Checks
pub async fn run_health_checks(&self) -> Result<(), RegistryError> {
    let mut store = self.store.write().await;
    let mut guard = store.start_snapshot()?;
    // Collect the IDs first so the cache is not borrowed while it is updated
    let agent_ids: Vec<Hash> = store.cache().agents.keys().cloned().collect();
    // Check all agents (multiple updates)
    for agent_id in &agent_ids {
        let status = check_health(agent_id).await;
        store.update_agent_status(agent_id, status)?;
    }
    // All updates commit together
    guard.commit()?;
    Ok(())
}
Error Handling
Snapshot Errors
pub enum RegistryError {
    /// Attempted to start snapshot while one is active
    SnapshotAlreadyActive,
    /// Attempted operation requiring active snapshot
    SnapshotNotActive,
    /// Database write failure
    DatabaseError(String),
}
Commit Failure Recovery
If commit fails, the snapshot is preserved for retry:
let mut guard = store.start_snapshot()?;
store.insert_agent(agent)?;
match guard.commit() {
    Ok(()) => { /* Success */ }
    Err(e) => {
        // Snapshot still active - can retry
        guard.commit()?; // Try again
    }
}
Security Hardening
SSRF Protection
The registry validates all agent endpoint URLs:
fn validate_endpoint_url(url: &str) -> Result<(), RegistryError> {
    // Block private IP ranges
    // - 127.x.x.x (loopback)
    // - 10.x.x.x (private)
    // - 172.16-31.x.x (private)
    // - 192.168.x.x (private)
    // - Link-local addresses
}
Response Size Limits
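The address ranges listed in the `validate_endpoint_url` comments under SSRF Protection can be checked with the standard library alone. The sketch below is one possible approach, not the registry's actual code: `is_blocked_ip` and `validate_ip_host` are hypothetical helpers, and they only handle hosts that are already IP literals.

```rust
use std::net::IpAddr;

/// Returns true if the address falls in one of the blocked ranges:
/// loopback, RFC 1918 private ranges, or link-local.
pub fn is_blocked_ip(ip: IpAddr) -> bool {
    match ip {
        IpAddr::V4(v4) => v4.is_loopback() || v4.is_private() || v4.is_link_local(),
        IpAddr::V6(v6) => {
            // fe80::/10 is the IPv6 link-local range; checked by hand
            // so the sketch does not depend on newer std helpers.
            v6.is_loopback() || (v6.segments()[0] & 0xffc0) == 0xfe80
        }
    }
}

/// Hypothetical helper: validates a host that is already an IP literal.
pub fn validate_ip_host(host: &str) -> Result<(), String> {
    let ip: IpAddr = host
        .parse()
        .map_err(|_| format!("not an IP literal: {host}"))?;
    if is_blocked_ip(ip) {
        return Err(format!("blocked address: {ip}"));
    }
    Ok(())
}
```

A complete check would also need to resolve hostnames and test every resolved address, and to consider IPv4-mapped IPv6 addresses. Both are outside this sketch.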
const MAX_RESPONSE_BODY_SIZE: usize = 1_048_576; // 1 MB
Filter Input Limits
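The 1 MB response limit (`MAX_RESPONSE_BODY_SIZE`) could be enforced while the body is being read, rather than after it has been buffered. This is a minimal sketch using `std::io::Read::take`. `read_capped` and `read_response_body` are hypothetical names, and the source does not say how the registry applies the limit.

```rust
use std::io::{self, Read};

const MAX_RESPONSE_BODY_SIZE: usize = 1_048_576; // 1 MB, as in the constant above

/// Reads at most `limit` bytes; returns an error if the source holds more.
pub fn read_capped<R: Read>(reader: R, limit: usize) -> io::Result<Vec<u8>> {
    let mut body = Vec::new();
    // Read one byte past the limit so an oversized body can be detected
    // without buffering the whole thing.
    reader.take(limit as u64 + 1).read_to_end(&mut body)?;
    if body.len() > limit {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "response body too large",
        ));
    }
    Ok(body)
}

/// Applies the documented response limit.
pub fn read_response_body<R: Read>(reader: R) -> io::Result<Vec<u8>> {
    read_capped(reader, MAX_RESPONSE_BODY_SIZE)
}
```

Memory use stays bounded at `limit + 1` bytes no matter how much data a misbehaving endpoint sends.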
const MAX_FILTER_SKILLS: usize = 32;
const MAX_FILTER_INPUT_MODES: usize = 16;
const MAX_FILTER_OUTPUT_MODES: usize = 16;
Data Layout
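The filter input limits (`MAX_FILTER_SKILLS`, `MAX_FILTER_INPUT_MODES`, `MAX_FILTER_OUTPUT_MODES`) would typically be checked before a query runs. The sketch below illustrates such a check. The `AgentFilter` struct and its field names are assumptions, since the source gives only the constants.

```rust
const MAX_FILTER_SKILLS: usize = 32;
const MAX_FILTER_INPUT_MODES: usize = 16;
const MAX_FILTER_OUTPUT_MODES: usize = 16;

/// Hypothetical filter shape; the field names are assumptions.
#[derive(Default)]
pub struct AgentFilter {
    pub skills: Vec<String>,
    pub input_modes: Vec<String>,
    pub output_modes: Vec<String>,
}

/// Rejects a filter whose lists exceed the documented limits.
pub fn validate_filter(filter: &AgentFilter) -> Result<(), String> {
    let checks = [
        ("skills", filter.skills.len(), MAX_FILTER_SKILLS),
        ("input_modes", filter.input_modes.len(), MAX_FILTER_INPUT_MODES),
        ("output_modes", filter.output_modes.len(), MAX_FILTER_OUTPUT_MODES),
    ];
    for (name, len, max) in checks {
        if len > max {
            return Err(format!("{name}: {len} entries exceeds limit of {max}"));
        }
    }
    Ok(())
}
```

Rejecting oversized filters up front keeps the cost of a single query bounded regardless of what a client sends.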
RocksDB Keys
agent:<hash> -> RegisteredAgent JSON
skill:<id> -> Vec<String> (agent_id hex)
account:<pub> -> String (agent_id hex)
Index Rebuild Strategy
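The `agent:`, `skill:`, and `account:` key prefixes from the RocksDB key layout can be kept in one place with small helper functions. This is an illustrative sketch. The function names are hypothetical, and IDs are passed as already-encoded strings.

```rust
/// Key for an agent record: `agent:<hash>`.
pub fn agent_key(agent_id_hex: &str) -> String {
    format!("agent:{agent_id_hex}")
}

/// Key for a skill index entry: `skill:<id>`.
pub fn skill_key(skill_id: &str) -> String {
    format!("skill:{skill_id}")
}

/// Key for an account mapping: `account:<pub>`.
pub fn account_key(account_pub_hex: &str) -> String {
    format!("account:{account_pub_hex}")
}

/// Splits a key into its prefix and identifier at the first ':'.
pub fn split_key(key: &str) -> Option<(&str, &str)> {
    key.split_once(':')
}
```

Centralizing the prefixes avoids typos such as `skills:` in one code path and `skill:` in another, which would otherwise silently split an index across two key spaces.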
At commit time, indexes are rebuilt from cache (no DB reads):
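fn rebuild_indexes(&mut self) -> Result<(), RegistryError> {
    // The snapshot is optional, so fail cleanly if none is active
    let snapshot = self.snapshot.as_mut().ok_or(RegistryError::SnapshotNotActive)?;
    for skill in &snapshot.dirty_skills {
        let agent_ids: Vec<String> = snapshot.cache
            .index_by_skill
            .get(skill)
            .map(|set| set.iter().map(|h| h.to_hex()).collect())
            .unwrap_or_default();
        // Assumption: serialization errors are mapped to DatabaseError so `?` type-checks
        let value = serde_json::to_vec(&agent_ids)
            .map_err(|e| RegistryError::DatabaseError(e.to_string()))?;
        snapshot.batch.put(format!("skill:{}", skill), value);
    }
    Ok(())
}
Testing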
Read-Your-Writes Test
#[test]
fn snapshot_read_your_writes() {
    let mut store = RegistryStore::new_memory();
    store.start_snapshot().unwrap();
    // Insert two agents with same skill
    store.insert_agent(agent_a_with_skill("AI")).unwrap();
    store.insert_agent(agent_b_with_skill("AI")).unwrap();
    // Query should see both in snapshot
    let agents = store.filter_by_skill("AI");
    assert_eq!(agents.len(), 2);
    store.end_snapshot(true).unwrap();
}
Rollback Test
#[test]
fn snapshot_rollback_discards_changes() {
    let mut store = RegistryStore::new_memory();
    store.start_snapshot().unwrap();
    // Reuses the test helper from the read-your-writes test
    store.insert_agent(agent_a_with_skill("AI")).unwrap();
    // Rollback (don't commit)
    store.end_snapshot(false).unwrap();
    // Agent should not exist
    assert!(store.cache().agents.is_empty());
}
Performance Considerations
| Operation | Without Snapshot | With Snapshot |
|---|---|---|
| Single insert | ~100us | ~120us |
| Batch 100 inserts | ~10ms | ~5ms |
| Index query (mid-batch) | Inconsistent | Consistent |
| Rollback | N/A | ~50us |
In the figures above, snapshots add about 20us to a single insert but roughly halve the time for a batch of 100 inserts, because disk writes are deferred into one batch commit.
Related Systems
The snapshot pattern is used across TOS:
- A2A Registry - Agent registration and health checks
- Storage Cache - Block and transaction caching
- Chain Sync - Atomic block acceptance
See Also
- A2A Protocol - Agent registration
- P2P Protocol - Chain synchronization