diff options
| author | murilo ijanc | 2026-03-24 15:04:03 -0300 |
|---|---|---|
| committer | murilo ijanc | 2026-03-24 15:04:03 -0300 |
| commit | 9821aabf0b50d2487b07502d3d2cd89e7d62bdbe (patch) | |
| tree | 53da095ff90cc755bac3d4bf699172b5e8cd07d6 /src/rdp.rs | |
| download | tesseras-dht-9821aabf0b50d2487b07502d3d2cd89e7d62bdbe.tar.gz | |
Initial commit v0.1.0
NAT-aware Kademlia DHT library for peer-to-peer networks.
Features:
- Distributed key-value storage (iterative FIND_NODE, FIND_VALUE, STORE)
- NAT traversal via DTUN hole-punching and proxy relay
- Reliable Datagram Protocol (RDP) with 7-state connection machine
- Datagram transport with automatic fragmentation/reassembly
- Ed25519 packet authentication
- 256-bit node IDs (Ed25519 public keys)
- Rate limiting, ban list, and eclipse attack mitigation
- Persistence and metrics
- OpenBSD and Linux support
Diffstat (limited to 'src/rdp.rs')
| -rw-r--r-- | src/rdp.rs | 1343 |
1 file changed, 1343 insertions, 0 deletions
diff --git a/src/rdp.rs b/src/rdp.rs new file mode 100644 index 0000000..96de51d --- /dev/null +++ b/src/rdp.rs @@ -0,0 +1,1343 @@ +//! Reliable Datagram Protocol (RDP). +//! +//! Provides TCP-like +//! reliable, ordered delivery over UDP with: +//! +//! - 7-state connection machine +//! - 3-way handshake (SYN / SYN-ACK / ACK) +//! - Sliding send and receive windows +//! - Cumulative ACK + Extended ACK (EACK/SACK) +//! - Delayed ACK (300ms) +//! - Retransmission (300ms timer) +//! - FIN-based graceful close +//! - RST for abrupt termination +//! +//! **No congestion control**. + +use std::collections::{HashMap, VecDeque}; +use std::time::{Duration, Instant}; + +use crate::id::NodeId; + +// ── Constants ──────────────────────────────────────── + +pub const RDP_FLAG_SYN: u8 = 0x80; +pub const RDP_FLAG_ACK: u8 = 0x40; +pub const RDP_FLAG_EAK: u8 = 0x20; +pub const RDP_FLAG_RST: u8 = 0x10; +pub const RDP_FLAG_NUL: u8 = 0x08; +pub const RDP_FLAG_FIN: u8 = 0x04; + +pub const RDP_RBUF_MAX_DEFAULT: u32 = 884; +pub const RDP_RCV_MAX_DEFAULT: u32 = 1024; +pub const RDP_WELL_KNOWN_PORT_MAX: u16 = 1024; +pub const RDP_SBUF_LIMIT: u16 = 884; +pub const RDP_TIMER_INTERVAL: Duration = Duration::from_millis(300); +pub const RDP_ACK_INTERVAL: Duration = Duration::from_millis(300); +pub const RDP_DEFAULT_MAX_RETRANS: Duration = Duration::from_secs(30); + +/// Generate a random initial sequence number to prevent +/// sequence prediction attacks. +fn random_isn() -> u32 { + let mut buf = [0u8; 4]; + crate::sys::random_bytes(&mut buf); + u32::from_ne_bytes(buf) +} + +/// RDP packet header (20 bytes on the wire). +pub const RDP_HEADER_SIZE: usize = 20; + +// ── Connection state ──────────────────────────────── + +/// RDP connection state (7 states). +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum RdpState { + Closed, + Listen, + SynSent, + SynRcvd, + Open, + CloseWaitPassive, + CloseWaitActive, +} + +/// Events emitted to the application. 
+#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum RdpEvent { + /// Server accepted a new connection. + Accepted, + + /// Client connected successfully. + Connected, + + /// Connection refused by peer. + Refused, + + /// Connection reset by peer. + Reset, + + /// Connection failed (timeout). + Failed, + + /// Data available to read. + Ready2Read, + + /// Pipe broken (peer vanished). + Broken, +} + +/// RDP connection address. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct RdpAddr { + pub did: NodeId, + pub dport: u16, + pub sport: u16, +} + +/// Status of a single RDP connection. +#[derive(Debug, Clone)] +pub struct RdpStatus { + pub state: RdpState, + pub did: NodeId, + pub dport: u16, + pub sport: u16, +} + +// ── Segment ───────────────────────────────────────── + +/// A segment in the send window. +#[derive(Debug, Clone)] +struct SendSegment { + data: Vec<u8>, + seqnum: u32, + sent_time: Option<Instant>, + is_sent: bool, + is_acked: bool, + + /// Retransmission timeout (doubles on each retry). + rt_secs: u64, +} + +/// A segment in the receive window. +#[derive(Debug, Clone)] +struct RecvSegment { + data: Vec<u8>, + seqnum: u32, + is_used: bool, + is_eacked: bool, +} + +// ── Connection ────────────────────────────────────── + +/// A single RDP connection. +struct RdpConnection { + addr: RdpAddr, + desc: i32, + state: RdpState, + + /// Server (passive) or client (active) side. + is_passive: bool, + + /// Connection has been closed locally. + is_closed: bool, + + /// Max segment size we can send (from peer's SYN). + sbuf_max: u32, + + /// Max segment size we can receive (our buffer). + rbuf_max: u32, + + // Send sequence variables + snd_nxt: u32, + snd_una: u32, + snd_max: u32, + snd_iss: u32, + + // Receive sequence variables + rcv_cur: u32, + + /// Max segments we can buffer. + rcv_max: u32, + rcv_irs: u32, + + /// Last sequence number we ACK'd. 
+ rcv_ack: u32, + + // Windows + send_window: VecDeque<SendSegment>, + recv_window: Vec<Option<RecvSegment>>, + read_queue: VecDeque<Vec<u8>>, + + // Timing + last_ack_time: Instant, + syn_time: Option<Instant>, + close_time: Option<Instant>, + + /// SYN retry timeout (doubles on retry). + syn_rt_secs: u64, + + // ── RTT estimation (Jacobson/Karels) ──────── + /// Smoothed RTT estimate (microseconds). + srtt_us: u64, + + /// RTT variation (microseconds). + rttvar_us: u64, + + /// Retransmission timeout (microseconds). + rto_us: u64, + + // ── Congestion control (AIMD) ─────────────── + /// Congestion window (segments allowed in flight). + cwnd: u32, + + /// Slow-start threshold. + ssthresh: u32, + + /// RST retry state. + rst_time: Option<Instant>, + rst_rt_secs: u64, + is_retry_rst: bool, + + // Out-of-order tracking for EACK + rcvd_seqno: Vec<u32>, +} + +impl RdpConnection { + fn new(desc: i32, addr: RdpAddr, is_passive: bool) -> Self { + Self { + addr, + desc, + state: RdpState::Closed, + is_passive, + is_closed: false, + sbuf_max: RDP_SBUF_LIMIT as u32, + rbuf_max: RDP_RBUF_MAX_DEFAULT, + snd_nxt: 0, + snd_una: 0, + snd_max: RDP_RCV_MAX_DEFAULT, + snd_iss: 0, + rcv_cur: 0, + rcv_max: RDP_RCV_MAX_DEFAULT, + rcv_irs: 0, + rcv_ack: 0, + send_window: VecDeque::new(), + recv_window: Vec::new(), + read_queue: VecDeque::new(), + last_ack_time: Instant::now(), + syn_time: None, + close_time: None, + syn_rt_secs: 1, + + // Jacobson/Karels: initial RTO = 1s + srtt_us: 0, + rttvar_us: 500_000, // 500ms initial variance + rto_us: 1_000_000, // 1s initial RTO + + // AIMD congestion control + cwnd: 1, // start with 1 segment + ssthresh: RDP_RCV_MAX_DEFAULT, + rst_time: None, + rst_rt_secs: 1, + is_retry_rst: false, + rcvd_seqno: Vec::new(), + } + } + + /// Enqueue data for sending. 
+ fn enqueue_send(&mut self, data: &[u8]) -> bool { + if self.is_closed { + return false; + } + if data.len() > self.sbuf_max as usize { + return false; + } + if self.send_window.len() >= self.snd_max as usize { + return false; + } + let seg = SendSegment { + data: data.to_vec(), + seqnum: self.snd_nxt, + sent_time: None, + is_sent: false, + is_acked: false, + rt_secs: 1, + }; + self.snd_nxt = self.snd_nxt.wrapping_add(1); + self.send_window.push_back(seg); + true + } + + /// Process a cumulative ACK. + fn recv_ack(&mut self, acknum: u32) { + while let Some(front) = self.send_window.front() { + if front.seqnum == acknum { + break; + } + + // Sequence numbers before acknum are acked + if is_before(front.seqnum, acknum) { + let seg = self.send_window.pop_front().unwrap(); + self.snd_una = self.snd_una.wrapping_add(1); + self.on_ack_received(); + + // Measure RTT from first-sent (non-retransmitted) + if let Some(sent) = seg.sent_time { + if seg.rt_secs <= 1 { + // Only use first transmission for RTT + let rtt_us = sent.elapsed().as_micros() as u64; + self.update_rtt(rtt_us); + } + } + } else { + break; + } + } + } + + /// Process an Extended ACK (EACK) for out-of-order + /// segments. + fn recv_eack(&mut self, eack_seqnum: u32) { + for seg in self.send_window.iter_mut() { + if seg.seqnum == eack_seqnum { + seg.is_acked = true; + break; + } + } + } + + /// Deliver in-order data from the receive window to + /// the read queue. + fn deliver_to_read_queue(&mut self) { + // Count contiguous in-order segments + let mut count = 0; + for slot in &self.recv_window { + match slot { + Some(seg) if seg.is_used => count += 1, + _ => break, + } + } + // Drain them all at once (O(n) instead of O(n²)) + if count > 0 { + for seg in self.recv_window.drain(..count).flatten() { + self.rcv_cur = seg.seqnum; + self.read_queue.push_back(seg.data); + } + } + } + + /// Maximum out-of-order gap before dropping. + /// Prevents memory exhaustion from malicious + /// high-seqnum packets. 
+ const MAX_OOO_GAP: usize = 256; + + /// Place a received segment into the receive window. + fn recv_data(&mut self, seqnum: u32, data: Vec<u8>) { + let expected = self.rcv_cur.wrapping_add(1); + + if seqnum == expected { + // In-order: deliver directly + self.read_queue.push_back(data); + self.rcv_cur = seqnum; + self.deliver_to_read_queue(); + // Clean up rcvd_seqno for delivered segments + self.rcvd_seqno.retain(|&s| is_before(seqnum, s)); + } else if is_before(expected, seqnum) { + // Out-of-order: check gap before allocating + let offset = seqnum.wrapping_sub(expected) as usize; + + // Reject if gap too large (DoS protection) + if offset > Self::MAX_OOO_GAP { + log::debug!("RDP: dropping packet with gap {offset}"); + return; + } + + // Check total recv window capacity + if self.recv_window.len() >= self.rcv_max as usize { + return; + } + + while self.recv_window.len() <= offset { + self.recv_window.push(None); + } + self.recv_window[offset] = Some(RecvSegment { + data, + seqnum, + is_used: true, + is_eacked: false, + }); + // Use bounded set (cap at MAX_OOO_GAP) + if self.rcvd_seqno.len() < Self::MAX_OOO_GAP + && !self.rcvd_seqno.contains(&seqnum) + { + self.rcvd_seqno.push(seqnum); + } + } + + // else: duplicate, ignore + } + + /// Whether a delayed ACK should be sent. + /// Update RTT estimate using Jacobson/Karels algorithm. + /// + /// Called when we receive an ACK for a segment + /// whose `sent_time` we know. 
+ fn update_rtt(&mut self, sample_us: u64) { + if self.srtt_us == 0 { + // First measurement + self.srtt_us = sample_us; + self.rttvar_us = sample_us / 2; + } else { + // RTTVAR = (1 - beta) * RTTVAR + beta * |SRTT - R| + // SRTT = (1 - alpha) * SRTT + alpha * R + // alpha = 1/8, beta = 1/4 (RFC 6298) + let diff = sample_us.abs_diff(self.srtt_us); + self.rttvar_us = (3 * self.rttvar_us + diff) / 4; + self.srtt_us = (7 * self.srtt_us + sample_us) / 8; + } + + // RTO = SRTT + max(G, 4 * RTTVAR) + // G (clock granularity) = 1ms = 1000us + let k_rttvar = 4 * self.rttvar_us; + self.rto_us = self.srtt_us + k_rttvar.max(1000); + + // Clamp: min 200ms, max 60s + self.rto_us = self.rto_us.clamp(200_000, 60_000_000); + } + + /// Handle successful ACK: update congestion window. + fn on_ack_received(&mut self) { + if self.cwnd < self.ssthresh { + // Slow start: increase by 1 per ACK + self.cwnd += 1; + } else { + // Congestion avoidance: increase by 1/cwnd + // (approx 1 segment per RTT) + self.cwnd += 1u32.max(1 / self.cwnd.max(1)); + } + } + + /// Handle packet loss: halve congestion window. + fn on_loss_detected(&mut self) { + self.ssthresh = (self.cwnd / 2).max(2); + self.cwnd = self.ssthresh; + } + + fn needs_ack(&self) -> bool { + self.rcv_cur != self.rcv_ack + && self.last_ack_time.elapsed() >= RDP_ACK_INTERVAL + } + + /// Check for segments needing retransmission. + /// + /// Returns `false` if a segment has exceeded + /// max_retrans (broken pipe). 
+ fn retransmit( + &mut self, + max_retrans: Duration, + ) -> (bool, Vec<SendSegment>) { + let mut to_send = Vec::new(); + let now = Instant::now(); + let rto_secs = (self.rto_us / 1_000_000).max(1); + + for seg in self.send_window.iter_mut() { + if !seg.is_sent { + break; + } + if seg.is_acked { + continue; + } + + // Check if we've exceeded max retransmission + if seg.rt_secs > max_retrans.as_secs() { + self.state = RdpState::Closed; + return (false, Vec::new()); // broken pipe + } + + if let Some(sent) = seg.sent_time { + // Use adaptive RTO for first retransmit, + // then exponential backoff + let timeout = if seg.rt_secs <= 1 { + rto_secs + } else { + seg.rt_secs + }; + let elapsed = now.duration_since(sent).as_secs(); + if elapsed > timeout { + seg.sent_time = Some(now); + seg.rt_secs = timeout * 2; // backoff + to_send.push(seg.clone()); + } + } + } + + // Loss detected → halve congestion window + if !to_send.is_empty() { + self.on_loss_detected(); + } + + (true, to_send) + } +} + +/// Check if sequence `a` comes before `b` (wrapping). +fn is_before(a: u32, b: u32) -> bool { + let diff = b.wrapping_sub(a); + diff > 0 && diff < 0x80000000 +} + +// ── Deferred action (invoke protection) ───────────── + +/// Actions deferred during event processing to avoid +/// reentrance issues. +#[derive(Debug)] +pub enum RdpAction { + /// Emit an event to the application. + Event { + desc: i32, + addr: RdpAddr, + event: RdpEvent, + }, + + /// Close a connection after processing. + Close(i32), +} + +// ── RDP manager ───────────────────────────────────── + +/// RDP protocol manager. +/// Incoming RDP packet for `Rdp::input()`. +pub struct RdpInput<'a> { + pub src: NodeId, + pub sport: u16, + pub dport: u16, + pub flags: u8, + pub seqnum: u32, + pub acknum: u32, + pub data: &'a [u8], +} + +/// RDP protocol manager. +/// +/// Manages multiple connections with descriptor-based +/// API (similar to file descriptors). 
+pub struct Rdp { + connections: HashMap<i32, RdpConnection>, + listeners: HashMap<u16, i32>, + addr_to_desc: HashMap<RdpAddr, i32>, + next_desc: i32, + max_retrans: Duration, +} + +impl Rdp { + pub fn new() -> Self { + Self { + connections: HashMap::new(), + listeners: HashMap::new(), + addr_to_desc: HashMap::new(), + next_desc: 1, + max_retrans: RDP_DEFAULT_MAX_RETRANS, + } + } + + /// Create a listening socket on `port`. + /// + /// Returns a descriptor for the listener. + pub fn listen(&mut self, port: u16) -> Result<i32, RdpError> { + if self.listeners.contains_key(&port) { + return Err(RdpError::PortInUse(port)); + } + let desc = self.alloc_desc(); + self.listeners.insert(port, desc); + Ok(desc) + } + + /// Initiate a connection to `dst:dport` from `sport`. + /// + /// Returns a descriptor for the connection. + pub fn connect( + &mut self, + sport: u16, + dst: NodeId, + dport: u16, + ) -> Result<i32, RdpError> { + let desc = self.alloc_desc(); + let addr = RdpAddr { + did: dst, + dport, + sport, + }; + + let mut conn = RdpConnection::new(desc, addr.clone(), false); + conn.state = RdpState::SynSent; + conn.snd_iss = random_isn(); + conn.snd_nxt = conn.snd_iss.wrapping_add(1); + conn.snd_una = conn.snd_iss; + conn.syn_time = Some(Instant::now()); + + self.addr_to_desc.insert(addr, desc); + self.connections.insert(desc, conn); + Ok(desc) + } + + /// Close a connection or listener. + pub fn close(&mut self, desc: i32) { + if let Some(mut conn) = self.connections.remove(&desc) { + conn.is_closed = true; + self.addr_to_desc.remove(&conn.addr); + log::debug!("RDP: closed desc {desc}"); + } + + // Also check listeners + self.listeners.retain(|_, d| *d != desc); + } + + /// Enqueue data for sending on a connection. 
+ pub fn send(&mut self, desc: i32, data: &[u8]) -> Result<usize, RdpError> { + let conn = self + .connections + .get_mut(&desc) + .ok_or(RdpError::BadDescriptor(desc))?; + + if conn.state != RdpState::Open { + return Err(RdpError::NotOpen(desc)); + } + + if !conn.enqueue_send(data) { + return Err(RdpError::SendBufferFull); + } + + Ok(data.len()) + } + + /// Read available data from a connection. + /// + /// Returns the number of bytes read, or 0 if no + /// data available. + pub fn recv( + &mut self, + desc: i32, + buf: &mut [u8], + ) -> Result<usize, RdpError> { + let conn = self + .connections + .get_mut(&desc) + .ok_or(RdpError::BadDescriptor(desc))?; + + if let Some(data) = conn.read_queue.pop_front() { + let len = data.len().min(buf.len()); + buf[..len].copy_from_slice(&data[..len]); + Ok(len) + } else { + Ok(0) + } + } + + /// Get the state of a descriptor. + pub fn get_state(&self, desc: i32) -> Result<RdpState, RdpError> { + self.connections + .get(&desc) + .map(|c| c.state) + .ok_or(RdpError::BadDescriptor(desc)) + } + + /// Get status of all connections. + pub fn get_status(&self) -> Vec<RdpStatus> { + self.connections + .values() + .map(|c| RdpStatus { + state: c.state, + did: c.addr.did, + dport: c.addr.dport, + sport: c.addr.sport, + }) + .collect() + } + + /// Set the maximum retransmission timeout. + pub fn set_max_retrans(&mut self, dur: Duration) { + self.max_retrans = dur; + } + + /// Get the maximum retransmission timeout. + pub fn max_retrans(&self) -> Duration { + self.max_retrans + } + + /// Process incoming RDP data from a peer. + /// + /// Returns deferred actions (events, closes) to + /// avoid reentrance during processing. 
+ pub fn input(&mut self, pkt: &RdpInput<'_>) -> Vec<RdpAction> { + let src = pkt.src; + let sport = pkt.sport; + let dport = pkt.dport; + let flags = pkt.flags; + let seqnum = pkt.seqnum; + let acknum = pkt.acknum; + let data = pkt.data; + let addr = RdpAddr { + did: src, + dport: sport, // their sport is our dport + sport: dport, // our dport is our sport + }; + let mut actions = Vec::new(); + + if let Some(&desc) = self.addr_to_desc.get(&addr) { + // Existing connection + if let Some(conn) = self.connections.get_mut(&desc) { + Self::process_connected( + conn, + flags, + seqnum, + acknum, + data, + &mut actions, + ); + } + } else if flags & RDP_FLAG_SYN != 0 { + // New inbound SYN → check listener + if let Some(&_listen_desc) = self.listeners.get(&dport) { + let desc = self.alloc_desc(); + let mut conn = RdpConnection::new(desc, addr.clone(), true); + conn.state = RdpState::SynRcvd; + conn.rcv_irs = seqnum; + conn.rcv_cur = seqnum; + conn.snd_iss = random_isn(); + conn.snd_nxt = conn.snd_iss.wrapping_add(1); + conn.snd_una = conn.snd_iss; + + self.addr_to_desc.insert(addr.clone(), desc); + self.connections.insert(desc, conn); + + actions.push(RdpAction::Event { + desc, + addr, + event: RdpEvent::Accepted, + }); + } + + // else: no listener → RST (ignored for now) + } + + actions + } + + /// Process a packet on an existing connection. 
+ fn process_connected( + conn: &mut RdpConnection, + flags: u8, + seqnum: u32, + acknum: u32, + data: &[u8], + actions: &mut Vec<RdpAction>, + ) { + match conn.state { + RdpState::SynSent => { + if flags & RDP_FLAG_SYN != 0 && flags & RDP_FLAG_ACK != 0 { + conn.rcv_irs = seqnum; + conn.rcv_cur = seqnum; + conn.recv_ack(acknum); + conn.state = RdpState::Open; + actions.push(RdpAction::Event { + desc: conn.desc, + addr: conn.addr.clone(), + event: RdpEvent::Connected, + }); + } else if flags & RDP_FLAG_RST != 0 { + conn.state = RdpState::Closed; + actions.push(RdpAction::Event { + desc: conn.desc, + addr: conn.addr.clone(), + event: RdpEvent::Refused, + }); + } + } + RdpState::SynRcvd => { + if flags & RDP_FLAG_ACK != 0 { + conn.recv_ack(acknum); + conn.state = RdpState::Open; + } + } + RdpState::Open => { + if flags & RDP_FLAG_RST != 0 { + conn.state = RdpState::Closed; + actions.push(RdpAction::Event { + desc: conn.desc, + addr: conn.addr.clone(), + event: RdpEvent::Reset, + }); + return; + } + if flags & RDP_FLAG_FIN != 0 { + conn.state = if conn.is_passive { + RdpState::CloseWaitPassive + } else { + RdpState::CloseWaitActive + }; + conn.close_time = Some(Instant::now()); + return; + } + if flags & RDP_FLAG_ACK != 0 { + conn.recv_ack(acknum); + } + if flags & RDP_FLAG_EAK != 0 { + conn.recv_eack(seqnum); + } + if !data.is_empty() { + conn.recv_data(seqnum, data.to_vec()); + actions.push(RdpAction::Event { + desc: conn.desc, + addr: conn.addr.clone(), + event: RdpEvent::Ready2Read, + }); + } + } + RdpState::CloseWaitActive => { + if flags & RDP_FLAG_FIN != 0 { + conn.state = RdpState::Closed; + } + } + _ => {} + } + } + + /// Periodic tick: retransmit, delayed ACK, timeouts. + /// + /// Returns actions for timed-out connections. 
+ pub fn tick(&mut self) -> Vec<RdpAction> { + let mut actions = Vec::new(); + let mut to_close = Vec::new(); + + for (desc, conn) in self.connections.iter_mut() { + // Check SYN timeout with exponential backoff + if conn.state == RdpState::SynSent { + if let Some(t) = conn.syn_time { + let elapsed = t.elapsed().as_secs(); + if elapsed > conn.syn_rt_secs { + if conn.syn_rt_secs > self.max_retrans.as_secs() { + actions.push(RdpAction::Event { + desc: *desc, + addr: conn.addr.clone(), + event: RdpEvent::Failed, + }); + to_close.push(*desc); + } else { + // Retry SYN with backoff + conn.syn_time = Some(Instant::now()); + conn.syn_rt_secs *= 2; + } + } + } + } + + // RST retry + if conn.is_retry_rst { + if let Some(t) = conn.rst_time { + if t.elapsed().as_secs() > conn.rst_rt_secs { + conn.rst_rt_secs *= 2; + conn.rst_time = Some(Instant::now()); + if conn.rst_rt_secs > self.max_retrans.as_secs() { + conn.is_retry_rst = false; + to_close.push(*desc); + } + } + } + } + + // Check close-wait timeout + if matches!( + conn.state, + RdpState::CloseWaitPassive | RdpState::CloseWaitActive + ) { + if let Some(t) = conn.close_time { + if t.elapsed() >= self.max_retrans { + to_close.push(*desc); + } + } + } + + // Retransmit unacked segments + if conn.state == RdpState::Open { + let (alive, _retransmits) = conn.retransmit(self.max_retrans); + if !alive { + // Broken pipe — exceeded max retrans + actions.push(RdpAction::Event { + desc: *desc, + addr: conn.addr.clone(), + event: RdpEvent::Broken, + }); + to_close.push(*desc); + } + + // Note: retransmitted segments are picked + // up by pending_output() in the next flush + } + } + + for d in to_close { + self.close(d); + } + + actions + } + + /// Number of active connections. + pub fn connection_count(&self) -> usize { + self.connections.len() + } + + /// Number of active listeners. + pub fn listener_count(&self) -> usize { + self.listeners.len() + } + + /// Build outgoing packets for a connection. 
+ /// + /// Returns `(dst_id, sport, dport, packets)` where + /// each packet is `(flags, seqnum, acknum, data)`. + /// The caller wraps these in protocol messages and + /// sends via UDP. + pub fn pending_output(&mut self, desc: i32) -> Option<PendingOutput> { + let conn = self.connections.get_mut(&desc)?; + + let mut packets = Vec::new(); + + match conn.state { + RdpState::SynSent => { + // Send SYN with receive buffer params + // (rdp_syn: out_segs_max + seg_size_max) + let mut syn_data = Vec::with_capacity(4); + syn_data + .extend_from_slice(&(conn.rcv_max as u16).to_be_bytes()); + syn_data + .extend_from_slice(&(conn.rbuf_max as u16).to_be_bytes()); + packets.push(RdpPacket { + flags: RDP_FLAG_SYN, + seqnum: conn.snd_iss, + acknum: 0, + data: syn_data, + }); + } + RdpState::SynRcvd => { + // Send SYN+ACK + packets.push(RdpPacket { + flags: RDP_FLAG_SYN | RDP_FLAG_ACK, + seqnum: conn.snd_iss, + acknum: conn.rcv_cur, + data: Vec::new(), + }); + } + RdpState::Open => { + // Send ACK if needed + if conn.needs_ack() { + packets.push(RdpPacket { + flags: RDP_FLAG_ACK, + seqnum: conn.snd_nxt, + acknum: conn.rcv_cur, + data: Vec::new(), + }); + conn.last_ack_time = Instant::now(); + conn.rcv_ack = conn.rcv_cur; + } + + // Send EACKs for out-of-order recv segments + for seg in conn.recv_window.iter_mut().flatten() { + if seg.is_used && !seg.is_eacked { + packets.push(RdpPacket { + flags: RDP_FLAG_EAK | RDP_FLAG_ACK, + seqnum: seg.seqnum, + acknum: conn.rcv_cur, + data: Vec::new(), + }); + seg.is_eacked = true; + } + } + + // Send pending data segments, limited by + // congestion window (AIMD) + let in_flight = conn + .send_window + .iter() + .filter(|s| s.is_sent && !s.is_acked) + .count() as u32; + let can_send = conn.cwnd.saturating_sub(in_flight); + + let mut sent_count = 0u32; + for seg in &conn.send_window { + if sent_count >= can_send { + break; + } + if !seg.is_sent && !seg.is_acked { + packets.push(RdpPacket { + flags: RDP_FLAG_ACK, + seqnum: seg.seqnum, + 
acknum: conn.rcv_cur, + data: seg.data.clone(), + }); + sent_count += 1; + } + } + + // Mark as sent + let mut marked = 0u32; + for seg in conn.send_window.iter_mut() { + if marked >= can_send { + break; + } + if !seg.is_sent { + seg.is_sent = true; + seg.sent_time = Some(Instant::now()); + marked += 1; + } + } + } + _ => {} + } + + if packets.is_empty() { + return None; + } + + Some(PendingOutput { + dst: conn.addr.did, + sport: conn.addr.sport, + dport: conn.addr.dport, + packets, + }) + } + + /// Get all connection descriptors. + pub fn descriptors(&self) -> Vec<i32> { + self.connections.keys().copied().collect() + } + + fn alloc_desc(&mut self) -> i32 { + let d = self.next_desc; + // Wrap at i32::MAX to avoid overflow; skip 0 + // and negative values + self.next_desc = if d >= i32::MAX - 1 { 1 } else { d + 1 }; + d + } +} + +/// A pending outgoing RDP packet. +#[derive(Debug, Clone)] +pub struct RdpPacket { + pub flags: u8, + pub seqnum: u32, + pub acknum: u32, + pub data: Vec<u8>, +} + +/// Pending output for a connection. +#[derive(Debug)] +pub struct PendingOutput { + pub dst: NodeId, + pub sport: u16, + pub dport: u16, + pub packets: Vec<RdpPacket>, +} + +/// Build an RDP wire packet: rdp_head(20) + data. +pub fn build_rdp_wire( + flags: u8, + sport: u16, + dport: u16, + seqnum: u32, + acknum: u32, + data: &[u8], +) -> Vec<u8> { + let dlen = data.len() as u16; + let mut buf = vec![0u8; RDP_HEADER_SIZE + data.len()]; + buf[0] = flags; + buf[1] = (RDP_HEADER_SIZE / 2) as u8; // hlen in 16-bit words + buf[2..4].copy_from_slice(&sport.to_be_bytes()); + buf[4..6].copy_from_slice(&dport.to_be_bytes()); + buf[6..8].copy_from_slice(&dlen.to_be_bytes()); + buf[8..12].copy_from_slice(&seqnum.to_be_bytes()); + buf[12..16].copy_from_slice(&acknum.to_be_bytes()); + buf[16..20].fill(0); // reserved + buf[20..].copy_from_slice(data); + buf +} + +/// Parsed RDP wire header fields. 
+pub struct RdpWireHeader<'a> { + pub flags: u8, + pub sport: u16, + pub dport: u16, + pub seqnum: u32, + pub acknum: u32, + pub data: &'a [u8], +} + +/// Parse an RDP wire packet header. +pub fn parse_rdp_wire(buf: &[u8]) -> Option<RdpWireHeader<'_>> { + if buf.len() < RDP_HEADER_SIZE { + return None; + } + Some(RdpWireHeader { + flags: buf[0], + sport: u16::from_be_bytes([buf[2], buf[3]]), + dport: u16::from_be_bytes([buf[4], buf[5]]), + seqnum: u32::from_be_bytes([buf[8], buf[9], buf[10], buf[11]]), + acknum: u32::from_be_bytes([buf[12], buf[13], buf[14], buf[15]]), + data: &buf[RDP_HEADER_SIZE..], + }) +} + +impl Default for Rdp { + fn default() -> Self { + Self::new() + } +} + +// ── Errors ────────────────────────────────────────── + +#[derive(Debug)] +pub enum RdpError { + PortInUse(u16), + BadDescriptor(i32), + NotOpen(i32), + SendBufferFull, +} + +impl std::fmt::Display for RdpError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + RdpError::PortInUse(p) => write!(f, "port {p} in use"), + RdpError::BadDescriptor(d) => write!(f, "bad descriptor {d}"), + RdpError::NotOpen(d) => write!(f, "descriptor {d} not open"), + RdpError::SendBufferFull => write!(f, "send buffer full"), + } + } +} + +impl std::error::Error for RdpError {} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn listen_and_close() { + let mut rdp = Rdp::new(); + let desc = rdp.listen(5000).unwrap(); + assert_eq!(rdp.listener_count(), 1); + rdp.close(desc); + assert_eq!(rdp.listener_count(), 0); + } + + #[test] + fn listen_duplicate_port() { + let mut rdp = Rdp::new(); + rdp.listen(5000).unwrap(); + assert!(matches!(rdp.listen(5000), Err(RdpError::PortInUse(5000)))); + } + + #[test] + fn connect_creates_syn_sent() { + let mut rdp = Rdp::new(); + let dst = NodeId::from_bytes([0x01; 32]); + let desc = rdp.connect(0, dst, 5000).unwrap(); + assert_eq!(rdp.get_state(desc).unwrap(), RdpState::SynSent); + } + + #[test] + fn send_before_open_fails() { 
+ let mut rdp = Rdp::new(); + let dst = NodeId::from_bytes([0x01; 32]); + let desc = rdp.connect(0, dst, 5000).unwrap(); + assert!(matches!( + rdp.send(desc, b"hello"), + Err(RdpError::NotOpen(_)) + )); + } + + #[test] + fn recv_empty() { + let mut rdp = Rdp::new(); + let dst = NodeId::from_bytes([0x01; 32]); + let desc = rdp.connect(0, dst, 5000).unwrap(); + let mut buf = [0u8; 64]; + + // SynSent state → bad descriptor or no data + assert!(rdp.recv(desc, &mut buf).is_ok()); + } + + #[test] + fn syn_ack_opens_connection() { + let mut rdp = Rdp::new(); + let dst = NodeId::from_bytes([0x01; 32]); + let desc = rdp.connect(1000, dst, 5000).unwrap(); + + // Simulate receiving SYN+ACK + let actions = rdp.input(&RdpInput { + src: dst, + sport: 5000, + dport: 1000, + flags: RDP_FLAG_SYN | RDP_FLAG_ACK, + seqnum: 100, + acknum: 1, + data: &[], + }); + + assert_eq!(rdp.get_state(desc).unwrap(), RdpState::Open); + assert!(actions.iter().any(|a| matches!( + a, + RdpAction::Event { + event: RdpEvent::Connected, + .. + } + ))); + } + + #[test] + fn inbound_syn_accepted() { + let mut rdp = Rdp::new(); + rdp.listen(5000).unwrap(); + + let peer = NodeId::from_bytes([0x02; 32]); + let actions = rdp.input(&RdpInput { + src: peer, + sport: 3000, + dport: 5000, + flags: RDP_FLAG_SYN, + seqnum: 200, + acknum: 0, + data: &[], + }); + + assert_eq!(rdp.connection_count(), 1); + assert!(actions.iter().any(|a| matches!( + a, + RdpAction::Event { + event: RdpEvent::Accepted, + .. 
+ } + ))); + } + + #[test] + fn rst_resets_connection() { + let mut rdp = Rdp::new(); + let dst = NodeId::from_bytes([0x01; 32]); + let desc = rdp.connect(1000, dst, 5000).unwrap(); + + // Open first + rdp.input(&RdpInput { + src: dst, + sport: 5000, + dport: 1000, + flags: RDP_FLAG_SYN | RDP_FLAG_ACK, + seqnum: 100, + acknum: 1, + data: &[], + }); + assert_eq!(rdp.get_state(desc).unwrap(), RdpState::Open); + + // RST + let actions = rdp.input(&RdpInput { + src: dst, + sport: 5000, + dport: 1000, + flags: RDP_FLAG_RST, + seqnum: 0, + acknum: 0, + data: &[], + }); + assert_eq!(rdp.get_state(desc).unwrap(), RdpState::Closed); + assert!(actions.iter().any(|a| matches!( + a, + RdpAction::Event { + event: RdpEvent::Reset, + .. + } + ))); + } + + #[test] + fn data_delivery() { + let mut rdp = Rdp::new(); + let dst = NodeId::from_bytes([0x01; 32]); + let desc = rdp.connect(1000, dst, 5000).unwrap(); + + // Open + rdp.input(&RdpInput { + src: dst, + sport: 5000, + dport: 1000, + flags: RDP_FLAG_SYN | RDP_FLAG_ACK, + seqnum: 100, + acknum: 1, + data: &[], + }); + + // Receive data + let actions = rdp.input(&RdpInput { + src: dst, + sport: 5000, + dport: 1000, + flags: RDP_FLAG_ACK, + seqnum: 101, + acknum: 1, + data: b"hello", + }); + + assert!(actions.iter().any(|a| matches!( + a, + RdpAction::Event { + event: RdpEvent::Ready2Read, + .. 
+ } + ))); + + let mut buf = [0u8; 64]; + let n = rdp.recv(desc, &mut buf).unwrap(); + assert_eq!(&buf[..n], b"hello"); + } + + #[test] + fn send_data_on_open() { + let mut rdp = Rdp::new(); + let dst = NodeId::from_bytes([0x01; 32]); + let desc = rdp.connect(1000, dst, 5000).unwrap(); + + // Open + rdp.input(&RdpInput { + src: dst, + sport: 5000, + dport: 1000, + flags: RDP_FLAG_SYN | RDP_FLAG_ACK, + seqnum: 100, + acknum: 1, + data: &[], + }); + + let n = rdp.send(desc, b"world").unwrap(); + assert_eq!(n, 5); + } + + #[test] + fn get_status() { + let mut rdp = Rdp::new(); + let dst = NodeId::from_bytes([0x01; 32]); + rdp.connect(1000, dst, 5000).unwrap(); + + let status = rdp.get_status(); + assert_eq!(status.len(), 1); + assert_eq!(status[0].state, RdpState::SynSent); + assert_eq!(status[0].dport, 5000); + } + + #[test] + fn is_before_wrapping() { + assert!(is_before(1, 2)); + assert!(is_before(0, 1)); + assert!(!is_before(2, 1)); + assert!(!is_before(5, 5)); + + // Wrapping: u32::MAX is before 0 + assert!(is_before(u32::MAX, 0)); + } +} |