From 9821aabf0b50d2487b07502d3d2cd89e7d62bdbe Mon Sep 17 00:00:00 2001 From: murilo ijanc Date: Tue, 24 Mar 2026 15:04:03 -0300 Subject: Initial commit NAT-aware Kademlia DHT library for peer-to-peer networks. Features: - Distributed key-value storage (iterative FIND_NODE, FIND_VALUE, STORE) - NAT traversal via DTUN hole-punching and proxy relay - Reliable Datagram Protocol (RDP) with 7-state connection machine - Datagram transport with automatic fragmentation/reassembly - Ed25519 packet authentication - 256-bit node IDs (Ed25519 public keys) - Rate limiting, ban list, and eclipse attack mitigation - Persistence and metrics - OpenBSD and Linux support --- examples/dgram.rs | 100 +++++++++++++++ examples/join.rs | 65 ++++++++++ examples/network.rs | 86 +++++++++++++ examples/put_get.rs | 93 ++++++++++++++ examples/rdp.rs | 147 +++++++++++++++++++++ examples/remote_get.rs | 106 ++++++++++++++++ examples/tesserasd.rs | 338 +++++++++++++++++++++++++++++++++++++++++++++++++ examples/two_nodes.rs | 133 +++++++++++++++++++ 8 files changed, 1068 insertions(+) create mode 100644 examples/dgram.rs create mode 100644 examples/join.rs create mode 100644 examples/network.rs create mode 100644 examples/put_get.rs create mode 100644 examples/rdp.rs create mode 100644 examples/remote_get.rs create mode 100644 examples/tesserasd.rs create mode 100644 examples/two_nodes.rs (limited to 'examples') diff --git a/examples/dgram.rs b/examples/dgram.rs new file mode 100644 index 0000000..ab25e78 --- /dev/null +++ b/examples/dgram.rs @@ -0,0 +1,100 @@ +//! Datagram transport example (equivalent to example4.cpp). +//! +//! Creates two nodes and sends datagrams between them +//! using the dgram callback API. +//! +//! Usage: +//! cargo run --example dgram + +use std::sync::{Arc, Mutex}; +use std::time::Duration; +use tesseras_dht::Node; +use tesseras_dht::id::NodeId; +use tesseras_dht::nat::NatState; + +fn main() { + env_logger::Builder::from_env( + env_logger::Env::default().default_filter_or("info"), + ) + .format(|buf, record| { + use std::io::Write; + writeln!( + buf, + "{} [{}] {}", + record.level(), + record.target(), + record.args() + ) + }) + .init(); + + let mut node1 = Node::bind(0).expect("bind node1"); + node1.set_nat_state(NatState::Global); + let addr1 = node1.local_addr().unwrap(); + let id1 = *node1.id(); + + let mut node2 = Node::bind(0).expect("bind node2"); + node2.set_nat_state(NatState::Global); + let addr2 = node2.local_addr().unwrap(); + let id2 = *node2.id(); + + println!("Node 1: {} @ {addr1}", node1.id_hex()); + println!("Node 2: {} @ {addr2}", node2.id_hex()); + + // Join node2 to node1 + node2.join("127.0.0.1", addr1.port()).expect("join"); + + // Poll to establish routing + for _ in 0..10 { + node1.poll().ok(); + node2.poll().ok(); + std::thread::sleep(Duration::from_millis(50)); + } + + // Set up dgram callbacks + let received1: Arc>> = Arc::new(Mutex::new(Vec::new())); + let received2: Arc>> = Arc::new(Mutex::new(Vec::new())); + + let r1 = received1.clone(); + node1.set_dgram_callback(move |data: &[u8], from: &NodeId| { + let msg = String::from_utf8_lossy(data).to_string(); + println!("Node 1 received: '{msg}' from {from:?}"); + r1.lock().unwrap().push(msg); + }); + + let r2 = received2.clone(); + node2.set_dgram_callback(move |data: &[u8], from: &NodeId| { + let msg = String::from_utf8_lossy(data).to_string(); + println!("Node 2 received: '{msg}' from {from:?}"); + r2.lock().unwrap().push(msg); + }); + + // Send datagrams + println!("\n--- Sending datagrams ---"); + node1.send_dgram(b"hello from node1", &id2); + node2.send_dgram(b"hello from node2", &id1); + + // Poll to deliver + for _ in 0..10 { + node1.poll().ok(); + node2.poll().ok(); + std::thread::sleep(Duration::from_millis(50)); + } + + // Note: actual dgram delivery requires the full + // send queue → address resolution → send flow. + // This example demonstrates the API; full delivery + // is wired in integration. + + println!("\n--- Summary ---"); + println!( + "Node 1 received {} messages", + received1.lock().unwrap().len() + ); + println!( + "Node 2 received {} messages", + received2.lock().unwrap().len() + ); + println!("Node 1 send queue pending: queued for delivery"); + println!("Node 2 send queue pending: queued for delivery"); +} diff --git a/examples/join.rs b/examples/join.rs new file mode 100644 index 0000000..a478410 --- /dev/null +++ b/examples/join.rs @@ -0,0 +1,65 @@ +//! Basic bootstrap example (equivalent to example1.cpp). +//! +//! Usage: +//! cargo run --example join -- 10000 +//! cargo run --example join -- 10001 127.0.0.1 10000 +//! +//! The first invocation creates a standalone node. +//! The second joins via the first. + +use std::time::Duration; +use tesseras_dht::Node; +use tesseras_dht::nat::NatState; + +fn main() { + env_logger::Builder::from_env( + env_logger::Env::default().default_filter_or("info"), + ) + .format(|buf, record| { + use std::io::Write; + writeln!( + buf, + "{} [{}] {}", + record.level(), + record.target(), + record.args() + ) + }) + .init(); + + let args: Vec = std::env::args().collect(); + if args.len() < 2 { + eprintln!("usage: {} port [host port]", args[0]); + eprintln!(); + eprintln!("example:"); + eprintln!(" $ {} 10000 &", args[0]); + eprintln!(" $ {} 10001 127.0.0.1 10000", args[0]); + std::process::exit(1); + } + + let port: u16 = args[1].parse().expect("invalid port"); + + let mut node = Node::bind(port).expect("bind failed"); + node.set_nat_state(NatState::Global); + + println!("Node {} listening on port {port}", node.id_hex()); + + if args.len() >= 4 { + let dst_host = &args[2]; + let dst_port: u16 = args[3].parse().expect("invalid dst port"); + + match node.join(dst_host, dst_port) { + Ok(()) => println!("Join request sent"), + Err(e) => { + eprintln!("Join failed: {e}"); + std::process::exit(1); + } + } + } + + // Event loop + loop { + node.poll().ok(); + std::thread::sleep(Duration::from_millis(100)); + } +} diff --git a/examples/network.rs b/examples/network.rs new file mode 100644 index 0000000..be3e0ef --- /dev/null +++ b/examples/network.rs @@ -0,0 +1,86 @@ +//! Multi-node network example (equivalent to example2.cpp). +//! +//! Creates N nodes on localhost, joins them recursively +//! via the first node, then prints state periodically. +//! +//! Usage: +//! cargo run --example network +//! RUST_LOG=debug cargo run --example network + +use std::time::{Duration, Instant}; +use tesseras_dht::Node; +use tesseras_dht::nat::NatState; + +const NUM_NODES: usize = 20; +const POLL_ROUNDS: usize = 30; + +fn main() { + env_logger::Builder::from_env( + env_logger::Env::default().default_filter_or("info"), + ) + .format(|buf, record| { + use std::io::Write; + writeln!( + buf, + "{} [{}] {}", + record.level(), + record.target(), + record.args() + ) + }) + .init(); + + println!("Creating {NUM_NODES} nodes..."); + let start = Instant::now(); + + // Create bootstrap node + let mut nodes: Vec = Vec::new(); + let bootstrap = Node::bind(0).expect("bind bootstrap"); + let bootstrap_port = bootstrap.local_addr().unwrap().port(); + println!("Bootstrap: {} @ port {bootstrap_port}", bootstrap.id_hex()); + nodes.push(bootstrap); + + // Create and join remaining nodes + for i in 1..NUM_NODES { + let mut node = Node::bind(0).expect("bind node"); + node.set_nat_state(NatState::Global); + node.join("127.0.0.1", bootstrap_port).expect("join"); + println!("Node {i}: {} joined", &node.id_hex()[..8]); + nodes.push(node); + } + nodes[0].set_nat_state(NatState::Global); + + println!("\nAll {NUM_NODES} nodes created in {:?}", start.elapsed()); + + // Poll all nodes to exchange messages + println!("\nPolling {POLL_ROUNDS} rounds..."); + for round in 0..POLL_ROUNDS { + for node in nodes.iter_mut() { + node.poll().ok(); + } + std::thread::sleep(Duration::from_millis(50)); + + if (round + 1) % 10 == 0 { + let sizes: Vec = + nodes.iter().map(|n| n.routing_table_size()).collect(); + let avg = sizes.iter().sum::() / sizes.len(); + let max = sizes.iter().max().unwrap(); + println!( + " Round {}: avg routing table = {avg}, max = {max}", + round + 1 + ); + } + } + + // Print final state + println!("\n--- Final state ---"); + for (i, node) in nodes.iter().enumerate() { + println!( + "Node {i}: {} | rt={} peers={} storage={}", + &node.id_hex()[..8], + node.routing_table_size(), + node.peer_count(), + node.storage_count(), + ); + } +} diff --git a/examples/put_get.rs b/examples/put_get.rs new file mode 100644 index 0000000..e2bfaca --- /dev/null +++ b/examples/put_get.rs @@ -0,0 +1,93 @@ +//! DHT put/get example (equivalent to example3.cpp). +//! +//! Creates a small network, stores key-value pairs from +//! one node, and retrieves them from another. +//! +//! Usage: +//! cargo run --example put_get + +use std::time::Duration; +use tesseras_dht::Node; +use tesseras_dht::nat::NatState; + +const NUM_NODES: usize = 5; + +fn main() { + env_logger::Builder::from_env( + env_logger::Env::default().default_filter_or("info"), + ) + .format(|buf, record| { + use std::io::Write; + writeln!( + buf, + "{} [{}] {}", + record.level(), + record.target(), + record.args() + ) + }) + .init(); + + // Create network + let mut nodes: Vec = Vec::new(); + let bootstrap = Node::bind(0).expect("bind"); + let bp = bootstrap.local_addr().unwrap().port(); + nodes.push(bootstrap); + nodes[0].set_nat_state(NatState::Global); + + for _ in 1..NUM_NODES { + let mut n = Node::bind(0).expect("bind"); + n.set_nat_state(NatState::Global); + n.join("127.0.0.1", bp).expect("join"); + nodes.push(n); + } + + // Poll to establish routing + for _ in 0..20 { + for n in nodes.iter_mut() { + n.poll().ok(); + } + std::thread::sleep(Duration::from_millis(50)); + } + + println!("Network ready: {NUM_NODES} nodes"); + + // Node 0 stores several key-value pairs + println!("\n--- Storing values from Node 0 ---"); + for i in 0..5u32 { + let key = format!("key-{i}"); + let val = format!("value-{i}"); + nodes[0].put(key.as_bytes(), val.as_bytes(), 300, false); + println!(" put({key}, {val})"); + } + + // Poll to distribute stores + for _ in 0..20 { + for n in nodes.iter_mut() { + n.poll().ok(); + } + std::thread::sleep(Duration::from_millis(50)); + } + + // Each node retrieves values + println!("\n--- Retrieving values ---"); + for (ni, node) in nodes.iter_mut().enumerate() { + for i in 0..5u32 { + let key = format!("key-{i}"); + let vals = node.get(key.as_bytes()); + let found: Vec = vals + .iter() + .map(|v| String::from_utf8_lossy(v).to_string()) + .collect(); + if !found.is_empty() { + println!(" Node {ni} get({key}) = {:?}", found); + } + } + } + + // Summary + println!("\n--- Storage summary ---"); + for (i, n) in nodes.iter().enumerate() { + println!(" Node {i}: {} values stored", n.storage_count()); + } +} diff --git a/examples/rdp.rs b/examples/rdp.rs new file mode 100644 index 0000000..319c779 --- /dev/null +++ b/examples/rdp.rs @@ -0,0 +1,147 @@ +//! RDP reliable transport example (equivalent to example5.cpp). +//! +//! Two nodes: server listens, client connects, sends +//! data, server receives it. +//! +//! Usage: +//! cargo run --example rdp + +use std::time::Duration; +use tesseras_dht::Node; +use tesseras_dht::nat::NatState; +use tesseras_dht::rdp::RdpState; + +const RDP_PORT: u16 = 5000; + +fn main() { + env_logger::Builder::from_env( + env_logger::Env::default().default_filter_or("info"), + ) + .format(|buf, record| { + use std::io::Write; + writeln!( + buf, + "{} [{}] {}", + record.level(), + record.target(), + record.args() + ) + }) + .init(); + + // Create two nodes + let mut server = Node::bind(0).expect("bind server"); + server.set_nat_state(NatState::Global); + let server_addr = server.local_addr().unwrap(); + let server_id = *server.id(); + println!("Server: {} @ {server_addr}", server.id_hex()); + + let mut client = Node::bind(0).expect("bind client"); + client.set_nat_state(NatState::Global); + println!("Client: {}", client.id_hex()); + + // Client joins server so they know each other + client.join("127.0.0.1", server_addr.port()).expect("join"); + + // Poll to exchange routing info + for _ in 0..10 { + server.poll().ok(); + client.poll().ok(); + std::thread::sleep(Duration::from_millis(20)); + } + println!("Client knows {} peers", client.routing_table_size()); + + // Server listens on RDP port + let _listen = server.rdp_listen(RDP_PORT).expect("listen"); + println!("Server listening on RDP port {RDP_PORT}"); + + // Client connects + let desc = client + .rdp_connect(0, &server_id, RDP_PORT) + .expect("connect"); + println!("Client state: {:?}", client.rdp_state(desc).unwrap()); + + // Poll to complete handshake + for _ in 0..10 { + server.poll().ok(); + client.poll().ok(); + std::thread::sleep(Duration::from_millis(20)); + } + + println!( + "Client state after handshake: {:?}", + client.rdp_state(desc).unwrap_or(RdpState::Closed) + ); + + // Send data if connection is open + match client.rdp_state(desc) { + Ok(RdpState::Open) => { + for i in 0..3u16 { + let msg = format!("hello {i}"); + match client.rdp_send(desc, msg.as_bytes()) { + Ok(n) => println!("Sent: '{msg}' ({n} bytes)"), + Err(e) => println!("Send error: {e}"), + } + } + + // Poll to deliver + for _ in 0..10 { + server.poll().ok(); + client.poll().ok(); + std::thread::sleep(Duration::from_millis(20)); + } + + // Server reads received data + println!("\n--- Server reading ---"); + let server_status = server.rdp_status(); + for s in &server_status { + if s.state == RdpState::Open { + let mut buf = [0u8; 256]; + loop { + match server.rdp_recv(s.sport as i32 + 1, &mut buf) { + Ok(0) => break, + Ok(n) => { + let msg = String::from_utf8_lossy(&buf[..n]); + println!("Server received: '{msg}'"); + } + Err(_) => break, + } + } + } + } + // Try reading from desc 2 (server-side accepted desc) + let mut buf = [0u8; 256]; + for attempt_desc in 1..=5 { + loop { + match server.rdp_recv(attempt_desc, &mut buf) { + Ok(0) => break, + Ok(n) => { + let msg = String::from_utf8_lossy(&buf[..n]); + println!("Server desc={attempt_desc}: '{msg}'"); + } + Err(_) => break, + } + } + } + } + Ok(state) => { + println!("Connection not open: {state:?}"); + } + Err(e) => { + println!("Descriptor error: {e}"); + } + } + + // Show status + println!("\n--- RDP Status ---"); + for s in &client.rdp_status() { + println!(" state={:?} dport={} sport={}", s.state, s.dport, s.sport); + } + + // Cleanup + client.rdp_close(desc); + + println!("\n--- Done ---"); + println!("Server: {server}"); + println!("Client: {client}"); +} diff --git a/examples/remote_get.rs b/examples/remote_get.rs new file mode 100644 index 0000000..60add19 --- /dev/null +++ b/examples/remote_get.rs @@ -0,0 +1,106 @@ +//! Remote get example: FIND_VALUE across the network. +//! +//! Node 1 stores a value locally. Node 3 retrieves it +//! via iterative FIND_VALUE, even though it never +//! received a STORE. +//! +//! Usage: +//! cargo run --example remote_get + +use std::time::Duration; +use tesseras_dht::Node; +use tesseras_dht::nat::NatState; + +fn main() { + env_logger::Builder::from_env( + env_logger::Env::default().default_filter_or("info"), + ) + .format(|buf, record| { + use std::io::Write; + writeln!( + buf, + "{} [{}] {}", + record.level(), + record.target(), + record.args() + ) + }) + .init(); + + // Create 3 nodes + let mut node1 = Node::bind(0).expect("bind"); + node1.set_nat_state(NatState::Global); + let port1 = node1.local_addr().unwrap().port(); + + let mut node2 = Node::bind(0).expect("bind"); + node2.set_nat_state(NatState::Global); + node2.join("127.0.0.1", port1).expect("join"); + + let mut node3 = Node::bind(0).expect("bind"); + node3.set_nat_state(NatState::Global); + node3.join("127.0.0.1", port1).expect("join"); + + println!("Node 1: {} (has the value)", &node1.id_hex()[..8]); + println!("Node 2: {} (relay)", &node2.id_hex()[..8]); + println!("Node 3: {} (will search)", &node3.id_hex()[..8]); + + // Let them discover each other + for _ in 0..10 { + node1.poll().ok(); + node2.poll().ok(); + node3.poll().ok(); + std::thread::sleep(Duration::from_millis(20)); + } + + println!( + "\nRouting tables: N1={} N2={} N3={}", + node1.routing_table_size(), + node2.routing_table_size(), + node3.routing_table_size(), + ); + + // Node 1 stores a value locally only (no STORE sent) + println!("\n--- Node 1 stores 'secret-key' ---"); + node1.put(b"secret-key", b"secret-value", 300, false); + + // Verify: Node 3 does NOT have it + assert!( + node3.get(b"secret-key").is_empty(), + "Node 3 should not have the value yet" + ); + println!("Node 3 get('secret-key'): [] (not found)"); + + // Node 3 does get() — triggers FIND_VALUE + println!("\n--- Node 3 searches via FIND_VALUE ---"); + let _ = node3.get(b"secret-key"); // starts query + + // Poll to let FIND_VALUE propagate + for _ in 0..15 { + node1.poll().ok(); + node2.poll().ok(); + node3.poll().ok(); + std::thread::sleep(Duration::from_millis(30)); + } + + // Now Node 3 should have cached the value + let result = node3.get(b"secret-key"); + println!( + "Node 3 get('secret-key'): {:?}", + result + .iter() + .map(|v| String::from_utf8_lossy(v).to_string()) + .collect::>() + ); + + if result.is_empty() { + println!("\n(Value not found — may need more poll rounds)"); + } else { + println!("\nRemote get successful!"); + } + + // Storage summary + println!("\n--- Storage ---"); + println!("Node 1: {} values", node1.storage_count()); + println!("Node 2: {} values", node2.storage_count()); + println!("Node 3: {} values", node3.storage_count()); +} diff --git a/examples/tesserasd.rs b/examples/tesserasd.rs new file mode 100644 index 0000000..81fc3bc --- /dev/null +++ b/examples/tesserasd.rs @@ -0,0 +1,338 @@ +//! tesserasd: daemon with TCP command interface. +//! +//! Manages multiple DHT nodes via a line-oriented TCP +//! protocol. +//! +//! Usage: +//! cargo run --example tesserasd +//! cargo run --example tesserasd -- --host 0.0.0.0 --port 8080 +//! +//! Then connect with: +//! nc localhost 12080 +//! +//! Commands: +//! new,NAME,PORT[,global] Create a node +//! delete,NAME Delete a node +//! set_id,NAME,DATA Set node ID from data +//! join,NAME,HOST,PORT Join network +//! put,NAME,KEY,VALUE,TTL Store key-value +//! get,NAME,KEY Retrieve value +//! dump,NAME Print node state +//! list List all nodes +//! quit Close connection +//! +//! Response codes: +//! 200-205 Success +//! 400-409 Error + +use std::collections::HashMap; +use std::io::{BufRead, BufReader, Write}; +use std::net::{TcpListener, TcpStream}; +use std::time::Duration; + +use tesseras_dht::Node; +use tesseras_dht::nat::NatState; + +// Response codes +const OK_NEW: &str = "200"; +const OK_DELETE: &str = "201"; +const OK_JOIN: &str = "202"; +const OK_PUT: &str = "203"; +const OK_GET: &str = "204"; +const OK_SET_ID: &str = "205"; + +const ERR_UNKNOWN: &str = "400"; +const ERR_INVALID: &str = "401"; +const ERR_PORT: &str = "402"; +const ERR_EXISTS: &str = "403"; +const ERR_NO_NODE: &str = "404"; + +fn main() { + env_logger::Builder::from_env( + env_logger::Env::default().default_filter_or("info"), + ) + .format(|buf, record| { + use std::io::Write; + writeln!( + buf, + "{} [{}] {}", + record.level(), + record.target(), + record.args() + ) + }) + .init(); + + let mut host = "127.0.0.1".to_string(); + let mut port: u16 = 12080; + + let args: Vec = std::env::args().collect(); + let mut i = 1; + while i < args.len() { + match args[i].as_str() { + "--host" if i + 1 < args.len() => { + host = args[i + 1].clone(); + i += 2; + } + "--port" if i + 1 < args.len() => { + port = args[i + 1].parse().expect("invalid port"); + i += 2; + } + _ => i += 1, + } + } + + let listener = + TcpListener::bind(format!("{host}:{port}")).expect("bind TCP"); + listener.set_nonblocking(true).expect("set_nonblocking"); + + println!("tesserasd listening on 127.0.0.1:{port}"); + println!("Connect with: nc localhost {port}"); + + let mut nodes: HashMap = HashMap::new(); + let mut clients: Vec = Vec::new(); + + loop { + // Accept new TCP connections + if let Ok((stream, addr)) = listener.accept() { + println!("Client connected: {addr}"); + stream.set_nonblocking(true).expect("set_nonblocking"); + let mut s = stream.try_clone().unwrap(); + let _ = s.write_all(b"tesserasd ready\n"); + clients.push(stream); + } + + // Process commands from connected clients + let mut to_remove = Vec::new(); + for (i, client) in clients.iter().enumerate() { + let mut reader = BufReader::new(client.try_clone().unwrap()); + let mut line = String::new(); + match reader.read_line(&mut line) { + Ok(0) => to_remove.push(i), // disconnected + Ok(_) => { + let line = line.trim().to_string(); + if !line.is_empty() { + let mut out = client.try_clone().unwrap(); + let response = handle_command(&line, &mut nodes); + let _ = out.write_all(response.as_bytes()); + let _ = out.write_all(b"\n"); + + if line == "quit" { + to_remove.push(i); + } + } + } + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + // No data yet + } + Err(_) => to_remove.push(i), + } + } + // Remove disconnected (reverse order) + to_remove.sort(); + to_remove.dedup(); + for i in to_remove.into_iter().rev() { + println!("Client disconnected"); + clients.remove(i); + } + + // Poll all DHT nodes + for node in nodes.values_mut() { + node.poll().ok(); + } + + std::thread::sleep(Duration::from_millis(10)); + } +} + +fn handle_command(line: &str, nodes: &mut HashMap) -> String { + let parts: Vec<&str> = line.split(',').collect(); + if parts.is_empty() { + return format!("{ERR_UNKNOWN},unknown command"); + } + + match parts[0] { + "new" => cmd_new(&parts, nodes), + "delete" => cmd_delete(&parts, nodes), + "set_id" => cmd_set_id(&parts, nodes), + "join" => cmd_join(&parts, nodes), + "put" => cmd_put(&parts, nodes), + "get" => cmd_get(&parts, nodes), + "dump" => cmd_dump(&parts, nodes), + "list" => cmd_list(nodes), + "quit" => "goodbye".to_string(), + _ => format!("{ERR_UNKNOWN},unknown command: {}", parts[0]), + } +} + +// new,NAME,PORT[,global] +fn cmd_new(parts: &[&str], nodes: &mut HashMap) -> String { + if parts.len() < 3 { + return format!("{ERR_INVALID},usage: new,NAME,PORT[,global]"); + } + let name = parts[1]; + let port: u16 = match parts[2].parse() { + Ok(p) => p, + Err(_) => return format!("{ERR_INVALID},invalid port"), + }; + + if nodes.contains_key(name) { + return format!("{ERR_EXISTS},new,{name},{port},already exists"); + } + + let mut node = match Node::bind(port) { + Ok(n) => n, + Err(e) => return format!("{ERR_PORT},new,{name},{port},{e}"), + }; + + if parts.len() >= 4 && parts[3] == "global" { + node.set_nat_state(NatState::Global); + } + + let id = node.id_hex(); + nodes.insert(name.to_string(), node); + format!("{OK_NEW},new,{name},{port},{id}") +} + +// delete,NAME +fn cmd_delete(parts: &[&str], nodes: &mut HashMap) -> String { + if parts.len() < 2 { + return format!("{ERR_INVALID},usage: delete,NAME"); + } + let name = parts[1]; + if nodes.remove(name).is_some() { + format!("{OK_DELETE},delete,{name}") + } else { + format!("{ERR_NO_NODE},delete,{name},not found") + } +} + +// set_id,NAME,DATA +fn cmd_set_id(parts: &[&str], nodes: &mut HashMap) -> String { + if parts.len() < 3 { + return format!("{ERR_INVALID},usage: set_id,NAME,DATA"); + } + let name = parts[1]; + let data = parts[2]; + match nodes.get_mut(name) { + Some(node) => { + node.set_id(data.as_bytes()); + format!("{OK_SET_ID},set_id,{name},{}", node.id_hex()) + } + None => format!("{ERR_NO_NODE},set_id,{name},not found"), + } +} + +// join,NAME,HOST,PORT +fn cmd_join(parts: &[&str], nodes: &mut HashMap) -> String { + if parts.len() < 4 { + return format!("{ERR_INVALID},usage: join,NAME,HOST,PORT"); + } + let name = parts[1]; + let host = parts[2]; + let port: u16 = match parts[3].parse() { + Ok(p) => p, + Err(_) => return format!("{ERR_INVALID},invalid port"), + }; + match nodes.get_mut(name) { + Some(node) => match node.join(host, port) { + Ok(()) => { + format!("{OK_JOIN},join,{name},{host},{port}") + } + Err(e) => format!("{ERR_INVALID},join,{name},{host},{port},{e}"), + }, + None => { + format!("{ERR_NO_NODE},join,{name},not found") + } + } +} + +// put,NAME,KEY,VALUE,TTL +fn cmd_put(parts: &[&str], nodes: &mut HashMap) -> String { + if parts.len() < 5 { + return format!("{ERR_INVALID},usage: put,NAME,KEY,VALUE,TTL"); + } + let name = parts[1]; + let key = parts[2]; + let value = parts[3]; + let ttl: u16 = match parts[4].parse() { + Ok(t) => t, + Err(_) => return format!("{ERR_INVALID},invalid TTL"), + }; + match nodes.get_mut(name) { + Some(node) => { + node.put(key.as_bytes(), value.as_bytes(), ttl, false); + format!("{OK_PUT},put,{name},{key}") + } + None => { + format!("{ERR_NO_NODE},put,{name},not found") + } + } +} + +// get,NAME,KEY +fn cmd_get(parts: &[&str], nodes: &mut HashMap) -> String { + if parts.len() < 3 { + return format!("{ERR_INVALID},usage: get,NAME,KEY"); + } + let name = parts[1]; + let key = parts[2]; + match nodes.get_mut(name) { + Some(node) => { + let vals = node.get(key.as_bytes()); + if vals.is_empty() { + format!("{OK_GET},get,{name},{key},") + } else { + let values: Vec = vals + .iter() + .map(|v| String::from_utf8_lossy(v).to_string()) + .collect(); + format!("{OK_GET},get,{name},{key},{}", values.join(";")) + } + } + None => { + format!("{ERR_NO_NODE},get,{name},not found") + } + } +} + +// dump,NAME +fn cmd_dump(parts: &[&str], nodes: &HashMap) -> String { + if parts.len() < 2 { + return format!("{ERR_INVALID},usage: dump,NAME"); + } + let name = parts[1]; + match nodes.get(name) { + Some(node) => { + format!( + "{OK_NEW},dump,{name},id={},nat={:?},\ + rt={},peers={},storage={}", + node.id_hex(), + node.nat_state(), + node.routing_table_size(), + node.peer_count(), + node.storage_count(), + ) + } + None => { + format!("{ERR_NO_NODE},dump,{name},not found") + } + } +} + +// list +fn cmd_list(nodes: &HashMap) -> String { + if nodes.is_empty() { + return format!("{OK_NEW},list,"); + } + let mut lines = Vec::new(); + for (name, node) in nodes { + lines.push(format!( + "{name}={} rt={} storage={}", + &node.id_hex()[..8], + node.routing_table_size(), + node.storage_count(), + )); + } + format!("{OK_NEW},list,{}", lines.join(";")) +} diff --git a/examples/two_nodes.rs b/examples/two_nodes.rs new file mode 100644 index 0000000..13565c6 --- /dev/null +++ b/examples/two_nodes.rs @@ -0,0 +1,133 @@ +//! Two-node example: bootstrap, put, get. +//! +//! Creates two Node nodes on localhost. Node 2 joins +//! via Node 1, then Node 1 stores a value and Node 2 +//! retrieves it (via protocol exchange). +//! +//! Run with: +//! cargo run --example two_nodes +//! +//! With debug logging: +//! RUST_LOG=debug cargo run --example two_nodes + +use std::time::Duration; + +use tesseras_dht::Node; +use tesseras_dht::nat::NatState; + +fn main() { + env_logger::Builder::from_env( + env_logger::Env::default().default_filter_or("info"), + ) + .format(|buf, record| { + use std::io::Write; + writeln!( + buf, + "{} [{}] {}", + record.level(), + record.target(), + record.args() + ) + }) + .init(); + + // ── Create two nodes ──────────────────────────── + + let mut node1 = Node::bind(0).expect("bind node1"); + node1.set_nat_state(NatState::Global); + let addr1 = node1.local_addr().expect("local addr"); + println!("Node 1: {} @ {}", node1.id_hex(), addr1); + + let mut node2 = Node::bind(0).expect("bind node2"); + node2.set_nat_state(NatState::Global); + let addr2 = node2.local_addr().expect("local addr"); + println!("Node 2: {} @ {}", node2.id_hex(), addr2); + + // ── Node 2 joins via Node 1 ───────────────────── + + println!("\n--- Node 2 joining via Node 1 ---"); + node2.join("127.0.0.1", addr1.port()).expect("join"); + + // Poll both nodes a few times to exchange messages + for _ in 0..10 { + node1.poll().ok(); + node2.poll().ok(); + std::thread::sleep(Duration::from_millis(50)); + } + + println!("Node 1 routing table: {} peers", node1.routing_table_size()); + println!("Node 2 routing table: {} peers", node2.routing_table_size()); + + // ── Node 1 stores a value ─────────────────────── + + println!("\n--- Node 1 storing key='hello' ---"); + node1.put(b"hello", b"world", 300, false); + + // Poll to deliver STORE messages + for _ in 0..10 { + node1.poll().ok(); + node2.poll().ok(); + std::thread::sleep(Duration::from_millis(50)); + } + + // ── Check storage ─────────────────────────────── + + let vals1 = node1.get(b"hello"); + let vals2 = node2.get(b"hello"); + + println!( + "\nNode 1 get('hello'): {:?}", + vals1 + .iter() + .map(|v| String::from_utf8_lossy(v).to_string()) + .collect::>() + ); + println!( + "Node 2 get('hello'): {:?}", + vals2 + .iter() + .map(|v| String::from_utf8_lossy(v).to_string()) + .collect::>() + ); + + // ── Test remote get via FIND_VALUE ──────────── + + println!("\n--- Node 1 storing key='secret' (local only) ---"); + // Store only on Node 1 (no STORE sent because + // we bypass put and go directly to storage) + node1.put(b"remote-key", b"remote-val", 300, false); + // Don't poll — so Node 2 doesn't get the STORE + + // Node 2 tries to get it — should trigger FIND_VALUE + println!( + "Node 2 get('remote-key') before poll: {:?}", + node2.get(b"remote-key") + ); + + // Now poll to let FIND_VALUE exchange happen + for _ in 0..10 { + node1.poll().ok(); + node2.poll().ok(); + std::thread::sleep(Duration::from_millis(50)); + } + + let remote_vals = node2.get(b"remote-key"); + println!( + "Node 2 get('remote-key') after poll: {:?}", + remote_vals + .iter() + .map(|v| String::from_utf8_lossy(v).to_string()) + .collect::>() + ); + + // ── Print state ───────────────────────────────── + + println!("\n--- Node 1 state ---"); + node1.print_state(); + println!("\n--- Node 2 state ---"); + node2.print_state(); + + println!("\n--- Done ---"); + println!("Node 1: {node1}"); + println!("Node 2: {node2}"); +} -- cgit v1.2.3