From 7aff2e1d279a4e442b32f49ca0a0eca065355787 Mon Sep 17 00:00:00 2001 From: murilo ijanc Date: Wed, 25 Mar 2026 02:07:37 -0300 Subject: Initial commit: tesseras-paste decentralized pastebin DHT-backed encrypted pastebin with two binaries (tp/tpd), XChaCha20-Poly1305 encryption, content-addressed storage, and Unix socket + HTTP interfaces. --- src/daemon.rs | 504 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 504 insertions(+) create mode 100644 src/daemon.rs (limited to 'src/daemon.rs') diff --git a/src/daemon.rs b/src/daemon.rs new file mode 100644 index 0000000..313a4aa --- /dev/null +++ b/src/daemon.rs @@ -0,0 +1,504 @@ +//! Daemon main loop, Unix socket listener, and HTTP server. +//! +//! The daemon loop drives the DHT node, processes client +//! requests, and runs periodic maintenance (GC, republish, +//! state persistence). Communication with the CLI happens +//! over a Unix socket using a line-oriented text protocol +//! (see [`crate::protocol`]). + +use std::io::{BufRead, BufReader, Write}; +use std::path::Path; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc; +use std::time::{Duration, Instant}; + +use tesseras_dht::Node; + +use crate::base58; +use crate::ops; +use crate::paste::Paste; +use crate::protocol::{self, Request, Response}; +use crate::store::PasteStore; + +/// How often to garbage-collect expired pastes (10 min). +const GC_INTERVAL: Duration = Duration::from_secs(600); + +/// How often to republish local pastes to the DHT (30 min). +const REPUBLISH_INTERVAL: Duration = Duration::from_secs(1800); + +/// How often to persist routing table and state (5 min). +const SAVE_INTERVAL: Duration = Duration::from_secs(300); + +/// How often to sync DHT-replicated values to local store (5 s). +const SYNC_INTERVAL: Duration = Duration::from_secs(5); + +/// A request from the socket thread to the main thread. +pub struct DaemonRequest { + pub cmd: Request, + pub reply: mpsc::Sender, +} + +/// Run the daemon main loop. +pub fn run_daemon( + node: &mut Node, + store: &PasteStore, + rx: &mpsc::Receiver, + shutdown: &AtomicBool, +) { + let mut last_gc = Instant::now(); + let mut last_republish = Instant::now() - REPUBLISH_INTERVAL; + let mut last_save = Instant::now(); + let mut last_sync = Instant::now(); + + log::info!("daemon main loop started"); + + while !shutdown.load(Ordering::Relaxed) { + let _ = node.poll_timeout(Duration::from_millis(100)); + + while let Ok(req) = rx.try_recv() { + let is_shutdown = matches!(req.cmd, Request::Shutdown); + let resp = handle_request(node, store, req.cmd); + let _ = req.reply.send(resp); + if is_shutdown { + shutdown.store(true, Ordering::Relaxed); + } + } + + if last_sync.elapsed() >= SYNC_INTERVAL { + last_sync = Instant::now(); + sync_dht_to_store(node, store); + } + + if last_gc.elapsed() >= GC_INTERVAL { + last_gc = Instant::now(); + match store.gc() { + Ok(0) => {} + Ok(n) => log::info!("gc: removed {n} expired pastes"), + Err(e) => log::warn!("gc: {e}"), + } + } + + if last_republish.elapsed() >= REPUBLISH_INTERVAL { + last_republish = Instant::now(); + republish(node, store); + } + + if last_save.elapsed() >= SAVE_INTERVAL { + last_save = Instant::now(); + node.save_state(); + } + } + + log::info!("daemon main loop stopped, shutting down"); + node.shutdown(); +} + +/// Dispatch a single client request to the appropriate operation. +fn handle_request( + node: &mut Node, + store: &PasteStore, + cmd: Request, +) -> Response { + match cmd { + Request::Put { + ttl_secs, + content_b58, + encrypt, + } => { + let content = match base58::decode(&content_b58) { + Some(c) => c, + None => return Response::Err("invalid base58 content".into()), + }; + match ops::put_paste(node, store, &content, ttl_secs, encrypt) { + Ok(key) => Response::Ok(key), + Err(e) => Response::Err(e.to_string()), + } + } + Request::Get { key } => match ops::get_paste(node, store, &key) { + Ok(data) => Response::Ok(base58::encode(&data)), + Err(e) => Response::Err(e.to_string()), + }, + Request::Del { key } => match ops::delete_paste(node, store, &key) { + Ok(()) => Response::Ok("deleted".into()), + Err(e) => Response::Err(e.to_string()), + }, + Request::Pin { ref key } | Request::Unpin { ref key } => { + let is_pin = matches!(cmd, Request::Pin { .. }); + let key = key.clone(); + let hash = match ops::resolve_hash(&key) { + Ok(h) => h, + Err(e) => return Response::Err(e.to_string()), + }; + let result = if is_pin { + store.pin(&hash) + } else { + store.unpin(&hash) + }; + match result { + Ok(()) => { + let label = if is_pin { "pinned" } else { "unpinned" }; + Response::Ok(label.into()) + } + Err(e) => Response::Err(e.to_string()), + } + } + Request::Status => { + let m = node.metrics(); + let status = format!( + "peers={} stored={} pastes={} \ + sent={} recv={} lookups={}/{}", + node.routing_table_size(), + node.storage_count(), + store.paste_count(), + m.messages_sent, + m.messages_received, + m.lookups_completed, + m.lookups_started, + ); + Response::Ok(status) + } + Request::Shutdown => Response::Ok("shutting down".into()), + } +} + +/// Copy DHT-replicated values into the local file store so +/// the HTTP server can serve them without a DHT lookup. +fn sync_dht_to_store(node: &Node, store: &PasteStore) { + for (key, value) in node.dht_values() { + if key.len() != 32 { + continue; + } + if store.get_paste(&key).is_none() { + let _ = store.put_paste(&key, &value); + } + } +} + +/// Re-announce locally stored pastes to the DHT so they +/// remain reachable as nodes join and leave the network. +fn republish(node: &mut Node, store: &PasteStore) { + let keys = store.original_keys(); + if keys.is_empty() { + return; + } + + let mut count = 0u32; + for key in &keys { + if let Some(data) = store.get_paste(key) + && let Some(paste) = Paste::from_bytes(&data) + { + let remaining = if store.is_pinned(key) { + u16::MAX + } else if paste.is_expired() { + continue; + } else { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + let expires = paste.created_at + paste.ttl_secs; + let rem = expires.saturating_sub(now); + std::cmp::min(rem, u16::MAX as u64) as u16 + }; + node.put(key, &data, remaining, false); + count += 1; + } + } + if count > 0 { + log::info!("republish: announced {count} pastes to DHT"); + } +} + +/// Run the Unix socket listener thread. +pub fn run_unix_listener( + sock_path: &Path, + tx: mpsc::Sender, + shutdown: &AtomicBool, +) { + let _ = std::fs::remove_file(sock_path); + + let listener = match std::os::unix::net::UnixListener::bind(sock_path) { + Ok(l) => l, + Err(e) => { + log::error!("unix: failed to bind {}: {e}", sock_path.display()); + return; + } + }; + + // Allow group members to connect (0o770) + use std::os::unix::fs::PermissionsExt; + let perms = std::fs::Permissions::from_mode(0o770); + if let Err(e) = std::fs::set_permissions(sock_path, perms) { + log::warn!("unix: failed to set socket permissions: {e}"); + } + + if let Err(e) = listener.set_nonblocking(true) { + log::error!("unix: failed to set non-blocking: {e}"); + return; + } + + log::info!("unix: listening on {}", sock_path.display()); + + while !shutdown.load(Ordering::Relaxed) { + match listener.accept() { + Ok((stream, _)) => { + if let Err(e) = handle_client(stream, &tx) { + log::debug!("unix: client disconnected: {e}"); + } + } + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + std::thread::sleep(Duration::from_millis(50)); + } + Err(e) => { + log::warn!("unix: accept failed: {e}"); + std::thread::sleep(Duration::from_millis(100)); + } + } + } + + let _ = std::fs::remove_file(sock_path); +} + +/// Read requests line-by-line from a connected Unix socket +/// client, forwarding each to the daemon main loop via `tx`. +fn handle_client( + stream: std::os::unix::net::UnixStream, + tx: &mpsc::Sender, +) -> Result<(), Box> { + stream.set_nonblocking(false)?; + stream.set_read_timeout(Some(Duration::from_secs(60)))?; + + let reader = BufReader::new(&stream); + let mut writer = &stream; + + for line in reader.lines() { + let line = line?; + let cmd = match protocol::parse_request(&line) { + Ok(c) => c, + Err(e) => { + let resp = protocol::format_response(&Response::Err(e)); + writer.write_all(resp.as_bytes())?; + continue; + } + }; + + let is_shutdown = matches!(cmd, Request::Shutdown); + + let (reply_tx, reply_rx) = mpsc::channel(); + tx.send(DaemonRequest { + cmd, + reply: reply_tx, + })?; + + let resp = reply_rx + .recv_timeout(Duration::from_secs(60)) + .unwrap_or(Response::Err("timeout".into())); + writer.write_all(protocol::format_response(&resp).as_bytes())?; + + if is_shutdown { + break; + } + } + Ok(()) +} + +// ── HTTP server ───────────────────────────────────── + +/// Minimal HTTP server. Serves pastes at / or +/// //. Queries the daemon via Unix socket +/// so it can access DHT-replicated data too. +pub fn run_http( + port: u16, + sock_path: &Path, + store: &PasteStore, + shutdown: &AtomicBool, +) { + let addr = std::net::SocketAddr::from(([0, 0, 0, 0], port)); + let listener = match std::net::TcpListener::bind(addr) { + Ok(l) => l, + Err(e) => { + log::error!("http: failed to bind {addr}: {e}"); + return; + } + }; + if let Err(e) = listener.set_nonblocking(true) { + log::error!("http: failed to set non-blocking: {e}"); + return; + } + + log::info!("http: listening on {addr}"); + + while !shutdown.load(Ordering::Relaxed) { + match listener.accept() { + Ok((stream, _)) => { + let store = store.clone(); + handle_http(stream, &store, sock_path); + } + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + std::thread::sleep(Duration::from_millis(50)); + } + Err(e) => { + log::debug!("http: accept failed: {e}"); + std::thread::sleep(Duration::from_millis(100)); + } + } + } +} + +fn handle_http( + mut stream: std::net::TcpStream, + store: &PasteStore, + sock_path: &Path, +) { + use std::io::Read; + + stream.set_read_timeout(Some(Duration::from_secs(5))).ok(); + + let mut buf = [0u8; 4096]; + let n = match stream.read(&mut buf) { + Ok(n) => n, + Err(_) => return, + }; + let request = String::from_utf8_lossy(&buf[..n]); + + // Parse "GET / HTTP/1.x" + let path = match request.split_whitespace().nth(1) { + Some(p) => p, + None => { + http_response(&mut stream, 400, "text/plain", b"Bad Request"); + return; + } + }; + + if path == "/" || path == "/favicon.ico" { + http_response(&mut stream, 200, "text/plain", b"tesseras-paste\n"); + return; + } + + // Strip leading / + let key = path.trim_start_matches('/'); + if key.is_empty() { + http_response(&mut stream, 400, "text/plain", b"missing key"); + return; + } + + // Split hash#enckey (URL fragment won't arrive via + // HTTP, so also support hash/enckey as path) + let (hash_b58, enc_key_b58) = if let Some((h, k)) = key.split_once('/') { + (h, Some(k)) + } else { + (key, None) + }; + + let hash = match base58::decode(hash_b58) { + Some(h) if h.len() == 32 => h, + _ => { + http_response(&mut stream, 400, "text/plain", b"invalid key"); + return; + } + }; + + // Build the daemon-style key: hash#enckey + let daemon_key = match enc_key_b58 { + Some(ek) => format!("{hash_b58}#{ek}"), + None => hash_b58.to_string(), + }; + + // Try local store first (fast path) + let body = if let Some(data) = store.get_paste(&hash) { + match serve_paste_data(&data, enc_key_b58) { + Ok(b) => b, + Err((status, msg)) => { + http_response( + &mut stream, + status, + "text/plain", + msg.as_bytes(), + ); + return; + } + } + } else { + // Not local — ask daemon which does a DHT lookup + match dht_lookup_via_socket(sock_path, &daemon_key) { + Some(b) => b, + None => { + http_response(&mut stream, 404, "text/plain", b"not found"); + return; + } + } + }; + + let ct = if std::str::from_utf8(&body).is_ok() { + "text/plain; charset=utf-8" + } else { + "application/octet-stream" + }; + + http_response(&mut stream, 200, ct, &body); +} + +/// Deserialize a Paste from store bytes, optionally decrypt. +fn serve_paste_data( + data: &[u8], + enc_key_b58: Option<&str>, +) -> Result, (u16, &'static str)> { + let paste = Paste::from_bytes(data).ok_or((500, "corrupt paste"))?; + + if let Some(kb58) = enc_key_b58 { + let key_bytes = base58::decode(kb58).ok_or((400, "invalid enc key"))?; + if key_bytes.len() != 32 { + return Err((400, "invalid enc key")); + } + let mut key = [0u8; 32]; + key.copy_from_slice(&key_bytes); + crate::crypto::decrypt(&key, &paste.content) + .ok_or((403, "decryption failed")) + } else { + Ok(paste.content) + } +} + +/// Ask the daemon for a paste via Unix socket (triggers +/// a DHT lookup if not in local store). +/// Key format: "hash" or "hash#enckey". +fn dht_lookup_via_socket(sock_path: &Path, key: &str) -> Option> { + let sock = std::os::unix::net::UnixStream::connect(sock_path).ok()?; + sock.set_read_timeout(Some(Duration::from_secs(35))).ok(); + sock.set_write_timeout(Some(Duration::from_secs(5))).ok(); + + let cmd = format!("GET {key}\n"); + (&sock).write_all(cmd.as_bytes()).ok()?; + + let reader = BufReader::new(&sock); + let line = reader.lines().next()?.ok()?; + let rest = line.strip_prefix("OK ")?; + + base58::decode(rest) +} + +fn http_response( + stream: &mut std::net::TcpStream, + status: u16, + content_type: &str, + body: &[u8], +) { + let reason = match status { + 200 => "OK", + 400 => "Bad Request", + 403 => "Forbidden", + 404 => "Not Found", + 500 => "Internal Server Error", + _ => "Unknown", + }; + let header = format!( + "HTTP/1.1 {status} {reason}\r\n\ + Content-Type: {content_type}\r\n\ + Content-Length: {}\r\n\ + Connection: close\r\n\ + \r\n", + body.len(), + ); + let _ = stream.write_all(header.as_bytes()); + let _ = stream.write_all(body); +} -- cgit v1.2.3