vector_buffers/topology/channel/
limited_queue.rs

1use std::{
2    cmp,
3    fmt::{self, Debug},
4    num::NonZeroUsize,
5    pin::Pin,
6    sync::{
7        Arc,
8        atomic::{AtomicUsize, Ordering},
9    },
10    time::Instant,
11};
12
13#[cfg(test)]
14use std::sync::Mutex;
15
16use async_stream::stream;
17use crossbeam_queue::{ArrayQueue, SegQueue};
18use futures::Stream;
19use metrics::{Gauge, Histogram, gauge, histogram};
20use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore, TryAcquireError};
21use vector_common::stats::TimeEwmaGauge;
22
23use crate::{InMemoryBufferable, config::MemoryBufferSize};
24
/// Default half-life, in seconds, of the moving average reported by the
/// `<prefix>_utilization_mean` gauge. Used when no `ewma_half_life_seconds` is supplied.
pub const DEFAULT_EWMA_HALF_LIFE_SECONDS: f64 = 5.0;
26
/// Error returned by `LimitedSender::send` when the receiver has disconnected.
///
/// The rejected item is handed back in the tuple field so the caller does not lose it.
#[derive(Debug, PartialEq, Eq)]
pub struct SendError<T>(pub T);

impl<T> fmt::Display for SendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("receiver disconnected")
    }
}

impl<T: fmt::Debug> std::error::Error for SendError<T> {}
38
/// Error returned by `LimitedSender::try_send`.
///
/// Both variants carry the item that could not be sent, so the caller can retry or dispose of it.
#[derive(Debug, PartialEq, Eq)]
pub enum TrySendError<T> {
    /// Not enough free permits were available to admit the item without waiting.
    InsufficientCapacity(T),
    /// The receiver has disconnected, so the item can never be delivered.
    Disconnected(T),
}

impl<T> TrySendError<T> {
    /// Consumes the error and returns the item that failed to send.
    pub fn into_inner(self) -> T {
        let (Self::InsufficientCapacity(item) | Self::Disconnected(item)) = self;
        item
    }
}

impl<T> fmt::Display for TrySendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let message = match self {
            Self::InsufficientCapacity(_) => "channel lacks sufficient capacity for send",
            Self::Disconnected(_) => "receiver disconnected",
        };
        f.write_str(message)
    }
}

impl<T: fmt::Debug> std::error::Error for TrySendError<T> {}
66
67// Trait over common queue operations so implementation can be chosen at initialization phase
68trait QueueImpl<T>: Send + Sync + fmt::Debug {
69    fn push(&self, item: T);
70    fn pop(&self) -> Option<T>;
71}
72
73impl<T> QueueImpl<T> for ArrayQueue<T>
74where
75    T: Send + Sync + fmt::Debug,
76{
77    fn push(&self, item: T) {
78        self.push(item)
79            .unwrap_or_else(|_| unreachable!("acquired permits but channel reported being full."));
80    }
81
82    fn pop(&self) -> Option<T> {
83        self.pop()
84    }
85}
86
87impl<T> QueueImpl<T> for SegQueue<T>
88where
89    T: Send + Sync + fmt::Debug,
90{
91    fn push(&self, item: T) {
92        self.push(item);
93    }
94
95    fn pop(&self) -> Option<T> {
96        self.pop()
97    }
98}
99
/// Naming information for the metrics emitted by a limited channel.
///
/// `prefix` is prepended to every metric name; when `output` is set, each metric also carries an
/// `output` label with that value.
#[derive(Clone, Debug)]
pub struct ChannelMetricMetadata {
    prefix: &'static str,
    output: Option<String>,
}

impl ChannelMetricMetadata {
    /// Creates metadata for metrics named `<prefix>_*`, optionally labelled with `output`.
    pub fn new(prefix: &'static str, output: Option<String>) -> Self {
        Self {
            output,
            prefix,
        }
    }
}
111
/// Metric handles used to report channel utilization.
///
/// Utilization is expressed in the unit of the channel's limit: events for
/// `MemoryBufferSize::MaxEvents`, bytes for `MemoryBufferSize::MaxSize`.
#[derive(Clone, Debug)]
struct Metrics {
    /// Distribution of utilization samples (`<prefix>_utilization`).
    histogram: Histogram,
    /// Most recent utilization sample (`<prefix>_utilization_level`).
    gauge: Gauge,
    /// Moving average of utilization maintained by `TimeEwmaGauge` (`<prefix>_utilization_mean`).
    mean_gauge: TimeEwmaGauge,
    // We hold a handle to the max gauge to avoid it being dropped by the metrics collector, but
    // since the value is static, we never need to update it. The compiler detects this as an unused
    // field, so we need to suppress the warning here.
    #[expect(dead_code)]
    max_gauge: Gauge,
    /// Deprecated-name twin of `max_gauge`, kept alive for the same reason.
    #[expect(dead_code)]
    legacy_max_gauge: Gauge,
    /// Test-only log of every value passed to `record`, used by assertions.
    #[cfg(test)]
    recorded_values: Arc<Mutex<Vec<usize>>>,
}
127
128impl Metrics {
129    #[expect(clippy::cast_precision_loss)] // We have to convert buffer sizes for a gauge, it's okay to lose precision here.
130    #[allow(clippy::disallowed_macros)] // Metric names are constructed dynamically from runtime prefixes.
131    fn new(
132        limit: MemoryBufferSize,
133        metadata: ChannelMetricMetadata,
134        ewma_half_life_seconds: Option<f64>,
135    ) -> Self {
136        let ewma_half_life_seconds =
137            ewma_half_life_seconds.unwrap_or(DEFAULT_EWMA_HALF_LIFE_SECONDS);
138        let ChannelMetricMetadata { prefix, output } = metadata;
139        let (legacy_suffix, gauge_suffix, max_value) = match limit {
140            MemoryBufferSize::MaxEvents(max_events) => (
141                "_max_event_size",
142                "_max_size_events",
143                max_events.get() as f64,
144            ),
145            MemoryBufferSize::MaxSize(max_bytes) => {
146                ("_max_byte_size", "_max_size_bytes", max_bytes.get() as f64)
147            }
148        };
149        let max_gauge_name = format!("{prefix}{gauge_suffix}");
150        let legacy_max_gauge_name = format!("{prefix}{legacy_suffix}");
151        let histogram_name = format!("{prefix}_utilization");
152        let gauge_name = format!("{prefix}_utilization_level");
153        let mean_name = format!("{prefix}_utilization_mean");
154        #[cfg(test)]
155        let recorded_values = Arc::new(Mutex::new(Vec::new()));
156        if let Some(label_value) = output {
157            let max_gauge = gauge!(max_gauge_name, "output" => label_value.clone());
158            max_gauge.set(max_value);
159            let mean_gauge_handle = gauge!(mean_name, "output" => label_value.clone());
160            // DEPRECATED: buffer-bytes-events-metrics
161            let legacy_max_gauge = gauge!(legacy_max_gauge_name, "output" => label_value.clone());
162            legacy_max_gauge.set(max_value);
163            Self {
164                histogram: histogram!(histogram_name, "output" => label_value.clone()),
165                gauge: gauge!(gauge_name, "output" => label_value.clone()),
166                mean_gauge: TimeEwmaGauge::new(mean_gauge_handle, ewma_half_life_seconds),
167                max_gauge,
168                legacy_max_gauge,
169                #[cfg(test)]
170                recorded_values,
171            }
172        } else {
173            let max_gauge = gauge!(max_gauge_name);
174            max_gauge.set(max_value);
175            let mean_gauge_handle = gauge!(mean_name);
176            // DEPRECATED: buffer-bytes-events-metrics
177            let legacy_max_gauge = gauge!(legacy_max_gauge_name);
178            legacy_max_gauge.set(max_value);
179            Self {
180                histogram: histogram!(histogram_name),
181                gauge: gauge!(gauge_name),
182                mean_gauge: TimeEwmaGauge::new(mean_gauge_handle, ewma_half_life_seconds),
183                max_gauge,
184                legacy_max_gauge,
185                #[cfg(test)]
186                recorded_values,
187            }
188        }
189    }
190
191    #[expect(clippy::cast_precision_loss)]
192    fn record(&self, value: usize, reference: Instant) {
193        self.histogram.record(value as f64);
194        self.gauge.set(value as f64);
195        self.mean_gauge.record(value as f64, reference);
196        #[cfg(test)]
197        if let Ok(mut recorded) = self.recorded_values.lock() {
198            recorded.push(value);
199        }
200    }
201}
202
/// State shared by the senders and the receiver of a limited channel.
///
/// Each `LimitedSender`/`LimitedReceiver` holds its own clone; the queue, semaphore, and notifier
/// live behind `Arc`s so every clone observes the same channel.
#[derive(Debug)]
struct Inner<T> {
    /// Queued items, each paired with the semaphore permits it holds until it is popped.
    data: Arc<dyn QueueImpl<(OwnedSemaphorePermit, T)>>,
    /// Configured limit; selects whether permits count events or bytes.
    limit: MemoryBufferSize,
    /// Semaphore created with `capacity` permits; senders acquire permits before enqueueing.
    limiter: Arc<Semaphore>,
    /// Wakes the receiver after an item is pushed or the last sender drops.
    read_waker: Arc<Notify>,
    /// Utilization metrics, present only when metric metadata was supplied.
    metrics: Option<Metrics>,
    /// Total number of permits the semaphore was created with.
    capacity: NonZeroUsize,
}
212
213impl<T> Clone for Inner<T> {
214    fn clone(&self) -> Self {
215        Self {
216            data: self.data.clone(),
217            limit: self.limit,
218            limiter: self.limiter.clone(),
219            read_waker: self.read_waker.clone(),
220            metrics: self.metrics.clone(),
221            capacity: self.capacity,
222        }
223    }
224}
225
226impl<T: Send + Sync + Debug + 'static> Inner<T> {
227    fn new(
228        limit: MemoryBufferSize,
229        metric_metadata: Option<ChannelMetricMetadata>,
230        ewma_half_life_seconds: Option<f64>,
231    ) -> Self {
232        let read_waker = Arc::new(Notify::new());
233        let metrics =
234            metric_metadata.map(|metadata| Metrics::new(limit, metadata, ewma_half_life_seconds));
235        match limit {
236            MemoryBufferSize::MaxEvents(max_events) => Inner {
237                data: Arc::new(ArrayQueue::new(max_events.get())),
238                limit,
239                limiter: Arc::new(Semaphore::new(max_events.get())),
240                read_waker,
241                metrics,
242                capacity: max_events,
243            },
244            MemoryBufferSize::MaxSize(max_bytes) => Inner {
245                data: Arc::new(SegQueue::new()),
246                limit,
247                limiter: Arc::new(Semaphore::new(max_bytes.get())),
248                read_waker,
249                metrics,
250                capacity: max_bytes,
251            },
252        }
253    }
254
255    /// Records a send after acquiring all required permits.
256    ///
257    /// The `size` value is the true utilization contribution of `item`, which may exceed the number
258    /// of permits acquired for oversized payloads.
259    fn send_with_permits(&mut self, size: usize, permits: OwnedSemaphorePermit, item: T) {
260        if let Some(metrics) = &self.metrics {
261            // For normal items, capacity - available_permits() exactly represents the total queued
262            // utilization (including this item's just-acquired permits). For oversized items that
263            // acquired fewer permits than their true size, `size` is the correct utilization since
264            // the queue must have been empty for the oversized acquire to succeed.
265            let utilization = size.max(self.used_capacity());
266            metrics.record(utilization, Instant::now());
267        }
268        self.data.push((permits, item));
269        self.read_waker.notify_one();
270    }
271}
272
impl<T> Inner<T> {
    /// Returns the number of permits currently held (events or bytes, depending on the limit).
    ///
    /// This counts permits held by queued items and by any sender between acquiring its permits
    /// and enqueueing its item.
    fn used_capacity(&self) -> usize {
        self.capacity.get() - self.limiter.available_permits()
    }

    /// Pops the next queued item, if any.
    ///
    /// When metrics are enabled, the post-pop utilization is recorded *before* the item's permits
    /// are released back to the semaphore.
    fn pop_and_record(&self) -> Option<T> {
        self.data.pop().map(|(permit, item)| {
            if let Some(metrics) = &self.metrics {
                // Compute remaining utilization from the semaphore state. Since our permits haven't
                // been released yet, used_capacity is stable against racing senders acquiring those
                // permits.
                let utilization = self.used_capacity().saturating_sub(permit.num_permits());
                metrics.record(utilization, Instant::now());
            }
            // Release permits after recording so a waiting sender cannot enqueue a new item
            // before this pop's utilization measurement is taken.
            drop(permit);
            item
        })
    }
}
294
/// Sending half of a limited channel.
///
/// Cloning a sender increments a shared sender count. When the last sender is dropped the channel
/// is closed, so the receiver can drain what remains and then observe the end of the stream.
#[derive(Debug)]
pub struct LimitedSender<T> {
    inner: Inner<T>,
    /// Number of live senders, shared between all clones.
    sender_count: Arc<AtomicUsize>,
}
300
301impl<T: InMemoryBufferable> LimitedSender<T> {
302    #[allow(clippy::cast_possible_truncation)]
303    fn calc_required_permits(&self, item: &T) -> (usize, u32) {
304        // We have to limit the number of permits we ask for to the overall limit since we're always
305        // willing to store more items than the limit if the queue is entirely empty, because
306        // otherwise we might deadlock ourselves by not being able to send a single item.
307        let value = match self.inner.limit {
308            MemoryBufferSize::MaxSize(_) => item.allocated_bytes(),
309            MemoryBufferSize::MaxEvents(_) => item.event_count(),
310        };
311        let limit = self.inner.capacity.get();
312        (value, cmp::min(limit, value) as u32)
313    }
314
315    /// Gets the number of items that this channel could accept.
316    pub fn available_capacity(&self) -> usize {
317        self.inner.limiter.available_permits()
318    }
319
320    /// Sends an item into the channel.
321    ///
322    /// # Errors
323    ///
324    /// If the receiver has disconnected (does not exist anymore), then `Err(SendError)` be returned
325    /// with the given `item`.
326    pub async fn send(&mut self, item: T) -> Result<(), SendError<T>> {
327        // Calculate how many permits we need, and wait until we can acquire all of them.
328        let (size, permits_required) = self.calc_required_permits(&item);
329        match self
330            .inner
331            .limiter
332            .clone()
333            .acquire_many_owned(permits_required)
334            .await
335        {
336            Ok(permits) => {
337                self.inner.send_with_permits(size, permits, item);
338                trace!("Sent item.");
339                Ok(())
340            }
341            Err(_) => Err(SendError(item)),
342        }
343    }
344
345    /// Attempts to send an item into the channel.
346    ///
347    /// # Errors
348    ///
349    /// If the receiver has disconnected (does not exist anymore), then
350    /// `Err(TrySendError::Disconnected)` be returned with the given `item`. If the channel has
351    /// insufficient capacity for the item, then `Err(TrySendError::InsufficientCapacity)` will be
352    /// returned with the given `item`.
353    ///
354    /// # Panics
355    ///
356    /// Will panic if adding ack amount overflows.
357    pub fn try_send(&mut self, item: T) -> Result<(), TrySendError<T>> {
358        // Calculate how many permits we need, and try to acquire them all without waiting.
359        let (size, permits_required) = self.calc_required_permits(&item);
360        match self
361            .inner
362            .limiter
363            .clone()
364            .try_acquire_many_owned(permits_required)
365        {
366            Ok(permits) => {
367                self.inner.send_with_permits(size, permits, item);
368                trace!("Attempt to send item succeeded.");
369                Ok(())
370            }
371            Err(TryAcquireError::NoPermits) => Err(TrySendError::InsufficientCapacity(item)),
372            Err(TryAcquireError::Closed) => Err(TrySendError::Disconnected(item)),
373        }
374    }
375}
376
377impl<T> Clone for LimitedSender<T> {
378    fn clone(&self) -> Self {
379        self.sender_count.fetch_add(1, Ordering::SeqCst);
380
381        Self {
382            inner: self.inner.clone(),
383            sender_count: Arc::clone(&self.sender_count),
384        }
385    }
386}
387
388impl<T> Drop for LimitedSender<T> {
389    fn drop(&mut self) {
390        // If we're the last sender to drop, close the semaphore on our way out the door.
391        if self.sender_count.fetch_sub(1, Ordering::SeqCst) == 1 {
392            self.inner.limiter.close();
393            self.inner.read_waker.notify_one();
394        }
395    }
396}
397
/// Receiving half of a limited channel.
///
/// Dropping the receiver closes the channel, causing pending and future sends to fail.
#[derive(Debug)]
pub struct LimitedReceiver<T> {
    inner: Inner<T>,
}
402
impl<T: Send + 'static> LimitedReceiver<T> {
    /// Gets the number of permits currently available to senders.
    ///
    /// This is free event slots when the channel is limited by events, or free bytes when it is
    /// limited by size.
    pub fn available_capacity(&self) -> usize {
        self.inner.limiter.available_permits()
    }

    /// Receives the next item from the channel, waiting for one if the queue is empty.
    ///
    /// Returns `None` only once the channel has been closed (e.g. the last sender was dropped)
    /// *and* every permit has been returned, i.e. no queued or in-flight items remain.
    pub async fn next(&mut self) -> Option<T> {
        loop {
            if let Some(item) = self.inner.pop_and_record() {
                return Some(item);
            }

            // There wasn't an item for us to pop, so see if the channel is actually closed.  If so,
            // then it's time for us to close up shop as well.
            if self.inner.limiter.is_closed() {
                if self.available_capacity() < self.inner.capacity.get() {
                    // We only terminate when closed and fully drained. A close can race with queue
                    // visibility while items/in-flight permits still exist.
                    // NOTE(review): this re-polls via `yield_now` rather than waiting on
                    // `read_waker`, i.e. it busy-yields until the outstanding permits are released.
                    tokio::task::yield_now().await;
                    continue;
                }
                return None;
            }

            // We're not closed, so we need to wait for a writer to tell us they made some
            // progress.  This might end up being a spurious wakeup since `Notify` will
            // store a wake-up if there are no waiters, but oh well.
            self.inner.read_waker.notified().await;
        }
    }

    /// Converts this receiver into a boxed `Stream` that yields items until `next` returns `None`.
    pub fn into_stream(self) -> Pin<Box<dyn Stream<Item = T> + Send>> {
        let mut receiver = self;
        Box::pin(stream! {
            while let Some(item) = receiver.next().await {
                yield item;
            }
        })
    }
}
443
444impl<T> Drop for LimitedReceiver<T> {
445    fn drop(&mut self) {
446        // Notify senders that the channel is now closed by closing the semaphore.  Any pending
447        // acquisitions will be awoken and notified that the semaphore is closed, and further new
448        // sends will immediately see the semaphore is closed.
449        self.inner.limiter.close();
450    }
451}
452
453pub fn limited<T: InMemoryBufferable + fmt::Debug>(
454    limit: MemoryBufferSize,
455    metric_metadata: Option<ChannelMetricMetadata>,
456    ewma_half_life_seconds: Option<f64>,
457) -> (LimitedSender<T>, LimitedReceiver<T>) {
458    let inner = Inner::new(limit, metric_metadata, ewma_half_life_seconds);
459
460    let sender = LimitedSender {
461        inner: inner.clone(),
462        sender_count: Arc::new(AtomicUsize::new(1)),
463    };
464    let receiver = LimitedReceiver { inner };
465
466    (sender, receiver)
467}
468
469#[cfg(test)]
470mod tests {
471    use std::num::NonZeroUsize;
472
473    use rand::{Rng as _, SeedableRng as _, rngs::SmallRng};
474    use tokio_test::{assert_pending, assert_ready, task::spawn};
475    use vector_common::byte_size_of::ByteSizeOf;
476
477    use super::{ChannelMetricMetadata, LimitedReceiver, LimitedSender, limited};
478    use crate::{
479        MemoryBufferSize,
480        test::MultiEventRecord,
481        topology::{channel::limited_queue::SendError, test_util::Sample},
482    };
483
    /// A single send into an empty channel completes immediately and wakes a pending receiver,
    /// which then yields the item.
    #[tokio::test]
    async fn send_receive() {
        let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(2).unwrap());
        let (mut tx, mut rx) = limited(limit, None, None);

        assert_eq!(2, tx.available_capacity());

        let msg = Sample::new(42);

        // Create our send and receive futures.
        let mut send = spawn(async { tx.send(msg).await });

        let mut recv = spawn(async { rx.next().await });

        // Nobody should be woken up.
        assert!(!send.is_woken());
        assert!(!recv.is_woken());

        // Try polling our receive, which should be pending because nothing has been sent yet.
        assert_pending!(recv.poll());

        // We should immediately be able to complete a send as there is available capacity.
        assert_eq!(Ok(()), assert_ready!(send.poll()));

        // Now our receive should have been woken up, and should immediately be ready.
        assert!(recv.is_woken());
        assert_eq!(Some(Sample::new(42)), assert_ready!(recv.poll()));
    }
512
    /// One send followed by one receive records two utilization samples: the level after the send
    /// and the level (zero) after the receive.
    #[tokio::test]
    async fn records_utilization() {
        let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(2).unwrap());
        let (mut tx, mut rx) = limited(
            limit,
            Some(ChannelMetricMetadata::new("test_channel", None)),
            None,
        );

        // Shared handle to the test-only log of recorded utilization values.
        let metrics = tx.inner.metrics.as_ref().unwrap().recorded_values.clone();

        tx.send(Sample::new(1)).await.expect("send should succeed");
        let records = metrics.lock().unwrap().clone();
        assert_eq!(records.len(), 1);
        assert_eq!(records.last().copied(), Some(1));

        assert_eq!(Sample::new(1), rx.next().await.unwrap());
        let records = metrics.lock().unwrap();
        assert_eq!(records.len(), 2);
        assert_eq!(records.last().copied(), Some(0));
    }
534
    /// An item larger than the limit acquires only `limit` permits, but the recorded utilization
    /// reflects its true size; after it is received utilization returns to zero.
    #[tokio::test]
    async fn oversized_send_records_true_utilization_via_normal_send_path() {
        let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(2).unwrap());
        let (mut tx, mut rx) = limited(
            limit,
            Some(ChannelMetricMetadata::new("test_channel_oversized", None)),
            None,
        );
        let metrics = tx.inner.metrics.as_ref().unwrap().recorded_values.clone();

        // Normal send path: permits are capped to the limit (2), but utilization should reflect
        // the true item contribution (3).
        let oversized = MultiEventRecord::new(3);
        tx.send(oversized.clone())
            .await
            .expect("send should succeed");

        let records = metrics.lock().unwrap().clone();
        assert_eq!(records.len(), 1);
        assert_eq!(records.last().copied(), Some(3));

        assert_eq!(Some(oversized), rx.next().await);
        let records = metrics.lock().unwrap().clone();
        assert_eq!(records.len(), 2);
        assert_eq!(records.last().copied(), Some(0));
    }
561
    /// With a byte-size limit of exactly ten messages, the eleventh send blocks, and all ten
    /// queued messages can be read back intact.
    #[test]
    fn test_limiting_by_byte_size() {
        let max_elements = 10;
        let msg = Sample::new_with_heap_allocated_values(50);
        let msg_size = msg.allocated_bytes();
        let max_allowed_bytes = msg_size * max_elements;

        // With this configuration a maximum of exactly 10 messages can fit in the channel
        let limit = MemoryBufferSize::MaxSize(NonZeroUsize::new(max_allowed_bytes).unwrap());
        let (mut tx, mut rx) = limited(limit, None, None);

        assert_eq!(max_allowed_bytes, tx.available_capacity());

        // Send 10 messages into the channel, filling it
        for _ in 0..10 {
            let msg_clone = msg.clone();
            let mut f = spawn(async { tx.send(msg_clone).await });
            assert_eq!(Ok(()), assert_ready!(f.poll()));
        }
        // With the 10th message in the channel no space should be left
        assert_eq!(0, tx.available_capacity());

        // Attempting to produce one more than the max capacity should block
        let mut send_final = spawn({
            let msg_clone = msg.clone();
            async { tx.send(msg_clone).await }
        });
        assert_pending!(send_final.poll());

        // Read all data from the channel, assert final states are as expected
        for _ in 0..10 {
            let mut f = spawn(async { rx.next().await });
            let value = assert_ready!(f.poll());
            assert_eq!(value.allocated_bytes(), msg_size);
        }
        // Channel should have no more data: `send_final` is never polled again after going
        // pending, so its item is never enqueued.
        let mut recv = spawn(async { rx.next().await });
        assert_pending!(recv.poll());
    }
601
    /// With a one-event channel already full, a second send stays pending. It is woken when the
    /// first item is received, then completes and is received in turn.
    #[test]
    fn sender_waits_for_more_capacity_when_none_available() {
        let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap());
        let (mut tx, mut rx) = limited(limit, None, None);

        assert_eq!(1, tx.available_capacity());

        let msg1 = Sample::new(42);
        let msg2 = Sample::new(43);

        // Create our send and receive futures.
        let mut send1 = spawn(async { tx.send(msg1).await });

        let mut recv1 = spawn(async { rx.next().await });

        // Nobody should be woken up.
        assert!(!send1.is_woken());
        assert!(!recv1.is_woken());

        // Try polling our receive, which should be pending because nothing has been sent yet.
        assert_pending!(recv1.poll());

        // We should immediately be able to complete a send as there is available capacity.
        assert_eq!(Ok(()), assert_ready!(send1.poll()));
        drop(send1);

        assert_eq!(0, tx.available_capacity());

        // Now our receive should have been woken up, and should immediately be ready... but we
        // aren't going to read the value just yet.
        assert!(recv1.is_woken());

        // Now trigger a second send, which should block as there's no available capacity.
        let mut send2 = spawn(async { tx.send(msg2).await });

        assert!(!send2.is_woken());
        assert_pending!(send2.poll());

        // Now if we receive the item, our second send should be woken up and be able to send in.
        assert_eq!(Some(Sample::new(42)), assert_ready!(recv1.poll()));
        drop(recv1);

        // Since the second send was already waiting for permits, the semaphore returns them
        // directly to our waiting send, which should now be woken up and able to complete:
        assert_eq!(0, rx.available_capacity());
        assert!(send2.is_woken());

        let mut recv2 = spawn(async { rx.next().await });
        assert_pending!(recv2.poll());

        assert_eq!(Ok(()), assert_ready!(send2.poll()));
        drop(send2);

        assert_eq!(0, tx.available_capacity());

        // And the final receive to get our second send:
        assert!(recv2.is_woken());
        assert_eq!(Some(Sample::new(43)), assert_ready!(recv2.poll()));

        assert_eq!(1, tx.available_capacity());
    }
663
    /// A four-event send into a seven-event channel holding six events stays pending. It is only
    /// woken once enough receives have released the permits it needs.
    #[test]
    fn sender_waits_for_more_capacity_when_partial_available() {
        let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(7).unwrap());
        let (mut tx, mut rx) = limited(limit, None, None);

        assert_eq!(7, tx.available_capacity());

        let msgs1 = vec![
            MultiEventRecord::new(1),
            MultiEventRecord::new(2),
            MultiEventRecord::new(3),
        ];
        let msg2 = MultiEventRecord::new(4);

        // Create our send and receive futures.
        let mut small_sends = spawn(async {
            for msg in msgs1.clone() {
                tx.send(msg).await?;
            }

            Ok::<_, SendError<MultiEventRecord>>(())
        });

        let mut recv1 = spawn(async { rx.next().await });

        // Nobody should be woken up.
        assert!(!small_sends.is_woken());
        assert!(!recv1.is_woken());

        // Try polling our receive, which should be pending because nothing has been sent yet.
        assert_pending!(recv1.poll());

        // We should immediately be able to complete our three event sends, which we have
        // available capacity for, but will consume all but one of the available slots.
        assert_eq!(Ok(()), assert_ready!(small_sends.poll()));
        drop(small_sends);

        assert_eq!(1, tx.available_capacity());

        // Now our receive should have been woken up, and should immediately be ready, but we won't
        // receive just yet.
        assert!(recv1.is_woken());

        // Now trigger a second send that has four events, and needs to wait for two receives to happen.
        let mut send2 = spawn(tx.send(msg2.clone()));

        assert!(!send2.is_woken());
        assert_pending!(send2.poll());

        // Now receive the first item. Its permit is handed to the waiting second send, but that
        // send still lacks enough permits to proceed, so it is not woken yet.
        assert_eq!(Some(&msgs1[0]), assert_ready!(recv1.poll()).as_ref());
        drop(recv1);

        // Callers waiting to acquire permits have the permits immediately transfer to them when one
        // (or more) are released, so we expect this to be zero until we send and then read the
        // third item.
        assert_eq!(0, rx.available_capacity());

        // We don't get woken up until all permits have been acquired.
        assert!(!send2.is_woken());

        // Our second read should unlock enough available capacity for the second send once complete.
        let mut recv2 = spawn(async { rx.next().await });
        assert!(!recv2.is_woken());
        assert_eq!(Some(&msgs1[1]), assert_ready!(recv2.poll()).as_ref());
        drop(recv2);

        assert_eq!(0, rx.available_capacity());

        assert!(send2.is_woken());
        assert_eq!(Ok(()), assert_ready!(send2.poll()));

        // And just make sure we see those last two sends.
        let mut recv3 = spawn(async { rx.next().await });
        assert!(!recv3.is_woken());
        assert_eq!(Some(&msgs1[2]), assert_ready!(recv3.poll()).as_ref());
        drop(recv3);

        assert_eq!(3, rx.available_capacity());

        let mut recv4 = spawn(async { rx.next().await });
        assert!(!recv4.is_woken());
        assert_eq!(Some(msg2), assert_ready!(recv4.poll()));
        drop(recv4);

        assert_eq!(7, rx.available_capacity());
    }
752
    /// The receiver drains the remaining item and then yields `None` only after every sender clone
    /// has been dropped.
    #[test]
    fn empty_receiver_returns_none_when_last_sender_drops() {
        let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap());
        let (mut tx, mut rx) = limited(limit, None, None);

        assert_eq!(1, tx.available_capacity());

        let tx2 = tx.clone();
        let msg = Sample::new(42);

        // Create our send and receive futures.
        let mut send = spawn(async { tx.send(msg).await });

        let mut recv = spawn(async { rx.next().await });

        // Nobody should be woken up.
        assert!(!send.is_woken());
        assert!(!recv.is_woken());

        // Try polling our receive, which should be pending because nothing has been sent yet.
        assert_pending!(recv.poll());

        // Now drop our second sender, which shouldn't do anything yet.
        drop(tx2);
        assert!(!recv.is_woken());
        assert_pending!(recv.poll());

        // Now drop our first (and last remaining) sender, but not before doing a send. This should
        // close the semaphore, letting the receiver complete with no further waiting: one item and
        // then `None`.
        assert_eq!(Ok(()), assert_ready!(send.poll()));
        drop(send);
        drop(tx);

        assert!(recv.is_woken());
        assert_eq!(Some(Sample::new(42)), assert_ready!(recv.poll()));
        drop(recv);

        let mut recv2 = spawn(async { rx.next().await });
        assert!(!recv2.is_woken());
        assert_eq!(None, assert_ready!(recv2.poll()));
    }
795
    /// A receiver waiting on an empty channel is woken and yields `None` once the last sender is
    /// dropped, but not before.
    #[test]
    fn receiver_returns_none_once_empty_when_last_sender_drops() {
        let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap());
        let (tx, mut rx) = limited::<Sample>(limit, None, None);

        assert_eq!(1, tx.available_capacity());

        let tx2 = tx.clone();

        // Create our receive future.
        let mut recv = spawn(async { rx.next().await });

        // Nobody should be woken up.
        assert!(!recv.is_woken());

        // Try polling our receive, which should be pending because nothing has been sent yet.
        assert_pending!(recv.poll());

        // Now drop our first sender, which shouldn't do anything yet.
        drop(tx);
        assert!(!recv.is_woken());
        assert_pending!(recv.poll());

        // Now drop our second sender, which should trigger closing the semaphore which should let
        // the receive complete as there are no items to read.
        drop(tx2);
        assert!(recv.is_woken());
        assert_eq!(None, assert_ready!(recv.poll()));
    }
825
    /// An item larger than the whole limit is accepted into an empty channel by taking every
    /// permit. Receiving it restores the original capacity and no more.
    #[test]
    fn oversized_send_allowed_when_empty() {
        let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap());
        let (mut tx, mut rx) = limited(limit, None, None);

        assert_eq!(1, tx.available_capacity());

        let msg = MultiEventRecord::new(2);

        // Create our send and receive futures.
        let mut send = spawn(async { tx.send(msg.clone()).await });

        let mut recv = spawn(async { rx.next().await });

        // Nobody should be woken up.
        assert!(!send.is_woken());
        assert!(!recv.is_woken());

        // We should immediately be able to complete our send, which we don't have full
        // available capacity for, but will consume all of the available slots.
        assert_eq!(Ok(()), assert_ready!(send.poll()));
        drop(send);

        assert_eq!(0, tx.available_capacity());

        // Now we should be able to get back the oversized item, but our capacity should not be
        // greater than what we started with.
        assert_eq!(Some(msg), assert_ready!(recv.poll()));
        drop(recv);

        assert_eq!(1, rx.available_capacity());
    }
858
859    #[test]
860    fn oversized_send_allowed_when_partial_capacity() {
861        let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(2).unwrap());
862        let (mut tx, mut rx) = limited(limit, None, None);
863
864        assert_eq!(2, tx.available_capacity());
865
866        let msg1 = MultiEventRecord::new(1);
867        let msg2 = MultiEventRecord::new(3);
868
869        // Create our send future.
870        let mut send = spawn(async { tx.send(msg1.clone()).await });
871
872        // Nobody should be woken up.
873        assert!(!send.is_woken());
874
875        // We should immediately be able to complete our send, which will only use up a single slot.
876        assert_eq!(Ok(()), assert_ready!(send.poll()));
877        drop(send);
878
879        assert_eq!(1, tx.available_capacity());
880
881        // Now we'll trigger another send which has an oversized item.  It shouldn't be able to send
882        // until all permits are available.
883        let mut send2 = spawn(async { tx.send(msg2.clone()).await });
884
885        assert!(!send2.is_woken());
886        assert_pending!(send2.poll());
887
888        assert_eq!(0, rx.available_capacity());
889
890        // Now do a receive which should return the one consumed slot, essentially allowing all
891        // permits to be acquired by the blocked send.
892        let mut recv = spawn(async { rx.next().await });
893        assert!(!recv.is_woken());
894        assert!(!send2.is_woken());
895
896        assert_eq!(Some(msg1), assert_ready!(recv.poll()));
897        drop(recv);
898
899        assert_eq!(0, rx.available_capacity());
900
901        // Now our blocked send should be able to proceed, and we should be able to read back the
902        // item.
903        assert_eq!(Ok(()), assert_ready!(send2.poll()));
904        drop(send2);
905
906        assert_eq!(0, tx.available_capacity());
907
908        let mut recv2 = spawn(async { rx.next().await });
909        assert_eq!(Some(msg2), assert_ready!(recv2.poll()));
910
911        assert_eq!(2, tx.available_capacity());
912    }
913
914    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
915    async fn concurrent_send_receive_metrics_remain_valid() {
916        const ITEM_COUNT: usize = 4_000;
917
918        // Try different sizes of the buffer, from 10 to 1000 events.
919        for size in 1..=100 {
920            let limit = NonZeroUsize::new(size * 10).unwrap();
921            let (tx, rx) = limited(
922                MemoryBufferSize::MaxEvents(limit),
923                Some(ChannelMetricMetadata::new("test_channel_concurrent", None)),
924                None,
925            );
926            let metrics = tx.inner.metrics.as_ref().unwrap().recorded_values.clone();
927
928            let sender = tokio::spawn(send_samples(tx, ITEM_COUNT));
929            let receiver = tokio::spawn(receive_samples(rx, ITEM_COUNT));
930
931            sender.await.expect("sender task should not panic");
932            receiver.await.expect("receiver task should not panic");
933
934            let recorded = metrics.lock().unwrap().clone();
935            assert_eq!(
936                recorded.len(),
937                ITEM_COUNT * 2,
938                "expected one metric update per send and per receive"
939            );
940
941            // For MaxEvents with single-event messages, the occupancy counter tracks exact
942            // utilization, so values must stay within [0, limit].
943            let max_allowed = limit.get();
944            let observed_max = recorded.iter().copied().max().unwrap_or_default();
945            assert!(
946                recorded.iter().all(|value| *value <= max_allowed),
947                "observed utilization value above valid bound: max={observed_max}, allowed={max_allowed}"
948            );
949        }
950    }
951
952    async fn send_samples(mut tx: LimitedSender<Sample>, item_count: usize) {
953        let mut rng = SmallRng::from_rng(&mut rand::rng());
954
955        for i in 0..item_count {
956            tx.send(Sample::new(i as u64))
957                .await
958                .expect("send should succeed");
959            if rng.random::<u8>() % 8 == 0 {
960                tokio::task::yield_now().await;
961            }
962        }
963    }
964
965    async fn receive_samples(mut rx: LimitedReceiver<Sample>, item_count: usize) {
966        let mut rng = SmallRng::from_rng(&mut rand::rng());
967
968        for i in 0..item_count {
969            let next = rx
970                .next()
971                .await
972                .expect("receiver should yield all sent items");
973            assert_eq!(Sample::new(i as u64), next);
974            if rng.random::<u8>() % 8 == 0 {
975                tokio::task::yield_now().await;
976            }
977        }
978    }
979}