about summary refs log tree commit diff stats
path: root/tests/integration.rs
diff options
context:
space:
mode:
authormurilo ijanc2026-03-24 15:04:03 -0300
committermurilo ijanc2026-03-24 15:04:03 -0300
commit9821aabf0b50d2487b07502d3d2cd89e7d62bdbe (patch)
tree53da095ff90cc755bac3d4bf699172b5e8cd07d6 /tests/integration.rs
downloadtesseras-dht-9821aabf0b50d2487b07502d3d2cd89e7d62bdbe.tar.gz
Initial commitv0.1.0
NAT-aware Kademlia DHT library for peer-to-peer networks. Features: - Distributed key-value storage (iterative FIND_NODE, FIND_VALUE, STORE) - NAT traversal via DTUN hole-punching and proxy relay - Reliable Datagram Protocol (RDP) with 7-state connection machine - Datagram transport with automatic fragmentation/reassembly - Ed25519 packet authentication - 256-bit node IDs (Ed25519 public keys) - Rate limiting, ban list, and eclipse attack mitigation - Persistence and metrics - OpenBSD and Linux support
Diffstat (limited to 'tests/integration.rs')
-rw-r--r--tests/integration.rs704
1 file changed, 704 insertions, 0 deletions
diff --git a/tests/integration.rs b/tests/integration.rs
new file mode 100644
index 0000000..8bfe66d
--- /dev/null
+++ b/tests/integration.rs
@@ -0,0 +1,704 @@
+//! Integration tests: multi-node scenarios over real
+//! UDP sockets on loopback.
+
+use std::time::Duration;
+
+use tesseras_dht::Node;
+use tesseras_dht::nat::NatState;
+
+/// Poll all nodes once each (non-blocking best-effort).
+fn poll_all(nodes: &mut [Node], rounds: usize) {
+ let fast = Duration::from_millis(1);
+ for _ in 0..rounds {
+ for n in nodes.iter_mut() {
+ n.poll_timeout(fast).ok();
+ }
+ }
+}
+
+/// Create N nodes, join them to the first.
+fn make_network(n: usize) -> Vec<Node> {
+ let mut nodes = Vec::with_capacity(n);
+ let bootstrap = Node::bind(0).unwrap();
+ let bp = bootstrap.local_addr().unwrap().port();
+ nodes.push(bootstrap);
+ nodes[0].set_nat_state(NatState::Global);
+
+ for _ in 1..n {
+ let mut node = Node::bind(0).unwrap();
+ node.set_nat_state(NatState::Global);
+ node.join("127.0.0.1", bp).unwrap();
+ nodes.push(node);
+ }
+
+ // Small sleep to let packets arrive, then poll
+ std::thread::sleep(Duration::from_millis(50));
+ poll_all(&mut nodes, 5);
+ nodes
+}
+
+// ── Bootstrap tests ─────────────────────────────────
+
/// After a two-node join, each routing table must contain the peer.
#[test]
fn two_nodes_discover_each_other() {
    let nodes = make_network(2);

    for (i, node) in nodes.iter().enumerate() {
        assert!(
            node.routing_table_size() >= 1,
            "Node {i} should have at least 1 peer in routing table"
        );
    }
}
+
/// With three nodes, nobody's routing table may be empty.
#[test]
fn three_nodes_form_network() {
    let nodes = make_network(3);

    // Every node should know at least one other node.
    nodes.iter().enumerate().for_each(|(i, node)| {
        assert!(
            node.routing_table_size() >= 1,
            "Node {i} routing table empty"
        );
    });
}
+
/// Sanity-check aggregate routing-table population across 5 nodes.
#[test]
fn five_nodes_routing_tables() {
    let nodes = make_network(5);

    // With 5 nodes, the tables should collectively hold 5+ entries.
    let total_peers = nodes
        .iter()
        .fold(0usize, |acc, node| acc + node.routing_table_size());
    assert!(
        total_peers >= 5,
        "Total routing entries ({total_peers}) too low"
    );
}
+
+// ── Put/Get tests ───────────────────────────────────
+
/// A value stored locally is immediately readable locally.
#[test]
fn put_get_local() {
    let mut node = Node::bind(0).unwrap();
    node.put(b"key1", b"value1", 300, false);

    let values = node.get(b"key1");
    assert_eq!(values.len(), 1);
    assert_eq!(values[0], b"value1");
}
+
/// A put on node 0 must replicate to node 1 via STORE.
#[test]
fn put_get_across_two_nodes() {
    let mut nodes = make_network(2);

    // Node 0 publishes a value.
    nodes[0].put(b"hello", b"world", 300, false);

    // Give the STORE message time to be delivered and processed.
    std::thread::sleep(Duration::from_millis(50));
    poll_all(&mut nodes, 5);

    // Node 1 should now hold the replicated value.
    let values = nodes[1].get(b"hello");
    assert_eq!(values.len(), 1, "Node 1 should have received the value");
    assert_eq!(values[0], b"world");
}
+
/// Storing many keys on one node must keep all of them locally
/// retrievable.
#[test]
fn put_multiple_values() {
    let mut nodes = make_network(3);

    // Store 10 key-value pairs from node 0.
    for i in 0..10u32 {
        let key = format!("k{i}");
        let val = format!("v{i}");
        nodes[0].put(key.as_bytes(), val.as_bytes(), 300, false);
    }

    std::thread::sleep(Duration::from_millis(50));
    poll_all(&mut nodes, 5);

    // Node 0 should have all 10 — count the keys it can still serve.
    let found = (0..10u32)
        .filter(|i| !nodes[0].get(format!("k{i}").as_bytes()).is_empty())
        .count();
    assert_eq!(found, 10, "Node 0 should have all 10 values");
}
+
/// With the unique flag set, a second put for the same key replaces
/// the first value instead of appending.
#[test]
fn put_unique_replaces() {
    let mut node = Node::bind(0).unwrap();

    node.put(b"uk", b"first", 300, true);
    node.put(b"uk", b"second", 300, true);

    // Only the most recent value survives.
    let values = node.get(b"uk");
    assert_eq!(values.len(), 1);
    assert_eq!(values[0], b"second");
}
+
/// Every node publishes one value; each must at least be able to read
/// back its own.
#[test]
fn put_get_distributed() {
    let mut nodes = make_network(5);

    // Each node stores one value (iterate instead of u32-indexing).
    for (i, node) in nodes.iter_mut().enumerate() {
        let key = format!("node{i}-key");
        let val = format!("node{i}-val");
        node.put(key.as_bytes(), val.as_bytes(), 300, false);
    }

    std::thread::sleep(Duration::from_millis(50));
    poll_all(&mut nodes, 5);

    // Each node should have its own value at minimum.
    for (i, node) in nodes.iter_mut().enumerate() {
        let key = format!("node{i}-key");
        let vals = node.get(key.as_bytes());
        assert!(!vals.is_empty(), "Node {i} should have its own value");
    }
}
+
+// ── Identity tests ──────────────────────────────────
+
/// Seeding two nodes with the same bytes must yield the same ID.
#[test]
fn set_id_deterministic() {
    let mut first = Node::bind(0).unwrap();
    let mut second = Node::bind(0).unwrap();

    first.set_id(b"same-seed");
    second.set_id(b"same-seed");

    assert_eq!(first.id(), second.id());
}
+
/// Two freshly bound nodes must derive distinct IDs.
#[test]
fn node_id_is_unique() {
    let n1 = Node::bind(0).unwrap();
    let n2 = Node::bind(0).unwrap();
    assert_ne!(n1.id(), n2.id());
}
+
+// ── NAT state tests ────────────────────────────────
+
/// `set_nat_state` must take effect immediately for every variant.
#[test]
fn nat_state_transitions() {
    let mut node = Node::bind(0).unwrap();
    // A fresh node starts with an undetected NAT state.
    assert_eq!(node.nat_state(), NatState::Unknown);

    node.set_nat_state(NatState::Global);
    assert_eq!(node.nat_state(), NatState::Global);

    node.set_nat_state(NatState::ConeNat);
    assert_eq!(node.nat_state(), NatState::ConeNat);

    node.set_nat_state(NatState::SymmetricNat);
    assert_eq!(node.nat_state(), NatState::SymmetricNat);
}
+
+// ── RDP tests ───────────────────────────────────────
+
/// Closing an RDP listener must free the port for re-listening.
#[test]
fn rdp_listen_connect_close() {
    let mut node = Node::bind(0).unwrap();

    // Listen and close twice on the same port; the second round only
    // succeeds if the first close released it.
    for _ in 0..2 {
        let desc = node.rdp_listen(5000).unwrap();
        node.rdp_close(desc);
    }
}
+
/// Initiating an RDP connection must leave the descriptor in the
/// SYN-SENT state.
#[test]
fn rdp_connect_state() {
    use tesseras_dht::rdp::RdpState;
    let mut node = Node::bind(0).unwrap();
    // Fixed dummy destination ID; no peer needs to answer for this
    // purely local state check.
    let dst = tesseras_dht::NodeId::from_bytes([0x01; 32]);
    let desc = node.rdp_connect(0, &dst, 5000).unwrap();
    assert_eq!(node.rdp_state(desc).unwrap(), RdpState::SynSent);
    node.rdp_close(desc);
}
+
+// ── Resilience tests ────────────────────────────────
+
/// Polling an isolated node must return promptly without error.
#[test]
fn poll_with_no_peers() {
    let mut node = Node::bind(0).unwrap();
    // Should not panic or block
    node.poll().unwrap();
}
+
/// Joining via an unresolvable hostname must fail with an error.
#[test]
fn join_invalid_address() {
    let mut node = Node::bind(0).unwrap();
    // The reserved .invalid TLD never resolves, so join must error out.
    assert!(node.join("this-does-not-exist.invalid", 9999).is_err());
}
+
/// Looking up a key that was never stored yields no values.
#[test]
fn empty_get() {
    let mut node = Node::bind(0).unwrap();
    assert!(node.get(b"nonexistent").is_empty());
}
+
/// Documents the TTL-0 delete convention; only the initial local
/// store is actually asserted here (the removal path is wire-level).
#[test]
fn put_zero_ttl_removes() {
    let mut node = Node::bind(0).unwrap();
    node.put(b"temp", b"data", 300, false);
    assert!(!node.get(b"temp").is_empty());

    // Store with TTL 0 is a delete in the protocol
    // (handled at the wire level in handle_dht_store,
    // but locally we'd need to call storage.remove).
    // This test validates the local storage.
}
+
+// ── Scale test ──────────────────────────────────────
+
/// In a 10-node network, a single put must reach at least 2 nodes.
#[test]
fn ten_nodes_put_get() {
    let mut nodes = make_network(10);

    // Node 0 publishes one value.
    nodes[0].put(b"scale-key", b"scale-val", 300, false);
    std::thread::sleep(Duration::from_millis(50));
    poll_all(&mut nodes, 5);

    // Tally how many nodes ended up with the value.
    let count = nodes.iter_mut().fold(0usize, |acc, node| {
        if node.get(b"scale-key").is_empty() {
            acc
        } else {
            acc + 1
        }
    });
    assert!(
        count >= 2,
        "At least 2 nodes should have the value, got {count}"
    );
}
+
+// ── Remote get via FIND_VALUE ───────────────────────
+
/// A value stored on node 0 must be retrievable from node 2, either
/// via the STORE fan-out or an explicit FIND_VALUE lookup.
#[test]
fn remote_get_via_find_value() {
    let mut nodes = make_network(3);

    // Node 0 stores locally.
    nodes[0].put(b"remote-key", b"remote-val", 300, false);
    std::thread::sleep(Duration::from_millis(50));
    poll_all(&mut nodes, 5);

    // Node 2 may already hold the value from the STORE fan-out; only
    // exercise the remote path when it does not.
    if !nodes[2].get(b"remote-key").is_empty() {
        return;
    }

    // Poll to let FIND_VALUE propagate.
    std::thread::sleep(Duration::from_millis(100));
    poll_all(&mut nodes, 10);

    let after = nodes[2].get(b"remote-key");
    assert!(
        !after.is_empty(),
        "Node 2 should find the value via FIND_VALUE"
    );
    assert_eq!(after[0], b"remote-val");
}
+
+// ── NAT detection tests ────────────────────────────
+
/// A freshly bound node reports `NatState::Unknown` until detection
/// runs or the state is set explicitly.
#[test]
fn nat_state_default_unknown() {
    let node = Node::bind(0).unwrap();
    assert_eq!(node.nat_state(), NatState::Unknown);
}
+
/// A manually set NAT state must survive a poll cycle — polling must
/// not reset the override.
#[test]
fn nat_state_set_persists() {
    let mut node = Node::bind(0).unwrap();
    node.set_nat_state(NatState::SymmetricNat);
    assert_eq!(node.nat_state(), NatState::SymmetricNat);
    node.poll().unwrap();
    assert_eq!(node.nat_state(), NatState::SymmetricNat);
}
+
+// ── DTUN tests ──────────────────────────────────────
+
/// Joining triggers a DtunFindNode exchange; afterwards each routing
/// table should contain the other peer.
#[test]
fn dtun_find_node_exchange() {
    let nodes = make_network(2);

    // Both nodes should have peers after join.
    for node in &nodes {
        assert!(node.routing_table_size() >= 1);
    }
}
+
+// ── Proxy tests ─────────────────────────────────────
+
/// A datagram addressed by node ID must reach the target's callback.
#[test]
fn proxy_dgram_forwarded() {
    use std::sync::{Arc, Mutex};

    let mut nodes = make_network(2);

    // Collect everything node 1's dgram callback delivers.
    let received = Arc::new(Mutex::new(Vec::<Vec<u8>>::new()));
    let sink = Arc::clone(&received);
    nodes[1].set_dgram_callback(move |data, _from| {
        sink.lock().unwrap().push(data.to_vec());
    });

    // Node 0 addresses a datagram to node 1 by ID.
    let target = *nodes[1].id();
    nodes[0].send_dgram(b"proxy-test", &target);

    std::thread::sleep(Duration::from_millis(50));
    poll_all(&mut nodes, 5);

    let messages = received.lock().unwrap();
    assert!(!messages.is_empty(), "Node 1 should receive the dgram");
    assert_eq!(messages[0], b"proxy-test");
}
+
+// ── Advertise tests ─────────────────────────────────
+
/// After joining, every node must report at least one known peer.
#[test]
fn nodes_peer_count_after_join() {
    let nodes = make_network(3);

    nodes.iter().enumerate().for_each(|(i, node)| {
        assert!(
            node.peer_count() >= 1,
            "Node {i} should have at least 1 peer"
        );
    });
}
+
+// ── Storage tests ───────────────────────────────────
+
/// `storage_count` reflects the number of locally stored keys.
#[test]
fn storage_count_after_put() {
    let mut node = Node::bind(0).unwrap();
    assert_eq!(node.storage_count(), 0);

    node.put(b"k1", b"v1", 300, false);
    node.put(b"k2", b"v2", 300, false);
    assert_eq!(node.storage_count(), 2);
}
+
/// Every node publishes one value; each must still serve its own key.
#[test]
fn put_from_multiple_nodes() {
    let mut nodes = make_network(3);

    nodes[0].put(b"from-0", b"val-0", 300, false);
    nodes[1].put(b"from-1", b"val-1", 300, false);
    nodes[2].put(b"from-2", b"val-2", 300, false);

    std::thread::sleep(Duration::from_millis(50));
    poll_all(&mut nodes, 5);

    // Each publisher can still read back its own key.
    for (node, key) in nodes.iter_mut().zip([b"from-0", b"from-1", b"from-2"]) {
        assert!(!node.get(key).is_empty());
    }
}
+
+// ── Config tests ────────────────────────────────────
+
/// Pins the default configuration values so an accidental change to
/// the defaults trips this test.
#[test]
fn config_default_works() {
    let config = tesseras_dht::config::Config::default();
    assert_eq!(config.num_find_node, 10);
    assert_eq!(config.bucket_size, 20);
    assert_eq!(config.default_ttl, 300);
}
+
/// The pastebin preset uses a long TTL and mandatory signatures.
#[test]
fn config_pastebin_preset() {
    let config = tesseras_dht::config::Config::pastebin();
    assert_eq!(config.default_ttl, 65535);
    assert!(config.require_signatures);
}
+
+// ── Metrics tests ───────────────────────────────────
+
/// A replicated put sends messages, so `messages_sent` must grow.
#[test]
fn metrics_after_put() {
    let mut nodes = make_network(2);

    let baseline = nodes[0].metrics();
    nodes[0].put(b"m-key", b"m-val", 300, false);

    std::thread::sleep(Duration::from_millis(50));
    poll_all(&mut nodes, 5);

    let current = nodes[0].metrics();
    assert!(
        current.messages_sent > baseline.messages_sent,
        "messages_sent should increase after put"
    );
}
+
/// A node that has sent and received nothing reports zero byte
/// counters.
#[test]
fn metrics_bytes_tracked() {
    let mut node = Node::bind(0).unwrap();
    let m = node.metrics();
    assert_eq!(m.bytes_sent, 0);
    assert_eq!(m.bytes_received, 0);
}
+
+// ── Builder tests ───────────────────────────────────
+
/// `NodeBuilder` applies the NAT state passed via `.nat(...)`.
#[test]
fn builder_basic() {
    use tesseras_dht::node::NodeBuilder;
    let node = NodeBuilder::new()
        .port(0)
        .nat(NatState::Global)
        .build()
        .unwrap();
    assert_eq!(node.nat_state(), NatState::Global);
}
+
/// Building two nodes from the same seed must yield the same identity.
#[test]
fn builder_with_seed() {
    use tesseras_dht::node::NodeBuilder;

    // Small factory so both nodes are built identically.
    let build = |seed: &[u8]| NodeBuilder::new().port(0).seed(seed).build().unwrap();

    let first = build(b"same-seed");
    let second = build(b"same-seed");
    assert_eq!(first.id(), second.id());
}
+
/// `NodeBuilder::config` installs the supplied config on the node.
#[test]
fn builder_with_config() {
    use tesseras_dht::node::NodeBuilder;
    let config = tesseras_dht::config::Config::pastebin();
    let node = NodeBuilder::new().port(0).config(config).build().unwrap();
    assert!(node.config().require_signatures);
}
+
+// ── Persistence mock test ───────────────────────────
+
/// With the no-op persistence backend, save/load are inert and the
/// in-memory storage is left untouched.
#[test]
fn persistence_nop_save_load() {
    let mut node = Node::bind(0).unwrap();
    node.put(b"persist-key", b"persist-val", 300, false);
    // With NoPersistence, save does nothing
    node.save_state();
    // load_persisted with NoPersistence loads nothing
    node.load_persisted();
    // Value still there from local storage
    assert!(!node.get(b"persist-key").is_empty());
}
+
+// ── Ban list tests ────────────────────────────────────
+
/// A fresh node has no banned peers.
#[test]
fn ban_list_initially_empty() {
    let node = Node::bind(0).unwrap();
    assert_eq!(node.ban_count(), 0);
}
+
/// Three recorded failures cross the ban threshold; two do not.
#[test]
fn ban_list_unit() {
    use tesseras_dht::banlist::BanList;
    let mut bl = BanList::new();
    let addr: std::net::SocketAddr = "127.0.0.1:9999".parse().unwrap();

    assert!(!bl.is_banned(&addr));

    // Two failures stay below the threshold of 3.
    for _ in 0..2 {
        bl.record_failure(addr);
    }
    assert!(!bl.is_banned(&addr));

    // The third failure crosses it.
    bl.record_failure(addr);
    assert!(bl.is_banned(&addr));
}
+
/// A recorded success resets the failure streak for an address.
#[test]
fn ban_list_success_resets() {
    use tesseras_dht::banlist::BanList;
    let mut bl = BanList::new();
    let addr: std::net::SocketAddr = "127.0.0.1:9999".parse().unwrap();

    // Two failures, then a success clears the streak…
    for _ in 0..2 {
        bl.record_failure(addr);
    }
    bl.record_success(&addr);

    // …so one further failure counts as the first again.
    bl.record_failure(addr);
    assert!(!bl.is_banned(&addr));
}
+
+// ── Store tracker tests ───────────────────────────────
+
/// A fresh node has no pending or acknowledged STOREs.
#[test]
fn store_tracker_initially_empty() {
    let node = Node::bind(0).unwrap();
    assert_eq!(node.pending_stores(), 0);
    assert_eq!(node.store_stats(), (0, 0));
}
+
/// After a replicated put, the store tracker must record activity:
/// either outstanding (pending) STOREs or already-received acks.
#[test]
fn store_tracker_counts_after_put() {
    let mut nodes = make_network(3);

    nodes[0].put(b"tracked-key", b"tracked-val", 300, false);
    std::thread::sleep(Duration::from_millis(50));
    poll_all(&mut nodes, 5);

    // Node 0 should have pending stores (sent to peers)
    // or acks if peers responded quickly
    let (acks, _) = nodes[0].store_stats();
    let pending = nodes[0].pending_stores();
    assert!(
        acks > 0 || pending > 0,
        "Should have tracked some stores (acks={acks}, pending={pending})"
    );
}
+
+// ── Node activity monitor tests ──────────────────────
+
/// The activity check that runs inside `poll` must tolerate an empty
/// peer set.
#[test]
fn activity_check_does_not_crash() {
    let mut node = Node::bind(0).unwrap();
    node.set_nat_state(NatState::Global);
    // Calling poll runs the activity check — should
    // not crash even with no peers
    node.poll().unwrap();
}
+
+// ── Batch operations tests ───────────────────────────
+
/// `put_batch` stores every entry locally and each is individually
/// retrievable afterwards.
#[test]
fn put_batch_stores_locally() {
    let mut node = Node::bind(0).unwrap();

    let entries: Vec<(&[u8], &[u8], u16, bool)> = vec![
        (b"b1", b"v1", 300, false),
        (b"b2", b"v2", 300, false),
        (b"b3", b"v3", 300, false),
    ];
    node.put_batch(&entries);

    assert_eq!(node.storage_count(), 3);
    // Every batched entry must be readable on its own.
    for (key, val, _, _) in entries {
        assert_eq!(node.get(key), vec![val.to_vec()]);
    }
}
+
/// `get_batch` returns one (key, values) pair per requested key, in
/// request order, with an empty value list for misses.
#[test]
fn get_batch_returns_local() {
    let mut node = Node::bind(0).unwrap();
    node.put(b"gb1", b"v1", 300, false);
    node.put(b"gb2", b"v2", 300, false);

    let results = node.get_batch(&[b"gb1", b"gb2", b"gb-missing"]);
    assert_eq!(results.len(), 3);
    assert_eq!(results[0].1, vec![b"v1".to_vec()]);
    assert_eq!(results[1].1, vec![b"v2".to_vec()]);
    assert!(results[2].1.is_empty()); // not found
}
+
/// A batch put must keep all entries locally and replicate at least
/// some of them onto peers.
#[test]
fn put_batch_distributes_to_peers() {
    let mut nodes = make_network(3);

    let entries: Vec<(&[u8], &[u8], u16, bool)> = vec![
        (b"dist-1", b"val-1", 300, false),
        (b"dist-2", b"val-2", 300, false),
        (b"dist-3", b"val-3", 300, false),
        (b"dist-4", b"val-4", 300, false),
        (b"dist-5", b"val-5", 300, false),
    ];
    nodes[0].put_batch(&entries);

    std::thread::sleep(Duration::from_millis(100));
    poll_all(&mut nodes, 10);

    // Node 0 keeps a local copy of every batched entry.
    for i in 1..=5 {
        let key = format!("dist-{i}");
        let vals = nodes[0].get(key.as_bytes());
        assert!(!vals.is_empty(), "Node 0 should have {key}");
    }

    // Replication should push at least some entries onto peers.
    let total = nodes.iter().fold(0usize, |acc, n| acc + n.storage_count());
    assert!(total > 5, "Total stored {total} should be > 5 (replicated)");
}
+
+// ── Proactive replication tests ──────────────────
+
/// Node 0 stores a value before a third node joins; after routing
/// tables sync, the newcomer may receive the value proactively (§2.5).
#[test]
fn proactive_replicate_on_new_node() {
    let mut nodes = make_network(2);
    nodes[0].put(b"proactive-key", b"proactive-val", 300, false);

    std::thread::sleep(Duration::from_millis(100));
    poll_all(&mut nodes, 10);

    // Bring up a late joiner against node 0.
    let port = nodes[0].local_addr().unwrap().port();
    let mut newcomer = Node::bind(0).unwrap();
    newcomer.set_nat_state(NatState::Global);
    newcomer.join("127.0.0.1", port).unwrap();
    nodes.push(newcomer);

    // Poll to let proactive replication trigger.
    std::thread::sleep(Duration::from_millis(100));
    poll_all(&mut nodes, 20);

    // The two original nodes must hold the value; the newcomer may
    // also have it via proactive replication.
    let total = nodes.iter().fold(0usize, |acc, n| acc + n.storage_count());
    assert!(
        total >= 2,
        "Total stored {total} should be >= 2 after proactive replication"
    );
}
+
+// ── Republish on access tests ────────────────────
+
/// Store on node 0, retrieve from node 2 via FIND_VALUE; once found,
/// the value should be cached on the nearest queried node that lacked
/// it (§2.3).
#[test]
fn republish_on_find_value() {
    let mut nodes = make_network(3);

    nodes[0].put(b"republish-key", b"republish-val", 300, false);

    std::thread::sleep(Duration::from_millis(100));
    poll_all(&mut nodes, 10);

    // Kick off the FIND_VALUE lookup from node 2.
    let _ = nodes[2].get(b"republish-key");

    // Pump the network until node 2 sees the value (or we give up).
    for _ in 0..30 {
        poll_all(&mut nodes, 5);
        std::thread::sleep(Duration::from_millis(20));

        if !nodes[2].get(b"republish-key").is_empty() {
            break;
        }
    }

    // Republish-on-access should leave the value on more than one node.
    let total = nodes.iter().fold(0usize, |acc, n| acc + n.storage_count());
    assert!(
        total >= 2,
        "Total stored {total} should be >= 2 after republish-on-access"
    );
}