aboutsummaryrefslogtreecommitdiffstats
path: root/examples
diff options
context:
space:
mode:
authormurilo ijanc2026-03-24 15:04:03 -0300
committermurilo ijanc2026-03-24 15:04:03 -0300
commit9821aabf0b50d2487b07502d3d2cd89e7d62bdbe (patch)
tree53da095ff90cc755bac3d4bf699172b5e8cd07d6 /examples
downloadtesseras-dht-e908bc01403f4b8ef2a65fa6be43716fd1c6e003.tar.gz
Initial commitv0.1.0
NAT-aware Kademlia DHT library for peer-to-peer networks. Features: - Distributed key-value storage (iterative FIND_NODE, FIND_VALUE, STORE) - NAT traversal via DTUN hole-punching and proxy relay - Reliable Datagram Protocol (RDP) with 7-state connection machine - Datagram transport with automatic fragmentation/reassembly - Ed25519 packet authentication - 256-bit node IDs (Ed25519 public keys) - Rate limiting, ban list, and eclipse attack mitigation - Persistence and metrics - OpenBSD and Linux support
Diffstat (limited to 'examples')
-rw-r--r--examples/dgram.rs100
-rw-r--r--examples/join.rs65
-rw-r--r--examples/network.rs86
-rw-r--r--examples/put_get.rs93
-rw-r--r--examples/rdp.rs147
-rw-r--r--examples/remote_get.rs106
-rw-r--r--examples/tesserasd.rs338
-rw-r--r--examples/two_nodes.rs133
8 files changed, 1068 insertions, 0 deletions
diff --git a/examples/dgram.rs b/examples/dgram.rs
new file mode 100644
index 0000000..ab25e78
--- /dev/null
+++ b/examples/dgram.rs
@@ -0,0 +1,100 @@
+//! Datagram transport example (equivalent to example4.cpp).
+//!
+//! Creates two nodes and sends datagrams between them
+//! using the dgram callback API.
+//!
+//! Usage:
+//! cargo run --example dgram
+
+use std::sync::{Arc, Mutex};
+use std::time::Duration;
+use tesseras_dht::Node;
+use tesseras_dht::id::NodeId;
+use tesseras_dht::nat::NatState;
+
+fn main() {
+ env_logger::Builder::from_env(
+ env_logger::Env::default().default_filter_or("info"),
+ )
+ .format(|buf, record| {
+ use std::io::Write;
+ writeln!(
+ buf,
+ "{} [{}] {}",
+ record.level(),
+ record.target(),
+ record.args()
+ )
+ })
+ .init();
+
+ let mut node1 = Node::bind(0).expect("bind node1");
+ node1.set_nat_state(NatState::Global);
+ let addr1 = node1.local_addr().unwrap();
+ let id1 = *node1.id();
+
+ let mut node2 = Node::bind(0).expect("bind node2");
+ node2.set_nat_state(NatState::Global);
+ let addr2 = node2.local_addr().unwrap();
+ let id2 = *node2.id();
+
+ println!("Node 1: {} @ {addr1}", node1.id_hex());
+ println!("Node 2: {} @ {addr2}", node2.id_hex());
+
+ // Join node2 to node1
+ node2.join("127.0.0.1", addr1.port()).expect("join");
+
+ // Poll to establish routing
+ for _ in 0..10 {
+ node1.poll().ok();
+ node2.poll().ok();
+ std::thread::sleep(Duration::from_millis(50));
+ }
+
+ // Set up dgram callbacks
+ let received1: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
+ let received2: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
+
+ let r1 = received1.clone();
+ node1.set_dgram_callback(move |data: &[u8], from: &NodeId| {
+ let msg = String::from_utf8_lossy(data).to_string();
+ println!("Node 1 received: '{msg}' from {from:?}");
+ r1.lock().unwrap().push(msg);
+ });
+
+ let r2 = received2.clone();
+ node2.set_dgram_callback(move |data: &[u8], from: &NodeId| {
+ let msg = String::from_utf8_lossy(data).to_string();
+ println!("Node 2 received: '{msg}' from {from:?}");
+ r2.lock().unwrap().push(msg);
+ });
+
+ // Send datagrams
+ println!("\n--- Sending datagrams ---");
+ node1.send_dgram(b"hello from node1", &id2);
+ node2.send_dgram(b"hello from node2", &id1);
+
+ // Poll to deliver
+ for _ in 0..10 {
+ node1.poll().ok();
+ node2.poll().ok();
+ std::thread::sleep(Duration::from_millis(50));
+ }
+
+ // Note: actual dgram delivery requires the full
+ // send queue → address resolution → send flow.
+ // This example demonstrates the API; full delivery
+ // is wired in integration.
+
+ println!("\n--- Summary ---");
+ println!(
+ "Node 1 received {} messages",
+ received1.lock().unwrap().len()
+ );
+ println!(
+ "Node 2 received {} messages",
+ received2.lock().unwrap().len()
+ );
+ println!("Node 1 send queue pending: queued for delivery");
+ println!("Node 2 send queue pending: queued for delivery");
+}
diff --git a/examples/join.rs b/examples/join.rs
new file mode 100644
index 0000000..a478410
--- /dev/null
+++ b/examples/join.rs
@@ -0,0 +1,65 @@
+//! Basic bootstrap example (equivalent to example1.cpp).
+//!
+//! Usage:
+//! cargo run --example join -- 10000
+//! cargo run --example join -- 10001 127.0.0.1 10000
+//!
+//! The first invocation creates a standalone node.
+//! The second joins via the first.
+
+use std::time::Duration;
+use tesseras_dht::Node;
+use tesseras_dht::nat::NatState;
+
+fn main() {
+ env_logger::Builder::from_env(
+ env_logger::Env::default().default_filter_or("info"),
+ )
+ .format(|buf, record| {
+ use std::io::Write;
+ writeln!(
+ buf,
+ "{} [{}] {}",
+ record.level(),
+ record.target(),
+ record.args()
+ )
+ })
+ .init();
+
+ let args: Vec<String> = std::env::args().collect();
+ if args.len() < 2 {
+ eprintln!("usage: {} port [host port]", args[0]);
+ eprintln!();
+ eprintln!("example:");
+ eprintln!(" $ {} 10000 &", args[0]);
+ eprintln!(" $ {} 10001 127.0.0.1 10000", args[0]);
+ std::process::exit(1);
+ }
+
+ let port: u16 = args[1].parse().expect("invalid port");
+
+ let mut node = Node::bind(port).expect("bind failed");
+ node.set_nat_state(NatState::Global);
+
+ println!("Node {} listening on port {port}", node.id_hex());
+
+ if args.len() >= 4 {
+ let dst_host = &args[2];
+ let dst_port: u16 = args[3].parse().expect("invalid dst port");
+
+ match node.join(dst_host, dst_port) {
+ Ok(()) => println!("Join request sent"),
+ Err(e) => {
+ eprintln!("Join failed: {e}");
+ std::process::exit(1);
+ }
+ }
+ }
+
+ // Event loop
+ loop {
+ node.poll().ok();
+ std::thread::sleep(Duration::from_millis(100));
+ }
+}
diff --git a/examples/network.rs b/examples/network.rs
new file mode 100644
index 0000000..be3e0ef
--- /dev/null
+++ b/examples/network.rs
@@ -0,0 +1,86 @@
+//! Multi-node network example (equivalent to example2.cpp).
+//!
+//! Creates N nodes on localhost, joins them recursively
+//! via the first node, then prints state periodically.
+//!
+//! Usage:
+//! cargo run --example network
+//! RUST_LOG=debug cargo run --example network
+
+use std::time::{Duration, Instant};
+use tesseras_dht::Node;
+use tesseras_dht::nat::NatState;
+
+const NUM_NODES: usize = 20;
+const POLL_ROUNDS: usize = 30;
+
+fn main() {
+ env_logger::Builder::from_env(
+ env_logger::Env::default().default_filter_or("info"),
+ )
+ .format(|buf, record| {
+ use std::io::Write;
+ writeln!(
+ buf,
+ "{} [{}] {}",
+ record.level(),
+ record.target(),
+ record.args()
+ )
+ })
+ .init();
+
+ println!("Creating {NUM_NODES} nodes...");
+ let start = Instant::now();
+
+ // Create bootstrap node
+ let mut nodes: Vec<Node> = Vec::new();
+ let bootstrap = Node::bind(0).expect("bind bootstrap");
+ let bootstrap_port = bootstrap.local_addr().unwrap().port();
+ println!("Bootstrap: {} @ port {bootstrap_port}", bootstrap.id_hex());
+ nodes.push(bootstrap);
+
+ // Create and join remaining nodes
+ for i in 1..NUM_NODES {
+ let mut node = Node::bind(0).expect("bind node");
+ node.set_nat_state(NatState::Global);
+ node.join("127.0.0.1", bootstrap_port).expect("join");
+ println!("Node {i}: {} joined", &node.id_hex()[..8]);
+ nodes.push(node);
+ }
+ nodes[0].set_nat_state(NatState::Global);
+
+ println!("\nAll {NUM_NODES} nodes created in {:?}", start.elapsed());
+
+ // Poll all nodes to exchange messages
+ println!("\nPolling {POLL_ROUNDS} rounds...");
+ for round in 0..POLL_ROUNDS {
+ for node in nodes.iter_mut() {
+ node.poll().ok();
+ }
+ std::thread::sleep(Duration::from_millis(50));
+
+ if (round + 1) % 10 == 0 {
+ let sizes: Vec<usize> =
+ nodes.iter().map(|n| n.routing_table_size()).collect();
+ let avg = sizes.iter().sum::<usize>() / sizes.len();
+ let max = sizes.iter().max().unwrap();
+ println!(
+ " Round {}: avg routing table = {avg}, max = {max}",
+ round + 1
+ );
+ }
+ }
+
+ // Print final state
+ println!("\n--- Final state ---");
+ for (i, node) in nodes.iter().enumerate() {
+ println!(
+ "Node {i}: {} | rt={} peers={} storage={}",
+ &node.id_hex()[..8],
+ node.routing_table_size(),
+ node.peer_count(),
+ node.storage_count(),
+ );
+ }
+}
diff --git a/examples/put_get.rs b/examples/put_get.rs
new file mode 100644
index 0000000..e2bfaca
--- /dev/null
+++ b/examples/put_get.rs
@@ -0,0 +1,93 @@
+//! DHT put/get example (equivalent to example3.cpp).
+//!
+//! Creates a small network, stores key-value pairs from
+//! one node, and retrieves them from another.
+//!
+//! Usage:
+//! cargo run --example put_get
+
+use std::time::Duration;
+use tesseras_dht::Node;
+use tesseras_dht::nat::NatState;
+
+const NUM_NODES: usize = 5;
+
+fn main() {
+ env_logger::Builder::from_env(
+ env_logger::Env::default().default_filter_or("info"),
+ )
+ .format(|buf, record| {
+ use std::io::Write;
+ writeln!(
+ buf,
+ "{} [{}] {}",
+ record.level(),
+ record.target(),
+ record.args()
+ )
+ })
+ .init();
+
+ // Create network
+ let mut nodes: Vec<Node> = Vec::new();
+ let bootstrap = Node::bind(0).expect("bind");
+ let bp = bootstrap.local_addr().unwrap().port();
+ nodes.push(bootstrap);
+ nodes[0].set_nat_state(NatState::Global);
+
+ for _ in 1..NUM_NODES {
+ let mut n = Node::bind(0).expect("bind");
+ n.set_nat_state(NatState::Global);
+ n.join("127.0.0.1", bp).expect("join");
+ nodes.push(n);
+ }
+
+ // Poll to establish routing
+ for _ in 0..20 {
+ for n in nodes.iter_mut() {
+ n.poll().ok();
+ }
+ std::thread::sleep(Duration::from_millis(50));
+ }
+
+ println!("Network ready: {NUM_NODES} nodes");
+
+ // Node 0 stores several key-value pairs
+ println!("\n--- Storing values from Node 0 ---");
+ for i in 0..5u32 {
+ let key = format!("key-{i}");
+ let val = format!("value-{i}");
+ nodes[0].put(key.as_bytes(), val.as_bytes(), 300, false);
+ println!(" put({key}, {val})");
+ }
+
+ // Poll to distribute stores
+ for _ in 0..20 {
+ for n in nodes.iter_mut() {
+ n.poll().ok();
+ }
+ std::thread::sleep(Duration::from_millis(50));
+ }
+
+ // Each node retrieves values
+ println!("\n--- Retrieving values ---");
+ for (ni, node) in nodes.iter_mut().enumerate() {
+ for i in 0..5u32 {
+ let key = format!("key-{i}");
+ let vals = node.get(key.as_bytes());
+ let found: Vec<String> = vals
+ .iter()
+ .map(|v| String::from_utf8_lossy(v).to_string())
+ .collect();
+ if !found.is_empty() {
+ println!(" Node {ni} get({key}) = {:?}", found);
+ }
+ }
+ }
+
+ // Summary
+ println!("\n--- Storage summary ---");
+ for (i, n) in nodes.iter().enumerate() {
+ println!(" Node {i}: {} values stored", n.storage_count());
+ }
+}
diff --git a/examples/rdp.rs b/examples/rdp.rs
new file mode 100644
index 0000000..319c779
--- /dev/null
+++ b/examples/rdp.rs
@@ -0,0 +1,147 @@
+//! RDP reliable transport example (equivalent to example5.cpp).
+//!
+//! Two nodes: server listens, client connects, sends
+//! data, server receives it.
+//!
+//! Usage:
+//! cargo run --example rdp
+
+use std::time::Duration;
+use tesseras_dht::Node;
+use tesseras_dht::nat::NatState;
+use tesseras_dht::rdp::RdpState;
+
+const RDP_PORT: u16 = 5000;
+
+fn main() {
+ env_logger::Builder::from_env(
+ env_logger::Env::default().default_filter_or("info"),
+ )
+ .format(|buf, record| {
+ use std::io::Write;
+ writeln!(
+ buf,
+ "{} [{}] {}",
+ record.level(),
+ record.target(),
+ record.args()
+ )
+ })
+ .init();
+
+ // Create two nodes
+ let mut server = Node::bind(0).expect("bind server");
+ server.set_nat_state(NatState::Global);
+ let server_addr = server.local_addr().unwrap();
+ let server_id = *server.id();
+ println!("Server: {} @ {server_addr}", server.id_hex());
+
+ let mut client = Node::bind(0).expect("bind client");
+ client.set_nat_state(NatState::Global);
+ println!("Client: {}", client.id_hex());
+
+ // Client joins server so they know each other
+ client.join("127.0.0.1", server_addr.port()).expect("join");
+
+ // Poll to exchange routing info
+ for _ in 0..10 {
+ server.poll().ok();
+ client.poll().ok();
+ std::thread::sleep(Duration::from_millis(20));
+ }
+ println!("Client knows {} peers", client.routing_table_size());
+
+ // Server listens on RDP port
+ let _listen = server.rdp_listen(RDP_PORT).expect("listen");
+ println!("Server listening on RDP port {RDP_PORT}");
+
+ // Client connects
+ let desc = client
+ .rdp_connect(0, &server_id, RDP_PORT)
+ .expect("connect");
+ println!("Client state: {:?}", client.rdp_state(desc).unwrap());
+
+ // Poll to complete handshake
+ for _ in 0..10 {
+ server.poll().ok();
+ client.poll().ok();
+ std::thread::sleep(Duration::from_millis(20));
+ }
+
+ println!(
+ "Client state after handshake: {:?}",
+ client.rdp_state(desc).unwrap_or(RdpState::Closed)
+ );
+
+ // Send data if connection is open
+ match client.rdp_state(desc) {
+ Ok(RdpState::Open) => {
+ for i in 0..3u16 {
+ let msg = format!("hello {i}");
+ match client.rdp_send(desc, msg.as_bytes()) {
+ Ok(n) => println!("Sent: '{msg}' ({n} bytes)"),
+ Err(e) => println!("Send error: {e}"),
+ }
+ }
+
+ // Poll to deliver
+ for _ in 0..10 {
+ server.poll().ok();
+ client.poll().ok();
+ std::thread::sleep(Duration::from_millis(20));
+ }
+
+ // Server reads received data
+ println!("\n--- Server reading ---");
+ let server_status = server.rdp_status();
+ for s in &server_status {
+ if s.state == RdpState::Open {
+ let mut buf = [0u8; 256];
+ loop {
+ match server.rdp_recv(s.sport as i32 + 1, &mut buf) {
+ Ok(0) => break,
+ Ok(n) => {
+ let msg = String::from_utf8_lossy(&buf[..n]);
+ println!("Server received: '{msg}'");
+ }
+ Err(_) => break,
+ }
+ }
+ }
+ }
+ // Try reading from desc 2 (server-side accepted desc)
+ let mut buf = [0u8; 256];
+ for attempt_desc in 1..=5 {
+ loop {
+ match server.rdp_recv(attempt_desc, &mut buf) {
+ Ok(0) => break,
+ Ok(n) => {
+ let msg = String::from_utf8_lossy(&buf[..n]);
+ println!("Server desc={attempt_desc}: '{msg}'");
+ }
+ Err(_) => break,
+ }
+ }
+ }
+ }
+ Ok(state) => {
+ println!("Connection not open: {state:?}");
+ }
+ Err(e) => {
+ println!("Descriptor error: {e}");
+ }
+ }
+
+ // Show status
+ println!("\n--- RDP Status ---");
+ for s in &client.rdp_status() {
+ println!(" state={:?} dport={} sport={}", s.state, s.dport, s.sport);
+ }
+
+ // Cleanup
+ client.rdp_close(desc);
+
+ println!("\n--- Done ---");
+ println!("Server: {server}");
+ println!("Client: {client}");
+}
diff --git a/examples/remote_get.rs b/examples/remote_get.rs
new file mode 100644
index 0000000..60add19
--- /dev/null
+++ b/examples/remote_get.rs
@@ -0,0 +1,106 @@
+//! Remote get example: FIND_VALUE across the network.
+//!
+//! Node 1 stores a value locally. Node 3 retrieves it
+//! via iterative FIND_VALUE, even though it never
+//! received a STORE.
+//!
+//! Usage:
+//! cargo run --example remote_get
+
+use std::time::Duration;
+use tesseras_dht::Node;
+use tesseras_dht::nat::NatState;
+
+fn main() {
+ env_logger::Builder::from_env(
+ env_logger::Env::default().default_filter_or("info"),
+ )
+ .format(|buf, record| {
+ use std::io::Write;
+ writeln!(
+ buf,
+ "{} [{}] {}",
+ record.level(),
+ record.target(),
+ record.args()
+ )
+ })
+ .init();
+
+ // Create 3 nodes
+ let mut node1 = Node::bind(0).expect("bind");
+ node1.set_nat_state(NatState::Global);
+ let port1 = node1.local_addr().unwrap().port();
+
+ let mut node2 = Node::bind(0).expect("bind");
+ node2.set_nat_state(NatState::Global);
+ node2.join("127.0.0.1", port1).expect("join");
+
+ let mut node3 = Node::bind(0).expect("bind");
+ node3.set_nat_state(NatState::Global);
+ node3.join("127.0.0.1", port1).expect("join");
+
+ println!("Node 1: {} (has the value)", &node1.id_hex()[..8]);
+ println!("Node 2: {} (relay)", &node2.id_hex()[..8]);
+ println!("Node 3: {} (will search)", &node3.id_hex()[..8]);
+
+ // Let them discover each other
+ for _ in 0..10 {
+ node1.poll().ok();
+ node2.poll().ok();
+ node3.poll().ok();
+ std::thread::sleep(Duration::from_millis(20));
+ }
+
+ println!(
+ "\nRouting tables: N1={} N2={} N3={}",
+ node1.routing_table_size(),
+ node2.routing_table_size(),
+ node3.routing_table_size(),
+ );
+
+ // Node 1 stores a value locally only (no STORE sent)
+ println!("\n--- Node 1 stores 'secret-key' ---");
+ node1.put(b"secret-key", b"secret-value", 300, false);
+
+ // Verify: Node 3 does NOT have it
+ assert!(
+ node3.get(b"secret-key").is_empty(),
+ "Node 3 should not have the value yet"
+ );
+ println!("Node 3 get('secret-key'): [] (not found)");
+
+ // Node 3 does get() — triggers FIND_VALUE
+ println!("\n--- Node 3 searches via FIND_VALUE ---");
+ let _ = node3.get(b"secret-key"); // starts query
+
+ // Poll to let FIND_VALUE propagate
+ for _ in 0..15 {
+ node1.poll().ok();
+ node2.poll().ok();
+ node3.poll().ok();
+ std::thread::sleep(Duration::from_millis(30));
+ }
+
+ // Now Node 3 should have cached the value
+ let result = node3.get(b"secret-key");
+ println!(
+ "Node 3 get('secret-key'): {:?}",
+ result
+ .iter()
+ .map(|v| String::from_utf8_lossy(v).to_string())
+ .collect::<Vec<_>>()
+ );
+
+ if result.is_empty() {
+ println!("\n(Value not found — may need more poll rounds)");
+ } else {
+ println!("\nRemote get successful!");
+ }
+
+ // Storage summary
+ println!("\n--- Storage ---");
+ println!("Node 1: {} values", node1.storage_count());
+ println!("Node 2: {} values", node2.storage_count());
+ println!("Node 3: {} values", node3.storage_count());
+}
diff --git a/examples/tesserasd.rs b/examples/tesserasd.rs
new file mode 100644
index 0000000..81fc3bc
--- /dev/null
+++ b/examples/tesserasd.rs
@@ -0,0 +1,338 @@
+//! tesserasd: daemon with TCP command interface.
+//!
+//! Manages multiple DHT nodes via a line-oriented TCP
+//! protocol.
+//!
+//! Usage:
+//! cargo run --example tesserasd
+//! cargo run --example tesserasd -- --host 0.0.0.0 --port 8080
+//!
+//! Then connect with:
+//! nc localhost 12080
+//!
+//! Commands:
+//! new,NAME,PORT[,global] Create a node
+//! delete,NAME Delete a node
+//! set_id,NAME,DATA Set node ID from data
+//! join,NAME,HOST,PORT Join network
+//! put,NAME,KEY,VALUE,TTL Store key-value
+//! get,NAME,KEY Retrieve value
+//! dump,NAME Print node state
+//! list List all nodes
+//! quit Close connection
+//!
+//! Response codes:
+//! 200-205 Success
+//! 400-409 Error
+
+use std::collections::HashMap;
+use std::io::{BufRead, BufReader, Write};
+use std::net::{TcpListener, TcpStream};
+use std::time::Duration;
+
+use tesseras_dht::Node;
+use tesseras_dht::nat::NatState;
+
+// Response codes
+const OK_NEW: &str = "200";
+const OK_DELETE: &str = "201";
+const OK_JOIN: &str = "202";
+const OK_PUT: &str = "203";
+const OK_GET: &str = "204";
+const OK_SET_ID: &str = "205";
+
+const ERR_UNKNOWN: &str = "400";
+const ERR_INVALID: &str = "401";
+const ERR_PORT: &str = "402";
+const ERR_EXISTS: &str = "403";
+const ERR_NO_NODE: &str = "404";
+
+fn main() {
+ env_logger::Builder::from_env(
+ env_logger::Env::default().default_filter_or("info"),
+ )
+ .format(|buf, record| {
+ use std::io::Write;
+ writeln!(
+ buf,
+ "{} [{}] {}",
+ record.level(),
+ record.target(),
+ record.args()
+ )
+ })
+ .init();
+
+ let mut host = "127.0.0.1".to_string();
+ let mut port: u16 = 12080;
+
+ let args: Vec<String> = std::env::args().collect();
+ let mut i = 1;
+ while i < args.len() {
+ match args[i].as_str() {
+ "--host" if i + 1 < args.len() => {
+ host = args[i + 1].clone();
+ i += 2;
+ }
+ "--port" if i + 1 < args.len() => {
+ port = args[i + 1].parse().expect("invalid port");
+ i += 2;
+ }
+ _ => i += 1,
+ }
+ }
+
+ let listener =
+ TcpListener::bind(format!("{host}:{port}")).expect("bind TCP");
+ listener.set_nonblocking(true).expect("set_nonblocking");
+
+ println!("tesserasd listening on 127.0.0.1:{port}");
+ println!("Connect with: nc localhost {port}");
+
+ let mut nodes: HashMap<String, Node> = HashMap::new();
+ let mut clients: Vec<TcpStream> = Vec::new();
+
+ loop {
+ // Accept new TCP connections
+ if let Ok((stream, addr)) = listener.accept() {
+ println!("Client connected: {addr}");
+ stream.set_nonblocking(true).expect("set_nonblocking");
+ let mut s = stream.try_clone().unwrap();
+ let _ = s.write_all(b"tesserasd ready\n");
+ clients.push(stream);
+ }
+
+ // Process commands from connected clients
+ let mut to_remove = Vec::new();
+ for (i, client) in clients.iter().enumerate() {
+ let mut reader = BufReader::new(client.try_clone().unwrap());
+ let mut line = String::new();
+ match reader.read_line(&mut line) {
+ Ok(0) => to_remove.push(i), // disconnected
+ Ok(_) => {
+ let line = line.trim().to_string();
+ if !line.is_empty() {
+ let mut out = client.try_clone().unwrap();
+ let response = handle_command(&line, &mut nodes);
+ let _ = out.write_all(response.as_bytes());
+ let _ = out.write_all(b"\n");
+
+ if line == "quit" {
+ to_remove.push(i);
+ }
+ }
+ }
+ Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
+ // No data yet
+ }
+ Err(_) => to_remove.push(i),
+ }
+ }
+ // Remove disconnected (reverse order)
+ to_remove.sort();
+ to_remove.dedup();
+ for i in to_remove.into_iter().rev() {
+ println!("Client disconnected");
+ clients.remove(i);
+ }
+
+ // Poll all DHT nodes
+ for node in nodes.values_mut() {
+ node.poll().ok();
+ }
+
+ std::thread::sleep(Duration::from_millis(10));
+ }
+}
+
+fn handle_command(line: &str, nodes: &mut HashMap<String, Node>) -> String {
+ let parts: Vec<&str> = line.split(',').collect();
+ if parts.is_empty() {
+ return format!("{ERR_UNKNOWN},unknown command");
+ }
+
+ match parts[0] {
+ "new" => cmd_new(&parts, nodes),
+ "delete" => cmd_delete(&parts, nodes),
+ "set_id" => cmd_set_id(&parts, nodes),
+ "join" => cmd_join(&parts, nodes),
+ "put" => cmd_put(&parts, nodes),
+ "get" => cmd_get(&parts, nodes),
+ "dump" => cmd_dump(&parts, nodes),
+ "list" => cmd_list(nodes),
+ "quit" => "goodbye".to_string(),
+ _ => format!("{ERR_UNKNOWN},unknown command: {}", parts[0]),
+ }
+}
+
+// new,NAME,PORT[,global]
+fn cmd_new(parts: &[&str], nodes: &mut HashMap<String, Node>) -> String {
+ if parts.len() < 3 {
+ return format!("{ERR_INVALID},usage: new,NAME,PORT[,global]");
+ }
+ let name = parts[1];
+ let port: u16 = match parts[2].parse() {
+ Ok(p) => p,
+ Err(_) => return format!("{ERR_INVALID},invalid port"),
+ };
+
+ if nodes.contains_key(name) {
+ return format!("{ERR_EXISTS},new,{name},{port},already exists");
+ }
+
+ let mut node = match Node::bind(port) {
+ Ok(n) => n,
+ Err(e) => return format!("{ERR_PORT},new,{name},{port},{e}"),
+ };
+
+ if parts.len() >= 4 && parts[3] == "global" {
+ node.set_nat_state(NatState::Global);
+ }
+
+ let id = node.id_hex();
+ nodes.insert(name.to_string(), node);
+ format!("{OK_NEW},new,{name},{port},{id}")
+}
+
+// delete,NAME
+fn cmd_delete(parts: &[&str], nodes: &mut HashMap<String, Node>) -> String {
+ if parts.len() < 2 {
+ return format!("{ERR_INVALID},usage: delete,NAME");
+ }
+ let name = parts[1];
+ if nodes.remove(name).is_some() {
+ format!("{OK_DELETE},delete,{name}")
+ } else {
+ format!("{ERR_NO_NODE},delete,{name},not found")
+ }
+}
+
+// set_id,NAME,DATA
+fn cmd_set_id(parts: &[&str], nodes: &mut HashMap<String, Node>) -> String {
+ if parts.len() < 3 {
+ return format!("{ERR_INVALID},usage: set_id,NAME,DATA");
+ }
+ let name = parts[1];
+ let data = parts[2];
+ match nodes.get_mut(name) {
+ Some(node) => {
+ node.set_id(data.as_bytes());
+ format!("{OK_SET_ID},set_id,{name},{}", node.id_hex())
+ }
+ None => format!("{ERR_NO_NODE},set_id,{name},not found"),
+ }
+}
+
+// join,NAME,HOST,PORT
+fn cmd_join(parts: &[&str], nodes: &mut HashMap<String, Node>) -> String {
+ if parts.len() < 4 {
+ return format!("{ERR_INVALID},usage: join,NAME,HOST,PORT");
+ }
+ let name = parts[1];
+ let host = parts[2];
+ let port: u16 = match parts[3].parse() {
+ Ok(p) => p,
+ Err(_) => return format!("{ERR_INVALID},invalid port"),
+ };
+ match nodes.get_mut(name) {
+ Some(node) => match node.join(host, port) {
+ Ok(()) => {
+ format!("{OK_JOIN},join,{name},{host},{port}")
+ }
+ Err(e) => format!("{ERR_INVALID},join,{name},{host},{port},{e}"),
+ },
+ None => {
+ format!("{ERR_NO_NODE},join,{name},not found")
+ }
+ }
+}
+
+// put,NAME,KEY,VALUE,TTL
+fn cmd_put(parts: &[&str], nodes: &mut HashMap<String, Node>) -> String {
+ if parts.len() < 5 {
+ return format!("{ERR_INVALID},usage: put,NAME,KEY,VALUE,TTL");
+ }
+ let name = parts[1];
+ let key = parts[2];
+ let value = parts[3];
+ let ttl: u16 = match parts[4].parse() {
+ Ok(t) => t,
+ Err(_) => return format!("{ERR_INVALID},invalid TTL"),
+ };
+ match nodes.get_mut(name) {
+ Some(node) => {
+ node.put(key.as_bytes(), value.as_bytes(), ttl, false);
+ format!("{OK_PUT},put,{name},{key}")
+ }
+ None => {
+ format!("{ERR_NO_NODE},put,{name},not found")
+ }
+ }
+}
+
+// get,NAME,KEY
+fn cmd_get(parts: &[&str], nodes: &mut HashMap<String, Node>) -> String {
+ if parts.len() < 3 {
+ return format!("{ERR_INVALID},usage: get,NAME,KEY");
+ }
+ let name = parts[1];
+ let key = parts[2];
+ match nodes.get_mut(name) {
+ Some(node) => {
+ let vals = node.get(key.as_bytes());
+ if vals.is_empty() {
+ format!("{OK_GET},get,{name},{key},<not found>")
+ } else {
+ let values: Vec<String> = vals
+ .iter()
+ .map(|v| String::from_utf8_lossy(v).to_string())
+ .collect();
+ format!("{OK_GET},get,{name},{key},{}", values.join(";"))
+ }
+ }
+ None => {
+ format!("{ERR_NO_NODE},get,{name},not found")
+ }
+ }
+}
+
+// dump,NAME
+fn cmd_dump(parts: &[&str], nodes: &HashMap<String, Node>) -> String {
+ if parts.len() < 2 {
+ return format!("{ERR_INVALID},usage: dump,NAME");
+ }
+ let name = parts[1];
+ match nodes.get(name) {
+ Some(node) => {
+ format!(
+ "{OK_NEW},dump,{name},id={},nat={:?},\
+ rt={},peers={},storage={}",
+ node.id_hex(),
+ node.nat_state(),
+ node.routing_table_size(),
+ node.peer_count(),
+ node.storage_count(),
+ )
+ }
+ None => {
+ format!("{ERR_NO_NODE},dump,{name},not found")
+ }
+ }
+}
+
+// list
+fn cmd_list(nodes: &HashMap<String, Node>) -> String {
+ if nodes.is_empty() {
+ return format!("{OK_NEW},list,<empty>");
+ }
+ let mut lines = Vec::new();
+ for (name, node) in nodes {
+ lines.push(format!(
+ "{name}={} rt={} storage={}",
+ &node.id_hex()[..8],
+ node.routing_table_size(),
+ node.storage_count(),
+ ));
+ }
+ format!("{OK_NEW},list,{}", lines.join(";"))
+}
diff --git a/examples/two_nodes.rs b/examples/two_nodes.rs
new file mode 100644
index 0000000..13565c6
--- /dev/null
+++ b/examples/two_nodes.rs
@@ -0,0 +1,133 @@
+//! Two-node example: bootstrap, put, get.
+//!
+//! Creates two Node nodes on localhost. Node 2 joins
+//! via Node 1, then Node 1 stores a value and Node 2
+//! retrieves it (via protocol exchange).
+//!
+//! Run with:
+//! cargo run --example two_nodes
+//!
+//! With debug logging:
+//! RUST_LOG=debug cargo run --example two_nodes
+
+use std::time::Duration;
+
+use tesseras_dht::Node;
+use tesseras_dht::nat::NatState;
+
+fn main() {
+ env_logger::Builder::from_env(
+ env_logger::Env::default().default_filter_or("info"),
+ )
+ .format(|buf, record| {
+ use std::io::Write;
+ writeln!(
+ buf,
+ "{} [{}] {}",
+ record.level(),
+ record.target(),
+ record.args()
+ )
+ })
+ .init();
+
+ // ── Create two nodes ────────────────────────────
+
+ let mut node1 = Node::bind(0).expect("bind node1");
+ node1.set_nat_state(NatState::Global);
+ let addr1 = node1.local_addr().expect("local addr");
+ println!("Node 1: {} @ {}", node1.id_hex(), addr1);
+
+ let mut node2 = Node::bind(0).expect("bind node2");
+ node2.set_nat_state(NatState::Global);
+ let addr2 = node2.local_addr().expect("local addr");
+ println!("Node 2: {} @ {}", node2.id_hex(), addr2);
+
+ // ── Node 2 joins via Node 1 ─────────────────────
+
+ println!("\n--- Node 2 joining via Node 1 ---");
+ node2.join("127.0.0.1", addr1.port()).expect("join");
+
+ // Poll both nodes a few times to exchange messages
+ for _ in 0..10 {
+ node1.poll().ok();
+ node2.poll().ok();
+ std::thread::sleep(Duration::from_millis(50));
+ }
+
+ println!("Node 1 routing table: {} peers", node1.routing_table_size());
+ println!("Node 2 routing table: {} peers", node2.routing_table_size());
+
+ // ── Node 1 stores a value ───────────────────────
+
+ println!("\n--- Node 1 storing key='hello' ---");
+ node1.put(b"hello", b"world", 300, false);
+
+ // Poll to deliver STORE messages
+ for _ in 0..10 {
+ node1.poll().ok();
+ node2.poll().ok();
+ std::thread::sleep(Duration::from_millis(50));
+ }
+
+ // ── Check storage ───────────────────────────────
+
+ let vals1 = node1.get(b"hello");
+ let vals2 = node2.get(b"hello");
+
+ println!(
+ "\nNode 1 get('hello'): {:?}",
+ vals1
+ .iter()
+ .map(|v| String::from_utf8_lossy(v).to_string())
+ .collect::<Vec<_>>()
+ );
+ println!(
+ "Node 2 get('hello'): {:?}",
+ vals2
+ .iter()
+ .map(|v| String::from_utf8_lossy(v).to_string())
+ .collect::<Vec<_>>()
+ );
+
+ // ── Test remote get via FIND_VALUE ────────────
+
+ println!("\n--- Node 1 storing key='secret' (local only) ---");
+ // Store only on Node 1 (no STORE sent because
+ // we bypass put and go directly to storage)
+ node1.put(b"remote-key", b"remote-val", 300, false);
+ // Don't poll — so Node 2 doesn't get the STORE
+
+ // Node 2 tries to get it — should trigger FIND_VALUE
+ println!(
+ "Node 2 get('remote-key') before poll: {:?}",
+ node2.get(b"remote-key")
+ );
+
+ // Now poll to let FIND_VALUE exchange happen
+ for _ in 0..10 {
+ node1.poll().ok();
+ node2.poll().ok();
+ std::thread::sleep(Duration::from_millis(50));
+ }
+
+ let remote_vals = node2.get(b"remote-key");
+ println!(
+ "Node 2 get('remote-key') after poll: {:?}",
+ remote_vals
+ .iter()
+ .map(|v| String::from_utf8_lossy(v).to_string())
+ .collect::<Vec<_>>()
+ );
+
+ // ── Print state ─────────────────────────────────
+
+ println!("\n--- Node 1 state ---");
+ node1.print_state();
+ println!("\n--- Node 2 state ---");
+ node2.print_state();
+
+ println!("\n--- Done ---");
+ println!("Node 1: {node1}");
+ println!("Node 2: {node2}");
+}