aboutsummaryrefslogtreecommitdiffstats
path: root/src/daemon.rs
diff options
context:
space:
mode:
authormurilo ijanc2026-03-25 02:07:37 -0300
committermurilo ijanc2026-03-25 02:07:37 -0300
commit7aff2e1d279a4e442b32f49ca0a0eca065355787 (patch)
treebc987ece7eb78bb8375de1b20123ecd0f90472ba /src/daemon.rs
downloadtesseras-paste-7aff2e1d279a4e442b32f49ca0a0eca065355787.tar.gz
Initial commit: tesseras-paste decentralized pastebin
DHT-backed encrypted pastebin with two binaries (tp/tpd), XChaCha20-Poly1305 encryption, content-addressed storage, and Unix socket + HTTP interfaces.
Diffstat (limited to 'src/daemon.rs')
-rw-r--r--src/daemon.rs504
1 files changed, 504 insertions, 0 deletions
diff --git a/src/daemon.rs b/src/daemon.rs
new file mode 100644
index 0000000..313a4aa
--- /dev/null
+++ b/src/daemon.rs
@@ -0,0 +1,504 @@
+//! Daemon main loop, Unix socket listener, and HTTP server.
+//!
+//! The daemon loop drives the DHT node, processes client
+//! requests, and runs periodic maintenance (GC, republish,
+//! state persistence). Communication with the CLI happens
+//! over a Unix socket using a line-oriented text protocol
+//! (see [`crate::protocol`]).
+
+use std::io::{BufRead, BufReader, Write};
+use std::path::Path;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::mpsc;
+use std::time::{Duration, Instant};
+
+use tesseras_dht::Node;
+
+use crate::base58;
+use crate::ops;
+use crate::paste::Paste;
+use crate::protocol::{self, Request, Response};
+use crate::store::PasteStore;
+
+/// How often to garbage-collect expired pastes (10 min).
+const GC_INTERVAL: Duration = Duration::from_secs(600);
+
+/// How often to republish local pastes to the DHT (30 min).
+const REPUBLISH_INTERVAL: Duration = Duration::from_secs(1800);
+
+/// How often to persist routing table and state (5 min).
+const SAVE_INTERVAL: Duration = Duration::from_secs(300);
+
+/// How often to sync DHT-replicated values to local store (5 s).
+const SYNC_INTERVAL: Duration = Duration::from_secs(5);
+
+/// A request from the socket thread to the main thread.
+pub struct DaemonRequest {
+ pub cmd: Request,
+ pub reply: mpsc::Sender<Response>,
+}
+
+/// Run the daemon main loop.
+pub fn run_daemon(
+ node: &mut Node,
+ store: &PasteStore,
+ rx: &mpsc::Receiver<DaemonRequest>,
+ shutdown: &AtomicBool,
+) {
+ let mut last_gc = Instant::now();
+ let mut last_republish = Instant::now() - REPUBLISH_INTERVAL;
+ let mut last_save = Instant::now();
+ let mut last_sync = Instant::now();
+
+ log::info!("daemon main loop started");
+
+ while !shutdown.load(Ordering::Relaxed) {
+ let _ = node.poll_timeout(Duration::from_millis(100));
+
+ while let Ok(req) = rx.try_recv() {
+ let is_shutdown = matches!(req.cmd, Request::Shutdown);
+ let resp = handle_request(node, store, req.cmd);
+ let _ = req.reply.send(resp);
+ if is_shutdown {
+ shutdown.store(true, Ordering::Relaxed);
+ }
+ }
+
+ if last_sync.elapsed() >= SYNC_INTERVAL {
+ last_sync = Instant::now();
+ sync_dht_to_store(node, store);
+ }
+
+ if last_gc.elapsed() >= GC_INTERVAL {
+ last_gc = Instant::now();
+ match store.gc() {
+ Ok(0) => {}
+ Ok(n) => log::info!("gc: removed {n} expired pastes"),
+ Err(e) => log::warn!("gc: {e}"),
+ }
+ }
+
+ if last_republish.elapsed() >= REPUBLISH_INTERVAL {
+ last_republish = Instant::now();
+ republish(node, store);
+ }
+
+ if last_save.elapsed() >= SAVE_INTERVAL {
+ last_save = Instant::now();
+ node.save_state();
+ }
+ }
+
+ log::info!("daemon main loop stopped, shutting down");
+ node.shutdown();
+}
+
+/// Dispatch a single client request to the appropriate operation.
+fn handle_request(
+ node: &mut Node,
+ store: &PasteStore,
+ cmd: Request,
+) -> Response {
+ match cmd {
+ Request::Put {
+ ttl_secs,
+ content_b58,
+ encrypt,
+ } => {
+ let content = match base58::decode(&content_b58) {
+ Some(c) => c,
+ None => return Response::Err("invalid base58 content".into()),
+ };
+ match ops::put_paste(node, store, &content, ttl_secs, encrypt) {
+ Ok(key) => Response::Ok(key),
+ Err(e) => Response::Err(e.to_string()),
+ }
+ }
+ Request::Get { key } => match ops::get_paste(node, store, &key) {
+ Ok(data) => Response::Ok(base58::encode(&data)),
+ Err(e) => Response::Err(e.to_string()),
+ },
+ Request::Del { key } => match ops::delete_paste(node, store, &key) {
+ Ok(()) => Response::Ok("deleted".into()),
+ Err(e) => Response::Err(e.to_string()),
+ },
+ Request::Pin { ref key } | Request::Unpin { ref key } => {
+ let is_pin = matches!(cmd, Request::Pin { .. });
+ let key = key.clone();
+ let hash = match ops::resolve_hash(&key) {
+ Ok(h) => h,
+ Err(e) => return Response::Err(e.to_string()),
+ };
+ let result = if is_pin {
+ store.pin(&hash)
+ } else {
+ store.unpin(&hash)
+ };
+ match result {
+ Ok(()) => {
+ let label = if is_pin { "pinned" } else { "unpinned" };
+ Response::Ok(label.into())
+ }
+ Err(e) => Response::Err(e.to_string()),
+ }
+ }
+ Request::Status => {
+ let m = node.metrics();
+ let status = format!(
+ "peers={} stored={} pastes={} \
+ sent={} recv={} lookups={}/{}",
+ node.routing_table_size(),
+ node.storage_count(),
+ store.paste_count(),
+ m.messages_sent,
+ m.messages_received,
+ m.lookups_completed,
+ m.lookups_started,
+ );
+ Response::Ok(status)
+ }
+ Request::Shutdown => Response::Ok("shutting down".into()),
+ }
+}
+
+/// Copy DHT-replicated values into the local file store so
+/// the HTTP server can serve them without a DHT lookup.
+fn sync_dht_to_store(node: &Node, store: &PasteStore) {
+ for (key, value) in node.dht_values() {
+ if key.len() != 32 {
+ continue;
+ }
+ if store.get_paste(&key).is_none() {
+ let _ = store.put_paste(&key, &value);
+ }
+ }
+}
+
+/// Re-announce locally stored pastes to the DHT so they
+/// remain reachable as nodes join and leave the network.
+fn republish(node: &mut Node, store: &PasteStore) {
+ let keys = store.original_keys();
+ if keys.is_empty() {
+ return;
+ }
+
+ let mut count = 0u32;
+ for key in &keys {
+ if let Some(data) = store.get_paste(key)
+ && let Some(paste) = Paste::from_bytes(&data)
+ {
+ let remaining = if store.is_pinned(key) {
+ u16::MAX
+ } else if paste.is_expired() {
+ continue;
+ } else {
+ let now = std::time::SystemTime::now()
+ .duration_since(std::time::UNIX_EPOCH)
+ .unwrap_or_default()
+ .as_secs();
+ let expires = paste.created_at + paste.ttl_secs;
+ let rem = expires.saturating_sub(now);
+ std::cmp::min(rem, u16::MAX as u64) as u16
+ };
+ node.put(key, &data, remaining, false);
+ count += 1;
+ }
+ }
+ if count > 0 {
+ log::info!("republish: announced {count} pastes to DHT");
+ }
+}
+
+/// Run the Unix socket listener thread.
+pub fn run_unix_listener(
+ sock_path: &Path,
+ tx: mpsc::Sender<DaemonRequest>,
+ shutdown: &AtomicBool,
+) {
+ let _ = std::fs::remove_file(sock_path);
+
+ let listener = match std::os::unix::net::UnixListener::bind(sock_path) {
+ Ok(l) => l,
+ Err(e) => {
+ log::error!("unix: failed to bind {}: {e}", sock_path.display());
+ return;
+ }
+ };
+
+ // Allow group members to connect (0o770)
+ use std::os::unix::fs::PermissionsExt;
+ let perms = std::fs::Permissions::from_mode(0o770);
+ if let Err(e) = std::fs::set_permissions(sock_path, perms) {
+ log::warn!("unix: failed to set socket permissions: {e}");
+ }
+
+ if let Err(e) = listener.set_nonblocking(true) {
+ log::error!("unix: failed to set non-blocking: {e}");
+ return;
+ }
+
+ log::info!("unix: listening on {}", sock_path.display());
+
+ while !shutdown.load(Ordering::Relaxed) {
+ match listener.accept() {
+ Ok((stream, _)) => {
+ if let Err(e) = handle_client(stream, &tx) {
+ log::debug!("unix: client disconnected: {e}");
+ }
+ }
+ Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
+ std::thread::sleep(Duration::from_millis(50));
+ }
+ Err(e) => {
+ log::warn!("unix: accept failed: {e}");
+ std::thread::sleep(Duration::from_millis(100));
+ }
+ }
+ }
+
+ let _ = std::fs::remove_file(sock_path);
+}
+
+/// Read requests line-by-line from a connected Unix socket
+/// client, forwarding each to the daemon main loop via `tx`.
+fn handle_client(
+ stream: std::os::unix::net::UnixStream,
+ tx: &mpsc::Sender<DaemonRequest>,
+) -> Result<(), Box<dyn std::error::Error>> {
+ stream.set_nonblocking(false)?;
+ stream.set_read_timeout(Some(Duration::from_secs(60)))?;
+
+ let reader = BufReader::new(&stream);
+ let mut writer = &stream;
+
+ for line in reader.lines() {
+ let line = line?;
+ let cmd = match protocol::parse_request(&line) {
+ Ok(c) => c,
+ Err(e) => {
+ let resp = protocol::format_response(&Response::Err(e));
+ writer.write_all(resp.as_bytes())?;
+ continue;
+ }
+ };
+
+ let is_shutdown = matches!(cmd, Request::Shutdown);
+
+ let (reply_tx, reply_rx) = mpsc::channel();
+ tx.send(DaemonRequest {
+ cmd,
+ reply: reply_tx,
+ })?;
+
+ let resp = reply_rx
+ .recv_timeout(Duration::from_secs(60))
+ .unwrap_or(Response::Err("timeout".into()));
+ writer.write_all(protocol::format_response(&resp).as_bytes())?;
+
+ if is_shutdown {
+ break;
+ }
+ }
+ Ok(())
+}
+
+// ── HTTP server ─────────────────────────────────────
+
+/// Minimal HTTP server. Serves pastes at /<hash> or
+/// /<hash>/<enckey>. Queries the daemon via Unix socket
+/// so it can access DHT-replicated data too.
+pub fn run_http(
+ port: u16,
+ sock_path: &Path,
+ store: &PasteStore,
+ shutdown: &AtomicBool,
+) {
+ let addr = std::net::SocketAddr::from(([0, 0, 0, 0], port));
+ let listener = match std::net::TcpListener::bind(addr) {
+ Ok(l) => l,
+ Err(e) => {
+ log::error!("http: failed to bind {addr}: {e}");
+ return;
+ }
+ };
+ if let Err(e) = listener.set_nonblocking(true) {
+ log::error!("http: failed to set non-blocking: {e}");
+ return;
+ }
+
+ log::info!("http: listening on {addr}");
+
+ while !shutdown.load(Ordering::Relaxed) {
+ match listener.accept() {
+ Ok((stream, _)) => {
+ let store = store.clone();
+ handle_http(stream, &store, sock_path);
+ }
+ Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
+ std::thread::sleep(Duration::from_millis(50));
+ }
+ Err(e) => {
+ log::debug!("http: accept failed: {e}");
+ std::thread::sleep(Duration::from_millis(100));
+ }
+ }
+ }
+}
+
+fn handle_http(
+ mut stream: std::net::TcpStream,
+ store: &PasteStore,
+ sock_path: &Path,
+) {
+ use std::io::Read;
+
+ stream.set_read_timeout(Some(Duration::from_secs(5))).ok();
+
+ let mut buf = [0u8; 4096];
+ let n = match stream.read(&mut buf) {
+ Ok(n) => n,
+ Err(_) => return,
+ };
+ let request = String::from_utf8_lossy(&buf[..n]);
+
+ // Parse "GET /<path> HTTP/1.x"
+ let path = match request.split_whitespace().nth(1) {
+ Some(p) => p,
+ None => {
+ http_response(&mut stream, 400, "text/plain", b"Bad Request");
+ return;
+ }
+ };
+
+ if path == "/" || path == "/favicon.ico" {
+ http_response(&mut stream, 200, "text/plain", b"tesseras-paste\n");
+ return;
+ }
+
+ // Strip leading /
+ let key = path.trim_start_matches('/');
+ if key.is_empty() {
+ http_response(&mut stream, 400, "text/plain", b"missing key");
+ return;
+ }
+
+ // Split hash#enckey (URL fragment won't arrive via
+ // HTTP, so also support hash/enckey as path)
+ let (hash_b58, enc_key_b58) = if let Some((h, k)) = key.split_once('/') {
+ (h, Some(k))
+ } else {
+ (key, None)
+ };
+
+ let hash = match base58::decode(hash_b58) {
+ Some(h) if h.len() == 32 => h,
+ _ => {
+ http_response(&mut stream, 400, "text/plain", b"invalid key");
+ return;
+ }
+ };
+
+ // Build the daemon-style key: hash#enckey
+ let daemon_key = match enc_key_b58 {
+ Some(ek) => format!("{hash_b58}#{ek}"),
+ None => hash_b58.to_string(),
+ };
+
+ // Try local store first (fast path)
+ let body = if let Some(data) = store.get_paste(&hash) {
+ match serve_paste_data(&data, enc_key_b58) {
+ Ok(b) => b,
+ Err((status, msg)) => {
+ http_response(
+ &mut stream,
+ status,
+ "text/plain",
+ msg.as_bytes(),
+ );
+ return;
+ }
+ }
+ } else {
+ // Not local — ask daemon which does a DHT lookup
+ match dht_lookup_via_socket(sock_path, &daemon_key) {
+ Some(b) => b,
+ None => {
+ http_response(&mut stream, 404, "text/plain", b"not found");
+ return;
+ }
+ }
+ };
+
+ let ct = if std::str::from_utf8(&body).is_ok() {
+ "text/plain; charset=utf-8"
+ } else {
+ "application/octet-stream"
+ };
+
+ http_response(&mut stream, 200, ct, &body);
+}
+
+/// Deserialize a Paste from store bytes, optionally decrypt.
+fn serve_paste_data(
+ data: &[u8],
+ enc_key_b58: Option<&str>,
+) -> Result<Vec<u8>, (u16, &'static str)> {
+ let paste = Paste::from_bytes(data).ok_or((500, "corrupt paste"))?;
+
+ if let Some(kb58) = enc_key_b58 {
+ let key_bytes = base58::decode(kb58).ok_or((400, "invalid enc key"))?;
+ if key_bytes.len() != 32 {
+ return Err((400, "invalid enc key"));
+ }
+ let mut key = [0u8; 32];
+ key.copy_from_slice(&key_bytes);
+ crate::crypto::decrypt(&key, &paste.content)
+ .ok_or((403, "decryption failed"))
+ } else {
+ Ok(paste.content)
+ }
+}
+
+/// Ask the daemon for a paste via Unix socket (triggers
+/// a DHT lookup if not in local store).
+/// Key format: "hash" or "hash#enckey".
+fn dht_lookup_via_socket(sock_path: &Path, key: &str) -> Option<Vec<u8>> {
+ let sock = std::os::unix::net::UnixStream::connect(sock_path).ok()?;
+ sock.set_read_timeout(Some(Duration::from_secs(35))).ok();
+ sock.set_write_timeout(Some(Duration::from_secs(5))).ok();
+
+ let cmd = format!("GET {key}\n");
+ (&sock).write_all(cmd.as_bytes()).ok()?;
+
+ let reader = BufReader::new(&sock);
+ let line = reader.lines().next()?.ok()?;
+ let rest = line.strip_prefix("OK ")?;
+
+ base58::decode(rest)
+}
+
+fn http_response(
+ stream: &mut std::net::TcpStream,
+ status: u16,
+ content_type: &str,
+ body: &[u8],
+) {
+ let reason = match status {
+ 200 => "OK",
+ 400 => "Bad Request",
+ 403 => "Forbidden",
+ 404 => "Not Found",
+ 500 => "Internal Server Error",
+ _ => "Unknown",
+ };
+ let header = format!(
+ "HTTP/1.1 {status} {reason}\r\n\
+ Content-Type: {content_type}\r\n\
+ Content-Length: {}\r\n\
+ Connection: close\r\n\
+ \r\n",
+ body.len(),
+ );
+ let _ = stream.write_all(header.as_bytes());
+ let _ = stream.write_all(body);
+}