// vector/topology/running.rs

1use std::{
2    collections::{HashMap, HashSet},
3    sync::{Arc, Mutex},
4};
5
6use futures::{Future, FutureExt, future};
7use snafu::Snafu;
8use stream_cancel::Trigger;
9use tokio::{
10    sync::{mpsc, watch},
11    time::{Duration, Instant, interval, sleep_until},
12};
13use tracing::Instrument;
14use vector_lib::{
15    buffers::topology::channel::BufferSender,
16    shutdown::ShutdownSignal,
17    tap::topology::{TapOutput, TapResource, WatchRx, WatchTx},
18    trigger::DisabledTrigger,
19};
20
21use super::{
22    BuiltBuffer, TaskHandle,
23    builder::{self, TopologyPieces, TopologyPiecesBuilder, reload_enrichment_tables},
24    fanout::{ControlChannel, ControlMessage},
25    handle_errors, retain, take_healthchecks,
26    task::{Task, TaskOutput},
27};
28use crate::{
29    config::{ComponentKey, Config, ConfigDiff, HealthcheckOptions, Inputs, OutputId, Resource},
30    event::EventArray,
31    extra_context::ExtraContext,
32    shutdown::SourceShutdownCoordinator,
33    signal::ShutdownError,
34    spawn_named,
35    utilization::UtilizationRegistry,
36};
37
38pub type ShutdownErrorReceiver = mpsc::UnboundedReceiver<ShutdownError>;
39
40#[derive(Debug, Snafu)]
41pub enum ReloadError {
42    #[snafu(display("global options changed: {}", changed_fields.join(", ")))]
43    GlobalOptionsChanged { changed_fields: Vec<String> },
44    #[snafu(display("failed to compute global diff: {}", source))]
45    GlobalDiffFailed { source: serde_json::Error },
46    #[snafu(display("topology build failed"))]
47    TopologyBuildFailed,
48    #[snafu(display("failed to restore previous config"))]
49    FailedToRestore,
50}
51
/// A topology whose components have been spawned, together with the state needed to reload
/// or stop it.
// NOTE(review): the reason for this lint allowance is not visible here; presumably some fields
// are only read on certain code paths or feature sets — confirm before removing it.
// NOTE: field declaration order is also drop order; keep that in mind before reordering.
#[allow(dead_code)]
pub struct RunningTopology {
    /// Sending side of each component's input buffer.
    inputs: HashMap<ComponentKey, BufferSender<EventArray>>,
    /// Input wiring of each component, kept for the tap API.
    inputs_tap_metadata: HashMap<ComponentKey, Inputs<OutputId>>,
    /// Fanout control channel for each component output.
    outputs: HashMap<OutputId, ControlChannel>,
    /// Output metadata of each component, kept for the tap API. The meaning of the two tuple
    /// fields is not visible here (presumably component kind and type) — TODO confirm.
    outputs_tap_metadata: HashMap<ComponentKey, (&'static str, String)>,
    /// Presumably the configured type name of each component — TODO confirm.
    component_type_names: HashMap<ComponentKey, String>,
    /// The source tasks themselves; each source's pump task lives in `tasks`.
    source_tasks: HashMap<ComponentKey, TaskHandle>,
    /// Tasks of transforms, sinks, and source pumps.
    tasks: HashMap<ComponentKey, TaskHandle>,
    /// Signals individual sources, or all of them, to shut down.
    shutdown_coordinator: SourceShutdownCoordinator,
    /// Triggers that detach a changed sink from its inputs during a reload, so it can finish
    /// and hand back its buffer.
    detach_triggers: HashMap<ComponentKey, DisabledTrigger>,
    /// The configuration this topology currently runs.
    pub(crate) config: Config,
    /// Sending half of the shutdown-error channel (see `ShutdownErrorReceiver`). It is not read
    /// in the visible code; presumably it is handed to spawned components — TODO confirm.
    pub(crate) abort_tx: mpsc::UnboundedSender<ShutdownError>,
    /// Tap resource watch channel; `RunningTopology::watch` hands out clones of the receiver.
    watch: (WatchTx, WatchRx),
    /// How long `stop` allows before sources are forcefully shut down; `None` means never force.
    graceful_shutdown_duration: Option<Duration>,
    /// Optional utilization registry; reload removes deleted transforms/sinks from it.
    utilization_registry: Option<UtilizationRegistry>,
    /// Background utilization task, awaited by `stop`.
    utilization_task: Option<TaskHandle>,
    /// Cancelled by `stop` to end the utilization task.
    utilization_task_shutdown_trigger: Option<Trigger>,
    /// Background metrics task, awaited by `stop`.
    metrics_task: Option<TaskHandle>,
    /// Cancelled by `stop` to end the metrics task.
    metrics_task_shutdown_trigger: Option<Trigger>,
    /// Components passed to `ConfigDiff::new` as components to reload on the next
    /// `reload_config_and_respawn`; populated by `extend_reload_set`.
    pending_reload: Option<HashSet<ComponentKey>>,
}
74
75impl RunningTopology {
76    pub fn new(config: Config, abort_tx: mpsc::UnboundedSender<ShutdownError>) -> Self {
77        Self {
78            inputs: HashMap::new(),
79            inputs_tap_metadata: HashMap::new(),
80            outputs: HashMap::new(),
81            outputs_tap_metadata: HashMap::new(),
82            component_type_names: HashMap::new(),
83            shutdown_coordinator: SourceShutdownCoordinator::default(),
84            detach_triggers: HashMap::new(),
85            source_tasks: HashMap::new(),
86            tasks: HashMap::new(),
87            abort_tx,
88            watch: watch::channel(TapResource::default()),
89            graceful_shutdown_duration: config.graceful_shutdown_duration,
90            config,
91            utilization_registry: None,
92            utilization_task: None,
93            utilization_task_shutdown_trigger: None,
94            metrics_task: None,
95            metrics_task_shutdown_trigger: None,
96            pending_reload: None,
97        }
98    }
99
100    /// Gets the configuration that represents this running topology.
101    pub const fn config(&self) -> &Config {
102        &self.config
103    }
104
105    /// Adds a set of component keys to the pending reload set if one exists. Otherwise, it
106    /// initializes the pending reload set.
107    pub fn extend_reload_set(&mut self, new_set: HashSet<ComponentKey>) {
108        match &mut self.pending_reload {
109            None => self.pending_reload = Some(new_set.clone()),
110            Some(existing) => existing.extend(new_set),
111        }
112    }
113
114    /// Creates a subscription to topology changes.
115    ///
116    /// This is used by the tap API to observe configuration changes, and re-wire tap sinks.
117    pub fn watch(&self) -> watch::Receiver<TapResource> {
118        self.watch.1.clone()
119    }
120
121    /// Signal that all sources in this topology are ended.
122    ///
123    /// The future returned by this function will finish once all the sources in
124    /// this topology have finished. This allows the caller to wait for or
125    /// detect that the sources in the topology are no longer
126    /// producing. [`Application`][crate::app::Application], as an example, uses this as a
127    /// shutdown signal.
128    pub fn sources_finished(&self) -> future::BoxFuture<'static, ()> {
129        self.shutdown_coordinator.shutdown_tripwire()
130    }
131
    /// Shut down all topology components.
    ///
    /// This function sends the shutdown signal to all sources in this topology
    /// and returns a future that resolves once all components (sources,
    /// transforms, and sinks) have finished shutting down. Transforms and sinks
    /// will shut down automatically once their input tasks finish.
    ///
    /// The wait for tasks also ends in two other cases:
    /// - A graceful shutdown duration is configured and it elapses. The
    ///   still-running components are logged and sources are forcefully shut down.
    /// - The 5-second progress reporter sees that the deadline has passed.
    ///
    /// With no duration configured, there is no deadline. The returned future is a
    /// join of that wait with the source shutdown future from
    /// `SourceShutdownCoordinator::shutdown_all`.
    ///
    /// This function takes ownership of `self`. Once it returns, everything in the
    /// [`RunningTopology`] instance has been dropped except what the returned
    /// future holds:
    /// - the task handles (`tasks`, `source_tasks`, and the optional utilization
    ///   and metrics tasks), which are polled to detect completion;
    /// - the source shutdown future.
    ///
    /// Once the returned future is dropped, everything from this
    /// `RunningTopology` instance is fully dropped.
    pub fn stop(self) -> impl Future<Output = ()> {
        // Collect handles to every task. `wait_handles` is awaited to detect when
        // everything has finished; `check_handles` is used to report which
        // components are still running.
        let mut wait_handles = Vec::new();
        // We need a Vec here since source components have two tasks. One for
        // pump in self.tasks, and the other for source in self.source_tasks.
        let mut check_handles = HashMap::<ComponentKey, Vec<_>>::new();

        // Discards each task's output so all handles share the `()` output type.
        let map_closure = |_result| ();

        // We need to give some time to the sources to gracefully shutdown, so
        // we will merge them with other tasks.
        for (key, task) in self.tasks.into_iter().chain(self.source_tasks.into_iter()) {
            // `shared()` lets the same task be awaited from both collections.
            let task = task.map(map_closure).shared();

            wait_handles.push(task.clone());
            check_handles.entry(key).or_default().push(task);
        }

        // The utilization and metrics tasks are awaited but not reported by name.
        if let Some(utilization_task) = self.utilization_task {
            wait_handles.push(utilization_task.map(map_closure).shared());
        }

        if let Some(metrics_task) = self.metrics_task {
            wait_handles.push(metrics_task.map(map_closure).shared());
        }

        // If we reach this, we will forcefully shutdown the sources. If None, we will never force shutdown.
        let deadline = self
            .graceful_shutdown_duration
            .map(|grace_period| Instant::now() + grace_period);

        let timeout = if let Some(deadline) = deadline {
            // If we reach the deadline, this future will print out which components
            // won't gracefully shutdown since we will start to forcefully shutdown
            // the sources.
            let mut check_handles2 = check_handles.clone();
            Box::pin(async move {
                sleep_until(deadline).await;
                // Remove all tasks that have shutdown.
                check_handles2.retain(|_key, handles| {
                    retain(handles, |handle| handle.peek().is_none());
                    !handles.is_empty()
                });
                let remaining_components = check_handles2
                    .keys()
                    .map(|item| item.to_string())
                    .collect::<Vec<_>>()
                    .join(", ");

                error!(
                    components = ?remaining_components,
                    message = "Failed to gracefully shut down in time. Killing components.",
                    internal_log_rate_limit = false
                );
            }) as future::BoxFuture<'static, ()>
        } else {
            // No deadline: this branch of the select below never fires.
            Box::pin(future::pending()) as future::BoxFuture<'static, ()>
        };

        // Reports in intervals which components are still running.
        let mut interval = interval(Duration::from_secs(5));
        let reporter = async move {
            loop {
                interval.tick().await;

                // Remove all tasks that have shutdown.
                check_handles.retain(|_key, handles| {
                    retain(handles, |handle| handle.peek().is_none());
                    !handles.is_empty()
                });
                let remaining_components = check_handles
                    .keys()
                    .map(|item| item.to_string())
                    .collect::<Vec<_>>()
                    .join(", ");

                let (deadline_passed, time_remaining) = match deadline {
                    Some(d) => match d.checked_duration_since(Instant::now()) {
                        Some(remaining) => (false, format!("{} seconds left", remaining.as_secs())),
                        None => (true, "overdue".to_string()),
                    },
                    None => (false, "no time limit".to_string()),
                };

                info!(
                    remaining_components = ?remaining_components,
                    time_remaining = ?time_remaining,
                    "Shutting down... Waiting on running components."
                );

                let all_done = check_handles.is_empty();

                // Exiting the reporter completes the select below, just like `success` would.
                if all_done {
                    info!("Shutdown reporter exiting: all components shut down.");
                    break;
                } else if deadline_passed {
                    error!(remaining_components = ?remaining_components, "Shutdown reporter: deadline exceeded.");
                    break;
                }
            }
        };

        // Finishes once all tasks have shutdown.
        let success = futures::future::join_all(wait_handles).map(|_| ());

        // Aggregate future that ends once anything detects that all tasks have shutdown
        // (or that the deadline has passed).
        let shutdown_complete_future = future::select_all(vec![
            Box::pin(timeout) as future::BoxFuture<'static, ()>,
            Box::pin(reporter) as future::BoxFuture<'static, ()>,
            Box::pin(success) as future::BoxFuture<'static, ()>,
        ]);

        // Now kick off the shutdown process by shutting down the sources.
        let source_shutdown_complete = self.shutdown_coordinator.shutdown_all(deadline);
        if let Some(trigger) = self.utilization_task_shutdown_trigger {
            trigger.cancel();
        }
        if let Some(trigger) = self.metrics_task_shutdown_trigger {
            trigger.cancel();
        }

        futures::future::join(source_shutdown_complete, shutdown_complete_future).map(|_| ())
    }
269
270    /// Attempts to load a new configuration and update this running topology.
271    ///
272    /// If the new configuration was valid, and all changes were able to be made -- removing of
273    /// old components, changing of existing components, adding of new components -- then
274    /// `Ok(())` is returned.
275    ///
276    /// If the new configuration is not valid, or not all of the changes in the new configuration
277    /// were able to be made, then this method will attempt to undo the changes made and bring the
278    /// topology back to its previous state, returning the appropriate error.
279    ///
280    /// If the restore also fails, `ReloadError::FailedToRestore` is returned.
281    pub async fn reload_config_and_respawn(
282        &mut self,
283        new_config: Config,
284        extra_context: ExtraContext,
285    ) -> Result<(), ReloadError> {
286        info!("Reloading running topology with new configuration.");
287
288        if self.config.global != new_config.global {
289            return match self.config.global.diff(&new_config.global) {
290                Ok(changed_fields) => Err(ReloadError::GlobalOptionsChanged { changed_fields }),
291                Err(source) => Err(ReloadError::GlobalDiffFailed { source }),
292            };
293        }
294
295        // Calculate the change between the current configuration and the new configuration, and
296        // shutdown any components that are changing so that we can reclaim their buffers before
297        // spawning the new version of the component.
298        //
299        // We also shutdown any component that is simply being removed entirely.
300        let diff = if let Some(components) = &self.pending_reload {
301            ConfigDiff::new(&self.config, &new_config, components.clone())
302        } else {
303            ConfigDiff::new(&self.config, &new_config, HashSet::new())
304        };
305        let buffers = self.shutdown_diff(&diff, &new_config).await;
306
307        // Gives windows some time to make available any port
308        // released by shutdown components.
309        // Issue: https://github.com/vectordotdev/vector/issues/3035
310        if cfg!(windows) {
311            // This value is guess work.
312            tokio::time::sleep(Duration::from_millis(200)).await;
313        }
314
315        // Try to build all of the new components coming from the new configuration.  If we can
316        // successfully build them, we'll attempt to connect them up to the topology and spawn their
317        // respective component tasks.
318        if let Some(mut new_pieces) = TopologyPiecesBuilder::new(&new_config, &diff)
319            .with_buffers(buffers.clone())
320            .with_extra_context(extra_context.clone())
321            .with_utilization_registry(self.utilization_registry.clone())
322            .build_or_log_errors()
323            .await
324        {
325            // If healthchecks are configured for any of the changing/new components, try running
326            // them before moving forward with connecting and spawning.  In some cases, healthchecks
327            // failing may be configured as a non-blocking issue and so we'll still continue on.
328            if self
329                .run_healthchecks(&diff, &mut new_pieces, new_config.healthchecks)
330                .await
331            {
332                self.connect_diff(&diff, &mut new_pieces).await;
333                self.spawn_diff(&diff, new_pieces);
334                self.config = new_config;
335
336                info!("New configuration loaded successfully.");
337
338                return Ok(());
339            }
340        }
341
342        // We failed to build, connect, and spawn all of the changed/new components, so we flip
343        // around the configuration differential to generate all the components that we need to
344        // bring back to restore the current configuration.
345        warn!("Failed to completely load new configuration. Restoring old configuration.");
346
347        let diff = diff.flip();
348        if let Some(mut new_pieces) = TopologyPiecesBuilder::new(&self.config, &diff)
349            .with_buffers(buffers)
350            .with_extra_context(extra_context.clone())
351            .with_utilization_registry(self.utilization_registry.clone())
352            .build_or_log_errors()
353            .await
354            && self
355                .run_healthchecks(&diff, &mut new_pieces, self.config.healthchecks)
356                .await
357        {
358            self.connect_diff(&diff, &mut new_pieces).await;
359            self.spawn_diff(&diff, new_pieces);
360
361            info!("Old configuration restored successfully.");
362
363            return Err(ReloadError::TopologyBuildFailed);
364        }
365
366        error!(
367            message = "Failed to restore old configuration.",
368            internal_log_rate_limit = false
369        );
370
371        Err(ReloadError::FailedToRestore)
372    }
373
374    /// Attempts to reload enrichment tables.
375    pub(crate) async fn reload_enrichment_tables(&self) {
376        reload_enrichment_tables(&self.config).await;
377    }
378
379    pub(crate) async fn run_healthchecks(
380        &mut self,
381        diff: &ConfigDiff,
382        pieces: &mut TopologyPieces,
383        options: HealthcheckOptions,
384    ) -> bool {
385        if options.enabled {
386            let healthchecks = take_healthchecks(diff, pieces)
387                .into_iter()
388                .map(|(_, task)| task);
389            let healthchecks = future::try_join_all(healthchecks);
390
391            info!("Running healthchecks.");
392            if options.require_healthy {
393                let success = healthchecks.await;
394
395                if success.is_ok() {
396                    info!("All healthchecks passed.");
397                    true
398                } else {
399                    error!(
400                        message = "Sinks unhealthy.",
401                        internal_log_rate_limit = false
402                    );
403                    false
404                }
405            } else {
406                tokio::spawn(healthchecks);
407                true
408            }
409        } else {
410            true
411        }
412    }
413
414    /// Shuts down any changed/removed component in the given configuration diff.
415    ///
416    /// If buffers for any of the changed/removed components can be recovered, they'll be returned.
417    async fn shutdown_diff(
418        &mut self,
419        diff: &ConfigDiff,
420        new_config: &Config,
421    ) -> HashMap<ComponentKey, BuiltBuffer> {
422        // First, we shutdown any changed/removed sources. This ensures that we can allow downstream
423        // components to terminate naturally by virtue of the flow of events stopping.
424        if diff.sources.any_changed_or_removed() {
425            let timeout = Duration::from_secs(30);
426            let mut source_shutdown_handles = Vec::new();
427
428            let deadline = Instant::now() + timeout;
429            for key in &diff.sources.to_remove {
430                debug!(component_id = %key, "Removing source.");
431
432                let previous = self.tasks.remove(key).unwrap();
433                drop(previous); // detach and forget
434
435                self.remove_outputs(key);
436                source_shutdown_handles
437                    .push(self.shutdown_coordinator.shutdown_source(key, deadline));
438            }
439
440            for key in &diff.sources.to_change {
441                debug!(component_id = %key, "Changing source.");
442
443                self.remove_outputs(key);
444                source_shutdown_handles
445                    .push(self.shutdown_coordinator.shutdown_source(key, deadline));
446            }
447
448            debug!(
449                "Waiting for up to {} seconds for source(s) to finish shutting down.",
450                timeout.as_secs()
451            );
452            futures::future::join_all(source_shutdown_handles).await;
453
454            // Final cleanup pass now that all changed/removed sources have signalled as having shutdown.
455            for key in diff.sources.removed_and_changed() {
456                if let Some(task) = self.source_tasks.remove(key) {
457                    task.await.unwrap().unwrap();
458                }
459            }
460        }
461
462        // Next, we shutdown any changed/removed transforms.  Same as before: we want allow
463        // downstream components to terminate naturally by virtue of the flow of events stopping.
464        //
465        // Since transforms are entirely driven by the flow of events into them from upstream
466        // components, the shutdown of sources they depend on, or the shutdown of transforms they
467        // depend on, and thus the closing of their buffer, will naturally cause them to shutdown,
468        // which is why we don't do any manual triggering of shutdown here.
469        for key in &diff.transforms.to_remove {
470            debug!(component_id = %key, "Removing transform.");
471
472            let previous = self.tasks.remove(key).unwrap();
473            drop(previous); // detach and forget
474
475            self.remove_inputs(key, diff, new_config).await;
476            self.remove_outputs(key);
477
478            if let Some(registry) = self.utilization_registry.as_ref() {
479                registry.remove_component(key);
480            }
481        }
482
483        for key in &diff.transforms.to_change {
484            debug!(component_id = %key, "Changing transform.");
485
486            self.remove_inputs(key, diff, new_config).await;
487            self.remove_outputs(key);
488        }
489
490        // Now we'll process any changed/removed sinks.
491        //
492        // At this point both the old and the new config don't have conflicts in their resource
493        // usage. So if we combine their resources, all found conflicts are between to be removed
494        // and to be added components.
495        let removed_table_sinks = diff
496            .enrichment_tables
497            .removed_and_changed()
498            .filter_map(|key| {
499                self.config
500                    .enrichment_table(key)
501                    .and_then(|t| t.as_sink(key))
502                    .map(|(key, s)| (key.clone(), s.resources(&key)))
503            })
504            .collect::<Vec<_>>();
505        let remove_sink = diff
506            .sinks
507            .removed_and_changed()
508            .map(|key| {
509                (
510                    key,
511                    self.config
512                        .sink(key)
513                        .map(|s| s.resources(key))
514                        .unwrap_or_default(),
515                )
516            })
517            .chain(removed_table_sinks.iter().map(|(k, s)| (k, s.clone())));
518        let add_source = diff
519            .sources
520            .changed_and_added()
521            .map(|key| (key, new_config.source(key).unwrap().inner.resources()));
522        let added_table_sinks = diff
523            .enrichment_tables
524            .changed_and_added()
525            .filter_map(|key| {
526                self.config
527                    .enrichment_table(key)
528                    .and_then(|t| t.as_sink(key))
529                    .map(|(key, s)| (key.clone(), s.resources(&key)))
530            })
531            .collect::<Vec<_>>();
532        let add_sink = diff
533            .sinks
534            .changed_and_added()
535            .map(|key| {
536                (
537                    key,
538                    new_config
539                        .sink(key)
540                        .map(|s| s.resources(key))
541                        .unwrap_or_default(),
542                )
543            })
544            .chain(added_table_sinks.iter().map(|(k, s)| (k, s.clone())));
545        let conflicts = Resource::conflicts(
546            remove_sink.map(|(key, value)| ((true, key), value)).chain(
547                add_sink
548                    .chain(add_source)
549                    .map(|(key, value)| ((false, key), value)),
550            ),
551        )
552        .into_iter()
553        .flat_map(|(_, components)| components)
554        .collect::<HashSet<_>>();
555        // Existing conflicting sinks
556        let conflicting_sinks = conflicts
557            .into_iter()
558            .filter(|&(existing_sink, _)| existing_sink)
559            .map(|(_, key)| key.clone());
560
561        // For any sink whose buffer configuration didn't change, we can reuse their buffer.
562        let reuse_buffers = diff
563            .sinks
564            .to_change
565            .iter()
566            .filter(|&key| {
567                if diff.components_to_reload.contains(key) {
568                    return false;
569                }
570                self.config.sink(key).map(|s| s.buffer.clone()).or_else(|| {
571                    self.config
572                        .enrichment_table(key)
573                        .and_then(|t| t.as_sink(key))
574                        .map(|(_, s)| s.buffer)
575                }) == new_config.sink(key).map(|s| s.buffer.clone()).or_else(|| {
576                    self.config
577                        .enrichment_table(key)
578                        .and_then(|t| t.as_sink(key))
579                        .map(|(_, s)| s.buffer)
580                })
581            })
582            .cloned()
583            .collect::<HashSet<_>>();
584
585        // For any existing sink that has a conflicting resource dependency with a changed/added
586        // sink, for any sink that we want to reuse their buffer, or for any changed sink with
587        // a disk buffer that is not being reused, we need to explicitly wait for them to finish
588        // processing so we can reclaim ownership of those resources/buffers.
589        let changed_disk_buffer_sinks = diff
590            .sinks
591            .to_change
592            .iter()
593            .filter(|key| {
594                !reuse_buffers.contains(*key)
595                    && self
596                        .config
597                        .sink(key)
598                        .is_some_and(|s| s.buffer.has_disk_stage())
599            })
600            .cloned()
601            .collect::<HashSet<_>>();
602
603        let wait_for_sinks = conflicting_sinks
604            .chain(reuse_buffers.iter().cloned())
605            .chain(changed_disk_buffer_sinks.iter().cloned())
606            .collect::<HashSet<_>>();
607
608        // First, we remove any inputs to removed sinks so they can naturally shut down.
609        let removed_sinks = diff
610            .sinks
611            .to_remove
612            .iter()
613            .chain(diff.enrichment_tables.to_remove.iter().filter(|key| {
614                self.config
615                    .enrichment_table(key)
616                    .and_then(|t| t.as_sink(key))
617                    .is_some()
618            }))
619            .collect::<Vec<_>>();
620        for key in &removed_sinks {
621            debug!(component_id = %key, "Removing sink.");
622            self.remove_inputs(key, diff, new_config).await;
623
624            if let Some(registry) = self.utilization_registry.as_ref() {
625                registry.remove_component(key);
626            }
627        }
628
629        // After that, for any changed sinks, we temporarily detach their inputs (not remove) so
630        // they can naturally shutdown and allow us to recover their buffers if possible.
631        let mut buffer_tx = HashMap::new();
632
633        let sinks_to_change = diff
634            .sinks
635            .to_change
636            .iter()
637            .chain(diff.enrichment_tables.to_change.iter().filter(|key| {
638                self.config
639                    .enrichment_table(key)
640                    .and_then(|t| t.as_sink(key))
641                    .is_some()
642            }))
643            .collect::<Vec<_>>();
644
645        for key in &sinks_to_change {
646            debug!(component_id = %key, "Changing sink.");
647            if reuse_buffers.contains(key) || changed_disk_buffer_sinks.contains(key) {
648                self.detach_triggers
649                    .remove(key)
650                    .unwrap()
651                    .into_inner()
652                    .cancel();
653
654                if reuse_buffers.contains(key) {
655                    // We explicitly clone the input side of the buffer and store it so we don't lose
656                    // it when we remove the inputs below.
657                    //
658                    // We clone instead of removing here because otherwise the input will be missing for
659                    // the rest of the reload process, which violates the assumption that all previous
660                    // inputs for components not being removed are still available. It's simpler to
661                    // allow the "old" input to stick around and be replaced (even though that's
662                    // basically a no-op since we're reusing the same buffer) than it is to pass around
663                    // info about which sinks are having their buffers reused and treat them differently
664                    // at other stages.
665                    buffer_tx.insert((*key).clone(), self.inputs.get(key).unwrap().clone());
666                }
667            }
668            self.remove_inputs(key, diff, new_config).await;
669        }
670
671        // Now that we've disconnected or temporarily detached the inputs to all changed/removed
672        // sinks, we can actually wait for them to shutdown before collecting any buffers that are
673        // marked for reuse.
674        //
675        // If a sink we're removing isn't tying up any resource that a changed/added sink depends
676        // on, we don't bother waiting for it to shutdown.
677        for key in &removed_sinks {
678            let previous = self.tasks.remove(key).unwrap();
679            if wait_for_sinks.contains(key) {
680                debug!(message = "Waiting for sink to shutdown.", component_id = %key);
681                previous.await.unwrap().unwrap();
682            } else {
683                drop(previous); // detach and forget
684            }
685        }
686
687        let mut buffers = HashMap::<ComponentKey, BuiltBuffer>::new();
688        for key in &sinks_to_change {
689            if wait_for_sinks.contains(key) {
690                let previous = self.tasks.remove(key).unwrap();
691                debug!(message = "Waiting for sink to shutdown.", component_id = %key);
692                let buffer = previous.await.unwrap().unwrap();
693
694                if reuse_buffers.contains(key) {
695                    // We clone instead of removing here because otherwise the input will be
696                    // missing for the rest of the reload process, which violates the assumption
697                    // that all previous inputs for components not being removed are still
698                    // available. It's simpler to allow the "old" input to stick around and be
699                    // replaced (even though that's basically a no-op since we're reusing the same
700                    // buffer) than it is to pass around info about which sinks are having their
701                    // buffers reused and treat them differently at other stages.
702                    let tx = buffer_tx.remove(key).unwrap();
703                    let rx = match buffer {
704                        TaskOutput::Sink(rx) => rx.into_inner(),
705                        _ => unreachable!(),
706                    };
707
708                    buffers.insert((*key).clone(), (tx, Arc::new(Mutex::new(Some(rx)))));
709                }
710            }
711        }
712
713        buffers
714    }
715
    /// Connects all changed/added components in the given configuration diff.
    ///
    /// The steps run in a fixed order:
    ///
    /// 1. If the tap watch channel still has receivers (`self.watch.0` is not closed), update
    ///    the tap bookkeeping (`outputs_tap_metadata`, `inputs_tap_metadata`,
    ///    `component_type_names`) for removed and changed/added components.
    /// 2. Register the outputs of changed/added sources, enrichment-table sources, and
    ///    transforms via `setup_outputs`, so every upstream output exists before any input is
    ///    wired to it.
    /// 3. Wire the inputs of changed/added transforms, then sinks, then enrichment-table sinks
    ///    via `setup_inputs`.
    /// 4. Re-add unchanged transforms/sinks to the recreated outputs of changed upstream
    ///    components (`reattach_severed_inputs`).
    /// 5. If the tap watch channel still has receivers, broadcast the updated `TapResource`.
    ///
    /// `new_pieces` is consumed piecemeal: `setup_outputs` and `setup_inputs` remove the
    /// entries they use from it.
    ///
    /// # Panics
    ///
    /// Panics if sending the tap update fails, and whenever `setup_outputs`, `setup_inputs`,
    /// or `reattach_severed_inputs` find an expected entry missing.
    pub(crate) async fn connect_diff(
        &mut self,
        diff: &ConfigDiff,
        new_pieces: &mut TopologyPieces,
    ) {
        debug!("Connecting changed/added component(s).");

        // Update tap metadata
        // (only maintained while the tap watch channel still has at least one receiver).
        if !self.watch.0.is_closed() {
            for key in &diff.sources.to_remove {
                // Sources only have outputs
                self.outputs_tap_metadata.remove(key);
                self.component_type_names.remove(key);
            }

            for key in &diff.transforms.to_remove {
                // Transforms can have both inputs and outputs
                self.outputs_tap_metadata.remove(key);
                self.inputs_tap_metadata.remove(key);
                self.component_type_names.remove(key);
            }

            for key in &diff.sinks.to_remove {
                // Sinks only have inputs
                self.inputs_tap_metadata.remove(key);
                self.component_type_names.remove(key);
            }

            // Removed enrichment tables that act as sinks: drop their input metadata. Note
            // that enrichment tables never get a `component_type_names` entry in this method.
            let removed_sinks = diff.enrichment_tables.to_remove.iter().filter(|key| {
                self.config
                    .enrichment_table(key)
                    .and_then(|t| t.as_sink(key))
                    .is_some()
            });
            for key in removed_sinks {
                // Sinks only have inputs
                self.inputs_tap_metadata.remove(key);
            }

            // Removed enrichment tables that act as sources: drop output metadata under the
            // key returned by `as_source`.
            let removed_sources = diff.enrichment_tables.to_remove.iter().filter_map(|key| {
                self.config
                    .enrichment_table(key)
                    .and_then(|t| t.as_source(key).map(|(key, _)| key))
            });
            for key in removed_sources {
                // Sources only have outputs
                self.outputs_tap_metadata.remove(&key);
            }

            for key in diff.sources.changed_and_added() {
                if let Some(task) = new_pieces.tasks.get(key) {
                    let typetag = task.typetag().to_string();
                    self.outputs_tap_metadata
                        .insert(key.clone(), ("source", typetag.clone()));
                    self.component_type_names.insert(key.clone(), typetag);
                }
            }

            // NOTE(review): this resolves tables through `self.config`. `setup_inputs` treats
            // `self.config` as the *previous* config when computing `old_inputs`; if that also
            // holds here during a reload, tables newly added by this diff are not found and
            // their source outputs get no tap metadata. Confirm.
            for key in diff
                .enrichment_tables
                .changed_and_added()
                .filter_map(|key| {
                    self.config
                        .enrichment_table(key)
                        .and_then(|t| t.as_source(key).map(|(key, _)| key))
                })
            {
                if let Some(task) = new_pieces.tasks.get(&key) {
                    self.outputs_tap_metadata
                        .insert(key.clone(), ("source", task.typetag().to_string()));
                }
            }

            for key in diff.transforms.changed_and_added() {
                if let Some(task) = new_pieces.tasks.get(key) {
                    let typetag = task.typetag().to_string();
                    self.outputs_tap_metadata
                        .insert(key.clone(), ("transform", typetag.clone()));
                    self.component_type_names.insert(key.clone(), typetag);
                }
            }

            for key in diff.sinks.changed_and_added() {
                if let Some(task) = new_pieces.tasks.get(key) {
                    self.component_type_names
                        .insert(key.clone(), task.typetag().to_string());
                }
            }

            // Every component with inputs still pending in `new_pieces` records them here.
            for (key, input) in &new_pieces.inputs {
                self.inputs_tap_metadata
                    .insert(key.clone(), input.1.clone());
            }
        }

        // We configure the outputs of any changed/added sources first, so they're available to any
        // transforms and sinks that come afterwards.
        for key in diff.sources.changed_and_added() {
            debug!(component_id = %key, "Configuring outputs for source.");
            self.setup_outputs(key, new_pieces).await;
        }

        // Enrichment tables with a source task behave like sources here. The list is kept so
        // the tap broadcast below can report these keys as sources.
        let added_changed_table_sources: Vec<&ComponentKey> = diff
            .enrichment_tables
            .changed_and_added()
            .filter(|k| new_pieces.source_tasks.contains_key(k))
            .collect();
        for key in added_changed_table_sources.iter() {
            debug!(component_id = %key, "Connecting outputs for enrichment table source.");
            self.setup_outputs(key, new_pieces).await;
        }

        // We configure the outputs of any changed/added transforms next, for the same reason: we
        // need them to be available to any transforms and sinks that come afterwards.
        for key in diff.transforms.changed_and_added() {
            debug!(component_id = %key, "Configuring outputs for transform.");
            self.setup_outputs(key, new_pieces).await;
        }

        // Now that all possible outputs are configured, we can start wiring up inputs, starting
        // with transforms.
        for key in diff.transforms.changed_and_added() {
            debug!(component_id = %key, "Connecting inputs for transform.");
            self.setup_inputs(key, diff, new_pieces).await;
        }

        // Now that all sources and transforms are fully configured, we can wire up sinks.
        for key in diff.sinks.changed_and_added() {
            debug!(component_id = %key, "Connecting inputs for sink.");
            self.setup_inputs(key, diff, new_pieces).await;
        }
        // Enrichment tables with pending inputs behave like sinks. The list is kept so the tap
        // broadcast below can report these keys as sinks.
        let added_changed_tables: Vec<&ComponentKey> = diff
            .enrichment_tables
            .changed_and_added()
            .filter(|k| new_pieces.inputs.contains_key(k))
            .collect();
        for key in added_changed_tables.iter() {
            debug!(component_id = %key, "Connecting inputs for enrichment table sink.");
            self.setup_inputs(key, diff, new_pieces).await;
        }

        // We do a final pass here to reconnect unchanged components.
        //
        // Why would we reconnect unchanged components?  Well, as sources and transforms will
        // recreate their fanouts every time they're changed, we can run into a situation where a
        // transform/sink, which we'll call B, is pointed at a source/transform that was changed, which
        // we'll call A, but because B itself didn't change at all, we haven't yet reconnected it.
        //
        // Instead of propagating connections forward -- B reconnecting A forcefully -- we only
        // connect components backwards i.e. transforms to sources/transforms, and sinks to
        // sources/transforms, to ensure we're connecting components in order.
        self.reattach_severed_inputs(diff);

        // Broadcast any topology changes to subscribers.
        if !self.watch.0.is_closed() {
            // Only outputs that have tap metadata are exposed to tap subscribers; the
            // `flat_map` over the `Option` skips the rest.
            let outputs = self
                .outputs
                .clone()
                .into_iter()
                .flat_map(|(output_id, control_tx)| {
                    self.outputs_tap_metadata.get(&output_id.component).map(
                        |(component_kind, component_type)| {
                            (
                                TapOutput {
                                    output_id,
                                    component_kind,
                                    component_type: component_type.clone(),
                                },
                                control_tx,
                            )
                        },
                    )
                })
                .collect::<HashMap<_, _>>();

            // NOTE(review): removed enrichment-table sources have their outputs dropped from
            // `outputs_tap_metadata` above but are not added to `removals`. Confirm this is
            // intended.
            let mut removals = diff.sources.to_remove.clone();
            removals.extend(diff.transforms.to_remove.iter().cloned());
            self.watch
                .0
                .send(TapResource {
                    outputs,
                    inputs: self.inputs_tap_metadata.clone(),
                    source_keys: diff
                        .sources
                        .changed_and_added()
                        .map(|key| key.to_string())
                        .chain(
                            added_changed_table_sources
                                .iter()
                                .map(|key| key.to_string()),
                        )
                        .collect(),
                    sink_keys: diff
                        .sinks
                        .changed_and_added()
                        .map(|key| key.to_string())
                        .chain(added_changed_tables.iter().map(|key| key.to_string()))
                        .collect(),
                    // Note, only sources and transforms are relevant. Sinks do
                    // not have outputs to tap.
                    removals,
                    type_names: self
                        .component_type_names
                        .iter()
                        .map(|(k, v)| (k.to_string(), v.clone()))
                        .collect(),
                })
                .expect("Couldn't broadcast config changes.");
        }
    }
927
928    async fn setup_outputs(
929        &mut self,
930        key: &ComponentKey,
931        new_pieces: &mut builder::TopologyPieces,
932    ) {
933        let outputs = new_pieces.outputs.remove(key).unwrap();
934        for (port, output) in outputs {
935            debug!(component_id = %key, output_id = ?port, "Configuring output for component.");
936
937            let id = OutputId {
938                component: key.clone(),
939                port,
940            };
941
942            self.outputs.insert(id, output);
943        }
944    }
945
946    async fn setup_inputs(
947        &mut self,
948        key: &ComponentKey,
949        diff: &ConfigDiff,
950        new_pieces: &mut builder::TopologyPieces,
951    ) {
952        let (tx, inputs) = new_pieces.inputs.remove(key).unwrap();
953
954        let old_inputs = self
955            .config
956            .inputs_for_node(key)
957            .into_iter()
958            .flatten()
959            .cloned()
960            .collect::<HashSet<_>>();
961
962        let new_inputs = inputs.iter().cloned().collect::<HashSet<_>>();
963        let inputs_to_add = &new_inputs - &old_inputs;
964
965        for input in inputs {
966            let output = self.outputs.get_mut(&input).expect("unknown output");
967
968            if diff.contains(&input.component) || inputs_to_add.contains(&input) {
969                // If the input we're connecting to is changing, that means its outputs will have been
970                // recreated, so instead of replacing a paused sink, we have to add it to this new
971                // output for the first time, since there's nothing to actually replace at this point.
972                debug!(component_id = %key, fanout_id = %input, "Adding component input to fanout.");
973
974                _ = output.send(ControlMessage::Add(key.clone(), tx.clone()));
975            } else {
976                // We know that if this component is connected to a given input, and neither
977                // components were changed, then the output must still exist, which means we paused
978                // this component's connection to its output, so we have to replace that connection
979                // now:
980                debug!(component_id = %key, fanout_id = %input, "Replacing component input in fanout.");
981
982                _ = output.send(ControlMessage::Replace(key.clone(), tx.clone()));
983            }
984        }
985
986        self.inputs.insert(key.clone(), tx);
987        new_pieces
988            .detach_triggers
989            .remove(key)
990            .map(|trigger| self.detach_triggers.insert(key.clone(), trigger.into()));
991    }
992
993    fn remove_outputs(&mut self, key: &ComponentKey) {
994        self.outputs.retain(|id, _output| &id.component != key);
995    }
996
997    async fn remove_inputs(&mut self, key: &ComponentKey, diff: &ConfigDiff, new_config: &Config) {
998        self.inputs.remove(key);
999        self.detach_triggers.remove(key);
1000
1001        let old_inputs = self.config.inputs_for_node(key).expect("node exists");
1002        let new_inputs = new_config
1003            .inputs_for_node(key)
1004            .unwrap_or_default()
1005            .iter()
1006            .collect::<HashSet<_>>();
1007
1008        for input in old_inputs {
1009            if let Some(output) = self.outputs.get_mut(input) {
1010                if diff.contains(&input.component)
1011                    || diff.is_removed(key)
1012                    || !new_inputs.contains(input)
1013                {
1014                    // 3 cases to remove the input:
1015                    //
1016                    // Case 1: If the input we're removing ourselves from is changing, that means its
1017                    // outputs will be recreated, so instead of pausing the sink, we just delete it
1018                    // outright to ensure things are clean.
1019                    //
1020                    // Case 2: If this component itself is being removed, then pausing makes no sense
1021                    // because it isn't coming back.
1022                    //
1023                    // Case 3: This component is no longer connected to the input from new config.
1024                    debug!(component_id = %key, fanout_id = %input, "Removing component input from fanout.");
1025
1026                    _ = output.send(ControlMessage::Remove(key.clone()));
1027                } else {
1028                    // We know that if this component is connected to a given input, and it isn't being
1029                    // changed, then it will exist when we reconnect inputs, so we should pause it
1030                    // now to pause further sends through that component until we reconnect:
1031                    debug!(component_id = %key, fanout_id = %input, "Pausing component input in fanout.");
1032
1033                    _ = output.send(ControlMessage::Pause(key.clone()));
1034                }
1035            }
1036        }
1037    }
1038
1039    fn reattach_severed_inputs(&mut self, diff: &ConfigDiff) {
1040        let unchanged_transforms = self
1041            .config
1042            .transforms()
1043            .filter(|(key, _)| !diff.transforms.contains(key));
1044        for (transform_key, transform) in unchanged_transforms {
1045            let changed_outputs = get_changed_outputs(diff, transform.inputs.clone());
1046            for output_id in changed_outputs {
1047                debug!(component_id = %transform_key, fanout_id = %output_id.component, "Reattaching component input to fanout.");
1048
1049                let input = self.inputs.get(transform_key).cloned().unwrap();
1050                let output = self.outputs.get_mut(&output_id).unwrap();
1051                _ = output.send(ControlMessage::Add(transform_key.clone(), input));
1052            }
1053        }
1054
1055        let unchanged_sinks = self
1056            .config
1057            .sinks()
1058            .filter(|(key, _)| !diff.sinks.contains(key));
1059        for (sink_key, sink) in unchanged_sinks {
1060            let changed_outputs = get_changed_outputs(diff, sink.inputs.clone());
1061            for output_id in changed_outputs {
1062                debug!(component_id = %sink_key, fanout_id = %output_id.component, "Reattaching component input to fanout.");
1063
1064                let input = self.inputs.get(sink_key).cloned().unwrap();
1065                let output = self.outputs.get_mut(&output_id).unwrap();
1066                _ = output.send(ControlMessage::Add(sink_key.clone(), input));
1067            }
1068        }
1069    }
1070
1071    /// Starts any new or changed components in the given configuration diff.
1072    pub(crate) fn spawn_diff(&mut self, diff: &ConfigDiff, mut new_pieces: TopologyPieces) {
1073        for key in &diff.sources.to_change {
1074            debug!(message = "Spawning changed source.", component_id = %key);
1075            self.spawn_source(key, &mut new_pieces);
1076        }
1077
1078        for key in &diff.sources.to_add {
1079            debug!(message = "Spawning new source.", component_id = %key);
1080            self.spawn_source(key, &mut new_pieces);
1081        }
1082
1083        let changed_table_sources: Vec<&ComponentKey> = diff
1084            .enrichment_tables
1085            .to_change
1086            .iter()
1087            .filter(|k| new_pieces.source_tasks.contains_key(k))
1088            .collect();
1089
1090        let added_table_sources: Vec<&ComponentKey> = diff
1091            .enrichment_tables
1092            .to_add
1093            .iter()
1094            .filter(|k| new_pieces.source_tasks.contains_key(k))
1095            .collect();
1096
1097        for key in changed_table_sources {
1098            debug!(message = "Spawning changed enrichment table source.", component_id = %key);
1099            self.spawn_source(key, &mut new_pieces);
1100        }
1101
1102        for key in added_table_sources {
1103            debug!(message = "Spawning new enrichment table source.", component_id = %key);
1104            self.spawn_source(key, &mut new_pieces);
1105        }
1106
1107        for key in &diff.transforms.to_change {
1108            debug!(message = "Spawning changed transform.", component_id = %key);
1109            self.spawn_transform(key, &mut new_pieces);
1110        }
1111
1112        for key in &diff.transforms.to_add {
1113            debug!(message = "Spawning new transform.", component_id = %key);
1114            self.spawn_transform(key, &mut new_pieces);
1115        }
1116
1117        for key in &diff.sinks.to_change {
1118            debug!(message = "Spawning changed sink.", component_id = %key);
1119            self.spawn_sink(key, &mut new_pieces);
1120        }
1121
1122        for key in &diff.sinks.to_add {
1123            trace!(message = "Spawning new sink.", component_id = %key);
1124            self.spawn_sink(key, &mut new_pieces);
1125        }
1126
1127        let changed_tables: Vec<&ComponentKey> = diff
1128            .enrichment_tables
1129            .to_change
1130            .iter()
1131            .filter(|k| {
1132                new_pieces.tasks.contains_key(k) && !new_pieces.source_tasks.contains_key(k)
1133            })
1134            .collect();
1135
1136        let added_tables: Vec<&ComponentKey> = diff
1137            .enrichment_tables
1138            .to_add
1139            .iter()
1140            .filter(|k| {
1141                new_pieces.tasks.contains_key(k) && !new_pieces.source_tasks.contains_key(k)
1142            })
1143            .collect();
1144
1145        for key in changed_tables {
1146            debug!(message = "Spawning changed enrichment table sink.", component_id = %key);
1147            self.spawn_sink(key, &mut new_pieces);
1148        }
1149
1150        for key in added_tables {
1151            debug!(message = "Spawning enrichment table new sink.", component_id = %key);
1152            self.spawn_sink(key, &mut new_pieces);
1153        }
1154    }
1155
1156    fn spawn_sink(&mut self, key: &ComponentKey, new_pieces: &mut builder::TopologyPieces) {
1157        let task = new_pieces.tasks.remove(key).unwrap();
1158        let span = error_span!(
1159            "sink",
1160            component_kind = "sink",
1161            component_id = %task.id(),
1162            component_type = %task.typetag(),
1163        );
1164
1165        let task_span = span.or_current();
1166        #[cfg(feature = "allocation-tracing")]
1167        if crate::internal_telemetry::allocations::is_allocation_tracing_enabled() {
1168            let group_id = crate::internal_telemetry::allocations::acquire_allocation_group_id(
1169                task.id().to_string(),
1170                "sink".to_string(),
1171                task.typetag().to_string(),
1172            );
1173            debug!(
1174                component_kind = "sink",
1175                component_type = task.typetag(),
1176                component_id = task.id(),
1177                group_id = group_id.as_raw().to_string(),
1178                "Registered new allocation group."
1179            );
1180            group_id.attach_to_span(&task_span);
1181        }
1182
1183        let task_name = format!(">> {} ({})", task.typetag(), task.id());
1184        let task = {
1185            let key = key.clone();
1186            handle_errors(task, self.abort_tx.clone(), |error| {
1187                ShutdownError::SinkAborted { key, error }
1188            })
1189        }
1190        .instrument(task_span);
1191        let spawned = spawn_named(task, task_name.as_ref());
1192        if let Some(previous) = self.tasks.insert(key.clone(), spawned) {
1193            drop(previous); // detach and forget
1194        }
1195    }
1196
1197    fn spawn_transform(&mut self, key: &ComponentKey, new_pieces: &mut builder::TopologyPieces) {
1198        let task = new_pieces.tasks.remove(key).unwrap();
1199        let span = error_span!(
1200            "transform",
1201            component_kind = "transform",
1202            component_id = %task.id(),
1203            component_type = %task.typetag(),
1204        );
1205
1206        let task_span = span.or_current();
1207        #[cfg(feature = "allocation-tracing")]
1208        if crate::internal_telemetry::allocations::is_allocation_tracing_enabled() {
1209            let group_id = crate::internal_telemetry::allocations::acquire_allocation_group_id(
1210                task.id().to_string(),
1211                "transform".to_string(),
1212                task.typetag().to_string(),
1213            );
1214            debug!(
1215                component_kind = "transform",
1216                component_type = task.typetag(),
1217                component_id = task.id(),
1218                group_id = group_id.as_raw().to_string(),
1219                "Registered new allocation group."
1220            );
1221            group_id.attach_to_span(&task_span);
1222        }
1223
1224        let task_name = format!(">> {} ({}) >>", task.typetag(), task.id());
1225        let task = {
1226            let key = key.clone();
1227            handle_errors(task, self.abort_tx.clone(), |error| {
1228                ShutdownError::TransformAborted { key, error }
1229            })
1230        }
1231        .instrument(task_span);
1232        let spawned = spawn_named(task, task_name.as_ref());
1233        if let Some(previous) = self.tasks.insert(key.clone(), spawned) {
1234            drop(previous); // detach and forget
1235        }
1236    }
1237
1238    fn spawn_source(&mut self, key: &ComponentKey, new_pieces: &mut builder::TopologyPieces) {
1239        let task = new_pieces.tasks.remove(key).unwrap();
1240        let span = error_span!(
1241            "source",
1242            component_kind = "source",
1243            component_id = %task.id(),
1244            component_type = %task.typetag(),
1245        );
1246
1247        let task_span = span.or_current();
1248        #[cfg(feature = "allocation-tracing")]
1249        if crate::internal_telemetry::allocations::is_allocation_tracing_enabled() {
1250            let group_id = crate::internal_telemetry::allocations::acquire_allocation_group_id(
1251                task.id().to_string(),
1252                "source".to_string(),
1253                task.typetag().to_string(),
1254            );
1255
1256            debug!(
1257                component_kind = "source",
1258                component_type = task.typetag(),
1259                component_id = task.id(),
1260                group_id = group_id.as_raw().to_string(),
1261                "Registered new allocation group."
1262            );
1263            group_id.attach_to_span(&task_span);
1264        }
1265
1266        let task_name = format!("{} ({}) >>", task.typetag(), task.id());
1267        let task = {
1268            let key = key.clone();
1269            handle_errors(task, self.abort_tx.clone(), |error| {
1270                ShutdownError::SourceAborted { key, error }
1271            })
1272        }
1273        .instrument(task_span.clone());
1274        let spawned = spawn_named(task, task_name.as_ref());
1275        if let Some(previous) = self.tasks.insert(key.clone(), spawned) {
1276            drop(previous); // detach and forget
1277        }
1278
1279        self.shutdown_coordinator
1280            .takeover_source(key, &mut new_pieces.shutdown_coordinator);
1281
1282        // Now spawn the actual source task.
1283        let source_task = new_pieces.source_tasks.remove(key).unwrap();
1284        let source_task = {
1285            let key = key.clone();
1286            handle_errors(source_task, self.abort_tx.clone(), |error| {
1287                ShutdownError::SourceAborted { key, error }
1288            })
1289        }
1290        .instrument(task_span);
1291        self.source_tasks
1292            .insert(key.clone(), spawn_named(source_task, task_name.as_ref()));
1293    }
1294
1295    pub async fn start_init_validated(
1296        config: Config,
1297        extra_context: ExtraContext,
1298    ) -> Option<(Self, ShutdownErrorReceiver)> {
1299        let diff = ConfigDiff::initial(&config);
1300        let pieces = TopologyPiecesBuilder::new(&config, &diff)
1301            .with_extra_context(extra_context)
1302            .build_or_log_errors()
1303            .await?;
1304        Self::start_validated(config, diff, pieces).await
1305    }
1306
1307    pub async fn start_validated(
1308        config: Config,
1309        diff: ConfigDiff,
1310        mut pieces: TopologyPieces,
1311    ) -> Option<(Self, ShutdownErrorReceiver)> {
1312        let (abort_tx, abort_rx) = mpsc::unbounded_channel();
1313
1314        let expire_metrics = match (
1315            config.global.expire_metrics,
1316            config.global.expire_metrics_secs,
1317        ) {
1318            (Some(e), None) => {
1319                warn!(
1320                    "DEPRECATED: `expire_metrics` setting is deprecated and will be removed in a future version. Use `expire_metrics_secs` instead."
1321                );
1322                if e < Duration::from_secs(0) {
1323                    None
1324                } else {
1325                    Some(e.as_secs_f64())
1326                }
1327            }
1328            (Some(_), Some(_)) => {
1329                error!(
1330                    message = "Cannot set both `expire_metrics` and `expire_metrics_secs`.",
1331                    internal_log_rate_limit = false
1332                );
1333                return None;
1334            }
1335            (None, Some(e)) => {
1336                if e < 0f64 {
1337                    None
1338                } else {
1339                    Some(e)
1340                }
1341            }
1342            (None, None) => Some(300f64),
1343        };
1344
1345        if let Err(error) = crate::metrics::Controller::get()
1346            .expect("Metrics must be initialized")
1347            .set_expiry(
1348                expire_metrics,
1349                config
1350                    .global
1351                    .expire_metrics_per_metric_set
1352                    .clone()
1353                    .unwrap_or_default(),
1354            )
1355        {
1356            error!(message = "Invalid metrics expiry.", %error, internal_log_rate_limit = false);
1357            return None;
1358        }
1359
1360        let (utilization_emitter, utilization_registry) = pieces
1361            .utilization
1362            .take()
1363            .expect("Topology is missing the utilization metric emitter!");
1364        let metrics_storage = pieces.metrics_storage.clone();
1365        let metrics_refresh_period = config
1366            .global
1367            .metrics_storage_refresh_period
1368            .map(Duration::from_secs_f64);
1369        let mut running_topology = Self::new(config, abort_tx);
1370
1371        if !running_topology
1372            .run_healthchecks(&diff, &mut pieces, running_topology.config.healthchecks)
1373            .await
1374        {
1375            return None;
1376        }
1377        running_topology.connect_diff(&diff, &mut pieces).await;
1378        running_topology.spawn_diff(&diff, pieces);
1379
1380        let (utilization_task_shutdown_trigger, utilization_shutdown_signal, _) =
1381            ShutdownSignal::new_wired();
1382        running_topology.utilization_registry = Some(utilization_registry.clone());
1383        running_topology.utilization_task_shutdown_trigger =
1384            Some(utilization_task_shutdown_trigger);
1385        running_topology.utilization_task = Some(tokio::spawn(Task::new(
1386            "utilization_heartbeat".into(),
1387            "",
1388            async move {
1389                utilization_emitter
1390                    .run_utilization(utilization_shutdown_signal)
1391                    .await;
1392                Ok(TaskOutput::Healthcheck)
1393            },
1394        )));
1395        if let Some(metrics_refresh_period) = metrics_refresh_period {
1396            let (metrics_task_shutdown_trigger, metrics_shutdown_signal, _) =
1397                ShutdownSignal::new_wired();
1398            running_topology.metrics_task_shutdown_trigger = Some(metrics_task_shutdown_trigger);
1399            running_topology.metrics_task = Some(tokio::spawn(Task::new(
1400                "metrics_heartbeat".into(),
1401                "",
1402                async move {
1403                    metrics_storage
1404                        .run_periodic_refresh(metrics_refresh_period, metrics_shutdown_signal)
1405                        .await;
1406                    Ok(TaskOutput::Healthcheck)
1407                },
1408            )));
1409        }
1410
1411        Some((running_topology, abort_rx))
1412    }
1413}
1414
1415fn get_changed_outputs(diff: &ConfigDiff, output_ids: Inputs<OutputId>) -> Vec<OutputId> {
1416    let mut changed_outputs = Vec::new();
1417
1418    for source_key in &diff.sources.to_change {
1419        changed_outputs.extend(
1420            output_ids
1421                .iter()
1422                .filter(|id| &id.component == source_key)
1423                .cloned(),
1424        );
1425    }
1426
1427    for transform_key in &diff.transforms.to_change {
1428        changed_outputs.extend(
1429            output_ids
1430                .iter()
1431                .filter(|id| &id.component == transform_key)
1432                .cloned(),
1433        );
1434    }
1435
1436    changed_outputs
1437}