//! Packet dispatch and message handlers. //! //! Extension impl block for Node. All handle_* //! methods process incoming protocol messages. use std::net::SocketAddr; use std::time::Instant; use crate::msg; use crate::nat::NatState; use crate::node::Node; use crate::peers::PeerInfo; use crate::wire::{DOMAIN_INET, DOMAIN_INET6, HEADER_SIZE, MsgHeader, MsgType}; impl Node { // ── Packet dispatch ───────────────────────────── pub(crate) fn handle_packet(&mut self, raw: &[u8], from: SocketAddr) { self.metrics .bytes_received .fetch_add(raw.len() as u64, std::sync::atomic::Ordering::Relaxed); // Verify signature and strip it let buf = match self.verify_incoming(raw, from) { Some(b) => b, None => { self.metrics .packets_rejected .fetch_add(1, std::sync::atomic::Ordering::Relaxed); log::trace!("Dropped unsigned packet from {from}"); return; } }; self.metrics .messages_received .fetch_add(1, std::sync::atomic::Ordering::Relaxed); // Reject banned peers if self.ban_list.is_banned(&from) { log::trace!("Dropped packet from banned peer {from}"); return; } // Rate limit per source IP if !self.rate_limiter.allow(from.ip()) { log::trace!("Rate limited packet from {from}"); return; } if buf.len() < HEADER_SIZE { return; } let hdr = match MsgHeader::parse(buf) { Ok(h) => h, Err(e) => { log::trace!("Dropped packet from {from}: {e}"); return; } }; log::trace!( "Received {:?} from {from} (src={:?})", hdr.msg_type, hdr.src ); // Register sender as peer and record successful // communication (clears ban list failures) self.peers.add(PeerInfo::new(hdr.src, from)); self.ban_list.record_success(&from); match hdr.msg_type { // ── DHT messages ──────────────────────── MsgType::DhtPing => { self.handle_dht_ping(buf, &hdr, from); } MsgType::DhtPingReply => { self.handle_dht_ping_reply(buf, &hdr, from); } MsgType::DhtFindNode => { self.handle_dht_find_node(buf, &hdr, from); } MsgType::DhtFindNodeReply => { self.handle_dht_find_node_reply(buf, &hdr, from); } MsgType::DhtStore => { self.handle_dht_store(buf, &hdr, from); } MsgType::DhtFindValue => { self.handle_dht_find_value(buf, &hdr, from); } MsgType::DhtFindValueReply => { self.handle_dht_find_value_reply(buf, &hdr); } // ── NAT detection ─────────────────────── MsgType::NatEcho => { // Respond with our observed address of the sender if let Ok(nonce) = msg::parse_nat_echo(buf) { log::debug!("NatEcho from {:?} nonce={nonce}", hdr.src); let reply = msg::NatEchoReply { nonce, domain: if from.is_ipv4() { DOMAIN_INET } else { DOMAIN_INET6 }, port: from.port(), addr: { let mut a = [0u8; 16]; match from.ip() { std::net::IpAddr::V4(v4) => { a[..4].copy_from_slice(&v4.octets()) } std::net::IpAddr::V6(v6) => { a.copy_from_slice(&v6.octets()) } } a }, }; let size = HEADER_SIZE + msg::NAT_ECHO_REPLY_BODY; let mut rbuf = vec![0u8; size]; let rhdr = MsgHeader::new( MsgType::NatEchoReply, Self::len16(size), self.id, hdr.src, ); if rhdr.write(&mut rbuf).is_ok() { msg::write_nat_echo_reply(&mut rbuf, &reply); let _ = self.send_signed(&rbuf, from); } } } MsgType::NatEchoReply => { if let Ok(reply) = msg::parse_nat_echo_reply(buf) { let observed_port = reply.port; let observed_ip: std::net::IpAddr = if reply.domain == DOMAIN_INET { std::net::IpAddr::V4(std::net::Ipv4Addr::new( reply.addr[0], reply.addr[1], reply.addr[2], reply.addr[3], )) } else { std::net::IpAddr::V6(std::net::Ipv6Addr::from( reply.addr, )) }; let observed = SocketAddr::new(observed_ip, observed_port); let local = self.net.local_addr().unwrap_or(from); let action = self.nat.recv_echo_reply(reply.nonce, observed, local); log::info!( "NAT echo reply: observed={observed} action={action:?}" ); // After NAT detection completes, register with DTUN if let crate::nat::EchoReplyAction::DetectionComplete( state, ) = action { log::info!("NAT type detected: {state:?}"); if state != NatState::Global && self.is_dtun { self.dtun_register(); } if state == NatState::SymmetricNat { // Find a global node to use as proxy let closest = self.dht_table.closest(&self.id, 1); if let Some(server) = closest.first() { self.proxy.set_server(server.clone()); let nonce = self.alloc_nonce(); if let Some(n) = self.proxy.start_register(nonce) { // Send ProxyRegister msg let size = HEADER_SIZE + 8; let mut buf = vec![0u8; size]; let hdr = MsgHeader::new( MsgType::ProxyRegister, Self::len16(size), self.id, server.id, ); if hdr.write(&mut buf).is_ok() { let session = self.dtun.session(); buf[HEADER_SIZE..HEADER_SIZE + 4] .copy_from_slice( &session.to_be_bytes(), ); buf[HEADER_SIZE + 4..HEADER_SIZE + 8] .copy_from_slice(&n.to_be_bytes()); if let Err(e) = self.send_signed(&buf, server.addr) { log::warn!( "ProxyRegister send failed: {e}" ); } } log::info!( "Sent ProxyRegister to {:?}", server.id ); } } else { log::warn!( "No peers available as proxy server" ); } } } } } MsgType::NatEchoRedirect | MsgType::NatEchoRedirectReply => { log::debug!("NAT redirect: {:?}", hdr.msg_type); } // ── DTUN ──────────────────────────────── MsgType::DtunPing => { // Reuse DHT ping handler logic self.handle_dht_ping(buf, &hdr, from); self.dtun.table_mut().add(PeerInfo::new(hdr.src, from)); } MsgType::DtunPingReply => { self.handle_dht_ping_reply(buf, &hdr, from); self.dtun.table_mut().add(PeerInfo::new(hdr.src, from)); } MsgType::DtunFindNode => { // Respond with closest from DTUN table if let Ok(find) = msg::parse_find_node(buf) { self.dtun.table_mut().add(PeerInfo::new(hdr.src, from)); let closest = self .dtun .table() .closest(&find.target, self.config.num_find_node); let domain = if from.is_ipv4() { DOMAIN_INET } else { DOMAIN_INET6 }; let reply_msg = msg::FindNodeReplyMsg { nonce: find.nonce, id: find.target, domain, nodes: closest, }; let mut rbuf = [0u8; 2048]; let rhdr = MsgHeader::new( MsgType::DtunFindNodeReply, 0, self.id, hdr.src, ); if rhdr.write(&mut rbuf).is_ok() { let total = msg::write_find_node_reply(&mut rbuf, &reply_msg); rbuf[4..6] .copy_from_slice(&Self::len16(total).to_be_bytes()); let _ = self.send_signed(&rbuf[..total], from); } } } MsgType::DtunFindNodeReply => { if let Ok(reply) = msg::parse_find_node_reply(buf) { log::debug!( "DTUN find_node_reply: {} nodes", reply.nodes.len() ); for node in &reply.nodes { self.dtun.table_mut().add(node.clone()); } } } MsgType::DtunFindValue | MsgType::DtunFindValueReply => { log::debug!("DTUN find_value: {:?}", hdr.msg_type); } MsgType::DtunRegister => { if let Ok(session) = msg::parse_dtun_register(buf) { log::debug!( "DTUN register from {:?} session={session}", hdr.src ); self.dtun.register_node(hdr.src, from, session); } } MsgType::DtunRequest => { if let Ok((nonce, target)) = msg::parse_dtun_request(buf) { log::debug!("DTUN request for {:?} nonce={nonce}", target); // Check if we have this node registered if let Some(reg) = self.dtun.get_registered(&target) { log::debug!( "DTUN: forwarding request to {:?}", reg.addr ); } } } MsgType::DtunRequestBy | MsgType::DtunRequestReply => { log::debug!("DTUN request_by/reply: {:?}", hdr.msg_type); } // ── Proxy ─────────────────────────────── MsgType::ProxyRegister => { if buf.len() >= HEADER_SIZE + 8 { let session = u32::from_be_bytes([ buf[HEADER_SIZE], buf[HEADER_SIZE + 1], buf[HEADER_SIZE + 2], buf[HEADER_SIZE + 3], ]); let nonce = u32::from_be_bytes([ buf[HEADER_SIZE + 4], buf[HEADER_SIZE + 5], buf[HEADER_SIZE + 6], buf[HEADER_SIZE + 7], ]); log::debug!( "Proxy register from {:?} session={session}", hdr.src ); self.proxy.register_client(hdr.src, from, session); // Send reply let size = HEADER_SIZE + 4; let mut rbuf = vec![0u8; size]; let rhdr = MsgHeader::new( MsgType::ProxyRegisterReply, Self::len16(size), self.id, hdr.src, ); if rhdr.write(&mut rbuf).is_ok() { rbuf[HEADER_SIZE..HEADER_SIZE + 4] .copy_from_slice(&nonce.to_be_bytes()); let _ = self.send_signed(&rbuf, from); } } } MsgType::ProxyRegisterReply => { if let Ok(nonce) = msg::parse_ping(buf) { self.proxy.recv_register_reply(nonce); } } MsgType::ProxyStore => { // Forward store to DHT on behalf of the client self.handle_dht_store(buf, &hdr, from); } MsgType::ProxyGet | MsgType::ProxyGetReply => { log::debug!("Proxy get/reply: {:?}", hdr.msg_type); } MsgType::ProxyDgram | MsgType::ProxyDgramForwarded => { // Forward dgram payload self.handle_dgram(buf, &hdr); } MsgType::ProxyRdp | MsgType::ProxyRdpForwarded => { // Forward RDP payload self.handle_rdp(buf, &hdr); } // ── Transport ─────────────────────────── MsgType::Dgram => { self.handle_dgram(buf, &hdr); } MsgType::Rdp => { self.handle_rdp(buf, &hdr); } // ── Advertise ─────────────────────────── MsgType::Advertise => { if buf.len() >= HEADER_SIZE + 8 { let nonce = u32::from_be_bytes([ buf[HEADER_SIZE], buf[HEADER_SIZE + 1], buf[HEADER_SIZE + 2], buf[HEADER_SIZE + 3], ]); log::debug!("Advertise from {:?} nonce={nonce}", hdr.src); self.advertise.recv_advertise(hdr.src); // Send reply let size = HEADER_SIZE + 8; let mut rbuf = vec![0u8; size]; let rhdr = MsgHeader::new( MsgType::AdvertiseReply, Self::len16(size), self.id, hdr.src, ); if rhdr.write(&mut rbuf).is_ok() { rbuf[HEADER_SIZE..HEADER_SIZE + 8].copy_from_slice( &buf[HEADER_SIZE..HEADER_SIZE + 8], ); let _ = self.send_signed(&rbuf, from); } } } MsgType::AdvertiseReply => { if buf.len() >= HEADER_SIZE + 8 { let nonce = u32::from_be_bytes([ buf[HEADER_SIZE], buf[HEADER_SIZE + 1], buf[HEADER_SIZE + 2], buf[HEADER_SIZE + 3], ]); self.advertise.recv_reply(nonce); } } } } // ── DHT message handlers ──────────────────────── pub(crate) fn handle_dht_ping( &mut self, buf: &[u8], hdr: &MsgHeader, from: SocketAddr, ) { let nonce = match msg::parse_ping(buf) { Ok(n) => n, Err(_) => return, }; if hdr.dst != self.id { return; } log::debug!("DHT ping from {:?} nonce={nonce}", hdr.src); // Add to routing table self.dht_table.add(PeerInfo::new(hdr.src, from)); // Send reply let reply_hdr = MsgHeader::new( MsgType::DhtPingReply, Self::len16(msg::PING_MSG_SIZE), self.id, hdr.src, ); let mut reply = [0u8; msg::PING_MSG_SIZE]; if reply_hdr.write(&mut reply).is_ok() { msg::write_ping(&mut reply, nonce); let _ = self.send_signed(&reply, from); } } pub(crate) fn handle_dht_ping_reply( &mut self, buf: &[u8], hdr: &MsgHeader, from: SocketAddr, ) { let nonce = match msg::parse_ping(buf) { Ok(n) => n, Err(_) => return, }; if hdr.dst != self.id { return; } // Verify we actually sent this nonce match self.pending_pings.remove(&nonce) { Some((expected_id, _sent_at)) => { if expected_id != hdr.src && !expected_id.is_zero() { log::debug!( "Ping reply nonce={nonce}: expected {:?} got {:?}", expected_id, hdr.src ); return; } } None => { log::trace!("Ignoring unsolicited ping reply nonce={nonce}"); return; } } log::debug!("DHT ping reply from {:?} nonce={nonce}", hdr.src); self.dht_table.add(PeerInfo::new(hdr.src, from)); self.dht_table.mark_seen(&hdr.src); } pub(crate) fn handle_dht_find_node( &mut self, buf: &[u8], hdr: &MsgHeader, from: SocketAddr, ) { let find = match msg::parse_find_node(buf) { Ok(f) => f, Err(_) => return, }; log::debug!( "DHT find_node from {:?} target={:?}", hdr.src, find.target ); self.dht_table.add(PeerInfo::new(hdr.src, from)); // Respond with our closest nodes let closest = self .dht_table .closest(&find.target, self.config.num_find_node); let domain = if from.is_ipv4() { DOMAIN_INET } else { DOMAIN_INET6 }; let reply_msg = msg::FindNodeReplyMsg { nonce: find.nonce, id: find.target, domain, nodes: closest, }; let mut reply_buf = [0u8; 2048]; let reply_hdr = MsgHeader::new(MsgType::DhtFindNodeReply, 0, self.id, hdr.src); if reply_hdr.write(&mut reply_buf).is_ok() { let total = msg::write_find_node_reply(&mut reply_buf, &reply_msg); // Fix the length in header let len_bytes = Self::len16(total).to_be_bytes(); reply_buf[4] = len_bytes[0]; reply_buf[5] = len_bytes[1]; let _ = self.send_signed(&reply_buf[..total], from); } } pub(crate) fn handle_dht_find_node_reply( &mut self, buf: &[u8], hdr: &MsgHeader, from: SocketAddr, ) { let reply = match msg::parse_find_node_reply(buf) { Ok(r) => r, Err(_) => return, }; log::debug!( "DHT find_node_reply from {:?}: {} nodes", hdr.src, reply.nodes.len() ); // Add sender to routing table self.dht_table.add(PeerInfo::new(hdr.src, from)); // Add returned nodes, filtering invalid addresses for node in &reply.nodes { // S1-6: reject unroutable addresses let ip = node.addr.ip(); if ip.is_unspecified() || ip.is_multicast() { continue; } // Reject zero NodeId if node.id.is_zero() { continue; } let result = self.dht_table.add(node.clone()); self.peers.add(node.clone()); // §2.5: replicate data to newly discovered nodes if matches!(result, crate::routing::InsertResult::Inserted) { self.proactive_replicate(node); } } // Feed active queries with the reply let sender_id = hdr.src; let nodes_clone = reply.nodes.clone(); // Find which query this reply belongs to (match // by nonce or by pending sender). let matching_nonce: Option = self .queries .iter() .find(|(_, q)| q.pending.contains_key(&sender_id)) .map(|(n, _)| *n); if let Some(nonce) = matching_nonce { if let Some(q) = self.queries.get_mut(&nonce) { q.process_reply(&sender_id, nodes_clone); // Send follow-up find_nodes to newly // discovered nodes let next = q.next_to_query(); let target = q.target; for peer in next { if self.send_find_node(peer.addr, target).is_ok() { if let Some(q) = self.queries.get_mut(&nonce) { q.pending.insert(peer.id, Instant::now()); } } } } } } pub(crate) fn handle_dht_store( &mut self, buf: &[u8], hdr: &MsgHeader, from: SocketAddr, ) { let store = match msg::parse_store(buf) { Ok(s) => s, Err(_) => return, }; if hdr.dst != self.id { return; } // S1-4: enforce max value size if store.value.len() > self.config.max_value_size { log::debug!( "Rejecting oversized store: {} bytes > {} max", store.value.len(), self.config.max_value_size ); return; } // S2-9: verify sender matches claimed originator if hdr.src != store.from { log::debug!( "Store origin mismatch: sender={:?} from={:?}", hdr.src, store.from ); return; } log::debug!( "DHT store from {:?}: key={} bytes, value={} bytes, ttl={}", hdr.src, store.key.len(), store.value.len(), store.ttl ); self.dht_table.add(PeerInfo::new(hdr.src, from)); if store.ttl == 0 { self.storage.remove(&store.id, &store.key); } else { let val = crate::dht::StoredValue { key: store.key, value: store.value, id: store.id, source: store.from, ttl: store.ttl, stored_at: std::time::Instant::now(), is_unique: store.is_unique, original: 0, // received, not originated recvd: { let mut s = std::collections::HashSet::new(); s.insert(hdr.src); s }, version: crate::dht::now_version(), }; self.storage.store(val); } } pub(crate) fn handle_dht_find_value( &mut self, buf: &[u8], hdr: &MsgHeader, from: SocketAddr, ) { let fv = match msg::parse_find_value(buf) { Ok(f) => f, Err(_) => return, }; log::debug!( "DHT find_value from {:?}: key={} bytes", hdr.src, fv.key.len() ); self.dht_table.add(PeerInfo::new(hdr.src, from)); // Check local storage let values = self.storage.get(&fv.target, &fv.key); if !values.is_empty() { log::debug!("Found {} value(s) locally", values.len()); // Send value reply using DHT_FIND_VALUE_REPLY // with DATA_ARE_VALUES flag let val = &values[0]; let fixed = msg::FIND_VALUE_REPLY_FIXED; let total = HEADER_SIZE + fixed + val.value.len(); let mut rbuf = vec![0u8; total]; let rhdr = MsgHeader::new( MsgType::DhtFindValueReply, Self::len16(total), self.id, hdr.src, ); if rhdr.write(&mut rbuf).is_ok() { let off = HEADER_SIZE; rbuf[off..off + 4].copy_from_slice(&fv.nonce.to_be_bytes()); fv.target .write_to(&mut rbuf[off + 4..off + 4 + crate::id::ID_LEN]); rbuf[off + 24..off + 26].copy_from_slice(&0u16.to_be_bytes()); // index rbuf[off + 26..off + 28].copy_from_slice(&1u16.to_be_bytes()); // total rbuf[off + 28] = crate::wire::DATA_ARE_VALUES; rbuf[off + 29] = 0; rbuf[off + 30] = 0; rbuf[off + 31] = 0; rbuf[HEADER_SIZE + fixed..].copy_from_slice(&val.value); let _ = self.send_signed(&rbuf, from); } } else { // Send closest nodes as find_node_reply format let closest = self .dht_table .closest(&fv.target, self.config.num_find_node); log::debug!("Value not found, returning {} nodes", closest.len()); let domain = if from.is_ipv4() { DOMAIN_INET } else { DOMAIN_INET6 }; let reply_msg = msg::FindNodeReplyMsg { nonce: fv.nonce, id: fv.target, domain, nodes: closest, }; let mut rbuf = [0u8; 2048]; let rhdr = MsgHeader::new(MsgType::DhtFindValueReply, 0, self.id, hdr.src); if rhdr.write(&mut rbuf).is_ok() { // Write header with DATA_ARE_NODES flag let off = HEADER_SIZE; rbuf[off..off + 4].copy_from_slice(&fv.nonce.to_be_bytes()); fv.target .write_to(&mut rbuf[off + 4..off + 4 + crate::id::ID_LEN]); rbuf[off + 24..off + 26].fill(0); // index rbuf[off + 26..off + 28].fill(0); // total rbuf[off + 28] = crate::wire::DATA_ARE_NODES; rbuf[off + 29] = 0; rbuf[off + 30] = 0; rbuf[off + 31] = 0; // Write nodes after the fixed part let nodes_off = HEADER_SIZE + msg::FIND_VALUE_REPLY_FIXED; // Write domain + num + padding before nodes rbuf[nodes_off..nodes_off + 2] .copy_from_slice(&domain.to_be_bytes()); rbuf[nodes_off + 2] = reply_msg.nodes.len() as u8; rbuf[nodes_off + 3] = 0; let nw = if domain == DOMAIN_INET { msg::write_nodes_inet( &mut rbuf[nodes_off + 4..], &reply_msg.nodes, ) } else { msg::write_nodes_inet6( &mut rbuf[nodes_off + 4..], &reply_msg.nodes, ) }; let total = nodes_off + 4 + nw; rbuf[4..6].copy_from_slice(&Self::len16(total).to_be_bytes()); let _ = self.send_signed(&rbuf[..total], from); } } } pub(crate) fn handle_dht_find_value_reply( &mut self, buf: &[u8], hdr: &MsgHeader, ) { let reply = match msg::parse_find_value_reply(buf) { Ok(r) => r, Err(e) => { log::debug!("Failed to parse find_value_reply: {e}"); return; } }; let sender_id = hdr.src; // Find the matching active query let matching_nonce: Option = self .queries .iter() .filter(|(_, q)| q.is_find_value) .find(|(_, q)| q.pending.contains_key(&sender_id)) .map(|(n, _)| *n); match reply.data { msg::FindValueReplyData::Value { data, .. } => { log::info!( "Received value from {:?}: {} bytes", sender_id, data.len() ); // Cache the value locally and republish // to the closest queried node without it // (§2.3: cache on nearest without value) if let Some(nonce) = matching_nonce { if let Some(q) = self.queries.get_mut(&nonce) { // Store in local storage for // subsequent get() calls let val = crate::dht::StoredValue { key: q.key.clone(), value: data.clone(), id: q.target, source: sender_id, ttl: 300, stored_at: Instant::now(), is_unique: false, original: 0, recvd: std::collections::HashSet::new(), version: crate::dht::now_version(), }; self.storage.store(val); // §2.3: store on nearest queried // node that didn't have the value let target = q.target; let key = q.key.clone(); let nearest_without: Option = q .closest .iter() .find(|p| { q.queried.contains(&p.id) && p.id != sender_id }) .cloned(); q.process_value(data.clone()); if let Some(peer) = nearest_without { let store_msg = crate::msg::StoreMsg { id: target, from: self.id, key, value: data, ttl: 300, is_unique: false, }; if let Err(e) = self.send_store(&peer, &store_msg) { log::debug!( "Republish-on-access failed: {e}" ); } else { log::debug!( "Republished value to {:?} (nearest without)", peer.id, ); } } } } } msg::FindValueReplyData::Nodes { nodes, .. } => { log::debug!( "find_value_reply from {:?}: {} nodes (no value)", sender_id, nodes.len() ); // Add nodes to routing table for node in &nodes { self.dht_table.add(node.clone()); self.peers.add(node.clone()); } // Feed the query with the nodes so it // continues iterating if let Some(nonce) = matching_nonce { if let Some(q) = self.queries.get_mut(&nonce) { q.process_reply(&sender_id, nodes.clone()); // Send follow-up find_value to // newly discovered nodes let next = q.next_to_query(); let target = q.target; let key = q.key.clone(); for peer in next { if self .send_find_value_msg(peer.addr, target, &key) .is_ok() { if let Some(q) = self.queries.get_mut(&nonce) { q.pending.insert(peer.id, Instant::now()); } } } } } } msg::FindValueReplyData::Nul => { log::debug!("find_value_reply NUL from {:?}", sender_id); if let Some(nonce) = matching_nonce { if let Some(q) = self.queries.get_mut(&nonce) { q.pending.remove(&sender_id); q.queried.insert(sender_id); } } } } } // ── Data restore / republish ──────────────────── /// Restore (republish) stored data to k-closest /// nodes. Tracks the `original` counter and `recvd` /// set to avoid unnecessary duplicates. pub(crate) fn restore_data(&mut self) { let values = self.storage.all_values(); if values.is_empty() { return; } log::debug!("Restoring {} stored values", values.len()); for val in &values { if val.is_expired() { continue; } let closest = self.dht_table.closest(&val.id, self.config.num_find_node); let store_msg = msg::StoreMsg { id: val.id, from: val.source, key: val.key.clone(), value: val.value.clone(), ttl: val.remaining_ttl(), is_unique: val.is_unique, }; for peer in &closest { if peer.id == self.id { continue; } // Skip peers that already have this value if val.recvd.contains(&peer.id) { continue; } if let Err(e) = self.send_store(peer, &store_msg) { log::debug!("Restore send failed: {e}"); continue; } self.storage.mark_received(val, peer.id); } } } /// Run mask_bit exploration: send find_node queries /// to probe distant regions of the ID space. pub(crate) fn run_maintain(&mut self) { let (t1, t2) = self.explorer.next_targets(); log::debug!("Maintain: exploring targets {:?} {:?}", t1, t2); let _ = self.start_find_node(t1); let _ = self.start_find_node(t2); } // ── Proactive replication (§2.5) ────────────── /// Replicate stored data to a newly discovered node /// if it is closer to a key than the current furthest /// holder. This ensures data availability without /// waiting for the periodic restore cycle. /// /// Called when a new node is inserted into the routing /// table (not on updates of existing nodes). pub(crate) fn proactive_replicate(&mut self, new_node: &PeerInfo) { let values = self.storage.all_values(); if values.is_empty() { return; } let k = self.config.num_find_node; let mut sent = 0u32; for val in &values { if val.is_expired() { continue; } // Skip if this node already received the value if val.recvd.contains(&new_node.id) { continue; } // Check: is new_node closer to this key than // the furthest current k-closest holder? let closest = self.dht_table.closest(&val.id, k); let furthest = match closest.last() { Some(p) => p, None => continue, }; let dist_new = val.id.distance(&new_node.id); let dist_far = val.id.distance(&furthest.id); if dist_new >= dist_far { continue; // new node is not closer } let store_msg = crate::msg::StoreMsg { id: val.id, from: val.source, key: val.key.clone(), value: val.value.clone(), ttl: val.remaining_ttl(), is_unique: val.is_unique, }; if self.send_store(new_node, &store_msg).is_ok() { self.storage.mark_received(val, new_node.id); sent += 1; } } if sent > 0 { log::debug!( "Proactive replicate: sent {sent} values to {:?}", new_node.id, ); } } }