//! Main facade: the `Node` node. //! //! Owns all subsystems and provides the public API for //! joining the network, //! storing/retrieving values, sending datagrams, and //! using reliable transport (RDP). use std::collections::HashMap; use std::fmt; use std::net::SocketAddr; use std::time::{Duration, Instant}; use crate::advertise::Advertise; use crate::dgram::{self, Reassembler, SendQueue}; use crate::dht::{DhtStorage, IterativeQuery, MaskBitExplorer}; use crate::dtun::Dtun; use crate::error::Error; use crate::id::NodeId; use crate::msg; use crate::nat::{NatDetector, NatState}; use crate::peers::{PeerInfo, PeerStore}; use crate::proxy::Proxy; use crate::rdp::{Rdp, RdpError, RdpState, RdpStatus}; use crate::routing::RoutingTable; use crate::socket::{NetLoop, UDP_TOKEN}; use crate::timer::TimerWheel; use crate::wire::{DOMAIN_INET, DOMAIN_INET6, HEADER_SIZE, MsgHeader, MsgType}; /// Default poll timeout when no timers are scheduled. const DEFAULT_POLL_TIMEOUT: Duration = Duration::from_millis(100); type DgramCallback = Box; type RdpCallback = Box; /// The tesseras-dht node. /// /// This is the main entry point. It owns all subsystems /// (DHT, DTUN, NAT detector, proxy, RDP, datagrams, /// peers, timers, network I/O) and exposes a clean API /// for the tesseras-dht node. pub struct Node { pub(crate) identity: crate::crypto::Identity, pub(crate) id: NodeId, pub(crate) net: NetLoop, pub(crate) dht_table: RoutingTable, pub(crate) dtun: Dtun, pub(crate) nat: NatDetector, pub(crate) proxy: Proxy, pub(crate) rdp: Rdp, pub(crate) storage: DhtStorage, pub(crate) peers: PeerStore, pub(crate) timers: TimerWheel, pub(crate) advertise: Advertise, pub(crate) reassembler: Reassembler, pub(crate) send_queue: SendQueue, pub(crate) explorer: MaskBitExplorer, pub(crate) is_dtun: bool, pub(crate) dgram_callback: Option, pub(crate) rdp_callback: Option, /// Active iterative queries keyed by nonce. pub(crate) queries: HashMap, /// Last bucket refresh time. pub(crate) last_refresh: Instant, /// Last data restore time. pub(crate) last_restore: Instant, /// Last maintain (mask_bit exploration) time. pub(crate) last_maintain: Instant, /// Routing table persistence backend. pub(crate) routing_persistence: Box, /// Data persistence backend. pub(crate) data_persistence: Box, /// Metrics counters. pub(crate) metrics: crate::metrics::Metrics, /// Pending pings: nonce → (target NodeId, sent_at). pub(crate) pending_pings: HashMap, /// Inbound rate limiter. pub(crate) rate_limiter: crate::ratelimit::RateLimiter, /// Node configuration. pub(crate) config: crate::config::Config, /// Ban list for misbehaving peers. pub(crate) ban_list: crate::banlist::BanList, /// Store acknowledgment tracker. pub(crate) store_tracker: crate::store_track::StoreTracker, /// Last node activity check time. pub(crate) last_activity_check: Instant, /// Last store retry sweep time. pub(crate) last_store_retry: Instant, } /// Builder for configuring a Node node. /// /// ```rust,no_run /// use tesseras_dht::node::NodeBuilder; /// use tesseras_dht::nat::NatState; /// /// let node = NodeBuilder::new() /// .port(10000) /// .nat(NatState::Global) /// .seed(b"my-identity-seed") /// .build() /// .unwrap(); /// ``` pub struct NodeBuilder { port: u16, addr: Option, pub(crate) nat: Option, seed: Option>, enable_dtun: bool, config: Option, } impl NodeBuilder { pub fn new() -> Self { Self { port: 0, addr: None, nat: None, seed: None, enable_dtun: true, config: None, } } /// Set the UDP port to bind. pub fn port(mut self, port: u16) -> Self { self.port = port; self } /// Set a specific bind address. pub fn addr(mut self, addr: SocketAddr) -> Self { self.addr = Some(addr); self } /// Set the NAT state. pub fn nat(mut self, state: NatState) -> Self { self.nat = Some(state); self } /// Set identity seed (deterministic keypair). pub fn seed(mut self, data: &[u8]) -> Self { self.seed = Some(data.to_vec()); self } /// Enable or disable DTUN. pub fn dtun(mut self, enabled: bool) -> Self { self.enable_dtun = enabled; self } /// Set the node configuration. pub fn config(mut self, config: crate::config::Config) -> Self { self.config = Some(config); self } /// Build the Node node. pub fn build(self) -> Result { let addr = self .addr .unwrap_or_else(|| SocketAddr::from(([0, 0, 0, 0], self.port))); let mut node = Node::bind_addr(addr)?; if let Some(seed) = &self.seed { node.set_id(seed); } if let Some(nat) = self.nat { node.set_nat_state(nat); } if !self.enable_dtun { node.is_dtun = false; } if let Some(config) = self.config { node.config = config; } Ok(node) } } impl Default for NodeBuilder { fn default() -> Self { Self::new() } } impl Node { /// Create a new node and bind to `port` (IPv4). /// /// Generates a random node ID. Use `set_id` to /// derive an ID from application data instead. pub fn bind(port: u16) -> Result { let addr = SocketAddr::from(([0, 0, 0, 0], port)); Self::bind_addr(addr) } /// Create a new node bound to `port` on IPv6. /// /// When using IPv6, DTUN is disabled and NAT state /// is set to Global (IPv6 does not need NAT /// traversal). pub fn bind_v6(port: u16) -> Result { let addr = SocketAddr::from((std::net::Ipv6Addr::UNSPECIFIED, port)); let mut node = Self::bind_addr(addr)?; node.is_dtun = false; node.dtun.set_enabled(false); node.nat.set_state(NatState::Global); Ok(node) } /// Create a new node bound to a specific address. pub fn bind_addr(addr: SocketAddr) -> Result { let identity = crate::crypto::Identity::generate(); let id = *identity.node_id(); let net = NetLoop::bind(addr)?; log::info!("Node node {} bound to {}", id, net.local_addr()?); Ok(Self { dht_table: RoutingTable::new(id), dtun: Dtun::new(id), nat: NatDetector::new(id), proxy: Proxy::new(id), rdp: Rdp::new(), storage: DhtStorage::new(), peers: PeerStore::new(), timers: TimerWheel::new(), advertise: Advertise::new(id), reassembler: Reassembler::new(), send_queue: SendQueue::new(), explorer: MaskBitExplorer::new(id), is_dtun: true, dgram_callback: None, rdp_callback: None, queries: HashMap::new(), last_refresh: Instant::now(), last_restore: Instant::now(), last_maintain: Instant::now(), routing_persistence: Box::new(crate::persist::NoPersistence), data_persistence: Box::new(crate::persist::NoPersistence), metrics: crate::metrics::Metrics::new(), pending_pings: HashMap::new(), rate_limiter: crate::ratelimit::RateLimiter::default(), config: crate::config::Config::default(), ban_list: crate::banlist::BanList::new(), store_tracker: crate::store_track::StoreTracker::new(), last_activity_check: Instant::now(), last_store_retry: Instant::now(), identity, id, net, }) } // ── Identity ──────────────────────────────────── /// The local node ID. pub fn id(&self) -> &NodeId { &self.id } /// The local node ID as a hex string. pub fn id_hex(&self) -> String { self.id.to_hex() } /// Set the node identity from a 32-byte seed. /// /// Derives an Ed25519 keypair from the seed. /// NodeId = public key. Deterministic: same seed /// produces the same identity. pub fn set_id(&mut self, data: &[u8]) { // Hash to 32 bytes if input is not already 32 let seed = if data.len() == 32 { let mut s = [0u8; 32]; s.copy_from_slice(data); s } else { use sha2::{Digest, Sha256}; let hash = Sha256::digest(data); let mut s = [0u8; 32]; s.copy_from_slice(&hash); s }; self.identity = crate::crypto::Identity::from_seed(seed); self.id = *self.identity.node_id(); self.dht_table = RoutingTable::new(self.id); self.dtun = Dtun::new(self.id); self.explorer = MaskBitExplorer::new(self.id); log::info!("Node ID set to {}", self.id); } /// The node's Ed25519 public key (32 bytes). pub fn public_key(&self) -> &[u8; 32] { self.identity.public_key() } // ── NAT state ─────────────────────────────────── /// Current NAT detection state. pub fn nat_state(&self) -> NatState { self.nat.state() } /// Force the NAT state. pub fn set_nat_state(&mut self, state: NatState) { self.nat.set_state(state); if state == NatState::Global { // IPv6 or explicitly global: disable DTUN if !self.is_dtun { self.dtun.set_enabled(false); } } } pub(crate) fn alloc_nonce(&mut self) -> u32 { let mut buf = [0u8; 4]; crate::sys::random_bytes(&mut buf); u32::from_ne_bytes(buf) } /// Safe cast of packet size to u16. #[inline] pub(crate) fn len16(n: usize) -> u16 { u16::try_from(n).expect("packet size exceeds u16") } // ── DHT operations ────────────────────────────── /// Store a key-value pair in the DHT. /// /// The key is hashed with SHA-256 to map it to the /// 256-bit ID space. Stored locally and sent to /// the k-closest known nodes. /// /// # Example /// /// ```rust,no_run /// # let mut node = tesseras_dht::Node::bind(0).unwrap(); /// node.put(b"paste-id", b"paste content", 3600, false); /// ``` pub fn put(&mut self, key: &[u8], value: &[u8], ttl: u16, is_unique: bool) { let target_id = NodeId::from_key(key); log::debug!( "put: key={} target={}", String::from_utf8_lossy(key), target_id ); // Store locally let val = crate::dht::StoredValue { key: key.to_vec(), value: value.to_vec(), id: target_id, source: self.id, ttl, stored_at: std::time::Instant::now(), is_unique, original: 3, // ORIGINAL_PUT_NUM recvd: std::collections::HashSet::new(), version: crate::dht::now_version(), }; self.storage.store(val); // If behind symmetric NAT, route through proxy if self.nat.state() == NatState::SymmetricNat { if let Some(server) = self.proxy.server().cloned() { let store_msg = msg::StoreMsg { id: target_id, from: self.id, key: key.to_vec(), value: value.to_vec(), ttl, is_unique, }; let total = HEADER_SIZE + msg::STORE_FIXED + store_msg.key.len() + store_msg.value.len(); let mut buf = vec![0u8; total]; let hdr = MsgHeader::new( MsgType::ProxyStore, Self::len16(total), self.id, server.id, ); if hdr.write(&mut buf).is_ok() { let _ = msg::write_store(&mut buf, &store_msg); let _ = self.send_signed(&buf, server.addr); } log::info!("put: via proxy to {:?}", server.id); return; } } // Direct: send STORE to k-closest known nodes let closest = self .dht_table .closest(&target_id, self.config.num_find_node); let store_msg = msg::StoreMsg { id: target_id, from: self.id, key: key.to_vec(), value: value.to_vec(), ttl, is_unique, }; for peer in &closest { if let Err(e) = self.send_store(peer, &store_msg) { log::warn!("Failed to send store to {:?}: {e}", peer.id); } else { self.store_tracker.track( target_id, key.to_vec(), value.to_vec(), ttl, is_unique, peer.clone(), ); } } log::info!("put: stored locally + sent to {} peers", closest.len()); } /// Store multiple key-value pairs in the DHT. /// /// More efficient than calling `put()` in a loop: /// groups stores by target peer to reduce redundant /// lookups and sends. pub fn put_batch(&mut self, entries: &[(&[u8], &[u8], u16, bool)]) { // Group by target peer set to batch sends struct BatchEntry { target_id: NodeId, key: Vec, value: Vec, ttl: u16, is_unique: bool, } let mut batch: Vec = Vec::with_capacity(entries.len()); for &(key, value, ttl, is_unique) in entries { let target_id = NodeId::from_key(key); // Store locally let val = crate::dht::StoredValue { key: key.to_vec(), value: value.to_vec(), id: target_id, source: self.id, ttl, stored_at: std::time::Instant::now(), is_unique, original: 3, recvd: std::collections::HashSet::new(), version: crate::dht::now_version(), }; self.storage.store(val); batch.push(BatchEntry { target_id, key: key.to_vec(), value: value.to_vec(), ttl, is_unique, }); } // Collect unique peers across all targets to // minimize redundant sends let mut peer_stores: HashMap> = HashMap::new(); for entry in &batch { let closest = self .dht_table .closest(&entry.target_id, self.config.num_find_node); let store_msg = msg::StoreMsg { id: entry.target_id, from: self.id, key: entry.key.clone(), value: entry.value.clone(), ttl: entry.ttl, is_unique: entry.is_unique, }; for peer in &closest { peer_stores .entry(peer.id) .or_default() .push(store_msg.clone()); } } // Send all stores grouped by peer let mut total_sent = 0u32; for (peer_id, stores) in &peer_stores { if let Some(peer) = self.peers.get(peer_id).cloned() { if self.ban_list.is_banned(&peer.addr) { continue; } for store in stores { if self.send_store(&peer, store).is_ok() { total_sent += 1; } } } } log::info!( "put_batch: {} entries stored locally, {total_sent} sends to {} peers", batch.len(), peer_stores.len(), ); } /// Retrieve multiple keys from the DHT. /// /// Returns a vec of (key, values) pairs. Local values /// are returned immediately; missing keys trigger /// iterative FIND_VALUE queries resolved via `poll()`. pub fn get_batch( &mut self, keys: &[&[u8]], ) -> Vec<(Vec, Vec>)> { let mut results = Vec::with_capacity(keys.len()); for &key in keys { let target_id = NodeId::from_key(key); let local = self.storage.get(&target_id, key); if !local.is_empty() { let vals: Vec> = local.into_iter().map(|v| v.value).collect(); results.push((key.to_vec(), vals)); } else { // Start iterative FIND_VALUE for missing keys if let Err(e) = self.start_find_value(key) { log::debug!("Batch find_value failed for key: {e}"); } results.push((key.to_vec(), Vec::new())); } } results } /// Delete a key from the DHT. /// /// Sends a STORE with TTL=0 to the k-closest nodes, /// which causes them to remove the value. pub fn delete(&mut self, key: &[u8]) { let target_id = NodeId::from_key(key); // Remove locally self.storage.remove(&target_id, key); // Send TTL=0 store to closest nodes let closest = self .dht_table .closest(&target_id, self.config.num_find_node); let store_msg = msg::StoreMsg { id: target_id, from: self.id, key: key.to_vec(), value: Vec::new(), ttl: 0, is_unique: false, }; for peer in &closest { let _ = self.send_store(peer, &store_msg); } log::info!("delete: removed locally + sent to {} peers", closest.len()); } /// Retrieve values for a key from the DHT. /// /// First checks local storage. If not found, starts /// an iterative FIND_VALUE query across the network. /// Returns local values immediately; remote results /// arrive via `poll()` and can be retrieved with a /// subsequent `get()` call (they'll be cached /// locally by `handle_dht_find_value_reply`). pub fn get(&mut self, key: &[u8]) -> Vec> { let target_id = NodeId::from_key(key); // Check local storage first let local = self.storage.get(&target_id, key); if !local.is_empty() { return local.into_iter().map(|v| v.value).collect(); } // Not found locally — start iterative FIND_VALUE if let Err(e) = self.start_find_value(key) { log::debug!("Failed to start find_value: {e}"); } Vec::new() } /// Retrieve values with blocking network lookup. /// /// Polls internally until the value is found or /// `timeout` expires. Returns empty if not found. pub fn get_blocking( &mut self, key: &[u8], timeout: Duration, ) -> Vec> { let target_id = NodeId::from_key(key); // Check local first let local = self.storage.get(&target_id, key); if !local.is_empty() { return local.into_iter().map(|v| v.value).collect(); } // Start FIND_VALUE if self.start_find_value(key).is_err() { return Vec::new(); } // Poll until found or timeout let deadline = Instant::now() + timeout; while Instant::now() < deadline { let _ = self.poll(); let vals = self.storage.get(&target_id, key); if !vals.is_empty() { return vals.into_iter().map(|v| v.value).collect(); } std::thread::sleep(Duration::from_millis(10)); } Vec::new() } /// Start an iterative FIND_VALUE query for a key. /// /// Returns the query nonce. Results arrive via /// `handle_dht_find_value_reply` during `poll()`. pub fn start_find_value(&mut self, key: &[u8]) -> Result { let target_id = NodeId::from_key(key); let nonce = self.alloc_nonce(); let mut query = IterativeQuery::find_value(target_id, key.to_vec(), nonce); // Seed with our closest known nodes let closest = self .dht_table .closest(&target_id, self.config.num_find_node); query.closest = closest; self.queries.insert(nonce, query); // Send initial batch self.send_query_batch(nonce)?; Ok(nonce) } /// Send a FIND_VALUE message to a specific address. pub(crate) fn send_find_value_msg( &mut self, to: SocketAddr, target: NodeId, key: &[u8], ) -> Result { let nonce = self.alloc_nonce(); let domain = if to.is_ipv4() { DOMAIN_INET } else { DOMAIN_INET6 }; let fv = msg::FindValueMsg { nonce, target, domain, key: key.to_vec(), use_rdp: false, }; let total = HEADER_SIZE + msg::FIND_VALUE_FIXED + key.len(); let mut buf = vec![0u8; total]; let hdr = MsgHeader::new( MsgType::DhtFindValue, Self::len16(total), self.id, NodeId::from_bytes([0; crate::id::ID_LEN]), ); hdr.write(&mut buf)?; msg::write_find_value(&mut buf, &fv)?; self.send_signed(&buf, to)?; log::debug!( "Sent find_value to {to} target={target:?} key={} bytes", key.len() ); Ok(nonce) } // ── Datagram ──────────────────────────────────── /// Send a datagram to a destination node. /// /// If the destination's address is known, fragments /// are sent immediately. Otherwise they are queued /// for delivery once the address is resolved. pub fn send_dgram(&mut self, data: &[u8], dst: &NodeId) { let fragments = dgram::fragment(data); log::debug!( "send_dgram: {} bytes, {} fragment(s) to {:?}", data.len(), fragments.len(), dst ); // If behind symmetric NAT, route through proxy if self.nat.state() == NatState::SymmetricNat { if let Some(server) = self.proxy.server().cloned() { for frag in &fragments { let total = HEADER_SIZE + frag.len(); let mut buf = vec![0u8; total]; let hdr = MsgHeader::new( MsgType::ProxyDgram, Self::len16(total), self.id, *dst, ); if hdr.write(&mut buf).is_ok() { buf[HEADER_SIZE..].copy_from_slice(frag); let _ = self.send_signed(&buf, server.addr); } } return; } } // Direct: send if we know the address if let Some(peer) = self.peers.get(dst).cloned() { for frag in &fragments { self.send_dgram_raw(frag, &peer); } } else { // Queue for later delivery for frag in fragments { self.send_queue.push(*dst, frag, self.id); } } } /// Send a single dgram fragment wrapped in a /// protocol message. pub(crate) fn send_dgram_raw(&self, payload: &[u8], dst: &PeerInfo) { let total = HEADER_SIZE + payload.len(); let mut buf = vec![0u8; total]; let hdr = MsgHeader::new(MsgType::Dgram, Self::len16(total), self.id, dst.id); if hdr.write(&mut buf).is_ok() { buf[HEADER_SIZE..].copy_from_slice(payload); let _ = self.send_signed(&buf, dst.addr); } } /// Set the callback for received datagrams. pub fn set_dgram_callback(&mut self, f: F) where F: Fn(&[u8], &NodeId) + Send + 'static, { self.dgram_callback = Some(Box::new(f)); } /// Remove the datagram callback. pub fn unset_dgram_callback(&mut self) { self.dgram_callback = None; } /// Set a callback for RDP events (ACCEPTED, /// CONNECTED, READY2READ, RESET, FAILED, etc). pub fn set_rdp_callback(&mut self, f: F) where F: Fn(i32, &crate::rdp::RdpAddr, crate::rdp::RdpEvent) + Send + 'static, { self.rdp_callback = Some(Box::new(f)); } /// Remove the RDP event callback. pub fn unset_rdp_callback(&mut self) { self.rdp_callback = None; } // ── RDP (reliable transport) ──────────────────── /// Listen for RDP connections on `port`. pub fn rdp_listen(&mut self, port: u16) -> Result { self.rdp.listen(port) } /// Connect to a remote node via RDP. /// /// Sends a SYN packet immediately if the peer's /// address is known. pub fn rdp_connect( &mut self, sport: u16, dst: &NodeId, dport: u16, ) -> Result { let desc = self.rdp.connect(sport, *dst, dport)?; self.flush_rdp_output(desc); Ok(desc) } /// Close an RDP connection or listener. pub fn rdp_close(&mut self, desc: i32) { self.rdp.close(desc); } /// Send data on an RDP connection. /// /// Enqueues data and flushes pending packets to the /// network. pub fn rdp_send( &mut self, desc: i32, data: &[u8], ) -> Result { let n = self.rdp.send(desc, data)?; self.flush_rdp_output(desc); Ok(n) } /// Receive data from an RDP connection. pub fn rdp_recv( &mut self, desc: i32, buf: &mut [u8], ) -> Result { self.rdp.recv(desc, buf) } /// Get the state of an RDP descriptor. pub fn rdp_state(&self, desc: i32) -> Result { self.rdp.get_state(desc) } /// Get status of all RDP connections. pub fn rdp_status(&self) -> Vec { self.rdp.get_status() } /// Set RDP maximum retransmission timeout. pub fn rdp_set_max_retrans(&mut self, secs: u64) { self.rdp.set_max_retrans(Duration::from_secs(secs)); } /// Get RDP maximum retransmission timeout. pub fn rdp_max_retrans(&self) -> u64 { self.rdp.max_retrans().as_secs() } // ── Event loop ────────────────────────────────── /// Process one iteration of the event loop. /// /// Polls for I/O events, processes incoming packets, /// fires expired timers, and runs maintenance tasks. pub fn poll(&mut self) -> Result<(), Error> { self.poll_timeout(DEFAULT_POLL_TIMEOUT) } /// Poll with a custom maximum timeout. /// /// Use a short timeout (e.g. 1ms) in tests with /// many nodes to avoid blocking. pub fn poll_timeout(&mut self, max_timeout: Duration) -> Result<(), Error> { let timeout = self .timers .next_deadline() .map(|d| d.min(max_timeout)) .unwrap_or(max_timeout); self.net.poll_events(timeout)?; // Check if we got a UDP event. We must not hold // a borrow on self.net while calling handle_packet, // so we just check and then drain separately. let has_udp = self.net.drain_events().any(|ev| ev.token() == UDP_TOKEN); if has_udp { let mut buf = [0u8; 4096]; while let Ok((len, from)) = self.net.recv_from(&mut buf) { self.handle_packet(&buf[..len], from); } } // Fire timers let _fired = self.timers.tick(); // Drive iterative queries self.drive_queries(); // Drain send queue for destinations we now know self.drain_send_queue(); // Drive RDP: tick timeouts and flush output let rdp_actions = self.rdp.tick(); for action in rdp_actions { if let crate::rdp::RdpAction::Event { desc, event, .. } = action { log::info!("RDP tick event: desc={desc} {event:?}"); } } self.flush_all_rdp(); // Periodic maintenance self.peers.refresh(); self.storage.expire(); self.advertise.refresh(); self.reassembler.expire(); self.send_queue.expire(); self.refresh_buckets(); self.probe_liveness(); self.rate_limiter.cleanup(); // Node activity monitor (proactive ping) self.check_node_activity(); // Retry failed stores self.retry_failed_stores(); // Ban list cleanup self.ban_list.cleanup(); self.store_tracker.cleanup(); // Expire stale pending pings (>10s) — record // failure for unresponsive peers let expired_pings: Vec<(u32, NodeId)> = self .pending_pings .iter() .filter(|(_, (_, sent))| sent.elapsed().as_secs() >= 10) .map(|(nonce, (id, _))| (*nonce, *id)) .collect(); for (nonce, peer_id) in &expired_pings { if let Some(peer) = self.peers.get(peer_id) { self.ban_list.record_failure(peer.addr); } // Also record failure in routing table for // stale count / replacement cache logic if let Some(evicted) = self.dht_table.record_failure(peer_id) { log::debug!( "Replaced stale peer {:?} from routing table", evicted ); } self.pending_pings.remove(nonce); } // Data restore (every 120s) if self.last_restore.elapsed() >= self.config.restore_interval { self.last_restore = Instant::now(); self.restore_data(); } // Maintain: mask_bit exploration (every 120s) if self.last_maintain.elapsed() >= self.config.maintain_interval { self.last_maintain = Instant::now(); self.run_maintain(); } // DTUN maintenance let dtun_targets = self.dtun.maintain(); for target in dtun_targets { if let Err(e) = self.start_find_node(target) { log::debug!("DTUN maintain find_node failed: {e}"); } } // NAT re-detection self.nat.expire_pending(); Ok(()) } /// Run the event loop forever. pub fn run(&mut self) -> ! { loop { if let Err(e) = self.poll() { log::error!("Event loop error: {e}"); } } } /// Set the node configuration. Call before `join()`. pub fn set_config(&mut self, config: crate::config::Config) { self.config = config; } /// Get the current configuration. pub fn config(&self) -> &crate::config::Config { &self.config } /// Set the routing table persistence backend. pub fn set_routing_persistence( &mut self, p: Box, ) { self.routing_persistence = p; } /// Set the data persistence backend. pub fn set_data_persistence( &mut self, p: Box, ) { self.data_persistence = p; } /// Load saved contacts and data from persistence /// backends. Call after bind, before join. pub fn load_persisted(&mut self) { // Load routing table contacts if let Ok(contacts) = self.routing_persistence.load_contacts() { let mut loaded = 0usize; for c in contacts { // Validate: skip zero IDs and unspecified addrs if c.id.is_zero() { log::debug!("Skipping persisted contact: zero ID"); continue; } if c.addr.ip().is_unspecified() { log::debug!("Skipping persisted contact: unspecified addr"); continue; } let peer = PeerInfo::new(c.id, c.addr); self.dht_table.add(peer.clone()); self.peers.add(peer); loaded += 1; } log::info!("Loaded {loaded} persisted contacts"); } // Load stored values if let Ok(records) = self.data_persistence.load() { for r in &records { let val = crate::dht::StoredValue { key: r.key.clone(), value: r.value.clone(), id: r.target_id, source: r.source, ttl: r.ttl, stored_at: Instant::now(), is_unique: r.is_unique, original: 0, recvd: std::collections::HashSet::new(), version: 0, // persisted data has no version }; self.storage.store(val); } log::info!("Loaded {} persisted values", records.len()); } } /// Save current state to persistence backends. /// Called during shutdown or periodically. pub fn save_state(&self) { // Save routing table contacts let contacts: Vec = self .dht_table .closest(&self.id, 1000) // save all .iter() .map(|p| crate::persist::ContactRecord { id: p.id, addr: p.addr, }) .collect(); if let Err(e) = self.routing_persistence.save_contacts(&contacts) { log::warn!("Failed to save contacts: {e}"); } // Save stored values let values = self.storage.all_values(); let records: Vec = values .iter() .map(|v| crate::persist::StoredRecord { key: v.key.clone(), value: v.value.clone(), target_id: v.id, source: v.source, ttl: v.remaining_ttl(), is_unique: v.is_unique, }) .collect(); if let Err(e) = self.data_persistence.save(&records) { log::warn!("Failed to save values: {e}"); } } /// Graceful shutdown: notify closest peers that we /// are leaving, so they can remove us from their /// routing tables immediately. pub fn shutdown(&mut self) { log::info!("Shutting down node {}", self.id); // Send a "leaving" ping to our closest peers // so they know to remove us let closest = self.dht_table.closest(&self.id, self.config.num_find_node); for peer in &closest { // Send FIN-like notification via advertise let nonce = self.alloc_nonce(); let size = HEADER_SIZE + 8; let mut buf = vec![0u8; size]; let hdr = MsgHeader::new( MsgType::Advertise, Self::len16(size), self.id, peer.id, ); if hdr.write(&mut buf).is_ok() { buf[HEADER_SIZE..HEADER_SIZE + 4] .copy_from_slice(&nonce.to_be_bytes()); // Session 0 = shutdown signal buf[HEADER_SIZE + 4..HEADER_SIZE + 8].fill(0); let _ = self.send_signed(&buf, peer.addr); } } log::info!("Shutdown: notified {} peers", closest.len()); // Persist state self.save_state(); } // ── Routing table access ──────────────────────── /// Number of peers in the DHT routing table. pub fn routing_table_size(&self) -> usize { self.dht_table.size() } /// Number of known peers. pub fn peer_count(&self) -> usize { self.peers.len() } /// Snapshot of metrics counters. pub fn metrics(&self) -> crate::metrics::MetricsSnapshot { self.metrics.snapshot() } /// Number of stored DHT values. pub fn storage_count(&self) -> usize { self.storage.len() } /// All stored DHT values (key, value bytes). /// Used by applications to sync DHT-replicated data /// to their own persistence layer. pub fn dht_values(&self) -> Vec<(Vec, Vec)> { self.storage .all_values() .into_iter() .map(|v| (v.key, v.value)) .collect() } /// Number of currently banned peers. pub fn ban_count(&self) -> usize { self.ban_list.ban_count() } /// Number of pending store operations awaiting ack. pub fn pending_stores(&self) -> usize { self.store_tracker.pending_count() } /// Store tracker statistics: (acks, failures). pub fn store_stats(&self) -> (u64, u64) { (self.store_tracker.acks, self.store_tracker.failures) } /// Print node state (debug). pub fn print_state(&self) { println!("MyID = {}", self.id); println!(); println!("Node State:"); println!(" {:?}", self.nat_state()); println!(); println!("Routing Table: {} nodes", self.dht_table.size()); self.dht_table.print_table(); println!(); println!("DTUN: {} registrations", self.dtun.registration_count()); println!("Peers: {} known", self.peers.len()); println!("Storage: {} values", self.storage.len()); println!("Bans: {} active", self.ban_list.ban_count()); println!( "Stores: {} pending, {} acked, {} failed", self.store_tracker.pending_count(), self.store_tracker.acks, self.store_tracker.failures, ); println!( "RDP: {} connections, {} listeners", self.rdp.connection_count(), self.rdp.listener_count() ); } } impl fmt::Display for Node { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!( f, "Node({}, {:?}, {} peers, {} stored)", self.id, self.nat_state(), self.dht_table.size(), self.storage.len() ) } } #[cfg(test)] mod tests { use super::*; #[test] fn bind_creates_node() { let node = Node::bind(0).unwrap(); assert!(!node.id().is_zero()); assert_eq!(node.routing_table_size(), 0); assert_eq!(node.nat_state(), NatState::Unknown); } #[test] fn set_id_from_data() { let mut node1 = Node::bind(0).unwrap(); let mut node2 = Node::bind(0).unwrap(); let old_id = *node1.id(); node1.set_id(b"my-application-id"); assert_ne!(*node1.id(), old_id); // Deterministic: same seed → same ID node2.set_id(b"my-application-id"); assert_eq!(*node1.id(), *node2.id()); // Public key matches NodeId assert_eq!(node1.public_key(), node1.id().as_bytes()); } #[test] fn id_hex_format() { let node = Node::bind(0).unwrap(); let hex = node.id_hex(); assert_eq!(hex.len(), 64); assert!(hex.chars().all(|c| c.is_ascii_hexdigit())); } #[test] fn nat_state_set_get() { let mut node = Node::bind(0).unwrap(); assert_eq!(node.nat_state(), NatState::Unknown); node.set_nat_state(NatState::Global); assert_eq!(node.nat_state(), NatState::Global); node.set_nat_state(NatState::ConeNat); assert_eq!(node.nat_state(), NatState::ConeNat); node.set_nat_state(NatState::SymmetricNat); assert_eq!(node.nat_state(), NatState::SymmetricNat); } #[test] fn put_get_local() { let mut node = Node::bind(0).unwrap(); node.put(b"hello", b"world", 300, false); let vals = node.get(b"hello"); assert_eq!(vals.len(), 1); assert_eq!(vals[0], b"world"); } #[test] fn put_unique() { let mut node = Node::bind(0).unwrap(); node.put(b"key", b"val1", 300, true); node.put(b"key", b"val2", 300, true); let vals = node.get(b"key"); // Unique from same source → replaced assert_eq!(vals.len(), 1); assert_eq!(vals[0], b"val2"); } #[test] fn get_nonexistent() { let mut node = Node::bind(0).unwrap(); let vals = node.get(b"nope"); assert!(vals.is_empty()); } #[test] fn rdp_listen_and_close() { let mut node = Node::bind(0).unwrap(); let _desc = node.rdp_listen(5000).unwrap(); // Listener desc is not a connection, so // rdp_state won't find it. Just verify close // doesn't panic. node.rdp_close(_desc); } #[test] fn rdp_connect_creates_syn() { let mut node = Node::bind(0).unwrap(); let dst = NodeId::from_bytes([0x01; 32]); let desc = node.rdp_connect(0, &dst, 5000).unwrap(); assert_eq!(node.rdp_state(desc).unwrap(), RdpState::SynSent); } #[test] fn rdp_status() { let mut node = Node::bind(0).unwrap(); let dst = NodeId::from_bytes([0x01; 32]); node.rdp_connect(0, &dst, 5000).unwrap(); let status = node.rdp_status(); assert_eq!(status.len(), 1); } #[test] fn rdp_max_retrans() { let mut node = Node::bind(0).unwrap(); node.rdp_set_max_retrans(60); assert_eq!(node.rdp_max_retrans(), 60); } #[test] fn display() { let node = Node::bind(0).unwrap(); let s = format!("{node}"); assert!(s.starts_with("Node(")); assert!(s.contains("Unknown")); } #[test] fn dgram_callback() { let mut node = Node::bind(0).unwrap(); assert!(node.dgram_callback.is_none()); node.set_dgram_callback(|_data, _from| {}); assert!(node.dgram_callback.is_some()); node.unset_dgram_callback(); assert!(node.dgram_callback.is_none()); } #[test] fn join_with_invalid_host() { let mut node = Node::bind(0).unwrap(); let result = node.join("this-host-does-not-exist.invalid", 3000); assert!(result.is_err()); } #[test] fn poll_once() { let mut node = Node::bind(0).unwrap(); // Should not block forever (default 1s timeout, // but returns quickly with no events) node.poll().unwrap(); } }