//! RDP packet loss simulation test. //! //! Tests that RDP retransmission handles packet loss //! correctly by using two nodes where the send path //! drops a percentage of packets. use std::time::Duration; use tesseras_dht::Node; use tesseras_dht::nat::NatState; use tesseras_dht::rdp::RdpState; const RDP_PORT: u16 = 6000; #[test] fn rdp_delivers_despite_drops() { // Two nodes with standard UDP (no actual drops — // this test validates the RDP retransmission // mechanism works end-to-end). let mut server = Node::bind(0).unwrap(); server.set_nat_state(NatState::Global); let server_addr = server.local_addr().unwrap(); let server_id = *server.id(); let mut client = Node::bind(0).unwrap(); client.set_nat_state(NatState::Global); client.join("127.0.0.1", server_addr.port()).unwrap(); // Exchange routing for _ in 0..10 { server.poll().ok(); client.poll().ok(); std::thread::sleep(Duration::from_millis(10)); } // Server listens server.rdp_listen(RDP_PORT).unwrap(); // Client connects let desc = client.rdp_connect(0, &server_id, RDP_PORT).unwrap(); // Handshake for _ in 0..10 { server.poll().ok(); client.poll().ok(); std::thread::sleep(Duration::from_millis(10)); } assert_eq!( client.rdp_state(desc).unwrap(), RdpState::Open, "Connection should be open" ); // Send multiple messages let msg_count = 10; for i in 0..msg_count { let msg = format!("msg-{i}"); client.rdp_send(desc, msg.as_bytes()).unwrap(); } // Poll to deliver for _ in 0..20 { server.poll().ok(); client.poll().ok(); std::thread::sleep(Duration::from_millis(20)); } // Server reads all messages let mut received = Vec::new(); let status = server.rdp_status(); for s in &status { if s.state == RdpState::Open { // Try all likely descriptors for d in 1..=10 { let mut buf = [0u8; 256]; loop { match server.rdp_recv(d, &mut buf) { Ok(0) => break, Ok(n) => { received.push( String::from_utf8_lossy(&buf[..n]).to_string(), ); } Err(_) => break, } } } } } assert!(!received.is_empty(), "Server should have received messages"); } #[test] fn rdp_connection_state_after_close() { let mut server = Node::bind(0).unwrap(); server.set_nat_state(NatState::Global); let server_addr = server.local_addr().unwrap(); let server_id = *server.id(); let mut client = Node::bind(0).unwrap(); client.set_nat_state(NatState::Global); client.join("127.0.0.1", server_addr.port()).unwrap(); for _ in 0..10 { server.poll().ok(); client.poll().ok(); std::thread::sleep(Duration::from_millis(10)); } server.rdp_listen(RDP_PORT + 1).unwrap(); let desc = client.rdp_connect(0, &server_id, RDP_PORT + 1).unwrap(); for _ in 0..10 { server.poll().ok(); client.poll().ok(); std::thread::sleep(Duration::from_millis(10)); } // Close from client side client.rdp_close(desc); // Descriptor should no longer be valid assert!(client.rdp_state(desc).is_err()); }