diff options
Diffstat (limited to 'src/proxy.rs')
| -rw-r--r-- | src/proxy.rs | 370 |
1 files changed, 370 insertions, 0 deletions
diff --git a/src/proxy.rs b/src/proxy.rs new file mode 100644 index 0000000..aaa3827 --- /dev/null +++ b/src/proxy.rs @@ -0,0 +1,370 @@ +//! Proxy relay for symmetric NAT nodes. +//! +//! When a node is +//! behind a symmetric NAT (where hole-punching fails), +//! it registers with a global node that acts as a relay: +//! +//! - **Store**: the NAT'd node sends store requests to +//! its proxy, which forwards them to the DHT. +//! - **Get**: the NAT'd node sends get requests to its +//! proxy, which performs the lookup and returns results. +//! - **Dgram**: datagrams to/from NAT'd nodes are +//! forwarded through the proxy. +//! - **RDP**: reliable transport is also tunnelled +//! through the proxy (type_proxy_rdp). + +use std::collections::HashMap; +use std::net::SocketAddr; +use std::time::{Duration, Instant}; + +use crate::id::NodeId; +use crate::peers::PeerInfo; + +// ── Constants ──────────────────────────────────────── + +/// Registration timeout. +pub const PROXY_REGISTER_TIMEOUT: Duration = Duration::from_secs(2); + +/// Registration TTL. +pub const PROXY_REGISTER_TTL: Duration = Duration::from_secs(300); + +/// Get request timeout. +pub const PROXY_GET_TIMEOUT: Duration = Duration::from_secs(10); + +/// Maintenance timer interval. +pub const PROXY_TIMER_INTERVAL: Duration = Duration::from_secs(30); + +/// RDP timeout for proxy connections. +pub const PROXY_RDP_TIMEOUT: Duration = Duration::from_secs(30); + +/// RDP port for proxy store. +pub const PROXY_STORE_PORT: u16 = 200; + +/// RDP port for proxy get. +pub const PROXY_GET_PORT: u16 = 201; + +/// RDP port for proxy get reply. +pub const PROXY_GET_REPLY_PORT: u16 = 202; + +// ── Client state ──────────────────────────────────── + +/// A node registered as a client of this proxy. +#[derive(Debug, Clone)] +pub struct ProxyClient { + /// Client's address (behind NAT). + pub addr: SocketAddr, + + /// DTUN session for validation. + pub session: u32, + + /// When the client last registered/refreshed. + pub registered_at: Instant, +} + +impl ProxyClient { + pub fn is_expired(&self) -> bool { + self.registered_at.elapsed() >= PROXY_REGISTER_TTL + } +} + +// ── Pending get state ─────────────────────────────── + +/// State for a pending proxy get request. +#[derive(Debug)] +pub struct PendingGet { + /// Key being looked up. + pub key: Vec<u8>, + + /// Nonce for correlation. + pub nonce: u32, + + /// When the request was sent. + pub sent_at: Instant, + + /// Whether we've received a result. + pub completed: bool, + + /// Collected values. + pub values: Vec<Vec<u8>>, +} + +// ── Proxy ─────────────────────────────────────────── + +/// Proxy relay for symmetric NAT traversal. +/// +/// Acts in two roles: +/// - **Client**: when we're behind symmetric NAT, we +/// register with a proxy server and route operations +/// through it. +/// - **Server**: when we have a public IP, we accept +/// registrations from NAT'd nodes and relay their +/// traffic. +pub struct Proxy { + /// Our proxy server (when we're a client). + server: Option<PeerInfo>, + + /// Whether we've successfully registered with our + /// proxy server. + is_registered: bool, + + /// Whether registration is in progress. + is_registering: bool, + + /// Nonce for registration correlation. + register_nonce: u32, + + /// Clients registered with us. Capped at 500. + clients: HashMap<NodeId, ProxyClient>, + + /// Pending get requests. Capped at 100. + pending_gets: HashMap<u32, PendingGet>, +} + +impl Proxy { + pub fn new(_id: NodeId) -> Self { + Self { + server: None, + is_registered: false, + is_registering: false, + register_nonce: 0, + clients: HashMap::new(), + pending_gets: HashMap::new(), + } + } + + // ── Client role ───────────────────────────────── + + /// Set the proxy server to use. + pub fn set_server(&mut self, server: PeerInfo) { + self.server = Some(server); + self.is_registered = false; + } + + /// Get the current proxy server. + pub fn server(&self) -> Option<&PeerInfo> { + self.server.as_ref() + } + + /// Whether we're registered with a proxy server. + pub fn is_registered(&self) -> bool { + self.is_registered + } + + /// Start proxy registration. Returns the nonce to + /// include in the register message. + pub fn start_register(&mut self, nonce: u32) -> Option<u32> { + self.server.as_ref()?; + self.is_registering = true; + self.register_nonce = nonce; + Some(nonce) + } + + /// Handle registration reply. + pub fn recv_register_reply(&mut self, nonce: u32) -> bool { + if nonce != self.register_nonce { + return false; + } + self.is_registering = false; + self.is_registered = true; + log::info!("Registered with proxy server"); + true + } + + // ── Server role ───────────────────────────────── + + /// Register a client node (we act as their proxy). + pub fn register_client( + &mut self, + id: NodeId, + addr: SocketAddr, + session: u32, + ) -> bool { + const MAX_CLIENTS: usize = 500; + if self.clients.len() >= MAX_CLIENTS && !self.clients.contains_key(&id) + { + return false; + } + if let Some(existing) = self.clients.get(&id) { + if existing.session != session && !existing.is_expired() { + return false; + } + } + self.clients.insert( + id, + ProxyClient { + addr, + session, + registered_at: Instant::now(), + }, + ); + log::debug!("Proxy: registered client {id:?}"); + true + } + + /// Check if a node is registered as our client. + pub fn is_client_registered(&self, id: &NodeId) -> bool { + self.clients + .get(id) + .map(|c| !c.is_expired()) + .unwrap_or(false) + } + + /// Get a registered client's info. + pub fn get_client(&self, id: &NodeId) -> Option<&ProxyClient> { + self.clients.get(id).filter(|c| !c.is_expired()) + } + + /// Number of active proxy clients. + pub fn client_count(&self) -> usize { + self.clients.values().filter(|c| !c.is_expired()).count() + } + + // ── Pending get management ────────────────────── + + /// Start a proxied get request. + pub fn start_get(&mut self, nonce: u32, key: Vec<u8>) { + const MAX_GETS: usize = 100; + if self.pending_gets.len() >= MAX_GETS { + log::debug!("Proxy: too many pending gets"); + return; + } + self.pending_gets.insert( + nonce, + PendingGet { + key, + nonce, + sent_at: Instant::now(), + completed: false, + values: Vec::new(), + }, + ); + } + + /// Add a value to a pending get. + pub fn add_get_value(&mut self, nonce: u32, value: Vec<u8>) -> bool { + if let Some(pg) = self.pending_gets.get_mut(&nonce) { + pg.values.push(value); + true + } else { + false + } + } + + /// Complete a pending get request. Returns the + /// collected values. + pub fn complete_get(&mut self, nonce: u32) -> Option<Vec<Vec<u8>>> { + self.pending_gets.remove(&nonce).map(|pg| pg.values) + } + + // ── Maintenance ───────────────────────────────── + + /// Remove expired clients and timed-out gets. + pub fn refresh(&mut self) { + self.clients.retain(|_, c| !c.is_expired()); + + self.pending_gets + .retain(|_, pg| pg.sent_at.elapsed() < PROXY_GET_TIMEOUT); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn addr(port: u16) -> SocketAddr { + SocketAddr::from(([127, 0, 0, 1], port)) + } + + fn peer(byte: u8, port: u16) -> PeerInfo { + PeerInfo::new(NodeId::from_bytes([byte; 32]), addr(port)) + } + + // ── Client tests ──────────────────────────────── + + #[test] + fn client_register_flow() { + let mut proxy = Proxy::new(NodeId::from_bytes([0x01; 32])); + + assert!(!proxy.is_registered()); + + proxy.set_server(peer(0x02, 5000)); + let nonce = proxy.start_register(42).unwrap(); + assert_eq!(nonce, 42); + + // Wrong nonce + assert!(!proxy.recv_register_reply(99)); + assert!(!proxy.is_registered()); + + // Correct nonce + assert!(proxy.recv_register_reply(42)); + assert!(proxy.is_registered()); + } + + #[test] + fn client_no_server() { + let mut proxy = Proxy::new(NodeId::from_bytes([0x01; 32])); + assert!(proxy.start_register(1).is_none()); + } + + // ── Server tests ──────────────────────────────── + + #[test] + fn server_register_client() { + let mut proxy = Proxy::new(NodeId::from_bytes([0x01; 32])); + let client_id = NodeId::from_bytes([0x02; 32]); + + assert!(proxy.register_client(client_id, addr(3000), 1)); + assert!(proxy.is_client_registered(&client_id)); + assert_eq!(proxy.client_count(), 1); + } + + #[test] + fn server_rejects_different_session() { + let mut proxy = Proxy::new(NodeId::from_bytes([0x01; 32])); + let client_id = NodeId::from_bytes([0x02; 32]); + + proxy.register_client(client_id, addr(3000), 1); + assert!(!proxy.register_client(client_id, addr(3001), 2)); + } + + #[test] + fn server_expire_clients() { + let mut proxy = Proxy::new(NodeId::from_bytes([0x01; 32])); + let client_id = NodeId::from_bytes([0x02; 32]); + + proxy.clients.insert( + client_id, + ProxyClient { + addr: addr(3000), + session: 1, + registered_at: Instant::now() - Duration::from_secs(600), + }, + ); + + proxy.refresh(); + assert_eq!(proxy.client_count(), 0); + } + + // ── Pending get tests ─────────────────────────── + + #[test] + fn pending_get_flow() { + let mut proxy = Proxy::new(NodeId::from_bytes([0x01; 32])); + + proxy.start_get(42, b"mykey".to_vec()); + assert!(proxy.add_get_value(42, b"val1".to_vec())); + assert!(proxy.add_get_value(42, b"val2".to_vec())); + + let vals = proxy.complete_get(42).unwrap(); + assert_eq!(vals.len(), 2); + assert_eq!(vals[0], b"val1"); + assert_eq!(vals[1], b"val2"); + } + + #[test] + fn pending_get_unknown_nonce() { + let mut proxy = Proxy::new(NodeId::from_bytes([0x01; 32])); + assert!(!proxy.add_get_value(999, b"v".to_vec())); + assert!(proxy.complete_get(999).is_none()); + } +} |