1use std::{
2 cmp,
3 fmt::{self, Debug},
4 num::NonZeroUsize,
5 pin::Pin,
6 sync::{
7 Arc,
8 atomic::{AtomicUsize, Ordering},
9 },
10 time::Instant,
11};
12
13#[cfg(test)]
14use std::sync::Mutex;
15
16use async_stream::stream;
17use crossbeam_queue::{ArrayQueue, SegQueue};
18use futures::Stream;
19use metrics::{Gauge, Histogram, gauge, histogram};
20use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore, TryAcquireError};
21use vector_common::stats::TimeEwmaGauge;
22
23use crate::{InMemoryBufferable, config::MemoryBufferSize};
24
25pub const DEFAULT_EWMA_HALF_LIFE_SECONDS: f64 = 5.0;
26
27#[derive(Debug, PartialEq, Eq)]
29pub struct SendError<T>(pub T);
30
31impl<T> fmt::Display for SendError<T> {
32 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
33 write!(fmt, "receiver disconnected")
34 }
35}
36
37impl<T: fmt::Debug> std::error::Error for SendError<T> {}
38
39#[derive(Debug, PartialEq, Eq)]
41pub enum TrySendError<T> {
42 InsufficientCapacity(T),
43 Disconnected(T),
44}
45
46impl<T> TrySendError<T> {
47 pub fn into_inner(self) -> T {
48 match self {
49 Self::InsufficientCapacity(item) | Self::Disconnected(item) => item,
50 }
51 }
52}
53
54impl<T> fmt::Display for TrySendError<T> {
55 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
56 match self {
57 Self::InsufficientCapacity(_) => {
58 write!(fmt, "channel lacks sufficient capacity for send")
59 }
60 Self::Disconnected(_) => write!(fmt, "receiver disconnected"),
61 }
62 }
63}
64
65impl<T: fmt::Debug> std::error::Error for TrySendError<T> {}
66
67trait QueueImpl<T>: Send + Sync + fmt::Debug {
69 fn push(&self, item: T);
70 fn pop(&self) -> Option<T>;
71}
72
73impl<T> QueueImpl<T> for ArrayQueue<T>
74where
75 T: Send + Sync + fmt::Debug,
76{
77 fn push(&self, item: T) {
78 self.push(item)
79 .unwrap_or_else(|_| unreachable!("acquired permits but channel reported being full."));
80 }
81
82 fn pop(&self) -> Option<T> {
83 self.pop()
84 }
85}
86
87impl<T> QueueImpl<T> for SegQueue<T>
88where
89 T: Send + Sync + fmt::Debug,
90{
91 fn push(&self, item: T) {
92 self.push(item);
93 }
94
95 fn pop(&self) -> Option<T> {
96 self.pop()
97 }
98}
99
100#[derive(Clone, Debug)]
101pub struct ChannelMetricMetadata {
102 prefix: &'static str,
103 output: Option<String>,
104}
105
106impl ChannelMetricMetadata {
107 pub fn new(prefix: &'static str, output: Option<String>) -> Self {
108 Self { prefix, output }
109 }
110}
111
112#[derive(Clone, Debug)]
113struct Metrics {
114 histogram: Histogram,
115 gauge: Gauge,
116 mean_gauge: TimeEwmaGauge,
117 #[expect(dead_code)]
121 max_gauge: Gauge,
122 #[expect(dead_code)]
123 legacy_max_gauge: Gauge,
124 #[cfg(test)]
125 recorded_values: Arc<Mutex<Vec<usize>>>,
126}
127
128impl Metrics {
129 #[expect(clippy::cast_precision_loss)] #[allow(clippy::disallowed_macros)] fn new(
132 limit: MemoryBufferSize,
133 metadata: ChannelMetricMetadata,
134 ewma_half_life_seconds: Option<f64>,
135 ) -> Self {
136 let ewma_half_life_seconds =
137 ewma_half_life_seconds.unwrap_or(DEFAULT_EWMA_HALF_LIFE_SECONDS);
138 let ChannelMetricMetadata { prefix, output } = metadata;
139 let (legacy_suffix, gauge_suffix, max_value) = match limit {
140 MemoryBufferSize::MaxEvents(max_events) => (
141 "_max_event_size",
142 "_max_size_events",
143 max_events.get() as f64,
144 ),
145 MemoryBufferSize::MaxSize(max_bytes) => {
146 ("_max_byte_size", "_max_size_bytes", max_bytes.get() as f64)
147 }
148 };
149 let max_gauge_name = format!("{prefix}{gauge_suffix}");
150 let legacy_max_gauge_name = format!("{prefix}{legacy_suffix}");
151 let histogram_name = format!("{prefix}_utilization");
152 let gauge_name = format!("{prefix}_utilization_level");
153 let mean_name = format!("{prefix}_utilization_mean");
154 #[cfg(test)]
155 let recorded_values = Arc::new(Mutex::new(Vec::new()));
156 if let Some(label_value) = output {
157 let max_gauge = gauge!(max_gauge_name, "output" => label_value.clone());
158 max_gauge.set(max_value);
159 let mean_gauge_handle = gauge!(mean_name, "output" => label_value.clone());
160 let legacy_max_gauge = gauge!(legacy_max_gauge_name, "output" => label_value.clone());
162 legacy_max_gauge.set(max_value);
163 Self {
164 histogram: histogram!(histogram_name, "output" => label_value.clone()),
165 gauge: gauge!(gauge_name, "output" => label_value.clone()),
166 mean_gauge: TimeEwmaGauge::new(mean_gauge_handle, ewma_half_life_seconds),
167 max_gauge,
168 legacy_max_gauge,
169 #[cfg(test)]
170 recorded_values,
171 }
172 } else {
173 let max_gauge = gauge!(max_gauge_name);
174 max_gauge.set(max_value);
175 let mean_gauge_handle = gauge!(mean_name);
176 let legacy_max_gauge = gauge!(legacy_max_gauge_name);
178 legacy_max_gauge.set(max_value);
179 Self {
180 histogram: histogram!(histogram_name),
181 gauge: gauge!(gauge_name),
182 mean_gauge: TimeEwmaGauge::new(mean_gauge_handle, ewma_half_life_seconds),
183 max_gauge,
184 legacy_max_gauge,
185 #[cfg(test)]
186 recorded_values,
187 }
188 }
189 }
190
191 #[expect(clippy::cast_precision_loss)]
192 fn record(&self, value: usize, reference: Instant) {
193 self.histogram.record(value as f64);
194 self.gauge.set(value as f64);
195 self.mean_gauge.record(value as f64, reference);
196 #[cfg(test)]
197 if let Ok(mut recorded) = self.recorded_values.lock() {
198 recorded.push(value);
199 }
200 }
201}
202
203#[derive(Debug)]
204struct Inner<T> {
205 data: Arc<dyn QueueImpl<(OwnedSemaphorePermit, T)>>,
206 limit: MemoryBufferSize,
207 limiter: Arc<Semaphore>,
208 read_waker: Arc<Notify>,
209 metrics: Option<Metrics>,
210 capacity: NonZeroUsize,
211}
212
213impl<T> Clone for Inner<T> {
214 fn clone(&self) -> Self {
215 Self {
216 data: self.data.clone(),
217 limit: self.limit,
218 limiter: self.limiter.clone(),
219 read_waker: self.read_waker.clone(),
220 metrics: self.metrics.clone(),
221 capacity: self.capacity,
222 }
223 }
224}
225
226impl<T: Send + Sync + Debug + 'static> Inner<T> {
227 fn new(
228 limit: MemoryBufferSize,
229 metric_metadata: Option<ChannelMetricMetadata>,
230 ewma_half_life_seconds: Option<f64>,
231 ) -> Self {
232 let read_waker = Arc::new(Notify::new());
233 let metrics =
234 metric_metadata.map(|metadata| Metrics::new(limit, metadata, ewma_half_life_seconds));
235 match limit {
236 MemoryBufferSize::MaxEvents(max_events) => Inner {
237 data: Arc::new(ArrayQueue::new(max_events.get())),
238 limit,
239 limiter: Arc::new(Semaphore::new(max_events.get())),
240 read_waker,
241 metrics,
242 capacity: max_events,
243 },
244 MemoryBufferSize::MaxSize(max_bytes) => Inner {
245 data: Arc::new(SegQueue::new()),
246 limit,
247 limiter: Arc::new(Semaphore::new(max_bytes.get())),
248 read_waker,
249 metrics,
250 capacity: max_bytes,
251 },
252 }
253 }
254
255 fn send_with_permits(&mut self, size: usize, permits: OwnedSemaphorePermit, item: T) {
260 if let Some(metrics) = &self.metrics {
261 let utilization = size.max(self.used_capacity());
266 metrics.record(utilization, Instant::now());
267 }
268 self.data.push((permits, item));
269 self.read_waker.notify_one();
270 }
271}
272
273impl<T> Inner<T> {
274 fn used_capacity(&self) -> usize {
275 self.capacity.get() - self.limiter.available_permits()
276 }
277
278 fn pop_and_record(&self) -> Option<T> {
279 self.data.pop().map(|(permit, item)| {
280 if let Some(metrics) = &self.metrics {
281 let utilization = self.used_capacity().saturating_sub(permit.num_permits());
285 metrics.record(utilization, Instant::now());
286 }
287 drop(permit);
290 item
291 })
292 }
293}
294
295#[derive(Debug)]
296pub struct LimitedSender<T> {
297 inner: Inner<T>,
298 sender_count: Arc<AtomicUsize>,
299}
300
301impl<T: InMemoryBufferable> LimitedSender<T> {
302 #[allow(clippy::cast_possible_truncation)]
303 fn calc_required_permits(&self, item: &T) -> (usize, u32) {
304 let value = match self.inner.limit {
308 MemoryBufferSize::MaxSize(_) => item.allocated_bytes(),
309 MemoryBufferSize::MaxEvents(_) => item.event_count(),
310 };
311 let limit = self.inner.capacity.get();
312 (value, cmp::min(limit, value) as u32)
313 }
314
315 pub fn available_capacity(&self) -> usize {
317 self.inner.limiter.available_permits()
318 }
319
320 pub async fn send(&mut self, item: T) -> Result<(), SendError<T>> {
327 let (size, permits_required) = self.calc_required_permits(&item);
329 match self
330 .inner
331 .limiter
332 .clone()
333 .acquire_many_owned(permits_required)
334 .await
335 {
336 Ok(permits) => {
337 self.inner.send_with_permits(size, permits, item);
338 trace!("Sent item.");
339 Ok(())
340 }
341 Err(_) => Err(SendError(item)),
342 }
343 }
344
345 pub fn try_send(&mut self, item: T) -> Result<(), TrySendError<T>> {
358 let (size, permits_required) = self.calc_required_permits(&item);
360 match self
361 .inner
362 .limiter
363 .clone()
364 .try_acquire_many_owned(permits_required)
365 {
366 Ok(permits) => {
367 self.inner.send_with_permits(size, permits, item);
368 trace!("Attempt to send item succeeded.");
369 Ok(())
370 }
371 Err(TryAcquireError::NoPermits) => Err(TrySendError::InsufficientCapacity(item)),
372 Err(TryAcquireError::Closed) => Err(TrySendError::Disconnected(item)),
373 }
374 }
375}
376
377impl<T> Clone for LimitedSender<T> {
378 fn clone(&self) -> Self {
379 self.sender_count.fetch_add(1, Ordering::SeqCst);
380
381 Self {
382 inner: self.inner.clone(),
383 sender_count: Arc::clone(&self.sender_count),
384 }
385 }
386}
387
388impl<T> Drop for LimitedSender<T> {
389 fn drop(&mut self) {
390 if self.sender_count.fetch_sub(1, Ordering::SeqCst) == 1 {
392 self.inner.limiter.close();
393 self.inner.read_waker.notify_one();
394 }
395 }
396}
397
398#[derive(Debug)]
399pub struct LimitedReceiver<T> {
400 inner: Inner<T>,
401}
402
403impl<T: Send + 'static> LimitedReceiver<T> {
404 pub fn available_capacity(&self) -> usize {
406 self.inner.limiter.available_permits()
407 }
408
409 pub async fn next(&mut self) -> Option<T> {
410 loop {
411 if let Some(item) = self.inner.pop_and_record() {
412 return Some(item);
413 }
414
415 if self.inner.limiter.is_closed() {
418 if self.available_capacity() < self.inner.capacity.get() {
419 tokio::task::yield_now().await;
422 continue;
423 }
424 return None;
425 }
426
427 self.inner.read_waker.notified().await;
431 }
432 }
433
434 pub fn into_stream(self) -> Pin<Box<dyn Stream<Item = T> + Send>> {
435 let mut receiver = self;
436 Box::pin(stream! {
437 while let Some(item) = receiver.next().await {
438 yield item;
439 }
440 })
441 }
442}
443
444impl<T> Drop for LimitedReceiver<T> {
445 fn drop(&mut self) {
446 self.inner.limiter.close();
450 }
451}
452
453pub fn limited<T: InMemoryBufferable + fmt::Debug>(
454 limit: MemoryBufferSize,
455 metric_metadata: Option<ChannelMetricMetadata>,
456 ewma_half_life_seconds: Option<f64>,
457) -> (LimitedSender<T>, LimitedReceiver<T>) {
458 let inner = Inner::new(limit, metric_metadata, ewma_half_life_seconds);
459
460 let sender = LimitedSender {
461 inner: inner.clone(),
462 sender_count: Arc::new(AtomicUsize::new(1)),
463 };
464 let receiver = LimitedReceiver { inner };
465
466 (sender, receiver)
467}
468
469#[cfg(test)]
470mod tests {
471 use std::num::NonZeroUsize;
472
473 use rand::{Rng as _, SeedableRng as _, rngs::SmallRng};
474 use tokio_test::{assert_pending, assert_ready, task::spawn};
475 use vector_common::byte_size_of::ByteSizeOf;
476
477 use super::{ChannelMetricMetadata, LimitedReceiver, LimitedSender, limited};
478 use crate::{
479 MemoryBufferSize,
480 test::MultiEventRecord,
481 topology::{channel::limited_queue::SendError, test_util::Sample},
482 };
483
484 #[tokio::test]
485 async fn send_receive() {
486 let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(2).unwrap());
487 let (mut tx, mut rx) = limited(limit, None, None);
488
489 assert_eq!(2, tx.available_capacity());
490
491 let msg = Sample::new(42);
492
493 let mut send = spawn(async { tx.send(msg).await });
495
496 let mut recv = spawn(async { rx.next().await });
497
498 assert!(!send.is_woken());
500 assert!(!recv.is_woken());
501
502 assert_pending!(recv.poll());
504
505 assert_eq!(Ok(()), assert_ready!(send.poll()));
507
508 assert!(recv.is_woken());
510 assert_eq!(Some(Sample::new(42)), assert_ready!(recv.poll()));
511 }
512
513 #[tokio::test]
514 async fn records_utilization() {
515 let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(2).unwrap());
516 let (mut tx, mut rx) = limited(
517 limit,
518 Some(ChannelMetricMetadata::new("test_channel", None)),
519 None,
520 );
521
522 let metrics = tx.inner.metrics.as_ref().unwrap().recorded_values.clone();
523
524 tx.send(Sample::new(1)).await.expect("send should succeed");
525 let records = metrics.lock().unwrap().clone();
526 assert_eq!(records.len(), 1);
527 assert_eq!(records.last().copied(), Some(1));
528
529 assert_eq!(Sample::new(1), rx.next().await.unwrap());
530 let records = metrics.lock().unwrap();
531 assert_eq!(records.len(), 2);
532 assert_eq!(records.last().copied(), Some(0));
533 }
534
535 #[tokio::test]
536 async fn oversized_send_records_true_utilization_via_normal_send_path() {
537 let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(2).unwrap());
538 let (mut tx, mut rx) = limited(
539 limit,
540 Some(ChannelMetricMetadata::new("test_channel_oversized", None)),
541 None,
542 );
543 let metrics = tx.inner.metrics.as_ref().unwrap().recorded_values.clone();
544
545 let oversized = MultiEventRecord::new(3);
548 tx.send(oversized.clone())
549 .await
550 .expect("send should succeed");
551
552 let records = metrics.lock().unwrap().clone();
553 assert_eq!(records.len(), 1);
554 assert_eq!(records.last().copied(), Some(3));
555
556 assert_eq!(Some(oversized), rx.next().await);
557 let records = metrics.lock().unwrap().clone();
558 assert_eq!(records.len(), 2);
559 assert_eq!(records.last().copied(), Some(0));
560 }
561
562 #[test]
563 fn test_limiting_by_byte_size() {
564 let max_elements = 10;
565 let msg = Sample::new_with_heap_allocated_values(50);
566 let msg_size = msg.allocated_bytes();
567 let max_allowed_bytes = msg_size * max_elements;
568
569 let limit = MemoryBufferSize::MaxSize(NonZeroUsize::new(max_allowed_bytes).unwrap());
571 let (mut tx, mut rx) = limited(limit, None, None);
572
573 assert_eq!(max_allowed_bytes, tx.available_capacity());
574
575 for _ in 0..10 {
577 let msg_clone = msg.clone();
578 let mut f = spawn(async { tx.send(msg_clone).await });
579 assert_eq!(Ok(()), assert_ready!(f.poll()));
580 }
581 assert_eq!(0, tx.available_capacity());
583
584 let mut send_final = spawn({
586 let msg_clone = msg.clone();
587 async { tx.send(msg_clone).await }
588 });
589 assert_pending!(send_final.poll());
590
591 for _ in 0..10 {
593 let mut f = spawn(async { rx.next().await });
594 let value = assert_ready!(f.poll());
595 assert_eq!(value.allocated_bytes(), msg_size);
596 }
597 let mut recv = spawn(async { rx.next().await });
599 assert_pending!(recv.poll());
600 }
601
602 #[test]
603 fn sender_waits_for_more_capacity_when_none_available() {
604 let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap());
605 let (mut tx, mut rx) = limited(limit, None, None);
606
607 assert_eq!(1, tx.available_capacity());
608
609 let msg1 = Sample::new(42);
610 let msg2 = Sample::new(43);
611
612 let mut send1 = spawn(async { tx.send(msg1).await });
614
615 let mut recv1 = spawn(async { rx.next().await });
616
617 assert!(!send1.is_woken());
619 assert!(!recv1.is_woken());
620
621 assert_pending!(recv1.poll());
623
624 assert_eq!(Ok(()), assert_ready!(send1.poll()));
626 drop(send1);
627
628 assert_eq!(0, tx.available_capacity());
629
630 assert!(recv1.is_woken());
633
634 let mut send2 = spawn(async { tx.send(msg2).await });
636
637 assert!(!send2.is_woken());
638 assert_pending!(send2.poll());
639
640 assert_eq!(Some(Sample::new(42)), assert_ready!(recv1.poll()));
642 drop(recv1);
643
644 assert_eq!(0, rx.available_capacity());
647 assert!(send2.is_woken());
648
649 let mut recv2 = spawn(async { rx.next().await });
650 assert_pending!(recv2.poll());
651
652 assert_eq!(Ok(()), assert_ready!(send2.poll()));
653 drop(send2);
654
655 assert_eq!(0, tx.available_capacity());
656
657 assert!(recv2.is_woken());
659 assert_eq!(Some(Sample::new(43)), assert_ready!(recv2.poll()));
660
661 assert_eq!(1, tx.available_capacity());
662 }
663
664 #[test]
665 fn sender_waits_for_more_capacity_when_partial_available() {
666 let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(7).unwrap());
667 let (mut tx, mut rx) = limited(limit, None, None);
668
669 assert_eq!(7, tx.available_capacity());
670
671 let msgs1 = vec![
672 MultiEventRecord::new(1),
673 MultiEventRecord::new(2),
674 MultiEventRecord::new(3),
675 ];
676 let msg2 = MultiEventRecord::new(4);
677
678 let mut small_sends = spawn(async {
680 for msg in msgs1.clone() {
681 tx.send(msg).await?;
682 }
683
684 Ok::<_, SendError<MultiEventRecord>>(())
685 });
686
687 let mut recv1 = spawn(async { rx.next().await });
688
689 assert!(!small_sends.is_woken());
691 assert!(!recv1.is_woken());
692
693 assert_pending!(recv1.poll());
695
696 assert_eq!(Ok(()), assert_ready!(small_sends.poll()));
699 drop(small_sends);
700
701 assert_eq!(1, tx.available_capacity());
702
703 assert!(recv1.is_woken());
706
707 let mut send2 = spawn(tx.send(msg2.clone()));
709
710 assert!(!send2.is_woken());
711 assert_pending!(send2.poll());
712
713 assert_eq!(Some(&msgs1[0]), assert_ready!(recv1.poll()).as_ref());
716 drop(recv1);
717
718 assert_eq!(0, rx.available_capacity());
722
723 assert!(!send2.is_woken());
725
726 let mut recv2 = spawn(async { rx.next().await });
728 assert!(!recv2.is_woken());
729 assert_eq!(Some(&msgs1[1]), assert_ready!(recv2.poll()).as_ref());
730 drop(recv2);
731
732 assert_eq!(0, rx.available_capacity());
733
734 assert!(send2.is_woken());
735 assert_eq!(Ok(()), assert_ready!(send2.poll()));
736
737 let mut recv3 = spawn(async { rx.next().await });
739 assert!(!recv3.is_woken());
740 assert_eq!(Some(&msgs1[2]), assert_ready!(recv3.poll()).as_ref());
741 drop(recv3);
742
743 assert_eq!(3, rx.available_capacity());
744
745 let mut recv4 = spawn(async { rx.next().await });
746 assert!(!recv4.is_woken());
747 assert_eq!(Some(msg2), assert_ready!(recv4.poll()));
748 drop(recv4);
749
750 assert_eq!(7, rx.available_capacity());
751 }
752
753 #[test]
754 fn empty_receiver_returns_none_when_last_sender_drops() {
755 let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap());
756 let (mut tx, mut rx) = limited(limit, None, None);
757
758 assert_eq!(1, tx.available_capacity());
759
760 let tx2 = tx.clone();
761 let msg = Sample::new(42);
762
763 let mut send = spawn(async { tx.send(msg).await });
765
766 let mut recv = spawn(async { rx.next().await });
767
768 assert!(!send.is_woken());
770 assert!(!recv.is_woken());
771
772 assert_pending!(recv.poll());
774
775 drop(tx2);
777 assert!(!recv.is_woken());
778 assert_pending!(recv.poll());
779
780 assert_eq!(Ok(()), assert_ready!(send.poll()));
784 drop(send);
785 drop(tx);
786
787 assert!(recv.is_woken());
788 assert_eq!(Some(Sample::new(42)), assert_ready!(recv.poll()));
789 drop(recv);
790
791 let mut recv2 = spawn(async { rx.next().await });
792 assert!(!recv2.is_woken());
793 assert_eq!(None, assert_ready!(recv2.poll()));
794 }
795
796 #[test]
797 fn receiver_returns_none_once_empty_when_last_sender_drops() {
798 let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap());
799 let (tx, mut rx) = limited::<Sample>(limit, None, None);
800
801 assert_eq!(1, tx.available_capacity());
802
803 let tx2 = tx.clone();
804
805 let mut recv = spawn(async { rx.next().await });
807
808 assert!(!recv.is_woken());
810
811 assert_pending!(recv.poll());
813
814 drop(tx);
816 assert!(!recv.is_woken());
817 assert_pending!(recv.poll());
818
819 drop(tx2);
822 assert!(recv.is_woken());
823 assert_eq!(None, assert_ready!(recv.poll()));
824 }
825
826 #[test]
827 fn oversized_send_allowed_when_empty() {
828 let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap());
829 let (mut tx, mut rx) = limited(limit, None, None);
830
831 assert_eq!(1, tx.available_capacity());
832
833 let msg = MultiEventRecord::new(2);
834
835 let mut send = spawn(async { tx.send(msg.clone()).await });
837
838 let mut recv = spawn(async { rx.next().await });
839
840 assert!(!send.is_woken());
842 assert!(!recv.is_woken());
843
844 assert_eq!(Ok(()), assert_ready!(send.poll()));
847 drop(send);
848
849 assert_eq!(0, tx.available_capacity());
850
851 assert_eq!(Some(msg), assert_ready!(recv.poll()));
854 drop(recv);
855
856 assert_eq!(1, rx.available_capacity());
857 }
858
859 #[test]
860 fn oversized_send_allowed_when_partial_capacity() {
861 let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(2).unwrap());
862 let (mut tx, mut rx) = limited(limit, None, None);
863
864 assert_eq!(2, tx.available_capacity());
865
866 let msg1 = MultiEventRecord::new(1);
867 let msg2 = MultiEventRecord::new(3);
868
869 let mut send = spawn(async { tx.send(msg1.clone()).await });
871
872 assert!(!send.is_woken());
874
875 assert_eq!(Ok(()), assert_ready!(send.poll()));
877 drop(send);
878
879 assert_eq!(1, tx.available_capacity());
880
881 let mut send2 = spawn(async { tx.send(msg2.clone()).await });
884
885 assert!(!send2.is_woken());
886 assert_pending!(send2.poll());
887
888 assert_eq!(0, rx.available_capacity());
889
890 let mut recv = spawn(async { rx.next().await });
893 assert!(!recv.is_woken());
894 assert!(!send2.is_woken());
895
896 assert_eq!(Some(msg1), assert_ready!(recv.poll()));
897 drop(recv);
898
899 assert_eq!(0, rx.available_capacity());
900
901 assert_eq!(Ok(()), assert_ready!(send2.poll()));
904 drop(send2);
905
906 assert_eq!(0, tx.available_capacity());
907
908 let mut recv2 = spawn(async { rx.next().await });
909 assert_eq!(Some(msg2), assert_ready!(recv2.poll()));
910
911 assert_eq!(2, tx.available_capacity());
912 }
913
914 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
915 async fn concurrent_send_receive_metrics_remain_valid() {
916 const ITEM_COUNT: usize = 4_000;
917
918 for size in 1..=100 {
920 let limit = NonZeroUsize::new(size * 10).unwrap();
921 let (tx, rx) = limited(
922 MemoryBufferSize::MaxEvents(limit),
923 Some(ChannelMetricMetadata::new("test_channel_concurrent", None)),
924 None,
925 );
926 let metrics = tx.inner.metrics.as_ref().unwrap().recorded_values.clone();
927
928 let sender = tokio::spawn(send_samples(tx, ITEM_COUNT));
929 let receiver = tokio::spawn(receive_samples(rx, ITEM_COUNT));
930
931 sender.await.expect("sender task should not panic");
932 receiver.await.expect("receiver task should not panic");
933
934 let recorded = metrics.lock().unwrap().clone();
935 assert_eq!(
936 recorded.len(),
937 ITEM_COUNT * 2,
938 "expected one metric update per send and per receive"
939 );
940
941 let max_allowed = limit.get();
944 let observed_max = recorded.iter().copied().max().unwrap_or_default();
945 assert!(
946 recorded.iter().all(|value| *value <= max_allowed),
947 "observed utilization value above valid bound: max={observed_max}, allowed={max_allowed}"
948 );
949 }
950 }
951
952 async fn send_samples(mut tx: LimitedSender<Sample>, item_count: usize) {
953 let mut rng = SmallRng::from_rng(&mut rand::rng());
954
955 for i in 0..item_count {
956 tx.send(Sample::new(i as u64))
957 .await
958 .expect("send should succeed");
959 if rng.random::<u8>() % 8 == 0 {
960 tokio::task::yield_now().await;
961 }
962 }
963 }
964
965 async fn receive_samples(mut rx: LimitedReceiver<Sample>, item_count: usize) {
966 let mut rng = SmallRng::from_rng(&mut rand::rng());
967
968 for i in 0..item_count {
969 let next = rx
970 .next()
971 .await
972 .expect("receiver should yield all sent items");
973 assert_eq!(Sample::new(i as u64), next);
974 if rng.random::<u8>() % 8 == 0 {
975 tokio::task::yield_now().await;
976 }
977 }
978 }
979}