diff options
| author | murilo ijanc | 2026-03-24 15:04:03 -0300 |
|---|---|---|
| committer | murilo ijanc | 2026-03-24 15:04:03 -0300 |
| commit | 9821aabf0b50d2487b07502d3d2cd89e7d62bdbe (patch) | |
| tree | 53da095ff90cc755bac3d4bf699172b5e8cd07d6 /src/net.rs | |
| download | tesseras-dht-0.1.0.tar.gz | |
Initial commit (v0.1.0)
NAT-aware Kademlia DHT library for peer-to-peer networks.
Features:
- Distributed key-value storage (iterative FIND_NODE, FIND_VALUE, STORE)
- NAT traversal via DTUN hole-punching and proxy relay
- Reliable Datagram Protocol (RDP) with 7-state connection machine
- Datagram transport with automatic fragmentation/reassembly
- Ed25519 packet authentication
- 256-bit node IDs (Ed25519 public keys)
- Rate limiting, ban list, and eclipse attack mitigation
- Persistence and metrics
- OpenBSD and Linux support
Diffstat (limited to 'src/net.rs')
| -rw-r--r-- | src/net.rs | 744 |
1 files changed, 744 insertions, 0 deletions
diff --git a/src/net.rs b/src/net.rs new file mode 100644 index 0000000..aa6d2a7 --- /dev/null +++ b/src/net.rs @@ -0,0 +1,744 @@ +//! Network send helpers and query management. +//! +//! Extension impl block for Node. Contains send_signed, +//! verify_incoming, send_find_node, send_store, query +//! batch sending, and liveness probing. + +use std::net::SocketAddr; +use std::time::{Duration, Instant}; + +use crate::dgram; +use crate::dht::IterativeQuery; +use crate::error::Error; +use crate::id::NodeId; +use crate::msg; +use crate::nat::NatState; +use crate::node::Node; +use crate::peers::PeerInfo; +use crate::wire::{DOMAIN_INET, DOMAIN_INET6, HEADER_SIZE, MsgHeader, MsgType}; + +impl Node { + // ── Network ───────────────────────────────────── + + /// Local socket address. + pub fn local_addr(&self) -> Result<SocketAddr, Error> { + self.net.local_addr() + } + + /// Join the DHT network via a bootstrap node. + /// + /// Starts an iterative FIND_NODE for our own ID. + /// The query is driven by subsequent `poll()` calls. + /// DNS resolution is synchronous. + /// + /// # Example + /// + /// ```rust,no_run + /// # let mut node = tesseras_dht::Node::bind(0).unwrap(); + /// node.join("bootstrap.example.com", 10000).unwrap(); + /// loop { node.poll().unwrap(); } + /// ``` + pub fn join(&mut self, host: &str, port: u16) -> Result<(), Error> { + use std::net::ToSocketAddrs; + + let addr_str = format!("{host}:{port}"); + let addr = addr_str + .to_socket_addrs() + .map_err(Error::Io)? 
+ .next() + .ok_or_else(|| { + Error::Io(std::io::Error::new( + std::io::ErrorKind::NotFound, + format!("no addresses for {addr_str}"), + )) + })?; + + log::info!("Joining via {addr}"); + + // Send DHT FIND_NODE to bootstrap (populates + // DHT routing table via handle_dht_find_node_reply) + self.send_find_node(addr, self.id)?; + + if self.is_dtun { + // Send DTUN FIND_NODE to bootstrap too + // (populates DTUN routing table) + let nonce = self.alloc_nonce(); + let domain = if addr.is_ipv4() { + crate::wire::DOMAIN_INET + } else { + crate::wire::DOMAIN_INET6 + }; + let find = crate::msg::FindNodeMsg { + nonce, + target: self.id, + domain, + state: 0, + }; + let mut buf = [0u8; crate::msg::FIND_NODE_MSG_SIZE]; + let hdr = MsgHeader::new( + MsgType::DtunFindNode, + Self::len16(crate::msg::FIND_NODE_MSG_SIZE), + self.id, + NodeId::from_bytes([0; 32]), + ); + if hdr.write(&mut buf).is_ok() { + crate::msg::write_find_node(&mut buf, &find); + if let Err(e) = self.send_signed(&buf, addr) { + log::debug!("DTUN find_node send failed: {e}"); + } + } + } + + // Start NAT detection if DTUN is enabled + if self.is_dtun && !self.nat.is_complete() { + let nonce = self.alloc_nonce(); + self.nat.start_detect(nonce); + + // Send NatEcho to bootstrap + let size = HEADER_SIZE + 4; + let mut buf = vec![0u8; size]; + let hdr = MsgHeader::new( + MsgType::NatEcho, + Self::len16(size), + self.id, + NodeId::from_bytes([0; crate::id::ID_LEN]), + ); + if hdr.write(&mut buf).is_ok() { + buf[HEADER_SIZE..HEADER_SIZE + 4] + .copy_from_slice(&nonce.to_be_bytes()); + if let Err(e) = self.send_signed(&buf, addr) { + log::warn!("Failed to send NatEcho: {e}"); + } + log::debug!("Sent NatEcho to {addr} nonce={nonce}"); + } + } + + Ok(()) + } + + /// Start an iterative FIND_NODE query for a target. + /// + /// Returns the query nonce. The query is driven by + /// `poll()` and completes when converged. 
+ pub fn start_find_node(&mut self, target: NodeId) -> Result<u32, Error> { + let nonce = self.alloc_nonce(); + let mut query = IterativeQuery::find_node(target, nonce); + + // Seed with our closest known nodes + let closest = + self.dht_table.closest(&target, self.config.num_find_node); + query.closest = closest; + + // Limit concurrent queries to prevent memory + // exhaustion (max 100) + const MAX_CONCURRENT_QUERIES: usize = 100; + if self.queries.len() >= MAX_CONCURRENT_QUERIES { + log::warn!("Too many concurrent queries, dropping"); + return Err(Error::Timeout); + } + + // Send initial batch + self.send_query_batch(nonce)?; + + self.queries.insert(nonce, query); + Ok(nonce) + } + + /// Send a FIND_NODE message to a specific address. + pub(crate) fn send_find_node( + &mut self, + to: SocketAddr, + target: NodeId, + ) -> Result<u32, Error> { + let nonce = self.alloc_nonce(); + let domain = if to.is_ipv4() { + DOMAIN_INET + } else { + DOMAIN_INET6 + }; + + let find = msg::FindNodeMsg { + nonce, + target, + domain, + state: 0, + }; + + let mut buf = [0u8; msg::FIND_NODE_MSG_SIZE]; + let hdr = MsgHeader::new( + MsgType::DhtFindNode, + Self::len16(msg::FIND_NODE_MSG_SIZE), + self.id, + NodeId::from_bytes([0; crate::id::ID_LEN]), // unknown dst + ); + hdr.write(&mut buf)?; + msg::write_find_node(&mut buf, &find); + self.send_signed(&buf, to)?; + + log::debug!("Sent find_node to {to} target={target:?} nonce={nonce}"); + Ok(nonce) + } + + /// Send a STORE message to a specific peer. 
+ pub(crate) fn send_store( + &mut self, + peer: &PeerInfo, + store: &msg::StoreMsg, + ) -> Result<(), Error> { + let total = HEADER_SIZE + + msg::STORE_FIXED + + store.key.len() + + store.value.len(); + let mut buf = vec![0u8; total]; + let hdr = MsgHeader::new( + MsgType::DhtStore, + Self::len16(total), + self.id, + peer.id, + ); + hdr.write(&mut buf)?; + msg::write_store(&mut buf, store)?; + self.send_signed(&buf, peer.addr)?; + log::debug!( + "Sent store to {:?} key={} bytes", + peer.id, + store.key.len() + ); + Ok(()) + } + + /// Send the next batch of queries for an active + /// iterative query. Uses FIND_VALUE for find_value + /// queries, FIND_NODE otherwise. + pub(crate) fn send_query_batch(&mut self, nonce: u32) -> Result<(), Error> { + let query = match self.queries.get(&nonce) { + Some(q) => q, + None => return Ok(()), + }; + + let to_query = query.next_to_query(); + let target = query.target; + let is_find_value = query.is_find_value; + let key = query.key.clone(); + + for peer in to_query { + let result = if is_find_value { + self.send_find_value_msg(peer.addr, target, &key) + } else { + self.send_find_node(peer.addr, target) + }; + + if let Err(e) = result { + log::debug!("Failed to send query to {:?}: {e}", peer.id); + continue; + } + if let Some(q) = self.queries.get_mut(&nonce) { + q.pending.insert(peer.id, Instant::now()); + } + } + Ok(()) + } + + /// Drive all active iterative queries: expire + /// timeouts, send next batches, clean up finished. 
+ pub(crate) fn drive_queries(&mut self) { + // Expire timed-out pending requests + let nonces: Vec<u32> = self.queries.keys().copied().collect(); + + for nonce in &nonces { + if let Some(q) = self.queries.get_mut(nonce) { + q.expire_pending(); + } + } + + // Send next batch for active queries + for nonce in &nonces { + let is_active = self + .queries + .get(nonce) + .map(|q| !q.is_done()) + .unwrap_or(false); + + if is_active { + if let Err(e) = self.send_query_batch(*nonce) { + log::debug!("Query batch send failed: {e}"); + } + } + } + + // Remove completed queries + self.queries.retain(|nonce, q| { + if q.is_done() { + log::debug!( + "Query nonce={nonce} complete: {} closest, {} hops, {}ms", + q.closest.len(), + q.hops, + q.started_at.elapsed().as_millis(), + ); + self.metrics + .lookups_completed + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + false + } else { + true + } + }); + } + + /// Refresh stale routing table buckets by starting + /// FIND_NODE queries for random IDs in each stale + /// bucket's range. + pub(crate) fn refresh_buckets(&mut self) { + let threshold = self.config.refresh_interval; + + if self.last_refresh.elapsed() < threshold { + return; + } + self.last_refresh = Instant::now(); + + let targets = self.dht_table.stale_bucket_targets(threshold); + + if targets.is_empty() { + return; + } + + // Limit to 3 refresh queries per cycle to avoid + // flooding the query queue (256 empty buckets would + // otherwise spawn 256 queries at once). + let batch = targets.into_iter().take(3); + log::debug!("Refreshing stale buckets"); + + for target in batch { + if let Err(e) = self.start_find_node(target) { + log::debug!("Refresh find_node failed: {e}"); + break; // query queue full, stop + } + } + } + + /// Ping LRU peers in each bucket to verify they're + /// alive. Dead peers are removed from the routing + /// table. Called alongside refresh_buckets. 
+ pub(crate) fn probe_liveness(&mut self) { + let lru_peers = self.dht_table.lru_peers(); + if lru_peers.is_empty() { + return; + } + + for peer in &lru_peers { + // Skip if we've seen them recently + if peer.last_seen.elapsed() < Duration::from_secs(30) { + continue; + } + let nonce = self.alloc_nonce(); + let size = msg::PING_MSG_SIZE; + let mut buf = [0u8; msg::PING_MSG_SIZE]; + let hdr = MsgHeader::new( + MsgType::DhtPing, + Self::len16(size), + self.id, + peer.id, + ); + if hdr.write(&mut buf).is_ok() { + buf[HEADER_SIZE..HEADER_SIZE + 4] + .copy_from_slice(&nonce.to_be_bytes()); + let _ = self.send_signed(&buf, peer.addr); + self.pending_pings.insert(nonce, (peer.id, Instant::now())); + } + } + } + + /// Try to send queued datagrams whose destinations + /// are now in the peer store. + pub(crate) fn drain_send_queue(&mut self) { + let known_ids: Vec<crate::id::NodeId> = self.peers.ids(); + + for dst_id in known_ids { + if !self.send_queue.has_pending(&dst_id) { + continue; + } + let peer = match self.peers.get(&dst_id).cloned() { + Some(p) => p, + None => continue, + }; + let queued = self.send_queue.drain(&dst_id); + for item in queued { + self.send_dgram_raw(&item.data, &peer); + } + } + } + + /// Handle an incoming Dgram message: reassemble + /// fragments and invoke the dgram callback. + pub(crate) fn handle_dgram(&mut self, buf: &[u8], hdr: &MsgHeader) { + let payload = &buf[HEADER_SIZE..]; + + let (total, index, frag_data) = match dgram::parse_fragment(payload) { + Some(v) => v, + None => return, + }; + + let complete = + self.reassembler + .feed(hdr.src, total, index, frag_data.to_vec()); + + if let Some(data) = complete { + log::debug!( + "Dgram reassembled: {} bytes from {:?}", + data.len(), + hdr.src + ); + if let Some(ref cb) = self.dgram_callback { + cb(&data, &hdr.src); + } + } + } + + /// Flush pending RDP output for a connection, + /// sending packets via UDP. 
+ pub(crate) fn flush_rdp_output(&mut self, desc: i32) { + let output = match self.rdp.pending_output(desc) { + Some(o) => o, + None => return, + }; + + // Determine target address and message type + let (send_addr, msg_type) = + if self.nat.state() == NatState::SymmetricNat { + // Route through proxy + if let Some(server) = self.proxy.server() { + (server.addr, MsgType::ProxyRdp) + } else if let Some(peer) = self.peers.get(&output.dst) { + (peer.addr, MsgType::Rdp) + } else { + log::debug!("RDP: no route for {:?}", output.dst); + return; + } + } else if let Some(peer) = self.peers.get(&output.dst) { + (peer.addr, MsgType::Rdp) + } else { + log::debug!("RDP: no address for {:?}", output.dst); + return; + }; + + for pkt in &output.packets { + let rdp_wire = crate::rdp::build_rdp_wire( + pkt.flags, + output.sport, + output.dport, + pkt.seqnum, + pkt.acknum, + &pkt.data, + ); + + let total = HEADER_SIZE + rdp_wire.len(); + let mut buf = vec![0u8; total]; + let hdr = MsgHeader::new( + msg_type, + Self::len16(total), + self.id, + output.dst, + ); + if hdr.write(&mut buf).is_ok() { + buf[HEADER_SIZE..].copy_from_slice(&rdp_wire); + let _ = self.send_signed(&buf, send_addr); + } + } + } + + /// Flush pending RDP output for all connections. + pub(crate) fn flush_all_rdp(&mut self) { + let descs = self.rdp.descriptors(); + for desc in descs { + self.flush_rdp_output(desc); + } + } + + /// Handle an incoming RDP packet. 
+ pub(crate) fn handle_rdp(&mut self, buf: &[u8], hdr: &MsgHeader) { + let payload = &buf[HEADER_SIZE..]; + let wire = match crate::rdp::parse_rdp_wire(payload) { + Some(w) => w, + None => return, + }; + + log::debug!( + "RDP from {:?}: flags=0x{:02x} \ + sport={} dport={} \ + seq={} ack={} \ + data={} bytes", + hdr.src, + wire.flags, + wire.sport, + wire.dport, + wire.seqnum, + wire.acknum, + wire.data.len() + ); + + let input = crate::rdp::RdpInput { + src: hdr.src, + sport: wire.sport, + dport: wire.dport, + flags: wire.flags, + seqnum: wire.seqnum, + acknum: wire.acknum, + data: wire.data, + }; + let actions = self.rdp.input(&input); + + for action in actions { + match action { + crate::rdp::RdpAction::Event { + desc, + ref addr, + event, + } => { + log::info!("RDP event: desc={desc} {:?}", event); + + // Invoke app callback + if let Some(ref cb) = self.rdp_callback { + cb(desc, addr, event); + } + + // After accept/connect, flush SYN-ACK/ACK + self.flush_rdp_output(desc); + } + crate::rdp::RdpAction::Close(desc) => { + self.rdp.close(desc); + } + } + } + } + + /// Register this node with DTUN (for NAT traversal). + /// + /// Sends DTUN_REGISTER to the k-closest global nodes + /// so other peers can find us via hole-punching. + pub(crate) fn dtun_register(&mut self) { + let (session, closest) = self.dtun.prepare_register(); + log::info!( + "DTUN register: session={session}, {} targets", + closest.len() + ); + + for peer in &closest { + let size = HEADER_SIZE + 4; + let mut buf = vec![0u8; size]; + let hdr = MsgHeader::new( + MsgType::DtunRegister, + Self::len16(size), + self.id, + peer.id, + ); + if hdr.write(&mut buf).is_ok() { + buf[HEADER_SIZE..HEADER_SIZE + 4] + .copy_from_slice(&session.to_be_bytes()); + let _ = self.send_signed(&buf, peer.addr); + } + } + + self.dtun.registration_done(); + } + + /// Send a packet with Ed25519 signature appended. + /// + /// Appends 64-byte signature after the packet body. + /// All outgoing packets go through this method. 
+ pub(crate) fn send_signed( + &self, + buf: &[u8], + to: SocketAddr, + ) -> Result<usize, Error> { + let mut signed = buf.to_vec(); + crate::wire::sign_packet(&mut signed, &self.identity); + self.metrics + .messages_sent + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + self.metrics.bytes_sent.fetch_add( + signed.len() as u64, + std::sync::atomic::Ordering::Relaxed, + ); + // Note: reply sends use `let _ =` (best-effort). + // Critical sends (join, put, DTUN register) use + // `if let Err(e) = ... { log::warn! }`. + self.net.send_to(&signed, to) + } + + /// Proactively check node activity by pinging peers + /// in the routing table. Peers that fail to respond + /// accumulate failures in the ban list. This keeps + /// the routing table healthy by detecting dead nodes + /// before queries need them. + pub(crate) fn check_node_activity(&mut self) { + if self.last_activity_check.elapsed() + < self.config.activity_check_interval + { + return; + } + self.last_activity_check = Instant::now(); + + let lru_peers = self.dht_table.lru_peers(); + if lru_peers.is_empty() { + return; + } + + let mut pinged = 0u32; + for peer in &lru_peers { + // Only ping peers not seen for >60s + if peer.last_seen.elapsed() < Duration::from_secs(60) { + continue; + } + // Skip banned peers + if self.ban_list.is_banned(&peer.addr) { + continue; + } + + let nonce = self.alloc_nonce(); + let size = crate::msg::PING_MSG_SIZE; + let mut buf = [0u8; crate::msg::PING_MSG_SIZE]; + let hdr = MsgHeader::new( + MsgType::DhtPing, + Self::len16(size), + self.id, + peer.id, + ); + if hdr.write(&mut buf).is_ok() { + buf[HEADER_SIZE..HEADER_SIZE + 4] + .copy_from_slice(&nonce.to_be_bytes()); + let _ = self.send_signed(&buf, peer.addr); + self.pending_pings.insert(nonce, (peer.id, Instant::now())); + pinged += 1; + } + } + + if pinged > 0 { + log::debug!("Activity check: pinged {pinged} peers"); + } + } + + /// Sweep timed-out stores and retry with alternative + /// peers. 
Failed peers accumulate ban list failures. + pub(crate) fn retry_failed_stores(&mut self) { + if self.last_store_retry.elapsed() < self.config.store_retry_interval { + return; + } + self.last_store_retry = Instant::now(); + + let retries = self.store_tracker.collect_timeouts(); + if retries.is_empty() { + return; + } + + log::debug!("Store retry: {} timed-out stores", retries.len()); + + for retry in &retries { + // Record failure for the peer that didn't ack + if let Some(peer_info) = self.peers.get(&retry.failed_peer) { + self.ban_list.record_failure(peer_info.addr); + } + + // Find alternative peers (excluding the failed one) + let closest = self + .dht_table + .closest(&retry.target, self.config.num_find_node); + + let store_msg = crate::msg::StoreMsg { + id: retry.target, + from: self.id, + key: retry.key.clone(), + value: retry.value.clone(), + ttl: retry.ttl, + is_unique: retry.is_unique, + }; + + let mut sent = false; + for peer in &closest { + if peer.id == retry.failed_peer { + continue; + } + if self.ban_list.is_banned(&peer.addr) { + continue; + } + if let Err(e) = self.send_store(peer, &store_msg) { + log::debug!("Store retry send failed: {e}"); + continue; + } + self.store_tracker.track( + retry.target, + retry.key.clone(), + retry.value.clone(), + retry.ttl, + retry.is_unique, + peer.clone(), + ); + sent = true; + break; // one alternative peer is enough per retry + } + + if !sent { + log::debug!( + "Store retry: no alternative peer for key ({} bytes)", + retry.key.len() + ); + } + } + } + + /// Verify Ed25519 signature on an incoming packet. + /// + /// Since NodeId = Ed25519 public key, the src field + /// in the header IS the public key. The signature + /// proves the sender holds the private key for that + /// NodeId. + /// + /// Additionally, if we already know this peer, verify + /// the source address matches to prevent IP spoofing. 
+ pub(crate) fn verify_incoming<'a>( + &self, + buf: &'a [u8], + from: std::net::SocketAddr, + ) -> Option<&'a [u8]> { + if buf.len() < HEADER_SIZE + crate::crypto::SIGNATURE_SIZE { + log::trace!("Rejecting unsigned packet ({} bytes)", buf.len()); + return None; + } + + // NodeId IS the public key — verify signature + let src = NodeId::read_from(&buf[8..8 + crate::id::ID_LEN]); + + // Reject zero NodeId + if src.is_zero() { + log::trace!("Rejecting packet from zero NodeId"); + return None; + } + + let pubkey = src.as_bytes(); + + if !crate::wire::verify_packet(buf, pubkey) { + log::trace!("Signature verification failed"); + self.metrics + .packets_rejected + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + return None; + } + + // If we know this peer, verify source address + // (prevents IP spoofing with valid signatures) + if let Some(known) = self.peers.get(&src) { + if known.addr != from { + log::debug!( + "Peer {:?} address mismatch: known={} got={}", + src, + known.addr, + from + ); + // Allow — peer may have changed IP (NAT rebind) + // but log for monitoring + } + } + + let payload_end = buf.len() - crate::crypto::SIGNATURE_SIZE; + Some(&buf[..payload_end]) + } +} |