Diffstat (limited to 'src/net.rs')
-rw-r--r--  src/net.rs  744
1 file changed, 744 insertions, 0 deletions
diff --git a/src/net.rs b/src/net.rs
new file mode 100644
index 0000000..aa6d2a7
--- /dev/null
+++ b/src/net.rs
@@ -0,0 +1,744 @@
+//! Network send helpers and query management.
+//!
+//! Extension impl block for Node. Contains send_signed,
+//! verify_incoming, send_find_node, send_store, query
+//! batch sending, and liveness probing.
+
+use std::net::SocketAddr;
+use std::time::{Duration, Instant};
+
+use crate::dgram;
+use crate::dht::IterativeQuery;
+use crate::error::Error;
+use crate::id::NodeId;
+use crate::msg;
+use crate::nat::NatState;
+use crate::node::Node;
+use crate::peers::PeerInfo;
+use crate::wire::{DOMAIN_INET, DOMAIN_INET6, HEADER_SIZE, MsgHeader, MsgType};
+
+impl Node {
+ // ── Network ─────────────────────────────────────
+
+ /// Local socket address.
+ pub fn local_addr(&self) -> Result<SocketAddr, Error> {
+ self.net.local_addr()
+ }
+
+ /// Join the DHT network via a bootstrap node.
+ ///
+    /// Starts an iterative FIND_NODE for our own ID; the query
+    /// is driven by subsequent `poll()` calls. When DTUN is
+    /// enabled, a DTUN FIND_NODE and a NatEcho probe (for NAT
+    /// detection) are also sent to the bootstrap node.
+    /// DNS resolution is synchronous.
+ ///
+ /// # Example
+ ///
+ /// ```rust,no_run
+ /// # let mut node = tesseras_dht::Node::bind(0).unwrap();
+ /// node.join("bootstrap.example.com", 10000).unwrap();
+ /// loop { node.poll().unwrap(); }
+ /// ```
+ pub fn join(&mut self, host: &str, port: u16) -> Result<(), Error> {
+ use std::net::ToSocketAddrs;
+
+ let addr_str = format!("{host}:{port}");
+ let addr = addr_str
+ .to_socket_addrs()
+ .map_err(Error::Io)?
+ .next()
+ .ok_or_else(|| {
+ Error::Io(std::io::Error::new(
+ std::io::ErrorKind::NotFound,
+ format!("no addresses for {addr_str}"),
+ ))
+ })?;
+
+ log::info!("Joining via {addr}");
+
+ // Send DHT FIND_NODE to bootstrap (populates
+ // DHT routing table via handle_dht_find_node_reply)
+ self.send_find_node(addr, self.id)?;
+
+ if self.is_dtun {
+ // Send DTUN FIND_NODE to bootstrap too
+ // (populates DTUN routing table)
+ let nonce = self.alloc_nonce();
+            let domain = if addr.is_ipv4() {
+                DOMAIN_INET
+            } else {
+                DOMAIN_INET6
+            };
+            let find = msg::FindNodeMsg {
+                nonce,
+                target: self.id,
+                domain,
+                state: 0,
+            };
+            let mut buf = [0u8; msg::FIND_NODE_MSG_SIZE];
+            let hdr = MsgHeader::new(
+                MsgType::DtunFindNode,
+                Self::len16(msg::FIND_NODE_MSG_SIZE),
+                self.id,
+                NodeId::from_bytes([0; crate::id::ID_LEN]), // unknown dst
+            );
+            if hdr.write(&mut buf).is_ok() {
+                msg::write_find_node(&mut buf, &find);
+ if let Err(e) = self.send_signed(&buf, addr) {
+ log::debug!("DTUN find_node send failed: {e}");
+ }
+ }
+ }
+
+ // Start NAT detection if DTUN is enabled
+ if self.is_dtun && !self.nat.is_complete() {
+ let nonce = self.alloc_nonce();
+ self.nat.start_detect(nonce);
+
+ // Send NatEcho to bootstrap
+ let size = HEADER_SIZE + 4;
+ let mut buf = vec![0u8; size];
+ let hdr = MsgHeader::new(
+ MsgType::NatEcho,
+ Self::len16(size),
+ self.id,
+ NodeId::from_bytes([0; crate::id::ID_LEN]),
+ );
+ if hdr.write(&mut buf).is_ok() {
+ buf[HEADER_SIZE..HEADER_SIZE + 4]
+ .copy_from_slice(&nonce.to_be_bytes());
+ if let Err(e) = self.send_signed(&buf, addr) {
+ log::warn!("Failed to send NatEcho: {e}");
+ }
+ log::debug!("Sent NatEcho to {addr} nonce={nonce}");
+ }
+ }
+
+ Ok(())
+ }
+
+ /// Start an iterative FIND_NODE query for a target.
+ ///
+ /// Returns the query nonce. The query is driven by
+ /// `poll()` and completes when converged.
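+    ///
+    /// # Example
+    ///
+    /// A minimal sketch, assuming `NodeId` is re-exported at the
+    /// crate root and node IDs are 32 bytes:
+    ///
+    /// ```rust,ignore
+    /// # let mut node = tesseras_dht::Node::bind(0).unwrap();
+    /// let target = tesseras_dht::NodeId::from_bytes([7u8; 32]);
+    /// let _nonce = node.start_find_node(target).unwrap();
+    /// loop { node.poll().unwrap(); }
+    /// ```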
+ pub fn start_find_node(&mut self, target: NodeId) -> Result<u32, Error> {
+ let nonce = self.alloc_nonce();
+ let mut query = IterativeQuery::find_node(target, nonce);
+
+ // Seed with our closest known nodes
+ let closest =
+ self.dht_table.closest(&target, self.config.num_find_node);
+ query.closest = closest;
+
+ // Limit concurrent queries to prevent memory
+ // exhaustion (max 100)
+ const MAX_CONCURRENT_QUERIES: usize = 100;
+ if self.queries.len() >= MAX_CONCURRENT_QUERIES {
+ log::warn!("Too many concurrent queries, dropping");
+ return Err(Error::Timeout);
+ }
+
+        self.queries.insert(nonce, query);
+
+        // Send initial batch (the query must already be in the
+        // map, since send_query_batch looks it up by nonce)
+        self.send_query_batch(nonce)?;
+
+        Ok(nonce)
+ }
+
+ /// Send a FIND_NODE message to a specific address.
+ pub(crate) fn send_find_node(
+ &mut self,
+ to: SocketAddr,
+ target: NodeId,
+ ) -> Result<u32, Error> {
+ let nonce = self.alloc_nonce();
+ let domain = if to.is_ipv4() {
+ DOMAIN_INET
+ } else {
+ DOMAIN_INET6
+ };
+
+ let find = msg::FindNodeMsg {
+ nonce,
+ target,
+ domain,
+ state: 0,
+ };
+
+ let mut buf = [0u8; msg::FIND_NODE_MSG_SIZE];
+ let hdr = MsgHeader::new(
+ MsgType::DhtFindNode,
+ Self::len16(msg::FIND_NODE_MSG_SIZE),
+ self.id,
+ NodeId::from_bytes([0; crate::id::ID_LEN]), // unknown dst
+ );
+ hdr.write(&mut buf)?;
+ msg::write_find_node(&mut buf, &find);
+ self.send_signed(&buf, to)?;
+
+ log::debug!("Sent find_node to {to} target={target:?} nonce={nonce}");
+ Ok(nonce)
+ }
+
+ /// Send a STORE message to a specific peer.
+ pub(crate) fn send_store(
+ &mut self,
+ peer: &PeerInfo,
+ store: &msg::StoreMsg,
+ ) -> Result<(), Error> {
+ let total = HEADER_SIZE
+ + msg::STORE_FIXED
+ + store.key.len()
+ + store.value.len();
+ let mut buf = vec![0u8; total];
+ let hdr = MsgHeader::new(
+ MsgType::DhtStore,
+ Self::len16(total),
+ self.id,
+ peer.id,
+ );
+ hdr.write(&mut buf)?;
+ msg::write_store(&mut buf, store)?;
+ self.send_signed(&buf, peer.addr)?;
+ log::debug!(
+ "Sent store to {:?} key={} bytes",
+ peer.id,
+ store.key.len()
+ );
+ Ok(())
+ }
+
+ /// Send the next batch of queries for an active
+ /// iterative query. Uses FIND_VALUE for find_value
+ /// queries, FIND_NODE otherwise.
+ pub(crate) fn send_query_batch(&mut self, nonce: u32) -> Result<(), Error> {
+ let query = match self.queries.get(&nonce) {
+ Some(q) => q,
+ None => return Ok(()),
+ };
+
+ let to_query = query.next_to_query();
+ let target = query.target;
+ let is_find_value = query.is_find_value;
+ let key = query.key.clone();
+
+ for peer in to_query {
+ let result = if is_find_value {
+ self.send_find_value_msg(peer.addr, target, &key)
+ } else {
+ self.send_find_node(peer.addr, target)
+ };
+
+ if let Err(e) = result {
+ log::debug!("Failed to send query to {:?}: {e}", peer.id);
+ continue;
+ }
+ if let Some(q) = self.queries.get_mut(&nonce) {
+ q.pending.insert(peer.id, Instant::now());
+ }
+ }
+ Ok(())
+ }
+
+ /// Drive all active iterative queries: expire
+ /// timeouts, send next batches, clean up finished.
+ pub(crate) fn drive_queries(&mut self) {
+ // Expire timed-out pending requests
+ let nonces: Vec<u32> = self.queries.keys().copied().collect();
+
+ for nonce in &nonces {
+ if let Some(q) = self.queries.get_mut(nonce) {
+ q.expire_pending();
+ }
+ }
+
+ // Send next batch for active queries
+ for nonce in &nonces {
+ let is_active = self
+ .queries
+ .get(nonce)
+ .map(|q| !q.is_done())
+ .unwrap_or(false);
+
+ if is_active {
+ if let Err(e) = self.send_query_batch(*nonce) {
+ log::debug!("Query batch send failed: {e}");
+ }
+ }
+ }
+
+ // Remove completed queries
+ self.queries.retain(|nonce, q| {
+ if q.is_done() {
+ log::debug!(
+ "Query nonce={nonce} complete: {} closest, {} hops, {}ms",
+ q.closest.len(),
+ q.hops,
+ q.started_at.elapsed().as_millis(),
+ );
+ self.metrics
+ .lookups_completed
+ .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+ false
+ } else {
+ true
+ }
+ });
+ }
+
+ /// Refresh stale routing table buckets by starting
+ /// FIND_NODE queries for random IDs in each stale
+ /// bucket's range.
+ pub(crate) fn refresh_buckets(&mut self) {
+ let threshold = self.config.refresh_interval;
+
+ if self.last_refresh.elapsed() < threshold {
+ return;
+ }
+ self.last_refresh = Instant::now();
+
+ let targets = self.dht_table.stale_bucket_targets(threshold);
+
+ if targets.is_empty() {
+ return;
+ }
+
+ // Limit to 3 refresh queries per cycle to avoid
+ // flooding the query queue (256 empty buckets would
+ // otherwise spawn 256 queries at once).
+ let batch = targets.into_iter().take(3);
+ log::debug!("Refreshing stale buckets");
+
+ for target in batch {
+ if let Err(e) = self.start_find_node(target) {
+ log::debug!("Refresh find_node failed: {e}");
+ break; // query queue full, stop
+ }
+ }
+ }
+
+ /// Ping LRU peers in each bucket to verify they're
+ /// alive. Dead peers are removed from the routing
+ /// table. Called alongside refresh_buckets.
+ pub(crate) fn probe_liveness(&mut self) {
+ let lru_peers = self.dht_table.lru_peers();
+ if lru_peers.is_empty() {
+ return;
+ }
+
+ for peer in &lru_peers {
+ // Skip if we've seen them recently
+ if peer.last_seen.elapsed() < Duration::from_secs(30) {
+ continue;
+ }
+ let nonce = self.alloc_nonce();
+ let size = msg::PING_MSG_SIZE;
+ let mut buf = [0u8; msg::PING_MSG_SIZE];
+ let hdr = MsgHeader::new(
+ MsgType::DhtPing,
+ Self::len16(size),
+ self.id,
+ peer.id,
+ );
+ if hdr.write(&mut buf).is_ok() {
+ buf[HEADER_SIZE..HEADER_SIZE + 4]
+ .copy_from_slice(&nonce.to_be_bytes());
+ let _ = self.send_signed(&buf, peer.addr);
+ self.pending_pings.insert(nonce, (peer.id, Instant::now()));
+ }
+ }
+ }
+
+ /// Try to send queued datagrams whose destinations
+ /// are now in the peer store.
+ pub(crate) fn drain_send_queue(&mut self) {
+ let known_ids: Vec<crate::id::NodeId> = self.peers.ids();
+
+ for dst_id in known_ids {
+ if !self.send_queue.has_pending(&dst_id) {
+ continue;
+ }
+ let peer = match self.peers.get(&dst_id).cloned() {
+ Some(p) => p,
+ None => continue,
+ };
+ let queued = self.send_queue.drain(&dst_id);
+ for item in queued {
+ self.send_dgram_raw(&item.data, &peer);
+ }
+ }
+ }
+
+ /// Handle an incoming Dgram message: reassemble
+ /// fragments and invoke the dgram callback.
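+    ///
+    /// The callback receives the reassembled payload and the
+    /// sender's `NodeId`. A minimal sketch, assuming a
+    /// hypothetical `set_dgram_callback` registration method on
+    /// `Node`:
+    ///
+    /// ```rust,ignore
+    /// node.set_dgram_callback(|data: &[u8], src| {
+    ///     log::info!("dgram: {} bytes from {:?}", data.len(), src);
+    /// });
+    /// ```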
+ pub(crate) fn handle_dgram(&mut self, buf: &[u8], hdr: &MsgHeader) {
+ let payload = &buf[HEADER_SIZE..];
+
+ let (total, index, frag_data) = match dgram::parse_fragment(payload) {
+ Some(v) => v,
+ None => return,
+ };
+
+ let complete =
+ self.reassembler
+ .feed(hdr.src, total, index, frag_data.to_vec());
+
+ if let Some(data) = complete {
+ log::debug!(
+ "Dgram reassembled: {} bytes from {:?}",
+ data.len(),
+ hdr.src
+ );
+ if let Some(ref cb) = self.dgram_callback {
+ cb(&data, &hdr.src);
+ }
+ }
+ }
+
+ /// Flush pending RDP output for a connection,
+ /// sending packets via UDP.
+ pub(crate) fn flush_rdp_output(&mut self, desc: i32) {
+ let output = match self.rdp.pending_output(desc) {
+ Some(o) => o,
+ None => return,
+ };
+
+ // Determine target address and message type
+ let (send_addr, msg_type) =
+ if self.nat.state() == NatState::SymmetricNat {
+ // Route through proxy
+ if let Some(server) = self.proxy.server() {
+ (server.addr, MsgType::ProxyRdp)
+ } else if let Some(peer) = self.peers.get(&output.dst) {
+ (peer.addr, MsgType::Rdp)
+ } else {
+ log::debug!("RDP: no route for {:?}", output.dst);
+ return;
+ }
+ } else if let Some(peer) = self.peers.get(&output.dst) {
+ (peer.addr, MsgType::Rdp)
+ } else {
+ log::debug!("RDP: no address for {:?}", output.dst);
+ return;
+ };
+
+ for pkt in &output.packets {
+ let rdp_wire = crate::rdp::build_rdp_wire(
+ pkt.flags,
+ output.sport,
+ output.dport,
+ pkt.seqnum,
+ pkt.acknum,
+ &pkt.data,
+ );
+
+ let total = HEADER_SIZE + rdp_wire.len();
+ let mut buf = vec![0u8; total];
+ let hdr = MsgHeader::new(
+ msg_type,
+ Self::len16(total),
+ self.id,
+ output.dst,
+ );
+ if hdr.write(&mut buf).is_ok() {
+ buf[HEADER_SIZE..].copy_from_slice(&rdp_wire);
+ let _ = self.send_signed(&buf, send_addr);
+ }
+ }
+ }
+
+ /// Flush pending RDP output for all connections.
+ pub(crate) fn flush_all_rdp(&mut self) {
+ let descs = self.rdp.descriptors();
+ for desc in descs {
+ self.flush_rdp_output(desc);
+ }
+ }
+
+ /// Handle an incoming RDP packet.
+ pub(crate) fn handle_rdp(&mut self, buf: &[u8], hdr: &MsgHeader) {
+ let payload = &buf[HEADER_SIZE..];
+ let wire = match crate::rdp::parse_rdp_wire(payload) {
+ Some(w) => w,
+ None => return,
+ };
+
+ log::debug!(
+ "RDP from {:?}: flags=0x{:02x} \
+ sport={} dport={} \
+ seq={} ack={} \
+ data={} bytes",
+ hdr.src,
+ wire.flags,
+ wire.sport,
+ wire.dport,
+ wire.seqnum,
+ wire.acknum,
+ wire.data.len()
+ );
+
+ let input = crate::rdp::RdpInput {
+ src: hdr.src,
+ sport: wire.sport,
+ dport: wire.dport,
+ flags: wire.flags,
+ seqnum: wire.seqnum,
+ acknum: wire.acknum,
+ data: wire.data,
+ };
+ let actions = self.rdp.input(&input);
+
+ for action in actions {
+ match action {
+ crate::rdp::RdpAction::Event {
+ desc,
+ ref addr,
+ event,
+ } => {
+ log::info!("RDP event: desc={desc} {:?}", event);
+
+ // Invoke app callback
+ if let Some(ref cb) = self.rdp_callback {
+ cb(desc, addr, event);
+ }
+
+ // After accept/connect, flush SYN-ACK/ACK
+ self.flush_rdp_output(desc);
+ }
+ crate::rdp::RdpAction::Close(desc) => {
+ self.rdp.close(desc);
+ }
+ }
+ }
+ }
+
+ /// Register this node with DTUN (for NAT traversal).
+ ///
+ /// Sends DTUN_REGISTER to the k-closest global nodes
+ /// so other peers can find us via hole-punching.
+ pub(crate) fn dtun_register(&mut self) {
+ let (session, closest) = self.dtun.prepare_register();
+ log::info!(
+ "DTUN register: session={session}, {} targets",
+ closest.len()
+ );
+
+ for peer in &closest {
+ let size = HEADER_SIZE + 4;
+ let mut buf = vec![0u8; size];
+ let hdr = MsgHeader::new(
+ MsgType::DtunRegister,
+ Self::len16(size),
+ self.id,
+ peer.id,
+ );
+ if hdr.write(&mut buf).is_ok() {
+ buf[HEADER_SIZE..HEADER_SIZE + 4]
+ .copy_from_slice(&session.to_be_bytes());
+ let _ = self.send_signed(&buf, peer.addr);
+ }
+ }
+
+ self.dtun.registration_done();
+ }
+
+ /// Send a packet with Ed25519 signature appended.
+ ///
+ /// Appends 64-byte signature after the packet body.
+ /// All outgoing packets go through this method.
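+    ///
+    /// Wire layout (sketch; exact sizes come from `crate::wire`
+    /// and `crate::crypto`):
+    ///
+    /// ```text
+    /// [ MsgHeader (HEADER_SIZE) | payload | Ed25519 signature (64) ]
+    /// ```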
+ pub(crate) fn send_signed(
+ &self,
+ buf: &[u8],
+ to: SocketAddr,
+ ) -> Result<usize, Error> {
+ let mut signed = buf.to_vec();
+ crate::wire::sign_packet(&mut signed, &self.identity);
+ self.metrics
+ .messages_sent
+ .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+ self.metrics.bytes_sent.fetch_add(
+ signed.len() as u64,
+ std::sync::atomic::Ordering::Relaxed,
+ );
+ // Note: reply sends use `let _ =` (best-effort).
+ // Critical sends (join, put, DTUN register) use
+ // `if let Err(e) = ... { log::warn! }`.
+ self.net.send_to(&signed, to)
+ }
+
+ /// Proactively check node activity by pinging peers
+ /// in the routing table. Peers that fail to respond
+ /// accumulate failures in the ban list. This keeps
+ /// the routing table healthy by detecting dead nodes
+ /// before queries need them.
+ pub(crate) fn check_node_activity(&mut self) {
+ if self.last_activity_check.elapsed()
+ < self.config.activity_check_interval
+ {
+ return;
+ }
+ self.last_activity_check = Instant::now();
+
+ let lru_peers = self.dht_table.lru_peers();
+ if lru_peers.is_empty() {
+ return;
+ }
+
+ let mut pinged = 0u32;
+ for peer in &lru_peers {
+ // Only ping peers not seen for >60s
+ if peer.last_seen.elapsed() < Duration::from_secs(60) {
+ continue;
+ }
+ // Skip banned peers
+ if self.ban_list.is_banned(&peer.addr) {
+ continue;
+ }
+
+ let nonce = self.alloc_nonce();
+ let size = crate::msg::PING_MSG_SIZE;
+ let mut buf = [0u8; crate::msg::PING_MSG_SIZE];
+ let hdr = MsgHeader::new(
+ MsgType::DhtPing,
+ Self::len16(size),
+ self.id,
+ peer.id,
+ );
+ if hdr.write(&mut buf).is_ok() {
+ buf[HEADER_SIZE..HEADER_SIZE + 4]
+ .copy_from_slice(&nonce.to_be_bytes());
+ let _ = self.send_signed(&buf, peer.addr);
+ self.pending_pings.insert(nonce, (peer.id, Instant::now()));
+ pinged += 1;
+ }
+ }
+
+ if pinged > 0 {
+ log::debug!("Activity check: pinged {pinged} peers");
+ }
+ }
+
+ /// Sweep timed-out stores and retry with alternative
+ /// peers. Failed peers accumulate ban list failures.
+ pub(crate) fn retry_failed_stores(&mut self) {
+ if self.last_store_retry.elapsed() < self.config.store_retry_interval {
+ return;
+ }
+ self.last_store_retry = Instant::now();
+
+ let retries = self.store_tracker.collect_timeouts();
+ if retries.is_empty() {
+ return;
+ }
+
+ log::debug!("Store retry: {} timed-out stores", retries.len());
+
+ for retry in &retries {
+ // Record failure for the peer that didn't ack
+ if let Some(peer_info) = self.peers.get(&retry.failed_peer) {
+ self.ban_list.record_failure(peer_info.addr);
+ }
+
+ // Find alternative peers (excluding the failed one)
+ let closest = self
+ .dht_table
+ .closest(&retry.target, self.config.num_find_node);
+
+ let store_msg = crate::msg::StoreMsg {
+ id: retry.target,
+ from: self.id,
+ key: retry.key.clone(),
+ value: retry.value.clone(),
+ ttl: retry.ttl,
+ is_unique: retry.is_unique,
+ };
+
+ let mut sent = false;
+ for peer in &closest {
+ if peer.id == retry.failed_peer {
+ continue;
+ }
+ if self.ban_list.is_banned(&peer.addr) {
+ continue;
+ }
+ if let Err(e) = self.send_store(peer, &store_msg) {
+ log::debug!("Store retry send failed: {e}");
+ continue;
+ }
+ self.store_tracker.track(
+ retry.target,
+ retry.key.clone(),
+ retry.value.clone(),
+ retry.ttl,
+ retry.is_unique,
+ peer.clone(),
+ );
+ sent = true;
+ break; // one alternative peer is enough per retry
+ }
+
+ if !sent {
+ log::debug!(
+ "Store retry: no alternative peer for key ({} bytes)",
+ retry.key.len()
+ );
+ }
+ }
+ }
+
+ /// Verify Ed25519 signature on an incoming packet.
+ ///
+ /// Since NodeId = Ed25519 public key, the src field
+ /// in the header IS the public key. The signature
+ /// proves the sender holds the private key for that
+ /// NodeId.
+ ///
+    /// Additionally, if we already know this peer, a source
+    /// address that differs from the stored one is logged but
+    /// not rejected, since peers may rebind behind NAT.
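+    ///
+    /// Intended use in the receive path, as a sketch; `dispatch`
+    /// here is a hypothetical handler for the verified body:
+    ///
+    /// ```rust,ignore
+    /// if let Some(body) = self.verify_incoming(&recv_buf[..n], from) {
+    ///     // `body` is the packet minus its trailing signature
+    ///     self.dispatch(body, from);
+    /// }
+    /// ```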
+ pub(crate) fn verify_incoming<'a>(
+ &self,
+ buf: &'a [u8],
+ from: std::net::SocketAddr,
+ ) -> Option<&'a [u8]> {
+ if buf.len() < HEADER_SIZE + crate::crypto::SIGNATURE_SIZE {
+ log::trace!("Rejecting unsigned packet ({} bytes)", buf.len());
+ return None;
+ }
+
+ // NodeId IS the public key — verify signature
+ let src = NodeId::read_from(&buf[8..8 + crate::id::ID_LEN]);
+
+ // Reject zero NodeId
+ if src.is_zero() {
+ log::trace!("Rejecting packet from zero NodeId");
+ return None;
+ }
+
+ let pubkey = src.as_bytes();
+
+ if !crate::wire::verify_packet(buf, pubkey) {
+ log::trace!("Signature verification failed");
+ self.metrics
+ .packets_rejected
+ .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+ return None;
+ }
+
+ // If we know this peer, verify source address
+ // (prevents IP spoofing with valid signatures)
+ if let Some(known) = self.peers.get(&src) {
+ if known.addr != from {
+ log::debug!(
+ "Peer {:?} address mismatch: known={} got={}",
+ src,
+ known.addr,
+ from
+ );
+ // Allow — peer may have changed IP (NAT rebind)
+ // but log for monitoring
+ }
+ }
+
+ let payload_end = buf.len() - crate::crypto::SIGNATURE_SIZE;
+ Some(&buf[..payload_end])
+ }
+}