//! Kademlia routing table with k-buckets. //! //! Each bucket holds //! up to `MAX_BUCKET_ENTRY` (20) peers ordered by last //! seen time (LRU). When a bucket is full, the least //! recently seen peer is pinged; if it doesn't respond, //! it's replaced by the new peer. use std::time::Instant; use crate::id::NodeId; use crate::peers::PeerInfo; /// Maximum entries per k-bucket. pub const MAX_BUCKET_ENTRY: usize = 20; /// Number of bits in a node ID. pub const ID_BITS: usize = crate::id::ID_BITS; /// Number of closest nodes to return in lookups /// Kademlia default: 10. pub const NUM_FIND_NODE: usize = 10; /// Maximum entries in the replacement cache per bucket. /// When a bucket is full, new contacts go here instead /// of being discarded (Kademlia paper section 2.4). const MAX_REPLACEMENT_CACHE: usize = 5; /// Number of consecutive failures before a contact is /// considered stale and eligible for replacement. const STALE_THRESHOLD: u32 = 3; /// Result of inserting a peer into the routing table. #[derive(Debug)] pub enum InsertResult { /// Peer was inserted into the bucket. Inserted, /// Peer already existed and was moved to tail (LRU). Updated, /// Bucket is full. Contains the LRU peer that should /// be pinged to decide eviction. BucketFull { lru: PeerInfo }, /// Peer is our own ID, ignored. IsSelf, } /// A single k-bucket holding up to K peers. struct KBucket { nodes: Vec, last_updated: Instant, /// Replacement cache: contacts seen when bucket is /// full. Used to replace stale contacts without /// losing discovered nodes (Kademlia paper §2.4). replacements: Vec, /// Consecutive failure count per node ID. Peers with /// count >= STALE_THRESHOLD are replaced by cached /// contacts. 
stale_counts: std::collections::HashMap, } impl KBucket { fn new() -> Self { Self { nodes: Vec::new(), last_updated: Instant::now(), replacements: Vec::new(), stale_counts: std::collections::HashMap::new(), } } fn len(&self) -> usize { self.nodes.len() } fn is_full(&self) -> bool { self.nodes.len() >= MAX_BUCKET_ENTRY } fn contains(&self, id: &NodeId) -> bool { self.nodes.iter().any(|n| n.id == *id) } fn find_pos(&self, id: &NodeId) -> Option { self.nodes.iter().position(|n| n.id == *id) } /// Insert or update a peer. Returns InsertResult. fn insert(&mut self, peer: PeerInfo) -> InsertResult { if let Some(pos) = self.find_pos(&peer.id) { // Move to tail (most recently seen) self.nodes.remove(pos); self.nodes.push(peer); self.last_updated = Instant::now(); // Clear stale count on successful contact self.stale_counts.remove(&self.nodes.last().unwrap().id); return InsertResult::Updated; } if self.is_full() { // Check if any existing contact is stale // enough to replace immediately if let Some(stale_pos) = self.find_stale() { let stale_id = self.nodes[stale_pos].id; self.stale_counts.remove(&stale_id); self.nodes.remove(stale_pos); self.nodes.push(peer); self.last_updated = Instant::now(); return InsertResult::Inserted; } // No stale contact: add to replacement cache self.add_to_cache(peer.clone()); // Return LRU (front) for ping check let lru = self.nodes[0].clone(); return InsertResult::BucketFull { lru }; } self.nodes.push(peer); self.last_updated = Instant::now(); InsertResult::Inserted } /// Find a contact whose stale count exceeds the /// threshold. Returns its position in the nodes vec. fn find_stale(&self) -> Option { for (i, node) in self.nodes.iter().enumerate() { if let Some(&count) = self.stale_counts.get(&node.id) { if count >= STALE_THRESHOLD { return Some(i); } } } None } /// Add a contact to the replacement cache. 
fn add_to_cache(&mut self, peer: PeerInfo) { // Update if already in cache if let Some(pos) = self .replacements .iter() .position(|r| r.id == peer.id) { self.replacements.remove(pos); self.replacements.push(peer); return; } if self.replacements.len() >= MAX_REPLACEMENT_CACHE { self.replacements.remove(0); // drop oldest } self.replacements.push(peer); } /// Record a failure for a contact. Returns true if /// the contact became stale (crossed threshold). fn record_failure(&mut self, id: &NodeId) -> bool { let count = self.stale_counts.entry(*id).or_insert(0); *count += 1; *count >= STALE_THRESHOLD } /// Try to replace a stale contact with the best /// replacement from cache. Returns the evicted ID /// if successful. fn try_replace_stale(&mut self, stale_id: &NodeId) -> Option { let pos = self.find_pos(stale_id)?; let replacement = self.replacements.pop()?; let evicted = self.nodes[pos].id; self.stale_counts.remove(&evicted); self.nodes.remove(pos); self.nodes.push(replacement); self.last_updated = Instant::now(); Some(evicted) } /// Number of contacts in the replacement cache. fn cache_len(&self) -> usize { self.replacements.len() } /// Replace the LRU node (front) with a new peer. /// Only succeeds if `old_id` matches the current LRU. fn evict_lru(&mut self, old_id: &NodeId, new: PeerInfo) -> bool { if let Some(front) = self.nodes.first() { if front.id == *old_id { self.nodes.remove(0); self.nodes.push(new); self.last_updated = Instant::now(); return true; } } false } fn remove(&mut self, id: &NodeId) -> bool { if let Some(pos) = self.find_pos(id) { self.nodes.remove(pos); true } else { false } } /// Mark a peer as recently seen (move to tail). fn mark_seen(&mut self, id: &NodeId) { if let Some(pos) = self.find_pos(id) { let peer = self.nodes.remove(pos); self.nodes.push(PeerInfo { last_seen: Instant::now(), ..peer }); self.last_updated = Instant::now(); self.stale_counts.remove(id); } } } /// Kademlia routing table. 
/// /// Maintains 256 k-buckets indexed by XOR distance from /// the local node. Each bucket holds up to /// `MAX_BUCKET_ENTRY` peers. /// Maximum nodes per /24 subnet in the routing table. /// Limits Sybil attack impact. pub const MAX_PER_SUBNET: usize = 2; pub struct RoutingTable { local_id: NodeId, buckets: Vec, /// Count of nodes per /24 subnet for Sybil /// resistance. subnet_counts: std::collections::HashMap<[u8; 3], usize>, /// Pinned bootstrap nodes — never evicted. pinned: std::collections::HashSet, } impl RoutingTable { /// Create a new routing table for the given local ID. pub fn new(local_id: NodeId) -> Self { let mut buckets = Vec::with_capacity(ID_BITS); for _ in 0..ID_BITS { buckets.push(KBucket::new()); } Self { local_id, buckets, subnet_counts: std::collections::HashMap::new(), pinned: std::collections::HashSet::new(), } } /// Pin a bootstrap node — it will never be evicted. pub fn pin(&mut self, id: NodeId) { self.pinned.insert(id); } /// Check if a node is pinned. pub fn is_pinned(&self, id: &NodeId) -> bool { self.pinned.contains(id) } /// Our own node ID. pub fn local_id(&self) -> &NodeId { &self.local_id } /// Determine the bucket index for a given node ID. /// /// Returns `None` if `id` equals our local ID. fn bucket_index(&self, id: &NodeId) -> Option { let dist = self.local_id.distance(id); if dist.is_zero() { return None; } let lz = dist.leading_zeros() as usize; // Bucket 0 = furthest (bit 0 differs), // Bucket 255 = closest (only bit 255 differs). // Index = 255 - leading_zeros. Some(ID_BITS - 1 - lz) } /// Add a peer to the routing table. /// /// Rejects the peer if its /24 subnet already has /// `MAX_PER_SUBNET` entries (Sybil resistance). 
pub fn add(&mut self, peer: PeerInfo) -> InsertResult { if peer.id == self.local_id { return InsertResult::IsSelf; } let idx = match self.bucket_index(&peer.id) { Some(i) => i, None => return InsertResult::IsSelf, }; // Sybil check: limit per /24 subnet // Skip for loopback (tests, local dev) let subnet = subnet_key(&peer.addr); let is_loopback = peer.addr.ip().is_loopback(); if !is_loopback && !self.buckets[idx].contains(&peer.id) { let count = self.subnet_counts.get(&subnet).copied().unwrap_or(0); if count >= MAX_PER_SUBNET { log::debug!( "Sybil: rejecting {:?} (subnet {:?} has {count} entries)", peer.id, subnet ); return InsertResult::BucketFull { lru: peer }; } } let result = self.buckets[idx].insert(peer); if matches!(result, InsertResult::Inserted) { *self.subnet_counts.entry(subnet).or_insert(0) += 1; } result } /// Remove a peer from the routing table. pub fn remove(&mut self, id: &NodeId) -> bool { // Never evict pinned bootstrap nodes if self.pinned.contains(id) { return false; } if let Some(idx) = self.bucket_index(id) { // Decrement subnet count if let Some(peer) = self.buckets[idx].nodes.iter().find(|p| p.id == *id) { let subnet = subnet_key(&peer.addr); if let Some(c) = self.subnet_counts.get_mut(&subnet) { *c = c.saturating_sub(1); if *c == 0 { self.subnet_counts.remove(&subnet); } } } self.buckets[idx].remove(id) } else { false } } /// Evict the LRU node in a bucket and insert a new /// peer. Called after a ping timeout confirms the LRU /// node is dead. pub fn evict_and_insert(&mut self, old_id: &NodeId, new: PeerInfo) -> bool { if let Some(idx) = self.bucket_index(&new.id) { self.buckets[idx].evict_lru(old_id, new) } else { false } } /// Mark a peer as recently seen. pub fn mark_seen(&mut self, id: &NodeId) { if let Some(idx) = self.bucket_index(id) { self.buckets[idx].mark_seen(id); } } /// Record a communication failure for a peer. /// If the peer becomes stale (exceeds threshold), /// tries to replace it with a cached contact. 
/// Returns the evicted NodeId if replacement happened. pub fn record_failure(&mut self, id: &NodeId) -> Option { // Never mark pinned nodes as stale if self.pinned.contains(id) { return None; } let idx = self.bucket_index(id)?; let became_stale = self.buckets[idx].record_failure(id); if became_stale { self.buckets[idx].try_replace_stale(id) } else { None } } /// Total number of contacts in all replacement caches. pub fn replacement_cache_size(&self) -> usize { self.buckets.iter().map(|b| b.cache_len()).sum() } /// Find the `count` closest peers to `target` by XOR /// distance, sorted closest-first. pub fn closest(&self, target: &NodeId, count: usize) -> Vec { let mut all: Vec = self .buckets .iter() .flat_map(|b| b.nodes.iter().cloned()) .collect(); all.sort_by(|a, b| { let da = target.distance(&a.id); let db = target.distance(&b.id); da.cmp(&db) }); all.truncate(count); all } /// Check if a given ID exists in the table. pub fn has_id(&self, id: &NodeId) -> bool { if let Some(idx) = self.bucket_index(id) { self.buckets[idx].contains(id) } else { false } } /// Total number of peers in the table. pub fn size(&self) -> usize { self.buckets.iter().map(|b| b.len()).sum() } /// Check if the table has no peers. pub fn is_empty(&self) -> bool { self.size() == 0 } /// Get fill level of each non-empty bucket (for /// debugging/metrics). pub fn bucket_fill_levels(&self) -> Vec<(usize, usize)> { self.buckets .iter() .enumerate() .filter(|(_, b)| b.len() > 0) .map(|(i, b)| (i, b.len())) .collect() } /// Find buckets that haven't been updated since /// `threshold` and return random target IDs for /// refresh lookups. /// /// This implements the Kademlia bucket refresh from /// the paper: pick a random ID in each stale bucket's /// range and do a find_node on it. 
pub fn stale_bucket_targets( &self, threshold: std::time::Duration, ) -> Vec { let now = Instant::now(); let mut targets = Vec::new(); for (i, bucket) in self.buckets.iter().enumerate() { if now.duration_since(bucket.last_updated) >= threshold { // Generate a random ID in this bucket's range. // The bucket at index i covers nodes where the // XOR distance has bit (255-i) as the highest // set bit. We create a target by XORing our ID // with a value that has bit (255-i) set. let bit_pos = ID_BITS - 1 - i; let byte_idx = bit_pos / 8; let bit_idx = 7 - (bit_pos % 8); let bytes = *self.local_id.as_bytes(); let mut buf = bytes; buf[byte_idx] ^= 1 << bit_idx; targets.push(NodeId::from_bytes(buf)); } } targets } /// Return the LRU (least recently seen) peer from /// each non-empty bucket, for liveness probing. /// /// The caller should ping each and call /// `mark_seen()` on reply, or `remove()` after /// repeated failures. pub fn lru_peers(&self) -> Vec { self.buckets .iter() .filter_map(|b| b.nodes.first().cloned()) .collect() } /// Print the routing table (debug). pub fn print_table(&self) { for (i, bucket) in self.buckets.iter().enumerate() { if bucket.len() > 0 { log::debug!("bucket {i}: {} nodes", bucket.len()); for node in &bucket.nodes { log::debug!(" {} @ {}", node.id, node.addr); } } } } } /// Extract /24 subnet key from a socket address. /// For IPv6, uses the first 6 bytes (/48). 
fn subnet_key(addr: &std::net::SocketAddr) -> [u8; 3] {
    match addr.ip() {
        std::net::IpAddr::V4(v4) => {
            let o = v4.octets();
            [o[0], o[1], o[2]]
        }
        std::net::IpAddr::V6(v6) => {
            // NOTE(review): only the first 3 bytes (24 bits) of the
            // IPv6 address are used — much coarser than a typical /48
            // site allocation; confirm this granularity is intended.
            let o = v6.octets();
            [o[0], o[1], o[2]]
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::net::SocketAddr;

    // Local node ID used by most tests: every byte 0x80.
    fn local_id() -> NodeId {
        NodeId::from_bytes([0x80; 32])
    }

    // Helper: peer whose ID is `byte` repeated, placed at 10.0.`byte`.1
    // so each distinct `byte` lands in its own /24 subnet.
    fn peer_at(byte: u8, port: u16) -> PeerInfo {
        // Use different /24 subnets to avoid Sybil limit
        PeerInfo::new(
            NodeId::from_bytes([byte; 32]),
            SocketAddr::from(([10, 0, byte, 1], port)),
        )
    }

    #[test]
    fn insert_self_is_ignored() {
        let mut rt = RoutingTable::new(local_id());
        let p = PeerInfo::new(local_id(), "127.0.0.1:3000".parse().unwrap());
        assert!(matches!(rt.add(p), InsertResult::IsSelf));
        assert_eq!(rt.size(), 0);
    }

    #[test]
    fn insert_and_lookup() {
        let mut rt = RoutingTable::new(local_id());
        let p = peer_at(0x01, 3000);
        assert!(matches!(rt.add(p.clone()), InsertResult::Inserted));
        assert_eq!(rt.size(), 1);
        assert!(rt.has_id(&p.id));
    }

    #[test]
    fn update_moves_to_tail() {
        let mut rt = RoutingTable::new(local_id());
        let p1 = peer_at(0x01, 3000);
        let p2 = peer_at(0x02, 3001);
        rt.add(p1.clone());
        rt.add(p2.clone());
        // Re-add p1 should move to tail
        assert!(matches!(rt.add(p1.clone()), InsertResult::Updated));
    }

    #[test]
    fn remove_peer() {
        let mut rt = RoutingTable::new(local_id());
        let p = peer_at(0x01, 3000);
        rt.add(p.clone());
        assert!(rt.remove(&p.id));
        assert_eq!(rt.size(), 0);
        assert!(!rt.has_id(&p.id));
    }

    #[test]
    fn closest_sorted_by_xor() {
        let mut rt = RoutingTable::new(local_id());
        // Add peers with different distances from a target
        for i in 1..=5u8 {
            rt.add(peer_at(i, 3000 + i as u16));
        }
        let target = NodeId::from_bytes([0x03; 32]);
        let closest = rt.closest(&target, 3);
        assert_eq!(closest.len(), 3);
        // Verify sorted by XOR distance
        for w in closest.windows(2) {
            let d0 = target.distance(&w[0].id);
            let d1 = target.distance(&w[1].id);
            assert!(d0 <= d1);
        }
    }

    #[test]
    fn closest_respects_count() {
        let mut rt = RoutingTable::new(local_id());
        for i in 1..=30u8 {
            rt.add(peer_at(i, 3000 + i as u16));
        }
        let target = NodeId::from_bytes([0x10; 32]);
        let closest = rt.closest(&target, 10);
        assert_eq!(closest.len(), 10);
    }

    #[test]
    fn bucket_full_returns_lru() {
        let lid = NodeId::from_bytes([0x00; 32]);
        let mut rt = RoutingTable::new(lid);
        // Fill a bucket with MAX_BUCKET_ENTRY peers.
        // All peers are [0xFF; 32] with small low-byte variations, so
        // they share the bucket where the highest bit differs.
        for i in 0..MAX_BUCKET_ENTRY as u16 {
            let mut bytes = [0xFF; 32];
            bytes[18] = (i >> 8) as u8;
            bytes[19] = i as u8;
            let p = PeerInfo::new(
                NodeId::from_bytes(bytes),
                // Different /24 per peer to avoid Sybil limit
                SocketAddr::from(([10, 0, i as u8, 1], 3000 + i)),
            );
            assert!(matches!(rt.add(p), InsertResult::Inserted));
        }
        assert_eq!(rt.size(), MAX_BUCKET_ENTRY);
        // Next insert should return BucketFull
        let mut extra_bytes = [0xFF; 32];
        extra_bytes[19] = 0xFE;
        extra_bytes[18] = 0xFE;
        let extra = PeerInfo::new(
            NodeId::from_bytes(extra_bytes),
            SocketAddr::from(([10, 0, 250, 1], 9999)),
        );
        let result = rt.add(extra);
        assert!(matches!(result, InsertResult::BucketFull { .. }));
    }

    #[test]
    fn evict_and_insert() {
        let lid = NodeId::from_bytes([0x00; 32]);
        let mut rt = RoutingTable::new(lid);
        // Fill bucket; remember the first-inserted (LRU) peer's ID.
        let mut first_id = NodeId::from_bytes([0xFF; 32]);
        for i in 0..MAX_BUCKET_ENTRY as u16 {
            let mut bytes = [0xFF; 32];
            bytes[18] = (i >> 8) as u8;
            bytes[19] = i as u8;
            let id = NodeId::from_bytes(bytes);
            if i == 0 {
                first_id = id;
            }
            rt.add(PeerInfo::new(
                id,
                SocketAddr::from(([10, 0, i as u8, 1], 3000 + i)),
            ));
        }
        // Evict the first (LRU) and insert new
        let mut new_bytes = [0xFF; 32];
        new_bytes[17] = 0x01;
        let new_peer = PeerInfo::new(
            NodeId::from_bytes(new_bytes),
            SocketAddr::from(([10, 0, 251, 1], 9999)),
        );
        assert!(rt.evict_and_insert(&first_id, new_peer.clone()));
        assert!(!rt.has_id(&first_id));
        assert!(rt.has_id(&new_peer.id));
        assert_eq!(rt.size(), MAX_BUCKET_ENTRY);
    }

    #[test]
    fn stale_bucket_targets() {
        let mut rt = RoutingTable::new(local_id());
        let p = peer_at(0x01, 3000);
        rt.add(p);
        // A zero threshold makes every bucket count as stale
        // immediately, including the one we just updated.
        let targets = rt.stale_bucket_targets(std::time::Duration::from_secs(0));
        // At least the populated bucket should produce a target
        assert!(!targets.is_empty());
    }

    #[test]
    fn empty_table() {
        let rt = RoutingTable::new(local_id());
        assert!(rt.is_empty());
        assert_eq!(rt.size(), 0);
        assert!(rt.closest(&NodeId::from_bytes([0x01; 32]), 10).is_empty());
    }

    // ── Replacement cache tests ───────────────────

    #[test]
    fn bucket_full_adds_to_cache() {
        let lid = NodeId::from_bytes([0x00; 32]);
        let mut rt = RoutingTable::new(lid);
        // Fill a bucket
        for i in 0..MAX_BUCKET_ENTRY as u16 {
            let mut bytes = [0xFF; 32];
            bytes[18] = (i >> 8) as u8;
            bytes[19] = i as u8;
            rt.add(PeerInfo::new(
                NodeId::from_bytes(bytes),
                SocketAddr::from(([10, 0, i as u8, 1], 3000 + i)),
            ));
        }
        assert_eq!(rt.replacement_cache_size(), 0);
        // Next insert goes to replacement cache
        let mut extra = [0xFF; 32];
        extra[18] = 0xFE;
        extra[19] = 0xFE;
        rt.add(PeerInfo::new(
            NodeId::from_bytes(extra),
            SocketAddr::from(([10, 0, 250, 1], 9999)),
        ));
        assert_eq!(rt.replacement_cache_size(), 1);
    }

    #[test]
    fn stale_contact_replaced_on_insert() {
        let lid = NodeId::from_bytes([0x00; 32]);
        let mut rt = RoutingTable::new(lid);
        // All peers have the high bit set (byte 0 = 0xFF), so they all
        // land in the same (furthest) bucket. Vary low bytes for
        // unique IDs.
        for i in 0..MAX_BUCKET_ENTRY as u8 {
            let mut bytes = [0x00; 32];
            bytes[0] = 0xFF; // same high bit → same bucket
            bytes[31] = i;
            rt.add(PeerInfo::new(
                NodeId::from_bytes(bytes),
                SocketAddr::from(([10, 0, i, 1], 3000 + i as u16)),
            ));
        }
        // Target: the first peer (bytes[31] = 0)
        let mut first = [0x00; 32];
        first[0] = 0xFF;
        first[31] = 0;
        let first_id = NodeId::from_bytes(first);
        // Record failures until stale
        for _ in 0..STALE_THRESHOLD {
            rt.record_failure(&first_id);
        }
        // Next insert to same bucket should replace stale
        let mut new = [0x00; 32];
        new[0] = 0xFF;
        new[31] = 0xFE;
        let new_id = NodeId::from_bytes(new);
        let result = rt.add(PeerInfo::new(
            new_id,
            SocketAddr::from(([10, 0, 254, 1], 9999)),
        ));
        assert!(matches!(result, InsertResult::Inserted));
        assert!(!rt.has_id(&first_id));
        assert!(rt.has_id(&new_id));
    }

    #[test]
    fn record_failure_replaces_with_cache() {
        let lid = NodeId::from_bytes([0x00; 32]);
        let mut rt = RoutingTable::new(lid);
        // All in same bucket (byte 0 = 0xFF)
        for i in 0..MAX_BUCKET_ENTRY as u8 {
            let mut bytes = [0x00; 32];
            bytes[0] = 0xFF;
            bytes[31] = i;
            rt.add(PeerInfo::new(
                NodeId::from_bytes(bytes),
                SocketAddr::from(([10, 0, i, 1], 3000 + i as u16)),
            ));
        }
        let mut target = [0x00; 32];
        target[0] = 0xFF;
        target[31] = 0;
        let target_id = NodeId::from_bytes(target);
        // Add a replacement to cache (same bucket, which is now full)
        let mut cache = [0x00; 32];
        cache[0] = 0xFF;
        cache[31] = 0xFD;
        let cache_id = NodeId::from_bytes(cache);
        rt.add(PeerInfo::new(
            cache_id,
            SocketAddr::from(([10, 0, 253, 1], 8888)),
        ));
        assert_eq!(rt.replacement_cache_size(), 1);
        // Record failures until replacement happens
        for _ in 0..STALE_THRESHOLD {
            rt.record_failure(&target_id);
        }
        assert!(!rt.has_id(&target_id));
        assert!(rt.has_id(&cache_id));
        assert_eq!(rt.replacement_cache_size(), 0);
    }

    #[test]
    fn pinned_not_stale() {
        let lid = NodeId::from_bytes([0x00; 32]);
        let mut rt = RoutingTable::new(lid);
        let p = peer_at(0xFF, 3000);
        rt.add(p.clone());
        rt.pin(p.id);
        // Failures should not evict pinned node
        for _ in 0..10 {
            assert!(rt.record_failure(&p.id).is_none());
        }
        assert!(rt.has_id(&p.id));
    }

    #[test]
    fn mark_seen_clears_stale() {
        let lid = NodeId::from_bytes([0x00; 32]);
        let mut rt = RoutingTable::new(lid);
        let p = peer_at(0xFF, 3000);
        rt.add(p.clone());
        // Accumulate failures (but not enough to replace)
        rt.record_failure(&p.id);
        rt.record_failure(&p.id);
        // Successful contact clears stale count
        rt.mark_seen(&p.id);
        // More failures needed now
        rt.record_failure(&p.id);
        rt.record_failure(&p.id);
        // Still not stale (count reset to 0, now at 2 < 3)
        assert!(rt.has_id(&p.id));
    }
}