aboutsummaryrefslogtreecommitdiffstats
path: root/src/rdp.rs
diff options
context:
space:
mode:
authormurilo ijanc2026-03-24 15:04:03 -0300
committermurilo ijanc2026-03-24 15:04:03 -0300
commit9821aabf0b50d2487b07502d3d2cd89e7d62bdbe (patch)
tree53da095ff90cc755bac3d4bf699172b5e8cd07d6 /src/rdp.rs
downloadtesseras-dht-0.1.0.tar.gz
Initial commitv0.1.0
NAT-aware Kademlia DHT library for peer-to-peer networks. Features: - Distributed key-value storage (iterative FIND_NODE, FIND_VALUE, STORE) - NAT traversal via DTUN hole-punching and proxy relay - Reliable Datagram Protocol (RDP) with 7-state connection machine - Datagram transport with automatic fragmentation/reassembly - Ed25519 packet authentication - 256-bit node IDs (Ed25519 public keys) - Rate limiting, ban list, and eclipse attack mitigation - Persistence and metrics - OpenBSD and Linux support
Diffstat (limited to 'src/rdp.rs')
-rw-r--r--src/rdp.rs1343
1 files changed, 1343 insertions, 0 deletions
diff --git a/src/rdp.rs b/src/rdp.rs
new file mode 100644
index 0000000..96de51d
--- /dev/null
+++ b/src/rdp.rs
@@ -0,0 +1,1343 @@
+//! Reliable Datagram Protocol (RDP).
+//!
+//! Provides TCP-like
+//! reliable, ordered delivery over UDP with:
+//!
+//! - 7-state connection machine
+//! - 3-way handshake (SYN / SYN-ACK / ACK)
+//! - Sliding send and receive windows
+//! - Cumulative ACK + Extended ACK (EACK/SACK)
+//! - Delayed ACK (300ms)
+//! - Retransmission (300ms timer)
+//! - FIN-based graceful close
+//! - RST for abrupt termination
+//!
+//! **No congestion control**.
+
+use std::collections::{HashMap, VecDeque};
+use std::time::{Duration, Instant};
+
+use crate::id::NodeId;
+
+// ── Constants ────────────────────────────────────────
+
+pub const RDP_FLAG_SYN: u8 = 0x80;
+pub const RDP_FLAG_ACK: u8 = 0x40;
+pub const RDP_FLAG_EAK: u8 = 0x20;
+pub const RDP_FLAG_RST: u8 = 0x10;
+pub const RDP_FLAG_NUL: u8 = 0x08;
+pub const RDP_FLAG_FIN: u8 = 0x04;
+
+pub const RDP_RBUF_MAX_DEFAULT: u32 = 884;
+pub const RDP_RCV_MAX_DEFAULT: u32 = 1024;
+pub const RDP_WELL_KNOWN_PORT_MAX: u16 = 1024;
+pub const RDP_SBUF_LIMIT: u16 = 884;
+pub const RDP_TIMER_INTERVAL: Duration = Duration::from_millis(300);
+pub const RDP_ACK_INTERVAL: Duration = Duration::from_millis(300);
+pub const RDP_DEFAULT_MAX_RETRANS: Duration = Duration::from_secs(30);
+
+/// Generate a random initial sequence number to prevent
+/// sequence prediction attacks.
+fn random_isn() -> u32 {
+ let mut buf = [0u8; 4];
+ crate::sys::random_bytes(&mut buf);
+ u32::from_ne_bytes(buf)
+}
+
+/// RDP packet header (20 bytes on the wire).
+pub const RDP_HEADER_SIZE: usize = 20;
+
+// ── Connection state ────────────────────────────────
+
+/// RDP connection state (7 states).
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum RdpState {
+ Closed,
+ Listen,
+ SynSent,
+ SynRcvd,
+ Open,
+ CloseWaitPassive,
+ CloseWaitActive,
+}
+
+/// Events emitted to the application.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum RdpEvent {
+ /// Server accepted a new connection.
+ Accepted,
+
+ /// Client connected successfully.
+ Connected,
+
+ /// Connection refused by peer.
+ Refused,
+
+ /// Connection reset by peer.
+ Reset,
+
+ /// Connection failed (timeout).
+ Failed,
+
+ /// Data available to read.
+ Ready2Read,
+
+ /// Pipe broken (peer vanished).
+ Broken,
+}
+
+/// RDP connection address.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct RdpAddr {
+ pub did: NodeId,
+ pub dport: u16,
+ pub sport: u16,
+}
+
+/// Status of a single RDP connection.
+#[derive(Debug, Clone)]
+pub struct RdpStatus {
+ pub state: RdpState,
+ pub did: NodeId,
+ pub dport: u16,
+ pub sport: u16,
+}
+
+// ── Segment ─────────────────────────────────────────
+
+/// A segment in the send window.
+#[derive(Debug, Clone)]
+struct SendSegment {
+ data: Vec<u8>,
+ seqnum: u32,
+ sent_time: Option<Instant>,
+ is_sent: bool,
+ is_acked: bool,
+
+ /// Retransmission timeout (doubles on each retry).
+ rt_secs: u64,
+}
+
+/// A segment in the receive window.
+#[derive(Debug, Clone)]
+struct RecvSegment {
+ data: Vec<u8>,
+ seqnum: u32,
+ is_used: bool,
+ is_eacked: bool,
+}
+
+// ── Connection ──────────────────────────────────────
+
+/// A single RDP connection.
+struct RdpConnection {
+ addr: RdpAddr,
+ desc: i32,
+ state: RdpState,
+
+ /// Server (passive) or client (active) side.
+ is_passive: bool,
+
+ /// Connection has been closed locally.
+ is_closed: bool,
+
+ /// Max segment size we can send (from peer's SYN).
+ sbuf_max: u32,
+
+ /// Max segment size we can receive (our buffer).
+ rbuf_max: u32,
+
+ // Send sequence variables
+ snd_nxt: u32,
+ snd_una: u32,
+ snd_max: u32,
+ snd_iss: u32,
+
+ // Receive sequence variables
+ rcv_cur: u32,
+
+ /// Max segments we can buffer.
+ rcv_max: u32,
+ rcv_irs: u32,
+
+ /// Last sequence number we ACK'd.
+ rcv_ack: u32,
+
+ // Windows
+ send_window: VecDeque<SendSegment>,
+ recv_window: Vec<Option<RecvSegment>>,
+ read_queue: VecDeque<Vec<u8>>,
+
+ // Timing
+ last_ack_time: Instant,
+ syn_time: Option<Instant>,
+ close_time: Option<Instant>,
+
+ /// SYN retry timeout (doubles on retry).
+ syn_rt_secs: u64,
+
+ // ── RTT estimation (Jacobson/Karels) ────────
+ /// Smoothed RTT estimate (microseconds).
+ srtt_us: u64,
+
+ /// RTT variation (microseconds).
+ rttvar_us: u64,
+
+ /// Retransmission timeout (microseconds).
+ rto_us: u64,
+
+ // ── Congestion control (AIMD) ───────────────
+ /// Congestion window (segments allowed in flight).
+ cwnd: u32,
+
+ /// Slow-start threshold.
+ ssthresh: u32,
+
+ /// RST retry state.
+ rst_time: Option<Instant>,
+ rst_rt_secs: u64,
+ is_retry_rst: bool,
+
+ // Out-of-order tracking for EACK
+ rcvd_seqno: Vec<u32>,
+}
+
+impl RdpConnection {
+ fn new(desc: i32, addr: RdpAddr, is_passive: bool) -> Self {
+ Self {
+ addr,
+ desc,
+ state: RdpState::Closed,
+ is_passive,
+ is_closed: false,
+ sbuf_max: RDP_SBUF_LIMIT as u32,
+ rbuf_max: RDP_RBUF_MAX_DEFAULT,
+ snd_nxt: 0,
+ snd_una: 0,
+ snd_max: RDP_RCV_MAX_DEFAULT,
+ snd_iss: 0,
+ rcv_cur: 0,
+ rcv_max: RDP_RCV_MAX_DEFAULT,
+ rcv_irs: 0,
+ rcv_ack: 0,
+ send_window: VecDeque::new(),
+ recv_window: Vec::new(),
+ read_queue: VecDeque::new(),
+ last_ack_time: Instant::now(),
+ syn_time: None,
+ close_time: None,
+ syn_rt_secs: 1,
+
+ // Jacobson/Karels: initial RTO = 1s
+ srtt_us: 0,
+ rttvar_us: 500_000, // 500ms initial variance
+ rto_us: 1_000_000, // 1s initial RTO
+
+ // AIMD congestion control
+ cwnd: 1, // start with 1 segment
+ ssthresh: RDP_RCV_MAX_DEFAULT,
+ rst_time: None,
+ rst_rt_secs: 1,
+ is_retry_rst: false,
+ rcvd_seqno: Vec::new(),
+ }
+ }
+
+ /// Enqueue data for sending.
+ fn enqueue_send(&mut self, data: &[u8]) -> bool {
+ if self.is_closed {
+ return false;
+ }
+ if data.len() > self.sbuf_max as usize {
+ return false;
+ }
+ if self.send_window.len() >= self.snd_max as usize {
+ return false;
+ }
+ let seg = SendSegment {
+ data: data.to_vec(),
+ seqnum: self.snd_nxt,
+ sent_time: None,
+ is_sent: false,
+ is_acked: false,
+ rt_secs: 1,
+ };
+ self.snd_nxt = self.snd_nxt.wrapping_add(1);
+ self.send_window.push_back(seg);
+ true
+ }
+
+ /// Process a cumulative ACK.
+ fn recv_ack(&mut self, acknum: u32) {
+ while let Some(front) = self.send_window.front() {
+ if front.seqnum == acknum {
+ break;
+ }
+
+ // Sequence numbers before acknum are acked
+ if is_before(front.seqnum, acknum) {
+ let seg = self.send_window.pop_front().unwrap();
+ self.snd_una = self.snd_una.wrapping_add(1);
+ self.on_ack_received();
+
+ // Measure RTT from first-sent (non-retransmitted)
+ if let Some(sent) = seg.sent_time {
+ if seg.rt_secs <= 1 {
+ // Only use first transmission for RTT
+ let rtt_us = sent.elapsed().as_micros() as u64;
+ self.update_rtt(rtt_us);
+ }
+ }
+ } else {
+ break;
+ }
+ }
+ }
+
+ /// Process an Extended ACK (EACK) for out-of-order
+ /// segments.
+ fn recv_eack(&mut self, eack_seqnum: u32) {
+ for seg in self.send_window.iter_mut() {
+ if seg.seqnum == eack_seqnum {
+ seg.is_acked = true;
+ break;
+ }
+ }
+ }
+
+ /// Deliver in-order data from the receive window to
+ /// the read queue.
+ fn deliver_to_read_queue(&mut self) {
+ // Count contiguous in-order segments
+ let mut count = 0;
+ for slot in &self.recv_window {
+ match slot {
+ Some(seg) if seg.is_used => count += 1,
+ _ => break,
+ }
+ }
+ // Drain them all at once (O(n) instead of O(n²))
+ if count > 0 {
+ for seg in self.recv_window.drain(..count).flatten() {
+ self.rcv_cur = seg.seqnum;
+ self.read_queue.push_back(seg.data);
+ }
+ }
+ }
+
+ /// Maximum out-of-order gap before dropping.
+ /// Prevents memory exhaustion from malicious
+ /// high-seqnum packets.
+ const MAX_OOO_GAP: usize = 256;
+
+ /// Place a received segment into the receive window.
+ fn recv_data(&mut self, seqnum: u32, data: Vec<u8>) {
+ let expected = self.rcv_cur.wrapping_add(1);
+
+ if seqnum == expected {
+ // In-order: deliver directly
+ self.read_queue.push_back(data);
+ self.rcv_cur = seqnum;
+ self.deliver_to_read_queue();
+ // Clean up rcvd_seqno for delivered segments
+ self.rcvd_seqno.retain(|&s| is_before(seqnum, s));
+ } else if is_before(expected, seqnum) {
+ // Out-of-order: check gap before allocating
+ let offset = seqnum.wrapping_sub(expected) as usize;
+
+ // Reject if gap too large (DoS protection)
+ if offset > Self::MAX_OOO_GAP {
+ log::debug!("RDP: dropping packet with gap {offset}");
+ return;
+ }
+
+ // Check total recv window capacity
+ if self.recv_window.len() >= self.rcv_max as usize {
+ return;
+ }
+
+ while self.recv_window.len() <= offset {
+ self.recv_window.push(None);
+ }
+ self.recv_window[offset] = Some(RecvSegment {
+ data,
+ seqnum,
+ is_used: true,
+ is_eacked: false,
+ });
+ // Use bounded set (cap at MAX_OOO_GAP)
+ if self.rcvd_seqno.len() < Self::MAX_OOO_GAP
+ && !self.rcvd_seqno.contains(&seqnum)
+ {
+ self.rcvd_seqno.push(seqnum);
+ }
+ }
+
+ // else: duplicate, ignore
+ }
+
+ /// Whether a delayed ACK should be sent.
+ /// Update RTT estimate using Jacobson/Karels algorithm.
+ ///
+ /// Called when we receive an ACK for a segment
+ /// whose `sent_time` we know.
+ fn update_rtt(&mut self, sample_us: u64) {
+ if self.srtt_us == 0 {
+ // First measurement
+ self.srtt_us = sample_us;
+ self.rttvar_us = sample_us / 2;
+ } else {
+ // RTTVAR = (1 - beta) * RTTVAR + beta * |SRTT - R|
+ // SRTT = (1 - alpha) * SRTT + alpha * R
+ // alpha = 1/8, beta = 1/4 (RFC 6298)
+ let diff = sample_us.abs_diff(self.srtt_us);
+ self.rttvar_us = (3 * self.rttvar_us + diff) / 4;
+ self.srtt_us = (7 * self.srtt_us + sample_us) / 8;
+ }
+
+ // RTO = SRTT + max(G, 4 * RTTVAR)
+ // G (clock granularity) = 1ms = 1000us
+ let k_rttvar = 4 * self.rttvar_us;
+ self.rto_us = self.srtt_us + k_rttvar.max(1000);
+
+ // Clamp: min 200ms, max 60s
+ self.rto_us = self.rto_us.clamp(200_000, 60_000_000);
+ }
+
+ /// Handle successful ACK: update congestion window.
+ fn on_ack_received(&mut self) {
+ if self.cwnd < self.ssthresh {
+ // Slow start: increase by 1 per ACK
+ self.cwnd += 1;
+ } else {
+ // Congestion avoidance: increase by 1/cwnd
+ // (approx 1 segment per RTT)
+ self.cwnd += 1u32.max(1 / self.cwnd.max(1));
+ }
+ }
+
+ /// Handle packet loss: halve congestion window.
+ fn on_loss_detected(&mut self) {
+ self.ssthresh = (self.cwnd / 2).max(2);
+ self.cwnd = self.ssthresh;
+ }
+
+ fn needs_ack(&self) -> bool {
+ self.rcv_cur != self.rcv_ack
+ && self.last_ack_time.elapsed() >= RDP_ACK_INTERVAL
+ }
+
+ /// Check for segments needing retransmission.
+ ///
+ /// Returns `false` if a segment has exceeded
+ /// max_retrans (broken pipe).
+ fn retransmit(
+ &mut self,
+ max_retrans: Duration,
+ ) -> (bool, Vec<SendSegment>) {
+ let mut to_send = Vec::new();
+ let now = Instant::now();
+ let rto_secs = (self.rto_us / 1_000_000).max(1);
+
+ for seg in self.send_window.iter_mut() {
+ if !seg.is_sent {
+ break;
+ }
+ if seg.is_acked {
+ continue;
+ }
+
+ // Check if we've exceeded max retransmission
+ if seg.rt_secs > max_retrans.as_secs() {
+ self.state = RdpState::Closed;
+ return (false, Vec::new()); // broken pipe
+ }
+
+ if let Some(sent) = seg.sent_time {
+ // Use adaptive RTO for first retransmit,
+ // then exponential backoff
+ let timeout = if seg.rt_secs <= 1 {
+ rto_secs
+ } else {
+ seg.rt_secs
+ };
+ let elapsed = now.duration_since(sent).as_secs();
+ if elapsed > timeout {
+ seg.sent_time = Some(now);
+ seg.rt_secs = timeout * 2; // backoff
+ to_send.push(seg.clone());
+ }
+ }
+ }
+
+ // Loss detected → halve congestion window
+ if !to_send.is_empty() {
+ self.on_loss_detected();
+ }
+
+ (true, to_send)
+ }
+}
+
+/// Check if sequence `a` comes before `b` (wrapping).
+fn is_before(a: u32, b: u32) -> bool {
+ let diff = b.wrapping_sub(a);
+ diff > 0 && diff < 0x80000000
+}
+
+// ── Deferred action (invoke protection) ─────────────
+
+/// Actions deferred during event processing to avoid
+/// reentrance issues.
+#[derive(Debug)]
+pub enum RdpAction {
+ /// Emit an event to the application.
+ Event {
+ desc: i32,
+ addr: RdpAddr,
+ event: RdpEvent,
+ },
+
+ /// Close a connection after processing.
+ Close(i32),
+}
+
+// ── RDP manager ─────────────────────────────────────
+
+/// RDP protocol manager.
+/// Incoming RDP packet for `Rdp::input()`.
+pub struct RdpInput<'a> {
+ pub src: NodeId,
+ pub sport: u16,
+ pub dport: u16,
+ pub flags: u8,
+ pub seqnum: u32,
+ pub acknum: u32,
+ pub data: &'a [u8],
+}
+
+/// RDP protocol manager.
+///
+/// Manages multiple connections with descriptor-based
+/// API (similar to file descriptors).
+pub struct Rdp {
+ connections: HashMap<i32, RdpConnection>,
+ listeners: HashMap<u16, i32>,
+ addr_to_desc: HashMap<RdpAddr, i32>,
+ next_desc: i32,
+ max_retrans: Duration,
+}
+
+impl Rdp {
+ pub fn new() -> Self {
+ Self {
+ connections: HashMap::new(),
+ listeners: HashMap::new(),
+ addr_to_desc: HashMap::new(),
+ next_desc: 1,
+ max_retrans: RDP_DEFAULT_MAX_RETRANS,
+ }
+ }
+
+ /// Create a listening socket on `port`.
+ ///
+ /// Returns a descriptor for the listener.
+ pub fn listen(&mut self, port: u16) -> Result<i32, RdpError> {
+ if self.listeners.contains_key(&port) {
+ return Err(RdpError::PortInUse(port));
+ }
+ let desc = self.alloc_desc();
+ self.listeners.insert(port, desc);
+ Ok(desc)
+ }
+
+ /// Initiate a connection to `dst:dport` from `sport`.
+ ///
+ /// Returns a descriptor for the connection.
+ pub fn connect(
+ &mut self,
+ sport: u16,
+ dst: NodeId,
+ dport: u16,
+ ) -> Result<i32, RdpError> {
+ let desc = self.alloc_desc();
+ let addr = RdpAddr {
+ did: dst,
+ dport,
+ sport,
+ };
+
+ let mut conn = RdpConnection::new(desc, addr.clone(), false);
+ conn.state = RdpState::SynSent;
+ conn.snd_iss = random_isn();
+ conn.snd_nxt = conn.snd_iss.wrapping_add(1);
+ conn.snd_una = conn.snd_iss;
+ conn.syn_time = Some(Instant::now());
+
+ self.addr_to_desc.insert(addr, desc);
+ self.connections.insert(desc, conn);
+ Ok(desc)
+ }
+
+ /// Close a connection or listener.
+ pub fn close(&mut self, desc: i32) {
+ if let Some(mut conn) = self.connections.remove(&desc) {
+ conn.is_closed = true;
+ self.addr_to_desc.remove(&conn.addr);
+ log::debug!("RDP: closed desc {desc}");
+ }
+
+ // Also check listeners
+ self.listeners.retain(|_, d| *d != desc);
+ }
+
+ /// Enqueue data for sending on a connection.
+ pub fn send(&mut self, desc: i32, data: &[u8]) -> Result<usize, RdpError> {
+ let conn = self
+ .connections
+ .get_mut(&desc)
+ .ok_or(RdpError::BadDescriptor(desc))?;
+
+ if conn.state != RdpState::Open {
+ return Err(RdpError::NotOpen(desc));
+ }
+
+ if !conn.enqueue_send(data) {
+ return Err(RdpError::SendBufferFull);
+ }
+
+ Ok(data.len())
+ }
+
+ /// Read available data from a connection.
+ ///
+ /// Returns the number of bytes read, or 0 if no
+ /// data available.
+ pub fn recv(
+ &mut self,
+ desc: i32,
+ buf: &mut [u8],
+ ) -> Result<usize, RdpError> {
+ let conn = self
+ .connections
+ .get_mut(&desc)
+ .ok_or(RdpError::BadDescriptor(desc))?;
+
+ if let Some(data) = conn.read_queue.pop_front() {
+ let len = data.len().min(buf.len());
+ buf[..len].copy_from_slice(&data[..len]);
+ Ok(len)
+ } else {
+ Ok(0)
+ }
+ }
+
+ /// Get the state of a descriptor.
+ pub fn get_state(&self, desc: i32) -> Result<RdpState, RdpError> {
+ self.connections
+ .get(&desc)
+ .map(|c| c.state)
+ .ok_or(RdpError::BadDescriptor(desc))
+ }
+
+ /// Get status of all connections.
+ pub fn get_status(&self) -> Vec<RdpStatus> {
+ self.connections
+ .values()
+ .map(|c| RdpStatus {
+ state: c.state,
+ did: c.addr.did,
+ dport: c.addr.dport,
+ sport: c.addr.sport,
+ })
+ .collect()
+ }
+
+ /// Set the maximum retransmission timeout.
+ pub fn set_max_retrans(&mut self, dur: Duration) {
+ self.max_retrans = dur;
+ }
+
+ /// Get the maximum retransmission timeout.
+ pub fn max_retrans(&self) -> Duration {
+ self.max_retrans
+ }
+
+ /// Process incoming RDP data from a peer.
+ ///
+ /// Returns deferred actions (events, closes) to
+ /// avoid reentrance during processing.
+ pub fn input(&mut self, pkt: &RdpInput<'_>) -> Vec<RdpAction> {
+ let src = pkt.src;
+ let sport = pkt.sport;
+ let dport = pkt.dport;
+ let flags = pkt.flags;
+ let seqnum = pkt.seqnum;
+ let acknum = pkt.acknum;
+ let data = pkt.data;
+ let addr = RdpAddr {
+ did: src,
+ dport: sport, // their sport is our dport
+ sport: dport, // our dport is our sport
+ };
+ let mut actions = Vec::new();
+
+ if let Some(&desc) = self.addr_to_desc.get(&addr) {
+ // Existing connection
+ if let Some(conn) = self.connections.get_mut(&desc) {
+ Self::process_connected(
+ conn,
+ flags,
+ seqnum,
+ acknum,
+ data,
+ &mut actions,
+ );
+ }
+ } else if flags & RDP_FLAG_SYN != 0 {
+ // New inbound SYN → check listener
+ if let Some(&_listen_desc) = self.listeners.get(&dport) {
+ let desc = self.alloc_desc();
+ let mut conn = RdpConnection::new(desc, addr.clone(), true);
+ conn.state = RdpState::SynRcvd;
+ conn.rcv_irs = seqnum;
+ conn.rcv_cur = seqnum;
+ conn.snd_iss = random_isn();
+ conn.snd_nxt = conn.snd_iss.wrapping_add(1);
+ conn.snd_una = conn.snd_iss;
+
+ self.addr_to_desc.insert(addr.clone(), desc);
+ self.connections.insert(desc, conn);
+
+ actions.push(RdpAction::Event {
+ desc,
+ addr,
+ event: RdpEvent::Accepted,
+ });
+ }
+
+ // else: no listener → RST (ignored for now)
+ }
+
+ actions
+ }
+
+ /// Process a packet on an existing connection.
+ fn process_connected(
+ conn: &mut RdpConnection,
+ flags: u8,
+ seqnum: u32,
+ acknum: u32,
+ data: &[u8],
+ actions: &mut Vec<RdpAction>,
+ ) {
+ match conn.state {
+ RdpState::SynSent => {
+ if flags & RDP_FLAG_SYN != 0 && flags & RDP_FLAG_ACK != 0 {
+ conn.rcv_irs = seqnum;
+ conn.rcv_cur = seqnum;
+ conn.recv_ack(acknum);
+ conn.state = RdpState::Open;
+ actions.push(RdpAction::Event {
+ desc: conn.desc,
+ addr: conn.addr.clone(),
+ event: RdpEvent::Connected,
+ });
+ } else if flags & RDP_FLAG_RST != 0 {
+ conn.state = RdpState::Closed;
+ actions.push(RdpAction::Event {
+ desc: conn.desc,
+ addr: conn.addr.clone(),
+ event: RdpEvent::Refused,
+ });
+ }
+ }
+ RdpState::SynRcvd => {
+ if flags & RDP_FLAG_ACK != 0 {
+ conn.recv_ack(acknum);
+ conn.state = RdpState::Open;
+ }
+ }
+ RdpState::Open => {
+ if flags & RDP_FLAG_RST != 0 {
+ conn.state = RdpState::Closed;
+ actions.push(RdpAction::Event {
+ desc: conn.desc,
+ addr: conn.addr.clone(),
+ event: RdpEvent::Reset,
+ });
+ return;
+ }
+ if flags & RDP_FLAG_FIN != 0 {
+ conn.state = if conn.is_passive {
+ RdpState::CloseWaitPassive
+ } else {
+ RdpState::CloseWaitActive
+ };
+ conn.close_time = Some(Instant::now());
+ return;
+ }
+ if flags & RDP_FLAG_ACK != 0 {
+ conn.recv_ack(acknum);
+ }
+ if flags & RDP_FLAG_EAK != 0 {
+ conn.recv_eack(seqnum);
+ }
+ if !data.is_empty() {
+ conn.recv_data(seqnum, data.to_vec());
+ actions.push(RdpAction::Event {
+ desc: conn.desc,
+ addr: conn.addr.clone(),
+ event: RdpEvent::Ready2Read,
+ });
+ }
+ }
+ RdpState::CloseWaitActive => {
+ if flags & RDP_FLAG_FIN != 0 {
+ conn.state = RdpState::Closed;
+ }
+ }
+ _ => {}
+ }
+ }
+
+ /// Periodic tick: retransmit, delayed ACK, timeouts.
+ ///
+ /// Returns actions for timed-out connections.
+ pub fn tick(&mut self) -> Vec<RdpAction> {
+ let mut actions = Vec::new();
+ let mut to_close = Vec::new();
+
+ for (desc, conn) in self.connections.iter_mut() {
+ // Check SYN timeout with exponential backoff
+ if conn.state == RdpState::SynSent {
+ if let Some(t) = conn.syn_time {
+ let elapsed = t.elapsed().as_secs();
+ if elapsed > conn.syn_rt_secs {
+ if conn.syn_rt_secs > self.max_retrans.as_secs() {
+ actions.push(RdpAction::Event {
+ desc: *desc,
+ addr: conn.addr.clone(),
+ event: RdpEvent::Failed,
+ });
+ to_close.push(*desc);
+ } else {
+ // Retry SYN with backoff
+ conn.syn_time = Some(Instant::now());
+ conn.syn_rt_secs *= 2;
+ }
+ }
+ }
+ }
+
+ // RST retry
+ if conn.is_retry_rst {
+ if let Some(t) = conn.rst_time {
+ if t.elapsed().as_secs() > conn.rst_rt_secs {
+ conn.rst_rt_secs *= 2;
+ conn.rst_time = Some(Instant::now());
+ if conn.rst_rt_secs > self.max_retrans.as_secs() {
+ conn.is_retry_rst = false;
+ to_close.push(*desc);
+ }
+ }
+ }
+ }
+
+ // Check close-wait timeout
+ if matches!(
+ conn.state,
+ RdpState::CloseWaitPassive | RdpState::CloseWaitActive
+ ) {
+ if let Some(t) = conn.close_time {
+ if t.elapsed() >= self.max_retrans {
+ to_close.push(*desc);
+ }
+ }
+ }
+
+ // Retransmit unacked segments
+ if conn.state == RdpState::Open {
+ let (alive, _retransmits) = conn.retransmit(self.max_retrans);
+ if !alive {
+ // Broken pipe — exceeded max retrans
+ actions.push(RdpAction::Event {
+ desc: *desc,
+ addr: conn.addr.clone(),
+ event: RdpEvent::Broken,
+ });
+ to_close.push(*desc);
+ }
+
+ // Note: retransmitted segments are picked
+ // up by pending_output() in the next flush
+ }
+ }
+
+ for d in to_close {
+ self.close(d);
+ }
+
+ actions
+ }
+
+ /// Number of active connections.
+ pub fn connection_count(&self) -> usize {
+ self.connections.len()
+ }
+
+ /// Number of active listeners.
+ pub fn listener_count(&self) -> usize {
+ self.listeners.len()
+ }
+
+ /// Build outgoing packets for a connection.
+ ///
+ /// Returns `(dst_id, sport, dport, packets)` where
+ /// each packet is `(flags, seqnum, acknum, data)`.
+ /// The caller wraps these in protocol messages and
+ /// sends via UDP.
+ pub fn pending_output(&mut self, desc: i32) -> Option<PendingOutput> {
+ let conn = self.connections.get_mut(&desc)?;
+
+ let mut packets = Vec::new();
+
+ match conn.state {
+ RdpState::SynSent => {
+ // Send SYN with receive buffer params
+ // (rdp_syn: out_segs_max + seg_size_max)
+ let mut syn_data = Vec::with_capacity(4);
+ syn_data
+ .extend_from_slice(&(conn.rcv_max as u16).to_be_bytes());
+ syn_data
+ .extend_from_slice(&(conn.rbuf_max as u16).to_be_bytes());
+ packets.push(RdpPacket {
+ flags: RDP_FLAG_SYN,
+ seqnum: conn.snd_iss,
+ acknum: 0,
+ data: syn_data,
+ });
+ }
+ RdpState::SynRcvd => {
+ // Send SYN+ACK
+ packets.push(RdpPacket {
+ flags: RDP_FLAG_SYN | RDP_FLAG_ACK,
+ seqnum: conn.snd_iss,
+ acknum: conn.rcv_cur,
+ data: Vec::new(),
+ });
+ }
+ RdpState::Open => {
+ // Send ACK if needed
+ if conn.needs_ack() {
+ packets.push(RdpPacket {
+ flags: RDP_FLAG_ACK,
+ seqnum: conn.snd_nxt,
+ acknum: conn.rcv_cur,
+ data: Vec::new(),
+ });
+ conn.last_ack_time = Instant::now();
+ conn.rcv_ack = conn.rcv_cur;
+ }
+
+ // Send EACKs for out-of-order recv segments
+ for seg in conn.recv_window.iter_mut().flatten() {
+ if seg.is_used && !seg.is_eacked {
+ packets.push(RdpPacket {
+ flags: RDP_FLAG_EAK | RDP_FLAG_ACK,
+ seqnum: seg.seqnum,
+ acknum: conn.rcv_cur,
+ data: Vec::new(),
+ });
+ seg.is_eacked = true;
+ }
+ }
+
+ // Send pending data segments, limited by
+ // congestion window (AIMD)
+ let in_flight = conn
+ .send_window
+ .iter()
+ .filter(|s| s.is_sent && !s.is_acked)
+ .count() as u32;
+ let can_send = conn.cwnd.saturating_sub(in_flight);
+
+ let mut sent_count = 0u32;
+ for seg in &conn.send_window {
+ if sent_count >= can_send {
+ break;
+ }
+ if !seg.is_sent && !seg.is_acked {
+ packets.push(RdpPacket {
+ flags: RDP_FLAG_ACK,
+ seqnum: seg.seqnum,
+ acknum: conn.rcv_cur,
+ data: seg.data.clone(),
+ });
+ sent_count += 1;
+ }
+ }
+
+ // Mark as sent
+ let mut marked = 0u32;
+ for seg in conn.send_window.iter_mut() {
+ if marked >= can_send {
+ break;
+ }
+ if !seg.is_sent {
+ seg.is_sent = true;
+ seg.sent_time = Some(Instant::now());
+ marked += 1;
+ }
+ }
+ }
+ _ => {}
+ }
+
+ if packets.is_empty() {
+ return None;
+ }
+
+ Some(PendingOutput {
+ dst: conn.addr.did,
+ sport: conn.addr.sport,
+ dport: conn.addr.dport,
+ packets,
+ })
+ }
+
+ /// Get all connection descriptors.
+ pub fn descriptors(&self) -> Vec<i32> {
+ self.connections.keys().copied().collect()
+ }
+
+ fn alloc_desc(&mut self) -> i32 {
+ let d = self.next_desc;
+ // Wrap at i32::MAX to avoid overflow; skip 0
+ // and negative values
+ self.next_desc = if d >= i32::MAX - 1 { 1 } else { d + 1 };
+ d
+ }
+}
+
+/// A pending outgoing RDP packet.
+#[derive(Debug, Clone)]
+pub struct RdpPacket {
+ pub flags: u8,
+ pub seqnum: u32,
+ pub acknum: u32,
+ pub data: Vec<u8>,
+}
+
+/// Pending output for a connection.
+#[derive(Debug)]
+pub struct PendingOutput {
+ pub dst: NodeId,
+ pub sport: u16,
+ pub dport: u16,
+ pub packets: Vec<RdpPacket>,
+}
+
+/// Build an RDP wire packet: rdp_head(20) + data.
+pub fn build_rdp_wire(
+ flags: u8,
+ sport: u16,
+ dport: u16,
+ seqnum: u32,
+ acknum: u32,
+ data: &[u8],
+) -> Vec<u8> {
+ let dlen = data.len() as u16;
+ let mut buf = vec![0u8; RDP_HEADER_SIZE + data.len()];
+ buf[0] = flags;
+ buf[1] = (RDP_HEADER_SIZE / 2) as u8; // hlen in 16-bit words
+ buf[2..4].copy_from_slice(&sport.to_be_bytes());
+ buf[4..6].copy_from_slice(&dport.to_be_bytes());
+ buf[6..8].copy_from_slice(&dlen.to_be_bytes());
+ buf[8..12].copy_from_slice(&seqnum.to_be_bytes());
+ buf[12..16].copy_from_slice(&acknum.to_be_bytes());
+ buf[16..20].fill(0); // reserved
+ buf[20..].copy_from_slice(data);
+ buf
+}
+
+/// Parsed RDP wire header fields.
+pub struct RdpWireHeader<'a> {
+ pub flags: u8,
+ pub sport: u16,
+ pub dport: u16,
+ pub seqnum: u32,
+ pub acknum: u32,
+ pub data: &'a [u8],
+}
+
+/// Parse an RDP wire packet header.
+pub fn parse_rdp_wire(buf: &[u8]) -> Option<RdpWireHeader<'_>> {
+ if buf.len() < RDP_HEADER_SIZE {
+ return None;
+ }
+ Some(RdpWireHeader {
+ flags: buf[0],
+ sport: u16::from_be_bytes([buf[2], buf[3]]),
+ dport: u16::from_be_bytes([buf[4], buf[5]]),
+ seqnum: u32::from_be_bytes([buf[8], buf[9], buf[10], buf[11]]),
+ acknum: u32::from_be_bytes([buf[12], buf[13], buf[14], buf[15]]),
+ data: &buf[RDP_HEADER_SIZE..],
+ })
+}
+
+impl Default for Rdp {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+// ── Errors ──────────────────────────────────────────
+
+#[derive(Debug)]
+pub enum RdpError {
+ PortInUse(u16),
+ BadDescriptor(i32),
+ NotOpen(i32),
+ SendBufferFull,
+}
+
+impl std::fmt::Display for RdpError {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ RdpError::PortInUse(p) => write!(f, "port {p} in use"),
+ RdpError::BadDescriptor(d) => write!(f, "bad descriptor {d}"),
+ RdpError::NotOpen(d) => write!(f, "descriptor {d} not open"),
+ RdpError::SendBufferFull => write!(f, "send buffer full"),
+ }
+ }
+}
+
+impl std::error::Error for RdpError {}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn listen_and_close() {
+ let mut rdp = Rdp::new();
+ let desc = rdp.listen(5000).unwrap();
+ assert_eq!(rdp.listener_count(), 1);
+ rdp.close(desc);
+ assert_eq!(rdp.listener_count(), 0);
+ }
+
+ #[test]
+ fn listen_duplicate_port() {
+ let mut rdp = Rdp::new();
+ rdp.listen(5000).unwrap();
+ assert!(matches!(rdp.listen(5000), Err(RdpError::PortInUse(5000))));
+ }
+
+ #[test]
+ fn connect_creates_syn_sent() {
+ let mut rdp = Rdp::new();
+ let dst = NodeId::from_bytes([0x01; 32]);
+ let desc = rdp.connect(0, dst, 5000).unwrap();
+ assert_eq!(rdp.get_state(desc).unwrap(), RdpState::SynSent);
+ }
+
+ #[test]
+ fn send_before_open_fails() {
+ let mut rdp = Rdp::new();
+ let dst = NodeId::from_bytes([0x01; 32]);
+ let desc = rdp.connect(0, dst, 5000).unwrap();
+ assert!(matches!(
+ rdp.send(desc, b"hello"),
+ Err(RdpError::NotOpen(_))
+ ));
+ }
+
+ #[test]
+ fn recv_empty() {
+ let mut rdp = Rdp::new();
+ let dst = NodeId::from_bytes([0x01; 32]);
+ let desc = rdp.connect(0, dst, 5000).unwrap();
+ let mut buf = [0u8; 64];
+
+ // SynSent state → bad descriptor or no data
+ assert!(rdp.recv(desc, &mut buf).is_ok());
+ }
+
+ #[test]
+ fn syn_ack_opens_connection() {
+ let mut rdp = Rdp::new();
+ let dst = NodeId::from_bytes([0x01; 32]);
+ let desc = rdp.connect(1000, dst, 5000).unwrap();
+
+ // Simulate receiving SYN+ACK
+ let actions = rdp.input(&RdpInput {
+ src: dst,
+ sport: 5000,
+ dport: 1000,
+ flags: RDP_FLAG_SYN | RDP_FLAG_ACK,
+ seqnum: 100,
+ acknum: 1,
+ data: &[],
+ });
+
+ assert_eq!(rdp.get_state(desc).unwrap(), RdpState::Open);
+ assert!(actions.iter().any(|a| matches!(
+ a,
+ RdpAction::Event {
+ event: RdpEvent::Connected,
+ ..
+ }
+ )));
+ }
+
+ #[test]
+ fn inbound_syn_accepted() {
+ let mut rdp = Rdp::new();
+ rdp.listen(5000).unwrap();
+
+ let peer = NodeId::from_bytes([0x02; 32]);
+ let actions = rdp.input(&RdpInput {
+ src: peer,
+ sport: 3000,
+ dport: 5000,
+ flags: RDP_FLAG_SYN,
+ seqnum: 200,
+ acknum: 0,
+ data: &[],
+ });
+
+ assert_eq!(rdp.connection_count(), 1);
+ assert!(actions.iter().any(|a| matches!(
+ a,
+ RdpAction::Event {
+ event: RdpEvent::Accepted,
+ ..
+ }
+ )));
+ }
+
+ #[test]
+ fn rst_resets_connection() {
+ let mut rdp = Rdp::new();
+ let dst = NodeId::from_bytes([0x01; 32]);
+ let desc = rdp.connect(1000, dst, 5000).unwrap();
+
+ // Open first
+ rdp.input(&RdpInput {
+ src: dst,
+ sport: 5000,
+ dport: 1000,
+ flags: RDP_FLAG_SYN | RDP_FLAG_ACK,
+ seqnum: 100,
+ acknum: 1,
+ data: &[],
+ });
+ assert_eq!(rdp.get_state(desc).unwrap(), RdpState::Open);
+
+ // RST
+ let actions = rdp.input(&RdpInput {
+ src: dst,
+ sport: 5000,
+ dport: 1000,
+ flags: RDP_FLAG_RST,
+ seqnum: 0,
+ acknum: 0,
+ data: &[],
+ });
+ assert_eq!(rdp.get_state(desc).unwrap(), RdpState::Closed);
+ assert!(actions.iter().any(|a| matches!(
+ a,
+ RdpAction::Event {
+ event: RdpEvent::Reset,
+ ..
+ }
+ )));
+ }
+
+ #[test]
+ fn data_delivery() {
+ let mut rdp = Rdp::new();
+ let dst = NodeId::from_bytes([0x01; 32]);
+ let desc = rdp.connect(1000, dst, 5000).unwrap();
+
+ // Open
+ rdp.input(&RdpInput {
+ src: dst,
+ sport: 5000,
+ dport: 1000,
+ flags: RDP_FLAG_SYN | RDP_FLAG_ACK,
+ seqnum: 100,
+ acknum: 1,
+ data: &[],
+ });
+
+ // Receive data
+ let actions = rdp.input(&RdpInput {
+ src: dst,
+ sport: 5000,
+ dport: 1000,
+ flags: RDP_FLAG_ACK,
+ seqnum: 101,
+ acknum: 1,
+ data: b"hello",
+ });
+
+ assert!(actions.iter().any(|a| matches!(
+ a,
+ RdpAction::Event {
+ event: RdpEvent::Ready2Read,
+ ..
+ }
+ )));
+
+ let mut buf = [0u8; 64];
+ let n = rdp.recv(desc, &mut buf).unwrap();
+ assert_eq!(&buf[..n], b"hello");
+ }
+
+ #[test]
+ fn send_data_on_open() {
+ let mut rdp = Rdp::new();
+ let dst = NodeId::from_bytes([0x01; 32]);
+ let desc = rdp.connect(1000, dst, 5000).unwrap();
+
+ // Open
+ rdp.input(&RdpInput {
+ src: dst,
+ sport: 5000,
+ dport: 1000,
+ flags: RDP_FLAG_SYN | RDP_FLAG_ACK,
+ seqnum: 100,
+ acknum: 1,
+ data: &[],
+ });
+
+ let n = rdp.send(desc, b"world").unwrap();
+ assert_eq!(n, 5);
+ }
+
+ #[test]
+ fn get_status() {
+ let mut rdp = Rdp::new();
+ let dst = NodeId::from_bytes([0x01; 32]);
+ rdp.connect(1000, dst, 5000).unwrap();
+
+ let status = rdp.get_status();
+ assert_eq!(status.len(), 1);
+ assert_eq!(status[0].state, RdpState::SynSent);
+ assert_eq!(status[0].dport, 5000);
+ }
+
+ #[test]
+ fn is_before_wrapping() {
+ assert!(is_before(1, 2));
+ assert!(is_before(0, 1));
+ assert!(!is_before(2, 1));
+ assert!(!is_before(5, 5));
+
+ // Wrapping: u32::MAX is before 0
+ assert!(is_before(u32::MAX, 0));
+ }
+}