diff options
| author | murilo ijanc | 2026-03-24 15:04:03 -0300 |
|---|---|---|
| committer | murilo ijanc | 2026-03-24 15:04:03 -0300 |
| commit | 9821aabf0b50d2487b07502d3d2cd89e7d62bdbe (patch) | |
| tree | 53da095ff90cc755bac3d4bf699172b5e8cd07d6 /src/dtun.rs | |
| download | tesseras-dht-0.1.0.tar.gz | |
Initial commitv0.1.0
NAT-aware Kademlia DHT library for peer-to-peer networks.
Features:
- Distributed key-value storage (iterative FIND_NODE, FIND_VALUE, STORE)
- NAT traversal via DTUN hole-punching and proxy relay
- Reliable Datagram Protocol (RDP) with 7-state connection machine
- Datagram transport with automatic fragmentation/reassembly
- Ed25519 packet authentication
- 256-bit node IDs (Ed25519 public keys)
- Rate limiting, ban list, and eclipse attack mitigation
- Persistence and metrics
- OpenBSD and Linux support
Diffstat (limited to 'src/dtun.rs')
| -rw-r--r-- | src/dtun.rs | 436 |
1 files changed, 436 insertions, 0 deletions
diff --git a/src/dtun.rs b/src/dtun.rs new file mode 100644 index 0000000..367262c --- /dev/null +++ b/src/dtun.rs @@ -0,0 +1,436 @@ +//! Distributed tunnel for NAT traversal (DTUN). +//! +//! Maintains a separate +//! routing table used to register NAT'd nodes and resolve +//! their addresses for hole-punching. +//! +//! ## How it works +//! +//! 1. A node behind NAT **registers** itself with the +//! k-closest global nodes (find_node + register). +//! 2. When another node wants to reach a NAT'd node, it +//! does a **find_value** in the DTUN table to discover +//! which global node holds the registration. +//! 3. It then sends a **request** to that global node, +//! which forwards the request to the NAT'd node, +//! causing it to send a packet that punches a hole. + +use std::collections::HashMap; +use std::net::SocketAddr; +use std::time::{Duration, Instant}; + +use crate::id::NodeId; +use crate::peers::PeerInfo; +use crate::routing::RoutingTable; + +// ── Constants ──────────────────────────────────────── + +/// k-closest nodes for DTUN lookups. +pub const DTUN_NUM_FIND_NODE: usize = 10; + +/// Max parallel queries. +pub const DTUN_MAX_QUERY: usize = 6; + +/// Query timeout. +pub const DTUN_QUERY_TIMEOUT: Duration = Duration::from_secs(2); + +/// Retries for reachability requests. +pub const DTUN_REQUEST_RETRY: usize = 2; + +/// Request timeout. +pub const DTUN_REQUEST_TIMEOUT: Duration = Duration::from_secs(2); + +/// TTL for node registrations. +pub const DTUN_REGISTERED_TTL: Duration = Duration::from_secs(300); + +/// Refresh timer interval. +pub const DTUN_TIMER_INTERVAL: Duration = Duration::from_secs(30); + +/// Maintenance interval (mask_bit exploration). +pub const DTUN_MAINTAIN_INTERVAL: Duration = Duration::from_secs(120); + +// ── Registration record ───────────────────────────── + +/// A node registered in the DTUN overlay. +#[derive(Debug, Clone)] +pub struct Registration { + /// The registered node's address. + pub addr: SocketAddr, + + /// Session identifier (must match for updates). + pub session: u32, + + /// When this registration was created/refreshed. + pub registered_at: Instant, +} + +impl Registration { + /// Check if this registration has expired. + pub fn is_expired(&self) -> bool { + self.registered_at.elapsed() >= DTUN_REGISTERED_TTL + } +} + +// ── Request state ─────────────────────────────────── + +/// State of a reachability request. +#[derive(Debug)] +pub struct RequestState { + /// Target node we're trying to reach. + pub target: NodeId, + + /// When the request was sent. + pub sent_at: Instant, + + /// Remaining retries. + pub retries: usize, + + /// Whether find_value completed. + pub found: bool, + + /// The intermediary node (if found). + pub intermediary: Option<PeerInfo>, +} + +// ── DTUN ──────────────────────────────────────────── + +/// Distributed tunnel for NAT traversal. +pub struct Dtun { + /// Separate routing table for the DTUN overlay. + table: RoutingTable, + + /// Nodes registered through us (we're their + /// "registration server"). Capped at 1000. + registered: HashMap<NodeId, Registration>, + + /// Our own registration session. + register_session: u32, + + /// Whether we're currently registering. + registering: bool, + + /// Last time registration was refreshed. + last_registered: Instant, + + /// Pending reachability requests by nonce. + requests: HashMap<u32, RequestState>, + + /// Whether DTUN is enabled. + enabled: bool, + + /// Local node ID. + id: NodeId, + + /// Mask bit for maintain() exploration. + mask_bit: usize, + + /// Last maintain() call. + last_maintain: Instant, +} + +impl Dtun { + pub fn new(id: NodeId) -> Self { + Self { + table: RoutingTable::new(id), + registered: HashMap::new(), + register_session: 0, + registering: false, + last_registered: Instant::now(), + requests: HashMap::new(), + enabled: true, + id, + mask_bit: 1, + last_maintain: Instant::now(), + } + } + + /// Access the DTUN routing table. + pub fn table(&self) -> &RoutingTable { + &self.table + } + + /// Mutable access to the routing table. + pub fn table_mut(&mut self) -> &mut RoutingTable { + &mut self.table + } + + /// Whether DTUN is enabled. + pub fn is_enabled(&self) -> bool { + self.enabled + } + + /// Enable or disable DTUN. + pub fn set_enabled(&mut self, enabled: bool) { + self.enabled = enabled; + } + + /// Current registration session. + pub fn session(&self) -> u32 { + self.register_session + } + + // ── Registration (server side) ────────────────── + + /// Register a remote node (we act as their + /// registration server). + /// + /// Returns `true` if the registration was accepted. + pub fn register_node( + &mut self, + id: NodeId, + addr: SocketAddr, + session: u32, + ) -> bool { + const MAX_REGISTRATIONS: usize = 1000; + if let Some(existing) = self.registered.get(&id) { + if existing.session != session && !existing.is_expired() { + return false; + } + } + if self.registered.len() >= MAX_REGISTRATIONS + && !self.registered.contains_key(&id) + { + log::debug!("DTUN: registration limit reached"); + return false; + } + + self.registered.insert( + id, + Registration { + addr, + session, + registered_at: Instant::now(), + }, + ); + true + } + + /// Look up a registered node by ID. + pub fn get_registered(&self, id: &NodeId) -> Option<&Registration> { + self.registered.get(id).filter(|r| !r.is_expired()) + } + + /// Remove expired registrations. + pub fn expire_registrations(&mut self) { + self.registered.retain(|_, r| !r.is_expired()); + } + + /// Number of active registrations. + pub fn registration_count(&self) -> usize { + self.registered.values().filter(|r| !r.is_expired()).count() + } + + // ── Registration (client side) ────────────────── + + /// Prepare to register ourselves. Increments the + /// session and returns (session, closest_nodes) for + /// the caller to send DTUN_REGISTER messages. + pub fn prepare_register(&mut self) -> (u32, Vec<PeerInfo>) { + self.register_session = self.register_session.wrapping_add(1); + self.registering = true; + let closest = self.table.closest(&self.id, DTUN_NUM_FIND_NODE); + (self.register_session, closest) + } + + /// Mark registration as complete. + pub fn registration_done(&mut self) { + self.registering = false; + self.last_registered = Instant::now(); + } + + /// Check if re-registration is needed. + pub fn needs_reregister(&self) -> bool { + !self.registering + && self.last_registered.elapsed() >= DTUN_REGISTERED_TTL / 2 + } + + // ── Reachability requests ─────────────────────── + + /// Start a reachability request for a target node. + pub fn start_request(&mut self, nonce: u32, target: NodeId) { + self.requests.insert( + nonce, + RequestState { + target, + sent_at: Instant::now(), + retries: DTUN_REQUEST_RETRY, + found: false, + intermediary: None, + }, + ); + } + + /// Record that find_value found the intermediary for + /// a request. + pub fn request_found(&mut self, nonce: u32, intermediary: PeerInfo) { + if let Some(req) = self.requests.get_mut(&nonce) { + req.found = true; + req.intermediary = Some(intermediary); + } + } + + /// Get the intermediary for a pending request. + pub fn get_request(&self, nonce: &u32) -> Option<&RequestState> { + self.requests.get(nonce) + } + + /// Remove a completed or timed-out request. + pub fn remove_request(&mut self, nonce: &u32) -> Option<RequestState> { + self.requests.remove(nonce) + } + + /// Expire timed-out requests. + pub fn expire_requests(&mut self) { + self.requests.retain(|_, req| { + req.sent_at.elapsed() < DTUN_REQUEST_TIMEOUT || req.retries > 0 + }); + } + + // ── Maintenance ───────────────────────────────── + + /// Periodic maintenance: explore the ID space with + /// mask_bit and expire stale data. + /// + /// Returns target IDs for find_node queries, or empty + /// if maintenance isn't due yet. + pub fn maintain(&mut self) -> Vec<NodeId> { + if self.last_maintain.elapsed() < DTUN_MAINTAIN_INTERVAL { + return Vec::new(); + } + self.last_maintain = Instant::now(); + + // Generate exploration targets + let id_bytes = *self.id.as_bytes(); + let t1 = clear_bit(id_bytes, self.mask_bit); + let t2 = clear_bit(id_bytes, self.mask_bit + 1); + + self.mask_bit += 2; + if self.mask_bit > 20 { + self.mask_bit = 1; + } + + self.expire_registrations(); + self.expire_requests(); + + vec![t1, t2] + } +} + +/// Clear a specific bit (1-indexed from MSB) in a NodeId. +fn clear_bit( + mut bytes: [u8; crate::id::ID_LEN], + bit_from_msb: usize, +) -> NodeId { + if bit_from_msb == 0 || bit_from_msb > crate::id::ID_BITS { + return NodeId::from_bytes(bytes); + } + let pos = bit_from_msb - 1; + let byte_idx = pos / 8; + let bit_idx = 7 - (pos % 8); + bytes[byte_idx] &= !(1 << bit_idx); + NodeId::from_bytes(bytes) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn addr(port: u16) -> SocketAddr { + SocketAddr::from(([127, 0, 0, 1], port)) + } + + #[test] + fn register_and_lookup() { + let mut dtun = Dtun::new(NodeId::from_bytes([0x01; 32])); + let nid = NodeId::from_bytes([0x02; 32]); + + assert!(dtun.register_node(nid, addr(3000), 1)); + assert!(dtun.get_registered(&nid).is_some()); + assert_eq!(dtun.registration_count(), 1); + } + + #[test] + fn register_rejects_different_session() { + let mut dtun = Dtun::new(NodeId::from_bytes([0x01; 32])); + let nid = NodeId::from_bytes([0x02; 32]); + + assert!(dtun.register_node(nid, addr(3000), 1)); + + // Different session, not expired → rejected + assert!(!dtun.register_node(nid, addr(3001), 2)); + } + + #[test] + fn expire_registrations() { + let mut dtun = Dtun::new(NodeId::from_bytes([0x01; 32])); + let nid = NodeId::from_bytes([0x02; 32]); + dtun.registered.insert( + nid, + Registration { + addr: addr(3000), + session: 1, + + // Expired: registered 10 minutes ago + registered_at: Instant::now() - Duration::from_secs(600), + }, + ); + + dtun.expire_registrations(); + assert_eq!(dtun.registration_count(), 0); + } + + #[test] + fn request_lifecycle() { + let mut dtun = Dtun::new(NodeId::from_bytes([0x01; 32])); + let target = NodeId::from_bytes([0x02; 32]); + + dtun.start_request(42, target); + assert!(dtun.get_request(&42).is_some()); + + let intermediary = + PeerInfo::new(NodeId::from_bytes([0x03; 32]), addr(4000)); + dtun.request_found(42, intermediary); + assert!(dtun.get_request(&42).unwrap().found); + + dtun.remove_request(&42); + assert!(dtun.get_request(&42).is_none()); + } + + #[test] + fn prepare_register_increments_session() { + let mut dtun = Dtun::new(NodeId::from_bytes([0x01; 32])); + let (s1, _) = dtun.prepare_register(); + let (s2, _) = dtun.prepare_register(); + assert_eq!(s2, s1 + 1); + } + + #[test] + fn maintain_returns_targets() { + let mut dtun = Dtun::new(NodeId::from_bytes([0xFF; 32])); + + // Force last_maintain to be old enough + dtun.last_maintain = Instant::now() - DTUN_MAINTAIN_INTERVAL; + + let targets = dtun.maintain(); + assert_eq!(targets.len(), 2); + + // Should differ from local ID + assert_ne!(targets[0], dtun.id); + assert_ne!(targets[1], dtun.id); + } + + #[test] + fn maintain_skips_if_recent() { + let mut dtun = Dtun::new(NodeId::from_bytes([0xFF; 32])); + let targets = dtun.maintain(); + assert!(targets.is_empty()); + } + + #[test] + fn enable_disable() { + let mut dtun = Dtun::new(NodeId::from_bytes([0x01; 32])); + assert!(dtun.is_enabled()); + dtun.set_enabled(false); + assert!(!dtun.is_enabled()); + } +} |