aboutsummaryrefslogtreecommitdiffstats
path: root/examples/rdp.rs
blob: 319c77945b6ce8cab493b28c238c6a101bc67fd7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
//! RDP reliable transport example (equivalent to example5.cpp).
//!
//! Two nodes: server listens, client connects, sends
//! data, server receives it.
//!
//! Usage:
//!   cargo run --example rdp

use std::time::Duration;
use tesseras_dht::Node;
use tesseras_dht::nat::NatState;
use tesseras_dht::rdp::RdpState;

/// RDP port used by this example: the server listens on it and the client
/// connects to it.
const RDP_PORT: u16 = 5000;

fn main() {
    env_logger::Builder::from_env(
        env_logger::Env::default().default_filter_or("info"),
    )
    .format(|buf, record| {
        use std::io::Write;
        writeln!(
            buf,
            "{} [{}] {}",
            record.level(),
            record.target(),
            record.args()
        )
    })
    .init();

    // Create two nodes
    let mut server = Node::bind(0).expect("bind server");
    server.set_nat_state(NatState::Global);
    let server_addr = server.local_addr().unwrap();
    let server_id = *server.id();
    println!("Server: {} @ {server_addr}", server.id_hex());

    let mut client = Node::bind(0).expect("bind client");
    client.set_nat_state(NatState::Global);
    println!("Client: {}", client.id_hex());

    // Client joins server so they know each other
    client.join("127.0.0.1", server_addr.port()).expect("join");

    // Poll to exchange routing info
    for _ in 0..10 {
        server.poll().ok();
        client.poll().ok();
        std::thread::sleep(Duration::from_millis(20));
    }
    println!("Client knows {} peers", client.routing_table_size());

    // Server listens on RDP port
    let _listen = server.rdp_listen(RDP_PORT).expect("listen");
    println!("Server listening on RDP port {RDP_PORT}");

    // Client connects
    let desc = client
        .rdp_connect(0, &server_id, RDP_PORT)
        .expect("connect");
    println!("Client state: {:?}", client.rdp_state(desc).unwrap());

    // Poll to complete handshake
    for _ in 0..10 {
        server.poll().ok();
        client.poll().ok();
        std::thread::sleep(Duration::from_millis(20));
    }

    println!(
        "Client state after handshake: {:?}",
        client.rdp_state(desc).unwrap_or(RdpState::Closed)
    );

    // Send data if connection is open
    match client.rdp_state(desc) {
        Ok(RdpState::Open) => {
            for i in 0..3u16 {
                let msg = format!("hello {i}");
                match client.rdp_send(desc, msg.as_bytes()) {
                    Ok(n) => println!("Sent: '{msg}' ({n} bytes)"),
                    Err(e) => println!("Send error: {e}"),
                }
            }

            // Poll to deliver
            for _ in 0..10 {
                server.poll().ok();
                client.poll().ok();
                std::thread::sleep(Duration::from_millis(20));
            }

            // Server reads received data
            println!("\n--- Server reading ---");
            let server_status = server.rdp_status();
            for s in &server_status {
                if s.state == RdpState::Open {
                    let mut buf = [0u8; 256];
                    loop {
                        match server.rdp_recv(s.sport as i32 + 1, &mut buf) {
                            Ok(0) => break,
                            Ok(n) => {
                                let msg = String::from_utf8_lossy(&buf[..n]);
                                println!("Server received: '{msg}'");
                            }
                            Err(_) => break,
                        }
                    }
                }
            }
            // Try reading from desc 2 (server-side accepted desc)
            let mut buf = [0u8; 256];
            for attempt_desc in 1..=5 {
                loop {
                    match server.rdp_recv(attempt_desc, &mut buf) {
                        Ok(0) => break,
                        Ok(n) => {
                            let msg = String::from_utf8_lossy(&buf[..n]);
                            println!("Server desc={attempt_desc}: '{msg}'");
                        }
                        Err(_) => break,
                    }
                }
            }
        }
        Ok(state) => {
            println!("Connection not open: {state:?}");
        }
        Err(e) => {
            println!("Descriptor error: {e}");
        }
    }

    // Show status
    println!("\n--- RDP Status ---");
    for s in &client.rdp_status() {
        println!("  state={:?} dport={} sport={}", s.state, s.dport, s.sport);
    }

    // Cleanup
    client.rdp_close(desc);

    println!("\n--- Done ---");
    println!("Server: {server}");
    println!("Client: {client}");
}