diff options
| author | murilo ijanc | 2026-03-24 15:04:03 -0300 |
|---|---|---|
| committer | murilo ijanc | 2026-03-24 15:04:03 -0300 |
| commit | 9821aabf0b50d2487b07502d3d2cd89e7d62bdbe (patch) | |
| tree | 53da095ff90cc755bac3d4bf699172b5e8cd07d6 /src/node.rs | |
| download | tesseras-dht-9821aabf0b50d2487b07502d3d2cd89e7d62bdbe.tar.gz | |
Initial commit (v0.1.0)
NAT-aware Kademlia DHT library for peer-to-peer networks.
Features:
- Distributed key-value storage (iterative FIND_NODE, FIND_VALUE, STORE)
- NAT traversal via DTUN hole-punching and proxy relay
- Reliable Datagram Protocol (RDP) with 7-state connection machine
- Datagram transport with automatic fragmentation/reassembly
- Ed25519 packet authentication
- 256-bit node IDs (Ed25519 public keys)
- Rate limiting, ban list, and eclipse attack mitigation
- Persistence and metrics
- OpenBSD and Linux support
Diffstat (limited to 'src/node.rs')
| -rw-r--r-- | src/node.rs | 1395 |
1 files changed, 1395 insertions, 0 deletions
diff --git a/src/node.rs b/src/node.rs new file mode 100644 index 0000000..fef917e --- /dev/null +++ b/src/node.rs @@ -0,0 +1,1395 @@ +//! Main facade: the `Node` node. +//! +//! Owns all subsystems and provides the public API for +//! joining the network, +//! storing/retrieving values, sending datagrams, and +//! using reliable transport (RDP). + +use std::collections::HashMap; +use std::fmt; +use std::net::SocketAddr; +use std::time::{Duration, Instant}; + +use crate::advertise::Advertise; +use crate::dgram::{self, Reassembler, SendQueue}; +use crate::dht::{DhtStorage, IterativeQuery, MaskBitExplorer}; +use crate::dtun::Dtun; +use crate::error::Error; +use crate::id::NodeId; +use crate::msg; +use crate::nat::{NatDetector, NatState}; +use crate::peers::{PeerInfo, PeerStore}; +use crate::proxy::Proxy; +use crate::rdp::{Rdp, RdpError, RdpState, RdpStatus}; +use crate::routing::RoutingTable; +use crate::socket::{NetLoop, UDP_TOKEN}; +use crate::timer::TimerWheel; +use crate::wire::{DOMAIN_INET, DOMAIN_INET6, HEADER_SIZE, MsgHeader, MsgType}; + +/// Default poll timeout when no timers are scheduled. +const DEFAULT_POLL_TIMEOUT: Duration = Duration::from_millis(100); + +type DgramCallback = Box<dyn Fn(&[u8], &NodeId) + Send>; +type RdpCallback = + Box<dyn Fn(i32, &crate::rdp::RdpAddr, crate::rdp::RdpEvent) + Send>; + +/// The tesseras-dht node. +/// +/// This is the main entry point. It owns all subsystems +/// (DHT, DTUN, NAT detector, proxy, RDP, datagrams, +/// peers, timers, network I/O) and exposes a clean API +/// for the tesseras-dht node. 
+pub struct Node { + pub(crate) identity: crate::crypto::Identity, + pub(crate) id: NodeId, + pub(crate) net: NetLoop, + pub(crate) dht_table: RoutingTable, + pub(crate) dtun: Dtun, + pub(crate) nat: NatDetector, + pub(crate) proxy: Proxy, + pub(crate) rdp: Rdp, + pub(crate) storage: DhtStorage, + pub(crate) peers: PeerStore, + pub(crate) timers: TimerWheel, + pub(crate) advertise: Advertise, + pub(crate) reassembler: Reassembler, + pub(crate) send_queue: SendQueue, + pub(crate) explorer: MaskBitExplorer, + pub(crate) is_dtun: bool, + pub(crate) dgram_callback: Option<DgramCallback>, + pub(crate) rdp_callback: Option<RdpCallback>, + + /// Active iterative queries keyed by nonce. + pub(crate) queries: HashMap<u32, IterativeQuery>, + + /// Last bucket refresh time. + pub(crate) last_refresh: Instant, + + /// Last data restore time. + pub(crate) last_restore: Instant, + + /// Last maintain (mask_bit exploration) time. + pub(crate) last_maintain: Instant, + + /// Routing table persistence backend. + pub(crate) routing_persistence: Box<dyn crate::persist::RoutingPersistence>, + + /// Data persistence backend. + pub(crate) data_persistence: Box<dyn crate::persist::DataPersistence>, + + /// Metrics counters. + pub(crate) metrics: crate::metrics::Metrics, + /// Pending pings: nonce → (target NodeId, sent_at). + pub(crate) pending_pings: HashMap<u32, (NodeId, Instant)>, + /// Inbound rate limiter. + pub(crate) rate_limiter: crate::ratelimit::RateLimiter, + /// Node configuration. + pub(crate) config: crate::config::Config, + /// Ban list for misbehaving peers. + pub(crate) ban_list: crate::banlist::BanList, + /// Store acknowledgment tracker. + pub(crate) store_tracker: crate::store_track::StoreTracker, + /// Last node activity check time. + pub(crate) last_activity_check: Instant, + /// Last store retry sweep time. + pub(crate) last_store_retry: Instant, +} + +/// Builder for configuring a Node node. 
+/// +/// ```rust,no_run +/// use tesseras_dht::node::NodeBuilder; +/// use tesseras_dht::nat::NatState; +/// +/// let node = NodeBuilder::new() +/// .port(10000) +/// .nat(NatState::Global) +/// .seed(b"my-identity-seed") +/// .build() +/// .unwrap(); +/// ``` +pub struct NodeBuilder { + port: u16, + addr: Option<SocketAddr>, + pub(crate) nat: Option<NatState>, + seed: Option<Vec<u8>>, + enable_dtun: bool, + config: Option<crate::config::Config>, +} + +impl NodeBuilder { + pub fn new() -> Self { + Self { + port: 0, + addr: None, + nat: None, + seed: None, + enable_dtun: true, + config: None, + } + } + + /// Set the UDP port to bind. + pub fn port(mut self, port: u16) -> Self { + self.port = port; + self + } + + /// Set a specific bind address. + pub fn addr(mut self, addr: SocketAddr) -> Self { + self.addr = Some(addr); + self + } + + /// Set the NAT state. + pub fn nat(mut self, state: NatState) -> Self { + self.nat = Some(state); + self + } + + /// Set identity seed (deterministic keypair). + pub fn seed(mut self, data: &[u8]) -> Self { + self.seed = Some(data.to_vec()); + self + } + + /// Enable or disable DTUN. + pub fn dtun(mut self, enabled: bool) -> Self { + self.enable_dtun = enabled; + self + } + + /// Set the node configuration. + pub fn config(mut self, config: crate::config::Config) -> Self { + self.config = Some(config); + self + } + + /// Build the Node node. + pub fn build(self) -> Result<Node, Error> { + let addr = self + .addr + .unwrap_or_else(|| SocketAddr::from(([0, 0, 0, 0], self.port))); + + let mut node = Node::bind_addr(addr)?; + + if let Some(seed) = &self.seed { + node.set_id(seed); + } + + if let Some(nat) = self.nat { + node.set_nat_state(nat); + } + + if !self.enable_dtun { + node.is_dtun = false; + } + + if let Some(config) = self.config { + node.config = config; + } + + Ok(node) + } +} + +impl Default for NodeBuilder { + fn default() -> Self { + Self::new() + } +} + +impl Node { + /// Create a new node and bind to `port` (IPv4). 
+ /// + /// Generates a random node ID. Use `set_id` to + /// derive an ID from application data instead. + pub fn bind(port: u16) -> Result<Self, Error> { + let addr = SocketAddr::from(([0, 0, 0, 0], port)); + Self::bind_addr(addr) + } + + /// Create a new node bound to `port` on IPv6. + /// + /// When using IPv6, DTUN is disabled and NAT state + /// is set to Global (IPv6 does not need NAT + /// traversal). + pub fn bind_v6(port: u16) -> Result<Self, Error> { + let addr = SocketAddr::from((std::net::Ipv6Addr::UNSPECIFIED, port)); + let mut node = Self::bind_addr(addr)?; + node.is_dtun = false; + node.dtun.set_enabled(false); + node.nat.set_state(NatState::Global); + Ok(node) + } + + /// Create a new node bound to a specific address. + pub fn bind_addr(addr: SocketAddr) -> Result<Self, Error> { + let identity = crate::crypto::Identity::generate(); + let id = *identity.node_id(); + let net = NetLoop::bind(addr)?; + + log::info!("Node node {} bound to {}", id, net.local_addr()?); + + Ok(Self { + dht_table: RoutingTable::new(id), + dtun: Dtun::new(id), + nat: NatDetector::new(id), + proxy: Proxy::new(id), + rdp: Rdp::new(), + storage: DhtStorage::new(), + peers: PeerStore::new(), + timers: TimerWheel::new(), + advertise: Advertise::new(id), + reassembler: Reassembler::new(), + send_queue: SendQueue::new(), + explorer: MaskBitExplorer::new(id), + is_dtun: true, + dgram_callback: None, + rdp_callback: None, + queries: HashMap::new(), + last_refresh: Instant::now(), + last_restore: Instant::now(), + last_maintain: Instant::now(), + routing_persistence: Box::new(crate::persist::NoPersistence), + data_persistence: Box::new(crate::persist::NoPersistence), + metrics: crate::metrics::Metrics::new(), + pending_pings: HashMap::new(), + rate_limiter: crate::ratelimit::RateLimiter::default(), + config: crate::config::Config::default(), + ban_list: crate::banlist::BanList::new(), + store_tracker: crate::store_track::StoreTracker::new(), + last_activity_check: Instant::now(), + 
last_store_retry: Instant::now(), + identity, + id, + net, + }) + } + + // ── Identity ──────────────────────────────────── + + /// The local node ID. + pub fn id(&self) -> &NodeId { + &self.id + } + + /// The local node ID as a hex string. + pub fn id_hex(&self) -> String { + self.id.to_hex() + } + + /// Set the node identity from a 32-byte seed. + /// + /// Derives an Ed25519 keypair from the seed. + /// NodeId = public key. Deterministic: same seed + /// produces the same identity. + pub fn set_id(&mut self, data: &[u8]) { + // Hash to 32 bytes if input is not already 32 + let seed = if data.len() == 32 { + let mut s = [0u8; 32]; + s.copy_from_slice(data); + s + } else { + use sha2::{Digest, Sha256}; + let hash = Sha256::digest(data); + let mut s = [0u8; 32]; + s.copy_from_slice(&hash); + s + }; + self.identity = crate::crypto::Identity::from_seed(seed); + self.id = *self.identity.node_id(); + self.dht_table = RoutingTable::new(self.id); + self.dtun = Dtun::new(self.id); + self.explorer = MaskBitExplorer::new(self.id); + log::info!("Node ID set to {}", self.id); + } + + /// The node's Ed25519 public key (32 bytes). + pub fn public_key(&self) -> &[u8; 32] { + self.identity.public_key() + } + + // ── NAT state ─────────────────────────────────── + + /// Current NAT detection state. + pub fn nat_state(&self) -> NatState { + self.nat.state() + } + + /// Force the NAT state. + pub fn set_nat_state(&mut self, state: NatState) { + self.nat.set_state(state); + if state == NatState::Global { + // IPv6 or explicitly global: disable DTUN + if !self.is_dtun { + self.dtun.set_enabled(false); + } + } + } + + pub(crate) fn alloc_nonce(&mut self) -> u32 { + let mut buf = [0u8; 4]; + crate::sys::random_bytes(&mut buf); + u32::from_ne_bytes(buf) + } + + /// Safe cast of packet size to u16. 
+ #[inline] + pub(crate) fn len16(n: usize) -> u16 { + u16::try_from(n).expect("packet size exceeds u16") + } + + // ── DHT operations ────────────────────────────── + + /// Store a key-value pair in the DHT. + /// + /// The key is hashed with SHA-256 to map it to the + /// 256-bit ID space. Stored locally and sent to + /// the k-closest known nodes. + /// + /// # Example + /// + /// ```rust,no_run + /// # let mut node = tesseras_dht::Node::bind(0).unwrap(); + /// node.put(b"paste-id", b"paste content", 3600, false); + /// ``` + pub fn put(&mut self, key: &[u8], value: &[u8], ttl: u16, is_unique: bool) { + let target_id = NodeId::from_key(key); + log::debug!( + "put: key={} target={}", + String::from_utf8_lossy(key), + target_id + ); + + // Store locally + let val = crate::dht::StoredValue { + key: key.to_vec(), + value: value.to_vec(), + id: target_id, + source: self.id, + ttl, + stored_at: std::time::Instant::now(), + is_unique, + original: 3, // ORIGINAL_PUT_NUM + recvd: std::collections::HashSet::new(), + version: crate::dht::now_version(), + }; + self.storage.store(val); + + // If behind symmetric NAT, route through proxy + if self.nat.state() == NatState::SymmetricNat { + if let Some(server) = self.proxy.server().cloned() { + let store_msg = msg::StoreMsg { + id: target_id, + from: self.id, + key: key.to_vec(), + value: value.to_vec(), + ttl, + is_unique, + }; + let total = HEADER_SIZE + + msg::STORE_FIXED + + store_msg.key.len() + + store_msg.value.len(); + let mut buf = vec![0u8; total]; + let hdr = MsgHeader::new( + MsgType::ProxyStore, + Self::len16(total), + self.id, + server.id, + ); + if hdr.write(&mut buf).is_ok() { + let _ = msg::write_store(&mut buf, &store_msg); + let _ = self.send_signed(&buf, server.addr); + } + log::info!("put: via proxy to {:?}", server.id); + return; + } + } + + // Direct: send STORE to k-closest known nodes + let closest = self + .dht_table + .closest(&target_id, self.config.num_find_node); + let store_msg = msg::StoreMsg { + 
id: target_id, + from: self.id, + key: key.to_vec(), + value: value.to_vec(), + ttl, + is_unique, + }; + for peer in &closest { + if let Err(e) = self.send_store(peer, &store_msg) { + log::warn!("Failed to send store to {:?}: {e}", peer.id); + } else { + self.store_tracker.track( + target_id, + key.to_vec(), + value.to_vec(), + ttl, + is_unique, + peer.clone(), + ); + } + } + log::info!("put: stored locally + sent to {} peers", closest.len()); + } + + /// Store multiple key-value pairs in the DHT. + /// + /// More efficient than calling `put()` in a loop: + /// groups stores by target peer to reduce redundant + /// lookups and sends. + pub fn put_batch(&mut self, entries: &[(&[u8], &[u8], u16, bool)]) { + // Group by target peer set to batch sends + struct BatchEntry { + target_id: NodeId, + key: Vec<u8>, + value: Vec<u8>, + ttl: u16, + is_unique: bool, + } + + let mut batch: Vec<BatchEntry> = Vec::with_capacity(entries.len()); + + for &(key, value, ttl, is_unique) in entries { + let target_id = NodeId::from_key(key); + + // Store locally + let val = crate::dht::StoredValue { + key: key.to_vec(), + value: value.to_vec(), + id: target_id, + source: self.id, + ttl, + stored_at: std::time::Instant::now(), + is_unique, + original: 3, + recvd: std::collections::HashSet::new(), + version: crate::dht::now_version(), + }; + self.storage.store(val); + + batch.push(BatchEntry { + target_id, + key: key.to_vec(), + value: value.to_vec(), + ttl, + is_unique, + }); + } + + // Collect unique peers across all targets to + // minimize redundant sends + let mut peer_stores: HashMap<NodeId, Vec<msg::StoreMsg>> = + HashMap::new(); + + for entry in &batch { + let closest = self + .dht_table + .closest(&entry.target_id, self.config.num_find_node); + let store_msg = msg::StoreMsg { + id: entry.target_id, + from: self.id, + key: entry.key.clone(), + value: entry.value.clone(), + ttl: entry.ttl, + is_unique: entry.is_unique, + }; + for peer in &closest { + peer_stores + .entry(peer.id) + 
.or_default() + .push(store_msg.clone()); + } + } + + // Send all stores grouped by peer + let mut total_sent = 0u32; + for (peer_id, stores) in &peer_stores { + if let Some(peer) = self.peers.get(peer_id).cloned() { + if self.ban_list.is_banned(&peer.addr) { + continue; + } + for store in stores { + if self.send_store(&peer, store).is_ok() { + total_sent += 1; + } + } + } + } + + log::info!( + "put_batch: {} entries stored locally, {total_sent} sends to {} peers", + batch.len(), + peer_stores.len(), + ); + } + + /// Retrieve multiple keys from the DHT. + /// + /// Returns a vec of (key, values) pairs. Local values + /// are returned immediately; missing keys trigger + /// iterative FIND_VALUE queries resolved via `poll()`. + pub fn get_batch( + &mut self, + keys: &[&[u8]], + ) -> Vec<(Vec<u8>, Vec<Vec<u8>>)> { + let mut results = Vec::with_capacity(keys.len()); + + for &key in keys { + let target_id = NodeId::from_key(key); + let local = self.storage.get(&target_id, key); + + if !local.is_empty() { + let vals: Vec<Vec<u8>> = + local.into_iter().map(|v| v.value).collect(); + results.push((key.to_vec(), vals)); + } else { + // Start iterative FIND_VALUE for missing keys + if let Err(e) = self.start_find_value(key) { + log::debug!("Batch find_value failed for key: {e}"); + } + results.push((key.to_vec(), Vec::new())); + } + } + + results + } + + /// Delete a key from the DHT. + /// + /// Sends a STORE with TTL=0 to the k-closest nodes, + /// which causes them to remove the value. 
+ pub fn delete(&mut self, key: &[u8]) { + let target_id = NodeId::from_key(key); + + // Remove locally + self.storage.remove(&target_id, key); + + // Send TTL=0 store to closest nodes + let closest = self + .dht_table + .closest(&target_id, self.config.num_find_node); + let store_msg = msg::StoreMsg { + id: target_id, + from: self.id, + key: key.to_vec(), + value: Vec::new(), + ttl: 0, + is_unique: false, + }; + for peer in &closest { + let _ = self.send_store(peer, &store_msg); + } + log::info!("delete: removed locally + sent to {} peers", closest.len()); + } + + /// Retrieve values for a key from the DHT. + /// + /// First checks local storage. If not found, starts + /// an iterative FIND_VALUE query across the network. + /// Returns local values immediately; remote results + /// arrive via `poll()` and can be retrieved with a + /// subsequent `get()` call (they'll be cached + /// locally by `handle_dht_find_value_reply`). + pub fn get(&mut self, key: &[u8]) -> Vec<Vec<u8>> { + let target_id = NodeId::from_key(key); + + // Check local storage first + let local = self.storage.get(&target_id, key); + if !local.is_empty() { + return local.into_iter().map(|v| v.value).collect(); + } + + // Not found locally — start iterative FIND_VALUE + if let Err(e) = self.start_find_value(key) { + log::debug!("Failed to start find_value: {e}"); + } + + Vec::new() + } + + /// Retrieve values with blocking network lookup. + /// + /// Polls internally until the value is found or + /// `timeout` expires. Returns empty if not found. 
+ pub fn get_blocking( + &mut self, + key: &[u8], + timeout: Duration, + ) -> Vec<Vec<u8>> { + let target_id = NodeId::from_key(key); + + // Check local first + let local = self.storage.get(&target_id, key); + if !local.is_empty() { + return local.into_iter().map(|v| v.value).collect(); + } + + // Start FIND_VALUE + if self.start_find_value(key).is_err() { + return Vec::new(); + } + + // Poll until found or timeout + let deadline = Instant::now() + timeout; + while Instant::now() < deadline { + let _ = self.poll(); + + let vals = self.storage.get(&target_id, key); + if !vals.is_empty() { + return vals.into_iter().map(|v| v.value).collect(); + } + + std::thread::sleep(Duration::from_millis(10)); + } + + Vec::new() + } + + /// Start an iterative FIND_VALUE query for a key. + /// + /// Returns the query nonce. Results arrive via + /// `handle_dht_find_value_reply` during `poll()`. + pub fn start_find_value(&mut self, key: &[u8]) -> Result<u32, Error> { + let target_id = NodeId::from_key(key); + let nonce = self.alloc_nonce(); + let mut query = + IterativeQuery::find_value(target_id, key.to_vec(), nonce); + + // Seed with our closest known nodes + let closest = self + .dht_table + .closest(&target_id, self.config.num_find_node); + query.closest = closest; + + self.queries.insert(nonce, query); + + // Send initial batch + self.send_query_batch(nonce)?; + + Ok(nonce) + } + + /// Send a FIND_VALUE message to a specific address. 
+ pub(crate) fn send_find_value_msg( + &mut self, + to: SocketAddr, + target: NodeId, + key: &[u8], + ) -> Result<u32, Error> { + let nonce = self.alloc_nonce(); + let domain = if to.is_ipv4() { + DOMAIN_INET + } else { + DOMAIN_INET6 + }; + + let fv = msg::FindValueMsg { + nonce, + target, + domain, + key: key.to_vec(), + use_rdp: false, + }; + + let total = HEADER_SIZE + msg::FIND_VALUE_FIXED + key.len(); + let mut buf = vec![0u8; total]; + let hdr = MsgHeader::new( + MsgType::DhtFindValue, + Self::len16(total), + self.id, + NodeId::from_bytes([0; crate::id::ID_LEN]), + ); + hdr.write(&mut buf)?; + msg::write_find_value(&mut buf, &fv)?; + self.send_signed(&buf, to)?; + + log::debug!( + "Sent find_value to {to} target={target:?} key={} bytes", + key.len() + ); + Ok(nonce) + } + + // ── Datagram ──────────────────────────────────── + + /// Send a datagram to a destination node. + /// + /// If the destination's address is known, fragments + /// are sent immediately. Otherwise they are queued + /// for delivery once the address is resolved. 
+ pub fn send_dgram(&mut self, data: &[u8], dst: &NodeId) { + let fragments = dgram::fragment(data); + log::debug!( + "send_dgram: {} bytes, {} fragment(s) to {:?}", + data.len(), + fragments.len(), + dst + ); + + // If behind symmetric NAT, route through proxy + if self.nat.state() == NatState::SymmetricNat { + if let Some(server) = self.proxy.server().cloned() { + for frag in &fragments { + let total = HEADER_SIZE + frag.len(); + let mut buf = vec![0u8; total]; + let hdr = MsgHeader::new( + MsgType::ProxyDgram, + Self::len16(total), + self.id, + *dst, + ); + if hdr.write(&mut buf).is_ok() { + buf[HEADER_SIZE..].copy_from_slice(frag); + let _ = self.send_signed(&buf, server.addr); + } + } + return; + } + } + + // Direct: send if we know the address + if let Some(peer) = self.peers.get(dst).cloned() { + for frag in &fragments { + self.send_dgram_raw(frag, &peer); + } + } else { + // Queue for later delivery + for frag in fragments { + self.send_queue.push(*dst, frag, self.id); + } + } + } + + /// Send a single dgram fragment wrapped in a + /// protocol message. + pub(crate) fn send_dgram_raw(&self, payload: &[u8], dst: &PeerInfo) { + let total = HEADER_SIZE + payload.len(); + let mut buf = vec![0u8; total]; + let hdr = + MsgHeader::new(MsgType::Dgram, Self::len16(total), self.id, dst.id); + if hdr.write(&mut buf).is_ok() { + buf[HEADER_SIZE..].copy_from_slice(payload); + let _ = self.send_signed(&buf, dst.addr); + } + } + + /// Set the callback for received datagrams. + pub fn set_dgram_callback<F>(&mut self, f: F) + where + F: Fn(&[u8], &NodeId) + Send + 'static, + { + self.dgram_callback = Some(Box::new(f)); + } + + /// Remove the datagram callback. + pub fn unset_dgram_callback(&mut self) { + self.dgram_callback = None; + } + + /// Set a callback for RDP events (ACCEPTED, + /// CONNECTED, READY2READ, RESET, FAILED, etc). 
+ pub fn set_rdp_callback<F>(&mut self, f: F) + where + F: Fn(i32, &crate::rdp::RdpAddr, crate::rdp::RdpEvent) + Send + 'static, + { + self.rdp_callback = Some(Box::new(f)); + } + + /// Remove the RDP event callback. + pub fn unset_rdp_callback(&mut self) { + self.rdp_callback = None; + } + + // ── RDP (reliable transport) ──────────────────── + + /// Listen for RDP connections on `port`. + pub fn rdp_listen(&mut self, port: u16) -> Result<i32, RdpError> { + self.rdp.listen(port) + } + + /// Connect to a remote node via RDP. + /// + /// Sends a SYN packet immediately if the peer's + /// address is known. + pub fn rdp_connect( + &mut self, + sport: u16, + dst: &NodeId, + dport: u16, + ) -> Result<i32, RdpError> { + let desc = self.rdp.connect(sport, *dst, dport)?; + self.flush_rdp_output(desc); + Ok(desc) + } + + /// Close an RDP connection or listener. + pub fn rdp_close(&mut self, desc: i32) { + self.rdp.close(desc); + } + + /// Send data on an RDP connection. + /// + /// Enqueues data and flushes pending packets to the + /// network. + pub fn rdp_send( + &mut self, + desc: i32, + data: &[u8], + ) -> Result<usize, RdpError> { + let n = self.rdp.send(desc, data)?; + self.flush_rdp_output(desc); + Ok(n) + } + + /// Receive data from an RDP connection. + pub fn rdp_recv( + &mut self, + desc: i32, + buf: &mut [u8], + ) -> Result<usize, RdpError> { + self.rdp.recv(desc, buf) + } + + /// Get the state of an RDP descriptor. + pub fn rdp_state(&self, desc: i32) -> Result<RdpState, RdpError> { + self.rdp.get_state(desc) + } + + /// Get status of all RDP connections. + pub fn rdp_status(&self) -> Vec<RdpStatus> { + self.rdp.get_status() + } + + /// Set RDP maximum retransmission timeout. + pub fn rdp_set_max_retrans(&mut self, secs: u64) { + self.rdp.set_max_retrans(Duration::from_secs(secs)); + } + + /// Get RDP maximum retransmission timeout. 
+ pub fn rdp_max_retrans(&self) -> u64 { + self.rdp.max_retrans().as_secs() + } + + // ── Event loop ────────────────────────────────── + + /// Process one iteration of the event loop. + /// + /// Polls for I/O events, processes incoming packets, + /// fires expired timers, and runs maintenance tasks. + pub fn poll(&mut self) -> Result<(), Error> { + self.poll_timeout(DEFAULT_POLL_TIMEOUT) + } + + /// Poll with a custom maximum timeout. + /// + /// Use a short timeout (e.g. 1ms) in tests with + /// many nodes to avoid blocking. + pub fn poll_timeout(&mut self, max_timeout: Duration) -> Result<(), Error> { + let timeout = self + .timers + .next_deadline() + .map(|d| d.min(max_timeout)) + .unwrap_or(max_timeout); + + self.net.poll_events(timeout)?; + + // Check if we got a UDP event. We must not hold + // a borrow on self.net while calling handle_packet, + // so we just check and then drain separately. + let has_udp = self.net.drain_events().any(|ev| ev.token() == UDP_TOKEN); + + if has_udp { + let mut buf = [0u8; 4096]; + while let Ok((len, from)) = self.net.recv_from(&mut buf) { + self.handle_packet(&buf[..len], from); + } + } + + // Fire timers + let _fired = self.timers.tick(); + + // Drive iterative queries + self.drive_queries(); + + // Drain send queue for destinations we now know + self.drain_send_queue(); + + // Drive RDP: tick timeouts and flush output + let rdp_actions = self.rdp.tick(); + for action in rdp_actions { + if let crate::rdp::RdpAction::Event { desc, event, .. 
} = action { + log::info!("RDP tick event: desc={desc} {event:?}"); + } + } + self.flush_all_rdp(); + + // Periodic maintenance + self.peers.refresh(); + self.storage.expire(); + self.advertise.refresh(); + self.reassembler.expire(); + self.send_queue.expire(); + self.refresh_buckets(); + self.probe_liveness(); + + self.rate_limiter.cleanup(); + + // Node activity monitor (proactive ping) + self.check_node_activity(); + + // Retry failed stores + self.retry_failed_stores(); + + // Ban list cleanup + self.ban_list.cleanup(); + self.store_tracker.cleanup(); + + // Expire stale pending pings (>10s) — record + // failure for unresponsive peers + let expired_pings: Vec<(u32, NodeId)> = self + .pending_pings + .iter() + .filter(|(_, (_, sent))| sent.elapsed().as_secs() >= 10) + .map(|(nonce, (id, _))| (*nonce, *id)) + .collect(); + for (nonce, peer_id) in &expired_pings { + if let Some(peer) = self.peers.get(peer_id) { + self.ban_list.record_failure(peer.addr); + } + // Also record failure in routing table for + // stale count / replacement cache logic + if let Some(evicted) = self.dht_table.record_failure(peer_id) { + log::debug!( + "Replaced stale peer {:?} from routing table", + evicted + ); + } + self.pending_pings.remove(nonce); + } + + // Data restore (every 120s) + if self.last_restore.elapsed() >= self.config.restore_interval { + self.last_restore = Instant::now(); + self.restore_data(); + } + + // Maintain: mask_bit exploration (every 120s) + if self.last_maintain.elapsed() >= self.config.maintain_interval { + self.last_maintain = Instant::now(); + self.run_maintain(); + } + + // DTUN maintenance + let dtun_targets = self.dtun.maintain(); + for target in dtun_targets { + if let Err(e) = self.start_find_node(target) { + log::debug!("DTUN maintain find_node failed: {e}"); + } + } + + // NAT re-detection + self.nat.expire_pending(); + + Ok(()) + } + + /// Run the event loop forever. + pub fn run(&mut self) -> ! 
{ + loop { + if let Err(e) = self.poll() { + log::error!("Event loop error: {e}"); + } + } + } + + /// Set the node configuration. Call before `join()`. + pub fn set_config(&mut self, config: crate::config::Config) { + self.config = config; + } + + /// Get the current configuration. + pub fn config(&self) -> &crate::config::Config { + &self.config + } + + /// Set the routing table persistence backend. + pub fn set_routing_persistence( + &mut self, + p: Box<dyn crate::persist::RoutingPersistence>, + ) { + self.routing_persistence = p; + } + + /// Set the data persistence backend. + pub fn set_data_persistence( + &mut self, + p: Box<dyn crate::persist::DataPersistence>, + ) { + self.data_persistence = p; + } + + /// Load saved contacts and data from persistence + /// backends. Call after bind, before join. + pub fn load_persisted(&mut self) { + // Load routing table contacts + if let Ok(contacts) = self.routing_persistence.load_contacts() { + let mut loaded = 0usize; + for c in contacts { + // Validate: skip zero IDs and unspecified addrs + if c.id.is_zero() { + log::debug!("Skipping persisted contact: zero ID"); + continue; + } + if c.addr.ip().is_unspecified() { + log::debug!("Skipping persisted contact: unspecified addr"); + continue; + } + let peer = PeerInfo::new(c.id, c.addr); + self.dht_table.add(peer.clone()); + self.peers.add(peer); + loaded += 1; + } + log::info!("Loaded {loaded} persisted contacts"); + } + + // Load stored values + if let Ok(records) = self.data_persistence.load() { + for r in &records { + let val = crate::dht::StoredValue { + key: r.key.clone(), + value: r.value.clone(), + id: r.target_id, + source: r.source, + ttl: r.ttl, + stored_at: Instant::now(), + is_unique: r.is_unique, + original: 0, + recvd: std::collections::HashSet::new(), + version: 0, // persisted data has no version + }; + self.storage.store(val); + } + log::info!("Loaded {} persisted values", records.len()); + } + } + + /// Save current state to persistence backends. 
+ /// Called during shutdown or periodically. + pub fn save_state(&self) { + // Save routing table contacts + let contacts: Vec<crate::persist::ContactRecord> = self + .dht_table + .closest(&self.id, 1000) // save all + .iter() + .map(|p| crate::persist::ContactRecord { + id: p.id, + addr: p.addr, + }) + .collect(); + + if let Err(e) = self.routing_persistence.save_contacts(&contacts) { + log::warn!("Failed to save contacts: {e}"); + } + + // Save stored values + let values = self.storage.all_values(); + let records: Vec<crate::persist::StoredRecord> = values + .iter() + .map(|v| crate::persist::StoredRecord { + key: v.key.clone(), + value: v.value.clone(), + target_id: v.id, + source: v.source, + ttl: v.remaining_ttl(), + is_unique: v.is_unique, + }) + .collect(); + + if let Err(e) = self.data_persistence.save(&records) { + log::warn!("Failed to save values: {e}"); + } + } + + /// Graceful shutdown: notify closest peers that we + /// are leaving, so they can remove us from their + /// routing tables immediately. + pub fn shutdown(&mut self) { + log::info!("Shutting down node {}", self.id); + + // Send a "leaving" ping to our closest peers + // so they know to remove us + let closest = + self.dht_table.closest(&self.id, self.config.num_find_node); + + for peer in &closest { + // Send FIN-like notification via advertise + let nonce = self.alloc_nonce(); + let size = HEADER_SIZE + 8; + let mut buf = vec![0u8; size]; + let hdr = MsgHeader::new( + MsgType::Advertise, + Self::len16(size), + self.id, + peer.id, + ); + if hdr.write(&mut buf).is_ok() { + buf[HEADER_SIZE..HEADER_SIZE + 4] + .copy_from_slice(&nonce.to_be_bytes()); + + // Session 0 = shutdown signal + buf[HEADER_SIZE + 4..HEADER_SIZE + 8].fill(0); + let _ = self.send_signed(&buf, peer.addr); + } + } + + log::info!("Shutdown: notified {} peers", closest.len()); + + // Persist state + self.save_state(); + } + + // ── Routing table access ──────────────────────── + + /// Number of peers in the DHT routing table. 
+ pub fn routing_table_size(&self) -> usize { + self.dht_table.size() + } + + /// Number of known peers. + pub fn peer_count(&self) -> usize { + self.peers.len() + } + + /// Snapshot of metrics counters. + pub fn metrics(&self) -> crate::metrics::MetricsSnapshot { + self.metrics.snapshot() + } + + /// Number of stored DHT values. + pub fn storage_count(&self) -> usize { + self.storage.len() + } + + /// All stored DHT values (key, value bytes). + /// Used by applications to sync DHT-replicated data + /// to their own persistence layer. + pub fn dht_values(&self) -> Vec<(Vec<u8>, Vec<u8>)> { + self.storage + .all_values() + .into_iter() + .map(|v| (v.key, v.value)) + .collect() + } + + /// Number of currently banned peers. + pub fn ban_count(&self) -> usize { + self.ban_list.ban_count() + } + + /// Number of pending store operations awaiting ack. + pub fn pending_stores(&self) -> usize { + self.store_tracker.pending_count() + } + + /// Store tracker statistics: (acks, failures). + pub fn store_stats(&self) -> (u64, u64) { + (self.store_tracker.acks, self.store_tracker.failures) + } + + /// Print node state (debug). 
pub fn print_state(&self) {
    let tracker = &self.store_tracker;
    let rdp = &self.rdp;

    println!("MyID = {}", self.id);
    println!();

    println!("Node State:");
    let nat = self.nat_state();
    println!(" {:?}", nat);
    println!();

    let table_len = self.dht_table.size();
    println!("Routing Table: {} nodes", table_len);
    self.dht_table.print_table();
    println!();

    let registrations = self.dtun.registration_count();
    println!("DTUN: {} registrations", registrations);

    let known_peers = self.peers.len();
    println!("Peers: {} known", known_peers);

    let stored = self.storage.len();
    println!("Storage: {} values", stored);

    let bans = self.ban_list.ban_count();
    println!("Bans: {} active", bans);

    println!(
        "Stores: {} pending, {} acked, {} failed",
        tracker.pending_count(),
        tracker.acks,
        tracker.failures,
    );
    println!(
        "RDP: {} connections, {} listeners",
        rdp.connection_count(),
        rdp.listener_count()
    );
}
} // end of `impl Node`

/// One-line summary: node ID, NAT state, routing-table
/// size and number of stored values.
impl fmt::Display for Node {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let nat = self.nat_state();
        let peers = self.dht_table.size();
        let stored = self.storage.len();

        write!(
            f,
            "Node({}, {:?}, {} peers, {} stored)",
            self.id, nat, peers, stored
        )
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Bind a node via `Node::bind(0)`, as every test
    /// below needs one.
    fn fresh_node() -> Node {
        Node::bind(0).unwrap()
    }

    #[test]
    fn bind_creates_node() {
        let fresh = fresh_node();

        assert!(!fresh.id().is_zero());
        assert_eq!(fresh.routing_table_size(), 0);
        assert_eq!(fresh.nat_state(), NatState::Unknown);
    }

    #[test]
    fn set_id_from_data() {
        let mut first = fresh_node();
        let mut second = fresh_node();

        // Setting an ID from data must replace the
        // one assigned at bind time.
        let before = *first.id();
        first.set_id(b"my-application-id");
        assert_ne!(*first.id(), before);

        // Same input data on another node yields the
        // same ID.
        second.set_id(b"my-application-id");
        assert_eq!(*first.id(), *second.id());

        // The public key and the node ID are the same
        // bytes.
        assert_eq!(first.public_key(), first.id().as_bytes());
    }

    #[test]
    fn id_hex_format() {
        let node = fresh_node();
        let rendered = node.id_hex();

        assert_eq!(rendered.len(), 64);
        assert!(rendered.chars().all(|c| c.is_ascii_hexdigit()));
    }

    #[test]
    fn nat_state_set_get() {
        let mut node = fresh_node();

        // Starts out undetermined.
        assert_eq!(node.nat_state(), NatState::Unknown);

        // Each state written must be the state read
        // back.
        node.set_nat_state(NatState::Global);
        assert_eq!(node.nat_state(), NatState::Global);

        node.set_nat_state(NatState::ConeNat);
        assert_eq!(node.nat_state(), NatState::ConeNat);

        node.set_nat_state(NatState::SymmetricNat);
        assert_eq!(node.nat_state(), NatState::SymmetricNat);
    }

    #[test]
    fn put_get_local() {
        let mut node = fresh_node();
        node.put(b"hello", b"world", 300, false);

        let found = node.get(b"hello");

        assert_eq!(found.len(), 1);
        assert_eq!(found[0], b"world");
    }

    #[test]
    fn put_unique() {
        let mut node = fresh_node();
        node.put(b"key", b"val1", 300, true);
        node.put(b"key", b"val2", 300, true);

        let found = node.get(b"key");

        // A second unique put from the same source
        // replaces the first.
        assert_eq!(found.len(), 1);
        assert_eq!(found[0], b"val2");
    }

    #[test]
    fn get_nonexistent() {
        let mut node = fresh_node();

        let found = node.get(b"nope");

        assert!(found.is_empty());
    }

    #[test]
    fn rdp_listen_and_close() {
        let mut node = fresh_node();
        let listener = node.rdp_listen(5000).unwrap();

        // A listener descriptor is not a connection,
        // so there is no state to query for it; the
        // test only checks that closing it does not
        // panic.
        node.rdp_close(listener);
    }

    #[test]
    fn rdp_connect_creates_syn() {
        let mut node = fresh_node();
        let remote = NodeId::from_bytes([0x01; 32]);

        let conn = node.rdp_connect(0, &remote, 5000).unwrap();

        assert_eq!(node.rdp_state(conn).unwrap(), RdpState::SynSent);
    }

    #[test]
    fn rdp_status() {
        let mut node = fresh_node();
        let remote = NodeId::from_bytes([0x01; 32]);
        node.rdp_connect(0, &remote, 5000).unwrap();

        let report = node.rdp_status();

        assert_eq!(report.len(), 1);
    }

    #[test]
    fn rdp_max_retrans() {
        let mut node = fresh_node();

        node.rdp_set_max_retrans(60);

        assert_eq!(node.rdp_max_retrans(), 60);
    }

    #[test]
    fn display() {
        let node = fresh_node();

        let rendered = node.to_string();

        assert!(rendered.starts_with("Node("));
        assert!(rendered.contains("Unknown"));
    }

    #[test]
    fn dgram_callback() {
        let mut node = fresh_node();
        assert!(node.dgram_callback.is_none());

        node.set_dgram_callback(|_data, _from| {});
        assert!(node.dgram_callback.is_some());

        node.unset_dgram_callback();
        assert!(node.dgram_callback.is_none());
    }

    #[test]
    fn join_with_invalid_host() {
        let mut node = fresh_node();

        let outcome = node.join("this-host-does-not-exist.invalid", 3000);

        assert!(outcome.is_err());
    }

    #[test]
    fn poll_once() {
        let mut node = fresh_node();

        // A single poll with no pending events must
        // return instead of blocking indefinitely.
        node.poll().unwrap();
    }
}