//! Daemon main loop, Unix socket listener, and HTTP server. use std::io::{BufRead, BufReader, Read, Write}; use std::path::Path; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc; use std::time::{Duration, Instant}; use tesseras_dht::Node; use crate::base58; use crate::ops; use crate::protocol::{self, Request, Response}; use crate::store::UrlStore; use crate::url_entry::UrlEntry; /// How often to garbage-collect expired entries (10 min). const GC_INTERVAL: Duration = Duration::from_secs(600); /// How often to republish local entries to the DHT (30 min). const REPUBLISH_INTERVAL: Duration = Duration::from_secs(1800); /// How often to persist routing table and state (5 min). const SAVE_INTERVAL: Duration = Duration::from_secs(300); /// How often to attempt re-join when routing table is empty (60 s). const REJOIN_INTERVAL: Duration = Duration::from_secs(60); /// How often to sync DHT-replicated values to local store (5 s). const SYNC_INTERVAL: Duration = Duration::from_secs(5); /// A request from the socket thread to the main thread. pub struct DaemonRequest { pub cmd: Request, pub reply: mpsc::Sender, } /// Run the daemon main loop. 
pub fn run_daemon( node: &mut Node, store: &UrlStore, rx: &mpsc::Receiver, shutdown: &AtomicBool, bootstrap: &[String], ) { let mut last_gc = Instant::now(); let mut last_republish = Instant::now() - REPUBLISH_INTERVAL; let mut last_save = Instant::now(); let mut last_sync = Instant::now(); let mut last_rejoin = Instant::now(); log::info!("daemon main loop started"); while !shutdown.load(Ordering::Relaxed) { let _ = node.poll_timeout(Duration::from_millis(100)); while let Ok(req) = rx.try_recv() { let is_shutdown = matches!(req.cmd, Request::Shutdown); let resp = handle_request(node, store, req.cmd); let _ = req.reply.send(resp); if is_shutdown { shutdown.store(true, Ordering::Relaxed); } } // Re-join bootstrap nodes when the routing table is empty if node.routing_table_size() == 0 && !bootstrap.is_empty() && last_rejoin.elapsed() >= REJOIN_INTERVAL { last_rejoin = Instant::now(); log::warn!("routing table empty, re-joining bootstrap nodes"); for peer in bootstrap { let parts: Vec<&str> = peer.rsplitn(2, ':').collect(); if parts.len() != 2 { continue; } let host = parts[1]; if let Ok(p) = parts[0].parse::() { use std::net::ToSocketAddrs; if let Ok(addrs) = format!("{host}:{p}").to_socket_addrs() { for addr in addrs { node.unban(&addr); } } if let Err(e) = node.join(host, p) { log::warn!("rejoin: failed to join {peer}: {e}"); } else { log::info!("rejoin: sent join to {peer}"); } } } } if last_sync.elapsed() >= SYNC_INTERVAL { last_sync = Instant::now(); sync_dht_to_store(node, store); } if last_gc.elapsed() >= GC_INTERVAL { last_gc = Instant::now(); match store.gc() { Ok(0) => {} Ok(n) => log::info!("gc: removed {n} expired entries"), Err(e) => log::warn!("gc: {e}"), } } if last_republish.elapsed() >= REPUBLISH_INTERVAL { last_republish = Instant::now(); republish(node, store); } if last_save.elapsed() >= SAVE_INTERVAL { last_save = Instant::now(); node.save_state(); } } log::info!("daemon main loop stopped, shutting down"); node.shutdown(); } /// Dispatch a single 
client request. fn handle_request( node: &mut Node, store: &UrlStore, cmd: Request, ) -> Response { match cmd { Request::Shorten { ttl_secs, slug, target_url, } => match ops::shorten_url(node, store, &target_url, ttl_secs, &slug) { Ok(slug) => Response::Ok(slug), Err(e) => Response::Err(e.to_string()), }, Request::Resolve { slug } => { match ops::resolve_url(node, store, &slug) { Ok(url) => Response::Ok(url), Err(e) => Response::Err(e.to_string()), } } Request::Del { slug } => { match ops::delete_url(node, store, &slug) { Ok(()) => Response::Ok("deleted".into()), Err(e) => Response::Err(e.to_string()), } } Request::List => { let entries = store.list_entries(); if entries.is_empty() { return Response::Ok("(empty)".into()); } let list: Vec = entries .iter() .map(|(slug, url)| format!("{slug}\t{url}")) .collect(); Response::Ok(base58::encode(list.join("\n").as_bytes())) } Request::Status => { let m = node.metrics(); let status = format!( "peers={} stored={} urls={} \ sent={} recv={} lookups={}/{}", node.routing_table_size(), node.storage_count(), store.entry_count(), m.messages_sent, m.messages_received, m.lookups_completed, m.lookups_started, ); Response::Ok(status) } Request::Shutdown => Response::Ok("shutting down".into()), } } /// Copy DHT-replicated values into the local file store. fn sync_dht_to_store(node: &Node, store: &UrlStore) { for (key, value) in node.dht_values() { if key.len() != 32 { continue; } if store.get_entry(&key).is_none() { let _ = store.put_entry(&key, &value); } } } /// Re-announce locally stored entries to the DHT. 
///
/// Walks `store.original_keys()`, skips entries that cannot be loaded or
/// decoded as well as expired ones, and calls `node.put` for the rest with
/// the remaining lifetime clamped to `u16::MAX`. Entries with `ttl_secs == 0`
/// are always announced with `u16::MAX`.
fn republish(node: &mut Node, store: &UrlStore) {
    let keys = store.original_keys();
    if keys.is_empty() {
        return;
    }

    let mut count = 0u32;
    for key in &keys {
        if let Some(data) = store.get_entry(key)
            && let Some(entry) = UrlEntry::from_bytes(&data)
        {
            let remaining = if entry.ttl_secs == 0 {
                u16::MAX
            } else if entry.is_expired() {
                continue;
            } else {
                let now = std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_secs();
                let expires = entry.created_at.saturating_add(entry.ttl_secs);
                let rem = expires.saturating_sub(now);
                // Clamp instead of truncating when the lifetime exceeds u16.
                u16::try_from(rem).unwrap_or(u16::MAX)
            };
            node.put(key, &data, remaining, false);
            count += 1;
        }
    }

    if count > 0 {
        log::info!("republish: announced {count} entries to DHT");
    }
}

/// Run the Unix socket listener thread.
///
/// Removes any existing file at `sock_path`, binds a listener there, sets the
/// socket file mode to `0o770`, and accepts clients until `shutdown` is set.
/// The socket file is removed again on exit. Bind or `set_nonblocking`
/// failures are logged and end the function early.
///
/// NOTE(review): clients are served one at a time on this thread, so a slow
/// or idle client delays every other client (including the HTTP fallback
/// lookups) and the shutdown check until `handle_client` returns.
pub fn run_unix_listener(
    sock_path: &Path,
    tx: mpsc::Sender<DaemonRequest>,
    shutdown: &AtomicBool,
) {
    use std::os::unix::fs::PermissionsExt;

    // A stale socket file would make bind fail; a missing one is fine.
    let _ = std::fs::remove_file(sock_path);

    let listener = match std::os::unix::net::UnixListener::bind(sock_path) {
        Ok(l) => l,
        Err(e) => {
            log::error!("unix: failed to bind {}: {e}", sock_path.display());
            return;
        }
    };

    let perms = std::fs::Permissions::from_mode(0o770);
    if let Err(e) = std::fs::set_permissions(sock_path, perms) {
        log::warn!("unix: failed to set socket permissions: {e}");
    }

    // Non-blocking accept lets the loop re-check `shutdown` periodically.
    if let Err(e) = listener.set_nonblocking(true) {
        log::error!("unix: failed to set non-blocking: {e}");
        return;
    }

    log::info!("unix: listening on {}", sock_path.display());

    while !shutdown.load(Ordering::Relaxed) {
        match listener.accept() {
            Ok((stream, _)) => {
                if let Err(e) = handle_client(stream, &tx) {
                    log::debug!("unix: client disconnected: {e}");
                }
            }
            Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                std::thread::sleep(Duration::from_millis(50));
            }
            Err(e) => {
                log::warn!("unix: accept failed: {e}");
                std::thread::sleep(Duration::from_millis(100));
            }
        }
    }

    let _ = std::fs::remove_file(sock_path);
}

/// Maximum protocol line size (16 KiB is generous for URLs).
const MAX_LINE_SIZE: usize = 16 * 1024; fn handle_client( stream: std::os::unix::net::UnixStream, tx: &mpsc::Sender, ) -> Result<(), Box> { if let Err(e) = stream.set_nonblocking(false) { log::warn!("unix: failed to set blocking mode: {e}"); return Err(e.into()); } if let Err(e) = stream.set_read_timeout(Some(Duration::from_secs(60))) { log::warn!("unix: failed to set read timeout: {e}"); return Err(e.into()); } let mut reader = BufReader::new(&stream); let mut writer = &stream; let mut line = String::new(); loop { line.clear(); let n = (&mut reader) .take(MAX_LINE_SIZE as u64) .read_line(&mut line)?; if n == 0 { break; } if !line.ends_with('\n') && n >= MAX_LINE_SIZE { let resp = protocol::format_response(&Response::Err( "request too large".into(), )); writer.write_all(resp.as_bytes())?; let mut discard = Vec::new(); let _ = (&mut reader) .take(MAX_LINE_SIZE as u64) .read_until(b'\n', &mut discard); continue; } let line = line.trim(); let cmd = match protocol::parse_request(line) { Ok(c) => c, Err(e) => { let resp = protocol::format_response(&Response::Err(e)); writer.write_all(resp.as_bytes())?; continue; } }; let is_shutdown = matches!(cmd, Request::Shutdown); let (reply_tx, reply_rx) = mpsc::channel(); tx.send(DaemonRequest { cmd, reply: reply_tx, })?; let resp = reply_rx .recv_timeout(Duration::from_secs(60)) .unwrap_or(Response::Err("timeout".into())); writer.write_all(protocol::format_response(&resp).as_bytes())?; if is_shutdown { break; } } Ok(()) } // ── HTTP server ───────────────────────────────────── /// Maximum concurrent HTTP handler threads. const MAX_HTTP_THREADS: usize = 8; /// Minimal HTTP server. Redirects at /. 
pub fn run_http( port: u16, sock_path: &Path, store: &UrlStore, shutdown: &AtomicBool, ) { let addr = std::net::SocketAddr::from(([0, 0, 0, 0], port)); let listener = match std::net::TcpListener::bind(addr) { Ok(l) => l, Err(e) => { log::error!("http: failed to bind {addr}: {e}"); return; } }; if let Err(e) = listener.set_nonblocking(true) { log::error!("http: failed to set non-blocking: {e}"); return; } log::info!("http: listening on {addr}"); let active = Arc::new(std::sync::atomic::AtomicUsize::new(0)); let sock_owned = sock_path.to_path_buf(); while !shutdown.load(Ordering::Relaxed) { match listener.accept() { Ok((stream, _)) => { if active.load(Ordering::Relaxed) >= MAX_HTTP_THREADS { log::warn!("http: max connections reached, rejecting"); let mut s = stream; let _ = s.write_all( b"HTTP/1.1 503 Service Unavailable\r\n\ Connection: close\r\n\r\n", ); continue; } let store = store.clone(); let sock = sock_owned.clone(); let counter = Arc::clone(&active); counter.fetch_add(1, Ordering::Relaxed); std::thread::spawn(move || { handle_http(stream, &store, &sock); counter.fetch_sub(1, Ordering::Relaxed); }); } Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { std::thread::sleep(Duration::from_millis(50)); } Err(e) => { log::debug!("http: accept failed: {e}"); std::thread::sleep(Duration::from_millis(100)); } } } } fn handle_http( mut stream: std::net::TcpStream, store: &UrlStore, sock_path: &Path, ) { use std::io::Read; stream.set_read_timeout(Some(Duration::from_secs(5))).ok(); let mut buf = [0u8; 4096]; let n = match stream.read(&mut buf) { Ok(n) => n, Err(_) => return, }; let request = String::from_utf8_lossy(&buf[..n]); let mut parts = request.split_whitespace(); let method = parts.next().unwrap_or(""); let path = match parts.next() { Some(p) => p, None => { http_response(&mut stream, 400, "text/plain", b"Bad Request"); return; } }; if method != "GET" && method != "HEAD" { http_response( &mut stream, 405, "text/plain", b"Method Not Allowed", ); return; 
} if path == "/" || path == "/favicon.ico" { http_response( &mut stream, 200, "text/plain", b"tesseras-url shortener\n", ); return; } let slug = path.trim_start_matches('/'); if slug.is_empty() { http_response(&mut stream, 400, "text/plain", b"missing slug"); return; } // Try local store first let dht_key = UrlEntry::dht_key(slug); let target = if let Some(data) = store.get_entry(&dht_key) { UrlEntry::from_bytes(&data).map(|e| e.target_url) } else { // Fall back to daemon (DHT lookup) dht_resolve_via_socket(sock_path, slug) }; match target { Some(url) => http_redirect(&mut stream, &url), None => { http_response(&mut stream, 404, "text/plain", b"not found") } } } /// Ask the daemon for a URL resolution via Unix socket. fn dht_resolve_via_socket(sock_path: &Path, slug: &str) -> Option { let sock = std::os::unix::net::UnixStream::connect(sock_path).ok()?; sock.set_read_timeout(Some(Duration::from_secs(35))).ok(); sock.set_write_timeout(Some(Duration::from_secs(5))).ok(); let cmd = format!("RESOLVE {slug}\n"); (&sock).write_all(cmd.as_bytes()).ok()?; let reader = BufReader::new(&sock); let line = reader.lines().next()?.ok()?; let url = line.strip_prefix("OK ")?; Some(url.to_string()) } fn http_redirect(stream: &mut std::net::TcpStream, location: &str) { let header = format!( "HTTP/1.1 302 Found\r\n\ Location: {location}\r\n\ Content-Length: 0\r\n\ Connection: close\r\n\ \r\n" ); let _ = stream.write_all(header.as_bytes()); } fn http_response( stream: &mut std::net::TcpStream, status: u16, content_type: &str, body: &[u8], ) { let reason = match status { 200 => "OK", 400 => "Bad Request", 404 => "Not Found", 405 => "Method Not Allowed", 500 => "Internal Server Error", 503 => "Service Unavailable", _ => "Unknown", }; let header = format!( "HTTP/1.1 {status} {reason}\r\n\ Content-Type: {content_type}\r\n\ Content-Length: {}\r\n\ Connection: close\r\n\ \r\n", body.len(), ); let _ = stream.write_all(header.as_bytes()); let _ = stream.write_all(body); }