//! UDP I/O via mio (kqueue on OpenBSD, epoll on Linux). //! //! Supports dynamic registration of additional sockets //! (needed by NAT detector's temporary probe socket). use std::net::SocketAddr; use std::time::Duration; use mio::net::UdpSocket; use mio::{Events, Interest, Poll, Registry, Token}; use crate::error::Error; /// Token for the primary DHT socket. pub const UDP_TOKEN: Token = Token(0); /// Event-driven network I/O loop. /// /// Wraps a mio `Poll` with a primary UDP socket. /// Additional sockets can be registered dynamically /// (e.g. for NAT detection probes). pub struct NetLoop { poll: Poll, events: Events, socket: UdpSocket, next_token: usize, } impl NetLoop { /// Bind a UDP socket and register it with the poller. pub fn bind(addr: SocketAddr) -> Result { let poll = Poll::new()?; let mut socket = UdpSocket::bind(addr)?; poll.registry() .register(&mut socket, UDP_TOKEN, Interest::READABLE)?; Ok(Self { poll, events: Events::with_capacity(64), socket, next_token: 1, }) } /// Send a datagram to the given address. pub fn send_to( &self, buf: &[u8], addr: SocketAddr, ) -> Result { self.socket.send_to(buf, addr).map_err(Error::Io) } /// Receive a datagram from the primary socket. /// /// Returns `(bytes_read, sender_address)` or /// `WouldBlock` if no data available. pub fn recv_from( &self, buf: &mut [u8], ) -> Result<(usize, SocketAddr), Error> { self.socket.recv_from(buf).map_err(Error::Io) } /// Poll for I/O events, blocking up to `timeout`. pub fn poll_events(&mut self, timeout: Duration) -> Result<(), Error> { self.poll.poll(&mut self.events, Some(timeout))?; Ok(()) } /// Iterate over events from the last `poll_events` call. pub fn drain_events(&self) -> impl Iterator { self.events.iter() } /// Get the local address of the primary socket. pub fn local_addr(&self) -> Result { self.socket.local_addr().map_err(Error::Io) } /// Access the mio registry for registering additional /// sockets (e.g. NAT detection probe sockets). pub fn registry(&self) -> &Registry { self.poll.registry() } /// Allocate a new unique token for a dynamic socket. /// Wraps at usize::MAX, skips 0 (reserved for /// UDP_TOKEN). pub fn next_token(&mut self) -> Token { let t = Token(self.next_token); self.next_token = self.next_token.wrapping_add(1).max(1); t } /// Register an additional UDP socket with the poller. /// /// Returns the assigned token. pub fn register_socket( &mut self, socket: &mut UdpSocket, ) -> Result { let token = self.next_token(); self.poll .registry() .register(socket, token, Interest::READABLE)?; Ok(token) } /// Deregister a previously registered socket. pub fn deregister_socket( &mut self, socket: &mut UdpSocket, ) -> Result<(), Error> { self.poll.registry().deregister(socket)?; Ok(()) } } #[cfg(test)] mod tests { use super::*; #[test] fn bind_and_local_addr() { let net = NetLoop::bind("127.0.0.1:0".parse().unwrap()).unwrap(); let addr = net.local_addr().unwrap(); assert_eq!(addr.ip(), "127.0.0.1".parse::().unwrap()); assert_ne!(addr.port(), 0); } #[test] fn send_recv_loopback() { let net = NetLoop::bind("127.0.0.1:0".parse().unwrap()).unwrap(); let addr = net.local_addr().unwrap(); // Send to self let sent = net.send_to(b"ping", addr).unwrap(); assert_eq!(sent, 4); // Poll for the event let mut net = net; net.poll_events(Duration::from_millis(100)).unwrap(); let mut buf = [0u8; 64]; let (len, from) = net.recv_from(&mut buf).unwrap(); assert_eq!(&buf[..len], b"ping"); assert_eq!(from, addr); } #[test] fn register_extra_socket() { let mut net = NetLoop::bind("127.0.0.1:0".parse().unwrap()).unwrap(); let mut extra = UdpSocket::bind("127.0.0.1:0".parse().unwrap()).unwrap(); let token = net.register_socket(&mut extra).unwrap(); assert_ne!(token, UDP_TOKEN); net.deregister_socket(&mut extra).unwrap(); } }