vector/internal_events/
open.rs

1use std::{
2    hint,
3    sync::{
4        Arc,
5        atomic::{AtomicUsize, Ordering},
6    },
7};
8
9use vector_lib::{
10    NamedInternalEvent, gauge,
11    internal_event::{GaugeName, InternalEvent},
12};
13
14#[derive(Debug, NamedInternalEvent)]
15pub struct ConnectionOpen {
16    pub count: usize,
17}
18
19impl InternalEvent for ConnectionOpen {
20    fn emit(self) {
21        gauge!(GaugeName::OpenConnections).set(self.count as f64);
22    }
23}
24
25#[derive(Debug, NamedInternalEvent)]
26pub struct EndpointsActive {
27    pub count: usize,
28}
29
30impl InternalEvent for EndpointsActive {
31    fn emit(self) {
32        gauge!(GaugeName::ActiveEndpoints).set(self.count as f64);
33    }
34}
35
36#[derive(Clone)]
37pub struct OpenGauge {
38    gauge: Arc<AtomicUsize>,
39}
40
41impl OpenGauge {
42    pub fn new() -> Self {
43        OpenGauge {
44            gauge: Arc::default(),
45        }
46    }
47
48    /// Increments and emits value once created.
49    /// Decrements and emits value once dropped.
50    pub fn open<E: Fn(usize)>(self, emitter: E) -> OpenToken<E> {
51        gauge_add(&self.gauge, 1, &emitter);
52        OpenToken {
53            gauge: self.gauge,
54            emitter,
55        }
56    }
57
58    #[cfg(all(feature = "sources-utils-net-unix", unix))]
59    pub fn any_open(&self) -> bool {
60        self.gauge.load(Ordering::Acquire) != 0
61    }
62}
63
64impl Default for OpenGauge {
65    fn default() -> Self {
66        Self::new()
67    }
68}
69
70pub struct OpenToken<E: Fn(usize)> {
71    gauge: Arc<AtomicUsize>,
72    emitter: E,
73}
74
75impl<E: Fn(usize)> Drop for OpenToken<E> {
76    fn drop(&mut self) {
77        gauge_add(&self.gauge, -1, &self.emitter);
78    }
79}
80
81/// If reporting gauges from multiple threads, they can end up in a wrong order
82/// resulting in having wrong value for a prolonged period of time.
83/// This function performs a synchronization procedure that corrects that.
84fn gauge_add(gauge: &AtomicUsize, add: isize, emitter: impl Fn(usize)) {
85    // The goal of this function is to properly sequence calls to `emitter` from
86    // multiple threads. It is possible that `emitter` will be called multiple
87    // times -- worst case, `n^2 / 2` times where `n` is the number of parallel
88    // peers -- but this is acceptable.
89    //
90    // The implementation here is a spin lock on the `gauge` value with the
91    // critical section being solely for updating the `gauge` value by `add` and
92    // calling `emitter`. If we suffer priority inversion at higher peer counts
93    // we might consider the use of a mutex, which will participate in the OS's
94    // scheduler. See [this
95    // post](https://matklad.github.io/2020/01/02/spinlocks-considered-harmful.html)
96    // for details if you're working on something like that and need background.
97    //
98    // The order of calls to `emitter` are not guaranteed but it is guaranteed
99    // that the most recent holder of the lock will be the most recent caller of
100    // `emitter`.
101    let mut value = gauge.load(Ordering::Acquire);
102    loop {
103        let new_value = (value as isize + add) as usize;
104        emitter(new_value);
105        // Try to update gauge to new value and releasing writes to gauge metric
106        // in the process.  Otherwise acquire new writes to gauge metric.
107        //
108        // When `compare_exchange_weak` returns Ok our `new_value` is now the
109        // current value in memory across all CPUs. When the return is Err we
110        // retry with the now current value.
111        match gauge.compare_exchange_weak(value, new_value, Ordering::AcqRel, Ordering::Acquire) {
112            Ok(_) => break,
113            Err(x) => {
114                hint::spin_loop();
115                value = x;
116            }
117        }
118    }
119}
120
121#[cfg(test)]
122mod tests {
123    use std::{mem::drop, thread};
124
125    use super::*;
126
127    /// If this fails at any run, then the algorithm in `gauge_add` is faulty.
128    #[test]
129    fn eventually_consistent() {
130        let n = 8;
131        let m = 1000;
132        let gauge = OpenGauge::new();
133        let value = Arc::new(AtomicUsize::new(0));
134
135        let handles = (0..n)
136            .map(|_| {
137                let gauge = gauge.clone();
138                let value = Arc::clone(&value);
139                thread::spawn(move || {
140                    let mut work = 0;
141                    for _ in 0..m {
142                        let token = gauge
143                            .clone()
144                            .open(|count| value.store(count, Ordering::Release));
145                        // Do some work
146                        for i in 0..100 {
147                            work += i;
148                        }
149                        drop(token);
150                    }
151                    work
152                })
153            })
154            .collect::<Vec<_>>();
155
156        for handle in handles {
157            handle.join().unwrap();
158        }
159
160        assert_eq!(0, value.load(Ordering::Acquire));
161    }
162}