summaryrefslogtreecommitdiffstats
path: root/src/daemon.rs
diff options
context:
space:
mode:
authormurilo ijanc2026-03-25 23:23:10 -0300
committermurilo ijanc2026-03-25 23:23:10 -0300
commita96da5f0704c50e8a4e4f047dcd3fb7c73fdf600 (patch)
treef98bb20411dbc6a49e8c286b054a88d89eea795f /src/daemon.rs
downloadtesseras-url-main.tar.gz
Initial implementation of tesseras-url (HEAD, main)
Decentralized URL shortener built on tesseras-dht. Includes: - tud daemon: DHT node, Unix socket API, HTTP 302 redirect server - tu CLI: shorten, resolve, del, list, status commands - Auto-generated slugs (8-byte SHA256, base58) or custom slugs - TTL support (default: forever) - Automatic re-join of bootstrap nodes when routing table is empty - OpenBSD pledge(2) and unveil(2) sandboxing - DNS SRV bootstrap discovery - Verbose mode (-v) for both binaries
Diffstat (limited to 'src/daemon.rs')
-rw-r--r--src/daemon.rs535
1 files changed, 535 insertions, 0 deletions
diff --git a/src/daemon.rs b/src/daemon.rs
new file mode 100644
index 0000000..8c63f47
--- /dev/null
+++ b/src/daemon.rs
@@ -0,0 +1,535 @@
+//! Daemon main loop, Unix socket listener, and HTTP server.
+
+use std::io::{BufRead, BufReader, Read, Write};
+use std::path::Path;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::mpsc;
+use std::time::{Duration, Instant};
+
+use tesseras_dht::Node;
+
+use crate::base58;
+use crate::ops;
+use crate::protocol::{self, Request, Response};
+use crate::store::UrlStore;
+use crate::url_entry::UrlEntry;
+
// Maintenance timer intervals driven by the `run_daemon` main loop.

/// How often to garbage-collect expired entries (10 min).
const GC_INTERVAL: Duration = Duration::from_secs(600);

/// How often to republish local entries to the DHT (30 min).
const REPUBLISH_INTERVAL: Duration = Duration::from_secs(1800);

/// How often to persist routing table and state (5 min).
const SAVE_INTERVAL: Duration = Duration::from_secs(300);

/// How often to attempt re-join when routing table is empty (60 s).
const REJOIN_INTERVAL: Duration = Duration::from_secs(60);

/// How often to sync DHT-replicated values to local store (5 s).
const SYNC_INTERVAL: Duration = Duration::from_secs(5);
+
/// A request from the socket thread to the main thread.
pub struct DaemonRequest {
    /// Parsed client command to execute.
    pub cmd: Request,
    /// Channel on which the main thread sends back the response.
    pub reply: mpsc::Sender<Response>,
}
+
+/// Run the daemon main loop.
+pub fn run_daemon(
+ node: &mut Node,
+ store: &UrlStore,
+ rx: &mpsc::Receiver<DaemonRequest>,
+ shutdown: &AtomicBool,
+ bootstrap: &[String],
+) {
+ let mut last_gc = Instant::now();
+ let mut last_republish = Instant::now() - REPUBLISH_INTERVAL;
+ let mut last_save = Instant::now();
+ let mut last_sync = Instant::now();
+ let mut last_rejoin = Instant::now();
+
+ log::info!("daemon main loop started");
+
+ while !shutdown.load(Ordering::Relaxed) {
+ let _ = node.poll_timeout(Duration::from_millis(100));
+
+ while let Ok(req) = rx.try_recv() {
+ let is_shutdown = matches!(req.cmd, Request::Shutdown);
+ let resp = handle_request(node, store, req.cmd);
+ let _ = req.reply.send(resp);
+ if is_shutdown {
+ shutdown.store(true, Ordering::Relaxed);
+ }
+ }
+
+ // Re-join bootstrap nodes when the routing table is empty
+ if node.routing_table_size() == 0
+ && !bootstrap.is_empty()
+ && last_rejoin.elapsed() >= REJOIN_INTERVAL
+ {
+ last_rejoin = Instant::now();
+ log::warn!("routing table empty, re-joining bootstrap nodes");
+ for peer in bootstrap {
+ let parts: Vec<&str> = peer.rsplitn(2, ':').collect();
+ if parts.len() != 2 {
+ continue;
+ }
+ let host = parts[1];
+ if let Ok(p) = parts[0].parse::<u16>() {
+ use std::net::ToSocketAddrs;
+ if let Ok(addrs) =
+ format!("{host}:{p}").to_socket_addrs()
+ {
+ for addr in addrs {
+ node.unban(&addr);
+ }
+ }
+ if let Err(e) = node.join(host, p) {
+ log::warn!("rejoin: failed to join {peer}: {e}");
+ } else {
+ log::info!("rejoin: sent join to {peer}");
+ }
+ }
+ }
+ }
+
+ if last_sync.elapsed() >= SYNC_INTERVAL {
+ last_sync = Instant::now();
+ sync_dht_to_store(node, store);
+ }
+
+ if last_gc.elapsed() >= GC_INTERVAL {
+ last_gc = Instant::now();
+ match store.gc() {
+ Ok(0) => {}
+ Ok(n) => log::info!("gc: removed {n} expired entries"),
+ Err(e) => log::warn!("gc: {e}"),
+ }
+ }
+
+ if last_republish.elapsed() >= REPUBLISH_INTERVAL {
+ last_republish = Instant::now();
+ republish(node, store);
+ }
+
+ if last_save.elapsed() >= SAVE_INTERVAL {
+ last_save = Instant::now();
+ node.save_state();
+ }
+ }
+
+ log::info!("daemon main loop stopped, shutting down");
+ node.shutdown();
+}
+
+/// Dispatch a single client request.
+fn handle_request(
+ node: &mut Node,
+ store: &UrlStore,
+ cmd: Request,
+) -> Response {
+ match cmd {
+ Request::Shorten {
+ ttl_secs,
+ slug,
+ target_url,
+ } => match ops::shorten_url(node, store, &target_url, ttl_secs, &slug)
+ {
+ Ok(slug) => Response::Ok(slug),
+ Err(e) => Response::Err(e.to_string()),
+ },
+ Request::Resolve { slug } => {
+ match ops::resolve_url(node, store, &slug) {
+ Ok(url) => Response::Ok(url),
+ Err(e) => Response::Err(e.to_string()),
+ }
+ }
+ Request::Del { slug } => {
+ match ops::delete_url(node, store, &slug) {
+ Ok(()) => Response::Ok("deleted".into()),
+ Err(e) => Response::Err(e.to_string()),
+ }
+ }
+ Request::List => {
+ let entries = store.list_entries();
+ if entries.is_empty() {
+ return Response::Ok("(empty)".into());
+ }
+ let list: Vec<String> = entries
+ .iter()
+ .map(|(slug, url)| format!("{slug}\t{url}"))
+ .collect();
+ Response::Ok(base58::encode(list.join("\n").as_bytes()))
+ }
+ Request::Status => {
+ let m = node.metrics();
+ let status = format!(
+ "peers={} stored={} urls={} \
+ sent={} recv={} lookups={}/{}",
+ node.routing_table_size(),
+ node.storage_count(),
+ store.entry_count(),
+ m.messages_sent,
+ m.messages_received,
+ m.lookups_completed,
+ m.lookups_started,
+ );
+ Response::Ok(status)
+ }
+ Request::Shutdown => Response::Ok("shutting down".into()),
+ }
+}
+
+/// Copy DHT-replicated values into the local file store.
+fn sync_dht_to_store(node: &Node, store: &UrlStore) {
+ for (key, value) in node.dht_values() {
+ if key.len() != 32 {
+ continue;
+ }
+ if store.get_entry(&key).is_none() {
+ let _ = store.put_entry(&key, &value);
+ }
+ }
+}
+
+/// Re-announce locally stored entries to the DHT.
+fn republish(node: &mut Node, store: &UrlStore) {
+ let keys = store.original_keys();
+ if keys.is_empty() {
+ return;
+ }
+
+ let mut count = 0u32;
+ for key in &keys {
+ if let Some(data) = store.get_entry(key)
+ && let Some(entry) = UrlEntry::from_bytes(&data)
+ {
+ let remaining = if entry.ttl_secs == 0 {
+ u16::MAX
+ } else if entry.is_expired() {
+ continue;
+ } else {
+ let now = std::time::SystemTime::now()
+ .duration_since(std::time::UNIX_EPOCH)
+ .unwrap_or_default()
+ .as_secs();
+ let expires =
+ entry.created_at.saturating_add(entry.ttl_secs);
+ let rem = expires.saturating_sub(now);
+ std::cmp::min(rem, u16::MAX as u64) as u16
+ };
+ node.put(key, &data, remaining, false);
+ count += 1;
+ }
+ }
+ if count > 0 {
+ log::info!("republish: announced {count} entries to DHT");
+ }
+}
+
+/// Run the Unix socket listener thread.
+pub fn run_unix_listener(
+ sock_path: &Path,
+ tx: mpsc::Sender<DaemonRequest>,
+ shutdown: &AtomicBool,
+) {
+ let _ = std::fs::remove_file(sock_path);
+
+ let listener = match std::os::unix::net::UnixListener::bind(sock_path) {
+ Ok(l) => l,
+ Err(e) => {
+ log::error!(
+ "unix: failed to bind {}: {e}",
+ sock_path.display()
+ );
+ return;
+ }
+ };
+
+ use std::os::unix::fs::PermissionsExt;
+ let perms = std::fs::Permissions::from_mode(0o770);
+ if let Err(e) = std::fs::set_permissions(sock_path, perms) {
+ log::warn!("unix: failed to set socket permissions: {e}");
+ }
+
+ if let Err(e) = listener.set_nonblocking(true) {
+ log::error!("unix: failed to set non-blocking: {e}");
+ return;
+ }
+
+ log::info!("unix: listening on {}", sock_path.display());
+
+ while !shutdown.load(Ordering::Relaxed) {
+ match listener.accept() {
+ Ok((stream, _)) => {
+ if let Err(e) = handle_client(stream, &tx) {
+ log::debug!("unix: client disconnected: {e}");
+ }
+ }
+ Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
+ std::thread::sleep(Duration::from_millis(50));
+ }
+ Err(e) => {
+ log::warn!("unix: accept failed: {e}");
+ std::thread::sleep(Duration::from_millis(100));
+ }
+ }
+ }
+
+ let _ = std::fs::remove_file(sock_path);
+}
+
+/// Maximum protocol line size (16 KiB is generous for URLs).
+const MAX_LINE_SIZE: usize = 16 * 1024;
+
+fn handle_client(
+ stream: std::os::unix::net::UnixStream,
+ tx: &mpsc::Sender<DaemonRequest>,
+) -> Result<(), Box<dyn std::error::Error>> {
+ if let Err(e) = stream.set_nonblocking(false) {
+ log::warn!("unix: failed to set blocking mode: {e}");
+ return Err(e.into());
+ }
+ if let Err(e) = stream.set_read_timeout(Some(Duration::from_secs(60))) {
+ log::warn!("unix: failed to set read timeout: {e}");
+ return Err(e.into());
+ }
+
+ let mut reader = BufReader::new(&stream);
+ let mut writer = &stream;
+ let mut line = String::new();
+
+ loop {
+ line.clear();
+ let n = (&mut reader)
+ .take(MAX_LINE_SIZE as u64)
+ .read_line(&mut line)?;
+ if n == 0 {
+ break;
+ }
+ if !line.ends_with('\n') && n >= MAX_LINE_SIZE {
+ let resp = protocol::format_response(&Response::Err(
+ "request too large".into(),
+ ));
+ writer.write_all(resp.as_bytes())?;
+ let mut discard = Vec::new();
+ let _ = (&mut reader)
+ .take(MAX_LINE_SIZE as u64)
+ .read_until(b'\n', &mut discard);
+ continue;
+ }
+ let line = line.trim();
+ let cmd = match protocol::parse_request(line) {
+ Ok(c) => c,
+ Err(e) => {
+ let resp = protocol::format_response(&Response::Err(e));
+ writer.write_all(resp.as_bytes())?;
+ continue;
+ }
+ };
+
+ let is_shutdown = matches!(cmd, Request::Shutdown);
+
+ let (reply_tx, reply_rx) = mpsc::channel();
+ tx.send(DaemonRequest {
+ cmd,
+ reply: reply_tx,
+ })?;
+
+ let resp = reply_rx
+ .recv_timeout(Duration::from_secs(60))
+ .unwrap_or(Response::Err("timeout".into()));
+ writer.write_all(protocol::format_response(&resp).as_bytes())?;
+
+ if is_shutdown {
+ break;
+ }
+ }
+ Ok(())
+}
+
+// ── HTTP server ─────────────────────────────────────
+
+/// Maximum concurrent HTTP handler threads.
+const MAX_HTTP_THREADS: usize = 8;
+
+/// Minimal HTTP server. Redirects at /<slug>.
+pub fn run_http(
+ port: u16,
+ sock_path: &Path,
+ store: &UrlStore,
+ shutdown: &AtomicBool,
+) {
+ let addr = std::net::SocketAddr::from(([0, 0, 0, 0], port));
+ let listener = match std::net::TcpListener::bind(addr) {
+ Ok(l) => l,
+ Err(e) => {
+ log::error!("http: failed to bind {addr}: {e}");
+ return;
+ }
+ };
+ if let Err(e) = listener.set_nonblocking(true) {
+ log::error!("http: failed to set non-blocking: {e}");
+ return;
+ }
+
+ log::info!("http: listening on {addr}");
+
+ let active = Arc::new(std::sync::atomic::AtomicUsize::new(0));
+ let sock_owned = sock_path.to_path_buf();
+
+ while !shutdown.load(Ordering::Relaxed) {
+ match listener.accept() {
+ Ok((stream, _)) => {
+ if active.load(Ordering::Relaxed) >= MAX_HTTP_THREADS {
+ log::warn!("http: max connections reached, rejecting");
+ let mut s = stream;
+ let _ = s.write_all(
+ b"HTTP/1.1 503 Service Unavailable\r\n\
+ Connection: close\r\n\r\n",
+ );
+ continue;
+ }
+ let store = store.clone();
+ let sock = sock_owned.clone();
+ let counter = Arc::clone(&active);
+ counter.fetch_add(1, Ordering::Relaxed);
+ std::thread::spawn(move || {
+ handle_http(stream, &store, &sock);
+ counter.fetch_sub(1, Ordering::Relaxed);
+ });
+ }
+ Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
+ std::thread::sleep(Duration::from_millis(50));
+ }
+ Err(e) => {
+ log::debug!("http: accept failed: {e}");
+ std::thread::sleep(Duration::from_millis(100));
+ }
+ }
+ }
+}
+
+fn handle_http(
+ mut stream: std::net::TcpStream,
+ store: &UrlStore,
+ sock_path: &Path,
+) {
+ use std::io::Read;
+
+ stream.set_read_timeout(Some(Duration::from_secs(5))).ok();
+
+ let mut buf = [0u8; 4096];
+ let n = match stream.read(&mut buf) {
+ Ok(n) => n,
+ Err(_) => return,
+ };
+ let request = String::from_utf8_lossy(&buf[..n]);
+
+ let mut parts = request.split_whitespace();
+ let method = parts.next().unwrap_or("");
+ let path = match parts.next() {
+ Some(p) => p,
+ None => {
+ http_response(&mut stream, 400, "text/plain", b"Bad Request");
+ return;
+ }
+ };
+
+ if method != "GET" && method != "HEAD" {
+ http_response(
+ &mut stream,
+ 405,
+ "text/plain",
+ b"Method Not Allowed",
+ );
+ return;
+ }
+
+ if path == "/" || path == "/favicon.ico" {
+ http_response(
+ &mut stream,
+ 200,
+ "text/plain",
+ b"tesseras-url shortener\n",
+ );
+ return;
+ }
+
+ let slug = path.trim_start_matches('/');
+ if slug.is_empty() {
+ http_response(&mut stream, 400, "text/plain", b"missing slug");
+ return;
+ }
+
+ // Try local store first
+ let dht_key = UrlEntry::dht_key(slug);
+ let target = if let Some(data) = store.get_entry(&dht_key) {
+ UrlEntry::from_bytes(&data).map(|e| e.target_url)
+ } else {
+ // Fall back to daemon (DHT lookup)
+ dht_resolve_via_socket(sock_path, slug)
+ };
+
+ match target {
+ Some(url) => http_redirect(&mut stream, &url),
+ None => {
+ http_response(&mut stream, 404, "text/plain", b"not found")
+ }
+ }
+}
+
/// Ask the daemon for a URL resolution via Unix socket.
///
/// Sends `RESOLVE <slug>` and returns the payload of an `OK` reply;
/// `None` on connection, timeout, or protocol failure.
fn dht_resolve_via_socket(sock_path: &Path, slug: &str) -> Option<String> {
    let mut sock = std::os::unix::net::UnixStream::connect(sock_path).ok()?;
    // Resolution may trigger a DHT lookup, hence the generous timeout.
    sock.set_read_timeout(Some(Duration::from_secs(35))).ok();
    sock.set_write_timeout(Some(Duration::from_secs(5))).ok();

    sock.write_all(format!("RESOLVE {slug}\n").as_bytes()).ok()?;

    let mut line = String::new();
    BufReader::new(&sock).read_line(&mut line).ok()?;
    if line.is_empty() {
        return None; // EOF before any reply
    }
    // Trim a single trailing "\n" (and preceding "\r", if any).
    let line = line.strip_suffix('\n').unwrap_or(line.as_str());
    let line = line.strip_suffix('\r').unwrap_or(line);
    line.strip_prefix("OK ").map(str::to_string)
}
+
/// Send an HTTP 302 redirect pointing at `location`.
///
/// `location` originates from DHT data published by other nodes, so it
/// is untrusted: CR and LF characters are stripped before the value is
/// interpolated into the header, preventing HTTP header injection /
/// response splitting.
fn http_redirect(stream: &mut std::net::TcpStream, location: &str) {
    let safe: String = location
        .chars()
        .filter(|c| *c != '\r' && *c != '\n')
        .collect();
    let header = format!(
        "HTTP/1.1 302 Found\r\n\
         Location: {safe}\r\n\
         Content-Length: 0\r\n\
         Connection: close\r\n\
         \r\n"
    );
    // Errors are ignored: the client may already have gone away.
    let _ = stream.write_all(header.as_bytes());
}
+
/// Write a complete HTTP response (status line, headers, body).
///
/// The connection is left open; callers rely on `Connection: close`
/// and dropping the stream to end it.
fn http_response(
    stream: &mut std::net::TcpStream,
    status: u16,
    content_type: &str,
    body: &[u8],
) {
    // Canonical reason phrases for the status codes this server emits.
    let reason = match status {
        200 => "OK",
        400 => "Bad Request",
        404 => "Not Found",
        405 => "Method Not Allowed",
        500 => "Internal Server Error",
        503 => "Service Unavailable",
        _ => "Unknown",
    };
    let header = format!(
        "HTTP/1.1 {} {}\r\nContent-Type: {}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
        status,
        reason,
        content_type,
        body.len(),
    );
    // Errors are ignored: the client may already have gone away.
    let _ = stream.write_all(header.as_bytes());
    let _ = stream.write_all(body);
}