diff options
| author | murilo ijanc | 2026-03-24 15:04:03 -0300 |
|---|---|---|
| committer | murilo ijanc | 2026-03-24 15:04:03 -0300 |
| commit | 9821aabf0b50d2487b07502d3d2cd89e7d62bdbe (patch) | |
| tree | 53da095ff90cc755bac3d4bf699172b5e8cd07d6 /src/store_track.rs | |
| download | tesseras-dht-9821aabf0b50d2487b07502d3d2cd89e7d62bdbe.tar.gz | |
Initial commitv0.1.0
NAT-aware Kademlia DHT library for peer-to-peer networks.
Features:
- Distributed key-value storage (iterative FIND_NODE, FIND_VALUE, STORE)
- NAT traversal via DTUN hole-punching and proxy relay
- Reliable Datagram Protocol (RDP) with 7-state connection machine
- Datagram transport with automatic fragmentation/reassembly
- Ed25519 packet authentication
- 256-bit node IDs (Ed25519 public keys)
- Rate limiting, ban list, and eclipse attack mitigation
- Persistence and metrics
- OpenBSD and Linux support
Diffstat (limited to 'src/store_track.rs')
| -rw-r--r-- | src/store_track.rs | 275 |
1 files changed, 275 insertions, 0 deletions
diff --git a/src/store_track.rs b/src/store_track.rs new file mode 100644 index 0000000..a4ac78d --- /dev/null +++ b/src/store_track.rs @@ -0,0 +1,275 @@ +//! Store acknowledgment tracking. +//! +//! Tracks which STORE operations have been acknowledged +//! by remote peers. Failed stores are retried with +//! alternative peers to maintain data redundancy. + +use std::collections::HashMap; +use std::time::{Duration, Instant}; + +use crate::id::NodeId; +use crate::peers::PeerInfo; + +/// Maximum retry attempts before giving up. +const MAX_RETRIES: u32 = 3; + +/// Time to wait for a store acknowledgment before +/// considering it failed. +const STORE_TIMEOUT: Duration = Duration::from_secs(5); + +/// Interval between retry sweeps. +pub const RETRY_INTERVAL: Duration = Duration::from_secs(30); + +/// Tracks a pending STORE operation. +#[derive(Debug, Clone)] +struct PendingStore { + /// Target NodeId (SHA-256 of key). + target: NodeId, + /// Raw key bytes. + key: Vec<u8>, + /// Value bytes. + value: Vec<u8>, + /// TTL at time of store. + ttl: u16, + /// Whether the value is unique. + is_unique: bool, + /// Peer we sent the STORE to. + peer: PeerInfo, + /// When the STORE was sent. + sent_at: Instant, + /// Number of retry attempts. + retries: u32, +} + +/// Tracks pending and failed STORE operations. +pub struct StoreTracker { + /// Pending stores keyed by (nonce, peer_addr). + pending: HashMap<(NodeId, Vec<u8>), Vec<PendingStore>>, + /// Total successful stores. + pub acks: u64, + /// Total failed stores (exhausted retries). + pub failures: u64, +} + +impl StoreTracker { + pub fn new() -> Self { + Self { + pending: HashMap::new(), + acks: 0, + failures: 0, + } + } + + /// Record that a STORE was sent to a peer. + pub fn track( + &mut self, + target: NodeId, + key: Vec<u8>, + value: Vec<u8>, + ttl: u16, + is_unique: bool, + peer: PeerInfo, + ) { + let entry = PendingStore { + target, + key: key.clone(), + value, + ttl, + is_unique, + peer, + sent_at: Instant::now(), + retries: 0, + }; + self.pending.entry((target, key)).or_default().push(entry); + } + + /// Record a successful store acknowledgment from + /// a peer (they stored our value). + pub fn ack(&mut self, target: &NodeId, key: &[u8], peer_id: &NodeId) { + let k = (*target, key.to_vec()); + if let Some(stores) = self.pending.get_mut(&k) { + let before = stores.len(); + stores.retain(|s| s.peer.id != *peer_id); + let removed = before - stores.len(); + self.acks += removed as u64; + if stores.is_empty() { + self.pending.remove(&k); + } + } + } + + /// Collect stores that timed out and need retry. + /// Returns (target, key, value, ttl, is_unique, failed_peer) + /// for each timed-out store. + pub fn collect_timeouts(&mut self) -> Vec<RetryInfo> { + let mut retries = Vec::new(); + let mut exhausted_keys = Vec::new(); + + for (k, stores) in &mut self.pending { + stores.retain_mut(|s| { + if s.sent_at.elapsed() < STORE_TIMEOUT { + return true; // still waiting + } + if s.retries >= MAX_RETRIES { + // Exhausted retries + return false; + } + s.retries += 1; + retries.push(RetryInfo { + target: s.target, + key: s.key.clone(), + value: s.value.clone(), + ttl: s.ttl, + is_unique: s.is_unique, + failed_peer: s.peer.id, + }); + false // remove from pending (will be re-tracked if retried) + }); + if stores.is_empty() { + exhausted_keys.push(k.clone()); + } + } + + self.failures += exhausted_keys.len() as u64; + for k in &exhausted_keys { + self.pending.remove(k); + } + + retries + } + + /// Remove all expired tracking entries (older than + /// 2x timeout, cleanup safety net). + pub fn cleanup(&mut self) { + let cutoff = STORE_TIMEOUT * 2; + self.pending.retain(|_, stores| { + stores.retain(|s| s.sent_at.elapsed() < cutoff); + !stores.is_empty() + }); + } + + /// Number of pending store operations. + pub fn pending_count(&self) -> usize { + self.pending.values().map(|v| v.len()).sum() + } +} + +impl Default for StoreTracker { + fn default() -> Self { + Self::new() + } +} + +/// Information needed to retry a failed store. +pub struct RetryInfo { + pub target: NodeId, + pub key: Vec<u8>, + pub value: Vec<u8>, + pub ttl: u16, + pub is_unique: bool, + pub failed_peer: NodeId, +} + +#[cfg(test)] +mod tests { + use super::*; + use std::net::SocketAddr; + + fn peer(byte: u8, port: u16) -> PeerInfo { + PeerInfo::new( + NodeId::from_bytes([byte; 32]), + SocketAddr::from(([127, 0, 0, 1], port)), + ) + } + + #[test] + fn track_and_ack() { + let mut t = StoreTracker::new(); + let target = NodeId::from_key(b"k"); + let p = peer(0x01, 3000); + t.track(target, b"k".to_vec(), b"v".to_vec(), 300, false, p); + assert_eq!(t.pending_count(), 1); + + t.ack(&target, b"k", &NodeId::from_bytes([0x01; 32])); + assert_eq!(t.pending_count(), 0); + assert_eq!(t.acks, 1); + } + + #[test] + fn timeout_triggers_retry() { + let mut t = StoreTracker::new(); + let target = NodeId::from_key(b"k"); + let p = peer(0x01, 3000); + t.track(target, b"k".to_vec(), b"v".to_vec(), 300, false, p); + + // No timeouts yet + assert!(t.collect_timeouts().is_empty()); + + // Force timeout by waiting + std::thread::sleep(Duration::from_millis(10)); + + // Hack: modify sent_at to force timeout + for stores in t.pending.values_mut() { + for s in stores.iter_mut() { + s.sent_at = + Instant::now() - STORE_TIMEOUT - Duration::from_secs(1); + } + } + + let retries = t.collect_timeouts(); + assert_eq!(retries.len(), 1); + assert_eq!(retries[0].key, b"k"); + assert_eq!(retries[0].failed_peer, NodeId::from_bytes([0x01; 32])); + } + + #[test] + fn multiple_peers_tracked() { + let mut t = StoreTracker::new(); + let target = NodeId::from_key(b"k"); + t.track( + target, + b"k".to_vec(), + b"v".to_vec(), + 300, + false, + peer(0x01, 3000), + ); + t.track( + target, + b"k".to_vec(), + b"v".to_vec(), + 300, + false, + peer(0x02, 3001), + ); + assert_eq!(t.pending_count(), 2); + + // Ack from one peer + t.ack(&target, b"k", &NodeId::from_bytes([0x01; 32])); + assert_eq!(t.pending_count(), 1); + } + + #[test] + fn cleanup_removes_old() { + let mut t = StoreTracker::new(); + let target = NodeId::from_key(b"k"); + t.track( + target, + b"k".to_vec(), + b"v".to_vec(), + 300, + false, + peer(0x01, 3000), + ); + + // Force old timestamp + for stores in t.pending.values_mut() { + for s in stores.iter_mut() { + s.sent_at = Instant::now() - STORE_TIMEOUT * 3; + } + } + + t.cleanup(); + assert_eq!(t.pending_count(), 0); + } +} |