aboutsummaryrefslogtreecommitdiffstats
path: root/src/peers.rs
diff options
context:
space:
mode:
authormurilo ijanc2026-03-24 15:04:03 -0300
committermurilo ijanc2026-03-24 15:04:03 -0300
commit9821aabf0b50d2487b07502d3d2cd89e7d62bdbe (patch)
tree53da095ff90cc755bac3d4bf699172b5e8cd07d6 /src/peers.rs
downloadtesseras-dht-9821aabf0b50d2487b07502d3d2cd89e7d62bdbe.tar.gz
Initial commitv0.1.0
NAT-aware Kademlia DHT library for peer-to-peer networks. Features: - Distributed key-value storage (iterative FIND_NODE, FIND_VALUE, STORE) - NAT traversal via DTUN hole-punching and proxy relay - Reliable Datagram Protocol (RDP) with 7-state connection machine - Datagram transport with automatic fragmentation/reassembly - Ed25519 packet authentication - 256-bit node IDs (Ed25519 public keys) - Rate limiting, ban list, and eclipse attack mitigation - Persistence and metrics - OpenBSD and Linux support
Diffstat (limited to 'src/peers.rs')
-rw-r--r--src/peers.rs337
1 files changed, 337 insertions, 0 deletions
diff --git a/src/peers.rs b/src/peers.rs
new file mode 100644
index 0000000..e575323
--- /dev/null
+++ b/src/peers.rs
@@ -0,0 +1,337 @@
+//! Peer node database.
+//!
+//! Bidirectional peer map with TTL-based expiry and
+//! timeout tracking.
+
+use std::collections::HashMap;
+use std::net::SocketAddr;
+use std::time::{Duration, Instant};
+
+use crate::id::NodeId;
+
+/// TTL for peer entries (5 min).
+pub const PEERS_MAP_TTL: Duration = Duration::from_secs(300);
+
+/// TTL for timeout entries (30s).
+pub const PEERS_TIMEOUT_TTL: Duration = Duration::from_secs(30);
+
+/// Cleanup timer interval (30s).
+pub const PEERS_TIMER_INTERVAL: Duration = Duration::from_secs(30);
+
+/// NAT state of a peer.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum NatState {
+ Unknown,
+ Global,
+ Nat,
+ ConeNat,
+ SymmetricNat,
+}
+
+/// Information about a known peer.
+#[derive(Debug, Clone)]
+pub struct PeerInfo {
+ pub id: NodeId,
+ pub addr: SocketAddr,
+ pub domain: u16,
+ pub nat_state: NatState,
+ pub last_seen: Instant,
+ pub session: u32,
+
+ /// Ed25519 public key (if known). Since NodeId =
+ /// pubkey, this is always available when we know
+ /// the peer's ID.
+ pub public_key: Option<[u8; 32]>,
+}
+
+impl PeerInfo {
+ pub fn new(id: NodeId, addr: SocketAddr) -> Self {
+ // NodeId IS the public key (32 bytes)
+ let public_key = Some(*id.as_bytes());
+ Self {
+ id,
+ addr,
+ domain: if addr.is_ipv4() { 1 } else { 2 },
+ nat_state: NatState::Unknown,
+ last_seen: Instant::now(),
+ session: 0,
+ public_key,
+ }
+ }
+}
+
+/// Database of known peers with TTL-based expiry.
+///
+/// Provides forward lookup (id -> info) and reverse
+/// lookup (addr -> ids).
+type PeerCallback = Box<dyn Fn(&PeerInfo)>;
+
+/// Maximum number of tracked peers (prevents OOM).
+const MAX_PEERS: usize = 10_000;
+
+pub struct PeerStore {
+ by_id: HashMap<NodeId, PeerInfo>,
+ by_addr: HashMap<SocketAddr, Vec<NodeId>>,
+ timeouts: HashMap<NodeId, Instant>,
+ on_add: Option<PeerCallback>,
+}
+
+impl PeerStore {
+ pub fn new() -> Self {
+ Self {
+ by_id: HashMap::new(),
+ by_addr: HashMap::new(),
+ timeouts: HashMap::new(),
+ on_add: None,
+ }
+ }
+
+ /// Get peer info by ID.
+ pub fn get(&self, id: &NodeId) -> Option<&PeerInfo> {
+ self.by_id.get(id)
+ }
+
+ /// Get all peer IDs associated with an address.
+ pub fn ids_for_addr(&self, addr: &SocketAddr) -> Vec<NodeId> {
+ self.by_addr.get(addr).cloned().unwrap_or_default()
+ }
+
+ /// Add a peer, checking for duplicates.
+ ///
+ /// If the peer already exists, updates `last_seen`.
+ /// Returns `true` if newly added.
+ pub fn add(&mut self, peer: PeerInfo) -> bool {
+ let id = peer.id;
+ let addr = peer.addr;
+
+ // Limit check (updates are always allowed)
+ if self.by_id.len() >= MAX_PEERS && !self.by_id.contains_key(&id) {
+ return false;
+ }
+
+ if let Some(existing) = self.by_id.get_mut(&id) {
+ existing.last_seen = Instant::now();
+ existing.addr = addr;
+ return false;
+ }
+
+ self.by_addr.entry(addr).or_default().push(id);
+ if let Some(ref cb) = self.on_add {
+ cb(&peer);
+ }
+ self.by_id.insert(id, peer);
+ true
+ }
+
+ /// Add a peer with a session ID (for DTUN register).
+ ///
+ /// Returns `true` if the session matches or is new.
+ pub fn add_with_session(&mut self, peer: PeerInfo, session: u32) -> bool {
+ if let Some(existing) = self.by_id.get(&peer.id) {
+ if existing.session != session {
+ return false;
+ }
+ }
+ let mut peer = peer;
+ peer.session = session;
+ self.add(peer);
+ true
+ }
+
+ /// Add a peer, overwriting any existing entry.
+ pub fn add_force(&mut self, peer: PeerInfo) {
+ self.remove(&peer.id);
+ self.add(peer);
+ }
+
+ /// Remove a peer by ID.
+ pub fn remove(&mut self, id: &NodeId) -> Option<PeerInfo> {
+ if let Some(peer) = self.by_id.remove(id) {
+ if let Some(ids) = self.by_addr.get_mut(&peer.addr) {
+ ids.retain(|i| i != id);
+ if ids.is_empty() {
+ self.by_addr.remove(&peer.addr);
+ }
+ }
+ self.timeouts.remove(id);
+ Some(peer)
+ } else {
+ None
+ }
+ }
+
+ /// Remove all peers associated with an address.
+ pub fn remove_addr(&mut self, addr: &SocketAddr) {
+ if let Some(ids) = self.by_addr.remove(addr) {
+ for id in ids {
+ self.by_id.remove(&id);
+ self.timeouts.remove(&id);
+ }
+ }
+ }
+
+ /// Mark a peer as having timed out.
+ pub fn mark_timeout(&mut self, id: &NodeId) {
+ self.timeouts.insert(*id, Instant::now());
+ }
+
+ /// Check if a peer is in timeout state.
+ pub fn is_timeout(&self, id: &NodeId) -> bool {
+ if let Some(t) = self.timeouts.get(id) {
+ t.elapsed() < PEERS_TIMEOUT_TTL
+ } else {
+ false
+ }
+ }
+
+ /// Remove expired peers (older than MAP_TTL) and
+ /// stale timeout entries.
+ pub fn refresh(&mut self) {
+ let now = Instant::now();
+
+ // Remove expired peers
+ let expired: Vec<NodeId> = self
+ .by_id
+ .iter()
+ .filter(|(_, p)| now.duration_since(p.last_seen) >= PEERS_MAP_TTL)
+ .map(|(id, _)| *id)
+ .collect();
+
+ for id in expired {
+ self.remove(&id);
+ }
+
+ // Remove stale timeout entries
+ self.timeouts
+ .retain(|_, t| now.duration_since(*t) < PEERS_TIMEOUT_TTL);
+ }
+
+ /// Set a callback for when a peer is added.
+ pub fn on_add(&mut self, f: impl Fn(&PeerInfo) + 'static) {
+ self.on_add = Some(Box::new(f));
+ }
+
+ /// Number of known peers.
+ pub fn len(&self) -> usize {
+ self.by_id.len()
+ }
+
+ /// Check if the store is empty.
+ pub fn is_empty(&self) -> bool {
+ self.by_id.is_empty()
+ }
+
+ /// Iterate over all peers.
+ pub fn iter(&self) -> impl Iterator<Item = &PeerInfo> {
+ self.by_id.values()
+ }
+
+ /// Get all peer IDs.
+ pub fn ids(&self) -> Vec<NodeId> {
+ self.by_id.keys().copied().collect()
+ }
+}
+
+impl Default for PeerStore {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ fn peer(byte: u8, port: u16) -> PeerInfo {
+ PeerInfo::new(
+ NodeId::from_bytes([byte; 32]),
+ SocketAddr::from(([127, 0, 0, 1], port)),
+ )
+ }
+
+ #[test]
+ fn add_and_get() {
+ let mut store = PeerStore::new();
+ let p = peer(1, 3000);
+ assert!(store.add(p.clone()));
+ assert_eq!(store.len(), 1);
+ assert_eq!(store.get(&p.id).unwrap().addr, p.addr);
+ }
+
+ #[test]
+ fn add_duplicate_updates() {
+ let mut store = PeerStore::new();
+ let p = peer(1, 3000);
+ assert!(store.add(p.clone()));
+ assert!(!store.add(p)); // duplicate
+ assert_eq!(store.len(), 1);
+ }
+
+ #[test]
+ fn remove_by_id() {
+ let mut store = PeerStore::new();
+ let p = peer(1, 3000);
+ store.add(p.clone());
+ store.remove(&p.id);
+ assert!(store.is_empty());
+ }
+
+ #[test]
+ fn reverse_lookup() {
+ let mut store = PeerStore::new();
+ let addr: SocketAddr = "127.0.0.1:3000".parse().unwrap();
+ let p1 = PeerInfo::new(NodeId::from_bytes([1; 32]), addr);
+ let p2 = PeerInfo::new(NodeId::from_bytes([2; 32]), addr);
+ store.add(p1.clone());
+ store.add(p2.clone());
+
+ let ids = store.ids_for_addr(&addr);
+ assert_eq!(ids.len(), 2);
+ assert!(ids.contains(&p1.id));
+ assert!(ids.contains(&p2.id));
+ }
+
+ #[test]
+ fn remove_addr_removes_all() {
+ let mut store = PeerStore::new();
+ let addr: SocketAddr = "127.0.0.1:3000".parse().unwrap();
+ store.add(PeerInfo::new(NodeId::from_bytes([1; 32]), addr));
+ store.add(PeerInfo::new(NodeId::from_bytes([2; 32]), addr));
+ store.remove_addr(&addr);
+ assert!(store.is_empty());
+ }
+
+ #[test]
+ fn timeout_tracking() {
+ let mut store = PeerStore::new();
+ let id = NodeId::from_bytes([1; 32]);
+ assert!(!store.is_timeout(&id));
+ store.mark_timeout(&id);
+ assert!(store.is_timeout(&id));
+ }
+
+ #[test]
+ fn add_with_session() {
+ let mut store = PeerStore::new();
+ let p = peer(1, 3000);
+ assert!(store.add_with_session(p.clone(), 42));
+
+ // Same session: ok
+ assert!(store.add_with_session(peer(1, 3001), 42));
+
+ // Different session: rejected
+ assert!(!store.add_with_session(peer(1, 3002), 99));
+ }
+
+ #[test]
+ fn add_force_overwrites() {
+ let mut store = PeerStore::new();
+ store.add(peer(1, 3000));
+ store.add_force(peer(1, 4000));
+ assert_eq!(store.len(), 1);
+ assert_eq!(
+ store.get(&NodeId::from_bytes([1; 32])).unwrap().addr.port(),
+ 4000
+ );
+ }
+}