aboutsummaryrefslogtreecommitdiffstats
path: root/src/socket.rs
diff options
context:
space:
mode:
authormurilo ijanc2026-03-24 15:04:03 -0300
committermurilo ijanc2026-03-24 15:04:03 -0300
commit9821aabf0b50d2487b07502d3d2cd89e7d62bdbe (patch)
tree53da095ff90cc755bac3d4bf699172b5e8cd07d6 /src/socket.rs
downloadtesseras-dht-9821aabf0b50d2487b07502d3d2cd89e7d62bdbe.tar.gz
Initial commitv0.1.0
NAT-aware Kademlia DHT library for peer-to-peer networks. Features: - Distributed key-value storage (iterative FIND_NODE, FIND_VALUE, STORE) - NAT traversal via DTUN hole-punching and proxy relay - Reliable Datagram Protocol (RDP) with 7-state connection machine - Datagram transport with automatic fragmentation/reassembly - Ed25519 packet authentication - 256-bit node IDs (Ed25519 public keys) - Rate limiting, ban list, and eclipse attack mitigation - Persistence and metrics - OpenBSD and Linux support
Diffstat (limited to 'src/socket.rs')
-rw-r--r--src/socket.rs159
1 files changed, 159 insertions, 0 deletions
diff --git a/src/socket.rs b/src/socket.rs
new file mode 100644
index 0000000..7081ff5
--- /dev/null
+++ b/src/socket.rs
@@ -0,0 +1,159 @@
+//! UDP I/O via mio (kqueue on OpenBSD, epoll on Linux).
+//!
+//! Supports dynamic registration of additional sockets
+//! (needed by NAT detector's temporary probe socket).
+
+use std::net::SocketAddr;
+use std::time::Duration;
+
+use mio::net::UdpSocket;
+use mio::{Events, Interest, Poll, Registry, Token};
+
+use crate::error::Error;
+
+/// Token for the primary DHT socket.
+pub const UDP_TOKEN: Token = Token(0);
+
+/// Event-driven network I/O loop.
+///
+/// Wraps a mio `Poll` with a primary UDP socket.
+/// Additional sockets can be registered dynamically
+/// (e.g. for NAT detection probes).
+pub struct NetLoop {
+ poll: Poll,
+ events: Events,
+ socket: UdpSocket,
+ next_token: usize,
+}
+
+impl NetLoop {
+ /// Bind a UDP socket and register it with the poller.
+ pub fn bind(addr: SocketAddr) -> Result<Self, Error> {
+ let poll = Poll::new()?;
+ let mut socket = UdpSocket::bind(addr)?;
+ poll.registry()
+ .register(&mut socket, UDP_TOKEN, Interest::READABLE)?;
+ Ok(Self {
+ poll,
+ events: Events::with_capacity(64),
+ socket,
+ next_token: 1,
+ })
+ }
+
+ /// Send a datagram to the given address.
+ pub fn send_to(
+ &self,
+ buf: &[u8],
+ addr: SocketAddr,
+ ) -> Result<usize, Error> {
+ self.socket.send_to(buf, addr).map_err(Error::Io)
+ }
+
+ /// Receive a datagram from the primary socket.
+ ///
+ /// Returns `(bytes_read, sender_address)` or
+ /// `WouldBlock` if no data available.
+ pub fn recv_from(
+ &self,
+ buf: &mut [u8],
+ ) -> Result<(usize, SocketAddr), Error> {
+ self.socket.recv_from(buf).map_err(Error::Io)
+ }
+
+ /// Poll for I/O events, blocking up to `timeout`.
+ pub fn poll_events(&mut self, timeout: Duration) -> Result<(), Error> {
+ self.poll.poll(&mut self.events, Some(timeout))?;
+ Ok(())
+ }
+
+ /// Iterate over events from the last `poll_events` call.
+ pub fn drain_events(&self) -> impl Iterator<Item = &mio::event::Event> {
+ self.events.iter()
+ }
+
+ /// Get the local address of the primary socket.
+ pub fn local_addr(&self) -> Result<SocketAddr, Error> {
+ self.socket.local_addr().map_err(Error::Io)
+ }
+
+ /// Access the mio registry for registering additional
+ /// sockets (e.g. NAT detection probe sockets).
+ pub fn registry(&self) -> &Registry {
+ self.poll.registry()
+ }
+
+ /// Allocate a new unique token for a dynamic socket.
+ /// Wraps at usize::MAX, skips 0 (reserved for
+ /// UDP_TOKEN).
+ pub fn next_token(&mut self) -> Token {
+ let t = Token(self.next_token);
+ self.next_token = self.next_token.wrapping_add(1).max(1);
+ t
+ }
+
+ /// Register an additional UDP socket with the poller.
+ ///
+ /// Returns the assigned token.
+ pub fn register_socket(
+ &mut self,
+ socket: &mut UdpSocket,
+ ) -> Result<Token, Error> {
+ let token = self.next_token();
+ self.poll
+ .registry()
+ .register(socket, token, Interest::READABLE)?;
+ Ok(token)
+ }
+
+ /// Deregister a previously registered socket.
+ pub fn deregister_socket(
+ &mut self,
+ socket: &mut UdpSocket,
+ ) -> Result<(), Error> {
+ self.poll.registry().deregister(socket)?;
+ Ok(())
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn bind_and_local_addr() {
+ let net = NetLoop::bind("127.0.0.1:0".parse().unwrap()).unwrap();
+ let addr = net.local_addr().unwrap();
+ assert_eq!(addr.ip(), "127.0.0.1".parse::<std::net::IpAddr>().unwrap());
+ assert_ne!(addr.port(), 0);
+ }
+
+ #[test]
+ fn send_recv_loopback() {
+ let net = NetLoop::bind("127.0.0.1:0".parse().unwrap()).unwrap();
+ let addr = net.local_addr().unwrap();
+
+ // Send to self
+ let sent = net.send_to(b"ping", addr).unwrap();
+ assert_eq!(sent, 4);
+
+ // Poll for the event
+ let mut net = net;
+ net.poll_events(Duration::from_millis(100)).unwrap();
+
+ let mut buf = [0u8; 64];
+ let (len, from) = net.recv_from(&mut buf).unwrap();
+ assert_eq!(&buf[..len], b"ping");
+ assert_eq!(from, addr);
+ }
+
+ #[test]
+ fn register_extra_socket() {
+ let mut net = NetLoop::bind("127.0.0.1:0".parse().unwrap()).unwrap();
+ let mut extra =
+ UdpSocket::bind("127.0.0.1:0".parse().unwrap()).unwrap();
+ let token = net.register_socket(&mut extra).unwrap();
+ assert_ne!(token, UDP_TOKEN);
+ net.deregister_socket(&mut extra).unwrap();
+ }
+}