1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
|
//! RDP reliable transport example (equivalent to example5.cpp).
//!
//! Two nodes: server listens, client connects, sends
//! data, server receives it.
//!
//! Usage:
//! cargo run --example rdp
use std::time::Duration;
use tesseras_dht::Node;
use tesseras_dht::nat::NatState;
use tesseras_dht::rdp::RdpState;
const RDP_PORT: u16 = 5000;
fn main() {
env_logger::Builder::from_env(
env_logger::Env::default().default_filter_or("info"),
)
.format(|buf, record| {
use std::io::Write;
writeln!(
buf,
"{} [{}] {}",
record.level(),
record.target(),
record.args()
)
})
.init();
// Create two nodes
let mut server = Node::bind(0).expect("bind server");
server.set_nat_state(NatState::Global);
let server_addr = server.local_addr().unwrap();
let server_id = *server.id();
println!("Server: {} @ {server_addr}", server.id_hex());
let mut client = Node::bind(0).expect("bind client");
client.set_nat_state(NatState::Global);
println!("Client: {}", client.id_hex());
// Client joins server so they know each other
client.join("127.0.0.1", server_addr.port()).expect("join");
// Poll to exchange routing info
for _ in 0..10 {
server.poll().ok();
client.poll().ok();
std::thread::sleep(Duration::from_millis(20));
}
println!("Client knows {} peers", client.routing_table_size());
// Server listens on RDP port
let _listen = server.rdp_listen(RDP_PORT).expect("listen");
println!("Server listening on RDP port {RDP_PORT}");
// Client connects
let desc = client
.rdp_connect(0, &server_id, RDP_PORT)
.expect("connect");
println!("Client state: {:?}", client.rdp_state(desc).unwrap());
// Poll to complete handshake
for _ in 0..10 {
server.poll().ok();
client.poll().ok();
std::thread::sleep(Duration::from_millis(20));
}
println!(
"Client state after handshake: {:?}",
client.rdp_state(desc).unwrap_or(RdpState::Closed)
);
// Send data if connection is open
match client.rdp_state(desc) {
Ok(RdpState::Open) => {
for i in 0..3u16 {
let msg = format!("hello {i}");
match client.rdp_send(desc, msg.as_bytes()) {
Ok(n) => println!("Sent: '{msg}' ({n} bytes)"),
Err(e) => println!("Send error: {e}"),
}
}
// Poll to deliver
for _ in 0..10 {
server.poll().ok();
client.poll().ok();
std::thread::sleep(Duration::from_millis(20));
}
// Server reads received data
println!("\n--- Server reading ---");
let server_status = server.rdp_status();
for s in &server_status {
if s.state == RdpState::Open {
let mut buf = [0u8; 256];
loop {
match server.rdp_recv(s.sport as i32 + 1, &mut buf) {
Ok(0) => break,
Ok(n) => {
let msg = String::from_utf8_lossy(&buf[..n]);
println!("Server received: '{msg}'");
}
Err(_) => break,
}
}
}
}
// Try reading from desc 2 (server-side accepted desc)
let mut buf = [0u8; 256];
for attempt_desc in 1..=5 {
loop {
match server.rdp_recv(attempt_desc, &mut buf) {
Ok(0) => break,
Ok(n) => {
let msg = String::from_utf8_lossy(&buf[..n]);
println!("Server desc={attempt_desc}: '{msg}'");
}
Err(_) => break,
}
}
}
}
Ok(state) => {
println!("Connection not open: {state:?}");
}
Err(e) => {
println!("Descriptor error: {e}");
}
}
// Show status
println!("\n--- RDP Status ---");
for s in &client.rdp_status() {
println!(" state={:?} dport={} sport={}", s.state, s.dport, s.sport);
}
// Cleanup
client.rdp_close(desc);
println!("\n--- Done ---");
println!("Server: {server}");
println!("Client: {client}");
}
|