diff options
| author | murilo ijanc | 2026-03-24 15:04:03 -0300 |
|---|---|---|
| committer | murilo ijanc | 2026-03-24 15:04:03 -0300 |
| commit | 9821aabf0b50d2487b07502d3d2cd89e7d62bdbe (patch) | |
| tree | 53da095ff90cc755bac3d4bf699172b5e8cd07d6 /src/socket.rs | |
| download | tesseras-dht-e908bc01403f4b8ef2a65fa6be43716fd1c6e003.tar.gz | |
Initial commitv0.1.0
NAT-aware Kademlia DHT library for peer-to-peer networks.
Features:
- Distributed key-value storage (iterative FIND_NODE, FIND_VALUE, STORE)
- NAT traversal via DTUN hole-punching and proxy relay
- Reliable Datagram Protocol (RDP) with 7-state connection machine
- Datagram transport with automatic fragmentation/reassembly
- Ed25519 packet authentication
- 256-bit node IDs (Ed25519 public keys)
- Rate limiting, ban list, and eclipse attack mitigation
- Persistence and metrics
- OpenBSD and Linux support
Diffstat (limited to 'src/socket.rs')
| -rw-r--r-- | src/socket.rs | 159 |
1 files changed, 159 insertions, 0 deletions
diff --git a/src/socket.rs b/src/socket.rs new file mode 100644 index 0000000..7081ff5 --- /dev/null +++ b/src/socket.rs @@ -0,0 +1,159 @@ +//! UDP I/O via mio (kqueue on OpenBSD, epoll on Linux). +//! +//! Supports dynamic registration of additional sockets +//! (needed by NAT detector's temporary probe socket). + +use std::net::SocketAddr; +use std::time::Duration; + +use mio::net::UdpSocket; +use mio::{Events, Interest, Poll, Registry, Token}; + +use crate::error::Error; + +/// Token for the primary DHT socket. +pub const UDP_TOKEN: Token = Token(0); + +/// Event-driven network I/O loop. +/// +/// Wraps a mio `Poll` with a primary UDP socket. +/// Additional sockets can be registered dynamically +/// (e.g. for NAT detection probes). +pub struct NetLoop { + poll: Poll, + events: Events, + socket: UdpSocket, + next_token: usize, +} + +impl NetLoop { + /// Bind a UDP socket and register it with the poller. + pub fn bind(addr: SocketAddr) -> Result<Self, Error> { + let poll = Poll::new()?; + let mut socket = UdpSocket::bind(addr)?; + poll.registry() + .register(&mut socket, UDP_TOKEN, Interest::READABLE)?; + Ok(Self { + poll, + events: Events::with_capacity(64), + socket, + next_token: 1, + }) + } + + /// Send a datagram to the given address. + pub fn send_to( + &self, + buf: &[u8], + addr: SocketAddr, + ) -> Result<usize, Error> { + self.socket.send_to(buf, addr).map_err(Error::Io) + } + + /// Receive a datagram from the primary socket. + /// + /// Returns `(bytes_read, sender_address)` or + /// `WouldBlock` if no data available. + pub fn recv_from( + &self, + buf: &mut [u8], + ) -> Result<(usize, SocketAddr), Error> { + self.socket.recv_from(buf).map_err(Error::Io) + } + + /// Poll for I/O events, blocking up to `timeout`. + pub fn poll_events(&mut self, timeout: Duration) -> Result<(), Error> { + self.poll.poll(&mut self.events, Some(timeout))?; + Ok(()) + } + + /// Iterate over events from the last `poll_events` call. + pub fn drain_events(&self) -> impl Iterator<Item = &mio::event::Event> { + self.events.iter() + } + + /// Get the local address of the primary socket. + pub fn local_addr(&self) -> Result<SocketAddr, Error> { + self.socket.local_addr().map_err(Error::Io) + } + + /// Access the mio registry for registering additional + /// sockets (e.g. NAT detection probe sockets). + pub fn registry(&self) -> &Registry { + self.poll.registry() + } + + /// Allocate a new unique token for a dynamic socket. + /// Wraps at usize::MAX, skips 0 (reserved for + /// UDP_TOKEN). + pub fn next_token(&mut self) -> Token { + let t = Token(self.next_token); + self.next_token = self.next_token.wrapping_add(1).max(1); + t + } + + /// Register an additional UDP socket with the poller. + /// + /// Returns the assigned token. + pub fn register_socket( + &mut self, + socket: &mut UdpSocket, + ) -> Result<Token, Error> { + let token = self.next_token(); + self.poll + .registry() + .register(socket, token, Interest::READABLE)?; + Ok(token) + } + + /// Deregister a previously registered socket. + pub fn deregister_socket( + &mut self, + socket: &mut UdpSocket, + ) -> Result<(), Error> { + self.poll.registry().deregister(socket)?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn bind_and_local_addr() { + let net = NetLoop::bind("127.0.0.1:0".parse().unwrap()).unwrap(); + let addr = net.local_addr().unwrap(); + assert_eq!(addr.ip(), "127.0.0.1".parse::<std::net::IpAddr>().unwrap()); + assert_ne!(addr.port(), 0); + } + + #[test] + fn send_recv_loopback() { + let net = NetLoop::bind("127.0.0.1:0".parse().unwrap()).unwrap(); + let addr = net.local_addr().unwrap(); + + // Send to self + let sent = net.send_to(b"ping", addr).unwrap(); + assert_eq!(sent, 4); + + // Poll for the event + let mut net = net; + net.poll_events(Duration::from_millis(100)).unwrap(); + + let mut buf = [0u8; 64]; + let (len, from) = net.recv_from(&mut buf).unwrap(); + assert_eq!(&buf[..len], b"ping"); + assert_eq!(from, addr); + } + + #[test] + fn register_extra_socket() { + let mut net = NetLoop::bind("127.0.0.1:0".parse().unwrap()).unwrap(); + let mut extra = + UdpSocket::bind("127.0.0.1:0".parse().unwrap()).unwrap(); + let token = net.register_socket(&mut extra).unwrap(); + assert_ne!(token, UDP_TOKEN); + net.deregister_socket(&mut extra).unwrap(); + } +} |