From 9821aabf0b50d2487b07502d3d2cd89e7d62bdbe Mon Sep 17 00:00:00 2001 From: murilo ijanc Date: Tue, 24 Mar 2026 15:04:03 -0300 Subject: Initial commit NAT-aware Kademlia DHT library for peer-to-peer networks. Features: - Distributed key-value storage (iterative FIND_NODE, FIND_VALUE, STORE) - NAT traversal via DTUN hole-punching and proxy relay - Reliable Datagram Protocol (RDP) with 7-state connection machine - Datagram transport with automatic fragmentation/reassembly - Ed25519 packet authentication - 256-bit node IDs (Ed25519 public keys) - Rate limiting, ban list, and eclipse attack mitigation - Persistence and metrics - OpenBSD and Linux support --- src/handlers.rs | 1049 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1049 insertions(+) create mode 100644 src/handlers.rs (limited to 'src/handlers.rs') diff --git a/src/handlers.rs b/src/handlers.rs new file mode 100644 index 0000000..f4c5b2c --- /dev/null +++ b/src/handlers.rs @@ -0,0 +1,1049 @@ +//! Packet dispatch and message handlers. +//! +//! Extension impl block for Node. All handle_* +//! methods process incoming protocol messages. + +use std::net::SocketAddr; +use std::time::Instant; + +use crate::msg; +use crate::nat::NatState; +use crate::node::Node; +use crate::peers::PeerInfo; +use crate::wire::{DOMAIN_INET, DOMAIN_INET6, HEADER_SIZE, MsgHeader, MsgType}; + +impl Node { + // ── Packet dispatch ───────────────────────────── + + pub(crate) fn handle_packet(&mut self, raw: &[u8], from: SocketAddr) { + self.metrics + .bytes_received + .fetch_add(raw.len() as u64, std::sync::atomic::Ordering::Relaxed); + + // Verify signature and strip it + let buf = match self.verify_incoming(raw, from) { + Some(b) => b, + None => { + self.metrics + .packets_rejected + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + log::trace!("Dropped unsigned packet from {from}"); + return; + } + }; + + self.metrics + .messages_received + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + + // Reject banned peers + if self.ban_list.is_banned(&from) { + log::trace!("Dropped packet from banned peer {from}"); + return; + } + + // Rate limit per source IP + if !self.rate_limiter.allow(from.ip()) { + log::trace!("Rate limited packet from {from}"); + return; + } + + if buf.len() < HEADER_SIZE { + return; + } + + let hdr = match MsgHeader::parse(buf) { + Ok(h) => h, + Err(e) => { + log::trace!("Dropped packet from {from}: {e}"); + return; + } + }; + + log::trace!( + "Received {:?} from {from} (src={:?})", + hdr.msg_type, + hdr.src + ); + + // Register sender as peer and record successful + // communication (clears ban list failures) + self.peers.add(PeerInfo::new(hdr.src, from)); + self.ban_list.record_success(&from); + + match hdr.msg_type { + // ── DHT messages ──────────────────────── + MsgType::DhtPing => { + self.handle_dht_ping(buf, &hdr, from); + } + MsgType::DhtPingReply => { + self.handle_dht_ping_reply(buf, &hdr, from); + } + MsgType::DhtFindNode => { + self.handle_dht_find_node(buf, &hdr, from); + } + MsgType::DhtFindNodeReply => { + self.handle_dht_find_node_reply(buf, &hdr, from); + } + MsgType::DhtStore => { + self.handle_dht_store(buf, &hdr, from); + } + MsgType::DhtFindValue => { + self.handle_dht_find_value(buf, &hdr, from); + } + MsgType::DhtFindValueReply => { + self.handle_dht_find_value_reply(buf, &hdr); + } + + // ── NAT detection ─────────────────────── + MsgType::NatEcho => { + // Respond with our observed address of the sender + if let Ok(nonce) = msg::parse_nat_echo(buf) { + log::debug!("NatEcho from {:?} nonce={nonce}", hdr.src); + let reply = msg::NatEchoReply { + nonce, + domain: if from.is_ipv4() { + DOMAIN_INET + } else { + DOMAIN_INET6 + }, + port: from.port(), + addr: { + let mut a = [0u8; 16]; + match from.ip() { + std::net::IpAddr::V4(v4) => { + a[..4].copy_from_slice(&v4.octets()) + } + std::net::IpAddr::V6(v6) => { + a.copy_from_slice(&v6.octets()) + } + } + a + }, + }; + let size = HEADER_SIZE + msg::NAT_ECHO_REPLY_BODY; + let mut rbuf = vec![0u8; size]; + let rhdr = MsgHeader::new( + MsgType::NatEchoReply, + Self::len16(size), + self.id, + hdr.src, + ); + if rhdr.write(&mut rbuf).is_ok() { + msg::write_nat_echo_reply(&mut rbuf, &reply); + let _ = self.send_signed(&rbuf, from); + } + } + } + MsgType::NatEchoReply => { + if let Ok(reply) = msg::parse_nat_echo_reply(buf) { + let observed_port = reply.port; + let observed_ip: std::net::IpAddr = + if reply.domain == DOMAIN_INET { + std::net::IpAddr::V4(std::net::Ipv4Addr::new( + reply.addr[0], + reply.addr[1], + reply.addr[2], + reply.addr[3], + )) + } else { + std::net::IpAddr::V6(std::net::Ipv6Addr::from( + reply.addr, + )) + }; + let observed = SocketAddr::new(observed_ip, observed_port); + let local = self.net.local_addr().unwrap_or(from); + let action = + self.nat.recv_echo_reply(reply.nonce, observed, local); + log::info!( + "NAT echo reply: observed={observed} action={action:?}" + ); + + // After NAT detection completes, register with DTUN + if let crate::nat::EchoReplyAction::DetectionComplete( + state, + ) = action + { + log::info!("NAT type detected: {state:?}"); + if state != NatState::Global && self.is_dtun { + self.dtun_register(); + } + if state == NatState::SymmetricNat { + // Find a global node to use as proxy + let closest = self.dht_table.closest(&self.id, 1); + if let Some(server) = closest.first() { + self.proxy.set_server(server.clone()); + let nonce = self.alloc_nonce(); + if let Some(n) = + self.proxy.start_register(nonce) + { + // Send ProxyRegister msg + let size = HEADER_SIZE + 8; + let mut buf = vec![0u8; size]; + let hdr = MsgHeader::new( + MsgType::ProxyRegister, + Self::len16(size), + self.id, + server.id, + ); + if hdr.write(&mut buf).is_ok() { + let session = self.dtun.session(); + buf[HEADER_SIZE..HEADER_SIZE + 4] + .copy_from_slice( + &session.to_be_bytes(), + ); + buf[HEADER_SIZE + 4..HEADER_SIZE + 8] + .copy_from_slice(&n.to_be_bytes()); + if let Err(e) = + self.send_signed(&buf, server.addr) + { + log::warn!( + "ProxyRegister send failed: {e}" + ); + } + } + log::info!( + "Sent ProxyRegister to {:?}", + server.id + ); + } + } else { + log::warn!( + "No peers available as proxy server" + ); + } + } + } + } + } + MsgType::NatEchoRedirect | MsgType::NatEchoRedirectReply => { + log::debug!("NAT redirect: {:?}", hdr.msg_type); + } + + // ── DTUN ──────────────────────────────── + MsgType::DtunPing => { + // Reuse DHT ping handler logic + self.handle_dht_ping(buf, &hdr, from); + self.dtun.table_mut().add(PeerInfo::new(hdr.src, from)); + } + MsgType::DtunPingReply => { + self.handle_dht_ping_reply(buf, &hdr, from); + self.dtun.table_mut().add(PeerInfo::new(hdr.src, from)); + } + MsgType::DtunFindNode => { + // Respond with closest from DTUN table + if let Ok(find) = msg::parse_find_node(buf) { + self.dtun.table_mut().add(PeerInfo::new(hdr.src, from)); + let closest = self + .dtun + .table() + .closest(&find.target, self.config.num_find_node); + let domain = if from.is_ipv4() { + DOMAIN_INET + } else { + DOMAIN_INET6 + }; + let reply_msg = msg::FindNodeReplyMsg { + nonce: find.nonce, + id: find.target, + domain, + nodes: closest, + }; + let mut rbuf = [0u8; 2048]; + let rhdr = MsgHeader::new( + MsgType::DtunFindNodeReply, + 0, + self.id, + hdr.src, + ); + if rhdr.write(&mut rbuf).is_ok() { + let total = + msg::write_find_node_reply(&mut rbuf, &reply_msg); + rbuf[4..6] + .copy_from_slice(&Self::len16(total).to_be_bytes()); + let _ = self.send_signed(&rbuf[..total], from); + } + } + } + MsgType::DtunFindNodeReply => { + if let Ok(reply) = msg::parse_find_node_reply(buf) { + log::debug!( + "DTUN find_node_reply: {} nodes", + reply.nodes.len() + ); + for node in &reply.nodes { + self.dtun.table_mut().add(node.clone()); + } + } + } + MsgType::DtunFindValue | MsgType::DtunFindValueReply => { + log::debug!("DTUN find_value: {:?}", hdr.msg_type); + } + MsgType::DtunRegister => { + if let Ok(session) = msg::parse_dtun_register(buf) { + log::debug!( + "DTUN register from {:?} session={session}", + hdr.src + ); + self.dtun.register_node(hdr.src, from, session); + } + } + MsgType::DtunRequest => { + if let Ok((nonce, target)) = msg::parse_dtun_request(buf) { + log::debug!("DTUN request for {:?} nonce={nonce}", target); + + // Check if we have this node registered + if let Some(reg) = self.dtun.get_registered(&target) { + log::debug!( + "DTUN: forwarding request to {:?}", + reg.addr + ); + } + } + } + MsgType::DtunRequestBy | MsgType::DtunRequestReply => { + log::debug!("DTUN request_by/reply: {:?}", hdr.msg_type); + } + + // ── Proxy ─────────────────────────────── + MsgType::ProxyRegister => { + if buf.len() >= HEADER_SIZE + 8 { + let session = u32::from_be_bytes([ + buf[HEADER_SIZE], + buf[HEADER_SIZE + 1], + buf[HEADER_SIZE + 2], + buf[HEADER_SIZE + 3], + ]); + let nonce = u32::from_be_bytes([ + buf[HEADER_SIZE + 4], + buf[HEADER_SIZE + 5], + buf[HEADER_SIZE + 6], + buf[HEADER_SIZE + 7], + ]); + log::debug!( + "Proxy register from {:?} session={session}", + hdr.src + ); + self.proxy.register_client(hdr.src, from, session); + + // Send reply + let size = HEADER_SIZE + 4; + let mut rbuf = vec![0u8; size]; + let rhdr = MsgHeader::new( + MsgType::ProxyRegisterReply, + Self::len16(size), + self.id, + hdr.src, + ); + if rhdr.write(&mut rbuf).is_ok() { + rbuf[HEADER_SIZE..HEADER_SIZE + 4] + .copy_from_slice(&nonce.to_be_bytes()); + let _ = self.send_signed(&rbuf, from); + } + } + } + MsgType::ProxyRegisterReply => { + if let Ok(nonce) = msg::parse_ping(buf) { + self.proxy.recv_register_reply(nonce); + } + } + MsgType::ProxyStore => { + // Forward store to DHT on behalf of the client + self.handle_dht_store(buf, &hdr, from); + } + MsgType::ProxyGet | MsgType::ProxyGetReply => { + log::debug!("Proxy get/reply: {:?}", hdr.msg_type); + } + MsgType::ProxyDgram | MsgType::ProxyDgramForwarded => { + // Forward dgram payload + self.handle_dgram(buf, &hdr); + } + MsgType::ProxyRdp | MsgType::ProxyRdpForwarded => { + // Forward RDP payload + self.handle_rdp(buf, &hdr); + } + + // ── Transport ─────────────────────────── + MsgType::Dgram => { + self.handle_dgram(buf, &hdr); + } + MsgType::Rdp => { + self.handle_rdp(buf, &hdr); + } + + // ── Advertise ─────────────────────────── + MsgType::Advertise => { + if buf.len() >= HEADER_SIZE + 8 { + let nonce = u32::from_be_bytes([ + buf[HEADER_SIZE], + buf[HEADER_SIZE + 1], + buf[HEADER_SIZE + 2], + buf[HEADER_SIZE + 3], + ]); + log::debug!("Advertise from {:?} nonce={nonce}", hdr.src); + self.advertise.recv_advertise(hdr.src); + + // Send reply + let size = HEADER_SIZE + 8; + let mut rbuf = vec![0u8; size]; + let rhdr = MsgHeader::new( + MsgType::AdvertiseReply, + Self::len16(size), + self.id, + hdr.src, + ); + if rhdr.write(&mut rbuf).is_ok() { + rbuf[HEADER_SIZE..HEADER_SIZE + 8].copy_from_slice( + &buf[HEADER_SIZE..HEADER_SIZE + 8], + ); + let _ = self.send_signed(&rbuf, from); + } + } + } + MsgType::AdvertiseReply => { + if buf.len() >= HEADER_SIZE + 8 { + let nonce = u32::from_be_bytes([ + buf[HEADER_SIZE], + buf[HEADER_SIZE + 1], + buf[HEADER_SIZE + 2], + buf[HEADER_SIZE + 3], + ]); + self.advertise.recv_reply(nonce); + } + } + } + } + + // ── DHT message handlers ──────────────────────── + + pub(crate) fn handle_dht_ping( + &mut self, + buf: &[u8], + hdr: &MsgHeader, + from: SocketAddr, + ) { + let nonce = match msg::parse_ping(buf) { + Ok(n) => n, + Err(_) => return, + }; + + if hdr.dst != self.id { + return; + } + + log::debug!("DHT ping from {:?} nonce={nonce}", hdr.src); + + // Add to routing table + self.dht_table.add(PeerInfo::new(hdr.src, from)); + + // Send reply + let reply_hdr = MsgHeader::new( + MsgType::DhtPingReply, + Self::len16(msg::PING_MSG_SIZE), + self.id, + hdr.src, + ); + let mut reply = [0u8; msg::PING_MSG_SIZE]; + if reply_hdr.write(&mut reply).is_ok() { + msg::write_ping(&mut reply, nonce); + let _ = self.send_signed(&reply, from); + } + } + + pub(crate) fn handle_dht_ping_reply( + &mut self, + buf: &[u8], + hdr: &MsgHeader, + from: SocketAddr, + ) { + let nonce = match msg::parse_ping(buf) { + Ok(n) => n, + Err(_) => return, + }; + + if hdr.dst != self.id { + return; + } + + // Verify we actually sent this nonce + match self.pending_pings.remove(&nonce) { + Some((expected_id, _sent_at)) => { + if expected_id != hdr.src && !expected_id.is_zero() { + log::debug!( + "Ping reply nonce={nonce}: expected {:?} got {:?}", + expected_id, + hdr.src + ); + return; + } + } + None => { + log::trace!("Ignoring unsolicited ping reply nonce={nonce}"); + return; + } + } + + log::debug!("DHT ping reply from {:?} nonce={nonce}", hdr.src); + + self.dht_table.add(PeerInfo::new(hdr.src, from)); + self.dht_table.mark_seen(&hdr.src); + } + + pub(crate) fn handle_dht_find_node( + &mut self, + buf: &[u8], + hdr: &MsgHeader, + from: SocketAddr, + ) { + let find = match msg::parse_find_node(buf) { + Ok(f) => f, + Err(_) => return, + }; + + log::debug!( + "DHT find_node from {:?} target={:?}", + hdr.src, + find.target + ); + + self.dht_table.add(PeerInfo::new(hdr.src, from)); + + // Respond with our closest nodes + let closest = self + .dht_table + .closest(&find.target, self.config.num_find_node); + + let domain = if from.is_ipv4() { + DOMAIN_INET + } else { + DOMAIN_INET6 + }; + + let reply_msg = msg::FindNodeReplyMsg { + nonce: find.nonce, + id: find.target, + domain, + nodes: closest, + }; + + let mut reply_buf = [0u8; 2048]; + let reply_hdr = + MsgHeader::new(MsgType::DhtFindNodeReply, 0, self.id, hdr.src); + if reply_hdr.write(&mut reply_buf).is_ok() { + let total = msg::write_find_node_reply(&mut reply_buf, &reply_msg); + + // Fix the length in header + let len_bytes = Self::len16(total).to_be_bytes(); + reply_buf[4] = len_bytes[0]; + reply_buf[5] = len_bytes[1]; + let _ = self.send_signed(&reply_buf[..total], from); + } + } + + pub(crate) fn handle_dht_find_node_reply( + &mut self, + buf: &[u8], + hdr: &MsgHeader, + from: SocketAddr, + ) { + let reply = match msg::parse_find_node_reply(buf) { + Ok(r) => r, + Err(_) => return, + }; + + log::debug!( + "DHT find_node_reply from {:?}: {} nodes", + hdr.src, + reply.nodes.len() + ); + + // Add sender to routing table + self.dht_table.add(PeerInfo::new(hdr.src, from)); + + // Add returned nodes, filtering invalid addresses + for node in &reply.nodes { + // S1-6: reject unroutable addresses + let ip = node.addr.ip(); + if ip.is_unspecified() || ip.is_multicast() { + continue; + } + // Reject zero NodeId + if node.id.is_zero() { + continue; + } + let result = self.dht_table.add(node.clone()); + self.peers.add(node.clone()); + + // §2.5: replicate data to newly discovered nodes + if matches!(result, crate::routing::InsertResult::Inserted) { + self.proactive_replicate(node); + } + } + + // Feed active queries with the reply + let sender_id = hdr.src; + let nodes_clone = reply.nodes.clone(); + + // Find which query this reply belongs to (match + // by nonce or by pending sender). + let matching_nonce: Option = self + .queries + .iter() + .find(|(_, q)| q.pending.contains_key(&sender_id)) + .map(|(n, _)| *n); + + if let Some(nonce) = matching_nonce { + if let Some(q) = self.queries.get_mut(&nonce) { + q.process_reply(&sender_id, nodes_clone); + + // Send follow-up find_nodes to newly + // discovered nodes + let next = q.next_to_query(); + let target = q.target; + for peer in next { + if self.send_find_node(peer.addr, target).is_ok() { + if let Some(q) = self.queries.get_mut(&nonce) { + q.pending.insert(peer.id, Instant::now()); + } + } + } + } + } + } + + pub(crate) fn handle_dht_store( + &mut self, + buf: &[u8], + hdr: &MsgHeader, + from: SocketAddr, + ) { + let store = match msg::parse_store(buf) { + Ok(s) => s, + Err(_) => return, + }; + + if hdr.dst != self.id { + return; + } + + // S1-4: enforce max value size + if store.value.len() > self.config.max_value_size { + log::debug!( + "Rejecting oversized store: {} bytes > {} max", + store.value.len(), + self.config.max_value_size + ); + return; + } + + // S2-9: verify sender matches claimed originator + if hdr.src != store.from { + log::debug!( + "Store origin mismatch: sender={:?} from={:?}", + hdr.src, + store.from + ); + return; + } + + log::debug!( + "DHT store from {:?}: key={} bytes, value={} bytes, ttl={}", + hdr.src, + store.key.len(), + store.value.len(), + store.ttl + ); + + self.dht_table.add(PeerInfo::new(hdr.src, from)); + + if store.ttl == 0 { + self.storage.remove(&store.id, &store.key); + } else { + let val = crate::dht::StoredValue { + key: store.key, + value: store.value, + id: store.id, + source: store.from, + ttl: store.ttl, + stored_at: std::time::Instant::now(), + is_unique: store.is_unique, + original: 0, // received, not originated + recvd: { + let mut s = std::collections::HashSet::new(); + s.insert(hdr.src); + s + }, + version: crate::dht::now_version(), + }; + self.storage.store(val); + } + } + + pub(crate) fn handle_dht_find_value( + &mut self, + buf: &[u8], + hdr: &MsgHeader, + from: SocketAddr, + ) { + let fv = match msg::parse_find_value(buf) { + Ok(f) => f, + Err(_) => return, + }; + + log::debug!( + "DHT find_value from {:?}: key={} bytes", + hdr.src, + fv.key.len() + ); + + self.dht_table.add(PeerInfo::new(hdr.src, from)); + + // Check local storage + let values = self.storage.get(&fv.target, &fv.key); + + if !values.is_empty() { + log::debug!("Found {} value(s) locally", values.len()); + + // Send value reply using DHT_FIND_VALUE_REPLY + // with DATA_ARE_VALUES flag + let val = &values[0]; + let fixed = msg::FIND_VALUE_REPLY_FIXED; + let total = HEADER_SIZE + fixed + val.value.len(); + let mut rbuf = vec![0u8; total]; + let rhdr = MsgHeader::new( + MsgType::DhtFindValueReply, + Self::len16(total), + self.id, + hdr.src, + ); + if rhdr.write(&mut rbuf).is_ok() { + let off = HEADER_SIZE; + rbuf[off..off + 4].copy_from_slice(&fv.nonce.to_be_bytes()); + fv.target + .write_to(&mut rbuf[off + 4..off + 4 + crate::id::ID_LEN]); + rbuf[off + 24..off + 26].copy_from_slice(&0u16.to_be_bytes()); // index + rbuf[off + 26..off + 28].copy_from_slice(&1u16.to_be_bytes()); // total + rbuf[off + 28] = crate::wire::DATA_ARE_VALUES; + rbuf[off + 29] = 0; + rbuf[off + 30] = 0; + rbuf[off + 31] = 0; + rbuf[HEADER_SIZE + fixed..].copy_from_slice(&val.value); + let _ = self.send_signed(&rbuf, from); + } + } else { + // Send closest nodes as find_node_reply format + let closest = self + .dht_table + .closest(&fv.target, self.config.num_find_node); + log::debug!("Value not found, returning {} nodes", closest.len()); + let domain = if from.is_ipv4() { + DOMAIN_INET + } else { + DOMAIN_INET6 + }; + let reply_msg = msg::FindNodeReplyMsg { + nonce: fv.nonce, + id: fv.target, + domain, + nodes: closest, + }; + let mut rbuf = [0u8; 2048]; + let rhdr = + MsgHeader::new(MsgType::DhtFindValueReply, 0, self.id, hdr.src); + if rhdr.write(&mut rbuf).is_ok() { + // Write header with DATA_ARE_NODES flag + let off = HEADER_SIZE; + rbuf[off..off + 4].copy_from_slice(&fv.nonce.to_be_bytes()); + fv.target + .write_to(&mut rbuf[off + 4..off + 4 + crate::id::ID_LEN]); + rbuf[off + 24..off + 26].fill(0); // index + rbuf[off + 26..off + 28].fill(0); // total + rbuf[off + 28] = crate::wire::DATA_ARE_NODES; + rbuf[off + 29] = 0; + rbuf[off + 30] = 0; + rbuf[off + 31] = 0; + + // Write nodes after the fixed part + let nodes_off = HEADER_SIZE + msg::FIND_VALUE_REPLY_FIXED; + + // Write domain + num + padding before nodes + rbuf[nodes_off..nodes_off + 2] + .copy_from_slice(&domain.to_be_bytes()); + rbuf[nodes_off + 2] = reply_msg.nodes.len() as u8; + rbuf[nodes_off + 3] = 0; + let nw = if domain == DOMAIN_INET { + msg::write_nodes_inet( + &mut rbuf[nodes_off + 4..], + &reply_msg.nodes, + ) + } else { + msg::write_nodes_inet6( + &mut rbuf[nodes_off + 4..], + &reply_msg.nodes, + ) + }; + let total = nodes_off + 4 + nw; + rbuf[4..6].copy_from_slice(&Self::len16(total).to_be_bytes()); + let _ = self.send_signed(&rbuf[..total], from); + } + } + } + + pub(crate) fn handle_dht_find_value_reply( + &mut self, + buf: &[u8], + hdr: &MsgHeader, + ) { + let reply = match msg::parse_find_value_reply(buf) { + Ok(r) => r, + Err(e) => { + log::debug!("Failed to parse find_value_reply: {e}"); + return; + } + }; + + let sender_id = hdr.src; + + // Find the matching active query + let matching_nonce: Option = self + .queries + .iter() + .filter(|(_, q)| q.is_find_value) + .find(|(_, q)| q.pending.contains_key(&sender_id)) + .map(|(n, _)| *n); + + match reply.data { + msg::FindValueReplyData::Value { data, .. } => { + log::info!( + "Received value from {:?}: {} bytes", + sender_id, + data.len() + ); + + // Cache the value locally and republish + // to the closest queried node without it + // (§2.3: cache on nearest without value) + if let Some(nonce) = matching_nonce { + if let Some(q) = self.queries.get_mut(&nonce) { + // Store in local storage for + // subsequent get() calls + let val = crate::dht::StoredValue { + key: q.key.clone(), + value: data.clone(), + id: q.target, + source: sender_id, + ttl: 300, + stored_at: Instant::now(), + is_unique: false, + original: 0, + recvd: std::collections::HashSet::new(), + version: crate::dht::now_version(), + }; + self.storage.store(val); + + // §2.3: store on nearest queried + // node that didn't have the value + let target = q.target; + let key = q.key.clone(); + let nearest_without: Option = q + .closest + .iter() + .find(|p| { + q.queried.contains(&p.id) + && p.id != sender_id + }) + .cloned(); + + q.process_value(data.clone()); + + if let Some(peer) = nearest_without { + let store_msg = crate::msg::StoreMsg { + id: target, + from: self.id, + key, + value: data, + ttl: 300, + is_unique: false, + }; + if let Err(e) = self.send_store(&peer, &store_msg) { + log::debug!( + "Republish-on-access failed: {e}" + ); + } else { + log::debug!( + "Republished value to {:?} (nearest without)", + peer.id, + ); + } + } + } + } + } + msg::FindValueReplyData::Nodes { nodes, .. } => { + log::debug!( + "find_value_reply from {:?}: {} nodes (no value)", + sender_id, + nodes.len() + ); + + // Add nodes to routing table + for node in &nodes { + self.dht_table.add(node.clone()); + self.peers.add(node.clone()); + } + + // Feed the query with the nodes so it + // continues iterating + if let Some(nonce) = matching_nonce { + if let Some(q) = self.queries.get_mut(&nonce) { + q.process_reply(&sender_id, nodes.clone()); + + // Send follow-up find_value to + // newly discovered nodes + let next = q.next_to_query(); + let target = q.target; + let key = q.key.clone(); + for peer in next { + if self + .send_find_value_msg(peer.addr, target, &key) + .is_ok() + { + if let Some(q) = self.queries.get_mut(&nonce) { + q.pending.insert(peer.id, Instant::now()); + } + } + } + } + } + } + msg::FindValueReplyData::Nul => { + log::debug!("find_value_reply NUL from {:?}", sender_id); + if let Some(nonce) = matching_nonce { + if let Some(q) = self.queries.get_mut(&nonce) { + q.pending.remove(&sender_id); + q.queried.insert(sender_id); + } + } + } + } + } + + // ── Data restore / republish ──────────────────── + + /// Restore (republish) stored data to k-closest + /// nodes. Tracks the `original` counter and `recvd` + /// set to avoid unnecessary duplicates. + pub(crate) fn restore_data(&mut self) { + let values = self.storage.all_values(); + if values.is_empty() { + return; + } + + log::debug!("Restoring {} stored values", values.len()); + + for val in &values { + if val.is_expired() { + continue; + } + let closest = + self.dht_table.closest(&val.id, self.config.num_find_node); + let store_msg = msg::StoreMsg { + id: val.id, + from: val.source, + key: val.key.clone(), + value: val.value.clone(), + ttl: val.remaining_ttl(), + is_unique: val.is_unique, + }; + for peer in &closest { + if peer.id == self.id { + continue; + } + + // Skip peers that already have this value + if val.recvd.contains(&peer.id) { + continue; + } + if let Err(e) = self.send_store(peer, &store_msg) { + log::debug!("Restore send failed: {e}"); + continue; + } + self.storage.mark_received(val, peer.id); + } + } + } + + /// Run mask_bit exploration: send find_node queries + /// to probe distant regions of the ID space. + pub(crate) fn run_maintain(&mut self) { + let (t1, t2) = self.explorer.next_targets(); + log::debug!("Maintain: exploring targets {:?} {:?}", t1, t2); + let _ = self.start_find_node(t1); + let _ = self.start_find_node(t2); + } + + // ── Proactive replication (§2.5) ────────────── + + /// Replicate stored data to a newly discovered node + /// if it is closer to a key than the current furthest + /// holder. This ensures data availability without + /// waiting for the periodic restore cycle. + /// + /// Called when a new node is inserted into the routing + /// table (not on updates of existing nodes). + pub(crate) fn proactive_replicate(&mut self, new_node: &PeerInfo) { + let values = self.storage.all_values(); + if values.is_empty() { + return; + } + + let k = self.config.num_find_node; + let mut sent = 0u32; + + for val in &values { + if val.is_expired() { + continue; + } + // Skip if this node already received the value + if val.recvd.contains(&new_node.id) { + continue; + } + + // Check: is new_node closer to this key than + // the furthest current k-closest holder? + let closest = self.dht_table.closest(&val.id, k); + let furthest = match closest.last() { + Some(p) => p, + None => continue, + }; + + let dist_new = val.id.distance(&new_node.id); + let dist_far = val.id.distance(&furthest.id); + + if dist_new >= dist_far { + continue; // new node is not closer + } + + let store_msg = crate::msg::StoreMsg { + id: val.id, + from: val.source, + key: val.key.clone(), + value: val.value.clone(), + ttl: val.remaining_ttl(), + is_unique: val.is_unique, + }; + + if self.send_store(new_node, &store_msg).is_ok() { + self.storage.mark_received(val, new_node.id); + sent += 1; + } + } + + if sent > 0 { + log::debug!( + "Proactive replicate: sent {sent} values to {:?}", + new_node.id, + ); + } + } +} -- cgit v1.2.3