1use std::{collections::HashMap, fmt, task::Poll, time::Instant};
2
3use futures::{Stream, StreamExt};
4use futures_util::{pending, poll};
5use indexmap::IndexMap;
6use tokio::sync::mpsc;
7use tokio_util::sync::ReusableBoxFuture;
8use vector_buffers::topology::channel::BufferSender;
9
10use crate::{config::ComponentKey, event::EventArray};
11
12pub enum ControlMessage {
13 Add(ComponentKey, BufferSender<EventArray>),
15
16 Remove(ComponentKey),
18
19 Pause(ComponentKey),
24
25 Replace(ComponentKey, BufferSender<EventArray>),
27}
28
29impl fmt::Debug for ControlMessage {
30 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
31 write!(f, "ControlMessage::")?;
32 match self {
33 Self::Add(id, _) => write!(f, "Add({id:?})"),
34 Self::Remove(id) => write!(f, "Remove({id:?})"),
35 Self::Pause(id) => write!(f, "Pause({id:?})"),
36 Self::Replace(id, _) => write!(f, "Replace({id:?})"),
37 }
38 }
39}
40
41pub type ControlChannel = mpsc::UnboundedSender<ControlMessage>;
44
45pub struct Fanout {
46 senders: IndexMap<ComponentKey, Option<Sender>>,
47 control_channel: mpsc::UnboundedReceiver<ControlMessage>,
48}
49
50impl Fanout {
51 pub fn new() -> (Self, ControlChannel) {
52 let (control_tx, control_rx) = mpsc::unbounded_channel();
53
54 let fanout = Self {
55 senders: Default::default(),
56 control_channel: control_rx,
57 };
58
59 (fanout, control_tx)
60 }
61
62 pub fn add(&mut self, id: ComponentKey, sink: BufferSender<EventArray>) {
68 assert!(
69 !self.senders.contains_key(&id),
70 "Adding duplicate output id to fanout: {id}"
71 );
72 self.senders.insert(id, Some(Sender::new(sink)));
73 }
74
75 fn remove(&mut self, id: &ComponentKey) {
76 assert!(
77 self.senders.shift_remove(id).is_some(),
78 "Removing nonexistent sink from fanout: {id}"
79 );
80 }
81
82 fn replace(&mut self, id: &ComponentKey, sink: BufferSender<EventArray>) {
83 match self.senders.get_mut(id) {
84 Some(sender) => {
85 assert!(
89 sender.replace(Sender::new(sink)).is_none(),
90 "Replacing existing sink is not valid: {id}"
91 );
92 }
93 None => panic!("Replacing unknown sink from fanout: {id}"),
94 }
95 }
96
97 fn pause(&mut self, id: &ComponentKey) {
98 match self.senders.get_mut(id) {
99 Some(sender) => {
100 assert!(
103 sender.take().is_some(),
104 "Pausing nonexistent sink is not valid: {id}"
105 );
106 }
107 None => panic!("Pausing unknown sink from fanout: {id}"),
108 }
109 }
110
111 pub async fn recv_control_message(&mut self) -> bool {
116 match self.control_channel.recv().await {
117 Some(msg) => {
118 self.apply_control_message(msg);
119 true
120 }
121 None => false,
122 }
123 }
124
125 fn apply_control_message(&mut self, message: ControlMessage) {
129 trace!("Processing control message outside of send: {:?}", message);
130
131 match message {
132 ControlMessage::Add(id, sink) => self.add(id, sink),
133 ControlMessage::Remove(id) => self.remove(&id),
134 ControlMessage::Pause(id) => self.pause(&id),
135 ControlMessage::Replace(id, sink) => self.replace(&id, sink),
136 }
137 }
138
139 async fn wait_for_replacements(&mut self) {
144 while self.senders.values().any(Option::is_none) {
145 if let Some(msg) = self.control_channel.recv().await {
146 self.apply_control_message(msg);
147 } else {
148 }
166 }
167 }
168
169 pub async fn send_stream(
185 &mut self,
186 events: impl Stream<Item = (EventArray, Instant)>,
187 ) -> crate::Result<()> {
188 tokio::pin!(events);
189 while let Some((event_array, send_reference)) = events.next().await {
190 self.send(event_array, Some(send_reference)).await?;
191 }
192 Ok(())
193 }
194
195 pub async fn send(
211 &mut self,
212 events: EventArray,
213 send_reference: Option<Instant>,
214 ) -> crate::Result<()> {
215 while let Ok(message) = self.control_channel.try_recv() {
217 self.apply_control_message(message);
218 }
219
220 self.wait_for_replacements().await;
222
223 if self.senders.is_empty() {
225 trace!("No senders present.");
226 return Ok(());
227 }
228
229 let mut control_channel_open = true;
236
237 let mut send_group = SendGroup::new(&mut self.senders, events, send_reference);
240
241 loop {
242 tokio::select! {
243 biased;
247
248 maybe_msg = self.control_channel.recv(), if control_channel_open => {
249 trace!("Processing control message inside of send: {:?}", maybe_msg);
250
251 match maybe_msg {
254 Some(ControlMessage::Add(id, sink)) => {
255 send_group.add(id, sink);
256 },
257 Some(ControlMessage::Remove(id)) => {
258 send_group.remove(&id);
259 },
260 Some(ControlMessage::Pause(id)) => {
261 send_group.pause(&id);
262 },
263 Some(ControlMessage::Replace(id, sink)) => {
264 send_group.replace(&id, Sender::new(sink));
265 },
266 None => {
267 control_channel_open = false;
269 }
270 }
271 }
272
273 result = send_group.send() => match result {
274 Ok(()) => {
275 trace!("Sent item to fanout.");
276 break;
277 },
278 Err(e) => return Err(e),
279 }
280 }
281 }
282
283 Ok(())
284 }
285}
286
287struct SendGroup<'a> {
288 senders: &'a mut IndexMap<ComponentKey, Option<Sender>>,
289 sends: HashMap<ComponentKey, ReusableBoxFuture<'static, crate::Result<Sender>>>,
290}
291
292impl<'a> SendGroup<'a> {
293 fn new(
294 senders: &'a mut IndexMap<ComponentKey, Option<Sender>>,
295 events: EventArray,
296 send_reference: Option<Instant>,
297 ) -> Self {
298 debug_assert!(senders.values().all(Option::is_some));
301
302 let last_sender_idx = senders.len().saturating_sub(1);
303 let mut events = Some(events);
304
305 let mut sends = HashMap::new();
308 for (i, (key, sender)) in senders.iter_mut().enumerate() {
309 let mut sender = sender
310 .take()
311 .expect("sender must be present to initialize SendGroup");
312
313 if i == last_sender_idx {
315 sender.input = events.take();
316 } else {
317 sender.input.clone_from(&events);
318 }
319 sender.send_reference = send_reference;
320
321 let send = async move {
323 sender.flush().await?;
324 Ok(sender)
325 };
326
327 sends.insert(key.clone(), ReusableBoxFuture::new(send));
328 }
329
330 Self { senders, sends }
331 }
332
333 fn try_detach_send(&mut self, id: &ComponentKey) -> bool {
334 if let Some(send) = self.sends.remove(id) {
335 tokio::spawn(async move {
336 if let Err(e) = send.await {
337 warn!(
338 cause = %e,
339 message = "Encountered error writing to component after detaching from topology.",
340 );
341 }
342 });
343 true
344 } else {
345 false
346 }
347 }
348
349 #[allow(clippy::needless_pass_by_value)]
350 fn add(&mut self, id: ComponentKey, sink: BufferSender<EventArray>) {
351 assert!(
354 self.senders
355 .insert(id.clone(), Some(Sender::new(sink)))
356 .is_none(),
357 "Adding duplicate output id to fanout: {id}"
358 );
359 }
360
361 fn remove(&mut self, id: &ComponentKey) {
362 assert!(
366 self.senders.shift_remove(id).is_some(),
367 "Removing nonexistent sink from fanout: {id}"
368 );
369
370 self.try_detach_send(id);
375 }
376
377 fn replace(&mut self, id: &ComponentKey, sink: Sender) {
378 match self.senders.get_mut(id) {
379 Some(sender) => {
380 assert!(
384 sender.replace(sink).is_none(),
385 "Replacing existing sink is not valid: {id}"
386 );
387 }
388 None => panic!("Replacing unknown sink from fanout: {id}"),
389 }
390 }
391
392 fn pause(&mut self, id: &ComponentKey) {
393 match self.senders.get_mut(id) {
394 Some(sender) => {
395 if sender.take().is_none() {
402 assert!(
403 self.try_detach_send(id),
404 "Pausing already-paused sink is invalid: {id}"
405 );
406 }
407 }
408 None => panic!("Pausing unknown sink from fanout: {id}"),
409 }
410 }
411
412 async fn send(&mut self) -> crate::Result<()> {
413 loop {
417 if self.sends.is_empty() {
418 break;
419 }
420
421 let mut done = Vec::new();
422 for (key, send) in &mut self.sends {
423 if let Poll::Ready(result) = poll!(send.get_pin()) {
424 let sender = result?;
425
426 done.push((key.clone(), sender));
429 }
430 }
431
432 for (key, sender) in done {
433 self.sends.remove(&key);
434 self.replace(&key, sender);
435 }
436
437 if !self.sends.is_empty() {
438 pending!();
442 }
443 }
444
445 Ok(())
446 }
447}
448
449struct Sender {
450 inner: BufferSender<EventArray>,
451 input: Option<EventArray>,
452 send_reference: Option<Instant>,
453}
454
455impl Sender {
456 fn new(inner: BufferSender<EventArray>) -> Self {
457 Self {
458 inner,
459 input: None,
460 send_reference: None,
461 }
462 }
463
464 async fn flush(&mut self) -> crate::Result<()> {
465 let send_reference = self.send_reference.take();
466 if let Some(input) = self.input.take() {
467 self.inner.send(input, send_reference).await?;
468 self.inner.flush().await?;
469 }
470
471 Ok(())
472 }
473}
474
475#[cfg(test)]
476mod tests {
477 use std::{mem, num::NonZeroUsize};
478
479 use futures::poll;
480 use tokio::sync::mpsc::UnboundedSender;
481 use tokio_test::{assert_pending, assert_ready, task::spawn};
482 use tracing::Span;
483 use vector_buffers::{
484 WhenFull,
485 topology::{
486 builder::TopologyBuilder,
487 channel::{BufferReceiver, BufferSender},
488 },
489 };
490 use vrl::value::Value;
491
492 use super::{ControlMessage, Fanout};
493 use crate::{
494 config::ComponentKey,
495 event::{Event, EventArray, EventContainer, LogEvent},
496 test_util::{collect_ready, collect_ready_events},
497 };
498
499 fn build_sender_pair(
500 capacity: usize,
501 ) -> (BufferSender<EventArray>, BufferReceiver<EventArray>) {
502 TopologyBuilder::standalone_memory(
503 NonZeroUsize::new(capacity).expect("capacity must be nonzero"),
504 WhenFull::Block,
505 &Span::current(),
506 None,
507 None,
508 )
509 }
510
511 fn build_sender_pairs(
512 capacities: &[usize],
513 ) -> Vec<(BufferSender<EventArray>, BufferReceiver<EventArray>)> {
514 let mut pairs = Vec::new();
515 for capacity in capacities {
516 pairs.push(build_sender_pair(*capacity));
517 }
518 pairs
519 }
520
521 fn fanout_from_senders(
522 capacities: &[usize],
523 ) -> (
524 Fanout,
525 UnboundedSender<ControlMessage>,
526 Vec<BufferReceiver<EventArray>>,
527 ) {
528 let (mut fanout, control) = Fanout::new();
529 let pairs = build_sender_pairs(capacities);
530
531 let mut receivers = Vec::new();
532 for (i, (sender, receiver)) in pairs.into_iter().enumerate() {
533 fanout.add(ComponentKey::from(i.to_string()), sender);
534 receivers.push(receiver);
535 }
536
537 (fanout, control, receivers)
538 }
539
540 fn add_sender_to_fanout(
541 fanout: &mut Fanout,
542 receivers: &mut Vec<BufferReceiver<EventArray>>,
543 sender_id: usize,
544 capacity: usize,
545 ) {
546 let (sender, receiver) = build_sender_pair(capacity);
547 receivers.push(receiver);
548
549 fanout.add(ComponentKey::from(sender_id.to_string()), sender);
550 }
551
552 fn remove_sender_from_fanout(control: &UnboundedSender<ControlMessage>, sender_id: usize) {
553 control
554 .send(ControlMessage::Remove(ComponentKey::from(
555 sender_id.to_string(),
556 )))
557 .expect("sending control message should not fail");
558 }
559
560 fn replace_sender_in_fanout(
561 control: &UnboundedSender<ControlMessage>,
562 receivers: &mut [BufferReceiver<EventArray>],
563 sender_id: usize,
564 capacity: usize,
565 ) -> BufferReceiver<EventArray> {
566 let (sender, receiver) = build_sender_pair(capacity);
567 let old_receiver = mem::replace(&mut receivers[sender_id], receiver);
568
569 control
570 .send(ControlMessage::Pause(ComponentKey::from(
571 sender_id.to_string(),
572 )))
573 .expect("sending control message should not fail");
574
575 control
576 .send(ControlMessage::Replace(
577 ComponentKey::from(sender_id.to_string()),
578 sender,
579 ))
580 .expect("sending control message should not fail");
581
582 old_receiver
583 }
584
585 fn start_sender_replace(
586 control: &UnboundedSender<ControlMessage>,
587 receivers: &mut [BufferReceiver<EventArray>],
588 sender_id: usize,
589 capacity: usize,
590 ) -> (BufferReceiver<EventArray>, BufferSender<EventArray>) {
591 let (sender, receiver) = build_sender_pair(capacity);
592 let old_receiver = mem::replace(&mut receivers[sender_id], receiver);
593
594 control
595 .send(ControlMessage::Pause(ComponentKey::from(
596 sender_id.to_string(),
597 )))
598 .expect("sending control message should not fail");
599
600 (old_receiver, sender)
601 }
602
603 fn finish_sender_resume(
604 control: &UnboundedSender<ControlMessage>,
605 sender_id: usize,
606 sender: BufferSender<EventArray>,
607 ) {
608 control
609 .send(ControlMessage::Replace(
610 ComponentKey::from(sender_id.to_string()),
611 sender,
612 ))
613 .expect("sending control message should not fail");
614 }
615
616 fn unwrap_log_event_message<E>(event: E) -> String
617 where
618 E: EventContainer,
619 {
620 let event = event
621 .into_events()
622 .next()
623 .expect("must have at least one event");
624 let event = event.into_log();
625 event
626 .get("message")
627 .and_then(Value::as_bytes)
628 .and_then(|b| String::from_utf8(b.to_vec()).ok())
629 .expect("must be valid log event with `message` field")
630 }
631
632 #[tokio::test]
633 async fn fanout_writes_to_all() {
634 let (mut fanout, _, receivers) = fanout_from_senders(&[2, 2]);
635 let events = make_event_array(2);
636
637 let clones = events.clone();
638 fanout.send(clones, None).await.expect("should not fail");
639
640 for receiver in receivers {
641 assert_eq!(
642 collect_ready(receiver.into_stream()),
643 std::slice::from_ref(&events)
644 );
645 }
646 }
647
648 #[tokio::test]
649 async fn fanout_notready() {
650 let (mut fanout, _, mut receivers) = fanout_from_senders(&[2, 1, 2]);
651 let events = make_events(2);
652
653 let mut first_send = spawn(fanout.send(events[0].clone().into(), None));
655 assert_ready!(first_send.poll()).expect("should not fail");
656 drop(first_send);
657
658 let mut second_send = spawn(fanout.send(events[1].clone().into(), None));
660 assert_pending!(second_send.poll());
661
662 for receiver in &mut receivers {
664 assert_eq!(Some(events[0].clone().into()), receiver.next().await);
665 }
666
667 assert_ready!(second_send.poll()).expect("should not fail");
669 drop(second_send);
670
671 for receiver in &mut receivers {
673 assert_eq!(Some(events[1].clone().into()), receiver.next().await);
674 }
675 }
676
677 #[tokio::test]
678 async fn fanout_grow() {
679 let (mut fanout, _, mut receivers) = fanout_from_senders(&[4, 4]);
680 let events = make_events(3);
681
682 fanout
684 .send(events[0].clone().into(), None)
685 .await
686 .expect("should not fail");
687 fanout
688 .send(events[1].clone().into(), None)
689 .await
690 .expect("should not fail");
691
692 add_sender_to_fanout(&mut fanout, &mut receivers, 2, 4);
694
695 fanout
697 .send(events[2].clone().into(), None)
698 .await
699 .expect("should not fail");
700
701 let expected_events = [&events, &events, &events[2..]];
704 for (i, receiver) in receivers.into_iter().enumerate() {
705 assert_eq!(
706 collect_ready_events(receiver.into_stream()),
707 expected_events[i]
708 );
709 }
710 }
711
712 #[tokio::test]
713 async fn fanout_shrink() {
714 let (mut fanout, control, receivers) = fanout_from_senders(&[4, 4]);
715 let events = make_events(3);
716
717 fanout
719 .send(events[0].clone().into(), None)
720 .await
721 .expect("should not fail");
722 fanout
723 .send(events[1].clone().into(), None)
724 .await
725 .expect("should not fail");
726
727 remove_sender_from_fanout(&control, 1);
729
730 fanout
732 .send(events[2].clone().into(), None)
733 .await
734 .expect("should not fail");
735
736 let expected_events = [&events, &events[..2]];
738 for (i, receiver) in receivers.into_iter().enumerate() {
739 assert_eq!(
740 collect_ready_events(receiver.into_stream()),
741 expected_events[i]
742 );
743 }
744 }
745
746 #[tokio::test]
747 async fn fanout_shrink_when_notready() {
748 let events = make_events(2);
754 let expected_first_event = unwrap_log_event_message(events[0].clone());
755 let expected_second_event = unwrap_log_event_message(events[1].clone());
756
757 let cases = [
758 (
761 0,
762 false,
763 [
764 expected_second_event.clone(),
765 expected_first_event.clone(),
766 expected_second_event.clone(),
767 ],
768 ),
769 (
770 1,
771 true,
772 [
773 expected_second_event.clone(),
774 expected_second_event.clone(),
775 expected_second_event.clone(),
776 ],
777 ),
778 (
779 2,
780 false,
781 [
782 expected_second_event.clone(),
783 expected_first_event.clone(),
784 expected_second_event.clone(),
785 ],
786 ),
787 ];
788
789 for (sender_id, should_complete, expected_last_seen) in cases {
790 let (mut fanout, control, mut receivers) = fanout_from_senders(&[2, 1, 2]);
791
792 let mut first_send = spawn(fanout.send(events[0].clone().into(), None));
794 assert_ready!(first_send.poll()).expect("should not fail");
795 drop(first_send);
796
797 let mut second_send = spawn(fanout.send(events[1].clone().into(), None));
799 assert_pending!(second_send.poll());
800
801 remove_sender_from_fanout(&control, sender_id);
803
804 if should_complete {
805 assert_ready!(second_send.poll()).expect("should not fail");
806 } else {
807 assert_pending!(second_send.poll());
808 }
809 drop(second_send);
810
811 drop(fanout);
813
814 let mut last_seen = Vec::new();
815 for receiver in &mut receivers {
816 let mut events = Vec::new();
817 while let Some(event) = receiver.next().await {
818 events.insert(0, event);
819 }
820
821 last_seen.push(unwrap_log_event_message(events.remove(0)));
822 }
823
824 assert_eq!(&expected_last_seen[..], &last_seen);
825 }
826 }
827
828 #[tokio::test]
829 async fn fanout_no_sinks() {
830 let (mut fanout, _) = Fanout::new();
831 let events = make_events(2);
832
833 fanout
834 .send(events[0].clone().into(), None)
835 .await
836 .expect("should not fail");
837 fanout
838 .send(events[1].clone().into(), None)
839 .await
840 .expect("should not fail");
841 }
842
843 #[tokio::test]
844 async fn fanout_replace() {
845 let (mut fanout, control, mut receivers) = fanout_from_senders(&[4, 4, 4]);
846 let events = make_events(3);
847
848 fanout
850 .send(events[0].clone().into(), None)
851 .await
852 .expect("should not fail");
853 fanout
854 .send(events[1].clone().into(), None)
855 .await
856 .expect("should not fail");
857
858 let old_first_receiver = replace_sender_in_fanout(&control, &mut receivers, 0, 4);
860
861 fanout
863 .send(events[2].clone().into(), None)
864 .await
865 .expect("should not fail");
866
867 let expected_events = [&events[2..], &events, &events];
870 for (i, receiver) in receivers.into_iter().enumerate() {
871 assert_eq!(
872 collect_ready_events(receiver.into_stream()),
873 expected_events[i]
874 );
875 }
876
877 assert_eq!(
879 collect_ready_events(old_first_receiver.into_stream()),
880 &events[..2]
881 );
882 }
883
884 #[tokio::test]
885 async fn fanout_wait() {
886 let (mut fanout, control, mut receivers) = fanout_from_senders(&[4, 4]);
887 let events = make_events(3);
888
889 let send1 = Box::pin(fanout.send(events[0].clone().into(), None));
891 assert_ready!(poll!(send1)).expect("should not fail");
892 let send2 = Box::pin(fanout.send(events[1].clone().into(), None));
893 assert_ready!(poll!(send2)).expect("should not fail");
894
895 let (old_first_receiver, new_first_sender) =
899 start_sender_replace(&control, &mut receivers, 0, 4);
900
901 let mut third_send = spawn(fanout.send(events[2].clone().into(), None));
903 assert_pending!(third_send.poll());
904
905 finish_sender_resume(&control, 0, new_first_sender);
908 assert!(third_send.is_woken());
909 assert_ready!(third_send.poll()).expect("should not fail");
910
911 assert_eq!(
914 collect_ready_events(old_first_receiver.into_stream()),
915 &events[0..2]
916 );
917
918 let expected_events = [&events[2..], &events];
919 for (i, receiver) in receivers.into_iter().enumerate() {
920 assert_eq!(
921 collect_ready_events(receiver.into_stream()),
922 expected_events[i]
923 );
924 }
925 }
926
927 fn make_events_inner(count: usize) -> impl Iterator<Item = LogEvent> {
928 (0..count).map(|i| LogEvent::from(format!("line {i}")))
929 }
930
931 fn make_events(count: usize) -> Vec<Event> {
932 make_events_inner(count).map(Into::into).collect()
933 }
934
935 fn make_event_array(count: usize) -> EventArray {
936 make_events_inner(count).collect::<Vec<_>>().into()
937 }
938}