1use std::{
2 collections::HashMap,
3 future::ready,
4 num::NonZeroUsize,
5 sync::{Arc, LazyLock, Mutex},
6 time::Instant,
7};
8
9use futures::{FutureExt, StreamExt, TryStreamExt, stream::FuturesOrdered};
10use futures_util::stream::FuturesUnordered;
11use stream_cancel::{StreamExt as StreamCancelExt, Trigger, Tripwire};
12use tokio::{
13 select,
14 sync::{mpsc::UnboundedSender, oneshot},
15 time::timeout,
16};
17use tracing::Instrument;
18use vector_lib::{
19 EstimatedJsonEncodedSizeOf,
20 buffers::{
21 BufferType, WhenFull,
22 topology::{
23 builder::TopologyBuilder,
24 channel::{BufferReceiver, BufferSender, ChannelMetricMetadata, LimitedReceiver},
25 },
26 },
27 internal_event::{self, CountByteSize, EventsSent, InternalEventHandle as _, Registered},
28 latency::LatencyRecorder,
29 schema::Definition,
30 source_sender::{CHUNK_SIZE, SourceSenderItem},
31 transform::update_runtime_schema_definition,
32};
33use vector_lib::{gauge, internal_event::GaugeName};
34use vector_vrl_metrics::MetricsStorage;
35
36use super::{
37 BuiltBuffer, ConfigDiff,
38 fanout::{self, Fanout},
39 schema,
40 task::{Task, TaskOutput, TaskResult},
41};
42use crate::{
43 SourceSender,
44 config::{
45 ComponentKey, Config, DataType, EnrichmentTableConfig, Input, Inputs, OutputId,
46 ProxyConfig, SinkContext, SourceContext, TransformContext, TransformOuter, TransformOutput,
47 },
48 event::{EventArray, EventContainer},
49 extra_context::ExtraContext,
50 internal_events::EventsReceived,
51 shutdown::SourceShutdownCoordinator,
52 spawn_named,
53 topology::task::TaskError,
54 transforms::{SyncTransform, TaskTransform, Transform, TransformOutputs, TransformOutputsBuf},
55 utilization::{
56 OutputUtilization, Utilization, UtilizationComponentSender, UtilizationEmitter,
57 UtilizationRegistry,
58 },
59};
60
61static ENRICHMENT_TABLES: LazyLock<vector_lib::enrichment::TableRegistry> =
62 LazyLock::new(vector_lib::enrichment::TableRegistry::default);
63static METRICS_STORAGE: LazyLock<MetricsStorage> = LazyLock::new(MetricsStorage::default);
64
65pub(crate) static SOURCE_SENDER_BUFFER_SIZE: LazyLock<usize> =
66 LazyLock::new(|| *TRANSFORM_CONCURRENCY_LIMIT * CHUNK_SIZE);
67
68const READY_ARRAY_CAPACITY: NonZeroUsize = NonZeroUsize::new(CHUNK_SIZE * 4).unwrap();
69pub(crate) const TOPOLOGY_BUFFER_SIZE: NonZeroUsize = NonZeroUsize::new(100).unwrap();
70const TRANSFORM_CHANNEL_METRIC_PREFIX: &str = "transform_buffer";
71
72static TRANSFORM_CONCURRENCY_LIMIT: LazyLock<usize> = LazyLock::new(|| {
73 crate::app::worker_threads()
74 .map(std::num::NonZeroUsize::get)
75 .unwrap_or_else(crate::num_threads)
76});
77
78const INTERNAL_SOURCES: [&str; 2] = ["internal_logs", "internal_metrics"];
79
/// Accumulates the tasks, channels and error messages produced while building the new
/// components selected by a configuration diff.
struct Builder<'a> {
    /// Full configuration the components are built from.
    config: &'a super::Config,
    /// Diff that decides which components are new and therefore built.
    diff: &'a ConfigDiff,
    /// Coordinator every newly built source is registered with.
    shutdown_coordinator: SourceShutdownCoordinator,
    /// Error messages accumulated across all component builds.
    errors: Vec<String>,
    /// Fanout control channel for each output of each new source and transform.
    outputs: HashMap<OutputId, UnboundedSender<fanout::ControlMessage>>,
    /// Main task of each new component (source pump supervisor, transform, or sink).
    tasks: HashMap<ComponentKey, Task>,
    /// Previously built sink buffers; a sink removes and reuses its entry when present.
    buffers: HashMap<ComponentKey, BuiltBuffer>,
    /// Input buffer sender plus configured inputs for each new transform and sink.
    inputs: HashMap<ComponentKey, (BufferSender<EventArray>, Inputs<OutputId>)>,
    /// Healthcheck task for each new sink.
    healthchecks: HashMap<ComponentKey, Task>,
    /// Per-sink trigger whose paired tripwire ends that sink's input stream.
    detach_triggers: HashMap<ComponentKey, Trigger>,
    /// Extra context cloned into every source, transform and sink context.
    extra_context: ExtraContext,
    /// Emitter created by this builder; `None` when a registry was supplied by the caller.
    utilization_emitter: Option<UtilizationEmitter>,
    /// Registry every transform and sink registers its utilization gauge with.
    utilization_registry: UtilizationRegistry,
}
95
96impl<'a> Builder<'a> {
97 fn new(
98 config: &'a super::Config,
99 diff: &'a ConfigDiff,
100 buffers: HashMap<ComponentKey, BuiltBuffer>,
101 extra_context: ExtraContext,
102 utilization_registry: Option<UtilizationRegistry>,
103 ) -> Self {
104 let (emitter, registry) = if let Some(registry) = utilization_registry {
107 (None, registry)
108 } else {
109 let (emitter, registry) = UtilizationEmitter::new();
110 (Some(emitter), registry)
111 };
112 Self {
113 config,
114 diff,
115 buffers,
116 shutdown_coordinator: SourceShutdownCoordinator::default(),
117 errors: vec![],
118 outputs: HashMap::new(),
119 tasks: HashMap::new(),
120 inputs: HashMap::new(),
121 healthchecks: HashMap::new(),
122 detach_triggers: HashMap::new(),
123 extra_context,
124 utilization_emitter: emitter,
125 utilization_registry: registry,
126 }
127 }
128
129 async fn build(mut self) -> Result<TopologyPieces, Vec<String>> {
131 let enrichment_tables = self.load_enrichment_tables().await;
132 let source_tasks = self.build_sources(enrichment_tables).await;
133 self.build_transforms(enrichment_tables).await;
134 self.build_sinks(enrichment_tables).await;
135
136 enrichment_tables.finish_load();
139
140 if self.errors.is_empty() {
141 Ok(TopologyPieces {
142 inputs: self.inputs,
143 outputs: Self::finalize_outputs(self.outputs),
144 tasks: self.tasks,
145 source_tasks,
146 healthchecks: self.healthchecks,
147 shutdown_coordinator: self.shutdown_coordinator,
148 detach_triggers: self.detach_triggers,
149 metrics_storage: METRICS_STORAGE.clone(),
150 utilization: self
151 .utilization_emitter
152 .map(|e| (e, self.utilization_registry)),
153 })
154 } else {
155 Err(self.errors)
156 }
157 }
158
159 fn finalize_outputs(
160 outputs: HashMap<OutputId, UnboundedSender<fanout::ControlMessage>>,
161 ) -> HashMap<ComponentKey, HashMap<Option<String>, UnboundedSender<fanout::ControlMessage>>>
162 {
163 let mut finalized_outputs = HashMap::new();
164 for (id, output) in outputs {
165 let entry = finalized_outputs
166 .entry(id.component)
167 .or_insert_with(HashMap::new);
168 entry.insert(id.port, output);
169 }
170
171 finalized_outputs
172 }
173
174 async fn load_enrichment_tables(&mut self) -> &'static vector_lib::enrichment::TableRegistry {
177 let mut enrichment_tables = HashMap::new();
178
179 'tables: for (name, table_outer) in self.config.enrichment_tables.iter() {
181 let table_name = name.to_string();
182 if ENRICHMENT_TABLES.needs_reload(&table_name) {
183 let indexes = if !self.diff.enrichment_tables.is_added(name) {
184 Some(ENRICHMENT_TABLES.index_fields(&table_name))
187 } else {
188 None
189 };
190
191 let mut table = match table_outer.inner.build(&self.config.global).await {
192 Ok(table) => table,
193 Err(error) => {
194 self.errors
195 .push(format!("Enrichment Table \"{name}\": {error}"));
196 continue;
197 }
198 };
199
200 if let Some(indexes) = indexes {
201 for (case, index) in indexes {
202 match table
203 .add_index(case, &index.iter().map(|s| s.as_ref()).collect::<Vec<_>>())
204 {
205 Ok(_) => (),
206 Err(error) => {
207 error!(message = "Unable to add index to reloaded enrichment table.",
211 table = ?name.to_string(),
212 %error);
213 continue 'tables;
214 }
215 }
216 }
217 }
218
219 enrichment_tables.insert(table_name, table);
220 }
221 }
222
223 ENRICHMENT_TABLES.load(enrichment_tables);
224
225 &ENRICHMENT_TABLES
226 }
227
228 async fn build_sources(
229 &mut self,
230 enrichment_tables: &vector_lib::enrichment::TableRegistry,
231 ) -> HashMap<ComponentKey, Task> {
232 let mut source_tasks = HashMap::new();
233
234 let table_sources = self
235 .config
236 .enrichment_tables
237 .iter()
238 .filter_map(|(key, table)| table.as_source(key))
239 .collect::<Vec<_>>();
240 for (key, source) in self
241 .config
242 .sources()
243 .filter(|(key, _)| self.diff.sources.contains_new(key))
244 .chain(
245 table_sources
246 .iter()
247 .map(|(key, source)| (key, source))
248 .filter(|(key, _)| self.diff.enrichment_tables.contains_new(key)),
249 )
250 {
251 debug!(component_id = %key, "Building new source.");
252
253 let typetag = source.inner.get_component_name();
254 let source_outputs = source.inner.outputs(self.config.schema.log_namespace());
255
256 let span = error_span!(
257 "source",
258 component_kind = "source",
259 component_id = %key.id(),
260 component_type = %source.inner.get_component_name(),
261 );
262 let _entered_span = span.enter();
263
264 let task_name = format!(
265 ">> {} ({}, pump) >>",
266 source.inner.get_component_name(),
267 key.id()
268 );
269
270 let mut builder = SourceSender::builder()
271 .with_buffer(*SOURCE_SENDER_BUFFER_SIZE)
272 .with_timeout(source.inner.send_timeout())
273 .with_ewma_half_life_seconds(
274 self.config.global.buffer_utilization_ewma_half_life_seconds,
275 );
276 let mut pumps = Vec::new();
277 let mut controls = HashMap::new();
278 let mut schema_definitions = HashMap::with_capacity(source_outputs.len());
279
280 for output in source_outputs.into_iter() {
281 let rx = builder.add_source_output(output.clone(), key.clone());
282
283 let (fanout, control) = Fanout::new();
284 let source_type = source.inner.get_component_name();
285 let source = Arc::new(key.clone());
286
287 let pump = run_source_output_pump(rx, fanout, source, source_type);
288
289 pumps.push(pump.instrument(span.clone()));
290 controls.insert(
291 OutputId {
292 component: key.clone(),
293 port: output.port.clone(),
294 },
295 control,
296 );
297
298 let port = output.port.clone();
299 if let Some(definition) = output.schema_definition(self.config.schema.enabled) {
300 schema_definitions.insert(port, definition);
301 }
302 }
303
304 let (pump_error_tx, mut pump_error_rx) = oneshot::channel();
305 let pump = async move {
306 debug!("Source pump supervisor starting.");
307
308 let mut handles = FuturesUnordered::new();
313 for pump in pumps {
314 handles.push(spawn_named(pump, task_name.as_ref()));
315 }
316
317 let mut had_pump_error = false;
318 while let Some(output) = handles.try_next().await? {
319 if let Err(e) = output {
320 _ = pump_error_tx.send(e);
323 had_pump_error = true;
324 break;
325 }
326 }
327
328 if had_pump_error {
329 debug!("Source pump supervisor task finished with an error.");
330 } else {
331 debug!("Source pump supervisor task finished normally.");
332 }
333 Ok(TaskOutput::Source)
334 };
335 let pump = Task::new(key.clone(), typetag, pump);
336
337 let (shutdown_signal, force_shutdown_tripwire) = self
338 .shutdown_coordinator
339 .register_source(key, INTERNAL_SOURCES.contains(&typetag));
340
341 let context = SourceContext {
342 key: key.clone(),
343 globals: self.config.global.clone(),
344 enrichment_tables: enrichment_tables.clone(),
345 metrics_storage: METRICS_STORAGE.clone(),
346 shutdown: shutdown_signal,
347 out: builder.build(),
348 proxy: ProxyConfig::merge_with_env(&self.config.global.proxy, &source.proxy),
349 acknowledgements: source.sink_acknowledgements,
350 schema_definitions,
351 schema: self.config.schema,
352 extra_context: self.extra_context.clone(),
353 };
354 let server = match source.inner.build(context).await {
355 Err(error) => {
356 self.errors.push(format!("Source \"{key}\": {error}"));
357 continue;
358 }
359 Ok(server) => server,
360 };
361
362 let server = async move {
370 debug!("Source starting.");
371
372 let mut result = select! {
373 biased;
374
375 _ = force_shutdown_tripwire => Ok(()),
377
378 Ok(e) = &mut pump_error_rx => Err(e),
384
385 result = server => result.map_err(|_| TaskError::Opaque),
387 };
388
389 if let Ok(e) = pump_error_rx.try_recv() {
398 result = Err(e);
399 }
400
401 match result {
402 Ok(()) => {
403 debug!("Source finished normally.");
404 Ok(TaskOutput::Source)
405 }
406 Err(e) => {
407 debug!("Source finished with an error.");
408 Err(e)
409 }
410 }
411 };
412 let server = Task::new(key.clone(), typetag, server);
413
414 self.outputs.extend(controls);
415 self.tasks.insert(key.clone(), pump);
416 source_tasks.insert(key.clone(), server);
417 }
418
419 source_tasks
420 }
421
422 async fn build_transforms(
423 &mut self,
424 enrichment_tables: &vector_lib::enrichment::TableRegistry,
425 ) {
426 let mut definition_cache = HashMap::default();
427
428 for (key, transform) in self
429 .config
430 .transforms()
431 .filter(|(key, _)| self.diff.transforms.contains_new(key))
432 {
433 debug!(component_id = %key, "Building new transform.");
434
435 let input_definitions = match schema::input_definitions(
436 &transform.inputs,
437 self.config,
438 enrichment_tables.clone(),
439 &mut definition_cache,
440 ) {
441 Ok(definitions) => definitions,
442 Err(_) => {
443 return;
447 }
448 };
449
450 let merged_definition: Definition = input_definitions
451 .iter()
452 .map(|(_output_id, definition)| definition.clone())
453 .reduce(Definition::merge)
454 .unwrap_or_else(Definition::any);
456
457 let span = error_span!(
458 "transform",
459 component_kind = "transform",
460 component_id = %key.id(),
461 component_type = %transform.inner.get_component_name(),
462 );
463 let _span = span.enter();
464
465 let schema_definitions = transform
467 .inner
468 .outputs(
469 &TransformContext {
470 enrichment_tables: enrichment_tables.clone(),
471 metrics_storage: METRICS_STORAGE.clone(),
472 schema: self.config.schema,
473 ..Default::default()
474 },
475 &input_definitions,
476 )
477 .into_iter()
478 .map(|output| {
479 let definitions = output.schema_definitions(self.config.schema.enabled);
480 (output.port, definitions)
481 })
482 .collect::<HashMap<_, _>>();
483
484 let context = TransformContext {
485 key: Some(key.clone()),
486 globals: self.config.global.clone(),
487 enrichment_tables: enrichment_tables.clone(),
488 metrics_storage: METRICS_STORAGE.clone(),
489 schema_definitions,
490 merged_schema_definition: merged_definition.clone(),
491 schema: self.config.schema,
492 extra_context: self.extra_context.clone(),
493 };
494
495 let node =
496 TransformNode::from_parts(key.clone(), &context, transform, &input_definitions);
497
498 let transform = match transform
499 .inner
500 .build(&context)
501 .instrument(span.clone())
502 .await
503 {
504 Err(error) => {
505 self.errors.push(format!("Transform \"{key}\": {error}"));
506 continue;
507 }
508 Ok(transform) => transform,
509 };
510
511 let metrics = ChannelMetricMetadata::new(TRANSFORM_CHANNEL_METRIC_PREFIX, None);
512 let (input_tx, input_rx) = TopologyBuilder::standalone_memory(
513 TOPOLOGY_BUFFER_SIZE,
514 WhenFull::Block,
515 &span,
516 Some(metrics),
517 self.config.global.buffer_utilization_ewma_half_life_seconds,
518 );
519
520 self.inputs
521 .insert(key.clone(), (input_tx, node.inputs.clone()));
522
523 let (transform_task, transform_outputs) =
524 self.build_transform(transform, node, input_rx);
525
526 self.outputs.extend(transform_outputs);
527 self.tasks.insert(key.clone(), transform_task);
528 }
529 }
530
531 async fn build_sinks(&mut self, enrichment_tables: &vector_lib::enrichment::TableRegistry) {
532 let table_sinks = self
533 .config
534 .enrichment_tables
535 .iter()
536 .filter_map(|(key, table)| table.as_sink(key))
537 .collect::<Vec<_>>();
538 for (key, sink) in self
539 .config
540 .sinks()
541 .filter(|(key, _)| self.diff.sinks.contains_new(key))
542 .chain(
543 table_sinks
544 .iter()
545 .map(|(key, sink)| (key, sink))
546 .filter(|(key, _)| self.diff.enrichment_tables.contains_new(key)),
547 )
548 {
549 debug!(component_id = %key, "Building new sink.");
550
551 let sink_inputs = &sink.inputs;
552 let healthcheck = sink.healthcheck();
553 let enable_healthcheck = healthcheck.enabled && self.config.healthchecks.enabled;
554 let healthcheck_timeout = healthcheck.timeout;
555
556 let typetag = sink.inner.get_component_name();
557 let input_type = sink.inner.input().data_type();
558
559 let span = error_span!(
560 "sink",
561 component_kind = "sink",
562 component_id = %key.id(),
563 component_type = %sink.inner.get_component_name(),
564 );
565 let _entered_span = span.enter();
566
567 if let Err(mut err) = schema::validate_sink_expectations(
571 key,
572 sink,
573 self.config,
574 enrichment_tables.clone(),
575 ) {
576 self.errors.append(&mut err);
577 };
578
579 let (tx, rx) = match self.buffers.remove(key) {
580 Some(buffer) => buffer,
581 _ => {
582 let buffer_type =
583 match sink.buffer.stages().first().expect("cant ever be empty") {
584 BufferType::Memory { .. } => "memory",
585 BufferType::DiskV2 { .. } => "disk",
586 };
587 let buffer_span = error_span!("sink", buffer_type);
588 let buffer = sink
589 .buffer
590 .build(
591 self.config.global.data_dir.clone(),
592 key.to_string(),
593 buffer_span,
594 )
595 .await;
596 match buffer {
597 Err(error) => {
598 self.errors.push(format!("Sink \"{key}\": {error}"));
599 continue;
600 }
601 Ok((tx, rx)) => (tx, Arc::new(Mutex::new(Some(rx.into_stream())))),
602 }
603 }
604 };
605
606 let cx = SinkContext {
607 healthcheck,
608 globals: self.config.global.clone(),
609 enrichment_tables: enrichment_tables.clone(),
610 metrics_storage: METRICS_STORAGE.clone(),
611 proxy: ProxyConfig::merge_with_env(&self.config.global.proxy, sink.proxy()),
612 schema: self.config.schema,
613 app_name: crate::get_app_name().to_string(),
614 app_name_slug: crate::get_slugified_app_name(),
615 extra_context: self.extra_context.clone(),
616 };
617
618 let (sink, healthcheck) = match sink.inner.build(cx).await {
619 Err(error) => {
620 self.errors.push(format!("Sink \"{key}\": {error}"));
621 continue;
622 }
623 Ok(built) => built,
624 };
625
626 let (trigger, tripwire) = Tripwire::new();
627
628 let utilization_sender = self
629 .utilization_registry
630 .add_component(key.clone(), gauge!(GaugeName::Utilization));
631 let component_key = key.clone();
632 let sink = async move {
633 debug!("Sink starting.");
634
635 let rx = rx
642 .lock()
643 .unwrap()
644 .take()
645 .expect("Task started but input has been taken.");
646
647 let mut rx = Utilization::new(utilization_sender, component_key.clone(), rx);
648
649 let events_received = register!(EventsReceived);
650 sink.run(
651 rx.by_ref()
652 .filter(|events: &EventArray| ready(filter_events_type(events, input_type)))
653 .inspect(|events| {
654 events_received.emit(CountByteSize(
655 events.len(),
656 events.estimated_json_encoded_size_of(),
657 ))
658 })
659 .take_until_if(tripwire),
660 )
661 .await
662 .map(|_| {
663 debug!("Sink finished normally.");
664 TaskOutput::Sink(rx)
665 })
666 .map_err(|_| {
667 debug!("Sink finished with an error.");
668 TaskError::Opaque
669 })
670 };
671
672 let task = Task::new(key.clone(), typetag, sink);
673
674 let component_key = key.clone();
675 let healthcheck_task = async move {
676 if enable_healthcheck {
677 timeout(healthcheck_timeout, healthcheck)
678 .map(|result| match result {
679 Ok(Ok(_)) => {
680 info!("Healthcheck passed.");
681 Ok(TaskOutput::Healthcheck)
682 }
683 Ok(Err(error)) => {
684 error!(
685 msg = "Healthcheck failed.",
686 %error,
687 component_kind = "sink",
688 component_type = typetag,
689 component_id = %component_key.id(),
690 );
691 Err(TaskError::wrapped(error))
692 }
693 Err(e) => {
694 error!(
695 msg = "Healthcheck timed out.",
696 component_kind = "sink",
697 component_type = typetag,
698 component_id = %component_key.id(),
699 );
700 Err(TaskError::wrapped(Box::new(e)))
701 }
702 })
703 .await
704 } else {
705 info!("Healthcheck disabled.");
706 Ok(TaskOutput::Healthcheck)
707 }
708 };
709
710 let healthcheck_task = Task::new(key.clone(), typetag, healthcheck_task);
711
712 self.inputs.insert(key.clone(), (tx, sink_inputs.clone()));
713 self.healthchecks.insert(key.clone(), healthcheck_task);
714 self.tasks.insert(key.clone(), task);
715 self.detach_triggers.insert(key.clone(), trigger);
716 }
717 }
718
719 fn build_transform(
720 &self,
721 transform: Transform,
722 node: TransformNode,
723 input_rx: BufferReceiver<EventArray>,
724 ) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
725 match transform {
726 Transform::Function(t) => self.build_sync_transform(Box::new(t), node, input_rx),
728 Transform::Synchronous(t) => self.build_sync_transform(t, node, input_rx),
729 Transform::Task(t) => self.build_task_transform(
730 t,
731 input_rx,
732 node.input_details.data_type(),
733 node.typetag,
734 &node.key,
735 &node.outputs,
736 ),
737 }
738 }
739
740 fn build_sync_transform(
741 &self,
742 t: Box<dyn SyncTransform>,
743 node: TransformNode,
744 input_rx: BufferReceiver<EventArray>,
745 ) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
746 let (outputs, controls) = TransformOutputs::new(node.outputs, &node.key);
747
748 let sender = self
749 .utilization_registry
750 .add_component(node.key.clone(), gauge!(GaugeName::Utilization));
751 let runner = Runner::new(
752 t,
753 input_rx,
754 sender,
755 node.input_details.data_type(),
756 outputs,
757 LatencyRecorder::new(self.config.global.latency_ewma_alpha),
758 );
759 let transform = if node.enable_concurrency {
760 runner.run_concurrently().boxed()
761 } else {
762 runner.run_inline().boxed()
763 };
764
765 let transform = async move {
766 debug!("Synchronous transform starting.");
767
768 match transform.await {
769 Ok(v) => {
770 debug!("Synchronous transform finished normally.");
771 Ok(v)
772 }
773 Err(e) => {
774 debug!("Synchronous transform finished with an error.");
775 Err(e)
776 }
777 }
778 };
779
780 let mut output_controls = HashMap::new();
781 for (name, control) in controls {
782 let id = name
783 .map(|name| OutputId::from((&node.key, name)))
784 .unwrap_or_else(|| OutputId::from(&node.key));
785 output_controls.insert(id, control);
786 }
787
788 let task = Task::new(node.key.clone(), node.typetag, transform);
789
790 (task, output_controls)
791 }
792
793 fn build_task_transform(
794 &self,
795 t: Box<dyn TaskTransform<EventArray>>,
796 input_rx: BufferReceiver<EventArray>,
797 input_type: DataType,
798 typetag: &str,
799 key: &ComponentKey,
800 outputs: &[TransformOutput],
801 ) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
802 let (mut fanout, control) = Fanout::new();
803
804 let sender = self
805 .utilization_registry
806 .add_component(key.clone(), gauge!(GaugeName::Utilization));
807 let output_sender = sender.clone();
808 let input_rx = Utilization::new(sender, key.clone(), input_rx.into_stream());
809
810 let events_received = register!(EventsReceived);
811 let filtered = input_rx
812 .filter(move |events| ready(filter_events_type(events, input_type)))
813 .inspect(move |events| {
814 events_received.emit(CountByteSize(
815 events.len(),
816 events.estimated_json_encoded_size_of(),
817 ))
818 });
819 let events_sent = register!(EventsSent::from(internal_event::Output(None)));
820 let output_id = Arc::new(OutputId {
821 component: key.clone(),
822 port: None,
823 });
824 let latency_recorder = LatencyRecorder::new(self.config.global.latency_ewma_alpha);
825
826 let schema_definition_map = outputs
828 .iter()
829 .find(|x| x.port.is_none())
830 .expect("output for default port required for task transforms")
831 .log_schema_definitions
832 .clone()
833 .into_iter()
834 .map(|(key, value)| (key, Arc::new(value)))
835 .collect();
836
837 let stream = t
838 .transform(Box::pin(filtered))
839 .map(move |mut events| {
840 for event in events.iter_events_mut() {
841 update_runtime_schema_definition(event, &output_id, &schema_definition_map);
842 }
843 let now = Instant::now();
844 latency_recorder.on_send(&mut events, now);
845 (events, now)
846 })
847 .inspect(move |(events, _): &(EventArray, Instant)| {
848 events_sent.emit(CountByteSize(
849 events.len(),
850 events.estimated_json_encoded_size_of(),
851 ));
852 });
853 let stream = OutputUtilization::new(output_sender, stream);
854 let transform = async move {
855 debug!("Task transform starting.");
856
857 match fanout.send_stream(stream).await {
858 Ok(()) => {
859 debug!("Task transform finished normally.");
860 Ok(TaskOutput::Transform)
861 }
862 Err(e) => {
863 debug!("Task transform finished with an error.");
864 Err(TaskError::wrapped(e))
865 }
866 }
867 }
868 .boxed();
869
870 let mut outputs = HashMap::new();
871 outputs.insert(OutputId::from(key), control);
872
873 let task = Task::new(key.clone(), typetag, transform);
874
875 (task, outputs)
876 }
877}
878
879async fn run_source_output_pump(
880 mut rx: LimitedReceiver<SourceSenderItem>,
881 mut fanout: Fanout,
882 source: Arc<ComponentKey>,
883 source_type: &'static str,
884) -> TaskResult {
885 debug!("Source pump starting.");
886
887 let mut control_channel_open = true;
888 loop {
889 tokio::select! {
890 biased;
891 alive = fanout.recv_control_message(), if control_channel_open => {
895 control_channel_open = alive;
896 }
897 item = rx.next() => {
898 match item {
899 Some(SourceSenderItem { events: mut array, send_reference }) => {
900 let now = Instant::now();
908 array.for_each_metadata_mut(|metadata| {
909 metadata.set_source_id(Arc::clone(&source));
910 metadata.set_source_type(source_type);
911 metadata.set_last_transform_timestamp(now);
912 });
913 fanout
914 .send(array, Some(send_reference))
915 .await
916 .map_err(|e| {
917 debug!("Source pump finished with an error.");
918 TaskError::wrapped(e)
919 })?;
920 }
921 None => break,
922 }
923 }
924 }
925 }
926
927 debug!("Source pump finished normally.");
928 Ok(TaskOutput::Source)
929}
930
931pub async fn reload_enrichment_tables(config: &Config) {
932 let mut enrichment_tables = HashMap::new();
933 'tables: for (name, table_outer) in config.enrichment_tables.iter() {
935 let table_name = name.to_string();
936 if ENRICHMENT_TABLES.needs_reload(&table_name) {
937 let indexes = Some(ENRICHMENT_TABLES.index_fields(&table_name));
938
939 let mut table = match table_outer.inner.build(&config.global).await {
940 Ok(table) => table,
941 Err(error) => {
942 error!("Enrichment table \"{name}\" reload failed: {error}");
943 continue;
944 }
945 };
946
947 if let Some(indexes) = indexes {
948 for (case, index) in indexes {
949 match table
950 .add_index(case, &index.iter().map(|s| s.as_ref()).collect::<Vec<_>>())
951 {
952 Ok(_) => (),
953 Err(error) => {
954 error!(
958 message = "Unable to add index to reloaded enrichment table.",
959 table = ?name.to_string(),
960 %error
961 );
962 continue 'tables;
963 }
964 }
965 }
966 }
967
968 enrichment_tables.insert(table_name, table);
969 }
970 }
971
972 ENRICHMENT_TABLES.load(enrichment_tables);
973 ENRICHMENT_TABLES.finish_load();
974}
975
/// The tasks, channels and coordination handles built for the new components of a topology.
pub struct TopologyPieces {
    /// Input buffer sender plus configured inputs for each new transform and sink.
    pub(super) inputs: HashMap<ComponentKey, (BufferSender<EventArray>, Inputs<OutputId>)>,
    /// Fanout control channels, grouped by component and keyed by output port.
    pub(crate) outputs: HashMap<ComponentKey, HashMap<Option<String>, fanout::ControlChannel>>,
    /// Main task of each new component (source pump supervisor, transform, or sink).
    pub(super) tasks: HashMap<ComponentKey, Task>,
    /// Server task of each new source.
    pub(crate) source_tasks: HashMap<ComponentKey, Task>,
    /// Healthcheck task of each new sink.
    pub(super) healthchecks: HashMap<ComponentKey, Task>,
    /// Coordinator that every new source was registered with.
    pub(crate) shutdown_coordinator: SourceShutdownCoordinator,
    /// Per-sink trigger whose paired tripwire ends that sink's input stream.
    pub(crate) detach_triggers: HashMap<ComponentKey, Trigger>,
    /// Metrics storage shared with the built components.
    pub(crate) metrics_storage: MetricsStorage,
    /// Utilization emitter and registry; only `Some` when the build created its own registry.
    pub(crate) utilization: Option<(UtilizationEmitter, UtilizationRegistry)>,
}
987
/// Fluent builder for [`TopologyPieces`].
pub struct TopologyPiecesBuilder<'a> {
    /// Full configuration the components are built from.
    config: &'a Config,
    /// Diff selecting which components are new and must be built.
    diff: &'a ConfigDiff,
    /// Previously built sink buffers available for reuse (empty by default).
    buffers: HashMap<ComponentKey, BuiltBuffer>,
    /// Extra context passed to every component (default by default).
    extra_context: ExtraContext,
    /// Existing utilization registry to reuse; `None` makes the build create its own.
    utilization_registry: Option<UtilizationRegistry>,
}
1006
1007impl<'a> TopologyPiecesBuilder<'a> {
1008 pub fn new(config: &'a Config, diff: &'a ConfigDiff) -> Self {
1010 Self {
1011 config,
1012 diff,
1013 buffers: HashMap::new(),
1014 extra_context: ExtraContext::default(),
1015 utilization_registry: None,
1016 }
1017 }
1018
1019 pub fn with_buffers(mut self, buffers: HashMap<ComponentKey, BuiltBuffer>) -> Self {
1021 self.buffers = buffers;
1022 self
1023 }
1024
1025 pub fn with_extra_context(mut self, extra_context: ExtraContext) -> Self {
1027 self.extra_context = extra_context;
1028 self
1029 }
1030
1031 pub fn with_utilization_registry(mut self, registry: Option<UtilizationRegistry>) -> Self {
1033 self.utilization_registry = registry;
1034 self
1035 }
1036
1037 pub async fn build(self) -> Result<TopologyPieces, Vec<String>> {
1042 Builder::new(
1043 self.config,
1044 self.diff,
1045 self.buffers,
1046 self.extra_context,
1047 self.utilization_registry,
1048 )
1049 .build()
1050 .await
1051 }
1052
1053 pub async fn build_or_log_errors(self) -> Option<TopologyPieces> {
1058 match self.build().await {
1059 Err(errors) => {
1060 for error in errors {
1061 error!(message = "Configuration error.", %error, internal_log_rate_limit = false);
1062 }
1063 None
1064 }
1065 Ok(new_pieces) => Some(new_pieces),
1066 }
1067 }
1068}
1069
1070impl TopologyPieces {
1071 pub async fn build_or_log_errors(
1072 config: &Config,
1073 diff: &ConfigDiff,
1074 buffers: HashMap<ComponentKey, BuiltBuffer>,
1075 extra_context: ExtraContext,
1076 utilization_registry: Option<UtilizationRegistry>,
1077 ) -> Option<Self> {
1078 TopologyPiecesBuilder::new(config, diff)
1079 .with_buffers(buffers)
1080 .with_extra_context(extra_context)
1081 .with_utilization_registry(utilization_registry)
1082 .build_or_log_errors()
1083 .await
1084 }
1085
1086 pub async fn build(
1088 config: &super::Config,
1089 diff: &ConfigDiff,
1090 buffers: HashMap<ComponentKey, BuiltBuffer>,
1091 extra_context: ExtraContext,
1092 utilization_registry: Option<UtilizationRegistry>,
1093 ) -> Result<Self, Vec<String>> {
1094 TopologyPiecesBuilder::new(config, diff)
1095 .with_buffers(buffers)
1096 .with_extra_context(extra_context)
1097 .with_utilization_registry(utilization_registry)
1098 .build()
1099 .await
1100 }
1101}
1102
1103const fn filter_events_type(events: &EventArray, data_type: DataType) -> bool {
1104 match events {
1105 EventArray::Logs(_) => data_type.contains(DataType::Log),
1106 EventArray::Metrics(_) => data_type.contains(DataType::Metric),
1107 EventArray::Traces(_) => data_type.contains(DataType::Trace),
1108 }
1109}
1110
/// Build-time details of a transform needed to wire it into the topology.
#[derive(Debug, Clone)]
struct TransformNode {
    /// Component key of the transform.
    key: ComponentKey,
    /// Component type name of the transform.
    typetag: &'static str,
    /// Configured upstream outputs feeding this transform.
    inputs: Inputs<OutputId>,
    /// Input description; its data type is used to filter incoming event arrays.
    input_details: Input,
    /// Declared outputs computed for the transform's input schema definitions.
    outputs: Vec<TransformOutput>,
    /// Whether a synchronous transform should run through the concurrent runner.
    enable_concurrency: bool,
}
1120
1121impl TransformNode {
1122 pub fn from_parts(
1123 key: ComponentKey,
1124 context: &TransformContext,
1125 transform: &TransformOuter<OutputId>,
1126 schema_definition: &[(OutputId, Definition)],
1127 ) -> Self {
1128 Self {
1129 key,
1130 typetag: transform.inner.get_component_name(),
1131 inputs: transform.inputs.clone(),
1132 input_details: transform.inner.input(),
1133 outputs: transform.inner.outputs(context, schema_definition),
1134 enable_concurrency: transform.inner.enable_concurrency(),
1135 }
1136 }
1137}
1138
/// Drives a synchronous transform: reads input arrays, transforms them and sends the results
/// to the transform's outputs, either inline or with bounded concurrency.
struct Runner {
    /// The transform; cloned per batch when running concurrently.
    transform: Box<dyn SyncTransform>,
    /// Input receiver; taken (set to `None`) when a run starts, so a runner runs only once.
    input_rx: Option<BufferReceiver<EventArray>>,
    /// Accepted event kinds; input arrays of other kinds are filtered out.
    input_type: DataType,
    /// Output fan-outs the transformed events are sent to.
    outputs: TransformOutputs,
    /// Utilization sender signalled around waits for input and output.
    timer_tx: UtilizationComponentSender,
    /// Records latency on arrays just before they are sent.
    latency_recorder: LatencyRecorder,
    /// Handle for the received-events internal metric.
    events_received: Registered<EventsReceived>,
}
1148
1149impl Runner {
1150 fn new(
1151 transform: Box<dyn SyncTransform>,
1152 input_rx: BufferReceiver<EventArray>,
1153 timer_tx: UtilizationComponentSender,
1154 input_type: DataType,
1155 outputs: TransformOutputs,
1156 latency_recorder: LatencyRecorder,
1157 ) -> Self {
1158 Self {
1159 transform,
1160 input_rx: Some(input_rx),
1161 input_type,
1162 outputs,
1163 timer_tx,
1164 latency_recorder,
1165 events_received: register!(EventsReceived),
1166 }
1167 }
1168
1169 fn on_events_received(&mut self, events: &EventArray) {
1170 self.timer_tx.try_send_stop_wait();
1171
1172 self.events_received.emit(CountByteSize(
1173 events.len(),
1174 events.estimated_json_encoded_size_of(),
1175 ));
1176 }
1177
1178 async fn send_outputs(&mut self, outputs_buf: &mut TransformOutputsBuf) -> crate::Result<()> {
1179 self.timer_tx.try_send_start_wait();
1180 let now = Instant::now();
1181 outputs_buf.for_each_array_mut(|array| self.latency_recorder.on_send(array, now));
1182 self.outputs.send(outputs_buf).await
1183 }
1184
1185 async fn run_inline(mut self) -> TaskResult {
1186 const INLINE_BATCH_SIZE: usize = 128;
1188
1189 let mut outputs_buf = self.outputs.new_buf_with_capacity(INLINE_BATCH_SIZE);
1190
1191 let mut input_rx = self
1192 .input_rx
1193 .take()
1194 .expect("can't run runner twice")
1195 .into_stream()
1196 .filter(move |events| ready(filter_events_type(events, self.input_type)));
1197
1198 self.timer_tx.try_send_start_wait();
1199 while let Some(events) = input_rx.next().await {
1200 self.on_events_received(&events);
1201 self.transform.transform_all(events, &mut outputs_buf);
1202 self.send_outputs(&mut outputs_buf)
1203 .await
1204 .map_err(TaskError::wrapped)?;
1205 }
1206
1207 Ok(TaskOutput::Transform)
1208 }
1209
1210 async fn run_concurrently(mut self) -> TaskResult {
1211 let input_rx = self
1212 .input_rx
1213 .take()
1214 .expect("can't run runner twice")
1215 .into_stream()
1216 .filter(move |events| ready(filter_events_type(events, self.input_type)));
1217
1218 let mut input_rx =
1219 super::ready_arrays::ReadyArrays::with_capacity(input_rx, READY_ARRAY_CAPACITY);
1220
1221 let mut in_flight = FuturesOrdered::new();
1222 let mut shutting_down = false;
1223
1224 self.timer_tx.try_send_start_wait();
1225 loop {
1226 tokio::select! {
1227 biased;
1228
1229 result = in_flight.next(), if !in_flight.is_empty() => {
1230 match result {
1231 Some(Ok(mut outputs_buf)) => {
1232 self.send_outputs(&mut outputs_buf).await
1233 .map_err(TaskError::wrapped)?;
1234 }
1235 _ => unreachable!("join error or bad poll"),
1236 }
1237 }
1238
1239 input_arrays = input_rx.next(), if in_flight.len() < *TRANSFORM_CONCURRENCY_LIMIT && !shutting_down => {
1240 match input_arrays {
1241 Some(input_arrays) => {
1242 let mut len = 0;
1243 for events in &input_arrays {
1244 self.on_events_received(events);
1245 len += events.len();
1246 }
1247
1248 let mut t = self.transform.clone();
1249 let mut outputs_buf = self.outputs.new_buf_with_capacity(len);
1250 let task = tokio::spawn(async move {
1251 for events in input_arrays {
1252 t.transform_all(events, &mut outputs_buf);
1253 }
1254 outputs_buf
1255 }.in_current_span());
1256 in_flight.push_back(task);
1257 }
1258 None => {
1259 shutting_down = true;
1260 continue
1261 }
1262 }
1263 }
1264
1265 else => {
1266 if shutting_down {
1267 break
1268 }
1269 }
1270 }
1271 }
1272
1273 Ok(TaskOutput::Transform)
1274 }
1275}