aboutsummaryrefslogtreecommitdiffstats
path: root/src/proxy.rs
diff options
context:
space:
mode:
authormurilo ijanc2026-03-24 15:04:03 -0300
committermurilo ijanc2026-03-24 15:04:03 -0300
commit9821aabf0b50d2487b07502d3d2cd89e7d62bdbe (patch)
tree53da095ff90cc755bac3d4bf699172b5e8cd07d6 /src/proxy.rs
downloadtesseras-dht-9821aabf0b50d2487b07502d3d2cd89e7d62bdbe.tar.gz
Initial commitv0.1.0
NAT-aware Kademlia DHT library for peer-to-peer networks. Features: - Distributed key-value storage (iterative FIND_NODE, FIND_VALUE, STORE) - NAT traversal via DTUN hole-punching and proxy relay - Reliable Datagram Protocol (RDP) with 7-state connection machine - Datagram transport with automatic fragmentation/reassembly - Ed25519 packet authentication - 256-bit node IDs (Ed25519 public keys) - Rate limiting, ban list, and eclipse attack mitigation - Persistence and metrics - OpenBSD and Linux support
Diffstat (limited to 'src/proxy.rs')
-rw-r--r--src/proxy.rs370
1 files changed, 370 insertions, 0 deletions
diff --git a/src/proxy.rs b/src/proxy.rs
new file mode 100644
index 0000000..aaa3827
--- /dev/null
+++ b/src/proxy.rs
@@ -0,0 +1,370 @@
+//! Proxy relay for symmetric NAT nodes.
+//!
+//! When a node is
+//! behind a symmetric NAT (where hole-punching fails),
+//! it registers with a global node that acts as a relay:
+//!
+//! - **Store**: the NAT'd node sends store requests to
+//! its proxy, which forwards them to the DHT.
+//! - **Get**: the NAT'd node sends get requests to its
+//! proxy, which performs the lookup and returns results.
+//! - **Dgram**: datagrams to/from NAT'd nodes are
+//! forwarded through the proxy.
+//! - **RDP**: reliable transport is also tunnelled
+//! through the proxy (type_proxy_rdp).
+
+use std::collections::HashMap;
+use std::net::SocketAddr;
+use std::time::{Duration, Instant};
+
+use crate::id::NodeId;
+use crate::peers::PeerInfo;
+
+// ── Constants ────────────────────────────────────────
+
+/// Registration timeout.
+pub const PROXY_REGISTER_TIMEOUT: Duration = Duration::from_secs(2);
+
+/// Registration TTL.
+pub const PROXY_REGISTER_TTL: Duration = Duration::from_secs(300);
+
+/// Get request timeout.
+pub const PROXY_GET_TIMEOUT: Duration = Duration::from_secs(10);
+
+/// Maintenance timer interval.
+pub const PROXY_TIMER_INTERVAL: Duration = Duration::from_secs(30);
+
+/// RDP timeout for proxy connections.
+pub const PROXY_RDP_TIMEOUT: Duration = Duration::from_secs(30);
+
+/// RDP port for proxy store.
+pub const PROXY_STORE_PORT: u16 = 200;
+
+/// RDP port for proxy get.
+pub const PROXY_GET_PORT: u16 = 201;
+
+/// RDP port for proxy get reply.
+pub const PROXY_GET_REPLY_PORT: u16 = 202;
+
+// ── Client state ────────────────────────────────────
+
+/// A node registered as a client of this proxy.
+#[derive(Debug, Clone)]
+pub struct ProxyClient {
+ /// Client's address (behind NAT).
+ pub addr: SocketAddr,
+
+ /// DTUN session for validation.
+ pub session: u32,
+
+ /// When the client last registered/refreshed.
+ pub registered_at: Instant,
+}
+
+impl ProxyClient {
+ pub fn is_expired(&self) -> bool {
+ self.registered_at.elapsed() >= PROXY_REGISTER_TTL
+ }
+}
+
+// ── Pending get state ───────────────────────────────
+
+/// State for a pending proxy get request.
+#[derive(Debug)]
+pub struct PendingGet {
+ /// Key being looked up.
+ pub key: Vec<u8>,
+
+ /// Nonce for correlation.
+ pub nonce: u32,
+
+ /// When the request was sent.
+ pub sent_at: Instant,
+
+ /// Whether we've received a result.
+ pub completed: bool,
+
+ /// Collected values.
+ pub values: Vec<Vec<u8>>,
+}
+
+// ── Proxy ───────────────────────────────────────────
+
+/// Proxy relay for symmetric NAT traversal.
+///
+/// Acts in two roles:
+/// - **Client**: when we're behind symmetric NAT, we
+/// register with a proxy server and route operations
+/// through it.
+/// - **Server**: when we have a public IP, we accept
+/// registrations from NAT'd nodes and relay their
+/// traffic.
+pub struct Proxy {
+ /// Our proxy server (when we're a client).
+ server: Option<PeerInfo>,
+
+ /// Whether we've successfully registered with our
+ /// proxy server.
+ is_registered: bool,
+
+ /// Whether registration is in progress.
+ is_registering: bool,
+
+ /// Nonce for registration correlation.
+ register_nonce: u32,
+
+ /// Clients registered with us. Capped at 500.
+ clients: HashMap<NodeId, ProxyClient>,
+
+ /// Pending get requests. Capped at 100.
+ pending_gets: HashMap<u32, PendingGet>,
+}
+
+impl Proxy {
+ pub fn new(_id: NodeId) -> Self {
+ Self {
+ server: None,
+ is_registered: false,
+ is_registering: false,
+ register_nonce: 0,
+ clients: HashMap::new(),
+ pending_gets: HashMap::new(),
+ }
+ }
+
+ // ── Client role ─────────────────────────────────
+
+ /// Set the proxy server to use.
+ pub fn set_server(&mut self, server: PeerInfo) {
+ self.server = Some(server);
+ self.is_registered = false;
+ }
+
+ /// Get the current proxy server.
+ pub fn server(&self) -> Option<&PeerInfo> {
+ self.server.as_ref()
+ }
+
+ /// Whether we're registered with a proxy server.
+ pub fn is_registered(&self) -> bool {
+ self.is_registered
+ }
+
+ /// Start proxy registration. Returns the nonce to
+ /// include in the register message.
+ pub fn start_register(&mut self, nonce: u32) -> Option<u32> {
+ self.server.as_ref()?;
+ self.is_registering = true;
+ self.register_nonce = nonce;
+ Some(nonce)
+ }
+
+ /// Handle registration reply.
+ pub fn recv_register_reply(&mut self, nonce: u32) -> bool {
+ if nonce != self.register_nonce {
+ return false;
+ }
+ self.is_registering = false;
+ self.is_registered = true;
+ log::info!("Registered with proxy server");
+ true
+ }
+
+ // ── Server role ─────────────────────────────────
+
+ /// Register a client node (we act as their proxy).
+ pub fn register_client(
+ &mut self,
+ id: NodeId,
+ addr: SocketAddr,
+ session: u32,
+ ) -> bool {
+ const MAX_CLIENTS: usize = 500;
+ if self.clients.len() >= MAX_CLIENTS && !self.clients.contains_key(&id)
+ {
+ return false;
+ }
+ if let Some(existing) = self.clients.get(&id) {
+ if existing.session != session && !existing.is_expired() {
+ return false;
+ }
+ }
+ self.clients.insert(
+ id,
+ ProxyClient {
+ addr,
+ session,
+ registered_at: Instant::now(),
+ },
+ );
+ log::debug!("Proxy: registered client {id:?}");
+ true
+ }
+
+ /// Check if a node is registered as our client.
+ pub fn is_client_registered(&self, id: &NodeId) -> bool {
+ self.clients
+ .get(id)
+ .map(|c| !c.is_expired())
+ .unwrap_or(false)
+ }
+
+ /// Get a registered client's info.
+ pub fn get_client(&self, id: &NodeId) -> Option<&ProxyClient> {
+ self.clients.get(id).filter(|c| !c.is_expired())
+ }
+
+ /// Number of active proxy clients.
+ pub fn client_count(&self) -> usize {
+ self.clients.values().filter(|c| !c.is_expired()).count()
+ }
+
+ // ── Pending get management ──────────────────────
+
+ /// Start a proxied get request.
+ pub fn start_get(&mut self, nonce: u32, key: Vec<u8>) {
+ const MAX_GETS: usize = 100;
+ if self.pending_gets.len() >= MAX_GETS {
+ log::debug!("Proxy: too many pending gets");
+ return;
+ }
+ self.pending_gets.insert(
+ nonce,
+ PendingGet {
+ key,
+ nonce,
+ sent_at: Instant::now(),
+ completed: false,
+ values: Vec::new(),
+ },
+ );
+ }
+
+ /// Add a value to a pending get.
+ pub fn add_get_value(&mut self, nonce: u32, value: Vec<u8>) -> bool {
+ if let Some(pg) = self.pending_gets.get_mut(&nonce) {
+ pg.values.push(value);
+ true
+ } else {
+ false
+ }
+ }
+
+ /// Complete a pending get request. Returns the
+ /// collected values.
+ pub fn complete_get(&mut self, nonce: u32) -> Option<Vec<Vec<u8>>> {
+ self.pending_gets.remove(&nonce).map(|pg| pg.values)
+ }
+
+ // ── Maintenance ─────────────────────────────────
+
+ /// Remove expired clients and timed-out gets.
+ pub fn refresh(&mut self) {
+ self.clients.retain(|_, c| !c.is_expired());
+
+ self.pending_gets
+ .retain(|_, pg| pg.sent_at.elapsed() < PROXY_GET_TIMEOUT);
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ fn addr(port: u16) -> SocketAddr {
+ SocketAddr::from(([127, 0, 0, 1], port))
+ }
+
+ fn peer(byte: u8, port: u16) -> PeerInfo {
+ PeerInfo::new(NodeId::from_bytes([byte; 32]), addr(port))
+ }
+
+ // ── Client tests ────────────────────────────────
+
+ #[test]
+ fn client_register_flow() {
+ let mut proxy = Proxy::new(NodeId::from_bytes([0x01; 32]));
+
+ assert!(!proxy.is_registered());
+
+ proxy.set_server(peer(0x02, 5000));
+ let nonce = proxy.start_register(42).unwrap();
+ assert_eq!(nonce, 42);
+
+ // Wrong nonce
+ assert!(!proxy.recv_register_reply(99));
+ assert!(!proxy.is_registered());
+
+ // Correct nonce
+ assert!(proxy.recv_register_reply(42));
+ assert!(proxy.is_registered());
+ }
+
+ #[test]
+ fn client_no_server() {
+ let mut proxy = Proxy::new(NodeId::from_bytes([0x01; 32]));
+ assert!(proxy.start_register(1).is_none());
+ }
+
+ // ── Server tests ────────────────────────────────
+
+ #[test]
+ fn server_register_client() {
+ let mut proxy = Proxy::new(NodeId::from_bytes([0x01; 32]));
+ let client_id = NodeId::from_bytes([0x02; 32]);
+
+ assert!(proxy.register_client(client_id, addr(3000), 1));
+ assert!(proxy.is_client_registered(&client_id));
+ assert_eq!(proxy.client_count(), 1);
+ }
+
+ #[test]
+ fn server_rejects_different_session() {
+ let mut proxy = Proxy::new(NodeId::from_bytes([0x01; 32]));
+ let client_id = NodeId::from_bytes([0x02; 32]);
+
+ proxy.register_client(client_id, addr(3000), 1);
+ assert!(!proxy.register_client(client_id, addr(3001), 2));
+ }
+
+ #[test]
+ fn server_expire_clients() {
+ let mut proxy = Proxy::new(NodeId::from_bytes([0x01; 32]));
+ let client_id = NodeId::from_bytes([0x02; 32]);
+
+ proxy.clients.insert(
+ client_id,
+ ProxyClient {
+ addr: addr(3000),
+ session: 1,
+ registered_at: Instant::now() - Duration::from_secs(600),
+ },
+ );
+
+ proxy.refresh();
+ assert_eq!(proxy.client_count(), 0);
+ }
+
+ // ── Pending get tests ───────────────────────────
+
+ #[test]
+ fn pending_get_flow() {
+ let mut proxy = Proxy::new(NodeId::from_bytes([0x01; 32]));
+
+ proxy.start_get(42, b"mykey".to_vec());
+ assert!(proxy.add_get_value(42, b"val1".to_vec()));
+ assert!(proxy.add_get_value(42, b"val2".to_vec()));
+
+ let vals = proxy.complete_get(42).unwrap();
+ assert_eq!(vals.len(), 2);
+ assert_eq!(vals[0], b"val1");
+ assert_eq!(vals[1], b"val2");
+ }
+
+ #[test]
+ fn pending_get_unknown_nonce() {
+ let mut proxy = Proxy::new(NodeId::from_bytes([0x01; 32]));
+ assert!(!proxy.add_get_value(999, b"v".to_vec()));
+ assert!(proxy.complete_get(999).is_none());
+ }
+}