diff options
| author | murilo ijanc | 2026-03-24 15:04:03 -0300 |
|---|---|---|
| committer | murilo ijanc | 2026-03-24 15:04:03 -0300 |
| commit | 9821aabf0b50d2487b07502d3d2cd89e7d62bdbe (patch) | |
| tree | 53da095ff90cc755bac3d4bf699172b5e8cd07d6 /src/dht.rs | |
| download | tesseras-dht-e908bc01403f4b8ef2a65fa6be43716fd1c6e003.tar.gz | |
Initial commitv0.1.0
NAT-aware Kademlia DHT library for peer-to-peer networks.
Features:
- Distributed key-value storage (iterative FIND_NODE, FIND_VALUE, STORE)
- NAT traversal via DTUN hole-punching and proxy relay
- Reliable Datagram Protocol (RDP) with 7-state connection machine
- Datagram transport with automatic fragmentation/reassembly
- Ed25519 packet authentication
- 256-bit node IDs (Ed25519 public keys)
- Rate limiting, ban list, and eclipse attack mitigation
- Persistence and metrics
- OpenBSD and Linux support
Diffstat (limited to 'src/dht.rs')
| -rw-r--r-- | src/dht.rs | 1028 |
1 files changed, 1028 insertions, 0 deletions
diff --git a/src/dht.rs b/src/dht.rs new file mode 100644 index 0000000..72ec019 --- /dev/null +++ b/src/dht.rs @@ -0,0 +1,1028 @@ +//! Kademlia DHT logic: store, find_node, find_value. +//! +//! Uses explicit state machines for iterative queries +//! instead of nested callbacks. + +use std::collections::{HashMap, HashSet}; +use std::time::{Duration, Instant}; + +use crate::id::NodeId; +use crate::peers::PeerInfo; +use crate::routing::NUM_FIND_NODE; + +// ── Constants ──────────────────────────────────────── + +/// Max parallel queries per lookup. +pub const MAX_QUERY: usize = 6; + +/// Timeout for a single RPC query (seconds). +pub const QUERY_TIMEOUT: Duration = Duration::from_secs(3); + +/// Interval between data restore cycles. +pub const RESTORE_INTERVAL: Duration = Duration::from_secs(120); + +/// Slow maintenance timer (refresh + restore). +pub const SLOW_TIMER_INTERVAL: Duration = Duration::from_secs(600); + +/// Fast maintenance timer (expire + sweep). +pub const FAST_TIMER_INTERVAL: Duration = Duration::from_secs(60); + +/// Number of original replicas for a put. +pub const ORIGINAL_PUT_NUM: i32 = 3; + +/// Timeout waiting for all values in find_value. +pub const RECVD_VALUE_TIMEOUT: Duration = Duration::from_secs(3); + +/// RDP port for store operations. +pub const RDP_STORE_PORT: u16 = 100; + +/// RDP port for get operations. +pub const RDP_GET_PORT: u16 = 101; + +/// RDP connection timeout. +pub const RDP_TIMEOUT: Duration = Duration::from_secs(30); + +// ── Stored data ───────────────────────────────────── + +/// A single stored value with metadata. +#[derive(Debug, Clone)] +pub struct StoredValue { + pub key: Vec<u8>, + pub value: Vec<u8>, + pub id: NodeId, + pub source: NodeId, + pub ttl: u16, + pub stored_at: Instant, + pub is_unique: bool, + + /// Number of original puts remaining. Starts at + /// `ORIGINAL_PUT_NUM` for originator, 0 for replicas. + pub original: i32, + + /// Set of node IDs that already received this value + /// during restore, to avoid duplicate sends. + pub recvd: HashSet<NodeId>, + + /// Monotonic version timestamp. Newer versions + /// (higher value) replace older ones from the same + /// source. Prevents stale replicas from overwriting + /// fresh data during restore/republish. + pub version: u64, +} + +/// Generate a monotonic version number based on the +/// current time (milliseconds since epoch, truncated +/// to u64). Sufficient for conflict resolution — two +/// stores in the same millisecond will have the same +/// version (tie-break: last write wins). +pub fn now_version() -> u64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_millis() as u64) + .unwrap_or(0) +} + +impl StoredValue { + /// Check if this value has expired. + pub fn is_expired(&self) -> bool { + self.stored_at.elapsed() >= Duration::from_secs(self.ttl as u64) + } + + /// Remaining TTL in seconds. + pub fn remaining_ttl(&self) -> u16 { + let elapsed = self.stored_at.elapsed().as_secs(); + if elapsed >= self.ttl as u64 { + 0 + } else { + (self.ttl as u64 - elapsed) as u16 + } + } +} + +// ── Storage container ─────────────────────────────── + +/// Key for the two-level storage map: +/// first level is the target NodeId (SHA1 of key), +/// second level is the raw key bytes. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +struct StorageKey { + raw: Vec<u8>, +} + +/// Container for stored DHT values. +/// +/// Maps target_id -> raw_key -> set of values. +/// Maps target_id -> raw_key -> set of values. +pub struct DhtStorage { + data: HashMap<NodeId, HashMap<StorageKey, Vec<StoredValue>>>, + /// Maximum number of stored values (0 = unlimited). + max_entries: usize, +} + +/// Default maximum storage entries. +const DEFAULT_MAX_STORAGE: usize = 65536; + +impl DhtStorage { + pub fn new() -> Self { + Self { + data: HashMap::new(), + max_entries: DEFAULT_MAX_STORAGE, + } + } + + /// Set the maximum number of stored values. + pub fn set_max_entries(&mut self, max: usize) { + self.max_entries = max; + } + + /// Store a value. Handles `is_unique` semantics and + /// version-based conflict resolution. + /// + /// If a value with the same key from the same source + /// already exists with a higher version, the store is + /// rejected (prevents stale replicas from overwriting + /// fresh data). + pub fn store(&mut self, val: StoredValue) { + // Enforce storage limit + if self.max_entries > 0 && self.len() >= self.max_entries { + log::warn!( + "Storage full ({} entries), dropping store", + self.max_entries + ); + return; + } + + let key = StorageKey { + raw: val.key.clone(), + }; + let entry = + self.data.entry(val.id).or_default().entry(key).or_default(); + + if val.is_unique { + // Unique: replace existing from same source, + // but only if version is not older + if let Some(pos) = entry.iter().position(|v| v.source == val.source) + { + if val.version > 0 + && entry[pos].version > 0 + && val.version < entry[pos].version + { + log::debug!( + "Rejecting stale unique store: v{} < v{}", + val.version, + entry[pos].version, + ); + return; + } + entry[pos] = val; + } else if entry.is_empty() || !entry[0].is_unique { + entry.clear(); + entry.push(val); + } + return; + } + + // Non-unique: update if same value exists (any + // source), or append. Check version on update. + if let Some(pos) = entry.iter().position(|v| v.value == val.value) { + if val.version > 0 + && entry[pos].version > 0 + && val.version < entry[pos].version + { + log::debug!( + "Rejecting stale store: v{} < v{}", + val.version, + entry[pos].version, + ); + return; + } + entry[pos].ttl = val.ttl; + entry[pos].stored_at = val.stored_at; + entry[pos].version = val.version; + } else { + // Don't add if existing data is unique + if entry.len() == 1 && entry[0].is_unique { + return; + } + entry.push(val); + } + } + + /// Remove a specific value. + pub fn remove(&mut self, id: &NodeId, raw_key: &[u8]) { + let key = StorageKey { + raw: raw_key.to_vec(), + }; + if let Some(inner) = self.data.get_mut(id) { + inner.remove(&key); + if inner.is_empty() { + self.data.remove(id); + } + } + } + + /// Get all values for a target ID and key. + pub fn get(&self, id: &NodeId, raw_key: &[u8]) -> Vec<StoredValue> { + let key = StorageKey { + raw: raw_key.to_vec(), + }; + self.data + .get(id) + .and_then(|inner| inner.get(&key)) + .map(|vals| { + vals.iter().filter(|v| !v.is_expired()).cloned().collect() + }) + .unwrap_or_default() + } + + /// Remove all expired values. + pub fn expire(&mut self) { + self.data.retain(|_, inner| { + inner.retain(|_, vals| { + vals.retain(|v| !v.is_expired()); + !vals.is_empty() + }); + !inner.is_empty() + }); + } + + /// Iterate over all stored values (for restore). + pub fn all_values(&self) -> Vec<StoredValue> { + self.data + .values() + .flat_map(|inner| inner.values()) + .flat_map(|vals| vals.iter()) + .filter(|v| !v.is_expired()) + .cloned() + .collect() + } + + /// Decrement the `original` counter for a value. + /// Returns the new count, or -1 if not found + /// (in which case the value is inserted). + pub fn dec_original(&mut self, val: &StoredValue) -> i32 { + let key = StorageKey { + raw: val.key.clone(), + }; + if let Some(inner) = self.data.get_mut(&val.id) { + if let Some(vals) = inner.get_mut(&key) { + if let Some(existing) = vals + .iter_mut() + .find(|v| v.value == val.value && v.source == val.source) + { + if existing.original > 0 { + existing.original -= 1; + } + return existing.original; + } + } + } + + // Not found: insert it + self.store(val.clone()); + -1 + } + + /// Mark a node as having received a stored value + /// (for restore deduplication). + pub fn mark_received(&mut self, val: &StoredValue, node_id: NodeId) { + let key = StorageKey { + raw: val.key.clone(), + }; + if let Some(inner) = self.data.get_mut(&val.id) { + if let Some(vals) = inner.get_mut(&key) { + if let Some(existing) = + vals.iter_mut().find(|v| v.value == val.value) + { + existing.recvd.insert(node_id); + } + } + } + } + + /// Total number of stored values. + pub fn len(&self) -> usize { + self.data + .values() + .flat_map(|inner| inner.values()) + .map(|vals| vals.len()) + .sum() + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } +} + +impl Default for DhtStorage { + fn default() -> Self { + Self::new() + } +} + +// ── Iterative query state machine ─────────────────── + +/// Phase of an iterative Kademlia query. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum QueryPhase { + /// Actively searching: sending queries and processing + /// replies. + Searching, + + /// No closer nodes found in last round: query has + /// converged. + Converged, + + /// Query complete: results are ready. + Done, +} + +/// State of an iterative FIND_NODE or FIND_VALUE query. +/// +/// Uses an explicit state machine for iterative lookup. +/// Maximum duration for an iterative query before +/// returning best-effort results. +pub const MAX_QUERY_DURATION: Duration = Duration::from_secs(30); + +pub struct IterativeQuery { + pub target: NodeId, + pub closest: Vec<PeerInfo>, + pub queried: HashSet<NodeId>, + pub pending: HashMap<NodeId, Instant>, + pub phase: QueryPhase, + pub is_find_value: bool, + pub key: Vec<u8>, + pub values: Vec<Vec<u8>>, + pub nonce: u32, + pub started_at: Instant, + + /// Number of iterative rounds completed. Each round + /// is a batch of queries followed by reply processing. + /// Measures the "depth" of the lookup — useful for + /// diagnosing network topology and routing efficiency. + pub hops: u32, +} + +impl IterativeQuery { + /// Create a new FIND_NODE query. + pub fn find_node(target: NodeId, nonce: u32) -> Self { + Self { + target, + closest: Vec::new(), + queried: HashSet::new(), + pending: HashMap::new(), + phase: QueryPhase::Searching, + is_find_value: false, + key: Vec::new(), + values: Vec::new(), + nonce, + started_at: Instant::now(), + hops: 0, + } + } + + /// Create a new FIND_VALUE query. + pub fn find_value(target: NodeId, key: Vec<u8>, nonce: u32) -> Self { + Self { + target, + closest: Vec::new(), + queried: HashSet::new(), + pending: HashMap::new(), + phase: QueryPhase::Searching, + is_find_value: true, + key, + values: Vec::new(), + nonce, + started_at: Instant::now(), + hops: 0, + } + } + + /// Select the next batch of peers to query. + /// + /// Returns up to `MAX_QUERY` un-queried peers from + /// `closest`, sorted by XOR distance to target. + pub fn next_to_query(&self) -> Vec<PeerInfo> { + let max = MAX_QUERY.saturating_sub(self.pending.len()); + self.closest + .iter() + .filter(|p| { + !self.queried.contains(&p.id) + && !self.pending.contains_key(&p.id) + }) + .take(max) + .cloned() + .collect() + } + + /// Process a reply: merge new nodes into closest, + /// remove from pending, detect convergence. + /// Increments the hop counter for route length + /// tracking. + pub fn process_reply(&mut self, from: &NodeId, nodes: Vec<PeerInfo>) { + self.pending.remove(from); + self.queried.insert(*from); + self.hops += 1; + + let prev_best = + self.closest.first().map(|p| self.target.distance(&p.id)); + + // Merge new nodes + for node in nodes { + if node.id != self.target + && !self.closest.iter().any(|c| c.id == node.id) + { + self.closest.push(node); + } + } + + // Sort by XOR distance + self.closest.sort_by(|a, b| { + let da = self.target.distance(&a.id); + let db = self.target.distance(&b.id); + da.cmp(&db) + }); + + // Trim to NUM_FIND_NODE + self.closest.truncate(NUM_FIND_NODE); + + // Check convergence: did the closest node change? + let new_best = + self.closest.first().map(|p| self.target.distance(&p.id)); + + if prev_best == new_best && self.pending.is_empty() { + self.phase = QueryPhase::Converged; + } + } + + /// Process a value reply (for FIND_VALUE). + pub fn process_value(&mut self, value: Vec<u8>) { + self.values.push(value); + self.phase = QueryPhase::Done; + } + + /// Mark a peer as timed out. + pub fn timeout(&mut self, id: &NodeId) { + self.pending.remove(id); + if self.pending.is_empty() && self.next_to_query().is_empty() { + self.phase = QueryPhase::Done; + } + } + + /// Expire all pending queries that have exceeded + /// the timeout. + pub fn expire_pending(&mut self) { + let expired: Vec<NodeId> = self + .pending + .iter() + .filter(|(_, sent_at)| sent_at.elapsed() >= QUERY_TIMEOUT) + .map(|(id, _)| *id) + .collect(); + + for id in expired { + self.timeout(&id); + } + } + + /// Check if the query is complete (converged, + /// finished, or timed out). + pub fn is_done(&self) -> bool { + self.phase == QueryPhase::Done + || self.phase == QueryPhase::Converged + || self.started_at.elapsed() >= MAX_QUERY_DURATION + } +} + +// ── Maintenance: mask_bit exploration ─────────────── + +/// Systematic exploration of the 256-bit ID space. +/// +/// Generates target IDs for find_node queries that probe +/// different regions of the network, populating distant +/// k-buckets that would otherwise remain empty. +/// +/// Used by both DHT and DTUN maintenance. +pub struct MaskBitExplorer { + local_id: NodeId, + mask_bit: usize, +} + +impl MaskBitExplorer { + pub fn new(local_id: NodeId) -> Self { + Self { + local_id, + mask_bit: 1, + } + } + + /// Generate the next pair of exploration targets. + /// + /// Each call produces two targets by clearing specific + /// bits in the local ID, then advances by 2 bits. + /// After bit 20, resets to 1. + pub fn next_targets(&mut self) -> (NodeId, NodeId) { + let id_bytes = *self.local_id.as_bytes(); + + let t1 = Self::clear_bit(id_bytes, self.mask_bit); + let t2 = Self::clear_bit(id_bytes, self.mask_bit + 1); + + self.mask_bit += 2; + if self.mask_bit > 20 { + self.mask_bit = 1; + } + + (t1, t2) + } + + /// Current mask_bit position (for testing). + pub fn position(&self) -> usize { + self.mask_bit + } + + fn clear_bit( + mut bytes: [u8; crate::id::ID_LEN], + bit_from_msb: usize, + ) -> NodeId { + if bit_from_msb == 0 || bit_from_msb > crate::id::ID_BITS { + return NodeId::from_bytes(bytes); + } + let pos = bit_from_msb - 1; // 0-indexed + let byte_idx = pos / 8; + let bit_idx = 7 - (pos % 8); + bytes[byte_idx] &= !(1 << bit_idx); + NodeId::from_bytes(bytes) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::net::SocketAddr; + + fn make_peer(byte: u8, port: u16) -> PeerInfo { + PeerInfo::new( + NodeId::from_bytes([byte; 32]), + SocketAddr::from(([127, 0, 0, 1], port)), + ) + } + + // ── DhtStorage tests ──────────────────────────── + + #[test] + fn storage_store_and_get() { + let mut s = DhtStorage::new(); + let id = NodeId::from_key(b"test-key"); + let val = StoredValue { + key: b"test-key".to_vec(), + value: b"hello".to_vec(), + id, + source: NodeId::from_bytes([0x01; 32]), + ttl: 300, + stored_at: Instant::now(), + is_unique: false, + original: 3, + recvd: HashSet::new(), + version: 0, + }; + s.store(val); + let got = s.get(&id, b"test-key"); + assert_eq!(got.len(), 1); + assert_eq!(got[0].value, b"hello"); + } + + #[test] + fn storage_unique_replaces() { + let mut s = DhtStorage::new(); + let id = NodeId::from_key(b"uk"); + let src = NodeId::from_bytes([0x01; 32]); + + let v1 = StoredValue { + key: b"uk".to_vec(), + value: b"v1".to_vec(), + id, + source: src, + ttl: 300, + stored_at: Instant::now(), + is_unique: true, + original: 3, + recvd: HashSet::new(), + version: 0, + }; + s.store(v1); + + let v2 = StoredValue { + key: b"uk".to_vec(), + value: b"v2".to_vec(), + id, + source: src, + ttl: 300, + stored_at: Instant::now(), + is_unique: true, + original: 3, + recvd: HashSet::new(), + version: 0, + }; + s.store(v2); + + let got = s.get(&id, b"uk"); + assert_eq!(got.len(), 1); + assert_eq!(got[0].value, b"v2"); + } + + #[test] + fn storage_unique_rejects_other_source() { + let mut s = DhtStorage::new(); + let id = NodeId::from_key(b"uk"); + + let v1 = StoredValue { + key: b"uk".to_vec(), + value: b"v1".to_vec(), + id, + source: NodeId::from_bytes([0x01; 32]), + ttl: 300, + stored_at: Instant::now(), + is_unique: true, + original: 3, + recvd: HashSet::new(), + version: 0, + }; + s.store(v1); + + let v2 = StoredValue { + key: b"uk".to_vec(), + value: b"v2".to_vec(), + id, + source: NodeId::from_bytes([0x02; 32]), + ttl: 300, + stored_at: Instant::now(), + is_unique: true, + original: 3, + recvd: HashSet::new(), + version: 0, + }; + s.store(v2); + + let got = s.get(&id, b"uk"); + assert_eq!(got.len(), 1); + assert_eq!(got[0].value, b"v1"); + } + + #[test] + fn storage_multiple_non_unique() { + let mut s = DhtStorage::new(); + let id = NodeId::from_key(b"k"); + + for i in 0..3u8 { + s.store(StoredValue { + key: b"k".to_vec(), + value: vec![i], + id, + source: NodeId::from_bytes([i; 32]), + ttl: 300, + stored_at: Instant::now(), + is_unique: false, + original: 0, + recvd: HashSet::new(), + version: 0, + }); + } + + assert_eq!(s.get(&id, b"k").len(), 3); + } + + #[test] + fn storage_remove() { + let mut s = DhtStorage::new(); + let id = NodeId::from_key(b"k"); + s.store(StoredValue { + key: b"k".to_vec(), + value: b"v".to_vec(), + id, + source: NodeId::from_bytes([0x01; 32]), + ttl: 300, + stored_at: Instant::now(), + is_unique: false, + original: 0, + recvd: HashSet::new(), + version: 0, + }); + s.remove(&id, b"k"); + assert!(s.get(&id, b"k").is_empty()); + assert!(s.is_empty()); + } + + #[test] + fn storage_dec_original() { + let mut s = DhtStorage::new(); + let id = NodeId::from_key(b"k"); + let val = StoredValue { + key: b"k".to_vec(), + value: b"v".to_vec(), + id, + source: NodeId::from_bytes([0x01; 32]), + ttl: 300, + stored_at: Instant::now(), + is_unique: false, + original: 3, + recvd: HashSet::new(), + version: 0, + }; + s.store(val.clone()); + + assert_eq!(s.dec_original(&val), 2); + assert_eq!(s.dec_original(&val), 1); + assert_eq!(s.dec_original(&val), 0); + assert_eq!(s.dec_original(&val), 0); // stays at 0 + } + + #[test] + fn storage_mark_received() { + let mut s = DhtStorage::new(); + let id = NodeId::from_key(b"k"); + let val = StoredValue { + key: b"k".to_vec(), + value: b"v".to_vec(), + id, + source: NodeId::from_bytes([0x01; 32]), + ttl: 300, + stored_at: Instant::now(), + is_unique: false, + original: 0, + recvd: HashSet::new(), + version: 0, + }; + s.store(val.clone()); + + let node = NodeId::from_bytes([0x42; 32]); + s.mark_received(&val, node); + + let got = s.get(&id, b"k"); + assert!(got[0].recvd.contains(&node)); + } + + // ── IterativeQuery tests ──────────────────────── + + #[test] + fn query_process_reply_sorts() { + let target = NodeId::from_bytes([0x00; 32]); + let mut q = IterativeQuery::find_node(target, 1); + + // Simulate: we have node 0xFF pending + let far = NodeId::from_bytes([0xFF; 32]); + q.pending.insert(far, Instant::now()); + + // Reply with closer nodes + let nodes = vec![ + make_peer(0x10, 3000), + make_peer(0x01, 3001), + make_peer(0x05, 3002), + ]; + q.process_reply(&far, nodes); + + // Should be sorted by distance from target + assert_eq!(q.closest[0].id, NodeId::from_bytes([0x01; 32])); + assert_eq!(q.closest[1].id, NodeId::from_bytes([0x05; 32])); + assert_eq!(q.closest[2].id, NodeId::from_bytes([0x10; 32])); + } + + #[test] + fn query_converges_when_no_closer() { + let target = NodeId::from_bytes([0x00; 32]); + let mut q = IterativeQuery::find_node(target, 1); + + // Add initial closest + q.closest.push(make_peer(0x01, 3000)); + + // Simulate reply with no closer nodes + let from = NodeId::from_bytes([0x01; 32]); + q.pending.insert(from, Instant::now()); + q.process_reply(&from, vec![make_peer(0x02, 3001)]); + + // 0x01 is still closest, pending is empty -> converged + assert_eq!(q.phase, QueryPhase::Converged); + } + + #[test] + fn query_find_value_done_on_value() { + let target = NodeId::from_bytes([0x00; 32]); + let mut q = IterativeQuery::find_value(target, b"key".to_vec(), 1); + + q.process_value(b"found-it".to_vec()); + assert!(q.is_done()); + assert_eq!(q.values, vec![b"found-it".to_vec()]); + } + + // ── MaskBitExplorer tests ─────────────────────── + + #[test] + fn mask_bit_cycles() { + let id = NodeId::from_bytes([0xFF; 32]); + let mut explorer = MaskBitExplorer::new(id); + + assert_eq!(explorer.position(), 1); + explorer.next_targets(); + assert_eq!(explorer.position(), 3); + explorer.next_targets(); + assert_eq!(explorer.position(), 5); + + // Run through full cycle + for _ in 0..8 { + explorer.next_targets(); + } + + // 5 + 8*2 = 21 > 20, so reset to 1 + assert_eq!(explorer.position(), 1); + } + + #[test] + fn mask_bit_produces_different_targets() { + let id = NodeId::from_bytes([0xFF; 32]); + let mut explorer = MaskBitExplorer::new(id); + + let (t1, t2) = explorer.next_targets(); + assert_ne!(t1, t2); + assert_ne!(t1, id); + assert_ne!(t2, id); + } + + // ── Content versioning tests ────────────────── + + #[test] + fn version_rejects_stale_unique() { + let mut s = DhtStorage::new(); + let id = NodeId::from_key(b"vk"); + let src = NodeId::from_bytes([0x01; 32]); + + // Store version 100 + s.store(StoredValue { + key: b"vk".to_vec(), + value: b"new".to_vec(), + id, + source: src, + ttl: 300, + stored_at: Instant::now(), + is_unique: true, + original: 3, + recvd: HashSet::new(), + version: 100, + }); + + // Try to store older version 50 — should be rejected + s.store(StoredValue { + key: b"vk".to_vec(), + value: b"old".to_vec(), + id, + source: src, + ttl: 300, + stored_at: Instant::now(), + is_unique: true, + original: 3, + recvd: HashSet::new(), + version: 50, + }); + + let got = s.get(&id, b"vk"); + assert_eq!(got.len(), 1); + assert_eq!(got[0].value, b"new"); + assert_eq!(got[0].version, 100); + } + + #[test] + fn version_accepts_newer() { + let mut s = DhtStorage::new(); + let id = NodeId::from_key(b"vk"); + let src = NodeId::from_bytes([0x01; 32]); + + s.store(StoredValue { + key: b"vk".to_vec(), + value: b"old".to_vec(), + id, + source: src, + ttl: 300, + stored_at: Instant::now(), + is_unique: true, + original: 3, + recvd: HashSet::new(), + version: 50, + }); + + s.store(StoredValue { + key: b"vk".to_vec(), + value: b"new".to_vec(), + id, + source: src, + ttl: 300, + stored_at: Instant::now(), + is_unique: true, + original: 3, + recvd: HashSet::new(), + version: 100, + }); + + let got = s.get(&id, b"vk"); + assert_eq!(got[0].value, b"new"); + } + + #[test] + fn version_zero_always_accepted() { + // version=0 means "no versioning" — always accepted + let mut s = DhtStorage::new(); + let id = NodeId::from_key(b"vk"); + let src = NodeId::from_bytes([0x01; 32]); + + s.store(StoredValue { + key: b"vk".to_vec(), + value: b"v1".to_vec(), + id, + source: src, + ttl: 300, + stored_at: Instant::now(), + is_unique: true, + original: 3, + recvd: HashSet::new(), + version: 0, + }); + + s.store(StoredValue { + key: b"vk".to_vec(), + value: b"v2".to_vec(), + id, + source: src, + ttl: 300, + stored_at: Instant::now(), + is_unique: true, + original: 3, + recvd: HashSet::new(), + version: 0, + }); + + let got = s.get(&id, b"vk"); + assert_eq!(got[0].value, b"v2"); + } + + #[test] + fn version_rejects_stale_non_unique() { + let mut s = DhtStorage::new(); + let id = NodeId::from_key(b"nk"); + + // Store value with version 100 + s.store(StoredValue { + key: b"nk".to_vec(), + value: b"same".to_vec(), + id, + source: NodeId::from_bytes([0x01; 32]), + ttl: 300, + stored_at: Instant::now(), + is_unique: false, + original: 0, + recvd: HashSet::new(), + version: 100, + }); + + // Same value with older version — rejected + s.store(StoredValue { + key: b"nk".to_vec(), + value: b"same".to_vec(), + id, + source: NodeId::from_bytes([0x02; 32]), + ttl: 600, + stored_at: Instant::now(), + is_unique: false, + original: 0, + recvd: HashSet::new(), + version: 50, + }); + + let got = s.get(&id, b"nk"); + assert_eq!(got.len(), 1); + assert_eq!(got[0].version, 100); + assert_eq!(got[0].ttl, 300); // TTL not updated + } + + // ── Route length tests ──────────────────────── + + #[test] + fn query_hops_increment() { + let target = NodeId::from_bytes([0x00; 32]); + let mut q = IterativeQuery::find_node(target, 1); + assert_eq!(q.hops, 0); + + let from = NodeId::from_bytes([0xFF; 32]); + q.pending.insert(from, Instant::now()); + q.process_reply(&from, vec![make_peer(0x10, 3000)]); + assert_eq!(q.hops, 1); + + let from2 = NodeId::from_bytes([0x10; 32]); + q.pending.insert(from2, Instant::now()); + q.process_reply(&from2, vec![make_peer(0x05, 3001)]); + assert_eq!(q.hops, 2); + } + + #[test] + fn now_version_monotonic() { + let v1 = now_version(); + std::thread::sleep(std::time::Duration::from_millis(2)); + let v2 = now_version(); + assert!(v2 >= v1); + } +} |