//! Daemon main loop, Unix socket listener, and HTTP server. //! //! The daemon loop drives the DHT node, processes client //! requests, and runs periodic maintenance (GC, republish, //! state persistence). Communication with the CLI happens //! over a Unix socket using a line-oriented text protocol //! (see [`crate::protocol`]). use std::io::{BufRead, BufReader, Read, Write}; use std::path::Path; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc; use std::time::{Duration, Instant}; use tesseras_dht::Node; use crate::base58; use crate::ops; use crate::paste::Paste; use crate::protocol::{self, Request, Response}; use crate::store::PasteStore; /// How often to garbage-collect expired pastes (10 min). const GC_INTERVAL: Duration = Duration::from_secs(600); /// How often to republish local pastes to the DHT (30 min). const REPUBLISH_INTERVAL: Duration = Duration::from_secs(1800); /// How often to persist routing table and state (5 min). const SAVE_INTERVAL: Duration = Duration::from_secs(300); /// How often to sync DHT-replicated values to local store (5 s). const SYNC_INTERVAL: Duration = Duration::from_secs(5); /// A request from the socket thread to the main thread. pub struct DaemonRequest { pub cmd: Request, pub reply: mpsc::Sender, } /// Run the daemon main loop. 
///
/// Each iteration calls `node.poll_timeout` with a 100 ms timeout,
/// answers every request currently queued on `rx`, and then runs the
/// periodic jobs that are due: DHT-to-store sync, garbage collection,
/// republish and state save. The loop ends once `shutdown` is set
/// (externally, or by a [`Request::Shutdown`] command), after which
/// `node.shutdown()` is called.
pub fn run_daemon(
    node: &mut Node,
    store: &PasteStore,
    rx: &mpsc::Receiver<DaemonRequest>,
    shutdown: &AtomicBool,
) {
    let mut last_gc = Instant::now();
    // `None` means "never republished", so the first iteration
    // republishes immediately. This avoids `Instant::now() - interval`,
    // which may panic when the result is not representable.
    let mut last_republish: Option<Instant> = None;
    let mut last_save = Instant::now();
    let mut last_sync = Instant::now();

    log::info!("daemon main loop started");

    while !shutdown.load(Ordering::Relaxed) {
        let _ = node.poll_timeout(Duration::from_millis(100));

        // Drain all pending client requests without blocking.
        while let Ok(req) = rx.try_recv() {
            let is_shutdown = matches!(req.cmd, Request::Shutdown);
            let resp = handle_request(node, store, req.cmd);
            // The client may have given up waiting; that is not an error here.
            let _ = req.reply.send(resp);
            if is_shutdown {
                shutdown.store(true, Ordering::Relaxed);
            }
        }

        if last_sync.elapsed() >= SYNC_INTERVAL {
            last_sync = Instant::now();
            sync_dht_to_store(node, store);
        }

        if last_gc.elapsed() >= GC_INTERVAL {
            last_gc = Instant::now();
            match store.gc() {
                Ok(0) => {}
                Ok(n) => log::info!("gc: removed {n} expired pastes"),
                Err(e) => log::warn!("gc: {e}"),
            }
        }

        let republish_due = last_republish
            .map_or(true, |at| at.elapsed() >= REPUBLISH_INTERVAL);
        if republish_due {
            last_republish = Some(Instant::now());
            republish(node, store);
        }

        if last_save.elapsed() >= SAVE_INTERVAL {
            last_save = Instant::now();
            node.save_state();
        }
    }

    log::info!("daemon main loop stopped, shutting down");
    node.shutdown();
}

/// Dispatch a single client request to the appropriate operation.
fn handle_request( node: &mut Node, store: &PasteStore, cmd: Request, ) -> Response { match cmd { Request::Put { ttl_secs, content_b58, encrypt, } => { let content = match base58::decode(&content_b58) { Some(c) => c, None => return Response::Err("invalid base58 content".into()), }; match ops::put_paste(node, store, &content, ttl_secs, encrypt) { Ok(key) => Response::Ok(key), Err(e) => Response::Err(e.to_string()), } } Request::Get { key } => match ops::get_paste(node, store, &key) { Ok(data) => Response::Ok(base58::encode(&data)), Err(e) => Response::Err(e.to_string()), }, Request::Del { key } => match ops::delete_paste(node, store, &key) { Ok(()) => Response::Ok("deleted".into()), Err(e) => Response::Err(e.to_string()), }, Request::Pin { ref key } | Request::Unpin { ref key } => { let is_pin = matches!(cmd, Request::Pin { .. }); let key = key.clone(); let hash = match ops::resolve_hash(&key) { Ok(h) => h, Err(e) => return Response::Err(e.to_string()), }; let result = if is_pin { store.pin(&hash) } else { store.unpin(&hash) }; match result { Ok(()) => { let label = if is_pin { "pinned" } else { "unpinned" }; Response::Ok(label.into()) } Err(e) => Response::Err(e.to_string()), } } Request::Status => { let m = node.metrics(); let status = format!( "peers={} stored={} pastes={} \ sent={} recv={} lookups={}/{}", node.routing_table_size(), node.storage_count(), store.paste_count(), m.messages_sent, m.messages_received, m.lookups_completed, m.lookups_started, ); Response::Ok(status) } Request::Shutdown => Response::Ok("shutting down".into()), } } /// Copy DHT-replicated values into the local file store so /// the HTTP server can serve them without a DHT lookup. 
fn sync_dht_to_store(node: &Node, store: &PasteStore) {
    for (key, value) in node.dht_values() {
        // Only 32-byte keys are copied, and only when the store does
        // not already hold that key.
        let has_hash_len = key.len() == 32;
        if has_hash_len && store.get_paste(&key).is_none() {
            // Best effort: a failed write is simply retried on the next sync.
            let _ = store.put_paste(&key, &value);
        }
    }
}

/// Re-announce locally stored pastes to the DHT so they
/// remain reachable as nodes join and leave the network.
///
/// Pinned pastes are announced with the maximum TTL (`u16::MAX`);
/// unpinned pastes are skipped when expired and otherwise announced
/// with their remaining lifetime in seconds, capped at `u16::MAX`.
/// Entries that are missing from the store or fail to parse are skipped.
fn republish(node: &mut Node, store: &PasteStore) {
    let keys = store.original_keys();
    if keys.is_empty() {
        return;
    }

    let mut announced = 0u32;
    for key in &keys {
        let Some(data) = store.get_paste(key) else {
            continue;
        };
        let Some(paste) = Paste::from_bytes(&data) else {
            continue;
        };

        let ttl = if store.is_pinned(key) {
            u16::MAX
        } else if paste.is_expired() {
            continue;
        } else {
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_secs();
            let expires_at = paste.created_at.saturating_add(paste.ttl_secs);
            // Remaining lifetime, clamped to what fits in a u16.
            u16::try_from(expires_at.saturating_sub(now)).unwrap_or(u16::MAX)
        };

        node.put(key, &data, ttl, false);
        announced += 1;
    }

    if announced > 0 {
        log::info!("republish: announced {announced} pastes to DHT");
    }
}

/// Run the Unix socket listener thread.
pub fn run_unix_listener( sock_path: &Path, tx: mpsc::Sender, shutdown: &AtomicBool, ) { let _ = std::fs::remove_file(sock_path); let listener = match std::os::unix::net::UnixListener::bind(sock_path) { Ok(l) => l, Err(e) => { log::error!("unix: failed to bind {}: {e}", sock_path.display()); return; } }; // Allow group members to connect (0o770) use std::os::unix::fs::PermissionsExt; let perms = std::fs::Permissions::from_mode(0o770); if let Err(e) = std::fs::set_permissions(sock_path, perms) { log::warn!("unix: failed to set socket permissions: {e}"); } if let Err(e) = listener.set_nonblocking(true) { log::error!("unix: failed to set non-blocking: {e}"); return; } log::info!("unix: listening on {}", sock_path.display()); while !shutdown.load(Ordering::Relaxed) { match listener.accept() { Ok((stream, _)) => { if let Err(e) = handle_client(stream, &tx) { log::debug!("unix: client disconnected: {e}"); } } Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { std::thread::sleep(Duration::from_millis(50)); } Err(e) => { log::warn!("unix: accept failed: {e}"); std::thread::sleep(Duration::from_millis(100)); } } } let _ = std::fs::remove_file(sock_path); } /// Maximum protocol line size (128 KiB covers the 64 KiB paste /// limit after base58 expansion plus command overhead). const MAX_LINE_SIZE: usize = 128 * 1024; /// Read requests line-by-line from a connected Unix socket /// client, forwarding each to the daemon main loop via `tx`. 
fn handle_client( stream: std::os::unix::net::UnixStream, tx: &mpsc::Sender, ) -> Result<(), Box> { if let Err(e) = stream.set_nonblocking(false) { log::warn!("unix: failed to set blocking mode: {e}"); return Err(e.into()); } if let Err(e) = stream.set_read_timeout(Some(Duration::from_secs(60))) { log::warn!("unix: failed to set read timeout: {e}"); return Err(e.into()); } let mut reader = BufReader::new(&stream); let mut writer = &stream; let mut line = String::new(); loop { line.clear(); // Limit read to MAX_LINE_SIZE to prevent a client from // exhausting memory with an unbounded request line. let n = (&mut reader) .take(MAX_LINE_SIZE as u64) .read_line(&mut line)?; if n == 0 { break; } if !line.ends_with('\n') && n >= MAX_LINE_SIZE { let resp = protocol::format_response(&Response::Err( "request too large".into(), )); writer.write_all(resp.as_bytes())?; // Drain remaining bytes until newline let mut discard = String::new(); let _ = reader.read_line(&mut discard); continue; } let line = line.trim(); let cmd = match protocol::parse_request(line) { Ok(c) => c, Err(e) => { let resp = protocol::format_response(&Response::Err(e)); writer.write_all(resp.as_bytes())?; continue; } }; let is_shutdown = matches!(cmd, Request::Shutdown); let (reply_tx, reply_rx) = mpsc::channel(); tx.send(DaemonRequest { cmd, reply: reply_tx, })?; let resp = reply_rx .recv_timeout(Duration::from_secs(60)) .unwrap_or(Response::Err("timeout".into())); writer.write_all(protocol::format_response(&resp).as_bytes())?; if is_shutdown { break; } } Ok(()) } // ── HTTP server ───────────────────────────────────── /// Maximum concurrent HTTP handler threads. const MAX_HTTP_THREADS: usize = 8; /// Minimal HTTP server. Serves pastes at / or /// //. Queries the daemon via Unix socket /// so it can access DHT-replicated data too. 
pub fn run_http( port: u16, sock_path: &Path, store: &PasteStore, shutdown: &AtomicBool, ) { let addr = std::net::SocketAddr::from(([0, 0, 0, 0], port)); let listener = match std::net::TcpListener::bind(addr) { Ok(l) => l, Err(e) => { log::error!("http: failed to bind {addr}: {e}"); return; } }; if let Err(e) = listener.set_nonblocking(true) { log::error!("http: failed to set non-blocking: {e}"); return; } log::info!("http: listening on {addr}"); let active = Arc::new(std::sync::atomic::AtomicUsize::new(0)); let sock_owned = sock_path.to_path_buf(); while !shutdown.load(Ordering::Relaxed) { match listener.accept() { Ok((stream, _)) => { if active.load(Ordering::Relaxed) >= MAX_HTTP_THREADS { log::warn!("http: max connections reached, rejecting"); let mut s = stream; let _ = s.write_all( b"HTTP/1.1 503 Service Unavailable\r\n\ Connection: close\r\n\r\n", ); continue; } let store = store.clone(); let sock = sock_owned.clone(); let counter = Arc::clone(&active); counter.fetch_add(1, Ordering::Relaxed); std::thread::spawn(move || { handle_http(stream, &store, &sock); counter.fetch_sub(1, Ordering::Relaxed); }); } Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { std::thread::sleep(Duration::from_millis(50)); } Err(e) => { log::debug!("http: accept failed: {e}"); std::thread::sleep(Duration::from_millis(100)); } } } } fn handle_http( mut stream: std::net::TcpStream, store: &PasteStore, sock_path: &Path, ) { use std::io::Read; stream.set_read_timeout(Some(Duration::from_secs(5))).ok(); let mut buf = [0u8; 4096]; let n = match stream.read(&mut buf) { Ok(n) => n, Err(_) => return, }; let request = String::from_utf8_lossy(&buf[..n]); // Parse "METHOD / HTTP/1.x" let mut parts = request.split_whitespace(); let method = parts.next().unwrap_or(""); let path = match parts.next() { Some(p) => p, None => { http_response(&mut stream, 400, "text/plain", b"Bad Request"); return; } }; if method != "GET" && method != "HEAD" { http_response(&mut stream, 405, "text/plain", 
b"Method Not Allowed"); return; } if path == "/" || path == "/favicon.ico" { http_response( &mut stream, 200, "text/plain", b"Hello Tesseras World\n", ); return; } // Strip leading / let key = path.trim_start_matches('/'); if key.is_empty() { http_response(&mut stream, 400, "text/plain", b"missing key"); return; } // Split hash#enckey (URL fragment won't arrive via // HTTP, so also support hash/enckey as path) let (hash_b58, enc_key_b58) = if let Some((h, k)) = key.split_once('/') { (h, Some(k)) } else { (key, None) }; let hash = match base58::decode(hash_b58) { Some(h) if h.len() == 32 => h, _ => { http_response(&mut stream, 400, "text/plain", b"invalid key"); return; } }; // Build the daemon-style key: hash#enckey let daemon_key = match enc_key_b58 { Some(ek) => format!("{hash_b58}#{ek}"), None => hash_b58.to_string(), }; // Try local store first (fast path) let body = if let Some(data) = store.get_paste(&hash) { match serve_paste_data(&data, enc_key_b58) { Ok(b) => b, Err((status, msg)) => { http_response( &mut stream, status, "text/plain", msg.as_bytes(), ); return; } } } else { // Not local — ask daemon which does a DHT lookup match dht_lookup_via_socket(sock_path, &daemon_key) { Some(b) => b, None => { http_response(&mut stream, 404, "text/plain", b"not found"); return; } } }; let ct = if std::str::from_utf8(&body).is_ok() { "text/plain; charset=utf-8" } else { "application/octet-stream" }; http_response(&mut stream, 200, ct, &body); } /// Deserialize a Paste from store bytes, optionally decrypt. 
fn serve_paste_data( data: &[u8], enc_key_b58: Option<&str>, ) -> Result, (u16, &'static str)> { let paste = Paste::from_bytes(data).ok_or((500, "corrupt paste"))?; if let Some(kb58) = enc_key_b58 { let key_bytes = base58::decode(kb58).ok_or((400, "invalid enc key"))?; if key_bytes.len() != 32 { return Err((400, "invalid enc key")); } let mut key = [0u8; 32]; key.copy_from_slice(&key_bytes); crate::crypto::decrypt(&key, &paste.content) .ok_or((403, "decryption failed")) } else { Ok(paste.content) } } /// Ask the daemon for a paste via Unix socket (triggers /// a DHT lookup if not in local store). /// Key format: "hash" or "hash#enckey". fn dht_lookup_via_socket(sock_path: &Path, key: &str) -> Option> { let sock = std::os::unix::net::UnixStream::connect(sock_path).ok()?; sock.set_read_timeout(Some(Duration::from_secs(35))).ok(); sock.set_write_timeout(Some(Duration::from_secs(5))).ok(); let cmd = format!("GET {key}\n"); (&sock).write_all(cmd.as_bytes()).ok()?; let reader = BufReader::new(&sock); let line = reader.lines().next()?.ok()?; let rest = line.strip_prefix("OK ")?; base58::decode(rest) } fn http_response( stream: &mut std::net::TcpStream, status: u16, content_type: &str, body: &[u8], ) { let reason = match status { 200 => "OK", 400 => "Bad Request", 403 => "Forbidden", 405 => "Method Not Allowed", 404 => "Not Found", 500 => "Internal Server Error", _ => "Unknown", }; let header = format!( "HTTP/1.1 {status} {reason}\r\n\ Content-Type: {content_type}\r\n\ Content-Length: {}\r\n\ Connection: close\r\n\ \r\n", body.len(), ); let _ = stream.write_all(header.as_bytes()); let _ = stream.write_all(body); }