//! Peer node database. //! //! Bidirectional peer map with TTL-based expiry and //! timeout tracking. use std::collections::HashMap; use std::net::SocketAddr; use std::time::{Duration, Instant}; use crate::id::NodeId; /// TTL for peer entries (5 min). pub const PEERS_MAP_TTL: Duration = Duration::from_secs(300); /// TTL for timeout entries (30s). pub const PEERS_TIMEOUT_TTL: Duration = Duration::from_secs(30); /// Cleanup timer interval (30s). pub const PEERS_TIMER_INTERVAL: Duration = Duration::from_secs(30); /// NAT state of a peer. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum NatState { Unknown, Global, Nat, ConeNat, SymmetricNat, } /// Information about a known peer. #[derive(Debug, Clone)] pub struct PeerInfo { pub id: NodeId, pub addr: SocketAddr, pub domain: u16, pub nat_state: NatState, pub last_seen: Instant, pub session: u32, /// Ed25519 public key (if known). Since NodeId = /// pubkey, this is always available when we know /// the peer's ID. pub public_key: Option<[u8; 32]>, } impl PeerInfo { pub fn new(id: NodeId, addr: SocketAddr) -> Self { // NodeId IS the public key (32 bytes) let public_key = Some(*id.as_bytes()); Self { id, addr, domain: if addr.is_ipv4() { 1 } else { 2 }, nat_state: NatState::Unknown, last_seen: Instant::now(), session: 0, public_key, } } } /// Database of known peers with TTL-based expiry. /// /// Provides forward lookup (id -> info) and reverse /// lookup (addr -> ids). type PeerCallback = Box; /// Maximum number of tracked peers (prevents OOM). const MAX_PEERS: usize = 10_000; pub struct PeerStore { by_id: HashMap, by_addr: HashMap>, timeouts: HashMap, on_add: Option, } impl PeerStore { pub fn new() -> Self { Self { by_id: HashMap::new(), by_addr: HashMap::new(), timeouts: HashMap::new(), on_add: None, } } /// Get peer info by ID. pub fn get(&self, id: &NodeId) -> Option<&PeerInfo> { self.by_id.get(id) } /// Get all peer IDs associated with an address. pub fn ids_for_addr(&self, addr: &SocketAddr) -> Vec { self.by_addr.get(addr).cloned().unwrap_or_default() } /// Add a peer, checking for duplicates. /// /// If the peer already exists, updates `last_seen`. /// Returns `true` if newly added. pub fn add(&mut self, peer: PeerInfo) -> bool { let id = peer.id; let addr = peer.addr; // Limit check (updates are always allowed) if self.by_id.len() >= MAX_PEERS && !self.by_id.contains_key(&id) { return false; } if let Some(existing) = self.by_id.get_mut(&id) { existing.last_seen = Instant::now(); existing.addr = addr; return false; } self.by_addr.entry(addr).or_default().push(id); if let Some(ref cb) = self.on_add { cb(&peer); } self.by_id.insert(id, peer); true } /// Add a peer with a session ID (for DTUN register). /// /// Returns `true` if the session matches or is new. pub fn add_with_session(&mut self, peer: PeerInfo, session: u32) -> bool { if let Some(existing) = self.by_id.get(&peer.id) { if existing.session != session { return false; } } let mut peer = peer; peer.session = session; self.add(peer); true } /// Add a peer, overwriting any existing entry. pub fn add_force(&mut self, peer: PeerInfo) { self.remove(&peer.id); self.add(peer); } /// Remove a peer by ID. pub fn remove(&mut self, id: &NodeId) -> Option { if let Some(peer) = self.by_id.remove(id) { if let Some(ids) = self.by_addr.get_mut(&peer.addr) { ids.retain(|i| i != id); if ids.is_empty() { self.by_addr.remove(&peer.addr); } } self.timeouts.remove(id); Some(peer) } else { None } } /// Remove all peers associated with an address. pub fn remove_addr(&mut self, addr: &SocketAddr) { if let Some(ids) = self.by_addr.remove(addr) { for id in ids { self.by_id.remove(&id); self.timeouts.remove(&id); } } } /// Mark a peer as having timed out. pub fn mark_timeout(&mut self, id: &NodeId) { self.timeouts.insert(*id, Instant::now()); } /// Check if a peer is in timeout state. pub fn is_timeout(&self, id: &NodeId) -> bool { if let Some(t) = self.timeouts.get(id) { t.elapsed() < PEERS_TIMEOUT_TTL } else { false } } /// Remove expired peers (older than MAP_TTL) and /// stale timeout entries. pub fn refresh(&mut self) { let now = Instant::now(); // Remove expired peers let expired: Vec = self .by_id .iter() .filter(|(_, p)| now.duration_since(p.last_seen) >= PEERS_MAP_TTL) .map(|(id, _)| *id) .collect(); for id in expired { self.remove(&id); } // Remove stale timeout entries self.timeouts .retain(|_, t| now.duration_since(*t) < PEERS_TIMEOUT_TTL); } /// Set a callback for when a peer is added. pub fn on_add(&mut self, f: impl Fn(&PeerInfo) + 'static) { self.on_add = Some(Box::new(f)); } /// Number of known peers. pub fn len(&self) -> usize { self.by_id.len() } /// Check if the store is empty. pub fn is_empty(&self) -> bool { self.by_id.is_empty() } /// Iterate over all peers. pub fn iter(&self) -> impl Iterator { self.by_id.values() } /// Get all peer IDs. pub fn ids(&self) -> Vec { self.by_id.keys().copied().collect() } } impl Default for PeerStore { fn default() -> Self { Self::new() } } #[cfg(test)] mod tests { use super::*; fn peer(byte: u8, port: u16) -> PeerInfo { PeerInfo::new( NodeId::from_bytes([byte; 32]), SocketAddr::from(([127, 0, 0, 1], port)), ) } #[test] fn add_and_get() { let mut store = PeerStore::new(); let p = peer(1, 3000); assert!(store.add(p.clone())); assert_eq!(store.len(), 1); assert_eq!(store.get(&p.id).unwrap().addr, p.addr); } #[test] fn add_duplicate_updates() { let mut store = PeerStore::new(); let p = peer(1, 3000); assert!(store.add(p.clone())); assert!(!store.add(p)); // duplicate assert_eq!(store.len(), 1); } #[test] fn remove_by_id() { let mut store = PeerStore::new(); let p = peer(1, 3000); store.add(p.clone()); store.remove(&p.id); assert!(store.is_empty()); } #[test] fn reverse_lookup() { let mut store = PeerStore::new(); let addr: SocketAddr = "127.0.0.1:3000".parse().unwrap(); let p1 = PeerInfo::new(NodeId::from_bytes([1; 32]), addr); let p2 = PeerInfo::new(NodeId::from_bytes([2; 32]), addr); store.add(p1.clone()); store.add(p2.clone()); let ids = store.ids_for_addr(&addr); assert_eq!(ids.len(), 2); assert!(ids.contains(&p1.id)); assert!(ids.contains(&p2.id)); } #[test] fn remove_addr_removes_all() { let mut store = PeerStore::new(); let addr: SocketAddr = "127.0.0.1:3000".parse().unwrap(); store.add(PeerInfo::new(NodeId::from_bytes([1; 32]), addr)); store.add(PeerInfo::new(NodeId::from_bytes([2; 32]), addr)); store.remove_addr(&addr); assert!(store.is_empty()); } #[test] fn timeout_tracking() { let mut store = PeerStore::new(); let id = NodeId::from_bytes([1; 32]); assert!(!store.is_timeout(&id)); store.mark_timeout(&id); assert!(store.is_timeout(&id)); } #[test] fn add_with_session() { let mut store = PeerStore::new(); let p = peer(1, 3000); assert!(store.add_with_session(p.clone(), 42)); // Same session: ok assert!(store.add_with_session(peer(1, 3001), 42)); // Different session: rejected assert!(!store.add_with_session(peer(1, 3002), 99)); } #[test] fn add_force_overwrites() { let mut store = PeerStore::new(); store.add(peer(1, 3000)); store.add_force(peer(1, 4000)); assert_eq!(store.len(), 1); assert_eq!( store.get(&NodeId::from_bytes([1; 32])).unwrap().addr.port(), 4000 ); } }