aboutsummaryrefslogtreecommitdiffstats
path: root/src/handlers.rs
diff options
context:
space:
mode:
author    murilo ijanc    2026-03-24 15:04:03 -0300
committer murilo ijanc    2026-03-24 15:04:03 -0300
commit    9821aabf0b50d2487b07502d3d2cd89e7d62bdbe (patch)
tree      53da095ff90cc755bac3d4bf699172b5e8cd07d6 /src/handlers.rs
download  tesseras-dht-e908bc01403f4b8ef2a65fa6be43716fd1c6e003.tar.gz
Initial commitv0.1.0
NAT-aware Kademlia DHT library for peer-to-peer networks. Features: - Distributed key-value storage (iterative FIND_NODE, FIND_VALUE, STORE) - NAT traversal via DTUN hole-punching and proxy relay - Reliable Datagram Protocol (RDP) with 7-state connection machine - Datagram transport with automatic fragmentation/reassembly - Ed25519 packet authentication - 256-bit node IDs (Ed25519 public keys) - Rate limiting, ban list, and eclipse attack mitigation - Persistence and metrics - OpenBSD and Linux support
Diffstat (limited to 'src/handlers.rs')
-rw-r--r--  src/handlers.rs  1049
1 files changed, 1049 insertions, 0 deletions
diff --git a/src/handlers.rs b/src/handlers.rs
new file mode 100644
index 0000000..f4c5b2c
--- /dev/null
+++ b/src/handlers.rs
@@ -0,0 +1,1049 @@
+//! Packet dispatch and message handlers.
+//!
+//! Extension impl block for Node. All handle_*
+//! methods process incoming protocol messages.
+
+use std::net::SocketAddr;
+use std::time::Instant;
+
+use crate::msg;
+use crate::nat::NatState;
+use crate::node::Node;
+use crate::peers::PeerInfo;
+use crate::wire::{DOMAIN_INET, DOMAIN_INET6, HEADER_SIZE, MsgHeader, MsgType};
+
+impl Node {
+ // ── Packet dispatch ─────────────────────────────
+
+    /// Entry point for every received datagram: counts bytes,
+    /// verifies/strips the packet signature, applies ban-list and
+    /// per-IP rate-limit policy, parses the wire header, registers
+    /// the sender as a peer, and dispatches on `MsgType`.
+    pub(crate) fn handle_packet(&mut self, raw: &[u8], from: SocketAddr) {
+        self.metrics
+            .bytes_received
+            .fetch_add(raw.len() as u64, std::sync::atomic::Ordering::Relaxed);
+
+        // Verify signature and strip it
+        let buf = match self.verify_incoming(raw, from) {
+            Some(b) => b,
+            None => {
+                self.metrics
+                    .packets_rejected
+                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+                log::trace!("Dropped unsigned packet from {from}");
+                return;
+            }
+        };
+
+        self.metrics
+            .messages_received
+            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+
+        // NOTE(review): the ban-list and rate-limit checks below run
+        // AFTER the signature verification above, so banned/flooding
+        // sources still cost one Ed25519 verify per packet — consider
+        // reordering these checks ahead of verify_incoming().
+
+        // Reject banned peers
+        if self.ban_list.is_banned(&from) {
+            log::trace!("Dropped packet from banned peer {from}");
+            return;
+        }
+
+        // Rate limit per source IP
+        if !self.rate_limiter.allow(from.ip()) {
+            log::trace!("Rate limited packet from {from}");
+            return;
+        }
+
+        // Too short to even contain a header: drop silently.
+        if buf.len() < HEADER_SIZE {
+            return;
+        }
+
+        let hdr = match MsgHeader::parse(buf) {
+            Ok(h) => h,
+            Err(e) => {
+                log::trace!("Dropped packet from {from}: {e}");
+                return;
+            }
+        };
+
+        log::trace!(
+            "Received {:?} from {from} (src={:?})",
+            hdr.msg_type,
+            hdr.src
+        );
+
+        // Register sender as peer and record successful
+        // communication (clears ban list failures)
+        self.peers.add(PeerInfo::new(hdr.src, from));
+        self.ban_list.record_success(&from);
+
+        match hdr.msg_type {
+            // ── DHT messages ────────────────────────
+            MsgType::DhtPing => {
+                self.handle_dht_ping(buf, &hdr, from);
+            }
+            MsgType::DhtPingReply => {
+                self.handle_dht_ping_reply(buf, &hdr, from);
+            }
+            MsgType::DhtFindNode => {
+                self.handle_dht_find_node(buf, &hdr, from);
+            }
+            MsgType::DhtFindNodeReply => {
+                self.handle_dht_find_node_reply(buf, &hdr, from);
+            }
+            MsgType::DhtStore => {
+                self.handle_dht_store(buf, &hdr, from);
+            }
+            MsgType::DhtFindValue => {
+                self.handle_dht_find_value(buf, &hdr, from);
+            }
+            MsgType::DhtFindValueReply => {
+                self.handle_dht_find_value_reply(buf, &hdr);
+            }
+
+            // ── NAT detection ───────────────────────
+            MsgType::NatEcho => {
+                // Respond with our observed address of the sender
+                // (this is how the sender learns its external
+                // ip:port through our view of the UDP source).
+                if let Ok(nonce) = msg::parse_nat_echo(buf) {
+                    log::debug!("NatEcho from {:?} nonce={nonce}", hdr.src);
+                    let reply = msg::NatEchoReply {
+                        nonce,
+                        domain: if from.is_ipv4() {
+                            DOMAIN_INET
+                        } else {
+                            DOMAIN_INET6
+                        },
+                        port: from.port(),
+                        addr: {
+                            // 16-byte address field: IPv4 occupies the
+                            // first 4 bytes, IPv6 fills all 16.
+                            let mut a = [0u8; 16];
+                            match from.ip() {
+                                std::net::IpAddr::V4(v4) => {
+                                    a[..4].copy_from_slice(&v4.octets())
+                                }
+                                std::net::IpAddr::V6(v6) => {
+                                    a.copy_from_slice(&v6.octets())
+                                }
+                            }
+                            a
+                        },
+                    };
+                    let size = HEADER_SIZE + msg::NAT_ECHO_REPLY_BODY;
+                    let mut rbuf = vec![0u8; size];
+                    let rhdr = MsgHeader::new(
+                        MsgType::NatEchoReply,
+                        Self::len16(size),
+                        self.id,
+                        hdr.src,
+                    );
+                    if rhdr.write(&mut rbuf).is_ok() {
+                        msg::write_nat_echo_reply(&mut rbuf, &reply);
+                        let _ = self.send_signed(&rbuf, from);
+                    }
+                }
+            }
+            MsgType::NatEchoReply => {
+                if let Ok(reply) = msg::parse_nat_echo_reply(buf) {
+                    let observed_port = reply.port;
+                    let observed_ip: std::net::IpAddr =
+                        if reply.domain == DOMAIN_INET {
+                            std::net::IpAddr::V4(std::net::Ipv4Addr::new(
+                                reply.addr[0],
+                                reply.addr[1],
+                                reply.addr[2],
+                                reply.addr[3],
+                            ))
+                        } else {
+                            std::net::IpAddr::V6(std::net::Ipv6Addr::from(
+                                reply.addr,
+                            ))
+                        };
+                    let observed = SocketAddr::new(observed_ip, observed_port);
+                    let local = self.net.local_addr().unwrap_or(from);
+                    let action =
+                        self.nat.recv_echo_reply(reply.nonce, observed, local);
+                    log::info!(
+                        "NAT echo reply: observed={observed} action={action:?}"
+                    );
+
+                    // After NAT detection completes, register with DTUN
+                    if let crate::nat::EchoReplyAction::DetectionComplete(
+                        state,
+                    ) = action
+                    {
+                        log::info!("NAT type detected: {state:?}");
+                        if state != NatState::Global && self.is_dtun {
+                            self.dtun_register();
+                        }
+                        if state == NatState::SymmetricNat {
+                            // Find a global node to use as proxy
+                            // NOTE(review): this picks the node closest
+                            // to our own id, which is not necessarily a
+                            // *global* (un-NATed) node — confirm the
+                            // selection criterion.
+                            let closest = self.dht_table.closest(&self.id, 1);
+                            if let Some(server) = closest.first() {
+                                self.proxy.set_server(server.clone());
+                                let nonce = self.alloc_nonce();
+                                if let Some(n) =
+                                    self.proxy.start_register(nonce)
+                                {
+                                    // Send ProxyRegister msg
+                                    // Body layout: 4-byte DTUN session id
+                                    // followed by 4-byte nonce, big-endian.
+                                    let size = HEADER_SIZE + 8;
+                                    let mut buf = vec![0u8; size];
+                                    let hdr = MsgHeader::new(
+                                        MsgType::ProxyRegister,
+                                        Self::len16(size),
+                                        self.id,
+                                        server.id,
+                                    );
+                                    if hdr.write(&mut buf).is_ok() {
+                                        let session = self.dtun.session();
+                                        buf[HEADER_SIZE..HEADER_SIZE + 4]
+                                            .copy_from_slice(
+                                                &session.to_be_bytes(),
+                                            );
+                                        buf[HEADER_SIZE + 4..HEADER_SIZE + 8]
+                                            .copy_from_slice(&n.to_be_bytes());
+                                        if let Err(e) =
+                                            self.send_signed(&buf, server.addr)
+                                        {
+                                            log::warn!(
+                                                "ProxyRegister send failed: {e}"
+                                            );
+                                        }
+                                    }
+                                    log::info!(
+                                        "Sent ProxyRegister to {:?}",
+                                        server.id
+                                    );
+                                }
+                            } else {
+                                log::warn!(
+                                    "No peers available as proxy server"
+                                );
+                            }
+                        }
+                    }
+                }
+            }
+            MsgType::NatEchoRedirect | MsgType::NatEchoRedirectReply => {
+                // Not handled yet: only logged.
+                log::debug!("NAT redirect: {:?}", hdr.msg_type);
+            }
+
+            // ── DTUN ────────────────────────────────
+            MsgType::DtunPing => {
+                // Reuse DHT ping handler logic
+                self.handle_dht_ping(buf, &hdr, from);
+                self.dtun.table_mut().add(PeerInfo::new(hdr.src, from));
+            }
+            MsgType::DtunPingReply => {
+                self.handle_dht_ping_reply(buf, &hdr, from);
+                self.dtun.table_mut().add(PeerInfo::new(hdr.src, from));
+            }
+            MsgType::DtunFindNode => {
+                // Respond with closest from DTUN table
+                if let Ok(find) = msg::parse_find_node(buf) {
+                    self.dtun.table_mut().add(PeerInfo::new(hdr.src, from));
+                    let closest = self
+                        .dtun
+                        .table()
+                        .closest(&find.target, self.config.num_find_node);
+                    let domain = if from.is_ipv4() {
+                        DOMAIN_INET
+                    } else {
+                        DOMAIN_INET6
+                    };
+                    let reply_msg = msg::FindNodeReplyMsg {
+                        nonce: find.nonce,
+                        id: find.target,
+                        domain,
+                        nodes: closest,
+                    };
+                    let mut rbuf = [0u8; 2048];
+                    // Header written with length 0, then patched at
+                    // bytes 4..6 once the real total is known.
+                    let rhdr = MsgHeader::new(
+                        MsgType::DtunFindNodeReply,
+                        0,
+                        self.id,
+                        hdr.src,
+                    );
+                    if rhdr.write(&mut rbuf).is_ok() {
+                        let total =
+                            msg::write_find_node_reply(&mut rbuf, &reply_msg);
+                        rbuf[4..6]
+                            .copy_from_slice(&Self::len16(total).to_be_bytes());
+                        let _ = self.send_signed(&rbuf[..total], from);
+                    }
+                }
+            }
+            MsgType::DtunFindNodeReply => {
+                if let Ok(reply) = msg::parse_find_node_reply(buf) {
+                    log::debug!(
+                        "DTUN find_node_reply: {} nodes",
+                        reply.nodes.len()
+                    );
+                    for node in &reply.nodes {
+                        self.dtun.table_mut().add(node.clone());
+                    }
+                }
+            }
+            MsgType::DtunFindValue | MsgType::DtunFindValueReply => {
+                // Not handled yet: only logged.
+                log::debug!("DTUN find_value: {:?}", hdr.msg_type);
+            }
+            MsgType::DtunRegister => {
+                if let Ok(session) = msg::parse_dtun_register(buf) {
+                    log::debug!(
+                        "DTUN register from {:?} session={session}",
+                        hdr.src
+                    );
+                    self.dtun.register_node(hdr.src, from, session);
+                }
+            }
+            MsgType::DtunRequest => {
+                if let Ok((nonce, target)) = msg::parse_dtun_request(buf) {
+                    log::debug!("DTUN request for {:?} nonce={nonce}", target);
+
+                    // Check if we have this node registered
+                    // NOTE(review): this arm only logs — no forward
+                    // message is actually sent to `reg.addr`; confirm
+                    // whether hole-punch forwarding is still TODO.
+                    if let Some(reg) = self.dtun.get_registered(&target) {
+                        log::debug!(
+                            "DTUN: forwarding request to {:?}",
+                            reg.addr
+                        );
+                    }
+                }
+            }
+            MsgType::DtunRequestBy | MsgType::DtunRequestReply => {
+                // Not handled yet: only logged.
+                log::debug!("DTUN request_by/reply: {:?}", hdr.msg_type);
+            }
+
+            // ── Proxy ───────────────────────────────
+            MsgType::ProxyRegister => {
+                // Body layout (8 bytes after header): session id then
+                // nonce, both u32 big-endian — mirrors the sender side
+                // in the NatEchoReply arm above.
+                if buf.len() >= HEADER_SIZE + 8 {
+                    let session = u32::from_be_bytes([
+                        buf[HEADER_SIZE],
+                        buf[HEADER_SIZE + 1],
+                        buf[HEADER_SIZE + 2],
+                        buf[HEADER_SIZE + 3],
+                    ]);
+                    let nonce = u32::from_be_bytes([
+                        buf[HEADER_SIZE + 4],
+                        buf[HEADER_SIZE + 5],
+                        buf[HEADER_SIZE + 6],
+                        buf[HEADER_SIZE + 7],
+                    ]);
+                    log::debug!(
+                        "Proxy register from {:?} session={session}",
+                        hdr.src
+                    );
+                    self.proxy.register_client(hdr.src, from, session);
+
+                    // Send reply
+                    let size = HEADER_SIZE + 4;
+                    let mut rbuf = vec![0u8; size];
+                    let rhdr = MsgHeader::new(
+                        MsgType::ProxyRegisterReply,
+                        Self::len16(size),
+                        self.id,
+                        hdr.src,
+                    );
+                    if rhdr.write(&mut rbuf).is_ok() {
+                        rbuf[HEADER_SIZE..HEADER_SIZE + 4]
+                            .copy_from_slice(&nonce.to_be_bytes());
+                        let _ = self.send_signed(&rbuf, from);
+                    }
+                }
+            }
+            MsgType::ProxyRegisterReply => {
+                // Reply body is just the echoed nonce; the ping parser
+                // has the same shape, so it is reused here.
+                if let Ok(nonce) = msg::parse_ping(buf) {
+                    self.proxy.recv_register_reply(nonce);
+                }
+            }
+            MsgType::ProxyStore => {
+                // Forward store to DHT on behalf of the client
+                self.handle_dht_store(buf, &hdr, from);
+            }
+            MsgType::ProxyGet | MsgType::ProxyGetReply => {
+                // Not handled yet: only logged.
+                log::debug!("Proxy get/reply: {:?}", hdr.msg_type);
+            }
+            MsgType::ProxyDgram | MsgType::ProxyDgramForwarded => {
+                // Forward dgram payload
+                self.handle_dgram(buf, &hdr);
+            }
+            MsgType::ProxyRdp | MsgType::ProxyRdpForwarded => {
+                // Forward RDP payload
+                self.handle_rdp(buf, &hdr);
+            }
+
+            // ── Transport ───────────────────────────
+            MsgType::Dgram => {
+                self.handle_dgram(buf, &hdr);
+            }
+            MsgType::Rdp => {
+                self.handle_rdp(buf, &hdr);
+            }
+
+            // ── Advertise ───────────────────────────
+            MsgType::Advertise => {
+                if buf.len() >= HEADER_SIZE + 8 {
+                    let nonce = u32::from_be_bytes([
+                        buf[HEADER_SIZE],
+                        buf[HEADER_SIZE + 1],
+                        buf[HEADER_SIZE + 2],
+                        buf[HEADER_SIZE + 3],
+                    ]);
+                    log::debug!("Advertise from {:?} nonce={nonce}", hdr.src);
+                    self.advertise.recv_advertise(hdr.src);
+
+                    // Send reply — echoes the full 8-byte body back
+                    // verbatim, not just the parsed nonce.
+                    let size = HEADER_SIZE + 8;
+                    let mut rbuf = vec![0u8; size];
+                    let rhdr = MsgHeader::new(
+                        MsgType::AdvertiseReply,
+                        Self::len16(size),
+                        self.id,
+                        hdr.src,
+                    );
+                    if rhdr.write(&mut rbuf).is_ok() {
+                        rbuf[HEADER_SIZE..HEADER_SIZE + 8].copy_from_slice(
+                            &buf[HEADER_SIZE..HEADER_SIZE + 8],
+                        );
+                        let _ = self.send_signed(&rbuf, from);
+                    }
+                }
+            }
+            MsgType::AdvertiseReply => {
+                if buf.len() >= HEADER_SIZE + 8 {
+                    let nonce = u32::from_be_bytes([
+                        buf[HEADER_SIZE],
+                        buf[HEADER_SIZE + 1],
+                        buf[HEADER_SIZE + 2],
+                        buf[HEADER_SIZE + 3],
+                    ]);
+                    self.advertise.recv_reply(nonce);
+                }
+            }
+        }
+    }
+
+ // ── DHT message handlers ────────────────────────
+
+    /// Handle a DHT ping addressed to this node: record the sender
+    /// in the routing table and echo the nonce back in a
+    /// `DhtPingReply` message of fixed size `PING_MSG_SIZE`.
+    pub(crate) fn handle_dht_ping(
+        &mut self,
+        buf: &[u8],
+        hdr: &MsgHeader,
+        from: SocketAddr,
+    ) {
+        let nonce = match msg::parse_ping(buf) {
+            Ok(n) => n,
+            Err(_) => return,
+        };
+
+        // Silently drop pings not addressed to our node id.
+        if hdr.dst != self.id {
+            return;
+        }
+
+        log::debug!("DHT ping from {:?} nonce={nonce}", hdr.src);
+
+        // Add to routing table
+        self.dht_table.add(PeerInfo::new(hdr.src, from));
+
+        // Send reply
+        let reply_hdr = MsgHeader::new(
+            MsgType::DhtPingReply,
+            Self::len16(msg::PING_MSG_SIZE),
+            self.id,
+            hdr.src,
+        );
+        let mut reply = [0u8; msg::PING_MSG_SIZE];
+        if reply_hdr.write(&mut reply).is_ok() {
+            msg::write_ping(&mut reply, nonce);
+            let _ = self.send_signed(&reply, from);
+        }
+    }
+
+    /// Handle a DHT ping reply: validate that the nonce matches a
+    /// ping we actually sent (and, when known, that the responder
+    /// id matches), then refresh the sender in the routing table.
+    pub(crate) fn handle_dht_ping_reply(
+        &mut self,
+        buf: &[u8],
+        hdr: &MsgHeader,
+        from: SocketAddr,
+    ) {
+        let nonce = match msg::parse_ping(buf) {
+            Ok(n) => n,
+            Err(_) => return,
+        };
+
+        if hdr.dst != self.id {
+            return;
+        }
+
+        // Verify we actually sent this nonce
+        // (the entry is removed regardless of the outcome below, so
+        // a nonce can only be consumed once).
+        match self.pending_pings.remove(&nonce) {
+            Some((expected_id, _sent_at)) => {
+                // A zero expected id means "any responder accepted".
+                if expected_id != hdr.src && !expected_id.is_zero() {
+                    log::debug!(
+                        "Ping reply nonce={nonce}: expected {:?} got {:?}",
+                        expected_id,
+                        hdr.src
+                    );
+                    return;
+                }
+            }
+            None => {
+                log::trace!("Ignoring unsolicited ping reply nonce={nonce}");
+                return;
+            }
+        }
+
+        log::debug!("DHT ping reply from {:?} nonce={nonce}", hdr.src);
+
+        self.dht_table.add(PeerInfo::new(hdr.src, from));
+        self.dht_table.mark_seen(&hdr.src);
+    }
+
+    /// Handle a DHT FIND_NODE query: respond with up to
+    /// `config.num_find_node` nodes from our routing table closest
+    /// to the requested target id.
+    pub(crate) fn handle_dht_find_node(
+        &mut self,
+        buf: &[u8],
+        hdr: &MsgHeader,
+        from: SocketAddr,
+    ) {
+        let find = match msg::parse_find_node(buf) {
+            Ok(f) => f,
+            Err(_) => return,
+        };
+
+        log::debug!(
+            "DHT find_node from {:?} target={:?}",
+            hdr.src,
+            find.target
+        );
+
+        self.dht_table.add(PeerInfo::new(hdr.src, from));
+
+        // Respond with our closest nodes
+        let closest = self
+            .dht_table
+            .closest(&find.target, self.config.num_find_node);
+
+        // Address family of the reply follows the transport the
+        // query arrived on.
+        let domain = if from.is_ipv4() {
+            DOMAIN_INET
+        } else {
+            DOMAIN_INET6
+        };
+
+        let reply_msg = msg::FindNodeReplyMsg {
+            nonce: find.nonce,
+            id: find.target,
+            domain,
+            nodes: closest,
+        };
+
+        // NOTE(review): fixed 2048-byte stack buffer — assumes
+        // num_find_node entries always fit; confirm
+        // write_find_node_reply bounds-checks rather than panics.
+        let mut reply_buf = [0u8; 2048];
+        // Header is first written with length 0, then patched below.
+        let reply_hdr =
+            MsgHeader::new(MsgType::DhtFindNodeReply, 0, self.id, hdr.src);
+        if reply_hdr.write(&mut reply_buf).is_ok() {
+            let total = msg::write_find_node_reply(&mut reply_buf, &reply_msg);
+
+            // Fix the length in header
+            let len_bytes = Self::len16(total).to_be_bytes();
+            reply_buf[4] = len_bytes[0];
+            reply_buf[5] = len_bytes[1];
+            let _ = self.send_signed(&reply_buf[..total], from);
+        }
+    }
+
+    /// Handle a FIND_NODE reply: absorb the returned nodes into the
+    /// routing table (with basic address/id sanity filtering),
+    /// proactively replicate to genuinely new nodes, and feed the
+    /// matching in-flight iterative query so it can continue.
+    pub(crate) fn handle_dht_find_node_reply(
+        &mut self,
+        buf: &[u8],
+        hdr: &MsgHeader,
+        from: SocketAddr,
+    ) {
+        let reply = match msg::parse_find_node_reply(buf) {
+            Ok(r) => r,
+            Err(_) => return,
+        };
+
+        log::debug!(
+            "DHT find_node_reply from {:?}: {} nodes",
+            hdr.src,
+            reply.nodes.len()
+        );
+
+        // Add sender to routing table
+        self.dht_table.add(PeerInfo::new(hdr.src, from));
+
+        // Add returned nodes, filtering invalid addresses
+        for node in &reply.nodes {
+            // S1-6: reject unroutable addresses
+            let ip = node.addr.ip();
+            if ip.is_unspecified() || ip.is_multicast() {
+                continue;
+            }
+            // Reject zero NodeId
+            if node.id.is_zero() {
+                continue;
+            }
+            let result = self.dht_table.add(node.clone());
+            self.peers.add(node.clone());
+
+            // §2.5: replicate data to newly discovered nodes
+            // (only on fresh inserts, not updates of known nodes).
+            if matches!(result, crate::routing::InsertResult::Inserted) {
+                self.proactive_replicate(node);
+            }
+        }
+
+        // Feed active queries with the reply
+        let sender_id = hdr.src;
+        let nodes_clone = reply.nodes.clone();
+
+        // Find which query this reply belongs to (match
+        // by nonce or by pending sender).
+        // NOTE(review): only the pending-sender match is implemented;
+        // the first query with this sender pending wins, which could
+        // misattribute when one peer is pending in several queries.
+        let matching_nonce: Option<u32> = self
+            .queries
+            .iter()
+            .find(|(_, q)| q.pending.contains_key(&sender_id))
+            .map(|(n, _)| *n);
+
+        if let Some(nonce) = matching_nonce {
+            if let Some(q) = self.queries.get_mut(&nonce) {
+                q.process_reply(&sender_id, nodes_clone);
+
+                // Send follow-up find_nodes to newly
+                // discovered nodes
+                let next = q.next_to_query();
+                let target = q.target;
+                for peer in next {
+                    if self.send_find_node(peer.addr, target).is_ok() {
+                        // Re-borrow: send_find_node needed &mut self.
+                        if let Some(q) = self.queries.get_mut(&nonce) {
+                            q.pending.insert(peer.id, Instant::now());
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /// Handle a DHT STORE request: validate addressing, size limit
+    /// and originator, then store the value locally (or remove it
+    /// when ttl == 0, which acts as an explicit delete).
+    pub(crate) fn handle_dht_store(
+        &mut self,
+        buf: &[u8],
+        hdr: &MsgHeader,
+        from: SocketAddr,
+    ) {
+        let store = match msg::parse_store(buf) {
+            Ok(s) => s,
+            Err(_) => return,
+        };
+
+        if hdr.dst != self.id {
+            return;
+        }
+
+        // S1-4: enforce max value size
+        if store.value.len() > self.config.max_value_size {
+            log::debug!(
+                "Rejecting oversized store: {} bytes > {} max",
+                store.value.len(),
+                self.config.max_value_size
+            );
+            return;
+        }
+
+        // S2-9: verify sender matches claimed originator
+        if hdr.src != store.from {
+            log::debug!(
+                "Store origin mismatch: sender={:?} from={:?}",
+                hdr.src,
+                store.from
+            );
+            return;
+        }
+
+        log::debug!(
+            "DHT store from {:?}: key={} bytes, value={} bytes, ttl={}",
+            hdr.src,
+            store.key.len(),
+            store.value.len(),
+            store.ttl
+        );
+
+        self.dht_table.add(PeerInfo::new(hdr.src, from));
+
+        if store.ttl == 0 {
+            // ttl 0 = deletion request for this (id, key) pair.
+            self.storage.remove(&store.id, &store.key);
+        } else {
+            let val = crate::dht::StoredValue {
+                key: store.key,
+                value: store.value,
+                id: store.id,
+                source: store.from,
+                ttl: store.ttl,
+                stored_at: std::time::Instant::now(),
+                is_unique: store.is_unique,
+                original: 0, // received, not originated
+                recvd: {
+                    // Seed the "already has it" set with the sender so
+                    // restore_data() won't echo the value back to them.
+                    let mut s = std::collections::HashSet::new();
+                    s.insert(hdr.src);
+                    s
+                },
+                version: crate::dht::now_version(),
+            };
+            self.storage.store(val);
+        }
+    }
+
+    /// Handle a DHT FIND_VALUE query: if the (target, key) pair is
+    /// in local storage reply with the value (DATA_ARE_VALUES),
+    /// otherwise reply with our closest nodes (DATA_ARE_NODES) so
+    /// the requester can keep iterating.
+    pub(crate) fn handle_dht_find_value(
+        &mut self,
+        buf: &[u8],
+        hdr: &MsgHeader,
+        from: SocketAddr,
+    ) {
+        let fv = match msg::parse_find_value(buf) {
+            Ok(f) => f,
+            Err(_) => return,
+        };
+
+        log::debug!(
+            "DHT find_value from {:?}: key={} bytes",
+            hdr.src,
+            fv.key.len()
+        );
+
+        self.dht_table.add(PeerInfo::new(hdr.src, from));
+
+        // Check local storage
+        let values = self.storage.get(&fv.target, &fv.key);
+
+        if !values.is_empty() {
+            log::debug!("Found {} value(s) locally", values.len());
+
+            // Send value reply using DHT_FIND_VALUE_REPLY
+            // with DATA_ARE_VALUES flag
+            // NOTE(review): only values[0] is returned even when
+            // multiple values matched; index/total are hard-coded to
+            // 0/1 below — confirm multi-value replies are intended
+            // to be unsupported.
+            let val = &values[0];
+            let fixed = msg::FIND_VALUE_REPLY_FIXED;
+            let total = HEADER_SIZE + fixed + val.value.len();
+            let mut rbuf = vec![0u8; total];
+            let rhdr = MsgHeader::new(
+                MsgType::DhtFindValueReply,
+                Self::len16(total),
+                self.id,
+                hdr.src,
+            );
+            if rhdr.write(&mut rbuf).is_ok() {
+                // Fixed part layout: nonce(4) | target id | index(2) |
+                // total(2) | flags(1) | pad(3), then the raw value.
+                // NOTE(review): the index/total writes start at
+                // off+24, i.e. they assume ID_LEN == 20; the project
+                // summary says node ids are 256-bit (32 bytes) — if
+                // ID_LEN > 20 these writes overlap the id field.
+                // Confirm crate::id::ID_LEN.
+                let off = HEADER_SIZE;
+                rbuf[off..off + 4].copy_from_slice(&fv.nonce.to_be_bytes());
+                fv.target
+                    .write_to(&mut rbuf[off + 4..off + 4 + crate::id::ID_LEN]);
+                rbuf[off + 24..off + 26].copy_from_slice(&0u16.to_be_bytes()); // index
+                rbuf[off + 26..off + 28].copy_from_slice(&1u16.to_be_bytes()); // total
+                rbuf[off + 28] = crate::wire::DATA_ARE_VALUES;
+                rbuf[off + 29] = 0;
+                rbuf[off + 30] = 0;
+                rbuf[off + 31] = 0;
+                rbuf[HEADER_SIZE + fixed..].copy_from_slice(&val.value);
+                let _ = self.send_signed(&rbuf, from);
+            }
+        } else {
+            // Send closest nodes as find_node_reply format
+            let closest = self
+                .dht_table
+                .closest(&fv.target, self.config.num_find_node);
+            log::debug!("Value not found, returning {} nodes", closest.len());
+            let domain = if from.is_ipv4() {
+                DOMAIN_INET
+            } else {
+                DOMAIN_INET6
+            };
+            let reply_msg = msg::FindNodeReplyMsg {
+                nonce: fv.nonce,
+                id: fv.target,
+                domain,
+                nodes: closest,
+            };
+            let mut rbuf = [0u8; 2048];
+            // Header length is patched at bytes 4..6 once known.
+            let rhdr =
+                MsgHeader::new(MsgType::DhtFindValueReply, 0, self.id, hdr.src);
+            if rhdr.write(&mut rbuf).is_ok() {
+                // Write header with DATA_ARE_NODES flag
+                // (same fixed-part layout as the value branch above).
+                let off = HEADER_SIZE;
+                rbuf[off..off + 4].copy_from_slice(&fv.nonce.to_be_bytes());
+                fv.target
+                    .write_to(&mut rbuf[off + 4..off + 4 + crate::id::ID_LEN]);
+                rbuf[off + 24..off + 26].fill(0); // index
+                rbuf[off + 26..off + 28].fill(0); // total
+                rbuf[off + 28] = crate::wire::DATA_ARE_NODES;
+                rbuf[off + 29] = 0;
+                rbuf[off + 30] = 0;
+                rbuf[off + 31] = 0;
+
+                // Write nodes after the fixed part
+                let nodes_off = HEADER_SIZE + msg::FIND_VALUE_REPLY_FIXED;
+
+                // Write domain + num + padding before nodes
+                rbuf[nodes_off..nodes_off + 2]
+                    .copy_from_slice(&domain.to_be_bytes());
+                rbuf[nodes_off + 2] = reply_msg.nodes.len() as u8;
+                rbuf[nodes_off + 3] = 0;
+                let nw = if domain == DOMAIN_INET {
+                    msg::write_nodes_inet(
+                        &mut rbuf[nodes_off + 4..],
+                        &reply_msg.nodes,
+                    )
+                } else {
+                    msg::write_nodes_inet6(
+                        &mut rbuf[nodes_off + 4..],
+                        &reply_msg.nodes,
+                    )
+                };
+                let total = nodes_off + 4 + nw;
+                rbuf[4..6].copy_from_slice(&Self::len16(total).to_be_bytes());
+                let _ = self.send_signed(&rbuf[..total], from);
+            }
+        }
+    }
+
+    /// Handle a FIND_VALUE reply, which carries one of three
+    /// payloads: the value itself (cache locally, complete the
+    /// query, republish per §2.3), a node list (continue the
+    /// iterative lookup), or NUL (mark the responder as done).
+    pub(crate) fn handle_dht_find_value_reply(
+        &mut self,
+        buf: &[u8],
+        hdr: &MsgHeader,
+    ) {
+        let reply = match msg::parse_find_value_reply(buf) {
+            Ok(r) => r,
+            Err(e) => {
+                log::debug!("Failed to parse find_value_reply: {e}");
+                return;
+            }
+        };
+
+        let sender_id = hdr.src;
+
+        // Find the matching active query
+        // (first find-value query with this sender pending).
+        let matching_nonce: Option<u32> = self
+            .queries
+            .iter()
+            .filter(|(_, q)| q.is_find_value)
+            .find(|(_, q)| q.pending.contains_key(&sender_id))
+            .map(|(n, _)| *n);
+
+        match reply.data {
+            msg::FindValueReplyData::Value { data, .. } => {
+                log::info!(
+                    "Received value from {:?}: {} bytes",
+                    sender_id,
+                    data.len()
+                );
+
+                // Cache the value locally and republish
+                // to the closest queried node without it
+                // (§2.3: cache on nearest without value)
+                if let Some(nonce) = matching_nonce {
+                    if let Some(q) = self.queries.get_mut(&nonce) {
+                        // Store in local storage for
+                        // subsequent get() calls
+                        // (fixed 300s cache ttl, marked non-original).
+                        let val = crate::dht::StoredValue {
+                            key: q.key.clone(),
+                            value: data.clone(),
+                            id: q.target,
+                            source: sender_id,
+                            ttl: 300,
+                            stored_at: Instant::now(),
+                            is_unique: false,
+                            original: 0,
+                            recvd: std::collections::HashSet::new(),
+                            version: crate::dht::now_version(),
+                        };
+                        self.storage.store(val);
+
+                        // §2.3: store on nearest queried
+                        // node that didn't have the value
+                        // NOTE(review): "didn't have the value" is
+                        // approximated as "queried and not the sender";
+                        // a queried node that also held the value would
+                        // still be picked — confirm this is acceptable.
+                        let target = q.target;
+                        let key = q.key.clone();
+                        let nearest_without: Option<PeerInfo> = q
+                            .closest
+                            .iter()
+                            .find(|p| {
+                                q.queried.contains(&p.id)
+                                    && p.id != sender_id
+                            })
+                            .cloned();
+
+                        q.process_value(data.clone());
+
+                        if let Some(peer) = nearest_without {
+                            let store_msg = crate::msg::StoreMsg {
+                                id: target,
+                                from: self.id,
+                                key,
+                                value: data,
+                                ttl: 300,
+                                is_unique: false,
+                            };
+                            if let Err(e) = self.send_store(&peer, &store_msg) {
+                                log::debug!(
+                                    "Republish-on-access failed: {e}"
+                                );
+                            } else {
+                                log::debug!(
+                                    "Republished value to {:?} (nearest without)",
+                                    peer.id,
+                                );
+                            }
+                        }
+                    }
+                }
+            }
+            msg::FindValueReplyData::Nodes { nodes, .. } => {
+                log::debug!(
+                    "find_value_reply from {:?}: {} nodes (no value)",
+                    sender_id,
+                    nodes.len()
+                );
+
+                // Add nodes to routing table
+                // NOTE(review): unlike handle_dht_find_node_reply,
+                // no unroutable-address / zero-id filtering happens
+                // here — consider applying the same S1-6 checks.
+                for node in &nodes {
+                    self.dht_table.add(node.clone());
+                    self.peers.add(node.clone());
+                }
+
+                // Feed the query with the nodes so it
+                // continues iterating
+                if let Some(nonce) = matching_nonce {
+                    if let Some(q) = self.queries.get_mut(&nonce) {
+                        q.process_reply(&sender_id, nodes.clone());
+
+                        // Send follow-up find_value to
+                        // newly discovered nodes
+                        let next = q.next_to_query();
+                        let target = q.target;
+                        let key = q.key.clone();
+                        for peer in next {
+                            if self
+                                .send_find_value_msg(peer.addr, target, &key)
+                                .is_ok()
+                            {
+                                // Re-borrow after the &mut self send.
+                                if let Some(q) = self.queries.get_mut(&nonce) {
+                                    q.pending.insert(peer.id, Instant::now());
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+            msg::FindValueReplyData::Nul => {
+                // Explicit "no result" — retire the responder from
+                // the query's pending set so the lookup can converge.
+                log::debug!("find_value_reply NUL from {:?}", sender_id);
+                if let Some(nonce) = matching_nonce {
+                    if let Some(q) = self.queries.get_mut(&nonce) {
+                        q.pending.remove(&sender_id);
+                        q.queried.insert(sender_id);
+                    }
+                }
+            }
+        }
+    }
+
+ // ── Data restore / republish ────────────────────
+
+ /// Restore (republish) stored data to k-closest
+ /// nodes. Tracks the `original` counter and `recvd`
+ /// set to avoid unnecessary duplicates.
+    pub(crate) fn restore_data(&mut self) {
+        let values = self.storage.all_values();
+        if values.is_empty() {
+            return;
+        }
+
+        log::debug!("Restoring {} stored values", values.len());
+
+        for val in &values {
+            // Expired entries are skipped, not removed here.
+            if val.is_expired() {
+                continue;
+            }
+            let closest =
+                self.dht_table.closest(&val.id, self.config.num_find_node);
+            // Forward with the original source id and the *remaining*
+            // ttl so the value ages consistently across replicas.
+            let store_msg = msg::StoreMsg {
+                id: val.id,
+                from: val.source,
+                key: val.key.clone(),
+                value: val.value.clone(),
+                ttl: val.remaining_ttl(),
+                is_unique: val.is_unique,
+            };
+            for peer in &closest {
+                if peer.id == self.id {
+                    continue;
+                }
+
+                // Skip peers that already have this value
+                if val.recvd.contains(&peer.id) {
+                    continue;
+                }
+                if let Err(e) = self.send_store(peer, &store_msg) {
+                    log::debug!("Restore send failed: {e}");
+                    continue;
+                }
+                // Only marked after a successful send, so failed
+                // peers are retried on the next restore cycle.
+                self.storage.mark_received(val, peer.id);
+            }
+        }
+    }
+
+ /// Run mask_bit exploration: send find_node queries
+ /// to probe distant regions of the ID space.
+    pub(crate) fn run_maintain(&mut self) {
+        // Two exploration targets per cycle; failures to start a
+        // lookup are deliberately ignored (best-effort maintenance).
+        let (t1, t2) = self.explorer.next_targets();
+        log::debug!("Maintain: exploring targets {:?} {:?}", t1, t2);
+        let _ = self.start_find_node(t1);
+        let _ = self.start_find_node(t2);
+    }
+
+ // ── Proactive replication (§2.5) ──────────────
+
+ /// Replicate stored data to a newly discovered node
+ /// if it is closer to a key than the current furthest
+ /// holder. This ensures data availability without
+ /// waiting for the periodic restore cycle.
+ ///
+ /// Called when a new node is inserted into the routing
+ /// table (not on updates of existing nodes).
+    pub(crate) fn proactive_replicate(&mut self, new_node: &PeerInfo) {
+        let values = self.storage.all_values();
+        if values.is_empty() {
+            return;
+        }
+
+        let k = self.config.num_find_node;
+        let mut sent = 0u32;
+
+        for val in &values {
+            if val.is_expired() {
+                continue;
+            }
+            // Skip if this node already received the value
+            if val.recvd.contains(&new_node.id) {
+                continue;
+            }
+
+            // Check: is new_node closer to this key than
+            // the furthest current k-closest holder?
+            // (closest() is assumed sorted ascending by XOR distance,
+            // so .last() is the furthest of the k — TODO confirm.)
+            let closest = self.dht_table.closest(&val.id, k);
+            let furthest = match closest.last() {
+                Some(p) => p,
+                None => continue,
+            };
+
+            let dist_new = val.id.distance(&new_node.id);
+            let dist_far = val.id.distance(&furthest.id);
+
+            if dist_new >= dist_far {
+                continue; // new node is not closer
+            }
+
+            // Forward with the original source and remaining ttl,
+            // mirroring restore_data().
+            let store_msg = crate::msg::StoreMsg {
+                id: val.id,
+                from: val.source,
+                key: val.key.clone(),
+                value: val.value.clone(),
+                ttl: val.remaining_ttl(),
+                is_unique: val.is_unique,
+            };
+
+            if self.send_store(new_node, &store_msg).is_ok() {
+                self.storage.mark_received(val, new_node.id);
+                sent += 1;
+            }
+        }
+
+        if sent > 0 {
+            log::debug!(
+                "Proactive replicate: sent {sent} values to {:?}",
+                new_node.id,
+            );
+        }
+    }
+}