aboutsummaryrefslogtreecommitdiffstats
path: root/src/metrics.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/metrics.rs')
-rw-r--r--src/metrics.rs121
1 files changed, 121 insertions, 0 deletions
diff --git a/src/metrics.rs b/src/metrics.rs
new file mode 100644
index 0000000..67ec30f
--- /dev/null
+++ b/src/metrics.rs
@@ -0,0 +1,121 @@
+//! Node metrics and observability.
+//!
+//! Atomic counters for messages, lookups, storage,
+//! and errors. Accessible via `Tessera::metrics()`.
+
+use std::sync::atomic::{AtomicU64, Ordering};
+
+/// Atomic metrics counters.
+pub struct Metrics {
+ pub messages_sent: AtomicU64,
+ pub messages_received: AtomicU64,
+ pub lookups_started: AtomicU64,
+ pub lookups_completed: AtomicU64,
+ pub rpc_timeouts: AtomicU64,
+ pub values_stored: AtomicU64,
+ pub packets_rejected: AtomicU64,
+ pub bytes_sent: AtomicU64,
+ pub bytes_received: AtomicU64,
+}
+
+impl Metrics {
+ pub fn new() -> Self {
+ Self {
+ messages_sent: AtomicU64::new(0),
+ messages_received: AtomicU64::new(0),
+ lookups_started: AtomicU64::new(0),
+ lookups_completed: AtomicU64::new(0),
+ rpc_timeouts: AtomicU64::new(0),
+ values_stored: AtomicU64::new(0),
+ packets_rejected: AtomicU64::new(0),
+ bytes_sent: AtomicU64::new(0),
+ bytes_received: AtomicU64::new(0),
+ }
+ }
+
+ /// Take a snapshot of all counters.
+ pub fn snapshot(&self) -> MetricsSnapshot {
+ MetricsSnapshot {
+ messages_sent: self.messages_sent.load(Ordering::Relaxed),
+ messages_received: self.messages_received.load(Ordering::Relaxed),
+ lookups_started: self.lookups_started.load(Ordering::Relaxed),
+ lookups_completed: self.lookups_completed.load(Ordering::Relaxed),
+ rpc_timeouts: self.rpc_timeouts.load(Ordering::Relaxed),
+ values_stored: self.values_stored.load(Ordering::Relaxed),
+ packets_rejected: self.packets_rejected.load(Ordering::Relaxed),
+ bytes_sent: self.bytes_sent.load(Ordering::Relaxed),
+ bytes_received: self.bytes_received.load(Ordering::Relaxed),
+ }
+ }
+}
+
+impl Default for Metrics {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+/// Snapshot of metrics at a point in time.
+#[derive(Debug, Clone)]
+pub struct MetricsSnapshot {
+ pub messages_sent: u64,
+ pub messages_received: u64,
+ pub lookups_started: u64,
+ pub lookups_completed: u64,
+ pub rpc_timeouts: u64,
+ pub values_stored: u64,
+ pub packets_rejected: u64,
+ pub bytes_sent: u64,
+ pub bytes_received: u64,
+}
+
+impl std::fmt::Display for MetricsSnapshot {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(
+ f,
+ "sent={} recv={} lookups={}/{} timeouts={} \
+ stored={} rejected={} bytes={}/{}",
+ self.messages_sent,
+ self.messages_received,
+ self.lookups_completed,
+ self.lookups_started,
+ self.rpc_timeouts,
+ self.values_stored,
+ self.packets_rejected,
+ self.bytes_sent,
+ self.bytes_received,
+ )
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn counters_start_zero() {
+ let m = Metrics::new();
+ let s = m.snapshot();
+ assert_eq!(s.messages_sent, 0);
+ assert_eq!(s.bytes_sent, 0);
+ }
+
+ #[test]
+ fn increment_and_snapshot() {
+ let m = Metrics::new();
+ m.messages_sent.fetch_add(5, Ordering::Relaxed);
+ m.bytes_sent.fetch_add(1000, Ordering::Relaxed);
+ let s = m.snapshot();
+ assert_eq!(s.messages_sent, 5);
+ assert_eq!(s.bytes_sent, 1000);
+ }
+
+ #[test]
+ fn display_format() {
+ let m = Metrics::new();
+ m.messages_sent.fetch_add(10, Ordering::Relaxed);
+ let s = m.snapshot();
+ let text = format!("{s}");
+ assert!(text.contains("sent=10"));
+ }
+}