//! Reliable Datagram Protocol (RDP). //! //! Provides TCP-like //! reliable, ordered delivery over UDP with: //! //! - 7-state connection machine //! - 3-way handshake (SYN / SYN-ACK / ACK) //! - Sliding send and receive windows //! - Cumulative ACK + Extended ACK (EACK/SACK) //! - Delayed ACK (300ms) //! - Retransmission (300ms timer) //! - FIN-based graceful close //! - RST for abrupt termination //! //! **No congestion control**. use std::collections::{HashMap, VecDeque}; use std::time::{Duration, Instant}; use crate::id::NodeId; // ── Constants ──────────────────────────────────────── pub const RDP_FLAG_SYN: u8 = 0x80; pub const RDP_FLAG_ACK: u8 = 0x40; pub const RDP_FLAG_EAK: u8 = 0x20; pub const RDP_FLAG_RST: u8 = 0x10; pub const RDP_FLAG_NUL: u8 = 0x08; pub const RDP_FLAG_FIN: u8 = 0x04; pub const RDP_RBUF_MAX_DEFAULT: u32 = 884; pub const RDP_RCV_MAX_DEFAULT: u32 = 1024; pub const RDP_WELL_KNOWN_PORT_MAX: u16 = 1024; pub const RDP_SBUF_LIMIT: u16 = 884; pub const RDP_TIMER_INTERVAL: Duration = Duration::from_millis(300); pub const RDP_ACK_INTERVAL: Duration = Duration::from_millis(300); pub const RDP_DEFAULT_MAX_RETRANS: Duration = Duration::from_secs(30); /// Generate a random initial sequence number to prevent /// sequence prediction attacks. fn random_isn() -> u32 { let mut buf = [0u8; 4]; crate::sys::random_bytes(&mut buf); u32::from_ne_bytes(buf) } /// RDP packet header (20 bytes on the wire). pub const RDP_HEADER_SIZE: usize = 20; // ── Connection state ──────────────────────────────── /// RDP connection state (7 states). #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum RdpState { Closed, Listen, SynSent, SynRcvd, Open, CloseWaitPassive, CloseWaitActive, } /// Events emitted to the application. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum RdpEvent { /// Server accepted a new connection. Accepted, /// Client connected successfully. Connected, /// Connection refused by peer. Refused, /// Connection reset by peer. Reset, /// Connection failed (timeout). Failed, /// Data available to read. Ready2Read, /// Pipe broken (peer vanished). Broken, } /// RDP connection address. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct RdpAddr { pub did: NodeId, pub dport: u16, pub sport: u16, } /// Status of a single RDP connection. #[derive(Debug, Clone)] pub struct RdpStatus { pub state: RdpState, pub did: NodeId, pub dport: u16, pub sport: u16, } // ── Segment ───────────────────────────────────────── /// A segment in the send window. #[derive(Debug, Clone)] struct SendSegment { data: Vec, seqnum: u32, sent_time: Option, is_sent: bool, is_acked: bool, /// Retransmission timeout (doubles on each retry). rt_secs: u64, } /// A segment in the receive window. #[derive(Debug, Clone)] struct RecvSegment { data: Vec, seqnum: u32, is_used: bool, is_eacked: bool, } // ── Connection ────────────────────────────────────── /// A single RDP connection. struct RdpConnection { addr: RdpAddr, desc: i32, state: RdpState, /// Server (passive) or client (active) side. is_passive: bool, /// Connection has been closed locally. is_closed: bool, /// Max segment size we can send (from peer's SYN). sbuf_max: u32, /// Max segment size we can receive (our buffer). rbuf_max: u32, // Send sequence variables snd_nxt: u32, snd_una: u32, snd_max: u32, snd_iss: u32, // Receive sequence variables rcv_cur: u32, /// Max segments we can buffer. rcv_max: u32, rcv_irs: u32, /// Last sequence number we ACK'd. rcv_ack: u32, // Windows send_window: VecDeque, recv_window: Vec>, read_queue: VecDeque>, // Timing last_ack_time: Instant, syn_time: Option, close_time: Option, /// SYN retry timeout (doubles on retry). syn_rt_secs: u64, // ── RTT estimation (Jacobson/Karels) ──────── /// Smoothed RTT estimate (microseconds). srtt_us: u64, /// RTT variation (microseconds). rttvar_us: u64, /// Retransmission timeout (microseconds). rto_us: u64, // ── Congestion control (AIMD) ─────────────── /// Congestion window (segments allowed in flight). cwnd: u32, /// Slow-start threshold. ssthresh: u32, /// RST retry state. rst_time: Option, rst_rt_secs: u64, is_retry_rst: bool, // Out-of-order tracking for EACK rcvd_seqno: Vec, } impl RdpConnection { fn new(desc: i32, addr: RdpAddr, is_passive: bool) -> Self { Self { addr, desc, state: RdpState::Closed, is_passive, is_closed: false, sbuf_max: RDP_SBUF_LIMIT as u32, rbuf_max: RDP_RBUF_MAX_DEFAULT, snd_nxt: 0, snd_una: 0, snd_max: RDP_RCV_MAX_DEFAULT, snd_iss: 0, rcv_cur: 0, rcv_max: RDP_RCV_MAX_DEFAULT, rcv_irs: 0, rcv_ack: 0, send_window: VecDeque::new(), recv_window: Vec::new(), read_queue: VecDeque::new(), last_ack_time: Instant::now(), syn_time: None, close_time: None, syn_rt_secs: 1, // Jacobson/Karels: initial RTO = 1s srtt_us: 0, rttvar_us: 500_000, // 500ms initial variance rto_us: 1_000_000, // 1s initial RTO // AIMD congestion control cwnd: 1, // start with 1 segment ssthresh: RDP_RCV_MAX_DEFAULT, rst_time: None, rst_rt_secs: 1, is_retry_rst: false, rcvd_seqno: Vec::new(), } } /// Enqueue data for sending. fn enqueue_send(&mut self, data: &[u8]) -> bool { if self.is_closed { return false; } if data.len() > self.sbuf_max as usize { return false; } if self.send_window.len() >= self.snd_max as usize { return false; } let seg = SendSegment { data: data.to_vec(), seqnum: self.snd_nxt, sent_time: None, is_sent: false, is_acked: false, rt_secs: 1, }; self.snd_nxt = self.snd_nxt.wrapping_add(1); self.send_window.push_back(seg); true } /// Process a cumulative ACK. fn recv_ack(&mut self, acknum: u32) { while let Some(front) = self.send_window.front() { if front.seqnum == acknum { break; } // Sequence numbers before acknum are acked if is_before(front.seqnum, acknum) { let seg = self.send_window.pop_front().unwrap(); self.snd_una = self.snd_una.wrapping_add(1); self.on_ack_received(); // Measure RTT from first-sent (non-retransmitted) if let Some(sent) = seg.sent_time { if seg.rt_secs <= 1 { // Only use first transmission for RTT let rtt_us = sent.elapsed().as_micros() as u64; self.update_rtt(rtt_us); } } } else { break; } } } /// Process an Extended ACK (EACK) for out-of-order /// segments. fn recv_eack(&mut self, eack_seqnum: u32) { for seg in self.send_window.iter_mut() { if seg.seqnum == eack_seqnum { seg.is_acked = true; break; } } } /// Deliver in-order data from the receive window to /// the read queue. fn deliver_to_read_queue(&mut self) { // Count contiguous in-order segments let mut count = 0; for slot in &self.recv_window { match slot { Some(seg) if seg.is_used => count += 1, _ => break, } } // Drain them all at once (O(n) instead of O(n²)) if count > 0 { for seg in self.recv_window.drain(..count).flatten() { self.rcv_cur = seg.seqnum; self.read_queue.push_back(seg.data); } } } /// Maximum out-of-order gap before dropping. /// Prevents memory exhaustion from malicious /// high-seqnum packets. const MAX_OOO_GAP: usize = 256; /// Place a received segment into the receive window. fn recv_data(&mut self, seqnum: u32, data: Vec) { let expected = self.rcv_cur.wrapping_add(1); if seqnum == expected { // In-order: deliver directly self.read_queue.push_back(data); self.rcv_cur = seqnum; self.deliver_to_read_queue(); // Clean up rcvd_seqno for delivered segments self.rcvd_seqno.retain(|&s| is_before(seqnum, s)); } else if is_before(expected, seqnum) { // Out-of-order: check gap before allocating let offset = seqnum.wrapping_sub(expected) as usize; // Reject if gap too large (DoS protection) if offset > Self::MAX_OOO_GAP { log::debug!("RDP: dropping packet with gap {offset}"); return; } // Check total recv window capacity if self.recv_window.len() >= self.rcv_max as usize { return; } while self.recv_window.len() <= offset { self.recv_window.push(None); } self.recv_window[offset] = Some(RecvSegment { data, seqnum, is_used: true, is_eacked: false, }); // Use bounded set (cap at MAX_OOO_GAP) if self.rcvd_seqno.len() < Self::MAX_OOO_GAP && !self.rcvd_seqno.contains(&seqnum) { self.rcvd_seqno.push(seqnum); } } // else: duplicate, ignore } /// Whether a delayed ACK should be sent. /// Update RTT estimate using Jacobson/Karels algorithm. /// /// Called when we receive an ACK for a segment /// whose `sent_time` we know. fn update_rtt(&mut self, sample_us: u64) { if self.srtt_us == 0 { // First measurement self.srtt_us = sample_us; self.rttvar_us = sample_us / 2; } else { // RTTVAR = (1 - beta) * RTTVAR + beta * |SRTT - R| // SRTT = (1 - alpha) * SRTT + alpha * R // alpha = 1/8, beta = 1/4 (RFC 6298) let diff = sample_us.abs_diff(self.srtt_us); self.rttvar_us = (3 * self.rttvar_us + diff) / 4; self.srtt_us = (7 * self.srtt_us + sample_us) / 8; } // RTO = SRTT + max(G, 4 * RTTVAR) // G (clock granularity) = 1ms = 1000us let k_rttvar = 4 * self.rttvar_us; self.rto_us = self.srtt_us + k_rttvar.max(1000); // Clamp: min 200ms, max 60s self.rto_us = self.rto_us.clamp(200_000, 60_000_000); } /// Handle successful ACK: update congestion window. fn on_ack_received(&mut self) { if self.cwnd < self.ssthresh { // Slow start: increase by 1 per ACK self.cwnd += 1; } else { // Congestion avoidance: increase by 1/cwnd // (approx 1 segment per RTT) self.cwnd += 1u32.max(1 / self.cwnd.max(1)); } } /// Handle packet loss: halve congestion window. fn on_loss_detected(&mut self) { self.ssthresh = (self.cwnd / 2).max(2); self.cwnd = self.ssthresh; } fn needs_ack(&self) -> bool { self.rcv_cur != self.rcv_ack && self.last_ack_time.elapsed() >= RDP_ACK_INTERVAL } /// Check for segments needing retransmission. /// /// Returns `false` if a segment has exceeded /// max_retrans (broken pipe). fn retransmit( &mut self, max_retrans: Duration, ) -> (bool, Vec) { let mut to_send = Vec::new(); let now = Instant::now(); let rto_secs = (self.rto_us / 1_000_000).max(1); for seg in self.send_window.iter_mut() { if !seg.is_sent { break; } if seg.is_acked { continue; } // Check if we've exceeded max retransmission if seg.rt_secs > max_retrans.as_secs() { self.state = RdpState::Closed; return (false, Vec::new()); // broken pipe } if let Some(sent) = seg.sent_time { // Use adaptive RTO for first retransmit, // then exponential backoff let timeout = if seg.rt_secs <= 1 { rto_secs } else { seg.rt_secs }; let elapsed = now.duration_since(sent).as_secs(); if elapsed > timeout { seg.sent_time = Some(now); seg.rt_secs = timeout * 2; // backoff to_send.push(seg.clone()); } } } // Loss detected → halve congestion window if !to_send.is_empty() { self.on_loss_detected(); } (true, to_send) } } /// Check if sequence `a` comes before `b` (wrapping). fn is_before(a: u32, b: u32) -> bool { let diff = b.wrapping_sub(a); diff > 0 && diff < 0x80000000 } // ── Deferred action (invoke protection) ───────────── /// Actions deferred during event processing to avoid /// reentrance issues. #[derive(Debug)] pub enum RdpAction { /// Emit an event to the application. Event { desc: i32, addr: RdpAddr, event: RdpEvent, }, /// Close a connection after processing. Close(i32), } // ── RDP manager ───────────────────────────────────── /// RDP protocol manager. /// Incoming RDP packet for `Rdp::input()`. pub struct RdpInput<'a> { pub src: NodeId, pub sport: u16, pub dport: u16, pub flags: u8, pub seqnum: u32, pub acknum: u32, pub data: &'a [u8], } /// RDP protocol manager. /// /// Manages multiple connections with descriptor-based /// API (similar to file descriptors). pub struct Rdp { connections: HashMap, listeners: HashMap, addr_to_desc: HashMap, next_desc: i32, max_retrans: Duration, } impl Rdp { pub fn new() -> Self { Self { connections: HashMap::new(), listeners: HashMap::new(), addr_to_desc: HashMap::new(), next_desc: 1, max_retrans: RDP_DEFAULT_MAX_RETRANS, } } /// Create a listening socket on `port`. /// /// Returns a descriptor for the listener. pub fn listen(&mut self, port: u16) -> Result { if self.listeners.contains_key(&port) { return Err(RdpError::PortInUse(port)); } let desc = self.alloc_desc(); self.listeners.insert(port, desc); Ok(desc) } /// Initiate a connection to `dst:dport` from `sport`. /// /// Returns a descriptor for the connection. pub fn connect( &mut self, sport: u16, dst: NodeId, dport: u16, ) -> Result { let desc = self.alloc_desc(); let addr = RdpAddr { did: dst, dport, sport, }; let mut conn = RdpConnection::new(desc, addr.clone(), false); conn.state = RdpState::SynSent; conn.snd_iss = random_isn(); conn.snd_nxt = conn.snd_iss.wrapping_add(1); conn.snd_una = conn.snd_iss; conn.syn_time = Some(Instant::now()); self.addr_to_desc.insert(addr, desc); self.connections.insert(desc, conn); Ok(desc) } /// Close a connection or listener. pub fn close(&mut self, desc: i32) { if let Some(mut conn) = self.connections.remove(&desc) { conn.is_closed = true; self.addr_to_desc.remove(&conn.addr); log::debug!("RDP: closed desc {desc}"); } // Also check listeners self.listeners.retain(|_, d| *d != desc); } /// Enqueue data for sending on a connection. pub fn send(&mut self, desc: i32, data: &[u8]) -> Result { let conn = self .connections .get_mut(&desc) .ok_or(RdpError::BadDescriptor(desc))?; if conn.state != RdpState::Open { return Err(RdpError::NotOpen(desc)); } if !conn.enqueue_send(data) { return Err(RdpError::SendBufferFull); } Ok(data.len()) } /// Read available data from a connection. /// /// Returns the number of bytes read, or 0 if no /// data available. pub fn recv( &mut self, desc: i32, buf: &mut [u8], ) -> Result { let conn = self .connections .get_mut(&desc) .ok_or(RdpError::BadDescriptor(desc))?; if let Some(data) = conn.read_queue.pop_front() { let len = data.len().min(buf.len()); buf[..len].copy_from_slice(&data[..len]); Ok(len) } else { Ok(0) } } /// Get the state of a descriptor. pub fn get_state(&self, desc: i32) -> Result { self.connections .get(&desc) .map(|c| c.state) .ok_or(RdpError::BadDescriptor(desc)) } /// Get status of all connections. pub fn get_status(&self) -> Vec { self.connections .values() .map(|c| RdpStatus { state: c.state, did: c.addr.did, dport: c.addr.dport, sport: c.addr.sport, }) .collect() } /// Set the maximum retransmission timeout. pub fn set_max_retrans(&mut self, dur: Duration) { self.max_retrans = dur; } /// Get the maximum retransmission timeout. pub fn max_retrans(&self) -> Duration { self.max_retrans } /// Process incoming RDP data from a peer. /// /// Returns deferred actions (events, closes) to /// avoid reentrance during processing. pub fn input(&mut self, pkt: &RdpInput<'_>) -> Vec { let src = pkt.src; let sport = pkt.sport; let dport = pkt.dport; let flags = pkt.flags; let seqnum = pkt.seqnum; let acknum = pkt.acknum; let data = pkt.data; let addr = RdpAddr { did: src, dport: sport, // their sport is our dport sport: dport, // our dport is our sport }; let mut actions = Vec::new(); if let Some(&desc) = self.addr_to_desc.get(&addr) { // Existing connection if let Some(conn) = self.connections.get_mut(&desc) { Self::process_connected( conn, flags, seqnum, acknum, data, &mut actions, ); } } else if flags & RDP_FLAG_SYN != 0 { // New inbound SYN → check listener if let Some(&_listen_desc) = self.listeners.get(&dport) { let desc = self.alloc_desc(); let mut conn = RdpConnection::new(desc, addr.clone(), true); conn.state = RdpState::SynRcvd; conn.rcv_irs = seqnum; conn.rcv_cur = seqnum; conn.snd_iss = random_isn(); conn.snd_nxt = conn.snd_iss.wrapping_add(1); conn.snd_una = conn.snd_iss; self.addr_to_desc.insert(addr.clone(), desc); self.connections.insert(desc, conn); actions.push(RdpAction::Event { desc, addr, event: RdpEvent::Accepted, }); } // else: no listener → RST (ignored for now) } actions } /// Process a packet on an existing connection. fn process_connected( conn: &mut RdpConnection, flags: u8, seqnum: u32, acknum: u32, data: &[u8], actions: &mut Vec, ) { match conn.state { RdpState::SynSent => { if flags & RDP_FLAG_SYN != 0 && flags & RDP_FLAG_ACK != 0 { conn.rcv_irs = seqnum; conn.rcv_cur = seqnum; conn.recv_ack(acknum); conn.state = RdpState::Open; actions.push(RdpAction::Event { desc: conn.desc, addr: conn.addr.clone(), event: RdpEvent::Connected, }); } else if flags & RDP_FLAG_RST != 0 { conn.state = RdpState::Closed; actions.push(RdpAction::Event { desc: conn.desc, addr: conn.addr.clone(), event: RdpEvent::Refused, }); } } RdpState::SynRcvd => { if flags & RDP_FLAG_ACK != 0 { conn.recv_ack(acknum); conn.state = RdpState::Open; } } RdpState::Open => { if flags & RDP_FLAG_RST != 0 { conn.state = RdpState::Closed; actions.push(RdpAction::Event { desc: conn.desc, addr: conn.addr.clone(), event: RdpEvent::Reset, }); return; } if flags & RDP_FLAG_FIN != 0 { conn.state = if conn.is_passive { RdpState::CloseWaitPassive } else { RdpState::CloseWaitActive }; conn.close_time = Some(Instant::now()); return; } if flags & RDP_FLAG_ACK != 0 { conn.recv_ack(acknum); } if flags & RDP_FLAG_EAK != 0 { conn.recv_eack(seqnum); } if !data.is_empty() { conn.recv_data(seqnum, data.to_vec()); actions.push(RdpAction::Event { desc: conn.desc, addr: conn.addr.clone(), event: RdpEvent::Ready2Read, }); } } RdpState::CloseWaitActive => { if flags & RDP_FLAG_FIN != 0 { conn.state = RdpState::Closed; } } _ => {} } } /// Periodic tick: retransmit, delayed ACK, timeouts. /// /// Returns actions for timed-out connections. pub fn tick(&mut self) -> Vec { let mut actions = Vec::new(); let mut to_close = Vec::new(); for (desc, conn) in self.connections.iter_mut() { // Check SYN timeout with exponential backoff if conn.state == RdpState::SynSent { if let Some(t) = conn.syn_time { let elapsed = t.elapsed().as_secs(); if elapsed > conn.syn_rt_secs { if conn.syn_rt_secs > self.max_retrans.as_secs() { actions.push(RdpAction::Event { desc: *desc, addr: conn.addr.clone(), event: RdpEvent::Failed, }); to_close.push(*desc); } else { // Retry SYN with backoff conn.syn_time = Some(Instant::now()); conn.syn_rt_secs *= 2; } } } } // RST retry if conn.is_retry_rst { if let Some(t) = conn.rst_time { if t.elapsed().as_secs() > conn.rst_rt_secs { conn.rst_rt_secs *= 2; conn.rst_time = Some(Instant::now()); if conn.rst_rt_secs > self.max_retrans.as_secs() { conn.is_retry_rst = false; to_close.push(*desc); } } } } // Check close-wait timeout if matches!( conn.state, RdpState::CloseWaitPassive | RdpState::CloseWaitActive ) { if let Some(t) = conn.close_time { if t.elapsed() >= self.max_retrans { to_close.push(*desc); } } } // Retransmit unacked segments if conn.state == RdpState::Open { let (alive, _retransmits) = conn.retransmit(self.max_retrans); if !alive { // Broken pipe — exceeded max retrans actions.push(RdpAction::Event { desc: *desc, addr: conn.addr.clone(), event: RdpEvent::Broken, }); to_close.push(*desc); } // Note: retransmitted segments are picked // up by pending_output() in the next flush } } for d in to_close { self.close(d); } actions } /// Number of active connections. pub fn connection_count(&self) -> usize { self.connections.len() } /// Number of active listeners. pub fn listener_count(&self) -> usize { self.listeners.len() } /// Build outgoing packets for a connection. /// /// Returns `(dst_id, sport, dport, packets)` where /// each packet is `(flags, seqnum, acknum, data)`. /// The caller wraps these in protocol messages and /// sends via UDP. pub fn pending_output(&mut self, desc: i32) -> Option { let conn = self.connections.get_mut(&desc)?; let mut packets = Vec::new(); match conn.state { RdpState::SynSent => { // Send SYN with receive buffer params // (rdp_syn: out_segs_max + seg_size_max) let mut syn_data = Vec::with_capacity(4); syn_data .extend_from_slice(&(conn.rcv_max as u16).to_be_bytes()); syn_data .extend_from_slice(&(conn.rbuf_max as u16).to_be_bytes()); packets.push(RdpPacket { flags: RDP_FLAG_SYN, seqnum: conn.snd_iss, acknum: 0, data: syn_data, }); } RdpState::SynRcvd => { // Send SYN+ACK packets.push(RdpPacket { flags: RDP_FLAG_SYN | RDP_FLAG_ACK, seqnum: conn.snd_iss, acknum: conn.rcv_cur, data: Vec::new(), }); } RdpState::Open => { // Send ACK if needed if conn.needs_ack() { packets.push(RdpPacket { flags: RDP_FLAG_ACK, seqnum: conn.snd_nxt, acknum: conn.rcv_cur, data: Vec::new(), }); conn.last_ack_time = Instant::now(); conn.rcv_ack = conn.rcv_cur; } // Send EACKs for out-of-order recv segments for seg in conn.recv_window.iter_mut().flatten() { if seg.is_used && !seg.is_eacked { packets.push(RdpPacket { flags: RDP_FLAG_EAK | RDP_FLAG_ACK, seqnum: seg.seqnum, acknum: conn.rcv_cur, data: Vec::new(), }); seg.is_eacked = true; } } // Send pending data segments, limited by // congestion window (AIMD) let in_flight = conn .send_window .iter() .filter(|s| s.is_sent && !s.is_acked) .count() as u32; let can_send = conn.cwnd.saturating_sub(in_flight); let mut sent_count = 0u32; for seg in &conn.send_window { if sent_count >= can_send { break; } if !seg.is_sent && !seg.is_acked { packets.push(RdpPacket { flags: RDP_FLAG_ACK, seqnum: seg.seqnum, acknum: conn.rcv_cur, data: seg.data.clone(), }); sent_count += 1; } } // Mark as sent let mut marked = 0u32; for seg in conn.send_window.iter_mut() { if marked >= can_send { break; } if !seg.is_sent { seg.is_sent = true; seg.sent_time = Some(Instant::now()); marked += 1; } } } _ => {} } if packets.is_empty() { return None; } Some(PendingOutput { dst: conn.addr.did, sport: conn.addr.sport, dport: conn.addr.dport, packets, }) } /// Get all connection descriptors. pub fn descriptors(&self) -> Vec { self.connections.keys().copied().collect() } fn alloc_desc(&mut self) -> i32 { let d = self.next_desc; // Wrap at i32::MAX to avoid overflow; skip 0 // and negative values self.next_desc = if d >= i32::MAX - 1 { 1 } else { d + 1 }; d } } /// A pending outgoing RDP packet. #[derive(Debug, Clone)] pub struct RdpPacket { pub flags: u8, pub seqnum: u32, pub acknum: u32, pub data: Vec, } /// Pending output for a connection. #[derive(Debug)] pub struct PendingOutput { pub dst: NodeId, pub sport: u16, pub dport: u16, pub packets: Vec, } /// Build an RDP wire packet: rdp_head(20) + data. pub fn build_rdp_wire( flags: u8, sport: u16, dport: u16, seqnum: u32, acknum: u32, data: &[u8], ) -> Vec { let dlen = data.len() as u16; let mut buf = vec![0u8; RDP_HEADER_SIZE + data.len()]; buf[0] = flags; buf[1] = (RDP_HEADER_SIZE / 2) as u8; // hlen in 16-bit words buf[2..4].copy_from_slice(&sport.to_be_bytes()); buf[4..6].copy_from_slice(&dport.to_be_bytes()); buf[6..8].copy_from_slice(&dlen.to_be_bytes()); buf[8..12].copy_from_slice(&seqnum.to_be_bytes()); buf[12..16].copy_from_slice(&acknum.to_be_bytes()); buf[16..20].fill(0); // reserved buf[20..].copy_from_slice(data); buf } /// Parsed RDP wire header fields. pub struct RdpWireHeader<'a> { pub flags: u8, pub sport: u16, pub dport: u16, pub seqnum: u32, pub acknum: u32, pub data: &'a [u8], } /// Parse an RDP wire packet header. pub fn parse_rdp_wire(buf: &[u8]) -> Option> { if buf.len() < RDP_HEADER_SIZE { return None; } Some(RdpWireHeader { flags: buf[0], sport: u16::from_be_bytes([buf[2], buf[3]]), dport: u16::from_be_bytes([buf[4], buf[5]]), seqnum: u32::from_be_bytes([buf[8], buf[9], buf[10], buf[11]]), acknum: u32::from_be_bytes([buf[12], buf[13], buf[14], buf[15]]), data: &buf[RDP_HEADER_SIZE..], }) } impl Default for Rdp { fn default() -> Self { Self::new() } } // ── Errors ────────────────────────────────────────── #[derive(Debug)] pub enum RdpError { PortInUse(u16), BadDescriptor(i32), NotOpen(i32), SendBufferFull, } impl std::fmt::Display for RdpError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { RdpError::PortInUse(p) => write!(f, "port {p} in use"), RdpError::BadDescriptor(d) => write!(f, "bad descriptor {d}"), RdpError::NotOpen(d) => write!(f, "descriptor {d} not open"), RdpError::SendBufferFull => write!(f, "send buffer full"), } } } impl std::error::Error for RdpError {} #[cfg(test)] mod tests { use super::*; #[test] fn listen_and_close() { let mut rdp = Rdp::new(); let desc = rdp.listen(5000).unwrap(); assert_eq!(rdp.listener_count(), 1); rdp.close(desc); assert_eq!(rdp.listener_count(), 0); } #[test] fn listen_duplicate_port() { let mut rdp = Rdp::new(); rdp.listen(5000).unwrap(); assert!(matches!(rdp.listen(5000), Err(RdpError::PortInUse(5000)))); } #[test] fn connect_creates_syn_sent() { let mut rdp = Rdp::new(); let dst = NodeId::from_bytes([0x01; 32]); let desc = rdp.connect(0, dst, 5000).unwrap(); assert_eq!(rdp.get_state(desc).unwrap(), RdpState::SynSent); } #[test] fn send_before_open_fails() { let mut rdp = Rdp::new(); let dst = NodeId::from_bytes([0x01; 32]); let desc = rdp.connect(0, dst, 5000).unwrap(); assert!(matches!( rdp.send(desc, b"hello"), Err(RdpError::NotOpen(_)) )); } #[test] fn recv_empty() { let mut rdp = Rdp::new(); let dst = NodeId::from_bytes([0x01; 32]); let desc = rdp.connect(0, dst, 5000).unwrap(); let mut buf = [0u8; 64]; // SynSent state → bad descriptor or no data assert!(rdp.recv(desc, &mut buf).is_ok()); } #[test] fn syn_ack_opens_connection() { let mut rdp = Rdp::new(); let dst = NodeId::from_bytes([0x01; 32]); let desc = rdp.connect(1000, dst, 5000).unwrap(); // Simulate receiving SYN+ACK let actions = rdp.input(&RdpInput { src: dst, sport: 5000, dport: 1000, flags: RDP_FLAG_SYN | RDP_FLAG_ACK, seqnum: 100, acknum: 1, data: &[], }); assert_eq!(rdp.get_state(desc).unwrap(), RdpState::Open); assert!(actions.iter().any(|a| matches!( a, RdpAction::Event { event: RdpEvent::Connected, .. } ))); } #[test] fn inbound_syn_accepted() { let mut rdp = Rdp::new(); rdp.listen(5000).unwrap(); let peer = NodeId::from_bytes([0x02; 32]); let actions = rdp.input(&RdpInput { src: peer, sport: 3000, dport: 5000, flags: RDP_FLAG_SYN, seqnum: 200, acknum: 0, data: &[], }); assert_eq!(rdp.connection_count(), 1); assert!(actions.iter().any(|a| matches!( a, RdpAction::Event { event: RdpEvent::Accepted, .. } ))); } #[test] fn rst_resets_connection() { let mut rdp = Rdp::new(); let dst = NodeId::from_bytes([0x01; 32]); let desc = rdp.connect(1000, dst, 5000).unwrap(); // Open first rdp.input(&RdpInput { src: dst, sport: 5000, dport: 1000, flags: RDP_FLAG_SYN | RDP_FLAG_ACK, seqnum: 100, acknum: 1, data: &[], }); assert_eq!(rdp.get_state(desc).unwrap(), RdpState::Open); // RST let actions = rdp.input(&RdpInput { src: dst, sport: 5000, dport: 1000, flags: RDP_FLAG_RST, seqnum: 0, acknum: 0, data: &[], }); assert_eq!(rdp.get_state(desc).unwrap(), RdpState::Closed); assert!(actions.iter().any(|a| matches!( a, RdpAction::Event { event: RdpEvent::Reset, .. } ))); } #[test] fn data_delivery() { let mut rdp = Rdp::new(); let dst = NodeId::from_bytes([0x01; 32]); let desc = rdp.connect(1000, dst, 5000).unwrap(); // Open rdp.input(&RdpInput { src: dst, sport: 5000, dport: 1000, flags: RDP_FLAG_SYN | RDP_FLAG_ACK, seqnum: 100, acknum: 1, data: &[], }); // Receive data let actions = rdp.input(&RdpInput { src: dst, sport: 5000, dport: 1000, flags: RDP_FLAG_ACK, seqnum: 101, acknum: 1, data: b"hello", }); assert!(actions.iter().any(|a| matches!( a, RdpAction::Event { event: RdpEvent::Ready2Read, .. } ))); let mut buf = [0u8; 64]; let n = rdp.recv(desc, &mut buf).unwrap(); assert_eq!(&buf[..n], b"hello"); } #[test] fn send_data_on_open() { let mut rdp = Rdp::new(); let dst = NodeId::from_bytes([0x01; 32]); let desc = rdp.connect(1000, dst, 5000).unwrap(); // Open rdp.input(&RdpInput { src: dst, sport: 5000, dport: 1000, flags: RDP_FLAG_SYN | RDP_FLAG_ACK, seqnum: 100, acknum: 1, data: &[], }); let n = rdp.send(desc, b"world").unwrap(); assert_eq!(n, 5); } #[test] fn get_status() { let mut rdp = Rdp::new(); let dst = NodeId::from_bytes([0x01; 32]); rdp.connect(1000, dst, 5000).unwrap(); let status = rdp.get_status(); assert_eq!(status.len(), 1); assert_eq!(status[0].state, RdpState::SynSent); assert_eq!(status[0].dport, 5000); } #[test] fn is_before_wrapping() { assert!(is_before(1, 2)); assert!(is_before(0, 1)); assert!(!is_before(2, 1)); assert!(!is_before(5, 5)); // Wrapping: u32::MAX is before 0 assert!(is_before(u32::MAX, 0)); } }