aboutsummaryrefslogtreecommitdiffstats
path: root/src/store_track.rs
diff options
context:
space:
mode:
authormurilo ijanc2026-03-24 15:04:03 -0300
committermurilo ijanc2026-03-24 15:04:03 -0300
commit9821aabf0b50d2487b07502d3d2cd89e7d62bdbe (patch)
tree53da095ff90cc755bac3d4bf699172b5e8cd07d6 /src/store_track.rs
downloadtesseras-dht-0.1.0.tar.gz
Initial commitv0.1.0
NAT-aware Kademlia DHT library for peer-to-peer networks. Features: - Distributed key-value storage (iterative FIND_NODE, FIND_VALUE, STORE) - NAT traversal via DTUN hole-punching and proxy relay - Reliable Datagram Protocol (RDP) with 7-state connection machine - Datagram transport with automatic fragmentation/reassembly - Ed25519 packet authentication - 256-bit node IDs (Ed25519 public keys) - Rate limiting, ban list, and eclipse attack mitigation - Persistence and metrics - OpenBSD and Linux support
Diffstat (limited to 'src/store_track.rs')
-rw-r--r--src/store_track.rs275
1 file changed, 275 insertions, 0 deletions
diff --git a/src/store_track.rs b/src/store_track.rs
new file mode 100644
index 0000000..a4ac78d
--- /dev/null
+++ b/src/store_track.rs
@@ -0,0 +1,275 @@
+//! Store acknowledgment tracking.
+//!
+//! Tracks which STORE operations have been acknowledged
+//! by remote peers. Failed stores are retried with
+//! alternative peers to maintain data redundancy.
+
+use std::collections::HashMap;
+use std::time::{Duration, Instant};
+
+use crate::id::NodeId;
+use crate::peers::PeerInfo;
+
/// Maximum retry attempts per pending entry before the
/// store is dropped and counted as a failure.
const MAX_RETRIES: u32 = 3;

/// Time to wait for a store acknowledgment before
/// considering it failed.
const STORE_TIMEOUT: Duration = Duration::from_secs(5);

/// Interval between retry sweeps (callers are expected to
/// invoke `collect_timeouts` roughly this often).
pub const RETRY_INTERVAL: Duration = Duration::from_secs(30);
+
/// Tracks a pending STORE operation.
///
/// One instance exists per (key, peer) pair a STORE was
/// sent to; entries sharing the same key live in the same
/// bucket of `StoreTracker::pending`.
#[derive(Debug, Clone)]
struct PendingStore {
    /// Target NodeId (SHA-256 of key).
    target: NodeId,
    /// Raw key bytes.
    key: Vec<u8>,
    /// Value bytes.
    value: Vec<u8>,
    /// TTL at time of store.
    ttl: u16,
    /// Whether the value is unique.
    is_unique: bool,
    /// Peer we sent the STORE to.
    peer: PeerInfo,
    /// When the STORE was sent; compared against
    /// STORE_TIMEOUT during retry sweeps.
    sent_at: Instant,
    /// Number of retry attempts so far (bounded by
    /// MAX_RETRIES).
    retries: u32,
}
+
/// Tracks pending and failed STORE operations.
pub struct StoreTracker {
    /// Pending stores keyed by `(target NodeId, raw key bytes)`;
    /// each bucket holds one entry per peer the STORE went to.
    pending: HashMap<(NodeId, Vec<u8>), Vec<PendingStore>>,
    /// Total successful stores.
    pub acks: u64,
    /// Total failed stores (exhausted retries).
    pub failures: u64,
}
+
+impl StoreTracker {
+ pub fn new() -> Self {
+ Self {
+ pending: HashMap::new(),
+ acks: 0,
+ failures: 0,
+ }
+ }
+
+ /// Record that a STORE was sent to a peer.
+ pub fn track(
+ &mut self,
+ target: NodeId,
+ key: Vec<u8>,
+ value: Vec<u8>,
+ ttl: u16,
+ is_unique: bool,
+ peer: PeerInfo,
+ ) {
+ let entry = PendingStore {
+ target,
+ key: key.clone(),
+ value,
+ ttl,
+ is_unique,
+ peer,
+ sent_at: Instant::now(),
+ retries: 0,
+ };
+ self.pending.entry((target, key)).or_default().push(entry);
+ }
+
+ /// Record a successful store acknowledgment from
+ /// a peer (they stored our value).
+ pub fn ack(&mut self, target: &NodeId, key: &[u8], peer_id: &NodeId) {
+ let k = (*target, key.to_vec());
+ if let Some(stores) = self.pending.get_mut(&k) {
+ let before = stores.len();
+ stores.retain(|s| s.peer.id != *peer_id);
+ let removed = before - stores.len();
+ self.acks += removed as u64;
+ if stores.is_empty() {
+ self.pending.remove(&k);
+ }
+ }
+ }
+
+ /// Collect stores that timed out and need retry.
+ /// Returns (target, key, value, ttl, is_unique, failed_peer)
+ /// for each timed-out store.
+ pub fn collect_timeouts(&mut self) -> Vec<RetryInfo> {
+ let mut retries = Vec::new();
+ let mut exhausted_keys = Vec::new();
+
+ for (k, stores) in &mut self.pending {
+ stores.retain_mut(|s| {
+ if s.sent_at.elapsed() < STORE_TIMEOUT {
+ return true; // still waiting
+ }
+ if s.retries >= MAX_RETRIES {
+ // Exhausted retries
+ return false;
+ }
+ s.retries += 1;
+ retries.push(RetryInfo {
+ target: s.target,
+ key: s.key.clone(),
+ value: s.value.clone(),
+ ttl: s.ttl,
+ is_unique: s.is_unique,
+ failed_peer: s.peer.id,
+ });
+ false // remove from pending (will be re-tracked if retried)
+ });
+ if stores.is_empty() {
+ exhausted_keys.push(k.clone());
+ }
+ }
+
+ self.failures += exhausted_keys.len() as u64;
+ for k in &exhausted_keys {
+ self.pending.remove(k);
+ }
+
+ retries
+ }
+
+ /// Remove all expired tracking entries (older than
+ /// 2x timeout, cleanup safety net).
+ pub fn cleanup(&mut self) {
+ let cutoff = STORE_TIMEOUT * 2;
+ self.pending.retain(|_, stores| {
+ stores.retain(|s| s.sent_at.elapsed() < cutoff);
+ !stores.is_empty()
+ });
+ }
+
+ /// Number of pending store operations.
+ pub fn pending_count(&self) -> usize {
+ self.pending.values().map(|v| v.len()).sum()
+ }
+}
+
+impl Default for StoreTracker {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+/// Information needed to retry a failed store.
+pub struct RetryInfo {
+ pub target: NodeId,
+ pub key: Vec<u8>,
+ pub value: Vec<u8>,
+ pub ttl: u16,
+ pub is_unique: bool,
+ pub failed_peer: NodeId,
+}
+
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::SocketAddr;

    /// Build a loopback peer whose NodeId is `byte` repeated.
    fn peer(byte: u8, port: u16) -> PeerInfo {
        PeerInfo::new(
            NodeId::from_bytes([byte; 32]),
            SocketAddr::from(([127, 0, 0, 1], port)),
        )
    }

    #[test]
    fn track_and_ack() {
        let mut t = StoreTracker::new();
        let target = NodeId::from_key(b"k");
        let p = peer(0x01, 3000);
        t.track(target, b"k".to_vec(), b"v".to_vec(), 300, false, p);
        assert_eq!(t.pending_count(), 1);

        t.ack(&target, b"k", &NodeId::from_bytes([0x01; 32]));
        assert_eq!(t.pending_count(), 0);
        assert_eq!(t.acks, 1);
    }

    #[test]
    fn timeout_triggers_retry() {
        let mut t = StoreTracker::new();
        let target = NodeId::from_key(b"k");
        let p = peer(0x01, 3000);
        t.track(target, b"k".to_vec(), b"v".to_vec(), 300, false, p);

        // Freshly tracked: nothing has timed out yet.
        assert!(t.collect_timeouts().is_empty());

        // Backdate sent_at past STORE_TIMEOUT to force a
        // timeout deterministically (no sleeping needed).
        for stores in t.pending.values_mut() {
            for s in stores.iter_mut() {
                s.sent_at =
                    Instant::now() - STORE_TIMEOUT - Duration::from_secs(1);
            }
        }

        let retries = t.collect_timeouts();
        assert_eq!(retries.len(), 1);
        assert_eq!(retries[0].key, b"k");
        assert_eq!(retries[0].failed_peer, NodeId::from_bytes([0x01; 32]));
    }

    #[test]
    fn multiple_peers_tracked() {
        let mut t = StoreTracker::new();
        let target = NodeId::from_key(b"k");
        t.track(
            target,
            b"k".to_vec(),
            b"v".to_vec(),
            300,
            false,
            peer(0x01, 3000),
        );
        t.track(
            target,
            b"k".to_vec(),
            b"v".to_vec(),
            300,
            false,
            peer(0x02, 3001),
        );
        assert_eq!(t.pending_count(), 2);

        // Ack from one peer leaves the other entry pending.
        t.ack(&target, b"k", &NodeId::from_bytes([0x01; 32]));
        assert_eq!(t.pending_count(), 1);
    }

    #[test]
    fn cleanup_removes_old() {
        let mut t = StoreTracker::new();
        let target = NodeId::from_key(b"k");
        t.track(
            target,
            b"k".to_vec(),
            b"v".to_vec(),
            300,
            false,
            peer(0x01, 3000),
        );

        // Backdate past the 2x-timeout cleanup cutoff.
        for stores in t.pending.values_mut() {
            for s in stores.iter_mut() {
                s.sent_at = Instant::now() - STORE_TIMEOUT * 3;
            }
        }

        t.cleanup();
        assert_eq!(t.pending_count(), 0);
    }
}