diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/base58.rs | 153 | ||||
| -rw-r--r-- | src/bin/tp.rs | 206 | ||||
| -rw-r--r-- | src/bin/tpd.rs | 293 | ||||
| -rw-r--r-- | src/crypto.rs | 84 | ||||
| -rw-r--r-- | src/daemon.rs | 504 | ||||
| -rw-r--r-- | src/ops.rs | 160 | ||||
| -rw-r--r-- | src/paste.rs | 128 | ||||
| -rw-r--r-- | src/protocol.rs | 173 | ||||
| -rw-r--r-- | src/store.rs | 262 |
9 files changed, 1963 insertions, 0 deletions
diff --git a/src/base58.rs b/src/base58.rs new file mode 100644 index 0000000..c412bc3 --- /dev/null +++ b/src/base58.rs @@ -0,0 +1,153 @@ +//! Bitcoin-style Base58 encoding/decoding. +//! +//! No external dependencies. Uses the standard Bitcoin +//! alphabet (no 0, O, I, l to avoid ambiguity). + +const ALPHABET: &[u8; 58] = + b"123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"; + +/// Decode table: ASCII byte → base58 value (255 = invalid). +const DECODE: [u8; 128] = { + let mut table = [255u8; 128]; + let mut i = 0; + while i < 58 { + table[ALPHABET[i] as usize] = i as u8; + i += 1; + } + table +}; + +/// Encode bytes to Base58 string. +pub fn encode(input: &[u8]) -> String { + if input.is_empty() { + return String::new(); + } + + // Count leading zeros + let leading_zeros = input.iter().take_while(|&&b| b == 0).count(); + + // Convert to base58 via repeated division + let mut digits: Vec<u8> = Vec::with_capacity(input.len() * 2); + for &byte in input { + let mut carry = byte as u32; + for d in &mut digits { + carry += (*d as u32) << 8; + *d = (carry % 58) as u8; + carry /= 58; + } + while carry > 0 { + digits.push((carry % 58) as u8); + carry /= 58; + } + } + + let mut result = String::with_capacity(leading_zeros + digits.len()); + for _ in 0..leading_zeros { + result.push('1'); + } + for &d in digits.iter().rev() { + result.push(ALPHABET[d as usize] as char); + } + result +} + +/// Decode a Base58 string to bytes. +pub fn decode(input: &str) -> Option<Vec<u8>> { + if input.is_empty() { + return Some(Vec::new()); + } + + // Count leading '1's (representing zero bytes) + let leading_ones = input.chars().take_while(|&c| c == '1').count(); + + // Convert from base58 via repeated multiplication + let mut bytes: Vec<u8> = Vec::with_capacity(input.len()); + for c in input.chars() { + let c = c as usize; + if c >= 128 { + return None; + } + let val = DECODE[c]; + if val == 255 { + return None; + } + let mut carry = val as u32; + for b in &mut bytes { + carry += (*b as u32) * 58; + *b = (carry & 0xFF) as u8; + carry >>= 8; + } + while carry > 0 { + bytes.push((carry & 0xFF) as u8); + carry >>= 8; + } + } + + let mut result = vec![0; leading_ones]; + result.extend(bytes.iter().rev()); + Some(result) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn encode_empty() { + assert_eq!(encode(b""), ""); + } + + #[test] + fn encode_hello() { + assert_eq!(encode(b"Hello World"), "JxF12TrwUP45BMd"); + } + + #[test] + fn roundtrip() { + let data = b"tesseras-paste test data"; + let encoded = encode(data); + let decoded = decode(&encoded).unwrap(); + assert_eq!(decoded, data); + } + + #[test] + fn roundtrip_binary() { + let data: Vec<u8> = (0..=255).collect(); + let encoded = encode(&data); + let decoded = decode(&encoded).unwrap(); + assert_eq!(decoded, data); + } + + #[test] + fn leading_zeros() { + let data = [0, 0, 0, 1, 2, 3]; + let encoded = encode(&data); + assert!(encoded.starts_with("111")); + let decoded = decode(&encoded).unwrap(); + assert_eq!(decoded, data); + } + + #[test] + fn decode_invalid_char() { + assert!(decode("invalid0char").is_none()); + } + + #[test] + fn decode_empty() { + assert_eq!(decode("").unwrap(), Vec::<u8>::new()); + } + + #[test] + fn known_vector() { + // SHA-256 of "test" in base58 + let hash: [u8; 32] = { + use tesseras_dht::sha2::{Digest, Sha256}; + let mut h = Sha256::new(); + h.update(b"test"); + h.finalize().into() + }; + let encoded = encode(&hash); + let decoded = decode(&encoded).unwrap(); + assert_eq!(decoded, hash); + } +} diff --git a/src/bin/tp.rs b/src/bin/tp.rs new file mode 100644 index 0000000..e33c357 --- /dev/null +++ b/src/bin/tp.rs @@ -0,0 +1,206 @@ +//! tp — tesseras-paste CLI client. +//! +//! Sends commands to the `tpd` daemon over a Unix socket. +//! Reads paste content from stdin (put) and writes it to +//! stdout (get). + +use std::io::{BufRead, BufReader, Read, Write}; +use std::os::unix::net::UnixStream; +use std::path::PathBuf; + +#[path = "../base58.rs"] +mod base58; + +fn default_socket() -> PathBuf { + PathBuf::from("/var/tesseras-paste/daemon.sock") +} + +fn usage() { + eprintln!("usage: tp [-s sock] <command> [args]"); + eprintln!(); + eprintln!("commands:"); + eprintln!(" put [-t ttl] [-p] read stdin, store paste"); + eprintln!(" -p public (no encryption)"); + eprintln!(" get <key> retrieve paste to stdout"); + eprintln!(" del <key> delete paste"); + eprintln!(" pin <key> pin (never expires)"); + eprintln!(" unpin <key> unpin"); + eprintln!(" status show daemon status"); + eprintln!(); + eprintln!(" -s sock Unix socket path"); + eprintln!(" -t ttl time-to-live (e.g. 24h 30m 3600)"); +} + +fn parse_ttl(s: &str) -> Result<u64, String> { + let s = s.trim(); + if let Some(h) = s.strip_suffix('h') { + h.parse::<u64>() + .map(|v| v * 3600) + .map_err(|e| e.to_string()) + } else if let Some(m) = s.strip_suffix('m') { + m.parse::<u64>().map(|v| v * 60).map_err(|e| e.to_string()) + } else if let Some(sec) = s.strip_suffix('s') { + sec.parse::<u64>().map_err(|e| e.to_string()) + } else { + s.parse::<u64>().map_err(|e| e.to_string()) + } +} + +fn main() { + let args: Vec<String> = std::env::args().collect(); + + let mut sock_path = default_socket(); + let mut cmd_start = 1; + + // Parse global options before command + let mut i = 1; + while i < args.len() { + match args[i].as_str() { + "-s" => { + i += 1; + sock_path = + args.get(i).map(PathBuf::from).unwrap_or_else(|| { + eprintln!("error: -s requires path"); + std::process::exit(1); + }); + cmd_start = i + 1; + } + "-h" | "--help" => { + usage(); + return; + } + _ => break, + } + i += 1; + } + + let cmd_args = &args[cmd_start..]; + if cmd_args.is_empty() { + usage(); + std::process::exit(1); + } + + let command = &cmd_args[0]; + let mut is_get = false; + + let request = match command.as_str() { + "put" => { + let mut ttl = "24h".to_string(); + let mut public = false; + let mut j = 1; + while j < cmd_args.len() { + match cmd_args[j].as_str() { + "-t" => { + j += 1; + if j < cmd_args.len() { + ttl = cmd_args[j].clone(); + } + } + "-p" => public = true, + _ => {} + } + j += 1; + } + let ttl_secs = match parse_ttl(&ttl) { + Ok(s) => s, + Err(e) => { + eprintln!("error: bad TTL: {e}"); + std::process::exit(1); + } + }; + let mut content = Vec::new(); + if let Err(e) = std::io::stdin().read_to_end(&mut content) { + eprintln!("error: reading stdin: {e}"); + std::process::exit(1); + } + if content.is_empty() { + eprintln!("error: empty input"); + std::process::exit(1); + } + let cmd = if public { "PUTP" } else { "PUT" }; + format!("{cmd} {ttl_secs} {}\n", base58::encode(&content)) + } + "get" => { + let key = cmd_args.get(1).unwrap_or_else(|| { + eprintln!("error: get requires a key"); + std::process::exit(1); + }); + is_get = true; + format!("GET {key}\n") + } + "del" => { + let key = cmd_args.get(1).unwrap_or_else(|| { + eprintln!("error: del requires a key"); + std::process::exit(1); + }); + format!("DEL {key}\n") + } + "pin" => { + let key = cmd_args.get(1).unwrap_or_else(|| { + eprintln!("error: pin requires a key"); + std::process::exit(1); + }); + format!("PIN {key}\n") + } + "unpin" => { + let key = cmd_args.get(1).unwrap_or_else(|| { + eprintln!("error: unpin requires a key"); + std::process::exit(1); + }); + format!("UNPIN {key}\n") + } + "status" => "STATUS\n".to_string(), + other => { + eprintln!("unknown command: {other}"); + usage(); + std::process::exit(1); + } + }; + + let stream = match UnixStream::connect(&sock_path) { + Ok(s) => s, + Err(e) => { + eprintln!("error: cannot connect to {}: {e}", sock_path.display(),); + eprintln!("hint: is tpd running?"); + std::process::exit(1); + } + }; + + stream + .set_read_timeout(Some(std::time::Duration::from_secs(60))) + .ok(); + + let mut writer = &stream; + if let Err(e) = writer.write_all(request.as_bytes()) { + eprintln!("error: writing to socket: {e}"); + std::process::exit(1); + } + + let reader = BufReader::new(&stream); + for line in reader.lines() { + let line = match line { + Ok(l) => l, + Err(_) => break, + }; + if let Some(data) = line.strip_prefix("OK ") { + if is_get { + // Decode base58 → raw bytes → stdout + match base58::decode(data) { + Some(bytes) => { + if let Err(e) = std::io::stdout().write_all(&bytes) { + eprintln!("error: writing to stdout: {e}"); + std::process::exit(1); + } + } + None => println!("{data}"), + } + } else { + println!("{data}"); + } + break; + } else if let Some(msg) = line.strip_prefix("ERR ") { + eprintln!("error: {msg}"); + std::process::exit(1); + } + } +} diff --git a/src/bin/tpd.rs b/src/bin/tpd.rs new file mode 100644 index 0000000..15e7d9b --- /dev/null +++ b/src/bin/tpd.rs @@ -0,0 +1,293 @@ +//! tpd — tesseras-paste daemon. +//! +//! Runs a DHT node that stores and serves encrypted pastes. +//! Communicates with the CLI (`tp`) over a Unix socket and +//! optionally serves pastes via HTTP. + +#[path = "../base58.rs"] +mod base58; +#[path = "../crypto.rs"] +mod crypto; +#[path = "../daemon.rs"] +mod daemon; +#[path = "../ops.rs"] +mod ops; +#[path = "../paste.rs"] +mod paste; +#[path = "../protocol.rs"] +mod protocol; +#[path = "../store.rs"] +mod store; + +use std::path::PathBuf; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, mpsc}; + +use tesseras_dht::nat::NatState; +use tesseras_dht::node::NodeBuilder; + +use store::PasteStore; + +fn default_dir() -> PathBuf { + PathBuf::from("/var/tesseras-paste") +} + +fn usage() { + eprintln!( + "usage: tpd [-p port] [-d dir] [-s sock] \ + [-w http_port] [-g] [-b host:port] [-h]" + ); + eprintln!(); + eprintln!(" -p port UDP port (0 = random)"); + eprintln!(" -d dir data directory"); + eprintln!(" -s sock Unix socket path"); + eprintln!(" -w port HTTP server port"); + eprintln!(" -g global NAT (public server)"); + eprintln!(" -b host:port bootstrap peer (repeatable)"); + eprintln!(" -h show this help"); +} + +fn main() { + env_logger::Builder::from_env( + env_logger::Env::default().default_filter_or("info"), + ) + .format(|buf, record| { + use std::io::Write; + writeln!(buf, "[{}]: {}", record.level(), record.args()) + }) + .init(); + + let mut port: u16 = 0; + let mut dir = default_dir(); + let mut sock: Option<PathBuf> = None; + let mut http_port: Option<u16> = None; + let mut global = false; + let mut bootstrap: Vec<String> = Vec::new(); + + let args: Vec<String> = std::env::args().collect(); + let mut i = 1; + while i < args.len() { + match args[i].as_str() { + "-p" => { + i += 1; + port = args.get(i).and_then(|s| s.parse().ok()).unwrap_or_else( + || { + eprintln!("error: -p requires a port"); + std::process::exit(1); + }, + ); + } + "-d" => { + i += 1; + dir = args.get(i).map(PathBuf::from).unwrap_or_else(|| { + eprintln!("error: -d requires a path"); + std::process::exit(1); + }); + } + "-s" => { + i += 1; + sock = args.get(i).map(PathBuf::from); + if sock.is_none() { + eprintln!("error: -s requires a path"); + std::process::exit(1); + } + } + "-w" => { + i += 1; + http_port = Some( + args.get(i).and_then(|s| s.parse().ok()).unwrap_or_else( + || { + eprintln!("error: -w requires a port"); + std::process::exit(1); + }, + ), + ); + } + "-g" => global = true, + "-b" => { + i += 1; + if let Some(addr) = args.get(i) { + bootstrap.push(addr.clone()); + } else { + eprintln!("error: -b requires host:port"); + std::process::exit(1); + } + } + "-h" | "--help" => { + usage(); + return; + } + other => { + eprintln!("unknown option: {other}"); + usage(); + std::process::exit(1); + } + } + i += 1; + } + + let sock_path = sock.unwrap_or_else(|| dir.join("daemon.sock")); + + // Ensure directories exist + let _ = std::fs::create_dir_all(&dir); + if let Some(parent) = sock_path.parent() { + let _ = std::fs::create_dir_all(parent); + } + + let store = match PasteStore::open(&dir) { + Ok(s) => s, + Err(e) => { + eprintln!("error: {e}"); + std::process::exit(1); + } + }; + + // Load or generate persistent identity + let identity_path = dir.join("identity.key"); + let identity_seed = load_or_create_identity(&identity_path); + + let mut builder = NodeBuilder::new().port(port).seed(&identity_seed); + if global { + builder = builder.nat(NatState::Global); + } + + let cfg = tesseras_dht::config::Config { + default_ttl: 65535, + max_value_size: 128 * 1024, + require_signatures: true, + ..Default::default() + }; + builder = builder.config(cfg); + + let mut node = match builder.build() { + Ok(n) => n, + Err(e) => { + eprintln!("error: {e}"); + std::process::exit(1); + } + }; + + node.set_routing_persistence(Box::new(store.clone())); + node.set_data_persistence(Box::new(store.clone())); + node.load_persisted(); + + let addr = match node.local_addr() { + Ok(a) => a, + Err(e) => { + eprintln!("error: could not determine local address: {e}"); + std::process::exit(1); + } + }; + let id = node.id_hex(); + eprintln!("tpd {addr} id={:.8}", id); + + for peer in &bootstrap { + let parts: Vec<&str> = peer.rsplitn(2, ':').collect(); + if parts.len() != 2 { + eprintln!("warning: bad bootstrap: {peer}"); + continue; + } + let host = parts[1]; + let p: u16 = match parts[0].parse() { + Ok(p) => p, + Err(_) => { + eprintln!("warning: bad port: {peer}"); + continue; + } + }; + if let Err(e) = node.join(host, p) { + eprintln!("warning: bootstrap {peer}: {e}"); + } else { + log::info!("bootstrap: connected to {peer}"); + } + } + + for _ in 0..10 { + let _ = node.poll(); + } + + eprintln!( + "peers={} socket={}", + node.routing_table_size(), + sock_path.display() + ); + + let shutdown = Arc::new(AtomicBool::new(false)); + + // Signal handler + let sig = Arc::clone(&shutdown); + unsafe { + SHUTDOWN_PTR.store( + Arc::into_raw(sig) as *mut AtomicBool as usize, + Ordering::SeqCst, + ); + signal(SIGINT, sig_handler as *const () as usize); + signal(SIGTERM, sig_handler as *const () as usize); + } + + let (tx, rx) = mpsc::channel(); + + let listener_shutdown = Arc::clone(&shutdown); + let listener_path = sock_path.clone(); + let handle = std::thread::spawn(move || { + daemon::run_unix_listener(&listener_path, tx, &listener_shutdown); + }); + + // HTTP server thread (optional) + let http_handle = http_port.map(|hp| { + let http_store = store.clone(); + let http_shutdown = Arc::clone(&shutdown); + let http_sock = sock_path.clone(); + eprintln!("http on 0.0.0.0:{hp}"); + std::thread::spawn(move || { + daemon::run_http(hp, &http_sock, &http_store, &http_shutdown); + }) + }); + + daemon::run_daemon(&mut node, &store, &rx, &shutdown); + + let _ = std::fs::remove_file(&sock_path); + let _ = handle.join(); + if let Some(h) = http_handle { + let _ = h.join(); + } + eprintln!("shutdown complete"); +} + +/// Load identity seed from file, or generate and save +/// a new one. This ensures the node keeps the same +/// Ed25519 keypair (and NodeId) across restarts. +fn load_or_create_identity(path: &std::path::Path) -> Vec<u8> { + if let Ok(data) = std::fs::read(path) + && data.len() == 32 + { + log::info!("identity: loaded from {}", path.display()); + return data; + } + let mut seed = [0u8; 32]; + tesseras_dht::sys::random_bytes(&mut seed); + if let Err(e) = std::fs::write(path, seed) { + log::warn!("identity: failed to save to {}: {e}", path.display()); + } else { + log::info!("identity: generated new keypair at {}", path.display()); + } + seed.to_vec() +} + +const SIGINT: i32 = 2; +const SIGTERM: i32 = 15; + +unsafe extern "C" { + fn signal(sig: i32, handler: usize) -> usize; +} + +static SHUTDOWN_PTR: std::sync::atomic::AtomicUsize = + std::sync::atomic::AtomicUsize::new(0); + +extern "C" fn sig_handler(_sig: i32) { + let ptr = SHUTDOWN_PTR.load(Ordering::SeqCst); + if ptr != 0 { + let flag = unsafe { &*(ptr as *const AtomicBool) }; + flag.store(true, Ordering::SeqCst); + } +} diff --git a/src/crypto.rs b/src/crypto.rs new file mode 100644 index 0000000..1480040 --- /dev/null +++ b/src/crypto.rs @@ -0,0 +1,84 @@ +//! XChaCha20-Poly1305 authenticated encryption. +//! +//! Uses OS-provided randomness (`arc4random_buf`) via +//! `tesseras_dht::sys::random_bytes` for key and nonce +//! generation. + +use chacha20poly1305::{ + XChaCha20Poly1305, XNonce, + aead::{Aead, KeyInit}, +}; + +/// XChaCha20 extended nonce size (24 bytes). +const NONCE_SIZE: usize = 24; + +/// XChaCha20-Poly1305 key size (32 bytes). +pub const KEY_SIZE: usize = 32; + +/// Generate a random 32-byte encryption key. +pub fn generate_key() -> [u8; KEY_SIZE] { + let mut key = [0u8; KEY_SIZE]; + tesseras_dht::sys::random_bytes(&mut key); + key +} + +/// Encrypt plaintext with a random nonce. Returns `nonce || ciphertext`. +pub fn encrypt(key: &[u8; KEY_SIZE], plaintext: &[u8]) -> Vec<u8> { + let cipher = XChaCha20Poly1305::new(key.into()); + let mut nonce_bytes = [0u8; NONCE_SIZE]; + tesseras_dht::sys::random_bytes(&mut nonce_bytes); + let nonce = XNonce::from(nonce_bytes); + let ciphertext = cipher + .encrypt(&nonce, plaintext) + .expect("encryption should not fail"); + let mut out = Vec::with_capacity(NONCE_SIZE + ciphertext.len()); + out.extend_from_slice(&nonce_bytes); + out.extend_from_slice(&ciphertext); + out +} + +/// Decrypt `nonce || ciphertext`. Returns `None` if authentication fails. +pub fn decrypt(key: &[u8; KEY_SIZE], data: &[u8]) -> Option<Vec<u8>> { + if data.len() < NONCE_SIZE { + return None; + } + let (nonce_bytes, ciphertext) = data.split_at(NONCE_SIZE); + let nonce = XNonce::from_slice(nonce_bytes); + let cipher = XChaCha20Poly1305::new(key.into()); + cipher.decrypt(nonce, ciphertext).ok() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn round_trip() { + let key = generate_key(); + let sealed = encrypt(&key, b"hello"); + let opened = decrypt(&key, &sealed).unwrap(); + assert_eq!(opened, b"hello"); + } + + #[test] + fn wrong_key_fails() { + let key = generate_key(); + let wrong = generate_key(); + let sealed = encrypt(&key, b"secret"); + assert!(decrypt(&wrong, &sealed).is_none()); + } + + #[test] + fn truncated_fails() { + let key = generate_key(); + assert!(decrypt(&key, &[0u8; 10]).is_none()); + } + + #[test] + fn different_nonces() { + let key = generate_key(); + let a = encrypt(&key, b"same"); + let b = encrypt(&key, b"same"); + assert_ne!(a, b); + } +} diff --git a/src/daemon.rs b/src/daemon.rs new file mode 100644 index 0000000..313a4aa --- /dev/null +++ b/src/daemon.rs @@ -0,0 +1,504 @@ +//! Daemon main loop, Unix socket listener, and HTTP server. +//! +//! The daemon loop drives the DHT node, processes client +//! requests, and runs periodic maintenance (GC, republish, +//! state persistence). Communication with the CLI happens +//! over a Unix socket using a line-oriented text protocol +//! (see [`crate::protocol`]). + +use std::io::{BufRead, BufReader, Write}; +use std::path::Path; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc; +use std::time::{Duration, Instant}; + +use tesseras_dht::Node; + +use crate::base58; +use crate::ops; +use crate::paste::Paste; +use crate::protocol::{self, Request, Response}; +use crate::store::PasteStore; + +/// How often to garbage-collect expired pastes (10 min). +const GC_INTERVAL: Duration = Duration::from_secs(600); + +/// How often to republish local pastes to the DHT (30 min). +const REPUBLISH_INTERVAL: Duration = Duration::from_secs(1800); + +/// How often to persist routing table and state (5 min). +const SAVE_INTERVAL: Duration = Duration::from_secs(300); + +/// How often to sync DHT-replicated values to local store (5 s). +const SYNC_INTERVAL: Duration = Duration::from_secs(5); + +/// A request from the socket thread to the main thread. +pub struct DaemonRequest { + pub cmd: Request, + pub reply: mpsc::Sender<Response>, +} + +/// Run the daemon main loop. +pub fn run_daemon( + node: &mut Node, + store: &PasteStore, + rx: &mpsc::Receiver<DaemonRequest>, + shutdown: &AtomicBool, +) { + let mut last_gc = Instant::now(); + let mut last_republish = Instant::now() - REPUBLISH_INTERVAL; + let mut last_save = Instant::now(); + let mut last_sync = Instant::now(); + + log::info!("daemon main loop started"); + + while !shutdown.load(Ordering::Relaxed) { + let _ = node.poll_timeout(Duration::from_millis(100)); + + while let Ok(req) = rx.try_recv() { + let is_shutdown = matches!(req.cmd, Request::Shutdown); + let resp = handle_request(node, store, req.cmd); + let _ = req.reply.send(resp); + if is_shutdown { + shutdown.store(true, Ordering::Relaxed); + } + } + + if last_sync.elapsed() >= SYNC_INTERVAL { + last_sync = Instant::now(); + sync_dht_to_store(node, store); + } + + if last_gc.elapsed() >= GC_INTERVAL { + last_gc = Instant::now(); + match store.gc() { + Ok(0) => {} + Ok(n) => log::info!("gc: removed {n} expired pastes"), + Err(e) => log::warn!("gc: {e}"), + } + } + + if last_republish.elapsed() >= REPUBLISH_INTERVAL { + last_republish = Instant::now(); + republish(node, store); + } + + if last_save.elapsed() >= SAVE_INTERVAL { + last_save = Instant::now(); + node.save_state(); + } + } + + log::info!("daemon main loop stopped, shutting down"); + node.shutdown(); +} + +/// Dispatch a single client request to the appropriate operation. +fn handle_request( + node: &mut Node, + store: &PasteStore, + cmd: Request, +) -> Response { + match cmd { + Request::Put { + ttl_secs, + content_b58, + encrypt, + } => { + let content = match base58::decode(&content_b58) { + Some(c) => c, + None => return Response::Err("invalid base58 content".into()), + }; + match ops::put_paste(node, store, &content, ttl_secs, encrypt) { + Ok(key) => Response::Ok(key), + Err(e) => Response::Err(e.to_string()), + } + } + Request::Get { key } => match ops::get_paste(node, store, &key) { + Ok(data) => Response::Ok(base58::encode(&data)), + Err(e) => Response::Err(e.to_string()), + }, + Request::Del { key } => match ops::delete_paste(node, store, &key) { + Ok(()) => Response::Ok("deleted".into()), + Err(e) => Response::Err(e.to_string()), + }, + Request::Pin { ref key } | Request::Unpin { ref key } => { + let is_pin = matches!(cmd, Request::Pin { .. }); + let key = key.clone(); + let hash = match ops::resolve_hash(&key) { + Ok(h) => h, + Err(e) => return Response::Err(e.to_string()), + }; + let result = if is_pin { + store.pin(&hash) + } else { + store.unpin(&hash) + }; + match result { + Ok(()) => { + let label = if is_pin { "pinned" } else { "unpinned" }; + Response::Ok(label.into()) + } + Err(e) => Response::Err(e.to_string()), + } + } + Request::Status => { + let m = node.metrics(); + let status = format!( + "peers={} stored={} pastes={} \ + sent={} recv={} lookups={}/{}", + node.routing_table_size(), + node.storage_count(), + store.paste_count(), + m.messages_sent, + m.messages_received, + m.lookups_completed, + m.lookups_started, + ); + Response::Ok(status) + } + Request::Shutdown => Response::Ok("shutting down".into()), + } +} + +/// Copy DHT-replicated values into the local file store so +/// the HTTP server can serve them without a DHT lookup. +fn sync_dht_to_store(node: &Node, store: &PasteStore) { + for (key, value) in node.dht_values() { + if key.len() != 32 { + continue; + } + if store.get_paste(&key).is_none() { + let _ = store.put_paste(&key, &value); + } + } +} + +/// Re-announce locally stored pastes to the DHT so they +/// remain reachable as nodes join and leave the network. +fn republish(node: &mut Node, store: &PasteStore) { + let keys = store.original_keys(); + if keys.is_empty() { + return; + } + + let mut count = 0u32; + for key in &keys { + if let Some(data) = store.get_paste(key) + && let Some(paste) = Paste::from_bytes(&data) + { + let remaining = if store.is_pinned(key) { + u16::MAX + } else if paste.is_expired() { + continue; + } else { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + let expires = paste.created_at + paste.ttl_secs; + let rem = expires.saturating_sub(now); + std::cmp::min(rem, u16::MAX as u64) as u16 + }; + node.put(key, &data, remaining, false); + count += 1; + } + } + if count > 0 { + log::info!("republish: announced {count} pastes to DHT"); + } +} + +/// Run the Unix socket listener thread. +pub fn run_unix_listener( + sock_path: &Path, + tx: mpsc::Sender<DaemonRequest>, + shutdown: &AtomicBool, +) { + let _ = std::fs::remove_file(sock_path); + + let listener = match std::os::unix::net::UnixListener::bind(sock_path) { + Ok(l) => l, + Err(e) => { + log::error!("unix: failed to bind {}: {e}", sock_path.display()); + return; + } + }; + + // Allow group members to connect (0o770) + use std::os::unix::fs::PermissionsExt; + let perms = std::fs::Permissions::from_mode(0o770); + if let Err(e) = std::fs::set_permissions(sock_path, perms) { + log::warn!("unix: failed to set socket permissions: {e}"); + } + + if let Err(e) = listener.set_nonblocking(true) { + log::error!("unix: failed to set non-blocking: {e}"); + return; + } + + log::info!("unix: listening on {}", sock_path.display()); + + while !shutdown.load(Ordering::Relaxed) { + match listener.accept() { + Ok((stream, _)) => { + if let Err(e) = handle_client(stream, &tx) { + log::debug!("unix: client disconnected: {e}"); + } + } + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + std::thread::sleep(Duration::from_millis(50)); + } + Err(e) => { + log::warn!("unix: accept failed: {e}"); + std::thread::sleep(Duration::from_millis(100)); + } + } + } + + let _ = std::fs::remove_file(sock_path); +} + +/// Read requests line-by-line from a connected Unix socket +/// client, forwarding each to the daemon main loop via `tx`. +fn handle_client( + stream: std::os::unix::net::UnixStream, + tx: &mpsc::Sender<DaemonRequest>, +) -> Result<(), Box<dyn std::error::Error>> { + stream.set_nonblocking(false)?; + stream.set_read_timeout(Some(Duration::from_secs(60)))?; + + let reader = BufReader::new(&stream); + let mut writer = &stream; + + for line in reader.lines() { + let line = line?; + let cmd = match protocol::parse_request(&line) { + Ok(c) => c, + Err(e) => { + let resp = protocol::format_response(&Response::Err(e)); + writer.write_all(resp.as_bytes())?; + continue; + } + }; + + let is_shutdown = matches!(cmd, Request::Shutdown); + + let (reply_tx, reply_rx) = mpsc::channel(); + tx.send(DaemonRequest { + cmd, + reply: reply_tx, + })?; + + let resp = reply_rx + .recv_timeout(Duration::from_secs(60)) + .unwrap_or(Response::Err("timeout".into())); + writer.write_all(protocol::format_response(&resp).as_bytes())?; + + if is_shutdown { + break; + } + } + Ok(()) +} + +// ── HTTP server ───────────────────────────────────── + +/// Minimal HTTP server. Serves pastes at /<hash> or +/// /<hash>/<enckey>. Queries the daemon via Unix socket +/// so it can access DHT-replicated data too. +pub fn run_http( + port: u16, + sock_path: &Path, + store: &PasteStore, + shutdown: &AtomicBool, +) { + let addr = std::net::SocketAddr::from(([0, 0, 0, 0], port)); + let listener = match std::net::TcpListener::bind(addr) { + Ok(l) => l, + Err(e) => { + log::error!("http: failed to bind {addr}: {e}"); + return; + } + }; + if let Err(e) = listener.set_nonblocking(true) { + log::error!("http: failed to set non-blocking: {e}"); + return; + } + + log::info!("http: listening on {addr}"); + + while !shutdown.load(Ordering::Relaxed) { + match listener.accept() { + Ok((stream, _)) => { + let store = store.clone(); + handle_http(stream, &store, sock_path); + } + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + std::thread::sleep(Duration::from_millis(50)); + } + Err(e) => { + log::debug!("http: accept failed: {e}"); + std::thread::sleep(Duration::from_millis(100)); + } + } + } +} + +fn handle_http( + mut stream: std::net::TcpStream, + store: &PasteStore, + sock_path: &Path, +) { + use std::io::Read; + + stream.set_read_timeout(Some(Duration::from_secs(5))).ok(); + + let mut buf = [0u8; 4096]; + let n = match stream.read(&mut buf) { + Ok(n) => n, + Err(_) => return, + }; + let request = String::from_utf8_lossy(&buf[..n]); + + // Parse "GET /<path> HTTP/1.x" + let path = match request.split_whitespace().nth(1) { + Some(p) => p, + None => { + http_response(&mut stream, 400, "text/plain", b"Bad Request"); + return; + } + }; + + if path == "/" || path == "/favicon.ico" { + http_response(&mut stream, 200, "text/plain", b"tesseras-paste\n"); + return; + } + + // Strip leading / + let key = path.trim_start_matches('/'); + if key.is_empty() { + http_response(&mut stream, 400, "text/plain", b"missing key"); + return; + } + + // Split hash#enckey (URL fragment won't arrive via + // HTTP, so also support hash/enckey as path) + let (hash_b58, enc_key_b58) = if let Some((h, k)) = key.split_once('/') { + (h, Some(k)) + } else { + (key, None) + }; + + let hash = match base58::decode(hash_b58) { + Some(h) if h.len() == 32 => h, + _ => { + http_response(&mut stream, 400, "text/plain", b"invalid key"); + return; + } + }; + + // Build the daemon-style key: hash#enckey + let daemon_key = match enc_key_b58 { + Some(ek) => format!("{hash_b58}#{ek}"), + None => hash_b58.to_string(), + }; + + // Try local store first (fast path) + let body = if let Some(data) = store.get_paste(&hash) { + match serve_paste_data(&data, enc_key_b58) { + Ok(b) => b, + Err((status, msg)) => { + http_response( + &mut stream, + status, + "text/plain", + msg.as_bytes(), + ); + return; + } + } + } else { + // Not local — ask daemon which does a DHT lookup + match dht_lookup_via_socket(sock_path, &daemon_key) { + Some(b) => b, + None => { + http_response(&mut stream, 404, "text/plain", b"not found"); + return; + } + } + }; + + let ct = if std::str::from_utf8(&body).is_ok() { + "text/plain; charset=utf-8" + } else { + "application/octet-stream" + }; + + http_response(&mut stream, 200, ct, &body); +} + +/// Deserialize a Paste from store bytes, optionally decrypt. +fn serve_paste_data( + data: &[u8], + enc_key_b58: Option<&str>, +) -> Result<Vec<u8>, (u16, &'static str)> { + let paste = Paste::from_bytes(data).ok_or((500, "corrupt paste"))?; + + if let Some(kb58) = enc_key_b58 { + let key_bytes = base58::decode(kb58).ok_or((400, "invalid enc key"))?; + if key_bytes.len() != 32 { + return Err((400, "invalid enc key")); + } + let mut key = [0u8; 32]; + key.copy_from_slice(&key_bytes); + crate::crypto::decrypt(&key, &paste.content) + .ok_or((403, "decryption failed")) + } else { + Ok(paste.content) + } +} + +/// Ask the daemon for a paste via Unix socket (triggers +/// a DHT lookup if not in local store). +/// Key format: "hash" or "hash#enckey". +fn dht_lookup_via_socket(sock_path: &Path, key: &str) -> Option<Vec<u8>> { + let sock = std::os::unix::net::UnixStream::connect(sock_path).ok()?; + sock.set_read_timeout(Some(Duration::from_secs(35))).ok(); + sock.set_write_timeout(Some(Duration::from_secs(5))).ok(); + + let cmd = format!("GET {key}\n"); + (&sock).write_all(cmd.as_bytes()).ok()?; + + let reader = BufReader::new(&sock); + let line = reader.lines().next()?.ok()?; + let rest = line.strip_prefix("OK ")?; + + base58::decode(rest) +} + +fn http_response( + stream: &mut std::net::TcpStream, + status: u16, + content_type: &str, + body: &[u8], +) { + let reason = match status { + 200 => "OK", + 400 => "Bad Request", + 403 => "Forbidden", + 404 => "Not Found", + 500 => "Internal Server Error", + _ => "Unknown", + }; + let header = format!( + "HTTP/1.1 {status} {reason}\r\n\ + Content-Type: {content_type}\r\n\ + Content-Length: {}\r\n\ + Connection: close\r\n\ + \r\n", + body.len(), + ); + let _ = stream.write_all(header.as_bytes()); + let _ = stream.write_all(body); +} diff --git a/src/ops.rs b/src/ops.rs new file mode 100644 index 0000000..302bd58 --- /dev/null +++ b/src/ops.rs @@ -0,0 +1,160 @@ +//! High-level paste operations. +//! +//! Each function combines local storage and DHT interaction +//! into a single call: put, get, delete, pin/unpin. + +use std::time::Duration; + +use tesseras_dht::Node; + +use crate::base58; +use crate::crypto; +use crate::paste::{MAX_PASTE_SIZE, Paste}; +use crate::store::PasteStore; + +/// Timeout for blocking DHT lookups. +const OP_TIMEOUT: Duration = Duration::from_secs(30); + +/// Errors from paste operations. +#[derive(Debug)] +pub enum PasteError { + InvalidKey, + NotFound, + Expired, + DecryptionFailed, + TooLarge, + Internal(String), +} + +impl std::fmt::Display for PasteError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::InvalidKey => write!(f, "invalid key"), + Self::NotFound => write!(f, "not found"), + Self::Expired => write!(f, "expired"), + Self::DecryptionFailed => write!(f, "decryption failed"), + Self::TooLarge => write!(f, "paste too large"), + Self::Internal(msg) => write!(f, "internal: {msg}"), + } + } +} + +/// Decode the hash portion of a key string ("hash#enckey" or "hash"). +/// Returns the 32-byte hash. +fn parse_hash(key_str: &str) -> Result<Vec<u8>, PasteError> { + let hash_b58 = key_str.split_once('#').map(|(h, _)| h).unwrap_or(key_str); + let hash = base58::decode(hash_b58).ok_or(PasteError::InvalidKey)?; + if hash.len() != 32 { + return Err(PasteError::InvalidKey); + } + Ok(hash) +} + +/// Store a paste. If `encrypt` is true, encrypts the content and +/// returns "hash#enckey" in base58. Otherwise returns just the hash. +pub fn put_paste( + node: &mut Node, + store: &PasteStore, + content: &[u8], + ttl_secs: u64, + encrypt: bool, +) -> Result<String, PasteError> { + if content.len() > MAX_PASTE_SIZE { + return Err(PasteError::TooLarge); + } + + let (paste_content, enc_key) = if encrypt { + let key = crypto::generate_key(); + (crypto::encrypt(&key, content), Some(key)) + } else { + (content.to_vec(), None) + }; + + let paste = Paste::new(paste_content, ttl_secs); + let serialized = paste.to_bytes(); + let hash = Paste::content_key(&paste.content); + + store + .put_paste(&hash, &serialized) + .map_err(|e| PasteError::Internal(e.to_string()))?; + + let dht_ttl = std::cmp::min(ttl_secs, u16::MAX as u64) as u16; + node.put(&hash, &serialized, dht_ttl, false); + + let hash_b58 = base58::encode(&hash); + let label = if encrypt { "encrypted" } else { "public" }; + log::info!( + "put: stored {label} paste {hash_b58} ({} bytes)", + content.len() + ); + + match enc_key { + Some(key) => Ok(format!("{hash_b58}#{}", base58::encode(&key))), + None => Ok(hash_b58), + } +} + +/// Retrieve a paste by key ("hash#enckey" or bare "hash"). +/// Tries local store first, then falls back to a blocking DHT lookup. +pub fn get_paste( + node: &mut Node, + store: &PasteStore, + key_str: &str, +) -> Result<Vec<u8>, PasteError> { + let (hash_b58, enc_key_b58) = match key_str.split_once('#') { + Some((h, k)) => (h, Some(k)), + None => (key_str, None), + }; + + let hash = base58::decode(hash_b58).ok_or(PasteError::InvalidKey)?; + if hash.len() != 32 { + return Err(PasteError::InvalidKey); + } + + let data = if let Some(local) = store.get_paste(&hash) { + local + } else { + let vals = node.get_blocking(&hash, OP_TIMEOUT); + if vals.is_empty() { + return Err(PasteError::NotFound); + } + vals[0].clone() + }; + + let paste = Paste::from_bytes(&data).ok_or(PasteError::InvalidKey)?; + if paste.is_expired() && !store.is_pinned(&hash) { + return Err(PasteError::Expired); + } + + if let Some(kb58) = enc_key_b58 { + let key_bytes = base58::decode(kb58).ok_or(PasteError::InvalidKey)?; + if key_bytes.len() != crypto::KEY_SIZE { + return Err(PasteError::InvalidKey); + } + let mut key = [0u8; crypto::KEY_SIZE]; + key.copy_from_slice(&key_bytes); + crypto::decrypt(&key, &paste.content) + .ok_or(PasteError::DecryptionFailed) + } else { + Ok(paste.content) + } +} + +/// Delete a paste from local store and the DHT. +pub fn delete_paste( + node: &mut Node, + store: &PasteStore, + key_str: &str, +) -> Result<(), PasteError> { + let hash = parse_hash(key_str)?; + store.remove_paste(&hash); + node.delete(&hash); + let hash_b58 = key_str.split_once('#').map(|(h, _)| h).unwrap_or(key_str); + log::info!("del: removed paste {hash_b58}"); + Ok(()) +} + +/// Parse a key and resolve the 32-byte hash (stripping any #enckey). +pub fn resolve_hash(key: &str) -> Result<Vec<u8>, PasteError> { + parse_hash(key) +} diff --git a/src/paste.rs b/src/paste.rs new file mode 100644 index 0000000..50b32b1 --- /dev/null +++ b/src/paste.rs @@ -0,0 +1,128 @@ +//! Paste record format. +//! +//! Binary format (no external serialization deps): +//! version: u8 +//! created_at: u64 (big-endian) +//! ttl_secs: u64 (big-endian) +//! content: [u8] (remaining bytes) + +use tesseras_dht::sha2::{Digest, Sha256}; + +/// Maximum paste size: 64 KiB. +pub const MAX_PASTE_SIZE: usize = 64 * 1024; + +/// Current format version. +const FORMAT_VERSION: u8 = 1; + +/// Header size: version(1) + created_at(8) + ttl(8) = 17. +const HEADER_SIZE: usize = 17; + +/// A paste record stored locally and replicated via the DHT. +#[derive(Debug, Clone)] +pub struct Paste { + pub version: u8, + pub content: Vec<u8>, + pub created_at: u64, + pub ttl_secs: u64, +} + +impl Paste { + /// Create a new paste with the current timestamp. + pub fn new(content: Vec<u8>, ttl_secs: u64) -> Self { + let created_at = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + Paste { + version: FORMAT_VERSION, + content, + created_at, + ttl_secs, + } + } + + /// Compute the SHA-256 content-addressed key (32 bytes). + pub fn content_key(content: &[u8]) -> [u8; 32] { + let mut h = Sha256::new(); + h.update(content); + h.finalize().into() + } + + /// Serialize to bytes. + pub fn to_bytes(&self) -> Vec<u8> { + let mut buf = Vec::with_capacity(HEADER_SIZE + self.content.len()); + buf.push(self.version); + buf.extend_from_slice(&self.created_at.to_be_bytes()); + buf.extend_from_slice(&self.ttl_secs.to_be_bytes()); + buf.extend_from_slice(&self.content); + buf + } + + /// Deserialize from bytes. + pub fn from_bytes(data: &[u8]) -> Option<Self> { + if data.len() < HEADER_SIZE { + return None; + } + let version = data[0]; + let created_at = u64::from_be_bytes(data[1..9].try_into().ok()?); + let ttl_secs = u64::from_be_bytes(data[9..17].try_into().ok()?); + let content = data[HEADER_SIZE..].to_vec(); + Some(Paste { + version, + content, + created_at, + ttl_secs, + }) + } + + /// Whether this paste has expired. + pub fn is_expired(&self) -> bool { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + now > self.created_at + self.ttl_secs + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn round_trip() { + let paste = Paste::new(b"hello world".to_vec(), 3600); + let bytes = paste.to_bytes(); + let decoded = Paste::from_bytes(&bytes).unwrap(); + assert_eq!(decoded.content, b"hello world"); + assert_eq!(decoded.ttl_secs, 3600); + assert_eq!(decoded.version, FORMAT_VERSION); + } + + #[test] + fn content_key_deterministic() { + let k1 = Paste::content_key(b"test"); + let k2 = Paste::content_key(b"test"); + assert_eq!(k1, k2); + } + + #[test] + fn content_key_differs() { + let k1 = Paste::content_key(b"aaa"); + let k2 = Paste::content_key(b"bbb"); + assert_ne!(k1, k2); + } + + #[test] + fn too_short_fails() { + assert!(Paste::from_bytes(&[0u8; 5]).is_none()); + } + + #[test] + fn empty_content() { + let paste = Paste::new(Vec::new(), 60); + let bytes = paste.to_bytes(); + let decoded = Paste::from_bytes(&bytes).unwrap(); + assert!(decoded.content.is_empty()); + } +} diff --git a/src/protocol.rs b/src/protocol.rs new file mode 100644 index 0000000..d45cdd8 --- /dev/null +++ b/src/protocol.rs @@ -0,0 +1,173 @@ +//! Unix socket protocol for daemon ↔ CLI. +//! +//! Simple line-oriented text protocol: +//! PUT <ttl_secs> <content>\n +//! PUTP <ttl_secs> <content>\n +//! GET <key>\n +//! DEL <key>\n +//! PIN <key>\n +//! UNPIN <key>\n +//! STATUS\n +//! SHUTDOWN\n +//! +//! Responses: +//! OK <data>\n +//! ERR <message>\n + +/// A parsed request received from the CLI over the Unix socket. +pub enum Request { + Put { + ttl_secs: u64, + content_b58: String, + encrypt: bool, + }, + Get { + key: String, + }, + Del { + key: String, + }, + Pin { + key: String, + }, + Unpin { + key: String, + }, + Status, + Shutdown, +} + +/// A response sent back to the CLI over the Unix socket. +pub enum Response { + Ok(String), + Err(String), +} + +/// Parse a single protocol line into a [`Request`]. +pub fn parse_request(line: &str) -> Result<Request, String> { + let line = line.trim(); + if line.is_empty() { + return Err("empty request".into()); + } + + let mut parts = line.splitn(3, ' '); + let cmd = parts.next().unwrap(); + + match cmd { + "PUT" | "PUTP" => { + let ttl_str = + parts.next().ok_or("PUT requires: PUT <ttl> <data>")?; + let content_b58 = + parts.next().ok_or("PUT requires: PUT <ttl> <data>")?; + let ttl_secs: u64 = + ttl_str.parse().map_err(|_| "invalid TTL number")?; + Ok(Request::Put { + ttl_secs, + content_b58: content_b58.to_string(), + encrypt: cmd == "PUT", + }) + } + "GET" => { + let key = parts.next().ok_or("GET requires: GET <key>")?; + Ok(Request::Get { + key: key.to_string(), + }) + } + "DEL" => { + let key = parts.next().ok_or("DEL requires: DEL <key>")?; + Ok(Request::Del { + key: key.to_string(), + }) + } + "PIN" => { + let key = parts.next().ok_or("PIN requires: PIN <key>")?; + Ok(Request::Pin { + key: key.to_string(), + }) + } + "UNPIN" => { + let key = parts.next().ok_or("UNPIN requires: UNPIN <key>")?; + Ok(Request::Unpin { + key: key.to_string(), + }) + } + "STATUS" => Ok(Request::Status), + "SHUTDOWN" => Ok(Request::Shutdown), + _ => Err(format!("unknown command: {cmd}")), + } +} + +/// Serialize a [`Response`] into a protocol line (`OK ...\n` or `ERR ...\n`). +pub fn format_response(r: &Response) -> String { + match r { + Response::Ok(data) => format!("OK {data}\n"), + Response::Err(msg) => format!("ERR {msg}\n"), + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_put() { + let r = parse_request("PUT 3600 deadbeef").unwrap(); + match r { + Request::Put { + ttl_secs, + content_b58, + encrypt, + } => { + assert_eq!(ttl_secs, 3600); + assert_eq!(content_b58, "deadbeef"); + assert!(encrypt); + } + _ => panic!("expected Put"), + } + } + + #[test] + fn parse_putp() { + let r = parse_request("PUTP 60 abc").unwrap(); + match r { + Request::Put { encrypt, .. } => assert!(!encrypt), + _ => panic!("expected Put"), + } + } + + #[test] + fn parse_get() { + let r = parse_request("GET abc123#key456").unwrap(); + match r { + Request::Get { key } => assert_eq!(key, "abc123#key456"), + _ => panic!("expected Get"), + } + } + + #[test] + fn parse_status() { + assert!(matches!(parse_request("STATUS").unwrap(), Request::Status)); + } + + #[test] + fn parse_empty_fails() { + assert!(parse_request("").is_err()); + } + + #[test] + fn parse_unknown_fails() { + assert!(parse_request("FOOBAR").is_err()); + } + + #[test] + fn format_ok() { + let r = format_response(&Response::Ok("hello".into())); + assert_eq!(r, "OK hello\n"); + } + + #[test] + fn format_err() { + let r = format_response(&Response::Err("oops".into())); + assert_eq!(r, "ERR oops\n"); + } +} diff --git a/src/store.rs b/src/store.rs new file mode 100644 index 0000000..18a7641 --- /dev/null +++ b/src/store.rs @@ -0,0 +1,262 @@ +//! Filesystem-based paste storage. +//! +//! Simple directory layout: +//! <root>/pastes/<hash>.bin +//! <root>/pins/<hash> +//! <root>/blocked/<hash> +//! <root>/contacts.bin + +use std::fs; +use std::io::Write; +use std::path::{Path, PathBuf}; + +use crate::base58; +use crate::paste::Paste; + +/// Persistent paste store backed by the filesystem. +#[derive(Clone)] +pub struct PasteStore { + root: PathBuf, +} + +impl PasteStore { + /// Open or create a store rooted at the given directory. + /// Creates `pastes/`, `pins/`, and `blocked/` subdirectories. + pub fn open(root: &Path) -> std::io::Result<Self> { + fs::create_dir_all(root.join("pastes"))?; + fs::create_dir_all(root.join("pins"))?; + fs::create_dir_all(root.join("blocked"))?; + Ok(PasteStore { + root: root.to_path_buf(), + }) + } + + fn paste_path(&self, key: &[u8]) -> PathBuf { + self.root.join("pastes").join(base58::encode(key)) + } + + fn pin_path(&self, key: &[u8]) -> PathBuf { + self.root.join("pins").join(base58::encode(key)) + } + + fn block_path(&self, key: &[u8]) -> PathBuf { + self.root.join("blocked").join(base58::encode(key)) + } + + // ── Paste CRUD ────────────────────────────────── + + /// Write a paste to disk. The key (32 bytes) is prepended + /// to the file so [`original_keys`] can reconstruct it. + pub fn put_paste(&self, key: &[u8], value: &[u8]) -> std::io::Result<()> { + let path = self.paste_path(key); + let mut f = fs::File::create(path)?; + f.write_all(key)?; + f.write_all(value)?; + Ok(()) + } + + /// Read a paste from disk. Returns `None` if the paste + /// is blocked, expired (and not pinned), or does not exist. + pub fn get_paste(&self, key: &[u8]) -> Option<Vec<u8>> { + if self.is_blocked(key) { + return None; + } + let path = self.paste_path(key); + let data = fs::read(&path).ok()?; + // Strip key prefix (32 bytes) + if data.len() < 32 { + return None; + } + let value = data[32..].to_vec(); + + // Check expiry (pinned never expire) + if let Some(paste) = Paste::from_bytes(&value) + && paste.is_expired() + && !self.is_pinned(key) + { + return None; + } + Some(value) + } + + /// Delete a paste file from disk (no-op if absent). + pub fn remove_paste(&self, key: &[u8]) { + let _ = fs::remove_file(self.paste_path(key)); + } + + /// List all non-expired, non-blocked paste keys. + pub fn original_keys(&self) -> Vec<Vec<u8>> { + let dir = self.root.join("pastes"); + let entries = match fs::read_dir(&dir) { + Ok(e) => e, + Err(_) => return Vec::new(), + }; + + let mut keys = Vec::new(); + for entry in entries.flatten() { + let data = match fs::read(entry.path()) { + Ok(d) => d, + Err(_) => continue, + }; + if data.len() < 32 { + continue; + } + let key = &data[..32]; + let value = &data[32..]; + + if self.is_blocked(key) { + continue; + } + if let Some(paste) = Paste::from_bytes(value) + && paste.is_expired() + && !self.is_pinned(key) + { + continue; + } + keys.push(key.to_vec()); + } + keys + } + + // ── Pin / Block ───────────────────────────────── + + /// Mark a paste as pinned (never expires). + pub fn pin(&self, key: &[u8]) -> std::io::Result<()> { + fs::File::create(self.pin_path(key))?; + Ok(()) + } + + /// Remove the pin from a paste (re-enables expiry). + pub fn unpin(&self, key: &[u8]) -> std::io::Result<()> { + let _ = fs::remove_file(self.pin_path(key)); + Ok(()) + } + + pub fn is_pinned(&self, key: &[u8]) -> bool { + self.pin_path(key).exists() + } + + pub fn is_blocked(&self, key: &[u8]) -> bool { + self.block_path(key).exists() + } + + // ── GC ────────────────────────────────────────── + + /// Remove expired pastes from disk. Pinned pastes are kept. + pub fn gc(&self) -> std::io::Result<usize> { + let dir = self.root.join("pastes"); + let entries = fs::read_dir(&dir)?; + let mut removed = 0; + + for entry in entries.flatten() { + let data = match fs::read(entry.path()) { + Ok(d) => d, + Err(_) => continue, + }; + if data.len() < 32 { + continue; + } + let key = &data[..32]; + let value = &data[32..]; + + if self.is_pinned(key) { + continue; + } + if let Some(paste) = Paste::from_bytes(value) + && paste.is_expired() + { + let _ = fs::remove_file(entry.path()); + removed += 1; + } + } + Ok(removed) + } + + /// Count stored pastes. + pub fn paste_count(&self) -> usize { + let dir = self.root.join("pastes"); + fs::read_dir(&dir).map(|e| e.count()).unwrap_or(0) + } +} + +// ── tesseras-dht persistence traits ───────────────── + +impl tesseras_dht::persist::RoutingPersistence for PasteStore { + fn save_contacts( + &self, + contacts: &[tesseras_dht::persist::ContactRecord], + ) -> Result<(), tesseras_dht::Error> { + let path = self.root.join("contacts.bin"); + let mut buf = Vec::new(); + for c in contacts { + let id = c.id.as_bytes(); + let addr = c.addr.to_string(); + let addr_bytes = addr.as_bytes(); + // length-prefixed: addr_len(u16) + id(32) + addr + let len = addr_bytes.len() as u16; + buf.extend_from_slice(&len.to_be_bytes()); + buf.extend_from_slice(id); + buf.extend_from_slice(addr_bytes); + } + fs::write(&path, &buf).map_err(tesseras_dht::Error::Io)?; + log::info!("store: persisted {} routing contacts", contacts.len()); + Ok(()) + } + + fn load_contacts( + &self, + ) -> Result<Vec<tesseras_dht::persist::ContactRecord>, tesseras_dht::Error> + { + let path = self.root.join("contacts.bin"); + let data = match fs::read(&path) { + Ok(d) => d, + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + return Ok(Vec::new()); + } + Err(e) => return Err(tesseras_dht::Error::Io(e)), + }; + + let mut out = Vec::new(); + let mut pos = 0; + while pos + 2 + 32 <= data.len() { + let addr_len = + u16::from_be_bytes([data[pos], data[pos + 1]]) as usize; + pos += 2; + if pos + 32 + addr_len > data.len() { + break; + } + let mut id_bytes = [0u8; 32]; + id_bytes.copy_from_slice(&data[pos..pos + 32]); + pos += 32; + let addr_str = + std::str::from_utf8(&data[pos..pos + addr_len]).unwrap_or(""); + pos += addr_len; + if let Ok(addr) = addr_str.parse() { + out.push(tesseras_dht::persist::ContactRecord { + id: tesseras_dht::NodeId::from_bytes(id_bytes), + addr, + }); + } + } + if !out.is_empty() { + log::info!("store: loaded {} routing contacts", out.len()); + } + Ok(out) + } +} + +impl tesseras_dht::persist::DataPersistence for PasteStore { + fn save( + &self, + _records: &[tesseras_dht::persist::StoredRecord], + ) -> Result<(), tesseras_dht::Error> { + Ok(()) // app-level storage handles this + } + + fn load( + &self, + ) -> Result<Vec<tesseras_dht::persist::StoredRecord>, tesseras_dht::Error> + { + Ok(Vec::new()) // republish timer re-populates DHT + } +} |