vector/config/unit_test/
mod.rs

1// should match vector-unit-test-tests feature
2#[cfg(all(
3    test,
4    feature = "sources-demo_logs",
5    feature = "transforms-remap",
6    feature = "transforms-route",
7    feature = "transforms-filter",
8    feature = "transforms-reduce",
9    feature = "sinks-console"
10))]
11mod tests;
12mod unit_test_components;
13
14use std::{
15    collections::{BTreeMap, HashMap, HashSet},
16    sync::Arc,
17};
18
19use futures_util::{StreamExt, stream::FuturesUnordered};
20use indexmap::IndexMap;
21use tokio::sync::{
22    Mutex,
23    oneshot::{self, Receiver},
24};
25use uuid::Uuid;
26use vrl::{
27    compiler::{Context, TargetValue, TimeZone, state::RuntimeState},
28    diagnostic::Formatter,
29    value,
30};
31
32pub use self::unit_test_components::{
33    UnitTestSinkCheck, UnitTestSinkConfig, UnitTestSinkResult, UnitTestSourceConfig,
34    UnitTestStreamSinkConfig, UnitTestStreamSourceConfig,
35};
36use super::{OutputId, compiler::expand_globs, graph::Graph, transform::get_transform_output_ids};
37use crate::{
38    conditions::Condition,
39    config::{
40        self, ComponentKey, Config, ConfigBuilder, ConfigPath, SinkOuter, SourceOuter,
41        TestDefinition, TestInput, TestOutput, loading, loading::ConfigBuilderLoader,
42    },
43    event::{Event, EventMetadata, LogEvent},
44    signal,
45    topology::{
46        RunningTopology,
47        builder::{TopologyPieces, TopologyPiecesBuilder},
48    },
49};
50
/// A fully built unit test that is ready to be executed with [`UnitTest::run`].
///
/// It bundles the configuration for the test topology, the topology pieces
/// already built from that configuration, and the receiving halves of the
/// channels on which the test sinks report their results.
pub struct UnitTest {
    /// Name of the test, as given in its definition.
    pub name: String,
    // Configuration the test topology is started with.
    config: Config,
    // Topology pieces built from `config`.
    pieces: TopologyPieces,
    // One receiver per checking sink; each yields that sink's result.
    test_result_rxs: Vec<Receiver<UnitTestSinkResult>>,
}
57
/// Outcome of running a single [`UnitTest`].
///
/// `errors` holds every failure message reported by the test's sinks; an empty
/// list means the test passed.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct UnitTestResult {
    /// Failure messages collected from all checking sinks of the test.
    pub errors: Vec<String>,
}
61
62impl UnitTest {
63    pub async fn run(self) -> UnitTestResult {
64        let diff = config::ConfigDiff::initial(&self.config);
65        let (topology, _) = RunningTopology::start_validated(self.config, diff, self.pieces)
66            .await
67            .unwrap();
68        topology.sources_finished().await;
69        let _stop_complete = topology.stop();
70
71        let mut in_flight = self
72            .test_result_rxs
73            .into_iter()
74            .collect::<FuturesUnordered<_>>();
75
76        let mut errors = Vec::new();
77        while let Some(partial_result) = in_flight.next().await {
78            let partial_result = partial_result.expect(
79                "An unexpected error occurred while executing unit tests. Please try again.",
80            );
81            errors.extend(partial_result.test_errors);
82        }
83
84        UnitTestResult { errors }
85    }
86}
87
88/// Loads Log Schema from configurations and sets global schema.
89/// Once this is done, configurations can be correctly loaded using
90/// configured log schema defaults.
91/// If deny is set, will panic if schema has already been set.
92fn init_log_schema_from_paths(
93    config_paths: &[ConfigPath],
94    deny_if_set: bool,
95) -> Result<(), Vec<String>> {
96    let builder = ConfigBuilderLoader::default()
97        .interpolate_env(true)
98        .load_from_paths(config_paths)?;
99    vector_lib::config::init_log_schema(builder.global.log_schema, deny_if_set);
100    Ok(())
101}
102
103pub async fn build_unit_tests_main(
104    paths: &[ConfigPath],
105    signal_handler: &mut signal::SignalHandler,
106) -> Result<Vec<UnitTest>, Vec<String>> {
107    init_log_schema_from_paths(paths, false)?;
108    let secrets_backends_loader = loading::loader_from_paths(
109        loading::SecretBackendLoader::default().interpolate_env(true),
110        paths,
111    )?;
112    let secrets = secrets_backends_loader
113        .retrieve_secrets(signal_handler)
114        .await
115        .map_err(|e| vec![e])?;
116
117    let config_builder = ConfigBuilderLoader::default()
118        .interpolate_env(true)
119        .secrets(secrets)
120        .load_from_paths(paths)?;
121
122    build_unit_tests(config_builder).await
123}
124
125pub async fn build_unit_tests(
126    mut config_builder: ConfigBuilder,
127) -> Result<Vec<UnitTest>, Vec<String>> {
128    // Sanitize config by removing existing sources and sinks
129    config_builder.sources = Default::default();
130    config_builder.sinks = Default::default();
131
132    let test_definitions = std::mem::take(&mut config_builder.tests);
133    let mut tests = Vec::new();
134    let mut build_errors = Vec::new();
135    let metadata = UnitTestBuildMetadata::initialize(&mut config_builder)?;
136
137    for mut test_definition in test_definitions {
138        let test_name = test_definition.name.clone();
139        // Move the legacy single test input into the inputs list if it exists
140        let legacy_input = std::mem::take(&mut test_definition.input);
141        if let Some(input) = legacy_input {
142            test_definition.inputs.push(input);
143        }
144        match build_unit_test(&metadata, test_definition, config_builder.clone()).await {
145            Ok(test) => tests.push(test),
146            Err(errors) => {
147                let mut test_error = errors.join("\n");
148                // Indent all line breaks
149                test_error = test_error.replace('\n', "\n  ");
150                test_error.insert_str(0, &format!("Failed to build test '{test_name}':\n  "));
151                build_errors.push(test_error);
152            }
153        }
154    }
155
156    if build_errors.is_empty() {
157        Ok(tests)
158    } else {
159        Err(build_errors)
160    }
161}
162
/// Scaffolding computed once per configuration and shared by all of its unit
/// tests: which transforms can receive test input, and the generated names of
/// the test sources and sinks attached to them.
pub struct UnitTestBuildMetadata {
    // A set of all valid insert_at targets, used to validate test inputs.
    available_insert_targets: HashSet<ComponentKey>,
    // A mapping from transform name to unit test source name.
    source_ids: HashMap<ComponentKey, String>,
    // A base setup of all necessary unit test sources that can be "hydrated"
    // with test input events to produce the sources used in a particular test.
    template_sources: IndexMap<ComponentKey, UnitTestSourceConfig>,
    // A mapping from transform output id to unit test sink name.
    sink_ids: HashMap<OutputId, String>,
}
174
175impl UnitTestBuildMetadata {
176    pub fn initialize(config_builder: &mut ConfigBuilder) -> Result<Self, Vec<String>> {
177        // A unique id used to name test sources and sinks to avoid name clashes
178        let random_id = Uuid::new_v4().to_string();
179
180        let available_insert_targets = config_builder
181            .transforms
182            .keys()
183            .cloned()
184            .collect::<HashSet<_>>();
185
186        let source_ids = available_insert_targets
187            .iter()
188            .map(|key| (key.clone(), format!("{}-{}-{}", key, "source", random_id)))
189            .collect::<HashMap<_, _>>();
190
191        // Map a test source to every transform
192        let mut template_sources = IndexMap::new();
193        for (key, transform) in config_builder.transforms.iter_mut() {
194            let test_source_id = source_ids
195                .get(key)
196                .expect("Missing test source for a transform")
197                .clone();
198            transform.inputs.extend(Some(test_source_id));
199
200            template_sources.insert(key.clone(), UnitTestSourceConfig::default());
201        }
202
203        let builder = config_builder.clone();
204        let available_extract_targets = builder
205            .transforms
206            .iter()
207            .flat_map(|(key, transform)| {
208                get_transform_output_ids(
209                    transform.inner.as_ref(),
210                    key.clone(),
211                    builder.schema.log_namespace(),
212                )
213            })
214            .collect::<HashSet<_>>();
215
216        let sink_ids = available_extract_targets
217            .iter()
218            .map(|key| {
219                (
220                    key.clone(),
221                    format!(
222                        "{}-{}-{}",
223                        key.to_string().replace('.', "-"),
224                        "sink",
225                        random_id
226                    ),
227                )
228            })
229            .collect::<HashMap<_, _>>();
230
231        Ok(Self {
232            available_insert_targets,
233            source_ids,
234            template_sources,
235            sink_ids,
236        })
237    }
238
239    /// Convert test inputs into sources for use in a unit testing topology
240    pub fn hydrate_into_sources(
241        &self,
242        inputs: &[TestInput],
243    ) -> Result<IndexMap<ComponentKey, SourceOuter>, Vec<String>> {
244        let inputs = build_and_validate_inputs(inputs, &self.available_insert_targets)?;
245        let mut template_sources = self.template_sources.clone();
246        Ok(inputs
247            .into_iter()
248            .map(|(insert_at, events)| {
249                let mut source_config =
250                    template_sources
251                        .shift_remove(&insert_at)
252                        .unwrap_or_else(|| {
253                            // At this point, all inputs should have been validated to
254                            // correspond with valid transforms, and all valid transforms
255                            // have a source attached.
256                            panic!(
257                                "Invalid input: cannot insert at {:?}",
258                                insert_at.to_string()
259                            )
260                        });
261                source_config.events.extend(events);
262                let id: &str = self
263                    .source_ids
264                    .get(&insert_at)
265                    .expect("Corresponding source must exist")
266                    .as_ref();
267                (ComponentKey::from(id), SourceOuter::new(source_config))
268            })
269            .collect::<IndexMap<_, _>>())
270    }
271
272    /// Convert test outputs into sinks for use in a unit testing topology
273    pub fn hydrate_into_sinks(
274        &self,
275        test_name: &str,
276        outputs: &[TestOutput],
277        no_outputs_from: &[OutputId],
278    ) -> Result<
279        (
280            Vec<Receiver<UnitTestSinkResult>>,
281            IndexMap<ComponentKey, SinkOuter<String>>,
282        ),
283        Vec<String>,
284    > {
285        if outputs.is_empty() && no_outputs_from.is_empty() {
286            return Err(vec![
287                "unit test must contain at least one of `outputs` or `no_outputs_from`."
288                    .to_string(),
289            ]);
290        }
291        let outputs = build_outputs(outputs)?;
292
293        let mut template_sinks = IndexMap::new();
294        let mut test_result_rxs = Vec::new();
295        // Add sinks with checks
296        for (ids, built) in outputs {
297            let (tx, rx) = oneshot::channel();
298            let sink_ids = ids.clone();
299            let sink_config = UnitTestSinkConfig {
300                test_name: test_name.to_string(),
301                transform_ids: ids.iter().map(|id| id.to_string()).collect(),
302                result_tx: Arc::new(Mutex::new(Some(tx))),
303                check: UnitTestSinkCheck::Checks {
304                    conditions: built.conditions,
305                    expected_event_count: built.expected_event_count,
306                },
307            };
308
309            test_result_rxs.push(rx);
310            template_sinks.insert(sink_ids, sink_config);
311        }
312
313        // Add sinks with no outputs check
314        for id in no_outputs_from {
315            let (tx, rx) = oneshot::channel();
316            let sink_config = UnitTestSinkConfig {
317                test_name: test_name.to_string(),
318                transform_ids: vec![id.to_string()],
319                result_tx: Arc::new(Mutex::new(Some(tx))),
320                check: UnitTestSinkCheck::NoOutputs,
321            };
322
323            test_result_rxs.push(rx);
324            template_sinks.insert(vec![id.clone()], sink_config);
325        }
326
327        let sinks = template_sinks
328            .into_iter()
329            .map(|(transform_ids, sink_config)| {
330                let transform_ids_str = transform_ids
331                    .iter()
332                    .map(|s| s.to_string())
333                    .collect::<Vec<_>>();
334                let sink_ids = transform_ids
335                    .iter()
336                    .map(|transform_id| {
337                        self.sink_ids
338                            .get(transform_id)
339                            .expect("Sink does not exist")
340                            .as_str()
341                    })
342                    .collect::<Vec<_>>();
343                let sink_id = sink_ids.join(",");
344                (
345                    ComponentKey::from(sink_id),
346                    SinkOuter::new(transform_ids_str, sink_config),
347                )
348            })
349            .collect::<IndexMap<_, _>>();
350
351        Ok((test_result_rxs, sinks))
352    }
353}
354
355// Find all components that participate in the test
356fn get_relevant_test_components(
357    sources: &[&ComponentKey],
358    graph: &Graph,
359) -> Result<HashSet<String>, Vec<String>> {
360    graph.check_for_cycles().map_err(|error| vec![error])?;
361    let mut errors = Vec::new();
362    let mut components = HashSet::new();
363    for source in sources {
364        let paths = graph.paths_to_sink_from(source);
365        if paths.is_empty() {
366            errors.push(format!(
367                "Unable to complete topology between input target '{}' and output target(s)",
368                source
369                    .to_string()
370                    .rsplit_once("-source-")
371                    .unwrap_or(("", ""))
372                    .0
373            ));
374        } else {
375            for path in paths {
376                components.extend(path.into_iter().map(|key| key.to_string()));
377            }
378        }
379    }
380
381    if errors.is_empty() {
382        Ok(components)
383    } else {
384        Err(errors)
385    }
386}
387
388async fn build_unit_test(
389    metadata: &UnitTestBuildMetadata,
390    test: TestDefinition<String>,
391    mut config_builder: ConfigBuilder,
392) -> Result<UnitTest, Vec<String>> {
393    let transform_only_config = config_builder.clone();
394    let transform_only_graph = Graph::new_unchecked(
395        &transform_only_config.sources,
396        &transform_only_config.transforms,
397        &transform_only_config.sinks,
398        transform_only_config.schema,
399        transform_only_config
400            .global
401            .wildcard_matching
402            .unwrap_or_default(),
403    );
404    let test = test.resolve_outputs(&transform_only_graph)?;
405
406    let sources = metadata.hydrate_into_sources(&test.inputs)?;
407    let (test_result_rxs, sinks) =
408        metadata.hydrate_into_sinks(&test.name, &test.outputs, &test.no_outputs_from)?;
409
410    config_builder.sources = sources;
411    config_builder.sinks = sinks;
412    expand_globs(&mut config_builder);
413
414    let graph = Graph::new_unchecked(
415        &config_builder.sources,
416        &config_builder.transforms,
417        &config_builder.sinks,
418        config_builder.schema,
419        config_builder.global.wildcard_matching.unwrap_or_default(),
420    );
421
422    let mut valid_components = get_relevant_test_components(
423        config_builder.sources.keys().collect::<Vec<_>>().as_ref(),
424        &graph,
425    )?;
426
427    // Preserve the original unexpanded transform(s) which are valid test insertion points
428    let unexpanded_transforms = valid_components
429        .iter()
430        .filter_map(|component| {
431            component
432                .split_once('.')
433                .map(|(original_name, _)| original_name.to_string())
434        })
435        .collect::<Vec<_>>();
436    valid_components.extend(unexpanded_transforms);
437
438    // Enrichment tables consume inputs but are referenced dynamically in VRL transforms
439    // (via get_enrichment_table_record). Since we can't statically analyze VRL usage,
440    // we conservatively include all enrichment table inputs as valid components.
441    config_builder
442        .enrichment_tables
443        .iter()
444        .filter_map(|(key, c)| c.as_sink(key).map(|(_, sink)| sink.inputs))
445        .for_each(|i| valid_components.extend(i.into_iter()));
446
447    // Remove all transforms that are not relevant to the current test
448    config_builder.transforms = config_builder
449        .transforms
450        .into_iter()
451        .filter(|(key, _)| valid_components.contains(&key.to_string()))
452        .collect();
453
454    // Sanitize the inputs of all relevant transforms
455    let graph = Graph::new_unchecked(
456        &config_builder.sources,
457        &config_builder.transforms,
458        &config_builder.sinks,
459        config_builder.schema,
460        config_builder.global.wildcard_matching.unwrap_or_default(),
461    );
462    let valid_inputs = graph.input_map()?;
463    for (_, transform) in config_builder.transforms.iter_mut() {
464        let inputs = std::mem::take(&mut transform.inputs);
465        transform.inputs = inputs
466            .into_iter()
467            .filter(|input| valid_inputs.contains_key(input))
468            .collect();
469    }
470
471    if let Some(sink) = get_loose_end_outputs_sink(&config_builder) {
472        config_builder
473            .sinks
474            .insert(ComponentKey::from(Uuid::new_v4().to_string()), sink);
475    }
476    let config = config_builder.build()?;
477    let diff = config::ConfigDiff::initial(&config);
478    let pieces = TopologyPiecesBuilder::new(&config, &diff).build().await?;
479
480    Ok(UnitTest {
481        name: test.name,
482        config,
483        pieces,
484        test_result_rxs,
485    })
486}
487
488/// Near the end of building a unit test, it's possible that we've included a
489/// transform(s) with multiple outputs where at least one of its output is
490/// consumed but its other outputs are left unconsumed.
491///
492/// To avoid warning logs that occur when building such topologies, we construct
493/// a NoOp sink here whose sole purpose is to consume any "loose end" outputs.
494fn get_loose_end_outputs_sink(config: &ConfigBuilder) -> Option<SinkOuter<String>> {
495    let config = config.clone();
496    let transform_ids = config.transforms.iter().flat_map(|(key, transform)| {
497        get_transform_output_ids(
498            transform.inner.as_ref(),
499            key.clone(),
500            config.schema.log_namespace(),
501        )
502        .map(|output| output.to_string())
503        .collect::<Vec<_>>()
504    });
505
506    let mut loose_end_outputs = Vec::new();
507    for id in transform_ids {
508        if !config
509            .transforms
510            .iter()
511            .any(|(_, transform)| transform.inputs.contains(&id))
512            && !config
513                .sinks
514                .iter()
515                .any(|(_, sink)| sink.inputs.contains(&id))
516        {
517            loose_end_outputs.push(id);
518        }
519    }
520
521    if loose_end_outputs.is_empty() {
522        None
523    } else {
524        let noop_sink = UnitTestSinkConfig {
525            test_name: "".to_string(),
526            transform_ids: vec![],
527            result_tx: Arc::new(Mutex::new(None)),
528            check: UnitTestSinkCheck::NoOp,
529        };
530        Some(SinkOuter::new(loose_end_outputs, noop_sink))
531    }
532}
533
534fn build_and_validate_inputs(
535    test_inputs: &[TestInput],
536    available_insert_targets: &HashSet<ComponentKey>,
537) -> Result<HashMap<ComponentKey, Vec<Event>>, Vec<String>> {
538    let mut inputs = HashMap::new();
539    let mut errors = Vec::new();
540    if test_inputs.is_empty() {
541        errors.push("must specify at least one input.".to_string());
542        return Err(errors);
543    }
544
545    for (index, input) in test_inputs.iter().enumerate() {
546        if available_insert_targets.contains(&input.insert_at) {
547            match build_input_event(input) {
548                Ok(input_event) => {
549                    inputs
550                        .entry(input.insert_at.clone())
551                        .and_modify(|events: &mut Vec<Event>| {
552                            events.push(input_event.clone());
553                        })
554                        .or_insert_with(|| vec![input_event]);
555                }
556                Err(error) => errors.push(error),
557            }
558        } else {
559            errors.push(format!(
560                "inputs[{}]: unable to locate target transform '{}'",
561                index, input.insert_at
562            ))
563        }
564    }
565
566    if errors.is_empty() {
567        Ok(inputs)
568    } else {
569        Err(errors)
570    }
571}
572
/// The merged checks for one `extract_from` target, as assembled by
/// `build_outputs`.
#[derive(Default)]
pub(super) struct BuiltOutput {
    /// Number of events the target is expected to emit, if the test states one.
    pub(super) expected_event_count: Option<usize>,
    /// One list of built conditions per test output merged into this entry.
    pub(super) conditions: Vec<Vec<Condition>>,
}
578
579fn build_outputs(
580    test_outputs: &[TestOutput],
581) -> Result<IndexMap<Vec<OutputId>, BuiltOutput>, Vec<String>> {
582    let mut outputs: IndexMap<Vec<OutputId>, BuiltOutput> = IndexMap::new();
583    let mut errors = Vec::new();
584
585    for output in test_outputs {
586        let mut conditions = Vec::new();
587        for (index, condition) in output
588            .conditions
589            .clone()
590            .unwrap_or_default()
591            .iter()
592            .enumerate()
593        {
594            match condition.build(&Default::default(), &Default::default()) {
595                Ok(condition) => conditions.push(condition),
596                Err(error) => errors.push(format!(
597                    "failed to create test condition '{index}': {error}"
598                )),
599            }
600        }
601
602        let expected_event_count = output.expected_event_count;
603        if expected_event_count == Some(0) && !conditions.is_empty() {
604            errors.push(format!(
605                "output for {:?} has expected_event_count of 0 but also defines conditions; \
606                 conditions cannot be evaluated when no events are expected",
607                output.extract_from
608            ));
609        }
610        outputs
611            .entry(output.extract_from.clone().to_vec())
612            .and_modify(|existing| {
613                if let (Some(prev), Some(new)) =
614                    (existing.expected_event_count, expected_event_count)
615                {
616                    if prev != new {
617                        errors.push(format!(
618                            "conflicting expected_event_count for extract_from {:?}: {} vs {}",
619                            output.extract_from, prev, new
620                        ));
621                    }
622                } else if existing.expected_event_count.is_none() {
623                    existing.expected_event_count = expected_event_count;
624                }
625                existing.conditions.push(conditions.clone());
626            })
627            .or_insert_with(|| BuiltOutput {
628                expected_event_count,
629                conditions: vec![conditions.clone()],
630            });
631    }
632
633    // Post-merge validation: after merging entries that share the same
634    // extract_from, reject any that ended up with expected_event_count of 0 and
635    // non-empty conditions (which would pass vacuously against zero events).
636    for (extract_from, built) in &outputs {
637        if built.expected_event_count == Some(0) && built.conditions.iter().any(|c| !c.is_empty()) {
638            errors.push(format!(
639                "output for {extract_from:?} has expected_event_count of 0 but also defines conditions; \
640                 conditions cannot be evaluated when no events are expected",
641            ));
642        }
643    }
644
645    if errors.is_empty() {
646        Ok(outputs)
647    } else {
648        Err(errors)
649    }
650}
651
652fn build_input_event(input: &TestInput) -> Result<Event, String> {
653    match input.type_str.as_ref() {
654        "raw" => match input.value.as_ref() {
655            Some(v) => Ok(Event::Log(LogEvent::from_str_legacy(v.clone()))),
656            None => Err("input type 'raw' requires the field 'value'".to_string()),
657        },
658        "vrl" => {
659            if let Some(source) = &input.source {
660                let result = vrl::compiler::compile(source, &vector_vrl_functions::all())
661                    .map_err(|e| Formatter::new(source, e.clone()).to_string())?;
662
663                let mut target = TargetValue {
664                    value: value!({}),
665                    metadata: value::Value::Object(BTreeMap::new()),
666                    secrets: value::Secrets::default(),
667                };
668
669                let mut state = RuntimeState::default();
670                let timezone = TimeZone::default();
671                let mut ctx = Context::new(&mut target, &mut state, &timezone);
672
673                result
674                    .program
675                    .resolve(&mut ctx)
676                    .map(|_| {
677                        Event::Log(LogEvent::from_parts(
678                            target.value.clone(),
679                            EventMetadata::default_with_value(target.metadata.clone()),
680                        ))
681                    })
682                    .map_err(|e| e.to_string())
683            } else {
684                Err("input type 'vrl' requires the field 'source'".to_string())
685            }
686        }
687        "log" => {
688            if let Some(log_fields) = &input.log_fields {
689                let mut event = LogEvent::from_str_legacy("");
690                for (path, value) in log_fields {
691                    event
692                        .parse_path_and_insert(path, value.clone())
693                        .map_err(|e| e.to_string())?;
694                }
695                Ok(event.into())
696            } else {
697                Err("input type 'log' requires the field 'log_fields'".to_string())
698            }
699        }
700        "metric" => {
701            if let Some(metric) = &input.metric {
702                Ok(Event::Metric(metric.clone()))
703            } else {
704                Err("input type 'metric' requires the field 'metric'".to_string())
705            }
706        }
707        _ => Err(format!(
708            "unrecognized input type '{}', expected one of: 'raw', 'log' or 'metric'",
709            input.type_str
710        )),
711    }
712}