diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/bin/tp.rs | 323 | ||||
| -rw-r--r-- | src/daemon.rs | 186 | ||||
| -rw-r--r-- | src/ops.rs | 167 | ||||
| -rw-r--r-- | src/paste.rs | 21 | ||||
| -rw-r--r-- | src/protocol.rs | 34 | ||||
| -rw-r--r-- | src/store.rs | 121 |
6 files changed, 735 insertions, 117 deletions
diff --git a/src/bin/tp.rs b/src/bin/tp.rs index fbfc872..9a69832 100644 --- a/src/bin/tp.rs +++ b/src/bin/tp.rs @@ -2,32 +2,84 @@ //! //! Sends commands to the `tpd` daemon over a Unix socket. //! Reads paste content from stdin (put) and writes it to -//! stdout (get). +//! stdout (get). Large pastes (> 8 KiB) are automatically +//! split into chunks on the client side. +use std::collections::BTreeMap; use std::io::{BufRead, BufReader, Read, Write}; use std::os::unix::net::UnixStream; use std::path::PathBuf; #[path = "../base58.rs"] mod base58; +#[path = "../crypto.rs"] +mod crypto; #[path = "../sandbox.rs"] mod sandbox; +/// Maximum paste size: 1.44 MB (floppy disk). +const MAX_PASTE: usize = 1_440 * 1024; + +/// Chunk size matching the DHT fragment limit. +const CHUNK_SIZE: usize = 8 * 1024; + fn default_socket() -> PathBuf { PathBuf::from("/var/tesseras-paste/daemon.sock") } +fn labels_path() -> PathBuf { + if let Ok(home) = std::env::var("HOME") { + PathBuf::from(home).join(".config/tp/labels") + } else { + PathBuf::from("/tmp/tp-labels") + } +} + +fn load_labels(path: &PathBuf) -> BTreeMap<String, String> { + let mut map = BTreeMap::new(); + let data = match std::fs::read_to_string(path) { + Ok(d) => d, + Err(_) => return map, + }; + for line in data.lines() { + if let Some((key, label)) = line.split_once('\t') { + map.insert(key.to_string(), label.to_string()); + } + } + map +} + +fn save_labels(path: &PathBuf, labels: &BTreeMap<String, String>) { + if let Some(parent) = path.parent() { + let _ = std::fs::create_dir_all(parent); + } + let mut buf = String::new(); + for (key, label) in labels { + buf.push_str(key); + buf.push('\t'); + buf.push_str(label); + buf.push('\n'); + } + if let Err(e) = std::fs::write(path, buf.as_bytes()) { + eprintln!("warning: could not save labels: {e}"); + } +} + fn usage() { eprintln!("usage: tp [-s sock] [-v] <command> [args]"); eprintln!(); eprintln!("commands:"); - eprintln!(" put [-t ttl] [-p] read stdin, store paste"); + eprintln!(" put [-t ttl] [-p] [-l label]"); + eprintln!(" read stdin, store paste"); eprintln!(" -p public (no encryption)"); + eprintln!(" -l attach a label"); eprintln!(" get <key> retrieve paste to stdout"); - eprintln!(" del <key> delete paste"); - eprintln!(" pin <key> pin (never expires)"); - eprintln!(" unpin <key> unpin"); - eprintln!(" status show daemon status"); + eprintln!(" del <key> delete paste"); + eprintln!(" pin <key> pin (never expires)"); + eprintln!(" unpin <key> unpin"); + eprintln!(" list list labeled pastes"); + eprintln!(" label <key> <text> add or update a label"); + eprintln!(" status show daemon status"); eprintln!(); eprintln!(" -s sock Unix socket path"); eprintln!(" -v verbose output"); @@ -49,6 +101,47 @@ fn parse_ttl(s: &str) -> Result<u64, String> { } } +/// Send a request over the socket and read the response. +/// Returns the data on OK, or exits on ERR. +fn send_recv( + stream: &UnixStream, + reader: &mut BufReader<&UnixStream>, + request: &str, + verbose: bool, +) -> String { + if verbose { + eprintln!(">> {}", request.trim()); + } + if let Err(e) = (stream as &UnixStream).write_all(request.as_bytes()) { + eprintln!("error: writing to socket: {e}"); + std::process::exit(1); + } + let mut line = String::new(); + match reader.read_line(&mut line) { + Ok(0) => { + eprintln!("error: daemon closed connection"); + std::process::exit(1); + } + Err(e) => { + eprintln!("error: reading from socket: {e}"); + std::process::exit(1); + } + _ => {} + } + if verbose { + eprintln!("<< {}", line.trim()); + } + if let Some(data) = line.trim().strip_prefix("OK ") { + data.to_string() + } else if let Some(msg) = line.trim().strip_prefix("ERR ") { + eprintln!("error: {msg}"); + std::process::exit(1); + } else { + eprintln!("error: unexpected response: {}", line.trim()); + std::process::exit(1); + } +} + fn main() { let args: Vec<String> = std::env::args().collect(); @@ -89,12 +182,62 @@ fn main() { } let command = &cmd_args[0]; + + // Handle client-only commands before sandboxing + match command.as_str() { + "list" => { + let lpath = labels_path(); + let labels = load_labels(&lpath); + if labels.is_empty() { + eprintln!("no labels"); + } else { + for (key, label) in &labels { + println!("{key}\t{label}"); + } + } + return; + } + "label" => { + let key = cmd_args.get(1).unwrap_or_else(|| { + eprintln!("error: label requires a key and text"); + std::process::exit(1); + }); + let text: String = cmd_args[2..].join(" "); + if text.is_empty() { + eprintln!("error: label requires text"); + std::process::exit(1); + } + let lpath = labels_path(); + let mut labels = load_labels(&lpath); + labels.insert(key.clone(), text); + save_labels(&lpath, &labels); + return; + } + _ => {} + } + + // ── Parse command into request(s) ────────────────── + + // For most commands we build a single request string. + // The "put" command may use chunked mode (multiple requests). + enum PutData { + Single(String), + Chunked { + data: Vec<u8>, + ttl_secs: u64, + enc_key: Option<[u8; crypto::KEY_SIZE]>, + }, + } + let mut is_get = false; + let mut put_label: Option<String> = None; + let mut del_key: Option<String> = None; - let request = match command.as_str() { + let put_data = match command.as_str() { "put" => { let mut ttl = "24h".to_string(); let mut public = false; + let mut label: Option<String> = None; let mut j = 1; while j < cmd_args.len() { match cmd_args[j].as_str() { @@ -105,6 +248,12 @@ fn main() { } } "-p" => public = true, + "-l" => { + j += 1; + if j < cmd_args.len() { + label = Some(cmd_args[j].clone()); + } + } _ => {} } j += 1; @@ -116,9 +265,7 @@ fn main() { std::process::exit(1); } }; - // Read at most MAX_PASTE + 1 byte so we can detect - // oversized input without unbounded allocation. - const MAX_PASTE: usize = 64 * 1024; + let mut content = Vec::new(); match std::io::stdin() .take((MAX_PASTE + 1) as u64) @@ -129,7 +276,7 @@ fn main() { std::process::exit(1); } Ok(n) if n > MAX_PASTE => { - eprintln!("error: input exceeds 64 KiB limit"); + eprintln!("error: input exceeds 1.44 MB limit"); std::process::exit(1); } Err(e) => { @@ -138,8 +285,31 @@ fn main() { } _ => {} } - let cmd = if public { "PUTP" } else { "PUT" }; - format!("{cmd} {ttl_secs} {}\n", base58::encode(&content)) + + put_label = label; + + if content.len() <= CHUNK_SIZE { + // Small paste — single PUT/PUTP + let cmd = if public { "PUTP" } else { "PUT" }; + PutData::Single(format!( + "{cmd} {ttl_secs} {}\n", + base58::encode(&content) + )) + } else { + // Large paste — client-side chunking + let (data, enc_key) = if public { + (content, None) + } else { + let key = crypto::generate_key(); + let encrypted = crypto::encrypt(&key, &content); + (encrypted, Some(key)) + }; + PutData::Chunked { + data, + ttl_secs, + enc_key, + } + } } "get" => { let key = cmd_args.get(1).unwrap_or_else(|| { @@ -147,30 +317,31 @@ fn main() { std::process::exit(1); }); is_get = true; - format!("GET {key}\n") + PutData::Single(format!("GET {key}\n")) } "del" => { let key = cmd_args.get(1).unwrap_or_else(|| { eprintln!("error: del requires a key"); std::process::exit(1); }); - format!("DEL {key}\n") + del_key = Some(key.clone()); + PutData::Single(format!("DEL {key}\n")) } "pin" => { let key = cmd_args.get(1).unwrap_or_else(|| { eprintln!("error: pin requires a key"); std::process::exit(1); }); - format!("PIN {key}\n") + PutData::Single(format!("PIN {key}\n")) } "unpin" => { let key = cmd_args.get(1).unwrap_or_else(|| { eprintln!("error: unpin requires a key"); std::process::exit(1); }); - format!("UNPIN {key}\n") + PutData::Single(format!("UNPIN {key}\n")) } - "status" => "STATUS\n".to_string(), + "status" => PutData::Single("STATUS\n".to_string()), other => { eprintln!("unknown command: {other}"); usage(); @@ -179,19 +350,23 @@ fn main() { }; // ── Sandbox ───────────────────────────────────── + let lpath = labels_path(); sandbox::do_unveil(&sock_path, "rw"); + if let Some(parent) = lpath.parent() { + let _ = std::fs::create_dir_all(parent); + sandbox::do_unveil(parent, "rwc"); + } sandbox::unveil_lock(); - sandbox::do_pledge("stdio unix rpath"); + sandbox::do_pledge("stdio unix rpath wpath cpath"); if verbose { eprintln!("socket: {}", sock_path.display()); - eprintln!(">> {}", request.trim()); } let stream = match UnixStream::connect(&sock_path) { Ok(s) => s, Err(e) => { - eprintln!("error: cannot connect to {}: {e}", sock_path.display(),); + eprintln!("error: cannot connect to {}: {e}", sock_path.display()); eprintln!("hint: is tpd running?"); std::process::exit(1); } @@ -201,25 +376,81 @@ fn main() { .set_read_timeout(Some(std::time::Duration::from_secs(60))) .ok(); - let mut writer = &stream; - if let Err(e) = writer.write_all(request.as_bytes()) { - eprintln!("error: writing to socket: {e}"); - std::process::exit(1); - } + let mut reader = BufReader::new(&stream); + + match put_data { + PutData::Chunked { + data, + ttl_secs, + enc_key, + } => { + // ── Chunked put ───────────────────────── + let n_chunks = + (data.len() + CHUNK_SIZE - 1) / CHUNK_SIZE; + if verbose { + eprintln!( + "chunked: {} bytes, {} chunks", + data.len(), + n_chunks, + ); + } + + // Send each chunk via PUTC + let mut chunk_hashes: Vec<Vec<u8>> = Vec::new(); + for (i, chunk) in data.chunks(CHUNK_SIZE).enumerate() { + let req = format!( + "PUTC {} {}\n", + ttl_secs, + base58::encode(chunk), + ); + let hash_b58 = send_recv(&stream, &mut reader, &req, verbose); + let hash = base58::decode(&hash_b58).unwrap_or_else(|| { + eprintln!("error: invalid hash from daemon"); + std::process::exit(1); + }); + if verbose { + eprintln!("chunk {}/{}: {hash_b58}", i + 1, n_chunks); + } + chunk_hashes.push(hash); + } + + // Build manifest: count(u16 BE) || hash1 || hash2 || ... + let count = chunk_hashes.len() as u16; + let mut manifest = + Vec::with_capacity(2 + 32 * chunk_hashes.len()); + manifest.extend_from_slice(&count.to_be_bytes()); + for hash in &chunk_hashes { + manifest.extend_from_slice(hash); + } + + let req = format!( + "PUTM {} {}\n", + ttl_secs, + base58::encode(&manifest), + ); + let manifest_hash = send_recv(&stream, &mut reader, &req, verbose); - let reader = BufReader::new(&stream); - for line in reader.lines() { - let line = match line { - Ok(l) => l, - Err(_) => break, - }; - if verbose { - eprintln!("<< {}", line); + let key_str = match enc_key { + Some(key) => { + format!("{manifest_hash}#{}", base58::encode(&key)) + } + None => manifest_hash, + }; + println!("{key_str}"); + + // Save label + if let Some(ref label) = put_label { + let mut labels = load_labels(&lpath); + labels.insert(key_str, label.clone()); + save_labels(&lpath, &labels); + } } - if let Some(data) = line.strip_prefix("OK ") { + PutData::Single(request) => { + // ── Single request ────────────────────── + let data = send_recv(&stream, &mut reader, &request, verbose); + if is_get { - // Decode base58 → raw bytes → stdout - match base58::decode(data) { + match base58::decode(&data) { Some(bytes) => { if let Err(e) = std::io::stdout().write_all(&bytes) { eprintln!("error: writing to stdout: {e}"); @@ -231,10 +462,20 @@ fn main() { } else { println!("{data}"); } - break; - } else if let Some(msg) = line.strip_prefix("ERR ") { - eprintln!("error: {msg}"); - std::process::exit(1); + + // Save label on successful put + if let Some(ref label) = put_label { + let mut labels = load_labels(&lpath); + labels.insert(data.to_string(), label.clone()); + save_labels(&lpath, &labels); + } + // Remove label on successful del + if let Some(ref key) = del_key { + let mut labels = load_labels(&lpath); + if labels.remove(key).is_some() { + save_labels(&lpath, &labels); + } + } } } } diff --git a/src/daemon.rs b/src/daemon.rs index 8952d78..62e0977 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -24,8 +24,10 @@ use crate::store::PasteStore; /// How often to garbage-collect expired pastes (10 min). const GC_INTERVAL: Duration = Duration::from_secs(600); -/// How often to republish local pastes to the DHT (30 min). -const REPUBLISH_INTERVAL: Duration = Duration::from_secs(1800); +/// How often to republish local pastes to the DHT (5 min). +/// Kept short so chunked pastes reach peers promptly; +/// per-put throttling prevents burst bans. +const REPUBLISH_INTERVAL: Duration = Duration::from_secs(300); /// How often to persist routing table and state (5 min). const SAVE_INTERVAL: Duration = Duration::from_secs(300); @@ -297,6 +299,32 @@ fn handle_request( Err(e) => Response::Err(e.to_string()), } } + Request::PutChunk { + ttl_secs, + content_b58, + } => { + let content = match base58::decode(&content_b58) { + Some(c) => c, + None => return Response::Err("invalid base58 content".into()), + }; + match ops::put_chunk(store, &content, ttl_secs) { + Ok(hash) => Response::Ok(hash), + Err(e) => Response::Err(e.to_string()), + } + } + Request::PutManifest { + ttl_secs, + content_b58, + } => { + let content = match base58::decode(&content_b58) { + Some(c) => c, + None => return Response::Err("invalid base58 content".into()), + }; + match ops::put_manifest(node, store, &content, ttl_secs) { + Ok(hash) => Response::Ok(hash), + Err(e) => Response::Err(e.to_string()), + } + } Request::Get { key } => match ops::get_paste(node, store, &key) { Ok(data) => Response::Ok(base58::encode(&data)), Err(e) => Response::Err(e.to_string()), @@ -328,11 +356,12 @@ fn handle_request( Request::Status => { let m = node.metrics(); let status = format!( - "peers={} stored={} pastes={} \ + "peers={} stored={} pastes={} chunks={} \ sent={} recv={} lookups={}/{}", node.routing_table_size(), node.storage_count(), store.paste_count(), + store.chunk_count(), m.messages_sent, m.messages_received, m.lookups_started, @@ -346,7 +375,35 @@ fn handle_request( /// Copy DHT-replicated values into the local file store so /// the HTTP server can serve them without a DHT lookup. +/// Values already in pastes/ or chunks/ are skipped. +/// Values referenced by a local manifest are stored as chunks. fn sync_dht_to_store(node: &Node, store: &PasteStore) { + // Collect chunk hashes from all known manifests so we + // can route incoming DHT values to the right directory. + let mut known_chunks = std::collections::HashSet::new(); + for key in store.original_keys() { + if let Some(data) = store.get_paste(&key) { + if data.first() == Some(&crate::paste::FORMAT_VERSION_CHUNKED) { + if let Some(paste) = Paste::from_bytes(&data) { + let content = &paste.content; + if content.len() >= 2 { + let count = + u16::from_be_bytes([content[0], content[1]]) + as usize; + for i in 0..count { + let start = 2 + i * 32; + if start + 32 <= content.len() { + known_chunks.insert( + content[start..start + 32].to_vec(), + ); + } + } + } + } + } + } + } + for (key, value) in node.dht_values() { if key.len() != 32 { continue; @@ -354,23 +411,37 @@ fn sync_dht_to_store(node: &Node, store: &PasteStore) { if store.is_blocked(&key) { continue; } - if store.get_paste(&key).is_none() { + if store.get_paste(&key).is_some() || store.get_chunk(&key).is_some() { + continue; + } + if known_chunks.contains(key.as_slice()) { + let _ = store.put_chunk(&key, &value); + } else { let _ = store.put_paste(&key, &value); } } } -/// Re-announce locally stored pastes to the DHT so they -/// remain reachable as nodes join and leave the network. +/// Delay between consecutive DHT store operations during +/// republish to avoid triggering rate-limit bans on peers. +const REPUBLISH_THROTTLE: Duration = Duration::from_millis(200); + +/// Re-announce locally stored pastes and chunks to the DHT so +/// they remain reachable as nodes join and leave the network. +/// Inserts a small delay between puts to avoid bursting. fn republish(node: &mut Node, store: &PasteStore) { - let keys = store.original_keys(); - if keys.is_empty() { + let mut paste_keys = store.original_keys(); + let chunk_keys = store.chunk_keys(); + paste_keys.extend(chunk_keys); + + if paste_keys.is_empty() { return; } let mut count = 0u32; - for key in &keys { - if let Some(data) = store.get_paste(key) + for key in &paste_keys { + let data = store.get_paste(key).or_else(|| store.get_chunk(key)); + if let Some(data) = data && let Some(paste) = Paste::from_bytes(&data) { let remaining = if store.is_pinned(key) { @@ -386,6 +457,10 @@ fn republish(node: &mut Node, store: &PasteStore) { let rem = expires.saturating_sub(now); std::cmp::min(rem, u16::MAX as u64) as u16 }; + if count > 0 { + std::thread::sleep(REPUBLISH_THROTTLE); + let _ = node.poll_timeout(Duration::from_millis(1)); + } node.put(key, &data, remaining, false); count += 1; } @@ -445,9 +520,10 @@ pub fn run_unix_listener( let _ = std::fs::remove_file(sock_path); } -/// Maximum protocol line size (128 KiB covers the 64 KiB paste -/// limit after base58 expansion plus command overhead). -const MAX_LINE_SIZE: usize = 128 * 1024; +/// Maximum protocol line size. With client-side chunking, +/// the largest request is a PUTC with an 8 KiB chunk +/// (~11 KiB base58-encoded) plus command overhead. +const MAX_LINE_SIZE: usize = 16 * 1024; /// Read requests line-by-line from a connected Unix socket /// client, forwarding each to the daemon main loop via `tx`. @@ -657,18 +733,34 @@ fn handle_http( None => hash_b58.to_string(), }; - // Try local store first (fast path) + // Try local store first (fast path). let body = if let Some(data) = store.get_paste(&hash) { - match serve_paste_data(&data, enc_key_b58) { - Ok(b) => b, - Err((status, msg)) => { - http_response( - &mut stream, - status, - "text/plain", - msg.as_bytes(), - ); - return; + if data.first() == Some(&crate::paste::FORMAT_VERSION_CHUNKED) { + // Chunked manifest — reassemble directly from store. + match serve_chunked_paste(&data, store, enc_key_b58) { + Ok(b) => b, + Err((status, msg)) => { + http_response( + &mut stream, + status, + "text/plain", + msg.as_bytes(), + ); + return; + } + } + } else { + match serve_paste_data(&data, enc_key_b58) { + Ok(b) => b, + Err((status, msg)) => { + http_response( + &mut stream, + status, + "text/plain", + msg.as_bytes(), + ); + return; + } } } } else { @@ -712,6 +804,52 @@ fn serve_paste_data( } } +/// Reassemble a chunked paste directly from the local store. +/// Reads the manifest, fetches each chunk from chunks/ or +/// pastes/, concatenates, and optionally decrypts. +fn serve_chunked_paste( + manifest_data: &[u8], + store: &PasteStore, + enc_key_b58: Option<&str>, +) -> Result<Vec<u8>, (u16, &'static str)> { + let paste = Paste::from_bytes(manifest_data).ok_or((500, "corrupt manifest"))?; + let content = &paste.content; + + if content.len() < 2 { + return Err((500, "corrupt manifest")); + } + let count = u16::from_be_bytes([content[0], content[1]]) as usize; + if content.len() != 2 + count * 32 { + return Err((500, "corrupt manifest")); + } + + let mut assembled = Vec::new(); + for i in 0..count { + let start = 2 + i * 32; + let chunk_hash = &content[start..start + 32]; + let chunk_data = store + .get_chunk(chunk_hash) + .or_else(|| store.get_paste(chunk_hash)) + .ok_or((404, "missing chunk"))?; + let chunk_paste = + Paste::from_bytes(&chunk_data).ok_or((500, "corrupt chunk"))?; + assembled.extend_from_slice(&chunk_paste.content); + } + + if let Some(kb58) = enc_key_b58 { + let key_bytes = base58::decode(kb58).ok_or((400, "invalid enc key"))?; + if key_bytes.len() != 32 { + return Err((400, "invalid enc key")); + } + let mut key = [0u8; 32]; + key.copy_from_slice(&key_bytes); + crate::crypto::decrypt(&key, &assembled) + .ok_or((403, "decryption failed")) + } else { + Ok(assembled) + } +} + /// Ask the daemon for a paste via Unix socket (triggers /// a DHT lookup if not in local store). /// Key format: "hash" or "hash#enckey". @@ -9,7 +9,7 @@ use tesseras_dht::Node; use crate::base58; use crate::crypto; -use crate::paste::{MAX_PASTE_SIZE, Paste}; +use crate::paste::{CHUNK_SIZE, FORMAT_VERSION_CHUNKED, MAX_PASTE_SIZE, Paste}; use crate::store::PasteStore; /// Timeout for blocking DHT lookups. @@ -39,6 +39,52 @@ impl std::fmt::Display for PasteError { } } +/// Parse a manifest content into a list of chunk hashes. +fn parse_manifest(content: &[u8]) -> Option<Vec<[u8; 32]>> { + if content.len() < 2 { + return None; + } + let count = u16::from_be_bytes([content[0], content[1]]) as usize; + if content.len() != 2 + count * 32 { + return None; + } + let mut hashes = Vec::with_capacity(count); + for i in 0..count { + let start = 2 + i * 32; + let mut hash = [0u8; 32]; + hash.copy_from_slice(&content[start..start + 32]); + hashes.push(hash); + } + Some(hashes) +} + +/// Fetch paste data from local store (pastes + chunks), +/// falling back to a DHT lookup. +fn fetch_paste_data( + node: &mut Node, + store: &PasteStore, + hash: &[u8], +) -> Result<Vec<u8>, PasteError> { + if let Some(local) = store.get_paste(hash) { + return Ok(local); + } + if let Some(local) = store.get_chunk(hash) { + return Ok(local); + } + let vals = node.get_blocking(hash, OP_TIMEOUT); + if vals.is_empty() { + return Err(PasteError::NotFound); + } + match vals.iter().find(|v| { + Paste::from_bytes(v) + .map(|p| Paste::content_key(&p.content) == *hash) + .unwrap_or(false) + }) { + Some(v) => Ok(v.clone()), + None => Err(PasteError::NotFound), + } +} + /// Decode the hash portion of a key string ("hash#enckey" or "hash"). /// Returns the 32-byte hash. fn parse_hash(key_str: &str) -> Result<Vec<u8>, PasteError> { @@ -50,8 +96,11 @@ fn parse_hash(key_str: &str) -> Result<Vec<u8>, PasteError> { Ok(hash) } -/// Store a paste. If `encrypt` is true, encrypts the content and -/// returns "hash#enckey" in base58. Otherwise returns just the hash. +/// Store a single paste. If `encrypt` is true, encrypts the content +/// and returns "hash#enckey" in base58. Otherwise returns just the hash. +/// +/// For content larger than [`CHUNK_SIZE`], use the chunked protocol +/// (PUTC + PUTM) from the client instead. pub fn put_paste( node: &mut Node, store: &PasteStore, @@ -94,8 +143,61 @@ pub fn put_paste( } } +/// Store a single chunk in the chunks directory. +/// Returns the chunk hash in base58. +pub fn put_chunk( + store: &PasteStore, + content: &[u8], + ttl_secs: u64, +) -> Result<String, PasteError> { + if content.len() > CHUNK_SIZE { + return Err(PasteError::TooLarge); + } + let paste = Paste::new(content.to_vec(), ttl_secs); + let serialized = paste.to_bytes(); + let hash = Paste::content_key(&paste.content); + store + .put_chunk(&hash, &serialized) + .map_err(|e| PasteError::Internal(e.to_string()))?; + Ok(base58::encode(&hash)) +} + +/// Store a chunked-paste manifest (version 2). +/// The manifest is announced to the DHT immediately; +/// chunks are replicated via the periodic republish cycle. +pub fn put_manifest( + node: &mut Node, + store: &PasteStore, + content: &[u8], + ttl_secs: u64, +) -> Result<String, PasteError> { + if parse_manifest(content).is_none() { + return Err(PasteError::Internal("invalid manifest".into())); + } + let manifest = Paste { + version: FORMAT_VERSION_CHUNKED, + content: content.to_vec(), + created_at: std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(), + ttl_secs, + }; + let serialized = manifest.to_bytes(); + let hash = Paste::content_key(&manifest.content); + store + .put_paste(&hash, &serialized) + .map_err(|e| PasteError::Internal(e.to_string()))?; + let dht_ttl = std::cmp::min(ttl_secs, u16::MAX as u64) as u16; + node.put(&hash, &serialized, dht_ttl, false); + let hash_b58 = base58::encode(&hash); + log::info!("put: stored manifest {hash_b58}"); + Ok(hash_b58) +} + /// Retrieve a paste by key ("hash#enckey" or bare "hash"). /// Tries local store first, then falls back to a blocking DHT lookup. +/// Transparently reassembles chunked (version-2) pastes. pub fn get_paste( node: &mut Node, store: &PasteStore, @@ -111,31 +213,29 @@ pub fn get_paste( return Err(PasteError::InvalidKey); } - let data = if let Some(local) = store.get_paste(&hash) { - local - } else { - let vals = node.get_blocking(&hash, OP_TIMEOUT); - if vals.is_empty() { - return Err(PasteError::NotFound); - } - // Verify DHT result: the content hash must match the - // requested key to prevent a malicious node from - // injecting arbitrary data. - match vals.iter().find(|v| { - Paste::from_bytes(v) - .map(|p| Paste::content_key(&p.content) == *hash) - .unwrap_or(false) - }) { - Some(v) => v.clone(), - None => return Err(PasteError::NotFound), - } - }; - + let data = fetch_paste_data(node, store, &hash)?; let paste = Paste::from_bytes(&data).ok_or(PasteError::InvalidKey)?; + if paste.is_expired() && !store.is_pinned(&hash) { return Err(PasteError::Expired); } + // Reassemble chunked paste (version 2 = manifest). + let content = if paste.version == FORMAT_VERSION_CHUNKED { + let chunk_hashes = parse_manifest(&paste.content) + .ok_or(PasteError::Internal("corrupt manifest".into()))?; + let mut assembled = Vec::new(); + for chunk_hash in &chunk_hashes { + let chunk_data = fetch_paste_data(node, store, chunk_hash)?; + let chunk_paste = Paste::from_bytes(&chunk_data) + .ok_or(PasteError::Internal("corrupt chunk".into()))?; + assembled.extend_from_slice(&chunk_paste.content); + } + assembled + } else { + paste.content + }; + if let Some(kb58) = enc_key_b58 { let key_bytes = base58::decode(kb58).ok_or(PasteError::InvalidKey)?; if key_bytes.len() != crypto::KEY_SIZE { @@ -143,22 +243,39 @@ pub fn get_paste( } let mut key = [0u8; crypto::KEY_SIZE]; key.copy_from_slice(&key_bytes); - crypto::decrypt(&key, &paste.content) + crypto::decrypt(&key, &content) .ok_or(PasteError::DecryptionFailed) } else { - Ok(paste.content) + Ok(content) } } /// Delete a paste from local store and the DHT. /// Creates a block marker so the paste is not /// re-imported from the DHT by sync. +/// For chunked pastes, also deletes all chunks. pub fn delete_paste( node: &mut Node, store: &PasteStore, key_str: &str, ) -> Result<(), PasteError> { let hash = parse_hash(key_str)?; + + // If this is a chunked manifest, delete its chunks too. + if let Some(data) = store.get_paste(&hash) { + if let Some(paste) = Paste::from_bytes(&data) { + if paste.version == FORMAT_VERSION_CHUNKED { + if let Some(chunk_hashes) = parse_manifest(&paste.content) { + for chunk_hash in &chunk_hashes { + store.block(chunk_hash); + store.remove_chunk(chunk_hash); + node.delete(chunk_hash); + } + } + } + } + } + store.block(&hash); store.remove_paste(&hash); store.unpin(&hash).ok(); diff --git a/src/paste.rs b/src/paste.rs index 8bfe979..6567b9d 100644 --- a/src/paste.rs +++ b/src/paste.rs @@ -8,12 +8,25 @@ use tesseras_dht::sha2::{Digest, Sha256}; -/// Maximum paste size: 64 KiB. -pub const MAX_PASTE_SIZE: usize = 64 * 1024; - -/// Current format version. +/// Maximum paste size: 1.44 MB (floppy disk). +pub const MAX_PASTE_SIZE: usize = 1_440 * 1024; + +/// Chunk size for large pastes: 8 KiB. +/// The DHT fragments datagrams into 896-byte pieces with a +/// maximum of 10 fragments (~8960 bytes reassembled). After +/// subtracting the Paste header (17 bytes) and StoreMsg overhead, +/// 8 KiB fits comfortably within one DHT message. +/// Pastes larger than this are split into chunks, each stored +/// as a separate DHT value, with a version-2 manifest that +/// lists the chunk hashes. +pub const CHUNK_SIZE: usize = 8 * 1024; + +/// Current format version (single paste). const FORMAT_VERSION: u8 = 1; +/// Format version for chunked paste manifests. +pub const FORMAT_VERSION_CHUNKED: u8 = 2; + /// Header size: version(1) + created_at(8) + ttl(8) = 17. const HEADER_SIZE: usize = 17; diff --git a/src/protocol.rs b/src/protocol.rs index d45cdd8..cb90f5b 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -3,6 +3,8 @@ //! Simple line-oriented text protocol: //! PUT <ttl_secs> <content>\n //! PUTP <ttl_secs> <content>\n +//! PUTC <ttl_secs> <content>\n (store chunk) +//! PUTM <ttl_secs> <content>\n (store manifest) //! GET <key>\n //! DEL <key>\n //! PIN <key>\n @@ -21,6 +23,14 @@ pub enum Request { content_b58: String, encrypt: bool, }, + PutChunk { + ttl_secs: u64, + content_b58: String, + }, + PutManifest { + ttl_secs: u64, + content_b58: String, + }, Get { key: String, }, @@ -67,6 +77,30 @@ pub fn parse_request(line: &str) -> Result<Request, String> { encrypt: cmd == "PUT", }) } + "PUTC" => { + let ttl_str = + parts.next().ok_or("PUTC requires: PUTC <ttl> <data>")?; + let content_b58 = + parts.next().ok_or("PUTC requires: PUTC <ttl> <data>")?; + let ttl_secs: u64 = + ttl_str.parse().map_err(|_| "invalid TTL number")?; + Ok(Request::PutChunk { + ttl_secs, + content_b58: content_b58.to_string(), + }) + } + "PUTM" => { + let ttl_str = + parts.next().ok_or("PUTM requires: PUTM <ttl> <data>")?; + let content_b58 = + parts.next().ok_or("PUTM requires: PUTM <ttl> <data>")?; + let ttl_secs: u64 = + ttl_str.parse().map_err(|_| "invalid TTL number")?; + Ok(Request::PutManifest { + ttl_secs, + content_b58: content_b58.to_string(), + }) + } "GET" => { let key = parts.next().ok_or("GET requires: GET <key>")?; Ok(Request::Get { diff --git a/src/store.rs b/src/store.rs index 2e4f53a..75e932f 100644 --- a/src/store.rs +++ b/src/store.rs @@ -2,6 +2,7 @@ //! //! Simple directory layout: //! <root>/pastes/<hash>.bin +//! <root>/chunks/<hash>.bin //! <root>/pins/<hash> //! <root>/blocked/<hash> //! <root>/contacts.bin @@ -21,9 +22,10 @@ pub struct PasteStore { impl PasteStore { /// Open or create a store rooted at the given directory. - /// Creates `pastes/`, `pins/`, and `blocked/` subdirectories. + /// Creates `pastes/`, `chunks/`, `pins/`, and `blocked/` subdirectories. pub fn open(root: &Path) -> std::io::Result<Self> { fs::create_dir_all(root.join("pastes"))?; + fs::create_dir_all(root.join("chunks"))?; fs::create_dir_all(root.join("pins"))?; fs::create_dir_all(root.join("blocked"))?; Ok(PasteStore { @@ -35,6 +37,10 @@ impl PasteStore { self.root.join("pastes").join(base58::encode(key)) } + fn chunk_path(&self, key: &[u8]) -> PathBuf { + self.root.join("chunks").join(base58::encode(key)) + } + fn pin_path(&self, key: &[u8]) -> PathBuf { self.root.join("pins").join(base58::encode(key)) } @@ -82,6 +88,65 @@ impl PasteStore { let _ = fs::remove_file(self.paste_path(key)); } + // ── Chunk CRUD ───────────────────────────────── + + /// Write a chunk to the chunks directory. + pub fn put_chunk(&self, key: &[u8], value: &[u8]) -> std::io::Result<()> { + let path = self.chunk_path(key); + atomic_write(&path, &[key, value]) + } + + /// Read a chunk from the chunks directory. + pub fn get_chunk(&self, key: &[u8]) -> Option<Vec<u8>> { + if self.is_blocked(key) { + return None; + } + let path = self.chunk_path(key); + let data = fs::read(&path).ok()?; + if data.len() < 32 { + return None; + } + Some(data[32..].to_vec()) + } + + /// Delete a chunk file from disk (no-op if absent). + pub fn remove_chunk(&self, key: &[u8]) { + let _ = fs::remove_file(self.chunk_path(key)); + } + + /// List all non-blocked chunk keys. + pub fn chunk_keys(&self) -> Vec<Vec<u8>> { + let dir = self.root.join("chunks"); + let entries = match fs::read_dir(&dir) { + Ok(e) => e, + Err(_) => return Vec::new(), + }; + + let mut keys = Vec::new(); + for entry in entries.flatten() { + let data = match fs::read(entry.path()) { + Ok(d) => d, + Err(_) => continue, + }; + if data.len() < 32 { + continue; + } + let key = &data[..32]; + if self.is_blocked(key) { + continue; + } + // Check expiry on the chunk paste + let value = &data[32..]; + if let Some(paste) = Paste::from_bytes(value) + && paste.is_expired() + { + continue; + } + keys.push(key.to_vec()); + } + keys + } + /// List all non-expired, non-blocked paste keys. pub fn original_keys(&self) -> Vec<Vec<u8>> { let dir = self.root.join("pastes"); @@ -145,41 +210,51 @@ impl PasteStore { // ── GC ────────────────────────────────────────── - /// Remove expired pastes from disk. Pinned pastes are kept. + /// Remove expired pastes and chunks from disk. Pinned pastes are kept. pub fn gc(&self) -> std::io::Result<usize> { - let dir = self.root.join("pastes"); - let entries = fs::read_dir(&dir)?; let mut removed = 0; - - for entry in entries.flatten() { - let data = match fs::read(entry.path()) { - Ok(d) => d, + for subdir in &["pastes", "chunks"] { + let dir = self.root.join(subdir); + let entries = match fs::read_dir(&dir) { + Ok(e) => e, Err(_) => continue, }; - if data.len() < 32 { - continue; - } - let key = &data[..32]; - let value = &data[32..]; - - if self.is_pinned(key) { - continue; - } - if let Some(paste) = Paste::from_bytes(value) - && paste.is_expired() - { - let _ = fs::remove_file(entry.path()); - removed += 1; + for entry in entries.flatten() { + let data = match fs::read(entry.path()) { + Ok(d) => d, + Err(_) => continue, + }; + if data.len() < 32 { + continue; + } + let key = &data[..32]; + let value = &data[32..]; + + if self.is_pinned(key) { + continue; + } + if let Some(paste) = Paste::from_bytes(value) + && paste.is_expired() + { + let _ = fs::remove_file(entry.path()); + removed += 1; + } } } Ok(removed) } - /// Count stored pastes. + /// Count stored pastes (excludes chunks). pub fn paste_count(&self) -> usize { let dir = self.root.join("pastes"); fs::read_dir(&dir).map(|e| e.count()).unwrap_or(0) } + + /// Count stored chunks. + pub fn chunk_count(&self) -> usize { + let dir = self.root.join("chunks"); + fs::read_dir(&dir).map(|e| e.count()).unwrap_or(0) + } } /// Write data to `path` atomically: write to a temporary file in |