aboutsummaryrefslogtreecommitdiffstats
path: root/tests/rdp_lossy.rs
blob: a31cc6528bb0521633cab9bb7287a63a0f8fb11c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
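//! RDP delivery tests between two local nodes.
//!
//! Intended to check that RDP retransmission copes with
//! packet loss. The tests currently run two nodes over
//! standard UDP on 127.0.0.1 with no packets actually
//! dropped, so they validate the end-to-end RDP path
//! (connect, send, receive, close) rather than loss recovery.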

use std::time::Duration;
use tesseras_dht::Node;
use tesseras_dht::nat::NatState;
use tesseras_dht::rdp::RdpState;

const RDP_PORT: u16 = 6000;

/// Sends a batch of messages over an RDP connection between two local
/// nodes and checks that the server side reads at least one of them.
///
/// Both nodes use standard UDP; no packets are dropped in this test.
#[test]
fn rdp_delivers_despite_drops() {
    /// Polls `first` then `second` once per round and sleeps `pause`
    /// after each round. Poll results are discarded.
    fn pump(
        first: &mut Node,
        second: &mut Node,
        rounds: usize,
        pause: Duration,
    ) {
        for _ in 0..rounds {
            let _ = first.poll();
            let _ = second.poll();
            std::thread::sleep(pause);
        }
    }

    // Listening side.
    let mut server = Node::bind(0).unwrap();
    server.set_nat_state(NatState::Global);
    let server_port = server.local_addr().unwrap().port();
    let server_id = *server.id();

    // Connecting side, joined to the server over loopback.
    let mut client = Node::bind(0).unwrap();
    client.set_nat_state(NatState::Global);
    client.join("127.0.0.1", server_port).unwrap();

    // Give the two nodes time to exchange routing information.
    pump(&mut server, &mut client, 10, Duration::from_millis(10));

    // Open the RDP port on the server, then connect from the client.
    server.rdp_listen(RDP_PORT).unwrap();
    let desc = client.rdp_connect(0, &server_id, RDP_PORT).unwrap();

    // Let the handshake run.
    pump(&mut server, &mut client, 10, Duration::from_millis(10));

    let client_state = client.rdp_state(desc).unwrap();
    assert_eq!(client_state, RdpState::Open, "Connection should be open");

    // Queue ten small payloads: "msg-0" through "msg-9".
    let msg_count = 10;
    for payload in (0..msg_count).map(|i| format!("msg-{i}")) {
        client.rdp_send(desc, payload.as_bytes()).unwrap();
    }

    // Keep polling so the queued payloads can be delivered.
    pump(&mut server, &mut client, 20, Duration::from_millis(20));

    // Drain whatever the server has received. The server-side descriptor
    // is not known here, so for each open connection reported by the
    // server, descriptors 1..=10 are probed.
    let mut received = Vec::new();
    let status = server.rdp_status();
    for entry in &status {
        if entry.state != RdpState::Open {
            continue;
        }
        for d in 1..=10 {
            let mut buf = [0u8; 256];
            // Stop on the first error or zero-length read.
            while let Ok(n) = server.rdp_recv(d, &mut buf) {
                if n == 0 {
                    break;
                }
                let text = String::from_utf8_lossy(&buf[..n]).into_owned();
                received.push(text);
            }
        }
    }

    assert!(!received.is_empty(), "Server should have received messages");
}

/// Checks that, after the client closes an RDP connection, querying
/// the state of that descriptor on the client returns an error.
#[test]
fn rdp_connection_state_after_close() {
    /// Polls `first` then `second` once per round and sleeps `pause`
    /// after each round. Poll results are discarded.
    fn pump(
        first: &mut Node,
        second: &mut Node,
        rounds: usize,
        pause: Duration,
    ) {
        for _ in 0..rounds {
            let _ = first.poll();
            let _ = second.poll();
            std::thread::sleep(pause);
        }
    }

    // Uses a different port from the other test in this file.
    let port = RDP_PORT + 1;

    // Listening side.
    let mut server = Node::bind(0).unwrap();
    server.set_nat_state(NatState::Global);
    let server_port = server.local_addr().unwrap().port();
    let server_id = *server.id();

    // Connecting side, joined to the server over loopback.
    let mut client = Node::bind(0).unwrap();
    client.set_nat_state(NatState::Global);
    client.join("127.0.0.1", server_port).unwrap();

    // Give the two nodes time to exchange routing information.
    pump(&mut server, &mut client, 10, Duration::from_millis(10));

    server.rdp_listen(port).unwrap();
    let desc = client.rdp_connect(0, &server_id, port).unwrap();

    // Let the handshake run.
    pump(&mut server, &mut client, 10, Duration::from_millis(10));

    // Close from the client side.
    client.rdp_close(desc);

    // The descriptor should no longer be valid on the client.
    let state_after_close = client.rdp_state(desc);
    assert!(state_after_close.is_err());
}