diff options
| author | murilo ijanc | 2026-03-24 15:04:03 -0300 |
|---|---|---|
| committer | murilo ijanc | 2026-03-24 15:04:03 -0300 |
| commit | 9821aabf0b50d2487b07502d3d2cd89e7d62bdbe (patch) | |
| tree | 53da095ff90cc755bac3d4bf699172b5e8cd07d6 /src/peers.rs | |
| download | tesseras-dht-0.1.0.tar.gz | |
Initial commitv0.1.0
NAT-aware Kademlia DHT library for peer-to-peer networks.
Features:
- Distributed key-value storage (iterative FIND_NODE, FIND_VALUE, STORE)
- NAT traversal via DTUN hole-punching and proxy relay
- Reliable Datagram Protocol (RDP) with 7-state connection machine
- Datagram transport with automatic fragmentation/reassembly
- Ed25519 packet authentication
- 256-bit node IDs (Ed25519 public keys)
- Rate limiting, ban list, and eclipse attack mitigation
- Persistence and metrics
- OpenBSD and Linux support
Diffstat (limited to 'src/peers.rs')
| -rw-r--r-- | src/peers.rs | 337 |
1 files changed, 337 insertions, 0 deletions
diff --git a/src/peers.rs b/src/peers.rs new file mode 100644 index 0000000..e575323 --- /dev/null +++ b/src/peers.rs @@ -0,0 +1,337 @@ +//! Peer node database. +//! +//! Bidirectional peer map with TTL-based expiry and +//! timeout tracking. + +use std::collections::HashMap; +use std::net::SocketAddr; +use std::time::{Duration, Instant}; + +use crate::id::NodeId; + +/// TTL for peer entries (5 min). +pub const PEERS_MAP_TTL: Duration = Duration::from_secs(300); + +/// TTL for timeout entries (30s). +pub const PEERS_TIMEOUT_TTL: Duration = Duration::from_secs(30); + +/// Cleanup timer interval (30s). +pub const PEERS_TIMER_INTERVAL: Duration = Duration::from_secs(30); + +/// NAT state of a peer. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum NatState { + Unknown, + Global, + Nat, + ConeNat, + SymmetricNat, +} + +/// Information about a known peer. +#[derive(Debug, Clone)] +pub struct PeerInfo { + pub id: NodeId, + pub addr: SocketAddr, + pub domain: u16, + pub nat_state: NatState, + pub last_seen: Instant, + pub session: u32, + + /// Ed25519 public key (if known). Since NodeId = + /// pubkey, this is always available when we know + /// the peer's ID. + pub public_key: Option<[u8; 32]>, +} + +impl PeerInfo { + pub fn new(id: NodeId, addr: SocketAddr) -> Self { + // NodeId IS the public key (32 bytes) + let public_key = Some(*id.as_bytes()); + Self { + id, + addr, + domain: if addr.is_ipv4() { 1 } else { 2 }, + nat_state: NatState::Unknown, + last_seen: Instant::now(), + session: 0, + public_key, + } + } +} + +/// Database of known peers with TTL-based expiry. +/// +/// Provides forward lookup (id -> info) and reverse +/// lookup (addr -> ids). +type PeerCallback = Box<dyn Fn(&PeerInfo)>; + +/// Maximum number of tracked peers (prevents OOM). +const MAX_PEERS: usize = 10_000; + +pub struct PeerStore { + by_id: HashMap<NodeId, PeerInfo>, + by_addr: HashMap<SocketAddr, Vec<NodeId>>, + timeouts: HashMap<NodeId, Instant>, + on_add: Option<PeerCallback>, +} + +impl PeerStore { + pub fn new() -> Self { + Self { + by_id: HashMap::new(), + by_addr: HashMap::new(), + timeouts: HashMap::new(), + on_add: None, + } + } + + /// Get peer info by ID. + pub fn get(&self, id: &NodeId) -> Option<&PeerInfo> { + self.by_id.get(id) + } + + /// Get all peer IDs associated with an address. + pub fn ids_for_addr(&self, addr: &SocketAddr) -> Vec<NodeId> { + self.by_addr.get(addr).cloned().unwrap_or_default() + } + + /// Add a peer, checking for duplicates. + /// + /// If the peer already exists, updates `last_seen`. + /// Returns `true` if newly added. + pub fn add(&mut self, peer: PeerInfo) -> bool { + let id = peer.id; + let addr = peer.addr; + + // Limit check (updates are always allowed) + if self.by_id.len() >= MAX_PEERS && !self.by_id.contains_key(&id) { + return false; + } + + if let Some(existing) = self.by_id.get_mut(&id) { + existing.last_seen = Instant::now(); + existing.addr = addr; + return false; + } + + self.by_addr.entry(addr).or_default().push(id); + if let Some(ref cb) = self.on_add { + cb(&peer); + } + self.by_id.insert(id, peer); + true + } + + /// Add a peer with a session ID (for DTUN register). + /// + /// Returns `true` if the session matches or is new. + pub fn add_with_session(&mut self, peer: PeerInfo, session: u32) -> bool { + if let Some(existing) = self.by_id.get(&peer.id) { + if existing.session != session { + return false; + } + } + let mut peer = peer; + peer.session = session; + self.add(peer); + true + } + + /// Add a peer, overwriting any existing entry. + pub fn add_force(&mut self, peer: PeerInfo) { + self.remove(&peer.id); + self.add(peer); + } + + /// Remove a peer by ID. + pub fn remove(&mut self, id: &NodeId) -> Option<PeerInfo> { + if let Some(peer) = self.by_id.remove(id) { + if let Some(ids) = self.by_addr.get_mut(&peer.addr) { + ids.retain(|i| i != id); + if ids.is_empty() { + self.by_addr.remove(&peer.addr); + } + } + self.timeouts.remove(id); + Some(peer) + } else { + None + } + } + + /// Remove all peers associated with an address. + pub fn remove_addr(&mut self, addr: &SocketAddr) { + if let Some(ids) = self.by_addr.remove(addr) { + for id in ids { + self.by_id.remove(&id); + self.timeouts.remove(&id); + } + } + } + + /// Mark a peer as having timed out. + pub fn mark_timeout(&mut self, id: &NodeId) { + self.timeouts.insert(*id, Instant::now()); + } + + /// Check if a peer is in timeout state. + pub fn is_timeout(&self, id: &NodeId) -> bool { + if let Some(t) = self.timeouts.get(id) { + t.elapsed() < PEERS_TIMEOUT_TTL + } else { + false + } + } + + /// Remove expired peers (older than MAP_TTL) and + /// stale timeout entries. + pub fn refresh(&mut self) { + let now = Instant::now(); + + // Remove expired peers + let expired: Vec<NodeId> = self + .by_id + .iter() + .filter(|(_, p)| now.duration_since(p.last_seen) >= PEERS_MAP_TTL) + .map(|(id, _)| *id) + .collect(); + + for id in expired { + self.remove(&id); + } + + // Remove stale timeout entries + self.timeouts + .retain(|_, t| now.duration_since(*t) < PEERS_TIMEOUT_TTL); + } + + /// Set a callback for when a peer is added. + pub fn on_add(&mut self, f: impl Fn(&PeerInfo) + 'static) { + self.on_add = Some(Box::new(f)); + } + + /// Number of known peers. + pub fn len(&self) -> usize { + self.by_id.len() + } + + /// Check if the store is empty. + pub fn is_empty(&self) -> bool { + self.by_id.is_empty() + } + + /// Iterate over all peers. + pub fn iter(&self) -> impl Iterator<Item = &PeerInfo> { + self.by_id.values() + } + + /// Get all peer IDs. + pub fn ids(&self) -> Vec<NodeId> { + self.by_id.keys().copied().collect() + } +} + +impl Default for PeerStore { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn peer(byte: u8, port: u16) -> PeerInfo { + PeerInfo::new( + NodeId::from_bytes([byte; 32]), + SocketAddr::from(([127, 0, 0, 1], port)), + ) + } + + #[test] + fn add_and_get() { + let mut store = PeerStore::new(); + let p = peer(1, 3000); + assert!(store.add(p.clone())); + assert_eq!(store.len(), 1); + assert_eq!(store.get(&p.id).unwrap().addr, p.addr); + } + + #[test] + fn add_duplicate_updates() { + let mut store = PeerStore::new(); + let p = peer(1, 3000); + assert!(store.add(p.clone())); + assert!(!store.add(p)); // duplicate + assert_eq!(store.len(), 1); + } + + #[test] + fn remove_by_id() { + let mut store = PeerStore::new(); + let p = peer(1, 3000); + store.add(p.clone()); + store.remove(&p.id); + assert!(store.is_empty()); + } + + #[test] + fn reverse_lookup() { + let mut store = PeerStore::new(); + let addr: SocketAddr = "127.0.0.1:3000".parse().unwrap(); + let p1 = PeerInfo::new(NodeId::from_bytes([1; 32]), addr); + let p2 = PeerInfo::new(NodeId::from_bytes([2; 32]), addr); + store.add(p1.clone()); + store.add(p2.clone()); + + let ids = store.ids_for_addr(&addr); + assert_eq!(ids.len(), 2); + assert!(ids.contains(&p1.id)); + assert!(ids.contains(&p2.id)); + } + + #[test] + fn remove_addr_removes_all() { + let mut store = PeerStore::new(); + let addr: SocketAddr = "127.0.0.1:3000".parse().unwrap(); + store.add(PeerInfo::new(NodeId::from_bytes([1; 32]), addr)); + store.add(PeerInfo::new(NodeId::from_bytes([2; 32]), addr)); + store.remove_addr(&addr); + assert!(store.is_empty()); + } + + #[test] + fn timeout_tracking() { + let mut store = PeerStore::new(); + let id = NodeId::from_bytes([1; 32]); + assert!(!store.is_timeout(&id)); + store.mark_timeout(&id); + assert!(store.is_timeout(&id)); + } + + #[test] + fn add_with_session() { + let mut store = PeerStore::new(); + let p = peer(1, 3000); + assert!(store.add_with_session(p.clone(), 42)); + + // Same session: ok + assert!(store.add_with_session(peer(1, 3001), 42)); + + // Different session: rejected + assert!(!store.add_with_session(peer(1, 3002), 99)); + } + + #[test] + fn add_force_overwrites() { + let mut store = PeerStore::new(); + store.add(peer(1, 3000)); + store.add_force(peer(1, 4000)); + assert_eq!(store.len(), 1); + assert_eq!( + store.get(&NodeId::from_bytes([1; 32])).unwrap().addr.port(), + 4000 + ); + } +} |