diff options
Diffstat (limited to 'src/daemon.rs')
| -rw-r--r-- | src/daemon.rs | 186 |
1 file changed, 162 insertions, 24 deletions
diff --git a/src/daemon.rs b/src/daemon.rs index 8952d78..62e0977 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -24,8 +24,10 @@ use crate::store::PasteStore; /// How often to garbage-collect expired pastes (10 min). const GC_INTERVAL: Duration = Duration::from_secs(600); -/// How often to republish local pastes to the DHT (30 min). -const REPUBLISH_INTERVAL: Duration = Duration::from_secs(1800); +/// How often to republish local pastes to the DHT (5 min). +/// Kept short so chunked pastes reach peers promptly; +/// per-put throttling prevents burst bans. +const REPUBLISH_INTERVAL: Duration = Duration::from_secs(300); /// How often to persist routing table and state (5 min). const SAVE_INTERVAL: Duration = Duration::from_secs(300); @@ -297,6 +299,32 @@ fn handle_request( Err(e) => Response::Err(e.to_string()), } } + Request::PutChunk { + ttl_secs, + content_b58, + } => { + let content = match base58::decode(&content_b58) { + Some(c) => c, + None => return Response::Err("invalid base58 content".into()), + }; + match ops::put_chunk(store, &content, ttl_secs) { + Ok(hash) => Response::Ok(hash), + Err(e) => Response::Err(e.to_string()), + } + } + Request::PutManifest { + ttl_secs, + content_b58, + } => { + let content = match base58::decode(&content_b58) { + Some(c) => c, + None => return Response::Err("invalid base58 content".into()), + }; + match ops::put_manifest(node, store, &content, ttl_secs) { + Ok(hash) => Response::Ok(hash), + Err(e) => Response::Err(e.to_string()), + } + } Request::Get { key } => match ops::get_paste(node, store, &key) { Ok(data) => Response::Ok(base58::encode(&data)), Err(e) => Response::Err(e.to_string()), @@ -328,11 +356,12 @@ fn handle_request( Request::Status => { let m = node.metrics(); let status = format!( - "peers={} stored={} pastes={} \ + "peers={} stored={} pastes={} chunks={} \ sent={} recv={} lookups={}/{}", node.routing_table_size(), node.storage_count(), store.paste_count(), + store.chunk_count(), m.messages_sent, 
m.messages_received, m.lookups_started, @@ -346,7 +375,35 @@ fn handle_request( /// Copy DHT-replicated values into the local file store so /// the HTTP server can serve them without a DHT lookup. +/// Values already in pastes/ or chunks/ are skipped. +/// Values referenced by a local manifest are stored as chunks. fn sync_dht_to_store(node: &Node, store: &PasteStore) { + // Collect chunk hashes from all known manifests so we + // can route incoming DHT values to the right directory. + let mut known_chunks = std::collections::HashSet::new(); + for key in store.original_keys() { + if let Some(data) = store.get_paste(&key) { + if data.first() == Some(&crate::paste::FORMAT_VERSION_CHUNKED) { + if let Some(paste) = Paste::from_bytes(&data) { + let content = &paste.content; + if content.len() >= 2 { + let count = + u16::from_be_bytes([content[0], content[1]]) + as usize; + for i in 0..count { + let start = 2 + i * 32; + if start + 32 <= content.len() { + known_chunks.insert( + content[start..start + 32].to_vec(), + ); + } + } + } + } + } + } + } + for (key, value) in node.dht_values() { if key.len() != 32 { continue; @@ -354,23 +411,37 @@ fn sync_dht_to_store(node: &Node, store: &PasteStore) { if store.is_blocked(&key) { continue; } - if store.get_paste(&key).is_none() { + if store.get_paste(&key).is_some() || store.get_chunk(&key).is_some() { + continue; + } + if known_chunks.contains(key.as_slice()) { + let _ = store.put_chunk(&key, &value); + } else { let _ = store.put_paste(&key, &value); } } } -/// Re-announce locally stored pastes to the DHT so they -/// remain reachable as nodes join and leave the network. +/// Delay between consecutive DHT store operations during +/// republish to avoid triggering rate-limit bans on peers. +const REPUBLISH_THROTTLE: Duration = Duration::from_millis(200); + +/// Re-announce locally stored pastes and chunks to the DHT so +/// they remain reachable as nodes join and leave the network. 
+/// Inserts a small delay between puts to avoid bursting. fn republish(node: &mut Node, store: &PasteStore) { - let keys = store.original_keys(); - if keys.is_empty() { + let mut paste_keys = store.original_keys(); + let chunk_keys = store.chunk_keys(); + paste_keys.extend(chunk_keys); + + if paste_keys.is_empty() { return; } let mut count = 0u32; - for key in &keys { - if let Some(data) = store.get_paste(key) + for key in &paste_keys { + let data = store.get_paste(key).or_else(|| store.get_chunk(key)); + if let Some(data) = data && let Some(paste) = Paste::from_bytes(&data) { let remaining = if store.is_pinned(key) { @@ -386,6 +457,10 @@ fn republish(node: &mut Node, store: &PasteStore) { let rem = expires.saturating_sub(now); std::cmp::min(rem, u16::MAX as u64) as u16 }; + if count > 0 { + std::thread::sleep(REPUBLISH_THROTTLE); + let _ = node.poll_timeout(Duration::from_millis(1)); + } node.put(key, &data, remaining, false); count += 1; } @@ -445,9 +520,10 @@ pub fn run_unix_listener( let _ = std::fs::remove_file(sock_path); } -/// Maximum protocol line size (128 KiB covers the 64 KiB paste -/// limit after base58 expansion plus command overhead). -const MAX_LINE_SIZE: usize = 128 * 1024; +/// Maximum protocol line size. With client-side chunking, +/// the largest request is a PUTC with an 8 KiB chunk +/// (~11 KiB base58-encoded) plus command overhead. +const MAX_LINE_SIZE: usize = 16 * 1024; /// Read requests line-by-line from a connected Unix socket /// client, forwarding each to the daemon main loop via `tx`. @@ -657,18 +733,34 @@ fn handle_http( None => hash_b58.to_string(), }; - // Try local store first (fast path) + // Try local store first (fast path). 
let body = if let Some(data) = store.get_paste(&hash) { - match serve_paste_data(&data, enc_key_b58) { - Ok(b) => b, - Err((status, msg)) => { - http_response( - &mut stream, - status, - "text/plain", - msg.as_bytes(), - ); - return; + if data.first() == Some(&crate::paste::FORMAT_VERSION_CHUNKED) { + // Chunked manifest — reassemble directly from store. + match serve_chunked_paste(&data, store, enc_key_b58) { + Ok(b) => b, + Err((status, msg)) => { + http_response( + &mut stream, + status, + "text/plain", + msg.as_bytes(), + ); + return; + } + } + } else { + match serve_paste_data(&data, enc_key_b58) { + Ok(b) => b, + Err((status, msg)) => { + http_response( + &mut stream, + status, + "text/plain", + msg.as_bytes(), + ); + return; + } } } } else { @@ -712,6 +804,52 @@ fn serve_paste_data( } } +/// Reassemble a chunked paste directly from the local store. +/// Reads the manifest, fetches each chunk from chunks/ or +/// pastes/, concatenates, and optionally decrypts. +fn serve_chunked_paste( + manifest_data: &[u8], + store: &PasteStore, + enc_key_b58: Option<&str>, +) -> Result<Vec<u8>, (u16, &'static str)> { + let paste = Paste::from_bytes(manifest_data).ok_or((500, "corrupt manifest"))?; + let content = &paste.content; + + if content.len() < 2 { + return Err((500, "corrupt manifest")); + } + let count = u16::from_be_bytes([content[0], content[1]]) as usize; + if content.len() != 2 + count * 32 { + return Err((500, "corrupt manifest")); + } + + let mut assembled = Vec::new(); + for i in 0..count { + let start = 2 + i * 32; + let chunk_hash = &content[start..start + 32]; + let chunk_data = store + .get_chunk(chunk_hash) + .or_else(|| store.get_paste(chunk_hash)) + .ok_or((404, "missing chunk"))?; + let chunk_paste = + Paste::from_bytes(&chunk_data).ok_or((500, "corrupt chunk"))?; + assembled.extend_from_slice(&chunk_paste.content); + } + + if let Some(kb58) = enc_key_b58 { + let key_bytes = base58::decode(kb58).ok_or((400, "invalid enc key"))?; + if key_bytes.len() 
!= 32 { + return Err((400, "invalid enc key")); + } + let mut key = [0u8; 32]; + key.copy_from_slice(&key_bytes); + crate::crypto::decrypt(&key, &assembled) + .ok_or((403, "decryption failed")) + } else { + Ok(assembled) + } +} + /// Ask the daemon for a paste via Unix socket (triggers /// a DHT lookup if not in local store). /// Key format: "hash" or "hash#enckey". |