aboutsummaryrefslogtreecommitdiffstats
path: root/src/node.rs
diff options
context:
space:
mode:
author: murilo ijanc 2026-03-24 15:04:03 -0300
committer: murilo ijanc 2026-03-24 15:04:03 -0300
commit9821aabf0b50d2487b07502d3d2cd89e7d62bdbe (patch)
tree53da095ff90cc755bac3d4bf699172b5e8cd07d6 /src/node.rs
downloadtesseras-dht-9821aabf0b50d2487b07502d3d2cd89e7d62bdbe.tar.gz
Initial commitv0.1.0
NAT-aware Kademlia DHT library for peer-to-peer networks. Features: - Distributed key-value storage (iterative FIND_NODE, FIND_VALUE, STORE) - NAT traversal via DTUN hole-punching and proxy relay - Reliable Datagram Protocol (RDP) with 7-state connection machine - Datagram transport with automatic fragmentation/reassembly - Ed25519 packet authentication - 256-bit node IDs (Ed25519 public keys) - Rate limiting, ban list, and eclipse attack mitigation - Persistence and metrics - OpenBSD and Linux support
Diffstat (limited to 'src/node.rs')
-rw-r--r--src/node.rs1395
1 files changed, 1395 insertions, 0 deletions
diff --git a/src/node.rs b/src/node.rs
new file mode 100644
index 0000000..fef917e
--- /dev/null
+++ b/src/node.rs
@@ -0,0 +1,1395 @@
+//! Main facade: the `Node` type.
+//!
+//! Owns all subsystems and provides the public API for
+//! joining the network,
+//! storing/retrieving values, sending datagrams, and
+//! using reliable transport (RDP).
+
+use std::collections::HashMap;
+use std::fmt;
+use std::net::SocketAddr;
+use std::time::{Duration, Instant};
+
+use crate::advertise::Advertise;
+use crate::dgram::{self, Reassembler, SendQueue};
+use crate::dht::{DhtStorage, IterativeQuery, MaskBitExplorer};
+use crate::dtun::Dtun;
+use crate::error::Error;
+use crate::id::NodeId;
+use crate::msg;
+use crate::nat::{NatDetector, NatState};
+use crate::peers::{PeerInfo, PeerStore};
+use crate::proxy::Proxy;
+use crate::rdp::{Rdp, RdpError, RdpState, RdpStatus};
+use crate::routing::RoutingTable;
+use crate::socket::{NetLoop, UDP_TOKEN};
+use crate::timer::TimerWheel;
+use crate::wire::{DOMAIN_INET, DOMAIN_INET6, HEADER_SIZE, MsgHeader, MsgType};
+
/// Default poll timeout when no timers are scheduled.
const DEFAULT_POLL_TIMEOUT: Duration = Duration::from_millis(100);

// Callback for received datagrams: (payload bytes, sender node id).
type DgramCallback = Box<dyn Fn(&[u8], &NodeId) + Send>;
// Callback for RDP events: (descriptor, peer address, event).
type RdpCallback =
    Box<dyn Fn(i32, &crate::rdp::RdpAddr, crate::rdp::RdpEvent) + Send>;
+
/// The tesseras-dht node.
///
/// This is the main entry point. It owns all subsystems
/// (DHT, DTUN, NAT detector, proxy, RDP, datagrams,
/// peers, timers, network I/O) and exposes a clean
/// public API for using the network.
pub struct Node {
    // Ed25519 keypair; `id` is its public key (see `bind_addr`).
    pub(crate) identity: crate::crypto::Identity,
    pub(crate) id: NodeId,
    // UDP socket and poll loop.
    pub(crate) net: NetLoop,
    // Kademlia routing table keyed by XOR distance to `id`.
    pub(crate) dht_table: RoutingTable,
    pub(crate) dtun: Dtun,
    pub(crate) nat: NatDetector,
    pub(crate) proxy: Proxy,
    pub(crate) rdp: Rdp,
    pub(crate) storage: DhtStorage,
    pub(crate) peers: PeerStore,
    pub(crate) timers: TimerWheel,
    pub(crate) advertise: Advertise,
    // Reassembles fragmented inbound datagrams.
    pub(crate) reassembler: Reassembler,
    // Holds outbound fragments until the destination address is known.
    pub(crate) send_queue: SendQueue,
    pub(crate) explorer: MaskBitExplorer,
    // Whether this node participates in DTUN (disabled for IPv6).
    pub(crate) is_dtun: bool,
    pub(crate) dgram_callback: Option<DgramCallback>,
    pub(crate) rdp_callback: Option<RdpCallback>,

    /// Active iterative queries keyed by nonce.
    pub(crate) queries: HashMap<u32, IterativeQuery>,

    /// Last bucket refresh time.
    pub(crate) last_refresh: Instant,

    /// Last data restore time.
    pub(crate) last_restore: Instant,

    /// Last maintain (mask_bit exploration) time.
    pub(crate) last_maintain: Instant,

    /// Routing table persistence backend.
    pub(crate) routing_persistence: Box<dyn crate::persist::RoutingPersistence>,

    /// Data persistence backend.
    pub(crate) data_persistence: Box<dyn crate::persist::DataPersistence>,

    /// Metrics counters.
    pub(crate) metrics: crate::metrics::Metrics,
    /// Pending pings: nonce → (target NodeId, sent_at).
    pub(crate) pending_pings: HashMap<u32, (NodeId, Instant)>,
    /// Inbound rate limiter.
    pub(crate) rate_limiter: crate::ratelimit::RateLimiter,
    /// Node configuration.
    pub(crate) config: crate::config::Config,
    /// Ban list for misbehaving peers.
    pub(crate) ban_list: crate::banlist::BanList,
    /// Store acknowledgment tracker.
    pub(crate) store_tracker: crate::store_track::StoreTracker,
    /// Last node activity check time.
    pub(crate) last_activity_check: Instant,
    /// Last store retry sweep time.
    pub(crate) last_store_retry: Instant,
}
+
/// Builder for configuring a `Node`.
///
/// ```rust,no_run
/// use tesseras_dht::node::NodeBuilder;
/// use tesseras_dht::nat::NatState;
///
/// let node = NodeBuilder::new()
///     .port(10000)
///     .nat(NatState::Global)
///     .seed(b"my-identity-seed")
///     .build()
///     .unwrap();
/// ```
pub struct NodeBuilder {
    // Ignored when an explicit `addr` is set.
    port: u16,
    addr: Option<SocketAddr>,
    // NOTE(review): `pub(crate)` unlike the sibling fields —
    // presumably read elsewhere in the crate; confirm.
    pub(crate) nat: Option<NatState>,
    seed: Option<Vec<u8>>,
    enable_dtun: bool,
    config: Option<crate::config::Config>,
}
+
+impl NodeBuilder {
+ pub fn new() -> Self {
+ Self {
+ port: 0,
+ addr: None,
+ nat: None,
+ seed: None,
+ enable_dtun: true,
+ config: None,
+ }
+ }
+
+ /// Set the UDP port to bind.
+ pub fn port(mut self, port: u16) -> Self {
+ self.port = port;
+ self
+ }
+
+ /// Set a specific bind address.
+ pub fn addr(mut self, addr: SocketAddr) -> Self {
+ self.addr = Some(addr);
+ self
+ }
+
+ /// Set the NAT state.
+ pub fn nat(mut self, state: NatState) -> Self {
+ self.nat = Some(state);
+ self
+ }
+
+ /// Set identity seed (deterministic keypair).
+ pub fn seed(mut self, data: &[u8]) -> Self {
+ self.seed = Some(data.to_vec());
+ self
+ }
+
+ /// Enable or disable DTUN.
+ pub fn dtun(mut self, enabled: bool) -> Self {
+ self.enable_dtun = enabled;
+ self
+ }
+
+ /// Set the node configuration.
+ pub fn config(mut self, config: crate::config::Config) -> Self {
+ self.config = Some(config);
+ self
+ }
+
+ /// Build the Node node.
+ pub fn build(self) -> Result<Node, Error> {
+ let addr = self
+ .addr
+ .unwrap_or_else(|| SocketAddr::from(([0, 0, 0, 0], self.port)));
+
+ let mut node = Node::bind_addr(addr)?;
+
+ if let Some(seed) = &self.seed {
+ node.set_id(seed);
+ }
+
+ if let Some(nat) = self.nat {
+ node.set_nat_state(nat);
+ }
+
+ if !self.enable_dtun {
+ node.is_dtun = false;
+ }
+
+ if let Some(config) = self.config {
+ node.config = config;
+ }
+
+ Ok(node)
+ }
+}
+
+impl Default for NodeBuilder {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl Node {
+ /// Create a new node and bind to `port` (IPv4).
+ ///
+ /// Generates a random node ID. Use `set_id` to
+ /// derive an ID from application data instead.
+ pub fn bind(port: u16) -> Result<Self, Error> {
+ let addr = SocketAddr::from(([0, 0, 0, 0], port));
+ Self::bind_addr(addr)
+ }
+
+ /// Create a new node bound to `port` on IPv6.
+ ///
+ /// When using IPv6, DTUN is disabled and NAT state
+ /// is set to Global (IPv6 does not need NAT
+ /// traversal).
+ pub fn bind_v6(port: u16) -> Result<Self, Error> {
+ let addr = SocketAddr::from((std::net::Ipv6Addr::UNSPECIFIED, port));
+ let mut node = Self::bind_addr(addr)?;
+ node.is_dtun = false;
+ node.dtun.set_enabled(false);
+ node.nat.set_state(NatState::Global);
+ Ok(node)
+ }
+
    /// Create a new node bound to a specific address.
    ///
    /// Generates a fresh Ed25519 identity (the node ID is
    /// the public key) and constructs every subsystem with
    /// defaults; persistence is a no-op until replaced via
    /// `set_routing_persistence` / `set_data_persistence`.
    pub fn bind_addr(addr: SocketAddr) -> Result<Self, Error> {
        let identity = crate::crypto::Identity::generate();
        let id = *identity.node_id();
        let net = NetLoop::bind(addr)?;

        log::info!("Node node {} bound to {}", id, net.local_addr()?);

        Ok(Self {
            // All ID-keyed subsystems are seeded with `id` here;
            // `set_id` must re-create them if the identity changes.
            dht_table: RoutingTable::new(id),
            dtun: Dtun::new(id),
            nat: NatDetector::new(id),
            proxy: Proxy::new(id),
            rdp: Rdp::new(),
            storage: DhtStorage::new(),
            peers: PeerStore::new(),
            timers: TimerWheel::new(),
            advertise: Advertise::new(id),
            reassembler: Reassembler::new(),
            send_queue: SendQueue::new(),
            explorer: MaskBitExplorer::new(id),
            is_dtun: true,
            dgram_callback: None,
            rdp_callback: None,
            queries: HashMap::new(),
            last_refresh: Instant::now(),
            last_restore: Instant::now(),
            last_maintain: Instant::now(),
            routing_persistence: Box::new(crate::persist::NoPersistence),
            data_persistence: Box::new(crate::persist::NoPersistence),
            metrics: crate::metrics::Metrics::new(),
            pending_pings: HashMap::new(),
            rate_limiter: crate::ratelimit::RateLimiter::default(),
            config: crate::config::Config::default(),
            ban_list: crate::banlist::BanList::new(),
            store_tracker: crate::store_track::StoreTracker::new(),
            last_activity_check: Instant::now(),
            last_store_retry: Instant::now(),
            identity,
            id,
            net,
        })
    }
+
+ // ── Identity ────────────────────────────────────
+
    /// The local node ID (the Ed25519 public key).
    pub fn id(&self) -> &NodeId {
        &self.id
    }
+
    /// The local node ID as a hex string (64 chars for a
    /// 256-bit ID).
    pub fn id_hex(&self) -> String {
        self.id.to_hex()
    }
+
+ /// Set the node identity from a 32-byte seed.
+ ///
+ /// Derives an Ed25519 keypair from the seed.
+ /// NodeId = public key. Deterministic: same seed
+ /// produces the same identity.
+ pub fn set_id(&mut self, data: &[u8]) {
+ // Hash to 32 bytes if input is not already 32
+ let seed = if data.len() == 32 {
+ let mut s = [0u8; 32];
+ s.copy_from_slice(data);
+ s
+ } else {
+ use sha2::{Digest, Sha256};
+ let hash = Sha256::digest(data);
+ let mut s = [0u8; 32];
+ s.copy_from_slice(&hash);
+ s
+ };
+ self.identity = crate::crypto::Identity::from_seed(seed);
+ self.id = *self.identity.node_id();
+ self.dht_table = RoutingTable::new(self.id);
+ self.dtun = Dtun::new(self.id);
+ self.explorer = MaskBitExplorer::new(self.id);
+ log::info!("Node ID set to {}", self.id);
+ }
+
    /// The node's Ed25519 public key (32 bytes). Identical
    /// to the bytes of the node ID.
    pub fn public_key(&self) -> &[u8; 32] {
        self.identity.public_key()
    }
+
+ // ── NAT state ───────────────────────────────────
+
    /// Current NAT detection state (as detected, or as
    /// forced via `set_nat_state`).
    pub fn nat_state(&self) -> NatState {
        self.nat.state()
    }
+
    /// Force the NAT state.
    pub fn set_nat_state(&mut self, state: NatState) {
        self.nat.set_state(state);
        if state == NatState::Global {
            // IPv6 or explicitly global: disable DTUN
            // NOTE(review): `set_enabled(false)` only fires when
            // `is_dtun` is *already* false (set by `bind_v6` or
            // the builder). A Global node with `is_dtun == true`
            // keeps DTUN enabled — confirm this is intentional
            // (e.g. global nodes serving DTUN for others).
            if !self.is_dtun {
                self.dtun.set_enabled(false);
            }
        }
    }
+
+ pub(crate) fn alloc_nonce(&mut self) -> u32 {
+ let mut buf = [0u8; 4];
+ crate::sys::random_bytes(&mut buf);
+ u32::from_ne_bytes(buf)
+ }
+
+ /// Safe cast of packet size to u16.
+ #[inline]
+ pub(crate) fn len16(n: usize) -> u16 {
+ u16::try_from(n).expect("packet size exceeds u16")
+ }
+
+ // ── DHT operations ──────────────────────────────
+
    /// Store a key-value pair in the DHT.
    ///
    /// The key is hashed with SHA-256 to map it to the
    /// 256-bit ID space. Stored locally and sent to
    /// the k-closest known nodes.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # let mut node = tesseras_dht::Node::bind(0).unwrap();
    /// node.put(b"paste-id", b"paste content", 3600, false);
    /// ```
    pub fn put(&mut self, key: &[u8], value: &[u8], ttl: u16, is_unique: bool) {
        let target_id = NodeId::from_key(key);
        log::debug!(
            "put: key={} target={}",
            String::from_utf8_lossy(key),
            target_id
        );

        // Store locally
        let val = crate::dht::StoredValue {
            key: key.to_vec(),
            value: value.to_vec(),
            id: target_id,
            source: self.id,
            ttl,
            stored_at: std::time::Instant::now(),
            is_unique,
            original: 3, // ORIGINAL_PUT_NUM
            recvd: std::collections::HashSet::new(),
            version: crate::dht::now_version(),
        };
        self.storage.store(val);

        // If behind symmetric NAT, route through proxy
        if self.nat.state() == NatState::SymmetricNat {
            if let Some(server) = self.proxy.server().cloned() {
                let store_msg = msg::StoreMsg {
                    id: target_id,
                    from: self.id,
                    key: key.to_vec(),
                    value: value.to_vec(),
                    ttl,
                    is_unique,
                };
                // Packet = header + fixed STORE fields + variable
                // key/value payload.
                let total = HEADER_SIZE
                    + msg::STORE_FIXED
                    + store_msg.key.len()
                    + store_msg.value.len();
                let mut buf = vec![0u8; total];
                let hdr = MsgHeader::new(
                    MsgType::ProxyStore,
                    Self::len16(total),
                    self.id,
                    server.id,
                );
                if hdr.write(&mut buf).is_ok() {
                    let _ = msg::write_store(&mut buf, &store_msg);
                    let _ = self.send_signed(&buf, server.addr);
                }
                log::info!("put: via proxy to {:?}", server.id);
                return;
            }
        }

        // Direct: send STORE to k-closest known nodes
        let closest = self
            .dht_table
            .closest(&target_id, self.config.num_find_node);
        let store_msg = msg::StoreMsg {
            id: target_id,
            from: self.id,
            key: key.to_vec(),
            value: value.to_vec(),
            ttl,
            is_unique,
        };
        for peer in &closest {
            if let Err(e) = self.send_store(peer, &store_msg) {
                log::warn!("Failed to send store to {:?}: {e}", peer.id);
            } else {
                // Track so the store can be retried if no ack
                // arrives (see `retry_failed_stores`).
                self.store_tracker.track(
                    target_id,
                    key.to_vec(),
                    value.to_vec(),
                    ttl,
                    is_unique,
                    peer.clone(),
                );
            }
        }
        log::info!("put: stored locally + sent to {} peers", closest.len());
    }
+
    /// Store multiple key-value pairs in the DHT.
    ///
    /// More efficient than calling `put()` in a loop:
    /// groups stores by target peer to reduce redundant
    /// lookups and sends.
    ///
    /// NOTE(review): unlike `put`, sends go only to peers
    /// that are also present in `self.peers`, are not
    /// proxied under symmetric NAT, and are not tracked by
    /// `store_tracker` — confirm these differences are
    /// intentional.
    pub fn put_batch(&mut self, entries: &[(&[u8], &[u8], u16, bool)]) {
        // Group by target peer set to batch sends
        struct BatchEntry {
            target_id: NodeId,
            key: Vec<u8>,
            value: Vec<u8>,
            ttl: u16,
            is_unique: bool,
        }

        let mut batch: Vec<BatchEntry> = Vec::with_capacity(entries.len());

        for &(key, value, ttl, is_unique) in entries {
            let target_id = NodeId::from_key(key);

            // Store locally
            let val = crate::dht::StoredValue {
                key: key.to_vec(),
                value: value.to_vec(),
                id: target_id,
                source: self.id,
                ttl,
                stored_at: std::time::Instant::now(),
                is_unique,
                original: 3,
                recvd: std::collections::HashSet::new(),
                version: crate::dht::now_version(),
            };
            self.storage.store(val);

            batch.push(BatchEntry {
                target_id,
                key: key.to_vec(),
                value: value.to_vec(),
                ttl,
                is_unique,
            });
        }

        // Collect unique peers across all targets to
        // minimize redundant sends
        let mut peer_stores: HashMap<NodeId, Vec<msg::StoreMsg>> =
            HashMap::new();

        for entry in &batch {
            let closest = self
                .dht_table
                .closest(&entry.target_id, self.config.num_find_node);
            let store_msg = msg::StoreMsg {
                id: entry.target_id,
                from: self.id,
                key: entry.key.clone(),
                value: entry.value.clone(),
                ttl: entry.ttl,
                is_unique: entry.is_unique,
            };
            for peer in &closest {
                peer_stores
                    .entry(peer.id)
                    .or_default()
                    .push(store_msg.clone());
            }
        }

        // Send all stores grouped by peer
        let mut total_sent = 0u32;
        for (peer_id, stores) in &peer_stores {
            if let Some(peer) = self.peers.get(peer_id).cloned() {
                // Skip peers currently on the ban list.
                if self.ban_list.is_banned(&peer.addr) {
                    continue;
                }
                for store in stores {
                    if self.send_store(&peer, store).is_ok() {
                        total_sent += 1;
                    }
                }
            }
        }

        log::info!(
            "put_batch: {} entries stored locally, {total_sent} sends to {} peers",
            batch.len(),
            peer_stores.len(),
        );
    }
+
+ /// Retrieve multiple keys from the DHT.
+ ///
+ /// Returns a vec of (key, values) pairs. Local values
+ /// are returned immediately; missing keys trigger
+ /// iterative FIND_VALUE queries resolved via `poll()`.
+ pub fn get_batch(
+ &mut self,
+ keys: &[&[u8]],
+ ) -> Vec<(Vec<u8>, Vec<Vec<u8>>)> {
+ let mut results = Vec::with_capacity(keys.len());
+
+ for &key in keys {
+ let target_id = NodeId::from_key(key);
+ let local = self.storage.get(&target_id, key);
+
+ if !local.is_empty() {
+ let vals: Vec<Vec<u8>> =
+ local.into_iter().map(|v| v.value).collect();
+ results.push((key.to_vec(), vals));
+ } else {
+ // Start iterative FIND_VALUE for missing keys
+ if let Err(e) = self.start_find_value(key) {
+ log::debug!("Batch find_value failed for key: {e}");
+ }
+ results.push((key.to_vec(), Vec::new()));
+ }
+ }
+
+ results
+ }
+
+ /// Delete a key from the DHT.
+ ///
+ /// Sends a STORE with TTL=0 to the k-closest nodes,
+ /// which causes them to remove the value.
+ pub fn delete(&mut self, key: &[u8]) {
+ let target_id = NodeId::from_key(key);
+
+ // Remove locally
+ self.storage.remove(&target_id, key);
+
+ // Send TTL=0 store to closest nodes
+ let closest = self
+ .dht_table
+ .closest(&target_id, self.config.num_find_node);
+ let store_msg = msg::StoreMsg {
+ id: target_id,
+ from: self.id,
+ key: key.to_vec(),
+ value: Vec::new(),
+ ttl: 0,
+ is_unique: false,
+ };
+ for peer in &closest {
+ let _ = self.send_store(peer, &store_msg);
+ }
+ log::info!("delete: removed locally + sent to {} peers", closest.len());
+ }
+
+ /// Retrieve values for a key from the DHT.
+ ///
+ /// First checks local storage. If not found, starts
+ /// an iterative FIND_VALUE query across the network.
+ /// Returns local values immediately; remote results
+ /// arrive via `poll()` and can be retrieved with a
+ /// subsequent `get()` call (they'll be cached
+ /// locally by `handle_dht_find_value_reply`).
+ pub fn get(&mut self, key: &[u8]) -> Vec<Vec<u8>> {
+ let target_id = NodeId::from_key(key);
+
+ // Check local storage first
+ let local = self.storage.get(&target_id, key);
+ if !local.is_empty() {
+ return local.into_iter().map(|v| v.value).collect();
+ }
+
+ // Not found locally — start iterative FIND_VALUE
+ if let Err(e) = self.start_find_value(key) {
+ log::debug!("Failed to start find_value: {e}");
+ }
+
+ Vec::new()
+ }
+
    /// Retrieve values with blocking network lookup.
    ///
    /// Polls internally until the value is found or
    /// `timeout` expires. Returns empty if not found.
    ///
    /// Each iteration blocks in `poll()` (up to 100ms) plus
    /// a 10ms sleep, so the total wait may overshoot
    /// `timeout` by roughly one iteration.
    pub fn get_blocking(
        &mut self,
        key: &[u8],
        timeout: Duration,
    ) -> Vec<Vec<u8>> {
        let target_id = NodeId::from_key(key);

        // Check local first
        let local = self.storage.get(&target_id, key);
        if !local.is_empty() {
            return local.into_iter().map(|v| v.value).collect();
        }

        // Start FIND_VALUE
        if self.start_find_value(key).is_err() {
            return Vec::new();
        }

        // Poll until found or timeout
        let deadline = Instant::now() + timeout;
        while Instant::now() < deadline {
            let _ = self.poll();

            // Remote replies are cached into local storage by the
            // reply handler, so we re-check storage each round.
            let vals = self.storage.get(&target_id, key);
            if !vals.is_empty() {
                return vals.into_iter().map(|v| v.value).collect();
            }

            std::thread::sleep(Duration::from_millis(10));
        }

        Vec::new()
    }
+
    /// Start an iterative FIND_VALUE query for a key.
    ///
    /// Returns the query nonce. Results arrive via
    /// `handle_dht_find_value_reply` during `poll()`.
    pub fn start_find_value(&mut self, key: &[u8]) -> Result<u32, Error> {
        let target_id = NodeId::from_key(key);
        let nonce = self.alloc_nonce();
        let mut query =
            IterativeQuery::find_value(target_id, key.to_vec(), nonce);

        // Seed with our closest known nodes
        let closest = self
            .dht_table
            .closest(&target_id, self.config.num_find_node);
        query.closest = closest;

        // NOTE(review): a random nonce colliding with an
        // in-flight query silently replaces it here — confirm
        // the 32-bit space makes this acceptable.
        self.queries.insert(nonce, query);

        // Send initial batch
        self.send_query_batch(nonce)?;

        Ok(nonce)
    }
+
    /// Send a FIND_VALUE message to a specific address.
    ///
    /// Returns the nonce carried by the request so the
    /// reply can be matched. The wire header's destination
    /// ID is zeroed because the recipient's node ID is not
    /// known at this point — only its socket address.
    pub(crate) fn send_find_value_msg(
        &mut self,
        to: SocketAddr,
        target: NodeId,
        key: &[u8],
    ) -> Result<u32, Error> {
        let nonce = self.alloc_nonce();
        // Address family flag for the wire format.
        let domain = if to.is_ipv4() {
            DOMAIN_INET
        } else {
            DOMAIN_INET6
        };

        let fv = msg::FindValueMsg {
            nonce,
            target,
            domain,
            key: key.to_vec(),
            use_rdp: false,
        };

        let total = HEADER_SIZE + msg::FIND_VALUE_FIXED + key.len();
        let mut buf = vec![0u8; total];
        let hdr = MsgHeader::new(
            MsgType::DhtFindValue,
            Self::len16(total),
            self.id,
            // Destination node ID unknown: all-zero placeholder.
            NodeId::from_bytes([0; crate::id::ID_LEN]),
        );
        hdr.write(&mut buf)?;
        msg::write_find_value(&mut buf, &fv)?;
        self.send_signed(&buf, to)?;

        log::debug!(
            "Sent find_value to {to} target={target:?} key={} bytes",
            key.len()
        );
        Ok(nonce)
    }
+
+ // ── Datagram ────────────────────────────────────
+
    /// Send a datagram to a destination node.
    ///
    /// If the destination's address is known, fragments
    /// are sent immediately. Otherwise they are queued
    /// for delivery once the address is resolved.
    pub fn send_dgram(&mut self, data: &[u8], dst: &NodeId) {
        // Large payloads are split into MTU-sized fragments
        // and reassembled on the receiving side.
        let fragments = dgram::fragment(data);
        log::debug!(
            "send_dgram: {} bytes, {} fragment(s) to {:?}",
            data.len(),
            fragments.len(),
            dst
        );

        // If behind symmetric NAT, route through proxy
        if self.nat.state() == NatState::SymmetricNat {
            if let Some(server) = self.proxy.server().cloned() {
                for frag in &fragments {
                    let total = HEADER_SIZE + frag.len();
                    let mut buf = vec![0u8; total];
                    // Header names the final destination; the
                    // packet itself goes to the proxy server.
                    let hdr = MsgHeader::new(
                        MsgType::ProxyDgram,
                        Self::len16(total),
                        self.id,
                        *dst,
                    );
                    if hdr.write(&mut buf).is_ok() {
                        buf[HEADER_SIZE..].copy_from_slice(frag);
                        let _ = self.send_signed(&buf, server.addr);
                    }
                }
                return;
            }
        }

        // Direct: send if we know the address
        if let Some(peer) = self.peers.get(dst).cloned() {
            for frag in &fragments {
                self.send_dgram_raw(frag, &peer);
            }
        } else {
            // Queue for later delivery
            for frag in fragments {
                self.send_queue.push(*dst, frag, self.id);
            }
        }
    }
+
+ /// Send a single dgram fragment wrapped in a
+ /// protocol message.
+ pub(crate) fn send_dgram_raw(&self, payload: &[u8], dst: &PeerInfo) {
+ let total = HEADER_SIZE + payload.len();
+ let mut buf = vec![0u8; total];
+ let hdr =
+ MsgHeader::new(MsgType::Dgram, Self::len16(total), self.id, dst.id);
+ if hdr.write(&mut buf).is_ok() {
+ buf[HEADER_SIZE..].copy_from_slice(payload);
+ let _ = self.send_signed(&buf, dst.addr);
+ }
+ }
+
+ /// Set the callback for received datagrams.
+ pub fn set_dgram_callback<F>(&mut self, f: F)
+ where
+ F: Fn(&[u8], &NodeId) + Send + 'static,
+ {
+ self.dgram_callback = Some(Box::new(f));
+ }
+
+ /// Remove the datagram callback.
+ pub fn unset_dgram_callback(&mut self) {
+ self.dgram_callback = None;
+ }
+
+ /// Set a callback for RDP events (ACCEPTED,
+ /// CONNECTED, READY2READ, RESET, FAILED, etc).
+ pub fn set_rdp_callback<F>(&mut self, f: F)
+ where
+ F: Fn(i32, &crate::rdp::RdpAddr, crate::rdp::RdpEvent) + Send + 'static,
+ {
+ self.rdp_callback = Some(Box::new(f));
+ }
+
+ /// Remove the RDP event callback.
+ pub fn unset_rdp_callback(&mut self) {
+ self.rdp_callback = None;
+ }
+
+ // ── RDP (reliable transport) ────────────────────
+
    /// Listen for RDP connections on `port`.
    ///
    /// Returns the listener descriptor.
    pub fn rdp_listen(&mut self, port: u16) -> Result<i32, RdpError> {
        self.rdp.listen(port)
    }
+
+ /// Connect to a remote node via RDP.
+ ///
+ /// Sends a SYN packet immediately if the peer's
+ /// address is known.
+ pub fn rdp_connect(
+ &mut self,
+ sport: u16,
+ dst: &NodeId,
+ dport: u16,
+ ) -> Result<i32, RdpError> {
+ let desc = self.rdp.connect(sport, *dst, dport)?;
+ self.flush_rdp_output(desc);
+ Ok(desc)
+ }
+
    /// Close an RDP connection or listener by descriptor.
    pub fn rdp_close(&mut self, desc: i32) {
        self.rdp.close(desc);
    }
+
+ /// Send data on an RDP connection.
+ ///
+ /// Enqueues data and flushes pending packets to the
+ /// network.
+ pub fn rdp_send(
+ &mut self,
+ desc: i32,
+ data: &[u8],
+ ) -> Result<usize, RdpError> {
+ let n = self.rdp.send(desc, data)?;
+ self.flush_rdp_output(desc);
+ Ok(n)
+ }
+
    /// Receive data from an RDP connection into `buf`.
    ///
    /// Returns the number of bytes copied.
    pub fn rdp_recv(
        &mut self,
        desc: i32,
        buf: &mut [u8],
    ) -> Result<usize, RdpError> {
        self.rdp.recv(desc, buf)
    }
+
    /// Get the connection-machine state of an RDP descriptor.
    pub fn rdp_state(&self, desc: i32) -> Result<RdpState, RdpError> {
        self.rdp.get_state(desc)
    }
+
    /// Get status of all RDP connections (for diagnostics).
    pub fn rdp_status(&self) -> Vec<RdpStatus> {
        self.rdp.get_status()
    }
+
    /// Set RDP maximum retransmission timeout, in seconds.
    pub fn rdp_set_max_retrans(&mut self, secs: u64) {
        self.rdp.set_max_retrans(Duration::from_secs(secs));
    }
+
    /// Get RDP maximum retransmission timeout, in whole
    /// seconds (sub-second precision is truncated).
    pub fn rdp_max_retrans(&self) -> u64 {
        self.rdp.max_retrans().as_secs()
    }
+
+ // ── Event loop ──────────────────────────────────
+
    /// Process one iteration of the event loop.
    ///
    /// Polls for I/O events, processes incoming packets,
    /// fires expired timers, and runs maintenance tasks.
    /// Blocks for at most `DEFAULT_POLL_TIMEOUT` (100ms).
    pub fn poll(&mut self) -> Result<(), Error> {
        self.poll_timeout(DEFAULT_POLL_TIMEOUT)
    }
+
    /// Poll with a custom maximum timeout.
    ///
    /// Use a short timeout (e.g. 1ms) in tests with
    /// many nodes to avoid blocking.
    ///
    /// One call performs, in order: socket poll + packet
    /// dispatch, timer tick, query/RDP driving, and the
    /// periodic maintenance sweeps.
    pub fn poll_timeout(&mut self, max_timeout: Duration) -> Result<(), Error> {
        // Sleep no longer than the nearest timer deadline.
        let timeout = self
            .timers
            .next_deadline()
            .map(|d| d.min(max_timeout))
            .unwrap_or(max_timeout);

        self.net.poll_events(timeout)?;

        // Check if we got a UDP event. We must not hold
        // a borrow on self.net while calling handle_packet,
        // so we just check and then drain separately.
        let has_udp = self.net.drain_events().any(|ev| ev.token() == UDP_TOKEN);

        if has_udp {
            let mut buf = [0u8; 4096];
            // Drain the socket until it would block.
            while let Ok((len, from)) = self.net.recv_from(&mut buf) {
                self.handle_packet(&buf[..len], from);
            }
        }

        // Fire timers
        let _fired = self.timers.tick();

        // Drive iterative queries
        self.drive_queries();

        // Drain send queue for destinations we now know
        self.drain_send_queue();

        // Drive RDP: tick timeouts and flush output
        let rdp_actions = self.rdp.tick();
        for action in rdp_actions {
            if let crate::rdp::RdpAction::Event { desc, event, .. } = action {
                log::info!("RDP tick event: desc={desc} {event:?}");
            }
        }
        self.flush_all_rdp();

        // Periodic maintenance
        self.peers.refresh();
        self.storage.expire();
        self.advertise.refresh();
        self.reassembler.expire();
        self.send_queue.expire();
        self.refresh_buckets();
        self.probe_liveness();

        self.rate_limiter.cleanup();

        // Node activity monitor (proactive ping)
        self.check_node_activity();

        // Retry failed stores
        self.retry_failed_stores();

        // Ban list cleanup
        self.ban_list.cleanup();
        self.store_tracker.cleanup();

        // Expire stale pending pings (>10s) — record
        // failure for unresponsive peers
        let expired_pings: Vec<(u32, NodeId)> = self
            .pending_pings
            .iter()
            .filter(|(_, (_, sent))| sent.elapsed().as_secs() >= 10)
            .map(|(nonce, (id, _))| (*nonce, *id))
            .collect();
        for (nonce, peer_id) in &expired_pings {
            if let Some(peer) = self.peers.get(peer_id) {
                self.ban_list.record_failure(peer.addr);
            }
            // Also record failure in routing table for
            // stale count / replacement cache logic
            if let Some(evicted) = self.dht_table.record_failure(peer_id) {
                log::debug!(
                    "Replaced stale peer {:?} from routing table",
                    evicted
                );
            }
            self.pending_pings.remove(nonce);
        }

        // Data restore (every 120s)
        if self.last_restore.elapsed() >= self.config.restore_interval {
            self.last_restore = Instant::now();
            self.restore_data();
        }

        // Maintain: mask_bit exploration (every 120s)
        if self.last_maintain.elapsed() >= self.config.maintain_interval {
            self.last_maintain = Instant::now();
            self.run_maintain();
        }

        // DTUN maintenance
        let dtun_targets = self.dtun.maintain();
        for target in dtun_targets {
            if let Err(e) = self.start_find_node(target) {
                log::debug!("DTUN maintain find_node failed: {e}");
            }
        }

        // NAT re-detection
        self.nat.expire_pending();

        Ok(())
    }
+
+ /// Run the event loop forever.
+ pub fn run(&mut self) -> ! {
+ loop {
+ if let Err(e) = self.poll() {
+ log::error!("Event loop error: {e}");
+ }
+ }
+ }
+
    /// Set the node configuration. Call before `join()`.
    pub fn set_config(&mut self, config: crate::config::Config) {
        self.config = config;
    }
+
    /// Get a reference to the current configuration.
    pub fn config(&self) -> &crate::config::Config {
        &self.config
    }
+
    /// Set the routing table persistence backend
    /// (replaces the default no-op backend).
    pub fn set_routing_persistence(
        &mut self,
        p: Box<dyn crate::persist::RoutingPersistence>,
    ) {
        self.routing_persistence = p;
    }
+
    /// Set the data persistence backend
    /// (replaces the default no-op backend).
    pub fn set_data_persistence(
        &mut self,
        p: Box<dyn crate::persist::DataPersistence>,
    ) {
        self.data_persistence = p;
    }
+
    /// Load saved contacts and data from persistence
    /// backends. Call after bind, before join.
    ///
    /// Load failures are ignored (treated as an empty
    /// snapshot); invalid contacts are skipped.
    pub fn load_persisted(&mut self) {
        // Load routing table contacts
        if let Ok(contacts) = self.routing_persistence.load_contacts() {
            let mut loaded = 0usize;
            for c in contacts {
                // Validate: skip zero IDs and unspecified addrs
                if c.id.is_zero() {
                    log::debug!("Skipping persisted contact: zero ID");
                    continue;
                }
                if c.addr.ip().is_unspecified() {
                    log::debug!("Skipping persisted contact: unspecified addr");
                    continue;
                }
                // Add to both the routing table and peer store.
                let peer = PeerInfo::new(c.id, c.addr);
                self.dht_table.add(peer.clone());
                self.peers.add(peer);
                loaded += 1;
            }
            log::info!("Loaded {loaded} persisted contacts");
        }

        // Load stored values
        if let Ok(records) = self.data_persistence.load() {
            for r in &records {
                let val = crate::dht::StoredValue {
                    key: r.key.clone(),
                    value: r.value.clone(),
                    id: r.target_id,
                    source: r.source,
                    ttl: r.ttl,
                    // TTL clock restarts now; remaining TTL was
                    // captured at save time (see `save_state`).
                    stored_at: Instant::now(),
                    is_unique: r.is_unique,
                    original: 0,
                    recvd: std::collections::HashSet::new(),
                    version: 0, // persisted data has no version
                };
                self.storage.store(val);
            }
            log::info!("Loaded {} persisted values", records.len());
        }
    }
+
    /// Save current state to persistence backends.
    /// Called during shutdown or periodically.
    ///
    /// Failures are logged and otherwise ignored.
    pub fn save_state(&self) {
        // Save routing table contacts
        let contacts: Vec<crate::persist::ContactRecord> = self
            .dht_table
            .closest(&self.id, 1000) // save all
            // NOTE(review): `closest(…, 1000)` is capped at 1000
            // contacts — only "all" while the table stays smaller
            // than that; confirm the bound is sufficient.
            .iter()
            .map(|p| crate::persist::ContactRecord {
                id: p.id,
                addr: p.addr,
            })
            .collect();

        if let Err(e) = self.routing_persistence.save_contacts(&contacts) {
            log::warn!("Failed to save contacts: {e}");
        }

        // Save stored values
        let values = self.storage.all_values();
        let records: Vec<crate::persist::StoredRecord> = values
            .iter()
            .map(|v| crate::persist::StoredRecord {
                key: v.key.clone(),
                value: v.value.clone(),
                target_id: v.id,
                source: v.source,
                // Persist the *remaining* TTL so reload does not
                // extend a value's lifetime.
                ttl: v.remaining_ttl(),
                is_unique: v.is_unique,
            })
            .collect();

        if let Err(e) = self.data_persistence.save(&records) {
            log::warn!("Failed to save values: {e}");
        }
    }
+
    /// Graceful shutdown: notify closest peers that we
    /// are leaving, so they can remove us from their
    /// routing tables immediately.
    ///
    /// Also persists state via `save_state` as the final
    /// step. Best-effort: send failures are ignored.
    pub fn shutdown(&mut self) {
        log::info!("Shutting down node {}", self.id);

        // Send a "leaving" ping to our closest peers
        // so they know to remove us
        let closest =
            self.dht_table.closest(&self.id, self.config.num_find_node);

        for peer in &closest {
            // Send FIN-like notification via advertise
            let nonce = self.alloc_nonce();
            // Payload: 4-byte nonce + 4-byte session field.
            let size = HEADER_SIZE + 8;
            let mut buf = vec![0u8; size];
            let hdr = MsgHeader::new(
                MsgType::Advertise,
                Self::len16(size),
                self.id,
                peer.id,
            );
            if hdr.write(&mut buf).is_ok() {
                buf[HEADER_SIZE..HEADER_SIZE + 4]
                    .copy_from_slice(&nonce.to_be_bytes());

                // Session 0 = shutdown signal
                buf[HEADER_SIZE + 4..HEADER_SIZE + 8].fill(0);
                let _ = self.send_signed(&buf, peer.addr);
            }
        }

        log::info!("Shutdown: notified {} peers", closest.len());

        // Persist state
        self.save_state();
    }
+
+ // ── Routing table access ────────────────────────
+
    /// Number of peers in the DHT routing table.
    pub fn routing_table_size(&self) -> usize {
        self.dht_table.size()
    }
+
    /// Number of known peers in the peer store (may differ
    /// from the routing table size).
    pub fn peer_count(&self) -> usize {
        self.peers.len()
    }
+
    /// Point-in-time snapshot of metrics counters.
    pub fn metrics(&self) -> crate::metrics::MetricsSnapshot {
        self.metrics.snapshot()
    }
+
    /// Number of stored DHT values held locally.
    pub fn storage_count(&self) -> usize {
        self.storage.len()
    }
+
+ /// All stored DHT values (key, value bytes).
+ /// Used by applications to sync DHT-replicated data
+ /// to their own persistence layer.
+ pub fn dht_values(&self) -> Vec<(Vec<u8>, Vec<u8>)> {
+ self.storage
+ .all_values()
+ .into_iter()
+ .map(|v| (v.key, v.value))
+ .collect()
+ }
+
    /// Number of currently banned peers.
    pub fn ban_count(&self) -> usize {
        self.ban_list.ban_count()
    }
+
    /// Number of pending store operations awaiting ack.
    pub fn pending_stores(&self) -> usize {
        self.store_tracker.pending_count()
    }
+
    /// Store tracker statistics as an `(acks, failures)` pair.
    pub fn store_stats(&self) -> (u64, u64) {
        (self.store_tracker.acks, self.store_tracker.failures)
    }
+
+ /// Print node state (debug).
+ pub fn print_state(&self) {
+ println!("MyID = {}", self.id);
+ println!();
+ println!("Node State:");
+ println!(" {:?}", self.nat_state());
+ println!();
+ println!("Routing Table: {} nodes", self.dht_table.size());
+ self.dht_table.print_table();
+ println!();
+ println!("DTUN: {} registrations", self.dtun.registration_count());
+ println!("Peers: {} known", self.peers.len());
+ println!("Storage: {} values", self.storage.len());
+ println!("Bans: {} active", self.ban_list.ban_count());
+ println!(
+ "Stores: {} pending, {} acked, {} failed",
+ self.store_tracker.pending_count(),
+ self.store_tracker.acks,
+ self.store_tracker.failures,
+ );
+ println!(
+ "RDP: {} connections, {} listeners",
+ self.rdp.connection_count(),
+ self.rdp.listener_count()
+ );
+ }
+}
+
+impl fmt::Display for Node {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(
+ f,
+ "Node({}, {:?}, {} peers, {} stored)",
+ self.id,
+ self.nat_state(),
+ self.dht_table.size(),
+ self.storage.len()
+ )
+ }
+}
+
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn bind_creates_node() {
        let node = Node::bind(0).unwrap();
        assert!(!node.id().is_zero());
        assert_eq!(node.routing_table_size(), 0);
        assert_eq!(node.nat_state(), NatState::Unknown);
    }

    #[test]
    fn set_id_from_data() {
        let mut node1 = Node::bind(0).unwrap();
        let mut node2 = Node::bind(0).unwrap();

        let old_id = *node1.id();
        node1.set_id(b"my-application-id");
        assert_ne!(*node1.id(), old_id);

        // Deterministic: same seed → same ID
        node2.set_id(b"my-application-id");
        assert_eq!(*node1.id(), *node2.id());

        // Public key matches NodeId
        assert_eq!(node1.public_key(), node1.id().as_bytes());
    }

    #[test]
    fn id_hex_format() {
        let node = Node::bind(0).unwrap();
        let hex = node.id_hex();
        // 256-bit ID → 64 hex characters.
        assert_eq!(hex.len(), 64);
        assert!(hex.chars().all(|c| c.is_ascii_hexdigit()));
    }

    #[test]
    fn nat_state_set_get() {
        let mut node = Node::bind(0).unwrap();
        assert_eq!(node.nat_state(), NatState::Unknown);

        node.set_nat_state(NatState::Global);
        assert_eq!(node.nat_state(), NatState::Global);

        node.set_nat_state(NatState::ConeNat);
        assert_eq!(node.nat_state(), NatState::ConeNat);

        node.set_nat_state(NatState::SymmetricNat);
        assert_eq!(node.nat_state(), NatState::SymmetricNat);
    }

    #[test]
    fn put_get_local() {
        let mut node = Node::bind(0).unwrap();
        node.put(b"hello", b"world", 300, false);

        let vals = node.get(b"hello");
        assert_eq!(vals.len(), 1);
        assert_eq!(vals[0], b"world");
    }

    #[test]
    fn put_unique() {
        let mut node = Node::bind(0).unwrap();
        node.put(b"key", b"val1", 300, true);
        node.put(b"key", b"val2", 300, true);

        let vals = node.get(b"key");

        // Unique from same source → replaced
        assert_eq!(vals.len(), 1);
        assert_eq!(vals[0], b"val2");
    }

    #[test]
    fn get_nonexistent() {
        let mut node = Node::bind(0).unwrap();
        let vals = node.get(b"nope");
        assert!(vals.is_empty());
    }

    #[test]
    fn rdp_listen_and_close() {
        let mut node = Node::bind(0).unwrap();
        // Fixed: the descriptor IS used below (passed to rdp_close),
        // so it must not carry an underscore "unused" prefix.
        let desc = node.rdp_listen(5000).unwrap();

        // Listener desc is not a connection, so
        // rdp_state won't find it. Just verify close
        // doesn't panic.
        node.rdp_close(desc);
    }

    #[test]
    fn rdp_connect_creates_syn() {
        let mut node = Node::bind(0).unwrap();
        let dst = NodeId::from_bytes([0x01; 32]);
        let desc = node.rdp_connect(0, &dst, 5000).unwrap();
        assert_eq!(node.rdp_state(desc).unwrap(), RdpState::SynSent);
    }

    #[test]
    fn rdp_status() {
        let mut node = Node::bind(0).unwrap();
        let dst = NodeId::from_bytes([0x01; 32]);
        node.rdp_connect(0, &dst, 5000).unwrap();
        let status = node.rdp_status();
        assert_eq!(status.len(), 1);
    }

    #[test]
    fn rdp_max_retrans() {
        let mut node = Node::bind(0).unwrap();
        node.rdp_set_max_retrans(60);
        assert_eq!(node.rdp_max_retrans(), 60);
    }

    #[test]
    fn display() {
        let node = Node::bind(0).unwrap();
        let s = format!("{node}");
        assert!(s.starts_with("Node("));
        assert!(s.contains("Unknown"));
    }

    #[test]
    fn dgram_callback() {
        let mut node = Node::bind(0).unwrap();
        assert!(node.dgram_callback.is_none());
        node.set_dgram_callback(|_data, _from| {});
        assert!(node.dgram_callback.is_some());
        node.unset_dgram_callback();
        assert!(node.dgram_callback.is_none());
    }

    #[test]
    fn join_with_invalid_host() {
        let mut node = Node::bind(0).unwrap();
        let result = node.join("this-host-does-not-exist.invalid", 3000);
        assert!(result.is_err());
    }

    #[test]
    fn poll_once() {
        let mut node = Node::bind(0).unwrap();

        // Should not block forever (default 1s timeout,
        // but returns quickly with no events)
        node.poll().unwrap();
    }
}