//! Network send helpers and query management. //! //! Extension impl block for Node. Contains send_signed, //! verify_incoming, send_find_node, send_store, query //! batch sending, and liveness probing. use std::net::SocketAddr; use std::time::{Duration, Instant}; use crate::dgram; use crate::dht::IterativeQuery; use crate::error::Error; use crate::id::NodeId; use crate::msg; use crate::nat::NatState; use crate::node::Node; use crate::peers::PeerInfo; use crate::wire::{DOMAIN_INET, DOMAIN_INET6, HEADER_SIZE, MsgHeader, MsgType}; impl Node { // ── Network ───────────────────────────────────── /// Local socket address. pub fn local_addr(&self) -> Result { self.net.local_addr() } /// Join the DHT network via a bootstrap node. /// /// Starts an iterative FIND_NODE for our own ID. /// The query is driven by subsequent `poll()` calls. /// DNS resolution is synchronous. /// /// # Example /// /// ```rust,no_run /// # let mut node = tesseras_dht::Node::bind(0).unwrap(); /// node.join("bootstrap.example.com", 10000).unwrap(); /// loop { node.poll().unwrap(); } /// ``` pub fn join(&mut self, host: &str, port: u16) -> Result<(), Error> { use std::net::ToSocketAddrs; let addr_str = format!("{host}:{port}"); let addr = addr_str .to_socket_addrs() .map_err(Error::Io)? 
.next() .ok_or_else(|| { Error::Io(std::io::Error::new( std::io::ErrorKind::NotFound, format!("no addresses for {addr_str}"), )) })?; log::info!("Joining via {addr}"); // Send DHT FIND_NODE to bootstrap (populates // DHT routing table via handle_dht_find_node_reply) self.send_find_node(addr, self.id)?; if self.is_dtun { // Send DTUN FIND_NODE to bootstrap too // (populates DTUN routing table) let nonce = self.alloc_nonce(); let domain = if addr.is_ipv4() { crate::wire::DOMAIN_INET } else { crate::wire::DOMAIN_INET6 }; let find = crate::msg::FindNodeMsg { nonce, target: self.id, domain, state: 0, }; let mut buf = [0u8; crate::msg::FIND_NODE_MSG_SIZE]; let hdr = MsgHeader::new( MsgType::DtunFindNode, Self::len16(crate::msg::FIND_NODE_MSG_SIZE), self.id, NodeId::from_bytes([0; 32]), ); if hdr.write(&mut buf).is_ok() { crate::msg::write_find_node(&mut buf, &find); if let Err(e) = self.send_signed(&buf, addr) { log::debug!("DTUN find_node send failed: {e}"); } } } // Start NAT detection if DTUN is enabled if self.is_dtun && !self.nat.is_complete() { let nonce = self.alloc_nonce(); self.nat.start_detect(nonce); // Send NatEcho to bootstrap let size = HEADER_SIZE + 4; let mut buf = vec![0u8; size]; let hdr = MsgHeader::new( MsgType::NatEcho, Self::len16(size), self.id, NodeId::from_bytes([0; crate::id::ID_LEN]), ); if hdr.write(&mut buf).is_ok() { buf[HEADER_SIZE..HEADER_SIZE + 4] .copy_from_slice(&nonce.to_be_bytes()); if let Err(e) = self.send_signed(&buf, addr) { log::warn!("Failed to send NatEcho: {e}"); } log::debug!("Sent NatEcho to {addr} nonce={nonce}"); } } Ok(()) } /// Start an iterative FIND_NODE query for a target. /// /// Returns the query nonce. The query is driven by /// `poll()` and completes when converged. 
pub fn start_find_node(&mut self, target: NodeId) -> Result { let nonce = self.alloc_nonce(); let mut query = IterativeQuery::find_node(target, nonce); // Seed with our closest known nodes let closest = self.dht_table.closest(&target, self.config.num_find_node); query.closest = closest; // Limit concurrent queries to prevent memory // exhaustion (max 100) const MAX_CONCURRENT_QUERIES: usize = 100; if self.queries.len() >= MAX_CONCURRENT_QUERIES { log::warn!("Too many concurrent queries, dropping"); return Err(Error::Timeout); } // Send initial batch self.send_query_batch(nonce)?; self.queries.insert(nonce, query); Ok(nonce) } /// Send a FIND_NODE message to a specific address. pub(crate) fn send_find_node( &mut self, to: SocketAddr, target: NodeId, ) -> Result { let nonce = self.alloc_nonce(); let domain = if to.is_ipv4() { DOMAIN_INET } else { DOMAIN_INET6 }; let find = msg::FindNodeMsg { nonce, target, domain, state: 0, }; let mut buf = [0u8; msg::FIND_NODE_MSG_SIZE]; let hdr = MsgHeader::new( MsgType::DhtFindNode, Self::len16(msg::FIND_NODE_MSG_SIZE), self.id, NodeId::from_bytes([0; crate::id::ID_LEN]), // unknown dst ); hdr.write(&mut buf)?; msg::write_find_node(&mut buf, &find); self.send_signed(&buf, to)?; log::debug!("Sent find_node to {to} target={target:?} nonce={nonce}"); Ok(nonce) } /// Send a STORE message to a specific peer. pub(crate) fn send_store( &mut self, peer: &PeerInfo, store: &msg::StoreMsg, ) -> Result<(), Error> { let total = HEADER_SIZE + msg::STORE_FIXED + store.key.len() + store.value.len(); let mut buf = vec![0u8; total]; let hdr = MsgHeader::new( MsgType::DhtStore, Self::len16(total), self.id, peer.id, ); hdr.write(&mut buf)?; msg::write_store(&mut buf, store)?; self.send_signed(&buf, peer.addr)?; log::debug!( "Sent store to {:?} key={} bytes", peer.id, store.key.len() ); Ok(()) } /// Send the next batch of queries for an active /// iterative query. Uses FIND_VALUE for find_value /// queries, FIND_NODE otherwise. 
pub(crate) fn send_query_batch(&mut self, nonce: u32) -> Result<(), Error> {
    // Copy what the loop needs out of the query up front so
    // the map is not borrowed while `self` sends packets.
    // An unknown nonce is not an error: nothing is sent.
    let (batch, target, is_find_value, key) = match self.queries.get(&nonce) {
        Some(q) => (
            q.next_to_query(),
            q.target,
            q.is_find_value,
            q.key.clone(),
        ),
        None => return Ok(()),
    };

    for peer in batch {
        let outcome = if is_find_value {
            self.send_find_value_msg(peer.addr, target, &key)
        } else {
            self.send_find_node(peer.addr, target)
        };

        match outcome {
            // A failed send is logged and the peer is skipped;
            // it is not recorded as pending.
            Err(e) => {
                log::debug!("Failed to send query to {:?}: {e}", peer.id);
            }
            Ok(_) => {
                if let Some(q) = self.queries.get_mut(&nonce) {
                    q.pending.insert(peer.id, Instant::now());
                }
            }
        }
    }

    Ok(())
}

/// Drive all active iterative queries: expire
/// timeouts, send next batches, clean up finished.
///
/// Each finished query is logged, counted in
/// `metrics.lookups_completed`, and removed.
pub(crate) fn drive_queries(&mut self) {
    use std::sync::atomic::Ordering;

    // Snapshot the keys so the map can be mutated below.
    let nonces: Vec<_> = self.queries.keys().copied().collect();

    // Pass 1: expire timed-out pending requests.
    for nonce in &nonces {
        if let Some(q) = self.queries.get_mut(nonce) {
            q.expire_pending();
        }
    }

    // Pass 2: send the next batch for queries still running.
    for &nonce in &nonces {
        let active =
            matches!(self.queries.get(&nonce), Some(q) if !q.is_done());
        if !active {
            continue;
        }
        if let Err(e) = self.send_query_batch(nonce) {
            log::debug!("Query batch send failed: {e}");
        }
    }

    // Pass 3: drop completed queries.
    let completed = &self.metrics.lookups_completed;
    self.queries.retain(|nonce, q| {
        if !q.is_done() {
            return true;
        }
        log::debug!(
            "Query nonce={nonce} complete: {} closest, {} hops, {}ms",
            q.closest.len(),
            q.hops,
            q.started_at.elapsed().as_millis(),
        );
        completed.fetch_add(1, Ordering::Relaxed);
        false
    });
}

/// Refresh stale routing table buckets by starting
/// FIND_NODE queries for random IDs in each stale
/// bucket's range.
pub(crate) fn refresh_buckets(&mut self) { let threshold = self.config.refresh_interval; if self.last_refresh.elapsed() < threshold { return; } self.last_refresh = Instant::now(); let targets = self.dht_table.stale_bucket_targets(threshold); if targets.is_empty() { return; } // Limit to 3 refresh queries per cycle to avoid // flooding the query queue (256 empty buckets would // otherwise spawn 256 queries at once). let batch = targets.into_iter().take(3); log::debug!("Refreshing stale buckets"); for target in batch { if let Err(e) = self.start_find_node(target) { log::debug!("Refresh find_node failed: {e}"); break; // query queue full, stop } } } /// Ping LRU peers in each bucket to verify they're /// alive. Dead peers are removed from the routing /// table. Called alongside refresh_buckets. pub(crate) fn probe_liveness(&mut self) { let lru_peers = self.dht_table.lru_peers(); if lru_peers.is_empty() { return; } for peer in &lru_peers { // Skip if we've seen them recently if peer.last_seen.elapsed() < Duration::from_secs(30) { continue; } let nonce = self.alloc_nonce(); let size = msg::PING_MSG_SIZE; let mut buf = [0u8; msg::PING_MSG_SIZE]; let hdr = MsgHeader::new( MsgType::DhtPing, Self::len16(size), self.id, peer.id, ); if hdr.write(&mut buf).is_ok() { buf[HEADER_SIZE..HEADER_SIZE + 4] .copy_from_slice(&nonce.to_be_bytes()); let _ = self.send_signed(&buf, peer.addr); self.pending_pings.insert(nonce, (peer.id, Instant::now())); } } } /// Try to send queued datagrams whose destinations /// are now in the peer store. pub(crate) fn drain_send_queue(&mut self) { let known_ids: Vec = self.peers.ids(); for dst_id in known_ids { if !self.send_queue.has_pending(&dst_id) { continue; } let peer = match self.peers.get(&dst_id).cloned() { Some(p) => p, None => continue, }; let queued = self.send_queue.drain(&dst_id); for item in queued { self.send_dgram_raw(&item.data, &peer); } } } /// Handle an incoming Dgram message: reassemble /// fragments and invoke the dgram callback. 
pub(crate) fn handle_dgram(&mut self, buf: &[u8], hdr: &MsgHeader) {
    // Checked slice: a buffer shorter than the header is
    // dropped instead of panicking on an out-of-range index.
    let payload = match buf.get(HEADER_SIZE..) {
        Some(p) => p,
        None => return,
    };
    let (total, index, frag_data) = match dgram::parse_fragment(payload) {
        Some(v) => v,
        None => return,
    };

    let complete =
        self.reassembler
            .feed(hdr.src, total, index, frag_data.to_vec());

    // `feed` yields the full datagram once it has one.
    if let Some(data) = complete {
        log::debug!(
            "Dgram reassembled: {} bytes from {:?}",
            data.len(),
            hdr.src
        );
        if let Some(ref cb) = self.dgram_callback {
            cb(&data, &hdr.src);
        }
    }
}

/// Flush pending RDP output for a connection,
/// sending packets via UDP.
///
/// Behind a symmetric NAT, packets go to the proxy server
/// as `ProxyRdp` when a proxy server is set. In every
/// other case they go directly to the destination peer as
/// `Rdp`. If no address can be found, the output is
/// dropped and the fact is logged. Sends are best-effort.
pub(crate) fn flush_rdp_output(&mut self, desc: i32) {
    let output = match self.rdp.pending_output(desc) {
        Some(o) => o,
        None => return,
    };

    // Determine target address and message type
    let symmetric_nat = self.nat.state() == NatState::SymmetricNat;
    let proxy_addr = if symmetric_nat {
        // Route through proxy
        self.proxy.server().map(|server| server.addr)
    } else {
        None
    };

    let (send_addr, msg_type) = if let Some(addr) = proxy_addr {
        (addr, MsgType::ProxyRdp)
    } else if let Some(peer) = self.peers.get(&output.dst) {
        (peer.addr, MsgType::Rdp)
    } else {
        if symmetric_nat {
            log::debug!("RDP: no route for {:?}", output.dst);
        } else {
            log::debug!("RDP: no address for {:?}", output.dst);
        }
        return;
    };

    for pkt in &output.packets {
        let rdp_wire = crate::rdp::build_rdp_wire(
            pkt.flags,
            output.sport,
            output.dport,
            pkt.seqnum,
            pkt.acknum,
            &pkt.data,
        );

        let total = HEADER_SIZE + rdp_wire.len();
        let mut buf = vec![0u8; total];
        let hdr = MsgHeader::new(
            msg_type,
            Self::len16(total),
            self.id,
            output.dst,
        );
        if hdr.write(&mut buf).is_ok() {
            // `buf` was sized as header + wire, so the tail
            // is exactly `rdp_wire.len()` bytes long.
            buf[HEADER_SIZE..].copy_from_slice(&rdp_wire);
            let _ = self.send_signed(&buf, send_addr);
        }
    }
}

/// Flush pending RDP output for all connections.
pub(crate) fn flush_all_rdp(&mut self) {
    for desc in self.rdp.descriptors() {
        self.flush_rdp_output(desc);
    }
}

/// Handle an incoming RDP packet.
pub(crate) fn handle_rdp(&mut self, buf: &[u8], hdr: &MsgHeader) { let payload = &buf[HEADER_SIZE..]; let wire = match crate::rdp::parse_rdp_wire(payload) { Some(w) => w, None => return, }; log::debug!( "RDP from {:?}: flags=0x{:02x} \ sport={} dport={} \ seq={} ack={} \ data={} bytes", hdr.src, wire.flags, wire.sport, wire.dport, wire.seqnum, wire.acknum, wire.data.len() ); let input = crate::rdp::RdpInput { src: hdr.src, sport: wire.sport, dport: wire.dport, flags: wire.flags, seqnum: wire.seqnum, acknum: wire.acknum, data: wire.data, }; let actions = self.rdp.input(&input); for action in actions { match action { crate::rdp::RdpAction::Event { desc, ref addr, event, } => { log::info!("RDP event: desc={desc} {:?}", event); // Invoke app callback if let Some(ref cb) = self.rdp_callback { cb(desc, addr, event); } // After accept/connect, flush SYN-ACK/ACK self.flush_rdp_output(desc); } crate::rdp::RdpAction::Close(desc) => { self.rdp.close(desc); } } } } /// Register this node with DTUN (for NAT traversal). /// /// Sends DTUN_REGISTER to the k-closest global nodes /// so other peers can find us via hole-punching. pub(crate) fn dtun_register(&mut self) { let (session, closest) = self.dtun.prepare_register(); log::info!( "DTUN register: session={session}, {} targets", closest.len() ); for peer in &closest { let size = HEADER_SIZE + 4; let mut buf = vec![0u8; size]; let hdr = MsgHeader::new( MsgType::DtunRegister, Self::len16(size), self.id, peer.id, ); if hdr.write(&mut buf).is_ok() { buf[HEADER_SIZE..HEADER_SIZE + 4] .copy_from_slice(&session.to_be_bytes()); let _ = self.send_signed(&buf, peer.addr); } } self.dtun.registration_done(); } /// Send a packet with Ed25519 signature appended. /// /// Appends 64-byte signature after the packet body. /// All outgoing packets go through this method. 
pub(crate) fn send_signed( &self, buf: &[u8], to: SocketAddr, ) -> Result { let mut signed = buf.to_vec(); crate::wire::sign_packet(&mut signed, &self.identity); self.metrics .messages_sent .fetch_add(1, std::sync::atomic::Ordering::Relaxed); self.metrics.bytes_sent.fetch_add( signed.len() as u64, std::sync::atomic::Ordering::Relaxed, ); // Note: reply sends use `let _ =` (best-effort). // Critical sends (join, put, DTUN register) use // `if let Err(e) = ... { log::warn! }`. self.net.send_to(&signed, to) } /// Proactively check node activity by pinging peers /// in the routing table. Peers that fail to respond /// accumulate failures in the ban list. This keeps /// the routing table healthy by detecting dead nodes /// before queries need them. pub(crate) fn check_node_activity(&mut self) { if self.last_activity_check.elapsed() < self.config.activity_check_interval { return; } self.last_activity_check = Instant::now(); let lru_peers = self.dht_table.lru_peers(); if lru_peers.is_empty() { return; } let mut pinged = 0u32; for peer in &lru_peers { // Only ping peers not seen for >60s if peer.last_seen.elapsed() < Duration::from_secs(60) { continue; } // Skip banned peers if self.ban_list.is_banned(&peer.addr) { continue; } let nonce = self.alloc_nonce(); let size = crate::msg::PING_MSG_SIZE; let mut buf = [0u8; crate::msg::PING_MSG_SIZE]; let hdr = MsgHeader::new( MsgType::DhtPing, Self::len16(size), self.id, peer.id, ); if hdr.write(&mut buf).is_ok() { buf[HEADER_SIZE..HEADER_SIZE + 4] .copy_from_slice(&nonce.to_be_bytes()); let _ = self.send_signed(&buf, peer.addr); self.pending_pings.insert(nonce, (peer.id, Instant::now())); pinged += 1; } } if pinged > 0 { log::debug!("Activity check: pinged {pinged} peers"); } } /// Sweep timed-out stores and retry with alternative /// peers. Failed peers accumulate ban list failures. 
pub(crate) fn retry_failed_stores(&mut self) { if self.last_store_retry.elapsed() < self.config.store_retry_interval { return; } self.last_store_retry = Instant::now(); let retries = self.store_tracker.collect_timeouts(); if retries.is_empty() { return; } log::debug!("Store retry: {} timed-out stores", retries.len()); for retry in &retries { // Record failure for the peer that didn't ack if let Some(peer_info) = self.peers.get(&retry.failed_peer) { self.ban_list.record_failure(peer_info.addr); } // Find alternative peers (excluding the failed one) let closest = self .dht_table .closest(&retry.target, self.config.num_find_node); let store_msg = crate::msg::StoreMsg { id: retry.target, from: self.id, key: retry.key.clone(), value: retry.value.clone(), ttl: retry.ttl, is_unique: retry.is_unique, }; let mut sent = false; for peer in &closest { if peer.id == retry.failed_peer { continue; } if self.ban_list.is_banned(&peer.addr) { continue; } if let Err(e) = self.send_store(peer, &store_msg) { log::debug!("Store retry send failed: {e}"); continue; } self.store_tracker.track( retry.target, retry.key.clone(), retry.value.clone(), retry.ttl, retry.is_unique, peer.clone(), ); sent = true; break; // one alternative peer is enough per retry } if !sent { log::debug!( "Store retry: no alternative peer for key ({} bytes)", retry.key.len() ); } } } /// Verify Ed25519 signature on an incoming packet. /// /// Since NodeId = Ed25519 public key, the src field /// in the header IS the public key. The signature /// proves the sender holds the private key for that /// NodeId. /// /// Additionally, if we already know this peer, verify /// the source address matches to prevent IP spoofing. 
pub(crate) fn verify_incoming<'a>( &self, buf: &'a [u8], from: std::net::SocketAddr, ) -> Option<&'a [u8]> { if buf.len() < HEADER_SIZE + crate::crypto::SIGNATURE_SIZE { log::trace!("Rejecting unsigned packet ({} bytes)", buf.len()); return None; } // NodeId IS the public key — verify signature let src = NodeId::read_from(&buf[8..8 + crate::id::ID_LEN]); // Reject zero NodeId if src.is_zero() { log::trace!("Rejecting packet from zero NodeId"); return None; } let pubkey = src.as_bytes(); if !crate::wire::verify_packet(buf, pubkey) { log::trace!("Signature verification failed"); self.metrics .packets_rejected .fetch_add(1, std::sync::atomic::Ordering::Relaxed); return None; } // If we know this peer, verify source address // (prevents IP spoofing with valid signatures) if let Some(known) = self.peers.get(&src) { if known.addr != from { log::debug!( "Peer {:?} address mismatch: known={} got={}", src, known.addr, from ); // Allow — peer may have changed IP (NAT rebind) // but log for monitoring } } let payload_end = buf.len() - crate::crypto::SIGNATURE_SIZE; Some(&buf[..payload_end]) } }