//! Store acknowledgment tracking. //! //! Tracks which STORE operations have been acknowledged //! by remote peers. Failed stores are retried with //! alternative peers to maintain data redundancy. use std::collections::HashMap; use std::time::{Duration, Instant}; use crate::id::NodeId; use crate::peers::PeerInfo; /// Maximum retry attempts before giving up. const MAX_RETRIES: u32 = 3; /// Time to wait for a store acknowledgment before /// considering it failed. const STORE_TIMEOUT: Duration = Duration::from_secs(5); /// Interval between retry sweeps. pub const RETRY_INTERVAL: Duration = Duration::from_secs(30); /// Tracks a pending STORE operation. #[derive(Debug, Clone)] struct PendingStore { /// Target NodeId (SHA-256 of key). target: NodeId, /// Raw key bytes. key: Vec, /// Value bytes. value: Vec, /// TTL at time of store. ttl: u16, /// Whether the value is unique. is_unique: bool, /// Peer we sent the STORE to. peer: PeerInfo, /// When the STORE was sent. sent_at: Instant, /// Number of retry attempts. retries: u32, } /// Tracks pending and failed STORE operations. pub struct StoreTracker { /// Pending stores keyed by (nonce, peer_addr). pending: HashMap<(NodeId, Vec), Vec>, /// Total successful stores. pub acks: u64, /// Total failed stores (exhausted retries). pub failures: u64, } impl StoreTracker { pub fn new() -> Self { Self { pending: HashMap::new(), acks: 0, failures: 0, } } /// Record that a STORE was sent to a peer. pub fn track( &mut self, target: NodeId, key: Vec, value: Vec, ttl: u16, is_unique: bool, peer: PeerInfo, ) { let entry = PendingStore { target, key: key.clone(), value, ttl, is_unique, peer, sent_at: Instant::now(), retries: 0, }; self.pending.entry((target, key)).or_default().push(entry); } /// Record a successful store acknowledgment from /// a peer (they stored our value). pub fn ack(&mut self, target: &NodeId, key: &[u8], peer_id: &NodeId) { let k = (*target, key.to_vec()); if let Some(stores) = self.pending.get_mut(&k) { let before = stores.len(); stores.retain(|s| s.peer.id != *peer_id); let removed = before - stores.len(); self.acks += removed as u64; if stores.is_empty() { self.pending.remove(&k); } } } /// Collect stores that timed out and need retry. /// Returns (target, key, value, ttl, is_unique, failed_peer) /// for each timed-out store. pub fn collect_timeouts(&mut self) -> Vec { let mut retries = Vec::new(); let mut exhausted_keys = Vec::new(); for (k, stores) in &mut self.pending { stores.retain_mut(|s| { if s.sent_at.elapsed() < STORE_TIMEOUT { return true; // still waiting } if s.retries >= MAX_RETRIES { // Exhausted retries return false; } s.retries += 1; retries.push(RetryInfo { target: s.target, key: s.key.clone(), value: s.value.clone(), ttl: s.ttl, is_unique: s.is_unique, failed_peer: s.peer.id, }); false // remove from pending (will be re-tracked if retried) }); if stores.is_empty() { exhausted_keys.push(k.clone()); } } self.failures += exhausted_keys.len() as u64; for k in &exhausted_keys { self.pending.remove(k); } retries } /// Remove all expired tracking entries (older than /// 2x timeout, cleanup safety net). pub fn cleanup(&mut self) { let cutoff = STORE_TIMEOUT * 2; self.pending.retain(|_, stores| { stores.retain(|s| s.sent_at.elapsed() < cutoff); !stores.is_empty() }); } /// Number of pending store operations. pub fn pending_count(&self) -> usize { self.pending.values().map(|v| v.len()).sum() } } impl Default for StoreTracker { fn default() -> Self { Self::new() } } /// Information needed to retry a failed store. pub struct RetryInfo { pub target: NodeId, pub key: Vec, pub value: Vec, pub ttl: u16, pub is_unique: bool, pub failed_peer: NodeId, } #[cfg(test)] mod tests { use super::*; use std::net::SocketAddr; fn peer(byte: u8, port: u16) -> PeerInfo { PeerInfo::new( NodeId::from_bytes([byte; 32]), SocketAddr::from(([127, 0, 0, 1], port)), ) } #[test] fn track_and_ack() { let mut t = StoreTracker::new(); let target = NodeId::from_key(b"k"); let p = peer(0x01, 3000); t.track(target, b"k".to_vec(), b"v".to_vec(), 300, false, p); assert_eq!(t.pending_count(), 1); t.ack(&target, b"k", &NodeId::from_bytes([0x01; 32])); assert_eq!(t.pending_count(), 0); assert_eq!(t.acks, 1); } #[test] fn timeout_triggers_retry() { let mut t = StoreTracker::new(); let target = NodeId::from_key(b"k"); let p = peer(0x01, 3000); t.track(target, b"k".to_vec(), b"v".to_vec(), 300, false, p); // No timeouts yet assert!(t.collect_timeouts().is_empty()); // Force timeout by waiting std::thread::sleep(Duration::from_millis(10)); // Hack: modify sent_at to force timeout for stores in t.pending.values_mut() { for s in stores.iter_mut() { s.sent_at = Instant::now() - STORE_TIMEOUT - Duration::from_secs(1); } } let retries = t.collect_timeouts(); assert_eq!(retries.len(), 1); assert_eq!(retries[0].key, b"k"); assert_eq!(retries[0].failed_peer, NodeId::from_bytes([0x01; 32])); } #[test] fn multiple_peers_tracked() { let mut t = StoreTracker::new(); let target = NodeId::from_key(b"k"); t.track( target, b"k".to_vec(), b"v".to_vec(), 300, false, peer(0x01, 3000), ); t.track( target, b"k".to_vec(), b"v".to_vec(), 300, false, peer(0x02, 3001), ); assert_eq!(t.pending_count(), 2); // Ack from one peer t.ack(&target, b"k", &NodeId::from_bytes([0x01; 32])); assert_eq!(t.pending_count(), 1); } #[test] fn cleanup_removes_old() { let mut t = StoreTracker::new(); let target = NodeId::from_key(b"k"); t.track( target, b"k".to_vec(), b"v".to_vec(), 300, false, peer(0x01, 3000), ); // Force old timestamp for stores in t.pending.values_mut() { for s in stores.iter_mut() { s.sent_at = Instant::now() - STORE_TIMEOUT * 3; } } t.cleanup(); assert_eq!(t.pending_count(), 0); } }