//! tesserasd: daemon with TCP command interface. //! //! Manages multiple DHT nodes via a line-oriented TCP //! protocol. //! //! Usage: //! cargo run --example tesserasd //! cargo run --example tesserasd -- --host 0.0.0.0 --port 8080 //! //! Then connect with: //! nc localhost 12080 //! //! Commands: //! new,NAME,PORT[,global] Create a node //! delete,NAME Delete a node //! set_id,NAME,DATA Set node ID from data //! join,NAME,HOST,PORT Join network //! put,NAME,KEY,VALUE,TTL Store key-value //! get,NAME,KEY Retrieve value //! dump,NAME Print node state //! list List all nodes //! quit Close connection //! //! Response codes: //! 200-205 Success //! 400-409 Error use std::collections::HashMap; use std::io::{BufRead, BufReader, Write}; use std::net::{TcpListener, TcpStream}; use std::time::Duration; use tesseras_dht::Node; use tesseras_dht::nat::NatState; // Response codes const OK_NEW: &str = "200"; const OK_DELETE: &str = "201"; const OK_JOIN: &str = "202"; const OK_PUT: &str = "203"; const OK_GET: &str = "204"; const OK_SET_ID: &str = "205"; const ERR_UNKNOWN: &str = "400"; const ERR_INVALID: &str = "401"; const ERR_PORT: &str = "402"; const ERR_EXISTS: &str = "403"; const ERR_NO_NODE: &str = "404"; fn main() { env_logger::Builder::from_env( env_logger::Env::default().default_filter_or("info"), ) .format(|buf, record| { use std::io::Write; writeln!( buf, "{} [{}] {}", record.level(), record.target(), record.args() ) }) .init(); let mut host = "127.0.0.1".to_string(); let mut port: u16 = 12080; let args: Vec = std::env::args().collect(); let mut i = 1; while i < args.len() { match args[i].as_str() { "--host" if i + 1 < args.len() => { host = args[i + 1].clone(); i += 2; } "--port" if i + 1 < args.len() => { port = args[i + 1].parse().expect("invalid port"); i += 2; } _ => i += 1, } } let listener = TcpListener::bind(format!("{host}:{port}")).expect("bind TCP"); listener.set_nonblocking(true).expect("set_nonblocking"); println!("tesserasd listening on 127.0.0.1:{port}"); println!("Connect with: nc localhost {port}"); let mut nodes: HashMap = HashMap::new(); let mut clients: Vec = Vec::new(); loop { // Accept new TCP connections if let Ok((stream, addr)) = listener.accept() { println!("Client connected: {addr}"); stream.set_nonblocking(true).expect("set_nonblocking"); let mut s = stream.try_clone().unwrap(); let _ = s.write_all(b"tesserasd ready\n"); clients.push(stream); } // Process commands from connected clients let mut to_remove = Vec::new(); for (i, client) in clients.iter().enumerate() { let mut reader = BufReader::new(client.try_clone().unwrap()); let mut line = String::new(); match reader.read_line(&mut line) { Ok(0) => to_remove.push(i), // disconnected Ok(_) => { let line = line.trim().to_string(); if !line.is_empty() { let mut out = client.try_clone().unwrap(); let response = handle_command(&line, &mut nodes); let _ = out.write_all(response.as_bytes()); let _ = out.write_all(b"\n"); if line == "quit" { to_remove.push(i); } } } Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { // No data yet } Err(_) => to_remove.push(i), } } // Remove disconnected (reverse order) to_remove.sort(); to_remove.dedup(); for i in to_remove.into_iter().rev() { println!("Client disconnected"); clients.remove(i); } // Poll all DHT nodes for node in nodes.values_mut() { node.poll().ok(); } std::thread::sleep(Duration::from_millis(10)); } } fn handle_command(line: &str, nodes: &mut HashMap) -> String { let parts: Vec<&str> = line.split(',').collect(); if parts.is_empty() { return format!("{ERR_UNKNOWN},unknown command"); } match parts[0] { "new" => cmd_new(&parts, nodes), "delete" => cmd_delete(&parts, nodes), "set_id" => cmd_set_id(&parts, nodes), "join" => cmd_join(&parts, nodes), "put" => cmd_put(&parts, nodes), "get" => cmd_get(&parts, nodes), "dump" => cmd_dump(&parts, nodes), "list" => cmd_list(nodes), "quit" => "goodbye".to_string(), _ => format!("{ERR_UNKNOWN},unknown command: {}", parts[0]), } } // new,NAME,PORT[,global] fn cmd_new(parts: &[&str], nodes: &mut HashMap) -> String { if parts.len() < 3 { return format!("{ERR_INVALID},usage: new,NAME,PORT[,global]"); } let name = parts[1]; let port: u16 = match parts[2].parse() { Ok(p) => p, Err(_) => return format!("{ERR_INVALID},invalid port"), }; if nodes.contains_key(name) { return format!("{ERR_EXISTS},new,{name},{port},already exists"); } let mut node = match Node::bind(port) { Ok(n) => n, Err(e) => return format!("{ERR_PORT},new,{name},{port},{e}"), }; if parts.len() >= 4 && parts[3] == "global" { node.set_nat_state(NatState::Global); } let id = node.id_hex(); nodes.insert(name.to_string(), node); format!("{OK_NEW},new,{name},{port},{id}") } // delete,NAME fn cmd_delete(parts: &[&str], nodes: &mut HashMap) -> String { if parts.len() < 2 { return format!("{ERR_INVALID},usage: delete,NAME"); } let name = parts[1]; if nodes.remove(name).is_some() { format!("{OK_DELETE},delete,{name}") } else { format!("{ERR_NO_NODE},delete,{name},not found") } } // set_id,NAME,DATA fn cmd_set_id(parts: &[&str], nodes: &mut HashMap) -> String { if parts.len() < 3 { return format!("{ERR_INVALID},usage: set_id,NAME,DATA"); } let name = parts[1]; let data = parts[2]; match nodes.get_mut(name) { Some(node) => { node.set_id(data.as_bytes()); format!("{OK_SET_ID},set_id,{name},{}", node.id_hex()) } None => format!("{ERR_NO_NODE},set_id,{name},not found"), } } // join,NAME,HOST,PORT fn cmd_join(parts: &[&str], nodes: &mut HashMap) -> String { if parts.len() < 4 { return format!("{ERR_INVALID},usage: join,NAME,HOST,PORT"); } let name = parts[1]; let host = parts[2]; let port: u16 = match parts[3].parse() { Ok(p) => p, Err(_) => return format!("{ERR_INVALID},invalid port"), }; match nodes.get_mut(name) { Some(node) => match node.join(host, port) { Ok(()) => { format!("{OK_JOIN},join,{name},{host},{port}") } Err(e) => format!("{ERR_INVALID},join,{name},{host},{port},{e}"), }, None => { format!("{ERR_NO_NODE},join,{name},not found") } } } // put,NAME,KEY,VALUE,TTL fn cmd_put(parts: &[&str], nodes: &mut HashMap) -> String { if parts.len() < 5 { return format!("{ERR_INVALID},usage: put,NAME,KEY,VALUE,TTL"); } let name = parts[1]; let key = parts[2]; let value = parts[3]; let ttl: u16 = match parts[4].parse() { Ok(t) => t, Err(_) => return format!("{ERR_INVALID},invalid TTL"), }; match nodes.get_mut(name) { Some(node) => { node.put(key.as_bytes(), value.as_bytes(), ttl, false); format!("{OK_PUT},put,{name},{key}") } None => { format!("{ERR_NO_NODE},put,{name},not found") } } } // get,NAME,KEY fn cmd_get(parts: &[&str], nodes: &mut HashMap) -> String { if parts.len() < 3 { return format!("{ERR_INVALID},usage: get,NAME,KEY"); } let name = parts[1]; let key = parts[2]; match nodes.get_mut(name) { Some(node) => { let vals = node.get(key.as_bytes()); if vals.is_empty() { format!("{OK_GET},get,{name},{key},") } else { let values: Vec = vals .iter() .map(|v| String::from_utf8_lossy(v).to_string()) .collect(); format!("{OK_GET},get,{name},{key},{}", values.join(";")) } } None => { format!("{ERR_NO_NODE},get,{name},not found") } } } // dump,NAME fn cmd_dump(parts: &[&str], nodes: &HashMap) -> String { if parts.len() < 2 { return format!("{ERR_INVALID},usage: dump,NAME"); } let name = parts[1]; match nodes.get(name) { Some(node) => { format!( "{OK_NEW},dump,{name},id={},nat={:?},\ rt={},peers={},storage={}", node.id_hex(), node.nat_state(), node.routing_table_size(), node.peer_count(), node.storage_count(), ) } None => { format!("{ERR_NO_NODE},dump,{name},not found") } } } // list fn cmd_list(nodes: &HashMap) -> String { if nodes.is_empty() { return format!("{OK_NEW},list,"); } let mut lines = Vec::new(); for (name, node) in nodes { lines.push(format!( "{name}={} rt={} storage={}", &node.id_hex()[..8], node.routing_table_size(), node.storage_count(), )); } format!("{OK_NEW},list,{}", lines.join(";")) }