//! Integration tests: multi-node scenarios over real //! UDP sockets on loopback. use std::time::Duration; use tesseras_dht::Node; use tesseras_dht::nat::NatState; /// Poll all nodes once each (non-blocking best-effort). fn poll_all(nodes: &mut [Node], rounds: usize) { let fast = Duration::from_millis(1); for _ in 0..rounds { for n in nodes.iter_mut() { n.poll_timeout(fast).ok(); } } } /// Create N nodes, join them to the first. fn make_network(n: usize) -> Vec { let mut nodes = Vec::with_capacity(n); let bootstrap = Node::bind(0).unwrap(); let bp = bootstrap.local_addr().unwrap().port(); nodes.push(bootstrap); nodes[0].set_nat_state(NatState::Global); for _ in 1..n { let mut node = Node::bind(0).unwrap(); node.set_nat_state(NatState::Global); node.join("127.0.0.1", bp).unwrap(); nodes.push(node); } // Small sleep to let packets arrive, then poll std::thread::sleep(Duration::from_millis(50)); poll_all(&mut nodes, 5); nodes } // ── Bootstrap tests ───────────────────────────────── #[test] fn two_nodes_discover_each_other() { let nodes = make_network(2); assert!( nodes[0].routing_table_size() >= 1, "Node 0 should have at least 1 peer in routing table" ); assert!( nodes[1].routing_table_size() >= 1, "Node 1 should have at least 1 peer in routing table" ); } #[test] fn three_nodes_form_network() { let nodes = make_network(3); // All nodes should know at least 1 other node for (i, node) in nodes.iter().enumerate() { assert!( node.routing_table_size() >= 1, "Node {i} routing table empty" ); } } #[test] fn five_nodes_routing_tables() { let nodes = make_network(5); // With 5 nodes, most should know 2+ peers let total_peers: usize = nodes.iter().map(|n| n.routing_table_size()).sum(); assert!( total_peers >= 5, "Total routing entries ({total_peers}) too low" ); } // ── Put/Get tests ─────────────────────────────────── #[test] fn put_get_local() { let mut node = Node::bind(0).unwrap(); node.put(b"key1", b"value1", 300, false); let vals = node.get(b"key1"); assert_eq!(vals.len(), 1); assert_eq!(vals[0], b"value1"); } #[test] fn put_get_across_two_nodes() { let mut nodes = make_network(2); // Node 0 stores nodes[0].put(b"hello", b"world", 300, false); // Poll to deliver STORE std::thread::sleep(Duration::from_millis(50)); poll_all(&mut nodes, 5); // Node 1 should have the value (received via STORE) let vals = nodes[1].get(b"hello"); assert_eq!(vals.len(), 1, "Node 1 should have received the value"); assert_eq!(vals[0], b"world"); } #[test] fn put_multiple_values() { let mut nodes = make_network(3); // Store 10 key-value pairs from node 0 for i in 0..10u32 { let key = format!("k{i}"); let val = format!("v{i}"); nodes[0].put(key.as_bytes(), val.as_bytes(), 300, false); } std::thread::sleep(Duration::from_millis(50)); poll_all(&mut nodes, 5); // Node 0 should have all 10 let mut found = 0; for i in 0..10u32 { let key = format!("k{i}"); if !nodes[0].get(key.as_bytes()).is_empty() { found += 1; } } assert_eq!(found, 10, "Node 0 should have all 10 values"); } #[test] fn put_unique_replaces() { let mut node = Node::bind(0).unwrap(); node.put(b"uk", b"first", 300, true); node.put(b"uk", b"second", 300, true); let vals = node.get(b"uk"); assert_eq!(vals.len(), 1); assert_eq!(vals[0], b"second"); } #[test] fn put_get_distributed() { let mut nodes = make_network(5); // Each node stores one value for i in 0..5u32 { let key = format!("node{i}-key"); let val = format!("node{i}-val"); nodes[i as usize].put(key.as_bytes(), val.as_bytes(), 300, false); } std::thread::sleep(Duration::from_millis(50)); poll_all(&mut nodes, 5); // Each node should have its own value at minimum for i in 0..5u32 { let key = format!("node{i}-key"); let vals = nodes[i as usize].get(key.as_bytes()); assert!(!vals.is_empty(), "Node {i} should have its own value"); } } // ── Identity tests ────────────────────────────────── #[test] fn set_id_deterministic() { let mut n1 = Node::bind(0).unwrap(); let mut n2 = Node::bind(0).unwrap(); n1.set_id(b"same-seed"); n2.set_id(b"same-seed"); assert_eq!(n1.id(), n2.id()); } #[test] fn node_id_is_unique() { let n1 = Node::bind(0).unwrap(); let n2 = Node::bind(0).unwrap(); assert_ne!(n1.id(), n2.id()); } // ── NAT state tests ──────────────────────────────── #[test] fn nat_state_transitions() { let mut node = Node::bind(0).unwrap(); assert_eq!(node.nat_state(), NatState::Unknown); node.set_nat_state(NatState::Global); assert_eq!(node.nat_state(), NatState::Global); node.set_nat_state(NatState::ConeNat); assert_eq!(node.nat_state(), NatState::ConeNat); node.set_nat_state(NatState::SymmetricNat); assert_eq!(node.nat_state(), NatState::SymmetricNat); } // ── RDP tests ─────────────────────────────────────── #[test] fn rdp_listen_connect_close() { let mut node = Node::bind(0).unwrap(); let desc = node.rdp_listen(5000).unwrap(); node.rdp_close(desc); // Should be able to re-listen let desc2 = node.rdp_listen(5000).unwrap(); node.rdp_close(desc2); } #[test] fn rdp_connect_state() { use tesseras_dht::rdp::RdpState; let mut node = Node::bind(0).unwrap(); let dst = tesseras_dht::NodeId::from_bytes([0x01; 32]); let desc = node.rdp_connect(0, &dst, 5000).unwrap(); assert_eq!(node.rdp_state(desc).unwrap(), RdpState::SynSent); node.rdp_close(desc); } // ── Resilience tests ──────────────────────────────── #[test] fn poll_with_no_peers() { let mut node = Node::bind(0).unwrap(); // Should not panic or block node.poll().unwrap(); } #[test] fn join_invalid_address() { let mut node = Node::bind(0).unwrap(); let result = node.join("this-does-not-exist.invalid", 9999); assert!(result.is_err()); } #[test] fn empty_get() { let mut node = Node::bind(0).unwrap(); assert!(node.get(b"nonexistent").is_empty()); } #[test] fn put_zero_ttl_removes() { let mut node = Node::bind(0).unwrap(); node.put(b"temp", b"data", 300, false); assert!(!node.get(b"temp").is_empty()); // Store with TTL 0 is a delete in the protocol // (handled at the wire level in handle_dht_store, // but locally we'd need to call storage.remove). // This test validates the local storage. } // ── Scale test ────────────────────────────────────── #[test] fn ten_nodes_put_get() { let mut nodes = make_network(10); // Node 0 stores nodes[0].put(b"scale-key", b"scale-val", 300, false); std::thread::sleep(Duration::from_millis(50)); poll_all(&mut nodes, 5); // Count how many nodes received the value let mut count = 0; for node in &mut nodes { if !node.get(b"scale-key").is_empty() { count += 1; } } assert!( count >= 2, "At least 2 nodes should have the value, got {count}" ); } // ── Remote get via FIND_VALUE ─────────────────────── #[test] fn remote_get_via_find_value() { let mut nodes = make_network(3); // Node 0 stores locally nodes[0].put(b"remote-key", b"remote-val", 300, false); std::thread::sleep(Duration::from_millis(50)); poll_all(&mut nodes, 5); // Node 2 does remote get let before = nodes[2].get(b"remote-key"); // Might already have it from STORE, or empty if before.is_empty() { // Poll to let FIND_VALUE propagate std::thread::sleep(Duration::from_millis(100)); poll_all(&mut nodes, 10); let after = nodes[2].get(b"remote-key"); assert!( !after.is_empty(), "Node 2 should find the value via FIND_VALUE" ); assert_eq!(after[0], b"remote-val"); } } // ── NAT detection tests ──────────────────────────── #[test] fn nat_state_default_unknown() { let node = Node::bind(0).unwrap(); assert_eq!(node.nat_state(), NatState::Unknown); } #[test] fn nat_state_set_persists() { let mut node = Node::bind(0).unwrap(); node.set_nat_state(NatState::SymmetricNat); assert_eq!(node.nat_state(), NatState::SymmetricNat); node.poll().unwrap(); assert_eq!(node.nat_state(), NatState::SymmetricNat); } // ── DTUN tests ────────────────────────────────────── #[test] fn dtun_find_node_exchange() { // Two nodes: node2 sends DtunFindNode to node1 // by joining. The DTUN table should be populated. let nodes = make_network(2); // Both nodes should have peers after join assert!(nodes[0].routing_table_size() >= 1); assert!(nodes[1].routing_table_size() >= 1); } // ── Proxy tests ───────────────────────────────────── #[test] fn proxy_dgram_forwarded() { use std::sync::{Arc, Mutex}; let mut nodes = make_network(2); let received: Arc>>> = Arc::new(Mutex::new(Vec::new())); let recv_clone = received.clone(); nodes[1].set_dgram_callback(move |data, _from| { recv_clone.lock().unwrap().push(data.to_vec()); }); // Node 0 sends dgram to Node 1 let id1 = *nodes[1].id(); nodes[0].send_dgram(b"proxy-test", &id1); std::thread::sleep(Duration::from_millis(50)); poll_all(&mut nodes, 5); let msgs = received.lock().unwrap(); assert!(!msgs.is_empty(), "Node 1 should receive the dgram"); assert_eq!(msgs[0], b"proxy-test"); } // ── Advertise tests ───────────────────────────────── #[test] fn nodes_peer_count_after_join() { let nodes = make_network(3); // All nodes should have at least 1 peer for (i, node) in nodes.iter().enumerate() { assert!( node.peer_count() >= 1, "Node {i} should have at least 1 peer" ); } } // ── Storage tests ─────────────────────────────────── #[test] fn storage_count_after_put() { let mut node = Node::bind(0).unwrap(); assert_eq!(node.storage_count(), 0); node.put(b"k1", b"v1", 300, false); node.put(b"k2", b"v2", 300, false); assert_eq!(node.storage_count(), 2); } #[test] fn put_from_multiple_nodes() { let mut nodes = make_network(3); nodes[0].put(b"from-0", b"val-0", 300, false); nodes[1].put(b"from-1", b"val-1", 300, false); nodes[2].put(b"from-2", b"val-2", 300, false); std::thread::sleep(Duration::from_millis(50)); poll_all(&mut nodes, 5); // Each node should have its own value assert!(!nodes[0].get(b"from-0").is_empty()); assert!(!nodes[1].get(b"from-1").is_empty()); assert!(!nodes[2].get(b"from-2").is_empty()); } // ── Config tests ──────────────────────────────────── #[test] fn config_default_works() { let config = tesseras_dht::config::Config::default(); assert_eq!(config.num_find_node, 10); assert_eq!(config.bucket_size, 20); assert_eq!(config.default_ttl, 300); } #[test] fn config_pastebin_preset() { let config = tesseras_dht::config::Config::pastebin(); assert_eq!(config.default_ttl, 65535); assert!(config.require_signatures); } // ── Metrics tests ─────────────────────────────────── #[test] fn metrics_after_put() { let mut nodes = make_network(2); let before = nodes[0].metrics(); nodes[0].put(b"m-key", b"m-val", 300, false); std::thread::sleep(Duration::from_millis(50)); poll_all(&mut nodes, 5); let after = nodes[0].metrics(); assert!( after.messages_sent > before.messages_sent, "messages_sent should increase after put" ); } #[test] fn metrics_bytes_tracked() { let mut node = Node::bind(0).unwrap(); let m = node.metrics(); assert_eq!(m.bytes_sent, 0); assert_eq!(m.bytes_received, 0); } // ── Builder tests ─────────────────────────────────── #[test] fn builder_basic() { use tesseras_dht::node::NodeBuilder; let node = NodeBuilder::new() .port(0) .nat(NatState::Global) .build() .unwrap(); assert_eq!(node.nat_state(), NatState::Global); } #[test] fn builder_with_seed() { use tesseras_dht::node::NodeBuilder; let n1 = NodeBuilder::new() .port(0) .seed(b"same-seed") .build() .unwrap(); let n2 = NodeBuilder::new() .port(0) .seed(b"same-seed") .build() .unwrap(); assert_eq!(n1.id(), n2.id()); } #[test] fn builder_with_config() { use tesseras_dht::node::NodeBuilder; let config = tesseras_dht::config::Config::pastebin(); let node = NodeBuilder::new().port(0).config(config).build().unwrap(); assert!(node.config().require_signatures); } // ── Persistence mock test ─────────────────────────── #[test] fn persistence_nop_save_load() { let mut node = Node::bind(0).unwrap(); node.put(b"persist-key", b"persist-val", 300, false); // With NoPersistence, save does nothing node.save_state(); // load_persisted with NoPersistence loads nothing node.load_persisted(); // Value still there from local storage assert!(!node.get(b"persist-key").is_empty()); } // ── Ban list tests ──────────────────────────────────── #[test] fn ban_list_initially_empty() { let node = Node::bind(0).unwrap(); assert_eq!(node.ban_count(), 0); } #[test] fn ban_list_unit() { use tesseras_dht::banlist::BanList; let mut bl = BanList::new(); let addr: std::net::SocketAddr = "127.0.0.1:9999".parse().unwrap(); assert!(!bl.is_banned(&addr)); bl.record_failure(addr); bl.record_failure(addr); assert!(!bl.is_banned(&addr)); // 2 < threshold 3 bl.record_failure(addr); assert!(bl.is_banned(&addr)); // 3 >= threshold } #[test] fn ban_list_success_resets() { use tesseras_dht::banlist::BanList; let mut bl = BanList::new(); let addr: std::net::SocketAddr = "127.0.0.1:9999".parse().unwrap(); bl.record_failure(addr); bl.record_failure(addr); bl.record_success(&addr); bl.record_failure(addr); // starts over from 1 assert!(!bl.is_banned(&addr)); } // ── Store tracker tests ─────────────────────────────── #[test] fn store_tracker_initially_empty() { let node = Node::bind(0).unwrap(); assert_eq!(node.pending_stores(), 0); assert_eq!(node.store_stats(), (0, 0)); } #[test] fn store_tracker_counts_after_put() { let mut nodes = make_network(3); nodes[0].put(b"tracked-key", b"tracked-val", 300, false); std::thread::sleep(Duration::from_millis(50)); poll_all(&mut nodes, 5); // Node 0 should have pending stores (sent to peers) // or acks if peers responded quickly let (acks, _) = nodes[0].store_stats(); let pending = nodes[0].pending_stores(); assert!( acks > 0 || pending > 0, "Should have tracked some stores (acks={acks}, pending={pending})" ); } // ── Node activity monitor tests ────────────────────── #[test] fn activity_check_does_not_crash() { let mut node = Node::bind(0).unwrap(); node.set_nat_state(NatState::Global); // Calling poll runs the activity check — should // not crash even with no peers node.poll().unwrap(); } // ── Batch operations tests ─────────────────────────── #[test] fn put_batch_stores_locally() { let mut node = Node::bind(0).unwrap(); let entries: Vec<(&[u8], &[u8], u16, bool)> = vec![ (b"b1", b"v1", 300, false), (b"b2", b"v2", 300, false), (b"b3", b"v3", 300, false), ]; node.put_batch(&entries); assert_eq!(node.storage_count(), 3); assert_eq!(node.get(b"b1"), vec![b"v1".to_vec()]); assert_eq!(node.get(b"b2"), vec![b"v2".to_vec()]); assert_eq!(node.get(b"b3"), vec![b"v3".to_vec()]); } #[test] fn get_batch_returns_local() { let mut node = Node::bind(0).unwrap(); node.put(b"gb1", b"v1", 300, false); node.put(b"gb2", b"v2", 300, false); let results = node.get_batch(&[b"gb1", b"gb2", b"gb-missing"]); assert_eq!(results.len(), 3); assert_eq!(results[0].1, vec![b"v1".to_vec()]); assert_eq!(results[1].1, vec![b"v2".to_vec()]); assert!(results[2].1.is_empty()); // not found } #[test] fn put_batch_distributes_to_peers() { let mut nodes = make_network(3); let entries: Vec<(&[u8], &[u8], u16, bool)> = vec![ (b"dist-1", b"val-1", 300, false), (b"dist-2", b"val-2", 300, false), (b"dist-3", b"val-3", 300, false), (b"dist-4", b"val-4", 300, false), (b"dist-5", b"val-5", 300, false), ]; nodes[0].put_batch(&entries); std::thread::sleep(Duration::from_millis(100)); poll_all(&mut nodes, 10); // All 5 values should be stored locally on node 0 for i in 1..=5 { let key = format!("dist-{i}"); let vals = nodes[0].get(key.as_bytes()); assert!(!vals.is_empty(), "Node 0 should have {key}"); } // At least some should be distributed to other nodes let total: usize = nodes.iter().map(|n| n.storage_count()).sum(); assert!(total > 5, "Total stored {total} should be > 5 (replicated)"); } // ── Proactive replication tests ────────────────── #[test] fn proactive_replicate_on_new_node() { // Node 0 stores a value, then node 2 joins. // After routing table sync, node 2 should receive // the value proactively (§2.5). let mut nodes = make_network(2); nodes[0].put(b"proactive-key", b"proactive-val", 300, false); std::thread::sleep(Duration::from_millis(100)); poll_all(&mut nodes, 10); // Add a third node let bp = nodes[0].local_addr().unwrap().port(); let mut node2 = Node::bind(0).unwrap(); node2.set_nat_state(NatState::Global); node2.join("127.0.0.1", bp).unwrap(); nodes.push(node2); // Poll to let proactive replication trigger std::thread::sleep(Duration::from_millis(100)); poll_all(&mut nodes, 20); // At least the original 2 nodes should have the value; // the new node may also have it via proactive replication let total: usize = nodes.iter().map(|n| n.storage_count()).sum(); assert!( total >= 2, "Total stored {total} should be >= 2 after proactive replication" ); } // ── Republish on access tests ──────────────────── #[test] fn republish_on_find_value() { // Store on node 0, retrieve from node 2 via FIND_VALUE. // After the value is found, it should be cached on // the nearest queried node without it (§2.3). let mut nodes = make_network(3); nodes[0].put(b"republish-key", b"republish-val", 300, false); std::thread::sleep(Duration::from_millis(100)); poll_all(&mut nodes, 10); // Node 2 triggers FIND_VALUE let _ = nodes[2].get(b"republish-key"); // Poll to let the lookup and republish propagate for _ in 0..30 { poll_all(&mut nodes, 5); std::thread::sleep(Duration::from_millis(20)); let vals = nodes[2].get(b"republish-key"); if !vals.is_empty() { break; } } // Count total stored across all nodes — should be // more than 1 due to republish-on-access caching let total: usize = nodes.iter().map(|n| n.storage_count()).sum(); assert!( total >= 2, "Total stored {total} should be >= 2 after republish-on-access" ); }