diff options
| author | murilo ijanc | 2026-03-24 15:04:03 -0300 |
|---|---|---|
| committer | murilo ijanc | 2026-03-24 15:04:03 -0300 |
| commit | 9821aabf0b50d2487b07502d3d2cd89e7d62bdbe (patch) | |
| tree | 53da095ff90cc755bac3d4bf699172b5e8cd07d6 /tests/integration.rs | |
| download | tesseras-dht-e908bc01403f4b8ef2a65fa6be43716fd1c6e003.tar.gz | |
Initial commitv0.1.0
NAT-aware Kademlia DHT library for peer-to-peer networks.
Features:
- Distributed key-value storage (iterative FIND_NODE, FIND_VALUE, STORE)
- NAT traversal via DTUN hole-punching and proxy relay
- Reliable Datagram Protocol (RDP) with 7-state connection machine
- Datagram transport with automatic fragmentation/reassembly
- Ed25519 packet authentication
- 256-bit node IDs (Ed25519 public keys)
- Rate limiting, ban list, and eclipse attack mitigation
- Persistence and metrics
- OpenBSD and Linux support
Diffstat (limited to 'tests/integration.rs')
| -rw-r--r-- | tests/integration.rs | 704 |
1 files changed, 704 insertions, 0 deletions
diff --git a/tests/integration.rs b/tests/integration.rs new file mode 100644 index 0000000..8bfe66d --- /dev/null +++ b/tests/integration.rs @@ -0,0 +1,704 @@ +//! Integration tests: multi-node scenarios over real +//! UDP sockets on loopback. + +use std::time::Duration; + +use tesseras_dht::Node; +use tesseras_dht::nat::NatState; + +/// Poll all nodes once each (non-blocking best-effort). +fn poll_all(nodes: &mut [Node], rounds: usize) { + let fast = Duration::from_millis(1); + for _ in 0..rounds { + for n in nodes.iter_mut() { + n.poll_timeout(fast).ok(); + } + } +} + +/// Create N nodes, join them to the first. +fn make_network(n: usize) -> Vec<Node> { + let mut nodes = Vec::with_capacity(n); + let bootstrap = Node::bind(0).unwrap(); + let bp = bootstrap.local_addr().unwrap().port(); + nodes.push(bootstrap); + nodes[0].set_nat_state(NatState::Global); + + for _ in 1..n { + let mut node = Node::bind(0).unwrap(); + node.set_nat_state(NatState::Global); + node.join("127.0.0.1", bp).unwrap(); + nodes.push(node); + } + + // Small sleep to let packets arrive, then poll + std::thread::sleep(Duration::from_millis(50)); + poll_all(&mut nodes, 5); + nodes +} + +// ── Bootstrap tests ───────────────────────────────── + +#[test] +fn two_nodes_discover_each_other() { + let nodes = make_network(2); + + assert!( + nodes[0].routing_table_size() >= 1, + "Node 0 should have at least 1 peer in routing table" + ); + assert!( + nodes[1].routing_table_size() >= 1, + "Node 1 should have at least 1 peer in routing table" + ); +} + +#[test] +fn three_nodes_form_network() { + let nodes = make_network(3); + + // All nodes should know at least 1 other node + for (i, node) in nodes.iter().enumerate() { + assert!( + node.routing_table_size() >= 1, + "Node {i} routing table empty" + ); + } +} + +#[test] +fn five_nodes_routing_tables() { + let nodes = make_network(5); + + // With 5 nodes, most should know 2+ peers + let total_peers: usize = nodes.iter().map(|n| n.routing_table_size()).sum(); + assert!( + total_peers >= 5, + "Total routing entries ({total_peers}) too low" + ); +} + +// ── Put/Get tests ─────────────────────────────────── + +#[test] +fn put_get_local() { + let mut node = Node::bind(0).unwrap(); + node.put(b"key1", b"value1", 300, false); + let vals = node.get(b"key1"); + assert_eq!(vals.len(), 1); + assert_eq!(vals[0], b"value1"); +} + +#[test] +fn put_get_across_two_nodes() { + let mut nodes = make_network(2); + + // Node 0 stores + nodes[0].put(b"hello", b"world", 300, false); + + // Poll to deliver STORE + std::thread::sleep(Duration::from_millis(50)); + poll_all(&mut nodes, 5); + + // Node 1 should have the value (received via STORE) + let vals = nodes[1].get(b"hello"); + assert_eq!(vals.len(), 1, "Node 1 should have received the value"); + assert_eq!(vals[0], b"world"); +} + +#[test] +fn put_multiple_values() { + let mut nodes = make_network(3); + + // Store 10 key-value pairs from node 0 + for i in 0..10u32 { + let key = format!("k{i}"); + let val = format!("v{i}"); + nodes[0].put(key.as_bytes(), val.as_bytes(), 300, false); + } + + std::thread::sleep(Duration::from_millis(50)); + poll_all(&mut nodes, 5); + + // Node 0 should have all 10 + let mut found = 0; + for i in 0..10u32 { + let key = format!("k{i}"); + if !nodes[0].get(key.as_bytes()).is_empty() { + found += 1; + } + } + assert_eq!(found, 10, "Node 0 should have all 10 values"); +} + +#[test] +fn put_unique_replaces() { + let mut node = Node::bind(0).unwrap(); + node.put(b"uk", b"first", 300, true); + node.put(b"uk", b"second", 300, true); + + let vals = node.get(b"uk"); + assert_eq!(vals.len(), 1); + assert_eq!(vals[0], b"second"); +} + +#[test] +fn put_get_distributed() { + let mut nodes = make_network(5); + + // Each node stores one value + for i in 0..5u32 { + let key = format!("node{i}-key"); + let val = format!("node{i}-val"); + nodes[i as usize].put(key.as_bytes(), val.as_bytes(), 300, false); + } + + std::thread::sleep(Duration::from_millis(50)); + poll_all(&mut nodes, 5); + + // Each node should have its own value at minimum + for i in 0..5u32 { + let key = format!("node{i}-key"); + let vals = nodes[i as usize].get(key.as_bytes()); + assert!(!vals.is_empty(), "Node {i} should have its own value"); + } +} + +// ── Identity tests ────────────────────────────────── + +#[test] +fn set_id_deterministic() { + let mut n1 = Node::bind(0).unwrap(); + let mut n2 = Node::bind(0).unwrap(); + n1.set_id(b"same-seed"); + n2.set_id(b"same-seed"); + assert_eq!(n1.id(), n2.id()); +} + +#[test] +fn node_id_is_unique() { + let n1 = Node::bind(0).unwrap(); + let n2 = Node::bind(0).unwrap(); + assert_ne!(n1.id(), n2.id()); +} + +// ── NAT state tests ──────────────────────────────── + +#[test] +fn nat_state_transitions() { + let mut node = Node::bind(0).unwrap(); + assert_eq!(node.nat_state(), NatState::Unknown); + + node.set_nat_state(NatState::Global); + assert_eq!(node.nat_state(), NatState::Global); + + node.set_nat_state(NatState::ConeNat); + assert_eq!(node.nat_state(), NatState::ConeNat); + + node.set_nat_state(NatState::SymmetricNat); + assert_eq!(node.nat_state(), NatState::SymmetricNat); +} + +// ── RDP tests ─────────────────────────────────────── + +#[test] +fn rdp_listen_connect_close() { + let mut node = Node::bind(0).unwrap(); + let desc = node.rdp_listen(5000).unwrap(); + node.rdp_close(desc); + // Should be able to re-listen + let desc2 = node.rdp_listen(5000).unwrap(); + node.rdp_close(desc2); +} + +#[test] +fn rdp_connect_state() { + use tesseras_dht::rdp::RdpState; + let mut node = Node::bind(0).unwrap(); + let dst = tesseras_dht::NodeId::from_bytes([0x01; 32]); + let desc = node.rdp_connect(0, &dst, 5000).unwrap(); + assert_eq!(node.rdp_state(desc).unwrap(), RdpState::SynSent); + node.rdp_close(desc); +} + +// ── Resilience tests ──────────────────────────────── + +#[test] +fn poll_with_no_peers() { + let mut node = Node::bind(0).unwrap(); + // Should not panic or block + node.poll().unwrap(); +} + +#[test] +fn join_invalid_address() { + let mut node = Node::bind(0).unwrap(); + let result = node.join("this-does-not-exist.invalid", 9999); + assert!(result.is_err()); +} + +#[test] +fn empty_get() { + let mut node = Node::bind(0).unwrap(); + assert!(node.get(b"nonexistent").is_empty()); +} + +#[test] +fn put_zero_ttl_removes() { + let mut node = Node::bind(0).unwrap(); + node.put(b"temp", b"data", 300, false); + assert!(!node.get(b"temp").is_empty()); + + // Store with TTL 0 is a delete in the protocol + // (handled at the wire level in handle_dht_store, + // but locally we'd need to call storage.remove). + // This test validates the local storage. +} + +// ── Scale test ────────────────────────────────────── + +#[test] +fn ten_nodes_put_get() { + let mut nodes = make_network(10); + + // Node 0 stores + nodes[0].put(b"scale-key", b"scale-val", 300, false); + std::thread::sleep(Duration::from_millis(50)); + poll_all(&mut nodes, 5); + + // Count how many nodes received the value + let mut count = 0; + for node in &mut nodes { + if !node.get(b"scale-key").is_empty() { + count += 1; + } + } + assert!( + count >= 2, + "At least 2 nodes should have the value, got {count}" + ); +} + +// ── Remote get via FIND_VALUE ─────────────────────── + +#[test] +fn remote_get_via_find_value() { + let mut nodes = make_network(3); + + // Node 0 stores locally + nodes[0].put(b"remote-key", b"remote-val", 300, false); + std::thread::sleep(Duration::from_millis(50)); + poll_all(&mut nodes, 5); + + // Node 2 does remote get + let before = nodes[2].get(b"remote-key"); + // Might already have it from STORE, or empty + if before.is_empty() { + // Poll to let FIND_VALUE propagate + std::thread::sleep(Duration::from_millis(100)); + poll_all(&mut nodes, 10); + + let after = nodes[2].get(b"remote-key"); + assert!( + !after.is_empty(), + "Node 2 should find the value via FIND_VALUE" + ); + assert_eq!(after[0], b"remote-val"); + } +} + +// ── NAT detection tests ──────────────────────────── + +#[test] +fn nat_state_default_unknown() { + let node = Node::bind(0).unwrap(); + assert_eq!(node.nat_state(), NatState::Unknown); +} + +#[test] +fn nat_state_set_persists() { + let mut node = Node::bind(0).unwrap(); + node.set_nat_state(NatState::SymmetricNat); + assert_eq!(node.nat_state(), NatState::SymmetricNat); + node.poll().unwrap(); + assert_eq!(node.nat_state(), NatState::SymmetricNat); +} + +// ── DTUN tests ────────────────────────────────────── + +#[test] +fn dtun_find_node_exchange() { + // Two nodes: node2 sends DtunFindNode to node1 + // by joining. The DTUN table should be populated. + let nodes = make_network(2); + + // Both nodes should have peers after join + assert!(nodes[0].routing_table_size() >= 1); + assert!(nodes[1].routing_table_size() >= 1); +} + +// ── Proxy tests ───────────────────────────────────── + +#[test] +fn proxy_dgram_forwarded() { + use std::sync::{Arc, Mutex}; + + let mut nodes = make_network(2); + + let received: Arc<Mutex<Vec<Vec<u8>>>> = Arc::new(Mutex::new(Vec::new())); + + let recv_clone = received.clone(); + nodes[1].set_dgram_callback(move |data, _from| { + recv_clone.lock().unwrap().push(data.to_vec()); + }); + + // Node 0 sends dgram to Node 1 + let id1 = *nodes[1].id(); + nodes[0].send_dgram(b"proxy-test", &id1); + + std::thread::sleep(Duration::from_millis(50)); + poll_all(&mut nodes, 5); + + let msgs = received.lock().unwrap(); + assert!(!msgs.is_empty(), "Node 1 should receive the dgram"); + assert_eq!(msgs[0], b"proxy-test"); +} + +// ── Advertise tests ───────────────────────────────── + +#[test] +fn nodes_peer_count_after_join() { + let nodes = make_network(3); + + // All nodes should have at least 1 peer + for (i, node) in nodes.iter().enumerate() { + assert!( + node.peer_count() >= 1, + "Node {i} should have at least 1 peer" + ); + } +} + +// ── Storage tests ─────────────────────────────────── + +#[test] +fn storage_count_after_put() { + let mut node = Node::bind(0).unwrap(); + assert_eq!(node.storage_count(), 0); + + node.put(b"k1", b"v1", 300, false); + node.put(b"k2", b"v2", 300, false); + assert_eq!(node.storage_count(), 2); +} + +#[test] +fn put_from_multiple_nodes() { + let mut nodes = make_network(3); + + nodes[0].put(b"from-0", b"val-0", 300, false); + nodes[1].put(b"from-1", b"val-1", 300, false); + nodes[2].put(b"from-2", b"val-2", 300, false); + + std::thread::sleep(Duration::from_millis(50)); + poll_all(&mut nodes, 5); + + // Each node should have its own value + assert!(!nodes[0].get(b"from-0").is_empty()); + assert!(!nodes[1].get(b"from-1").is_empty()); + assert!(!nodes[2].get(b"from-2").is_empty()); +} + +// ── Config tests ──────────────────────────────────── + +#[test] +fn config_default_works() { + let config = tesseras_dht::config::Config::default(); + assert_eq!(config.num_find_node, 10); + assert_eq!(config.bucket_size, 20); + assert_eq!(config.default_ttl, 300); +} + +#[test] +fn config_pastebin_preset() { + let config = tesseras_dht::config::Config::pastebin(); + assert_eq!(config.default_ttl, 65535); + assert!(config.require_signatures); +} + +// ── Metrics tests ─────────────────────────────────── + +#[test] +fn metrics_after_put() { + let mut nodes = make_network(2); + let before = nodes[0].metrics(); + nodes[0].put(b"m-key", b"m-val", 300, false); + std::thread::sleep(Duration::from_millis(50)); + poll_all(&mut nodes, 5); + let after = nodes[0].metrics(); + assert!( + after.messages_sent > before.messages_sent, + "messages_sent should increase after put" + ); +} + +#[test] +fn metrics_bytes_tracked() { + let mut node = Node::bind(0).unwrap(); + let m = node.metrics(); + assert_eq!(m.bytes_sent, 0); + assert_eq!(m.bytes_received, 0); +} + +// ── Builder tests ─────────────────────────────────── + +#[test] +fn builder_basic() { + use tesseras_dht::node::NodeBuilder; + let node = NodeBuilder::new() + .port(0) + .nat(NatState::Global) + .build() + .unwrap(); + assert_eq!(node.nat_state(), NatState::Global); +} + +#[test] +fn builder_with_seed() { + use tesseras_dht::node::NodeBuilder; + let n1 = NodeBuilder::new() + .port(0) + .seed(b"same-seed") + .build() + .unwrap(); + let n2 = NodeBuilder::new() + .port(0) + .seed(b"same-seed") + .build() + .unwrap(); + assert_eq!(n1.id(), n2.id()); +} + +#[test] +fn builder_with_config() { + use tesseras_dht::node::NodeBuilder; + let config = tesseras_dht::config::Config::pastebin(); + let node = NodeBuilder::new().port(0).config(config).build().unwrap(); + assert!(node.config().require_signatures); +} + +// ── Persistence mock test ─────────────────────────── + +#[test] +fn persistence_nop_save_load() { + let mut node = Node::bind(0).unwrap(); + node.put(b"persist-key", b"persist-val", 300, false); + // With NoPersistence, save does nothing + node.save_state(); + // load_persisted with NoPersistence loads nothing + node.load_persisted(); + // Value still there from local storage + assert!(!node.get(b"persist-key").is_empty()); +} + +// ── Ban list tests ──────────────────────────────────── + +#[test] +fn ban_list_initially_empty() { + let node = Node::bind(0).unwrap(); + assert_eq!(node.ban_count(), 0); +} + +#[test] +fn ban_list_unit() { + use tesseras_dht::banlist::BanList; + let mut bl = BanList::new(); + let addr: std::net::SocketAddr = "127.0.0.1:9999".parse().unwrap(); + + assert!(!bl.is_banned(&addr)); + bl.record_failure(addr); + bl.record_failure(addr); + assert!(!bl.is_banned(&addr)); // 2 < threshold 3 + bl.record_failure(addr); + assert!(bl.is_banned(&addr)); // 3 >= threshold +} + +#[test] +fn ban_list_success_resets() { + use tesseras_dht::banlist::BanList; + let mut bl = BanList::new(); + let addr: std::net::SocketAddr = "127.0.0.1:9999".parse().unwrap(); + + bl.record_failure(addr); + bl.record_failure(addr); + bl.record_success(&addr); + bl.record_failure(addr); // starts over from 1 + assert!(!bl.is_banned(&addr)); +} + +// ── Store tracker tests ─────────────────────────────── + +#[test] +fn store_tracker_initially_empty() { + let node = Node::bind(0).unwrap(); + assert_eq!(node.pending_stores(), 0); + assert_eq!(node.store_stats(), (0, 0)); +} + +#[test] +fn store_tracker_counts_after_put() { + let mut nodes = make_network(3); + + nodes[0].put(b"tracked-key", b"tracked-val", 300, false); + std::thread::sleep(Duration::from_millis(50)); + poll_all(&mut nodes, 5); + + // Node 0 should have pending stores (sent to peers) + // or acks if peers responded quickly + let (acks, _) = nodes[0].store_stats(); + let pending = nodes[0].pending_stores(); + assert!( + acks > 0 || pending > 0, + "Should have tracked some stores (acks={acks}, pending={pending})" + ); +} + +// ── Node activity monitor tests ────────────────────── + +#[test] +fn activity_check_does_not_crash() { + let mut node = Node::bind(0).unwrap(); + node.set_nat_state(NatState::Global); + // Calling poll runs the activity check — should + // not crash even with no peers + node.poll().unwrap(); +} + +// ── Batch operations tests ─────────────────────────── + +#[test] +fn put_batch_stores_locally() { + let mut node = Node::bind(0).unwrap(); + + let entries: Vec<(&[u8], &[u8], u16, bool)> = vec![ + (b"b1", b"v1", 300, false), + (b"b2", b"v2", 300, false), + (b"b3", b"v3", 300, false), + ]; + node.put_batch(&entries); + + assert_eq!(node.storage_count(), 3); + assert_eq!(node.get(b"b1"), vec![b"v1".to_vec()]); + assert_eq!(node.get(b"b2"), vec![b"v2".to_vec()]); + assert_eq!(node.get(b"b3"), vec![b"v3".to_vec()]); +} + +#[test] +fn get_batch_returns_local() { + let mut node = Node::bind(0).unwrap(); + node.put(b"gb1", b"v1", 300, false); + node.put(b"gb2", b"v2", 300, false); + + let results = node.get_batch(&[b"gb1", b"gb2", b"gb-missing"]); + assert_eq!(results.len(), 3); + assert_eq!(results[0].1, vec![b"v1".to_vec()]); + assert_eq!(results[1].1, vec![b"v2".to_vec()]); + assert!(results[2].1.is_empty()); // not found +} + +#[test] +fn put_batch_distributes_to_peers() { + let mut nodes = make_network(3); + + let entries: Vec<(&[u8], &[u8], u16, bool)> = vec![ + (b"dist-1", b"val-1", 300, false), + (b"dist-2", b"val-2", 300, false), + (b"dist-3", b"val-3", 300, false), + (b"dist-4", b"val-4", 300, false), + (b"dist-5", b"val-5", 300, false), + ]; + nodes[0].put_batch(&entries); + + std::thread::sleep(Duration::from_millis(100)); + poll_all(&mut nodes, 10); + + // All 5 values should be stored locally on node 0 + for i in 1..=5 { + let key = format!("dist-{i}"); + let vals = nodes[0].get(key.as_bytes()); + assert!(!vals.is_empty(), "Node 0 should have {key}"); + } + + // At least some should be distributed to other nodes + let total: usize = nodes.iter().map(|n| n.storage_count()).sum(); + assert!(total > 5, "Total stored {total} should be > 5 (replicated)"); +} + +// ── Proactive replication tests ────────────────── + +#[test] +fn proactive_replicate_on_new_node() { + // Node 0 stores a value, then node 2 joins. + // After routing table sync, node 2 should receive + // the value proactively (§2.5). + let mut nodes = make_network(2); + nodes[0].put(b"proactive-key", b"proactive-val", 300, false); + + std::thread::sleep(Duration::from_millis(100)); + poll_all(&mut nodes, 10); + + // Add a third node + let bp = nodes[0].local_addr().unwrap().port(); + let mut node2 = Node::bind(0).unwrap(); + node2.set_nat_state(NatState::Global); + node2.join("127.0.0.1", bp).unwrap(); + nodes.push(node2); + + // Poll to let proactive replication trigger + std::thread::sleep(Duration::from_millis(100)); + poll_all(&mut nodes, 20); + + // At least the original 2 nodes should have the value; + // the new node may also have it via proactive replication + let total: usize = nodes.iter().map(|n| n.storage_count()).sum(); + assert!( + total >= 2, + "Total stored {total} should be >= 2 after proactive replication" + ); +} + +// ── Republish on access tests ──────────────────── + +#[test] +fn republish_on_find_value() { + // Store on node 0, retrieve from node 2 via FIND_VALUE. + // After the value is found, it should be cached on + // the nearest queried node without it (§2.3). + let mut nodes = make_network(3); + + nodes[0].put(b"republish-key", b"republish-val", 300, false); + + std::thread::sleep(Duration::from_millis(100)); + poll_all(&mut nodes, 10); + + // Node 2 triggers FIND_VALUE + let _ = nodes[2].get(b"republish-key"); + + // Poll to let the lookup and republish propagate + for _ in 0..30 { + poll_all(&mut nodes, 5); + std::thread::sleep(Duration::from_millis(20)); + + let vals = nodes[2].get(b"republish-key"); + if !vals.is_empty() { + break; + } + } + + // Count total stored across all nodes — should be + // more than 1 due to republish-on-access caching + let total: usize = nodes.iter().map(|n| n.storage_count()).sum(); + assert!( + total >= 2, + "Total stored {total} should be >= 2 after republish-on-access" + ); +} |