aboutsummaryrefslogtreecommitdiffstats
path: root/src/daemon.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/daemon.rs')
-rw-r--r--src/daemon.rs186
1 files changed, 162 insertions, 24 deletions
diff --git a/src/daemon.rs b/src/daemon.rs
index 8952d78..62e0977 100644
--- a/src/daemon.rs
+++ b/src/daemon.rs
@@ -24,8 +24,10 @@ use crate::store::PasteStore;
/// How often to garbage-collect expired pastes (10 min).
const GC_INTERVAL: Duration = Duration::from_secs(600);
-/// How often to republish local pastes to the DHT (30 min).
-const REPUBLISH_INTERVAL: Duration = Duration::from_secs(1800);
+/// How often to republish local pastes to the DHT (5 min).
+/// Kept short so chunked pastes reach peers promptly;
+/// per-put throttling prevents burst bans.
+const REPUBLISH_INTERVAL: Duration = Duration::from_secs(300);
/// How often to persist routing table and state (5 min).
const SAVE_INTERVAL: Duration = Duration::from_secs(300);
@@ -297,6 +299,32 @@ fn handle_request(
Err(e) => Response::Err(e.to_string()),
}
}
+ Request::PutChunk {
+ ttl_secs,
+ content_b58,
+ } => {
+ let content = match base58::decode(&content_b58) {
+ Some(c) => c,
+ None => return Response::Err("invalid base58 content".into()),
+ };
+ match ops::put_chunk(store, &content, ttl_secs) {
+ Ok(hash) => Response::Ok(hash),
+ Err(e) => Response::Err(e.to_string()),
+ }
+ }
+ Request::PutManifest {
+ ttl_secs,
+ content_b58,
+ } => {
+ let content = match base58::decode(&content_b58) {
+ Some(c) => c,
+ None => return Response::Err("invalid base58 content".into()),
+ };
+ match ops::put_manifest(node, store, &content, ttl_secs) {
+ Ok(hash) => Response::Ok(hash),
+ Err(e) => Response::Err(e.to_string()),
+ }
+ }
Request::Get { key } => match ops::get_paste(node, store, &key) {
Ok(data) => Response::Ok(base58::encode(&data)),
Err(e) => Response::Err(e.to_string()),
@@ -328,11 +356,12 @@ fn handle_request(
Request::Status => {
let m = node.metrics();
let status = format!(
- "peers={} stored={} pastes={} \
+ "peers={} stored={} pastes={} chunks={} \
sent={} recv={} lookups={}/{}",
node.routing_table_size(),
node.storage_count(),
store.paste_count(),
+ store.chunk_count(),
m.messages_sent,
m.messages_received,
m.lookups_started,
@@ -346,7 +375,35 @@ fn handle_request(
/// Copy DHT-replicated values into the local file store so
/// the HTTP server can serve them without a DHT lookup.
+/// Values already in pastes/ or chunks/ are skipped.
+/// Values referenced by a local manifest are stored as chunks.
fn sync_dht_to_store(node: &Node, store: &PasteStore) {
+ // Collect chunk hashes from all known manifests so we
+ // can route incoming DHT values to the right directory.
+ let mut known_chunks = std::collections::HashSet::new();
+ for key in store.original_keys() {
+ if let Some(data) = store.get_paste(&key) {
+ if data.first() == Some(&crate::paste::FORMAT_VERSION_CHUNKED) {
+ if let Some(paste) = Paste::from_bytes(&data) {
+ let content = &paste.content;
+ if content.len() >= 2 {
+ let count =
+ u16::from_be_bytes([content[0], content[1]])
+ as usize;
+ for i in 0..count {
+ let start = 2 + i * 32;
+ if start + 32 <= content.len() {
+ known_chunks.insert(
+ content[start..start + 32].to_vec(),
+ );
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
for (key, value) in node.dht_values() {
if key.len() != 32 {
continue;
@@ -354,23 +411,37 @@ fn sync_dht_to_store(node: &Node, store: &PasteStore) {
if store.is_blocked(&key) {
continue;
}
- if store.get_paste(&key).is_none() {
+ if store.get_paste(&key).is_some() || store.get_chunk(&key).is_some() {
+ continue;
+ }
+ if known_chunks.contains(key.as_slice()) {
+ let _ = store.put_chunk(&key, &value);
+ } else {
let _ = store.put_paste(&key, &value);
}
}
}
-/// Re-announce locally stored pastes to the DHT so they
-/// remain reachable as nodes join and leave the network.
+/// Delay between consecutive DHT store operations during
+/// republish to avoid triggering rate-limit bans on peers.
+const REPUBLISH_THROTTLE: Duration = Duration::from_millis(200);
+
+/// Re-announce locally stored pastes and chunks to the DHT so
+/// they remain reachable as nodes join and leave the network.
+/// Inserts a small delay between puts to avoid bursting.
fn republish(node: &mut Node, store: &PasteStore) {
- let keys = store.original_keys();
- if keys.is_empty() {
+ let mut paste_keys = store.original_keys();
+ let chunk_keys = store.chunk_keys();
+ paste_keys.extend(chunk_keys);
+
+ if paste_keys.is_empty() {
return;
}
let mut count = 0u32;
- for key in &keys {
- if let Some(data) = store.get_paste(key)
+ for key in &paste_keys {
+ let data = store.get_paste(key).or_else(|| store.get_chunk(key));
+ if let Some(data) = data
&& let Some(paste) = Paste::from_bytes(&data)
{
let remaining = if store.is_pinned(key) {
@@ -386,6 +457,10 @@ fn republish(node: &mut Node, store: &PasteStore) {
let rem = expires.saturating_sub(now);
std::cmp::min(rem, u16::MAX as u64) as u16
};
+ if count > 0 {
+ std::thread::sleep(REPUBLISH_THROTTLE);
+ let _ = node.poll_timeout(Duration::from_millis(1));
+ }
node.put(key, &data, remaining, false);
count += 1;
}
@@ -445,9 +520,10 @@ pub fn run_unix_listener(
let _ = std::fs::remove_file(sock_path);
}
-/// Maximum protocol line size (128 KiB covers the 64 KiB paste
-/// limit after base58 expansion plus command overhead).
-const MAX_LINE_SIZE: usize = 128 * 1024;
+/// Maximum protocol line size. With client-side chunking,
+/// the largest request is a PUTC with an 8 KiB chunk
+/// (~11 KiB base58-encoded) plus command overhead.
+const MAX_LINE_SIZE: usize = 16 * 1024;
/// Read requests line-by-line from a connected Unix socket
/// client, forwarding each to the daemon main loop via `tx`.
@@ -657,18 +733,34 @@ fn handle_http(
None => hash_b58.to_string(),
};
- // Try local store first (fast path)
+ // Try local store first (fast path).
let body = if let Some(data) = store.get_paste(&hash) {
- match serve_paste_data(&data, enc_key_b58) {
- Ok(b) => b,
- Err((status, msg)) => {
- http_response(
- &mut stream,
- status,
- "text/plain",
- msg.as_bytes(),
- );
- return;
+ if data.first() == Some(&crate::paste::FORMAT_VERSION_CHUNKED) {
+ // Chunked manifest — reassemble directly from store.
+ match serve_chunked_paste(&data, store, enc_key_b58) {
+ Ok(b) => b,
+ Err((status, msg)) => {
+ http_response(
+ &mut stream,
+ status,
+ "text/plain",
+ msg.as_bytes(),
+ );
+ return;
+ }
+ }
+ } else {
+ match serve_paste_data(&data, enc_key_b58) {
+ Ok(b) => b,
+ Err((status, msg)) => {
+ http_response(
+ &mut stream,
+ status,
+ "text/plain",
+ msg.as_bytes(),
+ );
+ return;
+ }
}
}
} else {
@@ -712,6 +804,52 @@ fn serve_paste_data(
}
}
+/// Reassemble a chunked paste directly from the local store.
+/// Reads the manifest, fetches each chunk from chunks/ or
+/// pastes/, concatenates, and optionally decrypts.
+fn serve_chunked_paste(
+ manifest_data: &[u8],
+ store: &PasteStore,
+ enc_key_b58: Option<&str>,
+) -> Result<Vec<u8>, (u16, &'static str)> {
+ let paste = Paste::from_bytes(manifest_data).ok_or((500, "corrupt manifest"))?;
+ let content = &paste.content;
+
+ if content.len() < 2 {
+ return Err((500, "corrupt manifest"));
+ }
+ let count = u16::from_be_bytes([content[0], content[1]]) as usize;
+ if content.len() != 2 + count * 32 {
+ return Err((500, "corrupt manifest"));
+ }
+
+ let mut assembled = Vec::new();
+ for i in 0..count {
+ let start = 2 + i * 32;
+ let chunk_hash = &content[start..start + 32];
+ let chunk_data = store
+ .get_chunk(chunk_hash)
+ .or_else(|| store.get_paste(chunk_hash))
+ .ok_or((404, "missing chunk"))?;
+ let chunk_paste =
+ Paste::from_bytes(&chunk_data).ok_or((500, "corrupt chunk"))?;
+ assembled.extend_from_slice(&chunk_paste.content);
+ }
+
+ if let Some(kb58) = enc_key_b58 {
+ let key_bytes = base58::decode(kb58).ok_or((400, "invalid enc key"))?;
+ if key_bytes.len() != 32 {
+ return Err((400, "invalid enc key"));
+ }
+ let mut key = [0u8; 32];
+ key.copy_from_slice(&key_bytes);
+ crate::crypto::decrypt(&key, &assembled)
+ .ok_or((403, "decryption failed"))
+ } else {
+ Ok(assembled)
+ }
+}
+
/// Ask the daemon for a paste via Unix socket (triggers
/// a DHT lookup if not in local store).
/// Key format: "hash" or "hash#enckey".