From a96da5f0704c50e8a4e4f047dcd3fb7c73fdf600 Mon Sep 17 00:00:00 2001 From: murilo ijanc Date: Wed, 25 Mar 2026 23:23:10 -0300 Subject: Initial implementation of tesseras-url Decentralized URL shortener built on tesseras-dht. Includes: - tud daemon: DHT node, Unix socket API, HTTP 302 redirect server - tu CLI: shorten, resolve, del, list, status commands - Auto-generated slugs (8-byte SHA256, base58) or custom slugs - TTL support (default: forever) - Automatic re-join of bootstrap nodes when routing table is empty - OpenBSD pledge(2) and unveil(2) sandboxing - DNS SRV bootstrap discovery - Verbose mode (-v) for both binaries --- src/daemon.rs | 535 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 535 insertions(+) create mode 100644 src/daemon.rs (limited to 'src/daemon.rs') diff --git a/src/daemon.rs b/src/daemon.rs new file mode 100644 index 0000000..8c63f47 --- /dev/null +++ b/src/daemon.rs @@ -0,0 +1,535 @@ +//! Daemon main loop, Unix socket listener, and HTTP server. + +use std::io::{BufRead, BufReader, Read, Write}; +use std::path::Path; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc; +use std::time::{Duration, Instant}; + +use tesseras_dht::Node; + +use crate::base58; +use crate::ops; +use crate::protocol::{self, Request, Response}; +use crate::store::UrlStore; +use crate::url_entry::UrlEntry; + +/// How often to garbage-collect expired entries (10 min). +const GC_INTERVAL: Duration = Duration::from_secs(600); + +/// How often to republish local entries to the DHT (30 min). +const REPUBLISH_INTERVAL: Duration = Duration::from_secs(1800); + +/// How often to persist routing table and state (5 min). +const SAVE_INTERVAL: Duration = Duration::from_secs(300); + +/// How often to attempt re-join when routing table is empty (60 s). +const REJOIN_INTERVAL: Duration = Duration::from_secs(60); + +/// How often to sync DHT-replicated values to local store (5 s). +const SYNC_INTERVAL: Duration = Duration::from_secs(5); + +/// A request from the socket thread to the main thread. +pub struct DaemonRequest { + pub cmd: Request, + pub reply: mpsc::Sender, +} + +/// Run the daemon main loop. +pub fn run_daemon( + node: &mut Node, + store: &UrlStore, + rx: &mpsc::Receiver, + shutdown: &AtomicBool, + bootstrap: &[String], +) { + let mut last_gc = Instant::now(); + let mut last_republish = Instant::now() - REPUBLISH_INTERVAL; + let mut last_save = Instant::now(); + let mut last_sync = Instant::now(); + let mut last_rejoin = Instant::now(); + + log::info!("daemon main loop started"); + + while !shutdown.load(Ordering::Relaxed) { + let _ = node.poll_timeout(Duration::from_millis(100)); + + while let Ok(req) = rx.try_recv() { + let is_shutdown = matches!(req.cmd, Request::Shutdown); + let resp = handle_request(node, store, req.cmd); + let _ = req.reply.send(resp); + if is_shutdown { + shutdown.store(true, Ordering::Relaxed); + } + } + + // Re-join bootstrap nodes when the routing table is empty + if node.routing_table_size() == 0 + && !bootstrap.is_empty() + && last_rejoin.elapsed() >= REJOIN_INTERVAL + { + last_rejoin = Instant::now(); + log::warn!("routing table empty, re-joining bootstrap nodes"); + for peer in bootstrap { + let parts: Vec<&str> = peer.rsplitn(2, ':').collect(); + if parts.len() != 2 { + continue; + } + let host = parts[1]; + if let Ok(p) = parts[0].parse::() { + use std::net::ToSocketAddrs; + if let Ok(addrs) = + format!("{host}:{p}").to_socket_addrs() + { + for addr in addrs { + node.unban(&addr); + } + } + if let Err(e) = node.join(host, p) { + log::warn!("rejoin: failed to join {peer}: {e}"); + } else { + log::info!("rejoin: sent join to {peer}"); + } + } + } + } + + if last_sync.elapsed() >= SYNC_INTERVAL { + last_sync = Instant::now(); + sync_dht_to_store(node, store); + } + + if last_gc.elapsed() >= GC_INTERVAL { + last_gc = Instant::now(); + match store.gc() { + Ok(0) => {} + Ok(n) => log::info!("gc: removed {n} expired entries"), + Err(e) => log::warn!("gc: {e}"), + } + } + + if last_republish.elapsed() >= REPUBLISH_INTERVAL { + last_republish = Instant::now(); + republish(node, store); + } + + if last_save.elapsed() >= SAVE_INTERVAL { + last_save = Instant::now(); + node.save_state(); + } + } + + log::info!("daemon main loop stopped, shutting down"); + node.shutdown(); +} + +/// Dispatch a single client request. +fn handle_request( + node: &mut Node, + store: &UrlStore, + cmd: Request, +) -> Response { + match cmd { + Request::Shorten { + ttl_secs, + slug, + target_url, + } => match ops::shorten_url(node, store, &target_url, ttl_secs, &slug) + { + Ok(slug) => Response::Ok(slug), + Err(e) => Response::Err(e.to_string()), + }, + Request::Resolve { slug } => { + match ops::resolve_url(node, store, &slug) { + Ok(url) => Response::Ok(url), + Err(e) => Response::Err(e.to_string()), + } + } + Request::Del { slug } => { + match ops::delete_url(node, store, &slug) { + Ok(()) => Response::Ok("deleted".into()), + Err(e) => Response::Err(e.to_string()), + } + } + Request::List => { + let entries = store.list_entries(); + if entries.is_empty() { + return Response::Ok("(empty)".into()); + } + let list: Vec = entries + .iter() + .map(|(slug, url)| format!("{slug}\t{url}")) + .collect(); + Response::Ok(base58::encode(list.join("\n").as_bytes())) + } + Request::Status => { + let m = node.metrics(); + let status = format!( + "peers={} stored={} urls={} \ + sent={} recv={} lookups={}/{}", + node.routing_table_size(), + node.storage_count(), + store.entry_count(), + m.messages_sent, + m.messages_received, + m.lookups_completed, + m.lookups_started, + ); + Response::Ok(status) + } + Request::Shutdown => Response::Ok("shutting down".into()), + } +} + +/// Copy DHT-replicated values into the local file store. +fn sync_dht_to_store(node: &Node, store: &UrlStore) { + for (key, value) in node.dht_values() { + if key.len() != 32 { + continue; + } + if store.get_entry(&key).is_none() { + let _ = store.put_entry(&key, &value); + } + } +} + +/// Re-announce locally stored entries to the DHT. +fn republish(node: &mut Node, store: &UrlStore) { + let keys = store.original_keys(); + if keys.is_empty() { + return; + } + + let mut count = 0u32; + for key in &keys { + if let Some(data) = store.get_entry(key) + && let Some(entry) = UrlEntry::from_bytes(&data) + { + let remaining = if entry.ttl_secs == 0 { + u16::MAX + } else if entry.is_expired() { + continue; + } else { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + let expires = + entry.created_at.saturating_add(entry.ttl_secs); + let rem = expires.saturating_sub(now); + std::cmp::min(rem, u16::MAX as u64) as u16 + }; + node.put(key, &data, remaining, false); + count += 1; + } + } + if count > 0 { + log::info!("republish: announced {count} entries to DHT"); + } +} + +/// Run the Unix socket listener thread. +pub fn run_unix_listener( + sock_path: &Path, + tx: mpsc::Sender, + shutdown: &AtomicBool, +) { + let _ = std::fs::remove_file(sock_path); + + let listener = match std::os::unix::net::UnixListener::bind(sock_path) { + Ok(l) => l, + Err(e) => { + log::error!( + "unix: failed to bind {}: {e}", + sock_path.display() + ); + return; + } + }; + + use std::os::unix::fs::PermissionsExt; + let perms = std::fs::Permissions::from_mode(0o770); + if let Err(e) = std::fs::set_permissions(sock_path, perms) { + log::warn!("unix: failed to set socket permissions: {e}"); + } + + if let Err(e) = listener.set_nonblocking(true) { + log::error!("unix: failed to set non-blocking: {e}"); + return; + } + + log::info!("unix: listening on {}", sock_path.display()); + + while !shutdown.load(Ordering::Relaxed) { + match listener.accept() { + Ok((stream, _)) => { + if let Err(e) = handle_client(stream, &tx) { + log::debug!("unix: client disconnected: {e}"); + } + } + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + std::thread::sleep(Duration::from_millis(50)); + } + Err(e) => { + log::warn!("unix: accept failed: {e}"); + std::thread::sleep(Duration::from_millis(100)); + } + } + } + + let _ = std::fs::remove_file(sock_path); +} + +/// Maximum protocol line size (16 KiB is generous for URLs). +const MAX_LINE_SIZE: usize = 16 * 1024; + +fn handle_client( + stream: std::os::unix::net::UnixStream, + tx: &mpsc::Sender, +) -> Result<(), Box> { + if let Err(e) = stream.set_nonblocking(false) { + log::warn!("unix: failed to set blocking mode: {e}"); + return Err(e.into()); + } + if let Err(e) = stream.set_read_timeout(Some(Duration::from_secs(60))) { + log::warn!("unix: failed to set read timeout: {e}"); + return Err(e.into()); + } + + let mut reader = BufReader::new(&stream); + let mut writer = &stream; + let mut line = String::new(); + + loop { + line.clear(); + let n = (&mut reader) + .take(MAX_LINE_SIZE as u64) + .read_line(&mut line)?; + if n == 0 { + break; + } + if !line.ends_with('\n') && n >= MAX_LINE_SIZE { + let resp = protocol::format_response(&Response::Err( + "request too large".into(), + )); + writer.write_all(resp.as_bytes())?; + let mut discard = Vec::new(); + let _ = (&mut reader) + .take(MAX_LINE_SIZE as u64) + .read_until(b'\n', &mut discard); + continue; + } + let line = line.trim(); + let cmd = match protocol::parse_request(line) { + Ok(c) => c, + Err(e) => { + let resp = protocol::format_response(&Response::Err(e)); + writer.write_all(resp.as_bytes())?; + continue; + } + }; + + let is_shutdown = matches!(cmd, Request::Shutdown); + + let (reply_tx, reply_rx) = mpsc::channel(); + tx.send(DaemonRequest { + cmd, + reply: reply_tx, + })?; + + let resp = reply_rx + .recv_timeout(Duration::from_secs(60)) + .unwrap_or(Response::Err("timeout".into())); + writer.write_all(protocol::format_response(&resp).as_bytes())?; + + if is_shutdown { + break; + } + } + Ok(()) +} + +// ── HTTP server ───────────────────────────────────── + +/// Maximum concurrent HTTP handler threads. +const MAX_HTTP_THREADS: usize = 8; + +/// Minimal HTTP server. Redirects at /. +pub fn run_http( + port: u16, + sock_path: &Path, + store: &UrlStore, + shutdown: &AtomicBool, +) { + let addr = std::net::SocketAddr::from(([0, 0, 0, 0], port)); + let listener = match std::net::TcpListener::bind(addr) { + Ok(l) => l, + Err(e) => { + log::error!("http: failed to bind {addr}: {e}"); + return; + } + }; + if let Err(e) = listener.set_nonblocking(true) { + log::error!("http: failed to set non-blocking: {e}"); + return; + } + + log::info!("http: listening on {addr}"); + + let active = Arc::new(std::sync::atomic::AtomicUsize::new(0)); + let sock_owned = sock_path.to_path_buf(); + + while !shutdown.load(Ordering::Relaxed) { + match listener.accept() { + Ok((stream, _)) => { + if active.load(Ordering::Relaxed) >= MAX_HTTP_THREADS { + log::warn!("http: max connections reached, rejecting"); + let mut s = stream; + let _ = s.write_all( + b"HTTP/1.1 503 Service Unavailable\r\n\ + Connection: close\r\n\r\n", + ); + continue; + } + let store = store.clone(); + let sock = sock_owned.clone(); + let counter = Arc::clone(&active); + counter.fetch_add(1, Ordering::Relaxed); + std::thread::spawn(move || { + handle_http(stream, &store, &sock); + counter.fetch_sub(1, Ordering::Relaxed); + }); + } + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + std::thread::sleep(Duration::from_millis(50)); + } + Err(e) => { + log::debug!("http: accept failed: {e}"); + std::thread::sleep(Duration::from_millis(100)); + } + } + } +} + +fn handle_http( + mut stream: std::net::TcpStream, + store: &UrlStore, + sock_path: &Path, +) { + use std::io::Read; + + stream.set_read_timeout(Some(Duration::from_secs(5))).ok(); + + let mut buf = [0u8; 4096]; + let n = match stream.read(&mut buf) { + Ok(n) => n, + Err(_) => return, + }; + let request = String::from_utf8_lossy(&buf[..n]); + + let mut parts = request.split_whitespace(); + let method = parts.next().unwrap_or(""); + let path = match parts.next() { + Some(p) => p, + None => { + http_response(&mut stream, 400, "text/plain", b"Bad Request"); + return; + } + }; + + if method != "GET" && method != "HEAD" { + http_response( + &mut stream, + 405, + "text/plain", + b"Method Not Allowed", + ); + return; + } + + if path == "/" || path == "/favicon.ico" { + http_response( + &mut stream, + 200, + "text/plain", + b"tesseras-url shortener\n", + ); + return; + } + + let slug = path.trim_start_matches('/'); + if slug.is_empty() { + http_response(&mut stream, 400, "text/plain", b"missing slug"); + return; + } + + // Try local store first + let dht_key = UrlEntry::dht_key(slug); + let target = if let Some(data) = store.get_entry(&dht_key) { + UrlEntry::from_bytes(&data).map(|e| e.target_url) + } else { + // Fall back to daemon (DHT lookup) + dht_resolve_via_socket(sock_path, slug) + }; + + match target { + Some(url) => http_redirect(&mut stream, &url), + None => { + http_response(&mut stream, 404, "text/plain", b"not found") + } + } +} + +/// Ask the daemon for a URL resolution via Unix socket. +fn dht_resolve_via_socket(sock_path: &Path, slug: &str) -> Option { + let sock = std::os::unix::net::UnixStream::connect(sock_path).ok()?; + sock.set_read_timeout(Some(Duration::from_secs(35))).ok(); + sock.set_write_timeout(Some(Duration::from_secs(5))).ok(); + + let cmd = format!("RESOLVE {slug}\n"); + (&sock).write_all(cmd.as_bytes()).ok()?; + + let reader = BufReader::new(&sock); + let line = reader.lines().next()?.ok()?; + let url = line.strip_prefix("OK ")?; + Some(url.to_string()) +} + +fn http_redirect(stream: &mut std::net::TcpStream, location: &str) { + let header = format!( + "HTTP/1.1 302 Found\r\n\ + Location: {location}\r\n\ + Content-Length: 0\r\n\ + Connection: close\r\n\ + \r\n" + ); + let _ = stream.write_all(header.as_bytes()); +} + +fn http_response( + stream: &mut std::net::TcpStream, + status: u16, + content_type: &str, + body: &[u8], +) { + let reason = match status { + 200 => "OK", + 400 => "Bad Request", + 404 => "Not Found", + 405 => "Method Not Allowed", + 500 => "Internal Server Error", + 503 => "Service Unavailable", + _ => "Unknown", + }; + let header = format!( + "HTTP/1.1 {status} {reason}\r\n\ + Content-Type: {content_type}\r\n\ + Content-Length: {}\r\n\ + Connection: close\r\n\ + \r\n", + body.len(), + ); + let _ = stream.write_all(header.as_bytes()); + let _ = stream.write_all(body); +} -- cgit v1.2.3