diff options
| author | murilo ijanc | 2026-03-24 15:04:03 -0300 |
|---|---|---|
| committer | murilo ijanc | 2026-03-24 15:04:03 -0300 |
| commit | 9821aabf0b50d2487b07502d3d2cd89e7d62bdbe (patch) | |
| tree | 53da095ff90cc755bac3d4bf699172b5e8cd07d6 /examples/tesserasd.rs | |
| download | tesseras-dht-e908bc01403f4b8ef2a65fa6be43716fd1c6e003.tar.gz | |
Initial commitv0.1.0
NAT-aware Kademlia DHT library for peer-to-peer networks.
Features:
- Distributed key-value storage (iterative FIND_NODE, FIND_VALUE, STORE)
- NAT traversal via DTUN hole-punching and proxy relay
- Reliable Datagram Protocol (RDP) with 7-state connection machine
- Datagram transport with automatic fragmentation/reassembly
- Ed25519 packet authentication
- 256-bit node IDs (Ed25519 public keys)
- Rate limiting, ban list, and eclipse attack mitigation
- Persistence and metrics
- OpenBSD and Linux support
Diffstat (limited to 'examples/tesserasd.rs')
| -rw-r--r-- | examples/tesserasd.rs | 338 |
1 files changed, 338 insertions, 0 deletions
diff --git a/examples/tesserasd.rs b/examples/tesserasd.rs new file mode 100644 index 0000000..81fc3bc --- /dev/null +++ b/examples/tesserasd.rs @@ -0,0 +1,338 @@ +//! tesserasd: daemon with TCP command interface. +//! +//! Manages multiple DHT nodes via a line-oriented TCP +//! protocol. +//! +//! Usage: +//! cargo run --example tesserasd +//! cargo run --example tesserasd -- --host 0.0.0.0 --port 8080 +//! +//! Then connect with: +//! nc localhost 12080 +//! +//! Commands: +//! new,NAME,PORT[,global] Create a node +//! delete,NAME Delete a node +//! set_id,NAME,DATA Set node ID from data +//! join,NAME,HOST,PORT Join network +//! put,NAME,KEY,VALUE,TTL Store key-value +//! get,NAME,KEY Retrieve value +//! dump,NAME Print node state +//! list List all nodes +//! quit Close connection +//! +//! Response codes: +//! 200-205 Success +//! 400-409 Error + +use std::collections::HashMap; +use std::io::{BufRead, BufReader, Write}; +use std::net::{TcpListener, TcpStream}; +use std::time::Duration; + +use tesseras_dht::Node; +use tesseras_dht::nat::NatState; + +// Response codes +const OK_NEW: &str = "200"; +const OK_DELETE: &str = "201"; +const OK_JOIN: &str = "202"; +const OK_PUT: &str = "203"; +const OK_GET: &str = "204"; +const OK_SET_ID: &str = "205"; + +const ERR_UNKNOWN: &str = "400"; +const ERR_INVALID: &str = "401"; +const ERR_PORT: &str = "402"; +const ERR_EXISTS: &str = "403"; +const ERR_NO_NODE: &str = "404"; + +fn main() { + env_logger::Builder::from_env( + env_logger::Env::default().default_filter_or("info"), + ) + .format(|buf, record| { + use std::io::Write; + writeln!( + buf, + "{} [{}] {}", + record.level(), + record.target(), + record.args() + ) + }) + .init(); + + let mut host = "127.0.0.1".to_string(); + let mut port: u16 = 12080; + + let args: Vec<String> = std::env::args().collect(); + let mut i = 1; + while i < args.len() { + match args[i].as_str() { + "--host" if i + 1 < args.len() => { + host = args[i + 1].clone(); + i += 2; + } + "--port" if i + 1 < args.len() => { + port = args[i + 1].parse().expect("invalid port"); + i += 2; + } + _ => i += 1, + } + } + + let listener = + TcpListener::bind(format!("{host}:{port}")).expect("bind TCP"); + listener.set_nonblocking(true).expect("set_nonblocking"); + + println!("tesserasd listening on 127.0.0.1:{port}"); + println!("Connect with: nc localhost {port}"); + + let mut nodes: HashMap<String, Node> = HashMap::new(); + let mut clients: Vec<TcpStream> = Vec::new(); + + loop { + // Accept new TCP connections + if let Ok((stream, addr)) = listener.accept() { + println!("Client connected: {addr}"); + stream.set_nonblocking(true).expect("set_nonblocking"); + let mut s = stream.try_clone().unwrap(); + let _ = s.write_all(b"tesserasd ready\n"); + clients.push(stream); + } + + // Process commands from connected clients + let mut to_remove = Vec::new(); + for (i, client) in clients.iter().enumerate() { + let mut reader = BufReader::new(client.try_clone().unwrap()); + let mut line = String::new(); + match reader.read_line(&mut line) { + Ok(0) => to_remove.push(i), // disconnected + Ok(_) => { + let line = line.trim().to_string(); + if !line.is_empty() { + let mut out = client.try_clone().unwrap(); + let response = handle_command(&line, &mut nodes); + let _ = out.write_all(response.as_bytes()); + let _ = out.write_all(b"\n"); + + if line == "quit" { + to_remove.push(i); + } + } + } + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + // No data yet + } + Err(_) => to_remove.push(i), + } + } + // Remove disconnected (reverse order) + to_remove.sort(); + to_remove.dedup(); + for i in to_remove.into_iter().rev() { + println!("Client disconnected"); + clients.remove(i); + } + + // Poll all DHT nodes + for node in nodes.values_mut() { + node.poll().ok(); + } + + std::thread::sleep(Duration::from_millis(10)); + } +} + +fn handle_command(line: &str, nodes: &mut HashMap<String, Node>) -> String { + let parts: Vec<&str> = line.split(',').collect(); + if parts.is_empty() { + return format!("{ERR_UNKNOWN},unknown command"); + } + + match parts[0] { + "new" => cmd_new(&parts, nodes), + "delete" => cmd_delete(&parts, nodes), + "set_id" => cmd_set_id(&parts, nodes), + "join" => cmd_join(&parts, nodes), + "put" => cmd_put(&parts, nodes), + "get" => cmd_get(&parts, nodes), + "dump" => cmd_dump(&parts, nodes), + "list" => cmd_list(nodes), + "quit" => "goodbye".to_string(), + _ => format!("{ERR_UNKNOWN},unknown command: {}", parts[0]), + } +} + +// new,NAME,PORT[,global] +fn cmd_new(parts: &[&str], nodes: &mut HashMap<String, Node>) -> String { + if parts.len() < 3 { + return format!("{ERR_INVALID},usage: new,NAME,PORT[,global]"); + } + let name = parts[1]; + let port: u16 = match parts[2].parse() { + Ok(p) => p, + Err(_) => return format!("{ERR_INVALID},invalid port"), + }; + + if nodes.contains_key(name) { + return format!("{ERR_EXISTS},new,{name},{port},already exists"); + } + + let mut node = match Node::bind(port) { + Ok(n) => n, + Err(e) => return format!("{ERR_PORT},new,{name},{port},{e}"), + }; + + if parts.len() >= 4 && parts[3] == "global" { + node.set_nat_state(NatState::Global); + } + + let id = node.id_hex(); + nodes.insert(name.to_string(), node); + format!("{OK_NEW},new,{name},{port},{id}") +} + +// delete,NAME +fn cmd_delete(parts: &[&str], nodes: &mut HashMap<String, Node>) -> String { + if parts.len() < 2 { + return format!("{ERR_INVALID},usage: delete,NAME"); + } + let name = parts[1]; + if nodes.remove(name).is_some() { + format!("{OK_DELETE},delete,{name}") + } else { + format!("{ERR_NO_NODE},delete,{name},not found") + } +} + +// set_id,NAME,DATA +fn cmd_set_id(parts: &[&str], nodes: &mut HashMap<String, Node>) -> String { + if parts.len() < 3 { + return format!("{ERR_INVALID},usage: set_id,NAME,DATA"); + } + let name = parts[1]; + let data = parts[2]; + match nodes.get_mut(name) { + Some(node) => { + node.set_id(data.as_bytes()); + format!("{OK_SET_ID},set_id,{name},{}", node.id_hex()) + } + None => format!("{ERR_NO_NODE},set_id,{name},not found"), + } +} + +// join,NAME,HOST,PORT +fn cmd_join(parts: &[&str], nodes: &mut HashMap<String, Node>) -> String { + if parts.len() < 4 { + return format!("{ERR_INVALID},usage: join,NAME,HOST,PORT"); + } + let name = parts[1]; + let host = parts[2]; + let port: u16 = match parts[3].parse() { + Ok(p) => p, + Err(_) => return format!("{ERR_INVALID},invalid port"), + }; + match nodes.get_mut(name) { + Some(node) => match node.join(host, port) { + Ok(()) => { + format!("{OK_JOIN},join,{name},{host},{port}") + } + Err(e) => format!("{ERR_INVALID},join,{name},{host},{port},{e}"), + }, + None => { + format!("{ERR_NO_NODE},join,{name},not found") + } + } +} + +// put,NAME,KEY,VALUE,TTL +fn cmd_put(parts: &[&str], nodes: &mut HashMap<String, Node>) -> String { + if parts.len() < 5 { + return format!("{ERR_INVALID},usage: put,NAME,KEY,VALUE,TTL"); + } + let name = parts[1]; + let key = parts[2]; + let value = parts[3]; + let ttl: u16 = match parts[4].parse() { + Ok(t) => t, + Err(_) => return format!("{ERR_INVALID},invalid TTL"), + }; + match nodes.get_mut(name) { + Some(node) => { + node.put(key.as_bytes(), value.as_bytes(), ttl, false); + format!("{OK_PUT},put,{name},{key}") + } + None => { + format!("{ERR_NO_NODE},put,{name},not found") + } + } +} + +// get,NAME,KEY +fn cmd_get(parts: &[&str], nodes: &mut HashMap<String, Node>) -> String { + if parts.len() < 3 { + return format!("{ERR_INVALID},usage: get,NAME,KEY"); + } + let name = parts[1]; + let key = parts[2]; + match nodes.get_mut(name) { + Some(node) => { + let vals = node.get(key.as_bytes()); + if vals.is_empty() { + format!("{OK_GET},get,{name},{key},<not found>") + } else { + let values: Vec<String> = vals + .iter() + .map(|v| String::from_utf8_lossy(v).to_string()) + .collect(); + format!("{OK_GET},get,{name},{key},{}", values.join(";")) + } + } + None => { + format!("{ERR_NO_NODE},get,{name},not found") + } + } +} + +// dump,NAME +fn cmd_dump(parts: &[&str], nodes: &HashMap<String, Node>) -> String { + if parts.len() < 2 { + return format!("{ERR_INVALID},usage: dump,NAME"); + } + let name = parts[1]; + match nodes.get(name) { + Some(node) => { + format!( + "{OK_NEW},dump,{name},id={},nat={:?},\ + rt={},peers={},storage={}", + node.id_hex(), + node.nat_state(), + node.routing_table_size(), + node.peer_count(), + node.storage_count(), + ) + } + None => { + format!("{ERR_NO_NODE},dump,{name},not found") + } + } +} + +// list +fn cmd_list(nodes: &HashMap<String, Node>) -> String { + if nodes.is_empty() { + return format!("{OK_NEW},list,<empty>"); + } + let mut lines = Vec::new(); + for (name, node) in nodes { + lines.push(format!( + "{name}={} rt={} storage={}", + &node.id_hex()[..8], + node.routing_table_size(), + node.storage_count(), + )); + } + format!("{OK_NEW},list,{}", lines.join(";")) +} |