aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/bin/tp.rs323
-rw-r--r--src/daemon.rs186
-rw-r--r--src/ops.rs167
-rw-r--r--src/paste.rs21
-rw-r--r--src/protocol.rs34
-rw-r--r--src/store.rs121
6 files changed, 735 insertions, 117 deletions
diff --git a/src/bin/tp.rs b/src/bin/tp.rs
index fbfc872..9a69832 100644
--- a/src/bin/tp.rs
+++ b/src/bin/tp.rs
@@ -2,32 +2,84 @@
//!
//! Sends commands to the `tpd` daemon over a Unix socket.
//! Reads paste content from stdin (put) and writes it to
-//! stdout (get).
+//! stdout (get). Large pastes (> 8 KiB) are automatically
+//! split into chunks on the client side.
+use std::collections::BTreeMap;
use std::io::{BufRead, BufReader, Read, Write};
use std::os::unix::net::UnixStream;
use std::path::PathBuf;
#[path = "../base58.rs"]
mod base58;
+#[path = "../crypto.rs"]
+mod crypto;
#[path = "../sandbox.rs"]
mod sandbox;
+/// Maximum paste size: 1.44 MB (floppy disk).
+const MAX_PASTE: usize = 1_440 * 1024;
+
+/// Chunk size matching the DHT fragment limit.
+const CHUNK_SIZE: usize = 8 * 1024;
+
fn default_socket() -> PathBuf {
PathBuf::from("/var/tesseras-paste/daemon.sock")
}
+fn labels_path() -> PathBuf {
+ if let Ok(home) = std::env::var("HOME") {
+ PathBuf::from(home).join(".config/tp/labels")
+ } else {
+ PathBuf::from("/tmp/tp-labels")
+ }
+}
+
+fn load_labels(path: &PathBuf) -> BTreeMap<String, String> {
+ let mut map = BTreeMap::new();
+ let data = match std::fs::read_to_string(path) {
+ Ok(d) => d,
+ Err(_) => return map,
+ };
+ for line in data.lines() {
+ if let Some((key, label)) = line.split_once('\t') {
+ map.insert(key.to_string(), label.to_string());
+ }
+ }
+ map
+}
+
+fn save_labels(path: &PathBuf, labels: &BTreeMap<String, String>) {
+ if let Some(parent) = path.parent() {
+ let _ = std::fs::create_dir_all(parent);
+ }
+ let mut buf = String::new();
+ for (key, label) in labels {
+ buf.push_str(key);
+ buf.push('\t');
+ buf.push_str(label);
+ buf.push('\n');
+ }
+ if let Err(e) = std::fs::write(path, buf.as_bytes()) {
+ eprintln!("warning: could not save labels: {e}");
+ }
+}
+
fn usage() {
eprintln!("usage: tp [-s sock] [-v] <command> [args]");
eprintln!();
eprintln!("commands:");
- eprintln!(" put [-t ttl] [-p] read stdin, store paste");
+ eprintln!(" put [-t ttl] [-p] [-l label]");
+ eprintln!(" read stdin, store paste");
eprintln!(" -p public (no encryption)");
+ eprintln!(" -l attach a label");
eprintln!(" get <key> retrieve paste to stdout");
- eprintln!(" del <key> delete paste");
- eprintln!(" pin <key> pin (never expires)");
- eprintln!(" unpin <key> unpin");
- eprintln!(" status show daemon status");
+ eprintln!(" del <key> delete paste");
+ eprintln!(" pin <key> pin (never expires)");
+ eprintln!(" unpin <key> unpin");
+ eprintln!(" list list labeled pastes");
+ eprintln!(" label <key> <text> add or update a label");
+ eprintln!(" status show daemon status");
eprintln!();
eprintln!(" -s sock Unix socket path");
eprintln!(" -v verbose output");
@@ -49,6 +101,47 @@ fn parse_ttl(s: &str) -> Result<u64, String> {
}
}
+/// Send a request over the socket and read the response.
+/// Returns the data on OK, or exits on ERR.
+fn send_recv(
+ stream: &UnixStream,
+ reader: &mut BufReader<&UnixStream>,
+ request: &str,
+ verbose: bool,
+) -> String {
+ if verbose {
+ eprintln!(">> {}", request.trim());
+ }
+ if let Err(e) = (stream as &UnixStream).write_all(request.as_bytes()) {
+ eprintln!("error: writing to socket: {e}");
+ std::process::exit(1);
+ }
+ let mut line = String::new();
+ match reader.read_line(&mut line) {
+ Ok(0) => {
+ eprintln!("error: daemon closed connection");
+ std::process::exit(1);
+ }
+ Err(e) => {
+ eprintln!("error: reading from socket: {e}");
+ std::process::exit(1);
+ }
+ _ => {}
+ }
+ if verbose {
+ eprintln!("<< {}", line.trim());
+ }
+ if let Some(data) = line.trim().strip_prefix("OK ") {
+ data.to_string()
+ } else if let Some(msg) = line.trim().strip_prefix("ERR ") {
+ eprintln!("error: {msg}");
+ std::process::exit(1);
+ } else {
+ eprintln!("error: unexpected response: {}", line.trim());
+ std::process::exit(1);
+ }
+}
+
fn main() {
let args: Vec<String> = std::env::args().collect();
@@ -89,12 +182,62 @@ fn main() {
}
let command = &cmd_args[0];
+
+ // Handle client-only commands before sandboxing
+ match command.as_str() {
+ "list" => {
+ let lpath = labels_path();
+ let labels = load_labels(&lpath);
+ if labels.is_empty() {
+ eprintln!("no labels");
+ } else {
+ for (key, label) in &labels {
+ println!("{key}\t{label}");
+ }
+ }
+ return;
+ }
+ "label" => {
+ let key = cmd_args.get(1).unwrap_or_else(|| {
+ eprintln!("error: label requires a key and text");
+ std::process::exit(1);
+ });
+ let text: String = cmd_args[2..].join(" ");
+ if text.is_empty() {
+ eprintln!("error: label requires text");
+ std::process::exit(1);
+ }
+ let lpath = labels_path();
+ let mut labels = load_labels(&lpath);
+ labels.insert(key.clone(), text);
+ save_labels(&lpath, &labels);
+ return;
+ }
+ _ => {}
+ }
+
+ // ── Parse command into request(s) ──────────────────
+
+ // For most commands we build a single request string.
+ // The "put" command may use chunked mode (multiple requests).
+ enum PutData {
+ Single(String),
+ Chunked {
+ data: Vec<u8>,
+ ttl_secs: u64,
+ enc_key: Option<[u8; crypto::KEY_SIZE]>,
+ },
+ }
+
let mut is_get = false;
+ let mut put_label: Option<String> = None;
+ let mut del_key: Option<String> = None;
- let request = match command.as_str() {
+ let put_data = match command.as_str() {
"put" => {
let mut ttl = "24h".to_string();
let mut public = false;
+ let mut label: Option<String> = None;
let mut j = 1;
while j < cmd_args.len() {
match cmd_args[j].as_str() {
@@ -105,6 +248,12 @@ fn main() {
}
}
"-p" => public = true,
+ "-l" => {
+ j += 1;
+ if j < cmd_args.len() {
+ label = Some(cmd_args[j].clone());
+ }
+ }
_ => {}
}
j += 1;
@@ -116,9 +265,7 @@ fn main() {
std::process::exit(1);
}
};
- // Read at most MAX_PASTE + 1 byte so we can detect
- // oversized input without unbounded allocation.
- const MAX_PASTE: usize = 64 * 1024;
+
let mut content = Vec::new();
match std::io::stdin()
.take((MAX_PASTE + 1) as u64)
@@ -129,7 +276,7 @@ fn main() {
std::process::exit(1);
}
Ok(n) if n > MAX_PASTE => {
- eprintln!("error: input exceeds 64 KiB limit");
+ eprintln!("error: input exceeds 1.44 MB limit");
std::process::exit(1);
}
Err(e) => {
@@ -138,8 +285,31 @@ fn main() {
}
_ => {}
}
- let cmd = if public { "PUTP" } else { "PUT" };
- format!("{cmd} {ttl_secs} {}\n", base58::encode(&content))
+
+ put_label = label;
+
+ if content.len() <= CHUNK_SIZE {
+ // Small paste — single PUT/PUTP
+ let cmd = if public { "PUTP" } else { "PUT" };
+ PutData::Single(format!(
+ "{cmd} {ttl_secs} {}\n",
+ base58::encode(&content)
+ ))
+ } else {
+ // Large paste — client-side chunking
+ let (data, enc_key) = if public {
+ (content, None)
+ } else {
+ let key = crypto::generate_key();
+ let encrypted = crypto::encrypt(&key, &content);
+ (encrypted, Some(key))
+ };
+ PutData::Chunked {
+ data,
+ ttl_secs,
+ enc_key,
+ }
+ }
}
"get" => {
let key = cmd_args.get(1).unwrap_or_else(|| {
@@ -147,30 +317,31 @@ fn main() {
std::process::exit(1);
});
is_get = true;
- format!("GET {key}\n")
+ PutData::Single(format!("GET {key}\n"))
}
"del" => {
let key = cmd_args.get(1).unwrap_or_else(|| {
eprintln!("error: del requires a key");
std::process::exit(1);
});
- format!("DEL {key}\n")
+ del_key = Some(key.clone());
+ PutData::Single(format!("DEL {key}\n"))
}
"pin" => {
let key = cmd_args.get(1).unwrap_or_else(|| {
eprintln!("error: pin requires a key");
std::process::exit(1);
});
- format!("PIN {key}\n")
+ PutData::Single(format!("PIN {key}\n"))
}
"unpin" => {
let key = cmd_args.get(1).unwrap_or_else(|| {
eprintln!("error: unpin requires a key");
std::process::exit(1);
});
- format!("UNPIN {key}\n")
+ PutData::Single(format!("UNPIN {key}\n"))
}
- "status" => "STATUS\n".to_string(),
+ "status" => PutData::Single("STATUS\n".to_string()),
other => {
eprintln!("unknown command: {other}");
usage();
@@ -179,19 +350,23 @@ fn main() {
};
// ── Sandbox ─────────────────────────────────────
+ let lpath = labels_path();
sandbox::do_unveil(&sock_path, "rw");
+ if let Some(parent) = lpath.parent() {
+ let _ = std::fs::create_dir_all(parent);
+ sandbox::do_unveil(parent, "rwc");
+ }
sandbox::unveil_lock();
- sandbox::do_pledge("stdio unix rpath");
+ sandbox::do_pledge("stdio unix rpath wpath cpath");
if verbose {
eprintln!("socket: {}", sock_path.display());
- eprintln!(">> {}", request.trim());
}
let stream = match UnixStream::connect(&sock_path) {
Ok(s) => s,
Err(e) => {
- eprintln!("error: cannot connect to {}: {e}", sock_path.display(),);
+ eprintln!("error: cannot connect to {}: {e}", sock_path.display());
eprintln!("hint: is tpd running?");
std::process::exit(1);
}
@@ -201,25 +376,81 @@ fn main() {
.set_read_timeout(Some(std::time::Duration::from_secs(60)))
.ok();
- let mut writer = &stream;
- if let Err(e) = writer.write_all(request.as_bytes()) {
- eprintln!("error: writing to socket: {e}");
- std::process::exit(1);
- }
+ let mut reader = BufReader::new(&stream);
+
+ match put_data {
+ PutData::Chunked {
+ data,
+ ttl_secs,
+ enc_key,
+ } => {
+ // ── Chunked put ─────────────────────────
+ let n_chunks =
+ (data.len() + CHUNK_SIZE - 1) / CHUNK_SIZE;
+ if verbose {
+ eprintln!(
+ "chunked: {} bytes, {} chunks",
+ data.len(),
+ n_chunks,
+ );
+ }
+
+ // Send each chunk via PUTC
+ let mut chunk_hashes: Vec<Vec<u8>> = Vec::new();
+ for (i, chunk) in data.chunks(CHUNK_SIZE).enumerate() {
+ let req = format!(
+ "PUTC {} {}\n",
+ ttl_secs,
+ base58::encode(chunk),
+ );
+ let hash_b58 = send_recv(&stream, &mut reader, &req, verbose);
+ let hash = base58::decode(&hash_b58).unwrap_or_else(|| {
+ eprintln!("error: invalid hash from daemon");
+ std::process::exit(1);
+ });
+ if verbose {
+ eprintln!("chunk {}/{}: {hash_b58}", i + 1, n_chunks);
+ }
+ chunk_hashes.push(hash);
+ }
+
+ // Build manifest: count(u16 BE) || hash1 || hash2 || ...
+ let count = chunk_hashes.len() as u16;
+ let mut manifest =
+ Vec::with_capacity(2 + 32 * chunk_hashes.len());
+ manifest.extend_from_slice(&count.to_be_bytes());
+ for hash in &chunk_hashes {
+ manifest.extend_from_slice(hash);
+ }
+
+ let req = format!(
+ "PUTM {} {}\n",
+ ttl_secs,
+ base58::encode(&manifest),
+ );
+ let manifest_hash = send_recv(&stream, &mut reader, &req, verbose);
- let reader = BufReader::new(&stream);
- for line in reader.lines() {
- let line = match line {
- Ok(l) => l,
- Err(_) => break,
- };
- if verbose {
- eprintln!("<< {}", line);
+ let key_str = match enc_key {
+ Some(key) => {
+ format!("{manifest_hash}#{}", base58::encode(&key))
+ }
+ None => manifest_hash,
+ };
+ println!("{key_str}");
+
+ // Save label
+ if let Some(ref label) = put_label {
+ let mut labels = load_labels(&lpath);
+ labels.insert(key_str, label.clone());
+ save_labels(&lpath, &labels);
+ }
}
- if let Some(data) = line.strip_prefix("OK ") {
+ PutData::Single(request) => {
+ // ── Single request ──────────────────────
+ let data = send_recv(&stream, &mut reader, &request, verbose);
+
if is_get {
- // Decode base58 → raw bytes → stdout
- match base58::decode(data) {
+ match base58::decode(&data) {
Some(bytes) => {
if let Err(e) = std::io::stdout().write_all(&bytes) {
eprintln!("error: writing to stdout: {e}");
@@ -231,10 +462,20 @@ fn main() {
} else {
println!("{data}");
}
- break;
- } else if let Some(msg) = line.strip_prefix("ERR ") {
- eprintln!("error: {msg}");
- std::process::exit(1);
+
+ // Save label on successful put
+ if let Some(ref label) = put_label {
+ let mut labels = load_labels(&lpath);
+ labels.insert(data.to_string(), label.clone());
+ save_labels(&lpath, &labels);
+ }
+ // Remove label on successful del
+ if let Some(ref key) = del_key {
+ let mut labels = load_labels(&lpath);
+ if labels.remove(key).is_some() {
+ save_labels(&lpath, &labels);
+ }
+ }
}
}
}
diff --git a/src/daemon.rs b/src/daemon.rs
index 8952d78..62e0977 100644
--- a/src/daemon.rs
+++ b/src/daemon.rs
@@ -24,8 +24,10 @@ use crate::store::PasteStore;
/// How often to garbage-collect expired pastes (10 min).
const GC_INTERVAL: Duration = Duration::from_secs(600);
-/// How often to republish local pastes to the DHT (30 min).
-const REPUBLISH_INTERVAL: Duration = Duration::from_secs(1800);
+/// How often to republish local pastes to the DHT (5 min).
+/// Kept short so chunked pastes reach peers promptly;
+/// per-put throttling prevents burst bans.
+const REPUBLISH_INTERVAL: Duration = Duration::from_secs(300);
/// How often to persist routing table and state (5 min).
const SAVE_INTERVAL: Duration = Duration::from_secs(300);
@@ -297,6 +299,32 @@ fn handle_request(
Err(e) => Response::Err(e.to_string()),
}
}
+ Request::PutChunk {
+ ttl_secs,
+ content_b58,
+ } => {
+ let content = match base58::decode(&content_b58) {
+ Some(c) => c,
+ None => return Response::Err("invalid base58 content".into()),
+ };
+ match ops::put_chunk(store, &content, ttl_secs) {
+ Ok(hash) => Response::Ok(hash),
+ Err(e) => Response::Err(e.to_string()),
+ }
+ }
+ Request::PutManifest {
+ ttl_secs,
+ content_b58,
+ } => {
+ let content = match base58::decode(&content_b58) {
+ Some(c) => c,
+ None => return Response::Err("invalid base58 content".into()),
+ };
+ match ops::put_manifest(node, store, &content, ttl_secs) {
+ Ok(hash) => Response::Ok(hash),
+ Err(e) => Response::Err(e.to_string()),
+ }
+ }
Request::Get { key } => match ops::get_paste(node, store, &key) {
Ok(data) => Response::Ok(base58::encode(&data)),
Err(e) => Response::Err(e.to_string()),
@@ -328,11 +356,12 @@ fn handle_request(
Request::Status => {
let m = node.metrics();
let status = format!(
- "peers={} stored={} pastes={} \
+ "peers={} stored={} pastes={} chunks={} \
sent={} recv={} lookups={}/{}",
node.routing_table_size(),
node.storage_count(),
store.paste_count(),
+ store.chunk_count(),
m.messages_sent,
m.messages_received,
m.lookups_started,
@@ -346,7 +375,35 @@ fn handle_request(
/// Copy DHT-replicated values into the local file store so
/// the HTTP server can serve them without a DHT lookup.
+/// Values already in pastes/ or chunks/ are skipped.
+/// Values referenced by a local manifest are stored as chunks.
fn sync_dht_to_store(node: &Node, store: &PasteStore) {
+ // Collect chunk hashes from all known manifests so we
+ // can route incoming DHT values to the right directory.
+ let mut known_chunks = std::collections::HashSet::new();
+ for key in store.original_keys() {
+ if let Some(data) = store.get_paste(&key) {
+ if data.first() == Some(&crate::paste::FORMAT_VERSION_CHUNKED) {
+ if let Some(paste) = Paste::from_bytes(&data) {
+ let content = &paste.content;
+ if content.len() >= 2 {
+ let count =
+ u16::from_be_bytes([content[0], content[1]])
+ as usize;
+ for i in 0..count {
+ let start = 2 + i * 32;
+ if start + 32 <= content.len() {
+ known_chunks.insert(
+ content[start..start + 32].to_vec(),
+ );
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
for (key, value) in node.dht_values() {
if key.len() != 32 {
continue;
@@ -354,23 +411,37 @@ fn sync_dht_to_store(node: &Node, store: &PasteStore) {
if store.is_blocked(&key) {
continue;
}
- if store.get_paste(&key).is_none() {
+ if store.get_paste(&key).is_some() || store.get_chunk(&key).is_some() {
+ continue;
+ }
+ if known_chunks.contains(key.as_slice()) {
+ let _ = store.put_chunk(&key, &value);
+ } else {
let _ = store.put_paste(&key, &value);
}
}
}
-/// Re-announce locally stored pastes to the DHT so they
-/// remain reachable as nodes join and leave the network.
+/// Delay between consecutive DHT store operations during
+/// republish to avoid triggering rate-limit bans on peers.
+const REPUBLISH_THROTTLE: Duration = Duration::from_millis(200);
+
+/// Re-announce locally stored pastes and chunks to the DHT so
+/// they remain reachable as nodes join and leave the network.
+/// Inserts a small delay between puts to avoid bursting.
fn republish(node: &mut Node, store: &PasteStore) {
- let keys = store.original_keys();
- if keys.is_empty() {
+ let mut paste_keys = store.original_keys();
+ let chunk_keys = store.chunk_keys();
+ paste_keys.extend(chunk_keys);
+
+ if paste_keys.is_empty() {
return;
}
let mut count = 0u32;
- for key in &keys {
- if let Some(data) = store.get_paste(key)
+ for key in &paste_keys {
+ let data = store.get_paste(key).or_else(|| store.get_chunk(key));
+ if let Some(data) = data
&& let Some(paste) = Paste::from_bytes(&data)
{
let remaining = if store.is_pinned(key) {
@@ -386,6 +457,10 @@ fn republish(node: &mut Node, store: &PasteStore) {
let rem = expires.saturating_sub(now);
std::cmp::min(rem, u16::MAX as u64) as u16
};
+ if count > 0 {
+ std::thread::sleep(REPUBLISH_THROTTLE);
+ let _ = node.poll_timeout(Duration::from_millis(1));
+ }
node.put(key, &data, remaining, false);
count += 1;
}
@@ -445,9 +520,10 @@ pub fn run_unix_listener(
let _ = std::fs::remove_file(sock_path);
}
-/// Maximum protocol line size (128 KiB covers the 64 KiB paste
-/// limit after base58 expansion plus command overhead).
-const MAX_LINE_SIZE: usize = 128 * 1024;
+/// Maximum protocol line size. With client-side chunking,
+/// the largest request is a PUTC with an 8 KiB chunk
+/// (~11 KiB base58-encoded) plus command overhead.
+const MAX_LINE_SIZE: usize = 16 * 1024;
/// Read requests line-by-line from a connected Unix socket
/// client, forwarding each to the daemon main loop via `tx`.
@@ -657,18 +733,34 @@ fn handle_http(
None => hash_b58.to_string(),
};
- // Try local store first (fast path)
+ // Try local store first (fast path).
let body = if let Some(data) = store.get_paste(&hash) {
- match serve_paste_data(&data, enc_key_b58) {
- Ok(b) => b,
- Err((status, msg)) => {
- http_response(
- &mut stream,
- status,
- "text/plain",
- msg.as_bytes(),
- );
- return;
+ if data.first() == Some(&crate::paste::FORMAT_VERSION_CHUNKED) {
+ // Chunked manifest — reassemble directly from store.
+ match serve_chunked_paste(&data, store, enc_key_b58) {
+ Ok(b) => b,
+ Err((status, msg)) => {
+ http_response(
+ &mut stream,
+ status,
+ "text/plain",
+ msg.as_bytes(),
+ );
+ return;
+ }
+ }
+ } else {
+ match serve_paste_data(&data, enc_key_b58) {
+ Ok(b) => b,
+ Err((status, msg)) => {
+ http_response(
+ &mut stream,
+ status,
+ "text/plain",
+ msg.as_bytes(),
+ );
+ return;
+ }
}
}
} else {
@@ -712,6 +804,52 @@ fn serve_paste_data(
}
}
+/// Reassemble a chunked paste directly from the local store.
+/// Reads the manifest, fetches each chunk from chunks/ or
+/// pastes/, concatenates, and optionally decrypts.
+fn serve_chunked_paste(
+ manifest_data: &[u8],
+ store: &PasteStore,
+ enc_key_b58: Option<&str>,
+) -> Result<Vec<u8>, (u16, &'static str)> {
+ let paste = Paste::from_bytes(manifest_data).ok_or((500, "corrupt manifest"))?;
+ let content = &paste.content;
+
+ if content.len() < 2 {
+ return Err((500, "corrupt manifest"));
+ }
+ let count = u16::from_be_bytes([content[0], content[1]]) as usize;
+ if content.len() != 2 + count * 32 {
+ return Err((500, "corrupt manifest"));
+ }
+
+ let mut assembled = Vec::new();
+ for i in 0..count {
+ let start = 2 + i * 32;
+ let chunk_hash = &content[start..start + 32];
+ let chunk_data = store
+ .get_chunk(chunk_hash)
+ .or_else(|| store.get_paste(chunk_hash))
+ .ok_or((404, "missing chunk"))?;
+ let chunk_paste =
+ Paste::from_bytes(&chunk_data).ok_or((500, "corrupt chunk"))?;
+ assembled.extend_from_slice(&chunk_paste.content);
+ }
+
+ if let Some(kb58) = enc_key_b58 {
+ let key_bytes = base58::decode(kb58).ok_or((400, "invalid enc key"))?;
+ if key_bytes.len() != 32 {
+ return Err((400, "invalid enc key"));
+ }
+ let mut key = [0u8; 32];
+ key.copy_from_slice(&key_bytes);
+ crate::crypto::decrypt(&key, &assembled)
+ .ok_or((403, "decryption failed"))
+ } else {
+ Ok(assembled)
+ }
+}
+
/// Ask the daemon for a paste via Unix socket (triggers
/// a DHT lookup if not in local store).
/// Key format: "hash" or "hash#enckey".
diff --git a/src/ops.rs b/src/ops.rs
index f45b9ab..2d6b016 100644
--- a/src/ops.rs
+++ b/src/ops.rs
@@ -9,7 +9,7 @@ use tesseras_dht::Node;
use crate::base58;
use crate::crypto;
-use crate::paste::{MAX_PASTE_SIZE, Paste};
+use crate::paste::{CHUNK_SIZE, FORMAT_VERSION_CHUNKED, MAX_PASTE_SIZE, Paste};
use crate::store::PasteStore;
/// Timeout for blocking DHT lookups.
@@ -39,6 +39,52 @@ impl std::fmt::Display for PasteError {
}
}
+/// Parse a manifest content into a list of chunk hashes.
+fn parse_manifest(content: &[u8]) -> Option<Vec<[u8; 32]>> {
+ if content.len() < 2 {
+ return None;
+ }
+ let count = u16::from_be_bytes([content[0], content[1]]) as usize;
+ if content.len() != 2 + count * 32 {
+ return None;
+ }
+ let mut hashes = Vec::with_capacity(count);
+ for i in 0..count {
+ let start = 2 + i * 32;
+ let mut hash = [0u8; 32];
+ hash.copy_from_slice(&content[start..start + 32]);
+ hashes.push(hash);
+ }
+ Some(hashes)
+}
+
+/// Fetch paste data from local store (pastes + chunks),
+/// falling back to a DHT lookup.
+fn fetch_paste_data(
+ node: &mut Node,
+ store: &PasteStore,
+ hash: &[u8],
+) -> Result<Vec<u8>, PasteError> {
+ if let Some(local) = store.get_paste(hash) {
+ return Ok(local);
+ }
+ if let Some(local) = store.get_chunk(hash) {
+ return Ok(local);
+ }
+ let vals = node.get_blocking(hash, OP_TIMEOUT);
+ if vals.is_empty() {
+ return Err(PasteError::NotFound);
+ }
+ match vals.iter().find(|v| {
+ Paste::from_bytes(v)
+ .map(|p| Paste::content_key(&p.content) == *hash)
+ .unwrap_or(false)
+ }) {
+ Some(v) => Ok(v.clone()),
+ None => Err(PasteError::NotFound),
+ }
+}
+
/// Decode the hash portion of a key string ("hash#enckey" or "hash").
/// Returns the 32-byte hash.
fn parse_hash(key_str: &str) -> Result<Vec<u8>, PasteError> {
@@ -50,8 +96,11 @@ fn parse_hash(key_str: &str) -> Result<Vec<u8>, PasteError> {
Ok(hash)
}
-/// Store a paste. If `encrypt` is true, encrypts the content and
-/// returns "hash#enckey" in base58. Otherwise returns just the hash.
+/// Store a single paste. If `encrypt` is true, encrypts the content
+/// and returns "hash#enckey" in base58. Otherwise returns just the hash.
+///
+/// For content larger than [`CHUNK_SIZE`], use the chunked protocol
+/// (PUTC + PUTM) from the client instead.
pub fn put_paste(
node: &mut Node,
store: &PasteStore,
@@ -94,8 +143,61 @@ pub fn put_paste(
}
}
+/// Store a single chunk in the chunks directory.
+/// Returns the chunk hash in base58.
+pub fn put_chunk(
+ store: &PasteStore,
+ content: &[u8],
+ ttl_secs: u64,
+) -> Result<String, PasteError> {
+ if content.len() > CHUNK_SIZE {
+ return Err(PasteError::TooLarge);
+ }
+ let paste = Paste::new(content.to_vec(), ttl_secs);
+ let serialized = paste.to_bytes();
+ let hash = Paste::content_key(&paste.content);
+ store
+ .put_chunk(&hash, &serialized)
+ .map_err(|e| PasteError::Internal(e.to_string()))?;
+ Ok(base58::encode(&hash))
+}
+
+/// Store a chunked-paste manifest (version 2).
+/// The manifest is announced to the DHT immediately;
+/// chunks are replicated via the periodic republish cycle.
+pub fn put_manifest(
+ node: &mut Node,
+ store: &PasteStore,
+ content: &[u8],
+ ttl_secs: u64,
+) -> Result<String, PasteError> {
+ if parse_manifest(content).is_none() {
+ return Err(PasteError::Internal("invalid manifest".into()));
+ }
+ let manifest = Paste {
+ version: FORMAT_VERSION_CHUNKED,
+ content: content.to_vec(),
+ created_at: std::time::SystemTime::now()
+ .duration_since(std::time::UNIX_EPOCH)
+ .unwrap_or_default()
+ .as_secs(),
+ ttl_secs,
+ };
+ let serialized = manifest.to_bytes();
+ let hash = Paste::content_key(&manifest.content);
+ store
+ .put_paste(&hash, &serialized)
+ .map_err(|e| PasteError::Internal(e.to_string()))?;
+ let dht_ttl = std::cmp::min(ttl_secs, u16::MAX as u64) as u16;
+ node.put(&hash, &serialized, dht_ttl, false);
+ let hash_b58 = base58::encode(&hash);
+ log::info!("put: stored manifest {hash_b58}");
+ Ok(hash_b58)
+}
+
/// Retrieve a paste by key ("hash#enckey" or bare "hash").
/// Tries local store first, then falls back to a blocking DHT lookup.
+/// Transparently reassembles chunked (version-2) pastes.
pub fn get_paste(
node: &mut Node,
store: &PasteStore,
@@ -111,31 +213,29 @@ pub fn get_paste(
return Err(PasteError::InvalidKey);
}
- let data = if let Some(local) = store.get_paste(&hash) {
- local
- } else {
- let vals = node.get_blocking(&hash, OP_TIMEOUT);
- if vals.is_empty() {
- return Err(PasteError::NotFound);
- }
- // Verify DHT result: the content hash must match the
- // requested key to prevent a malicious node from
- // injecting arbitrary data.
- match vals.iter().find(|v| {
- Paste::from_bytes(v)
- .map(|p| Paste::content_key(&p.content) == *hash)
- .unwrap_or(false)
- }) {
- Some(v) => v.clone(),
- None => return Err(PasteError::NotFound),
- }
- };
-
+ let data = fetch_paste_data(node, store, &hash)?;
let paste = Paste::from_bytes(&data).ok_or(PasteError::InvalidKey)?;
+
if paste.is_expired() && !store.is_pinned(&hash) {
return Err(PasteError::Expired);
}
+ // Reassemble chunked paste (version 2 = manifest).
+ let content = if paste.version == FORMAT_VERSION_CHUNKED {
+ let chunk_hashes = parse_manifest(&paste.content)
+ .ok_or(PasteError::Internal("corrupt manifest".into()))?;
+ let mut assembled = Vec::new();
+ for chunk_hash in &chunk_hashes {
+ let chunk_data = fetch_paste_data(node, store, chunk_hash)?;
+ let chunk_paste = Paste::from_bytes(&chunk_data)
+ .ok_or(PasteError::Internal("corrupt chunk".into()))?;
+ assembled.extend_from_slice(&chunk_paste.content);
+ }
+ assembled
+ } else {
+ paste.content
+ };
+
if let Some(kb58) = enc_key_b58 {
let key_bytes = base58::decode(kb58).ok_or(PasteError::InvalidKey)?;
if key_bytes.len() != crypto::KEY_SIZE {
@@ -143,22 +243,39 @@ pub fn get_paste(
}
let mut key = [0u8; crypto::KEY_SIZE];
key.copy_from_slice(&key_bytes);
- crypto::decrypt(&key, &paste.content)
+ crypto::decrypt(&key, &content)
.ok_or(PasteError::DecryptionFailed)
} else {
- Ok(paste.content)
+ Ok(content)
}
}
/// Delete a paste from local store and the DHT.
/// Creates a block marker so the paste is not
/// re-imported from the DHT by sync.
+/// For chunked pastes, also deletes all chunks.
pub fn delete_paste(
node: &mut Node,
store: &PasteStore,
key_str: &str,
) -> Result<(), PasteError> {
let hash = parse_hash(key_str)?;
+
+ // If this is a chunked manifest, delete its chunks too.
+ if let Some(data) = store.get_paste(&hash) {
+ if let Some(paste) = Paste::from_bytes(&data) {
+ if paste.version == FORMAT_VERSION_CHUNKED {
+ if let Some(chunk_hashes) = parse_manifest(&paste.content) {
+ for chunk_hash in &chunk_hashes {
+ store.block(chunk_hash);
+ store.remove_chunk(chunk_hash);
+ node.delete(chunk_hash);
+ }
+ }
+ }
+ }
+ }
+
store.block(&hash);
store.remove_paste(&hash);
store.unpin(&hash).ok();
diff --git a/src/paste.rs b/src/paste.rs
index 8bfe979..6567b9d 100644
--- a/src/paste.rs
+++ b/src/paste.rs
@@ -8,12 +8,25 @@
use tesseras_dht::sha2::{Digest, Sha256};
-/// Maximum paste size: 64 KiB.
-pub const MAX_PASTE_SIZE: usize = 64 * 1024;
-
-/// Current format version.
+/// Maximum paste size: 1.44 MB (floppy disk).
+pub const MAX_PASTE_SIZE: usize = 1_440 * 1024;
+
+/// Chunk size for large pastes: 8 KiB.
+/// The DHT fragments datagrams into 896-byte pieces with a
+/// maximum of 10 fragments (~8960 bytes reassembled). After
+/// subtracting the Paste header (17 bytes) and StoreMsg overhead,
+/// 8 KiB fits comfortably within one DHT message.
+/// Pastes larger than this are split into chunks, each stored
+/// as a separate DHT value, with a version-2 manifest that
+/// lists the chunk hashes.
+pub const CHUNK_SIZE: usize = 8 * 1024;
+
+/// Current format version (single paste).
const FORMAT_VERSION: u8 = 1;
+/// Format version for chunked paste manifests.
+pub const FORMAT_VERSION_CHUNKED: u8 = 2;
+
/// Header size: version(1) + created_at(8) + ttl(8) = 17.
const HEADER_SIZE: usize = 17;
diff --git a/src/protocol.rs b/src/protocol.rs
index d45cdd8..cb90f5b 100644
--- a/src/protocol.rs
+++ b/src/protocol.rs
@@ -3,6 +3,8 @@
//! Simple line-oriented text protocol:
//! PUT <ttl_secs> <content>\n
//! PUTP <ttl_secs> <content>\n
+//! PUTC <ttl_secs> <content>\n (store chunk)
+//! PUTM <ttl_secs> <content>\n (store manifest)
//! GET <key>\n
//! DEL <key>\n
//! PIN <key>\n
@@ -21,6 +23,14 @@ pub enum Request {
content_b58: String,
encrypt: bool,
},
+ PutChunk {
+ ttl_secs: u64,
+ content_b58: String,
+ },
+ PutManifest {
+ ttl_secs: u64,
+ content_b58: String,
+ },
Get {
key: String,
},
@@ -67,6 +77,30 @@ pub fn parse_request(line: &str) -> Result<Request, String> {
encrypt: cmd == "PUT",
})
}
+ "PUTC" => {
+ let ttl_str =
+ parts.next().ok_or("PUTC requires: PUTC <ttl> <data>")?;
+ let content_b58 =
+ parts.next().ok_or("PUTC requires: PUTC <ttl> <data>")?;
+ let ttl_secs: u64 =
+ ttl_str.parse().map_err(|_| "invalid TTL number")?;
+ Ok(Request::PutChunk {
+ ttl_secs,
+ content_b58: content_b58.to_string(),
+ })
+ }
+ "PUTM" => {
+ let ttl_str =
+ parts.next().ok_or("PUTM requires: PUTM <ttl> <data>")?;
+ let content_b58 =
+ parts.next().ok_or("PUTM requires: PUTM <ttl> <data>")?;
+ let ttl_secs: u64 =
+ ttl_str.parse().map_err(|_| "invalid TTL number")?;
+ Ok(Request::PutManifest {
+ ttl_secs,
+ content_b58: content_b58.to_string(),
+ })
+ }
"GET" => {
let key = parts.next().ok_or("GET requires: GET <key>")?;
Ok(Request::Get {
diff --git a/src/store.rs b/src/store.rs
index 2e4f53a..75e932f 100644
--- a/src/store.rs
+++ b/src/store.rs
@@ -2,6 +2,7 @@
//!
//! Simple directory layout:
//! <root>/pastes/<hash>.bin
+//! <root>/chunks/<hash>.bin
//! <root>/pins/<hash>
//! <root>/blocked/<hash>
//! <root>/contacts.bin
@@ -21,9 +22,10 @@ pub struct PasteStore {
impl PasteStore {
/// Open or create a store rooted at the given directory.
- /// Creates `pastes/`, `pins/`, and `blocked/` subdirectories.
+ /// Creates `pastes/`, `chunks/`, `pins/`, and `blocked/` subdirectories.
pub fn open(root: &Path) -> std::io::Result<Self> {
fs::create_dir_all(root.join("pastes"))?;
+ fs::create_dir_all(root.join("chunks"))?;
fs::create_dir_all(root.join("pins"))?;
fs::create_dir_all(root.join("blocked"))?;
Ok(PasteStore {
@@ -35,6 +37,10 @@ impl PasteStore {
self.root.join("pastes").join(base58::encode(key))
}
+ fn chunk_path(&self, key: &[u8]) -> PathBuf {
+ self.root.join("chunks").join(base58::encode(key))
+ }
+
fn pin_path(&self, key: &[u8]) -> PathBuf {
self.root.join("pins").join(base58::encode(key))
}
@@ -82,6 +88,65 @@ impl PasteStore {
let _ = fs::remove_file(self.paste_path(key));
}
+ // ── Chunk CRUD ─────────────────────────────────
+
+ /// Write a chunk to the chunks directory.
+ pub fn put_chunk(&self, key: &[u8], value: &[u8]) -> std::io::Result<()> {
+ let path = self.chunk_path(key);
+ atomic_write(&path, &[key, value])
+ }
+
+ /// Read a chunk from the chunks directory.
+ pub fn get_chunk(&self, key: &[u8]) -> Option<Vec<u8>> {
+ if self.is_blocked(key) {
+ return None;
+ }
+ let path = self.chunk_path(key);
+ let data = fs::read(&path).ok()?;
+ if data.len() < 32 {
+ return None;
+ }
+ Some(data[32..].to_vec())
+ }
+
+ /// Delete a chunk file from disk (no-op if absent).
+ pub fn remove_chunk(&self, key: &[u8]) {
+ let _ = fs::remove_file(self.chunk_path(key));
+ }
+
+ /// List all non-blocked chunk keys.
+ pub fn chunk_keys(&self) -> Vec<Vec<u8>> {
+ let dir = self.root.join("chunks");
+ let entries = match fs::read_dir(&dir) {
+ Ok(e) => e,
+ Err(_) => return Vec::new(),
+ };
+
+ let mut keys = Vec::new();
+ for entry in entries.flatten() {
+ let data = match fs::read(entry.path()) {
+ Ok(d) => d,
+ Err(_) => continue,
+ };
+ if data.len() < 32 {
+ continue;
+ }
+ let key = &data[..32];
+ if self.is_blocked(key) {
+ continue;
+ }
+ // Check expiry on the chunk paste
+ let value = &data[32..];
+ if let Some(paste) = Paste::from_bytes(value)
+ && paste.is_expired()
+ {
+ continue;
+ }
+ keys.push(key.to_vec());
+ }
+ keys
+ }
+
/// List all non-expired, non-blocked paste keys.
pub fn original_keys(&self) -> Vec<Vec<u8>> {
let dir = self.root.join("pastes");
@@ -145,41 +210,51 @@ impl PasteStore {
// ── GC ──────────────────────────────────────────
- /// Remove expired pastes from disk. Pinned pastes are kept.
+ /// Remove expired pastes and chunks from disk. Pinned pastes are kept.
pub fn gc(&self) -> std::io::Result<usize> {
- let dir = self.root.join("pastes");
- let entries = fs::read_dir(&dir)?;
let mut removed = 0;
-
- for entry in entries.flatten() {
- let data = match fs::read(entry.path()) {
- Ok(d) => d,
+ for subdir in &["pastes", "chunks"] {
+ let dir = self.root.join(subdir);
+ let entries = match fs::read_dir(&dir) {
+ Ok(e) => e,
Err(_) => continue,
};
- if data.len() < 32 {
- continue;
- }
- let key = &data[..32];
- let value = &data[32..];
-
- if self.is_pinned(key) {
- continue;
- }
- if let Some(paste) = Paste::from_bytes(value)
- && paste.is_expired()
- {
- let _ = fs::remove_file(entry.path());
- removed += 1;
+ for entry in entries.flatten() {
+ let data = match fs::read(entry.path()) {
+ Ok(d) => d,
+ Err(_) => continue,
+ };
+ if data.len() < 32 {
+ continue;
+ }
+ let key = &data[..32];
+ let value = &data[32..];
+
+ if self.is_pinned(key) {
+ continue;
+ }
+ if let Some(paste) = Paste::from_bytes(value)
+ && paste.is_expired()
+ {
+ let _ = fs::remove_file(entry.path());
+ removed += 1;
+ }
}
}
Ok(removed)
}
- /// Count stored pastes.
+ /// Count stored pastes (excludes chunks).
pub fn paste_count(&self) -> usize {
let dir = self.root.join("pastes");
fs::read_dir(&dir).map(|e| e.count()).unwrap_or(0)
}
+
+ /// Count stored chunks.
+ pub fn chunk_count(&self) -> usize {
+ let dir = self.root.join("chunks");
+ fs::read_dir(&dir).map(|e| e.count()).unwrap_or(0)
+ }
}
/// Write data to `path` atomically: write to a temporary file in