//! Proxy relay for symmetric NAT nodes. //! //! When a node is //! behind a symmetric NAT (where hole-punching fails), //! it registers with a global node that acts as a relay: //! //! - **Store**: the NAT'd node sends store requests to //! its proxy, which forwards them to the DHT. //! - **Get**: the NAT'd node sends get requests to its //! proxy, which performs the lookup and returns results. //! - **Dgram**: datagrams to/from NAT'd nodes are //! forwarded through the proxy. //! - **RDP**: reliable transport is also tunnelled //! through the proxy (type_proxy_rdp). use std::collections::HashMap; use std::net::SocketAddr; use std::time::{Duration, Instant}; use crate::id::NodeId; use crate::peers::PeerInfo; // ── Constants ──────────────────────────────────────── /// Registration timeout. pub const PROXY_REGISTER_TIMEOUT: Duration = Duration::from_secs(2); /// Registration TTL. pub const PROXY_REGISTER_TTL: Duration = Duration::from_secs(300); /// Get request timeout. pub const PROXY_GET_TIMEOUT: Duration = Duration::from_secs(10); /// Maintenance timer interval. pub const PROXY_TIMER_INTERVAL: Duration = Duration::from_secs(30); /// RDP timeout for proxy connections. pub const PROXY_RDP_TIMEOUT: Duration = Duration::from_secs(30); /// RDP port for proxy store. pub const PROXY_STORE_PORT: u16 = 200; /// RDP port for proxy get. pub const PROXY_GET_PORT: u16 = 201; /// RDP port for proxy get reply. pub const PROXY_GET_REPLY_PORT: u16 = 202; // ── Client state ──────────────────────────────────── /// A node registered as a client of this proxy. #[derive(Debug, Clone)] pub struct ProxyClient { /// Client's address (behind NAT). pub addr: SocketAddr, /// DTUN session for validation. pub session: u32, /// When the client last registered/refreshed. pub registered_at: Instant, } impl ProxyClient { pub fn is_expired(&self) -> bool { self.registered_at.elapsed() >= PROXY_REGISTER_TTL } } // ── Pending get state ─────────────────────────────── /// State for a pending proxy get request. #[derive(Debug)] pub struct PendingGet { /// Key being looked up. pub key: Vec, /// Nonce for correlation. pub nonce: u32, /// When the request was sent. pub sent_at: Instant, /// Whether we've received a result. pub completed: bool, /// Collected values. pub values: Vec>, } // ── Proxy ─────────────────────────────────────────── /// Proxy relay for symmetric NAT traversal. /// /// Acts in two roles: /// - **Client**: when we're behind symmetric NAT, we /// register with a proxy server and route operations /// through it. /// - **Server**: when we have a public IP, we accept /// registrations from NAT'd nodes and relay their /// traffic. pub struct Proxy { /// Our proxy server (when we're a client). server: Option, /// Whether we've successfully registered with our /// proxy server. is_registered: bool, /// Whether registration is in progress. is_registering: bool, /// Nonce for registration correlation. register_nonce: u32, /// Clients registered with us. Capped at 500. clients: HashMap, /// Pending get requests. Capped at 100. pending_gets: HashMap, } impl Proxy { pub fn new(_id: NodeId) -> Self { Self { server: None, is_registered: false, is_registering: false, register_nonce: 0, clients: HashMap::new(), pending_gets: HashMap::new(), } } // ── Client role ───────────────────────────────── /// Set the proxy server to use. pub fn set_server(&mut self, server: PeerInfo) { self.server = Some(server); self.is_registered = false; } /// Get the current proxy server. pub fn server(&self) -> Option<&PeerInfo> { self.server.as_ref() } /// Whether we're registered with a proxy server. pub fn is_registered(&self) -> bool { self.is_registered } /// Start proxy registration. Returns the nonce to /// include in the register message. pub fn start_register(&mut self, nonce: u32) -> Option { self.server.as_ref()?; self.is_registering = true; self.register_nonce = nonce; Some(nonce) } /// Handle registration reply. pub fn recv_register_reply(&mut self, nonce: u32) -> bool { if nonce != self.register_nonce { return false; } self.is_registering = false; self.is_registered = true; log::info!("Registered with proxy server"); true } // ── Server role ───────────────────────────────── /// Register a client node (we act as their proxy). pub fn register_client( &mut self, id: NodeId, addr: SocketAddr, session: u32, ) -> bool { const MAX_CLIENTS: usize = 500; if self.clients.len() >= MAX_CLIENTS && !self.clients.contains_key(&id) { return false; } if let Some(existing) = self.clients.get(&id) { if existing.session != session && !existing.is_expired() { return false; } } self.clients.insert( id, ProxyClient { addr, session, registered_at: Instant::now(), }, ); log::debug!("Proxy: registered client {id:?}"); true } /// Check if a node is registered as our client. pub fn is_client_registered(&self, id: &NodeId) -> bool { self.clients .get(id) .map(|c| !c.is_expired()) .unwrap_or(false) } /// Get a registered client's info. pub fn get_client(&self, id: &NodeId) -> Option<&ProxyClient> { self.clients.get(id).filter(|c| !c.is_expired()) } /// Number of active proxy clients. pub fn client_count(&self) -> usize { self.clients.values().filter(|c| !c.is_expired()).count() } // ── Pending get management ────────────────────── /// Start a proxied get request. pub fn start_get(&mut self, nonce: u32, key: Vec) { const MAX_GETS: usize = 100; if self.pending_gets.len() >= MAX_GETS { log::debug!("Proxy: too many pending gets"); return; } self.pending_gets.insert( nonce, PendingGet { key, nonce, sent_at: Instant::now(), completed: false, values: Vec::new(), }, ); } /// Add a value to a pending get. pub fn add_get_value(&mut self, nonce: u32, value: Vec) -> bool { if let Some(pg) = self.pending_gets.get_mut(&nonce) { pg.values.push(value); true } else { false } } /// Complete a pending get request. Returns the /// collected values. pub fn complete_get(&mut self, nonce: u32) -> Option>> { self.pending_gets.remove(&nonce).map(|pg| pg.values) } // ── Maintenance ───────────────────────────────── /// Remove expired clients and timed-out gets. pub fn refresh(&mut self) { self.clients.retain(|_, c| !c.is_expired()); self.pending_gets .retain(|_, pg| pg.sent_at.elapsed() < PROXY_GET_TIMEOUT); } } #[cfg(test)] mod tests { use super::*; fn addr(port: u16) -> SocketAddr { SocketAddr::from(([127, 0, 0, 1], port)) } fn peer(byte: u8, port: u16) -> PeerInfo { PeerInfo::new(NodeId::from_bytes([byte; 32]), addr(port)) } // ── Client tests ──────────────────────────────── #[test] fn client_register_flow() { let mut proxy = Proxy::new(NodeId::from_bytes([0x01; 32])); assert!(!proxy.is_registered()); proxy.set_server(peer(0x02, 5000)); let nonce = proxy.start_register(42).unwrap(); assert_eq!(nonce, 42); // Wrong nonce assert!(!proxy.recv_register_reply(99)); assert!(!proxy.is_registered()); // Correct nonce assert!(proxy.recv_register_reply(42)); assert!(proxy.is_registered()); } #[test] fn client_no_server() { let mut proxy = Proxy::new(NodeId::from_bytes([0x01; 32])); assert!(proxy.start_register(1).is_none()); } // ── Server tests ──────────────────────────────── #[test] fn server_register_client() { let mut proxy = Proxy::new(NodeId::from_bytes([0x01; 32])); let client_id = NodeId::from_bytes([0x02; 32]); assert!(proxy.register_client(client_id, addr(3000), 1)); assert!(proxy.is_client_registered(&client_id)); assert_eq!(proxy.client_count(), 1); } #[test] fn server_rejects_different_session() { let mut proxy = Proxy::new(NodeId::from_bytes([0x01; 32])); let client_id = NodeId::from_bytes([0x02; 32]); proxy.register_client(client_id, addr(3000), 1); assert!(!proxy.register_client(client_id, addr(3001), 2)); } #[test] fn server_expire_clients() { let mut proxy = Proxy::new(NodeId::from_bytes([0x01; 32])); let client_id = NodeId::from_bytes([0x02; 32]); proxy.clients.insert( client_id, ProxyClient { addr: addr(3000), session: 1, registered_at: Instant::now() - Duration::from_secs(600), }, ); proxy.refresh(); assert_eq!(proxy.client_count(), 0); } // ── Pending get tests ─────────────────────────── #[test] fn pending_get_flow() { let mut proxy = Proxy::new(NodeId::from_bytes([0x01; 32])); proxy.start_get(42, b"mykey".to_vec()); assert!(proxy.add_get_value(42, b"val1".to_vec())); assert!(proxy.add_get_value(42, b"val2".to_vec())); let vals = proxy.complete_get(42).unwrap(); assert_eq!(vals.len(), 2); assert_eq!(vals[0], b"val1"); assert_eq!(vals[1], b"val2"); } #[test] fn pending_get_unknown_nonce() { let mut proxy = Proxy::new(NodeId::from_bytes([0x01; 32])); assert!(!proxy.add_get_value(999, b"v".to_vec())); assert!(proxy.complete_get(999).is_none()); } }