1#[cfg(all(
3 test,
4 feature = "sources-demo_logs",
5 feature = "transforms-remap",
6 feature = "transforms-route",
7 feature = "transforms-filter",
8 feature = "transforms-reduce",
9 feature = "sinks-console"
10))]
11mod tests;
12mod unit_test_components;
13
14use std::{
15 collections::{BTreeMap, HashMap, HashSet},
16 sync::Arc,
17};
18
19use futures_util::{StreamExt, stream::FuturesUnordered};
20use indexmap::IndexMap;
21use tokio::sync::{
22 Mutex,
23 oneshot::{self, Receiver},
24};
25use uuid::Uuid;
26use vrl::{
27 compiler::{Context, TargetValue, TimeZone, state::RuntimeState},
28 diagnostic::Formatter,
29 value,
30};
31
32pub use self::unit_test_components::{
33 UnitTestSinkCheck, UnitTestSinkConfig, UnitTestSinkResult, UnitTestSourceConfig,
34 UnitTestStreamSinkConfig, UnitTestStreamSourceConfig,
35};
36use super::{OutputId, compiler::expand_globs, graph::Graph, transform::get_transform_output_ids};
37use crate::{
38 conditions::Condition,
39 config::{
40 self, ComponentKey, Config, ConfigBuilder, ConfigPath, SinkOuter, SourceOuter,
41 TestDefinition, TestInput, TestOutput, loading, loading::ConfigBuilderLoader,
42 },
43 event::{Event, EventMetadata, LogEvent},
44 signal,
45 topology::{
46 RunningTopology,
47 builder::{TopologyPieces, TopologyPiecesBuilder},
48 },
49};
50
/// A single unit test that has been built and is ready to be executed with
/// [`UnitTest::run`].
pub struct UnitTest {
    /// Name of the test, taken from its test definition.
    pub name: String,
    /// Configuration the test topology is started with.
    config: Config,
    /// Topology pieces built from `config`.
    pieces: TopologyPieces,
    /// Receivers on which the test sinks deliver their results.
    test_result_rxs: Vec<Receiver<UnitTestSinkResult>>,
}
57
/// Outcome of running a [`UnitTest`].
pub struct UnitTestResult {
    /// Error messages gathered from every sink result of the test.
    pub errors: Vec<String>,
}
61
62impl UnitTest {
63 pub async fn run(self) -> UnitTestResult {
64 let diff = config::ConfigDiff::initial(&self.config);
65 let (topology, _) = RunningTopology::start_validated(self.config, diff, self.pieces)
66 .await
67 .unwrap();
68 topology.sources_finished().await;
69 let _stop_complete = topology.stop();
70
71 let mut in_flight = self
72 .test_result_rxs
73 .into_iter()
74 .collect::<FuturesUnordered<_>>();
75
76 let mut errors = Vec::new();
77 while let Some(partial_result) = in_flight.next().await {
78 let partial_result = partial_result.expect(
79 "An unexpected error occurred while executing unit tests. Please try again.",
80 );
81 errors.extend(partial_result.test_errors);
82 }
83
84 UnitTestResult { errors }
85 }
86}
87
88fn init_log_schema_from_paths(
93 config_paths: &[ConfigPath],
94 deny_if_set: bool,
95) -> Result<(), Vec<String>> {
96 let builder = ConfigBuilderLoader::default()
97 .interpolate_env(true)
98 .load_from_paths(config_paths)?;
99 vector_lib::config::init_log_schema(builder.global.log_schema, deny_if_set);
100 Ok(())
101}
102
103pub async fn build_unit_tests_main(
104 paths: &[ConfigPath],
105 signal_handler: &mut signal::SignalHandler,
106) -> Result<Vec<UnitTest>, Vec<String>> {
107 init_log_schema_from_paths(paths, false)?;
108 let secrets_backends_loader = loading::loader_from_paths(
109 loading::SecretBackendLoader::default().interpolate_env(true),
110 paths,
111 )?;
112 let secrets = secrets_backends_loader
113 .retrieve_secrets(signal_handler)
114 .await
115 .map_err(|e| vec![e])?;
116
117 let config_builder = ConfigBuilderLoader::default()
118 .interpolate_env(true)
119 .secrets(secrets)
120 .load_from_paths(paths)?;
121
122 build_unit_tests(config_builder).await
123}
124
125pub async fn build_unit_tests(
126 mut config_builder: ConfigBuilder,
127) -> Result<Vec<UnitTest>, Vec<String>> {
128 config_builder.sources = Default::default();
130 config_builder.sinks = Default::default();
131
132 let test_definitions = std::mem::take(&mut config_builder.tests);
133 let mut tests = Vec::new();
134 let mut build_errors = Vec::new();
135 let metadata = UnitTestBuildMetadata::initialize(&mut config_builder)?;
136
137 for mut test_definition in test_definitions {
138 let test_name = test_definition.name.clone();
139 let legacy_input = std::mem::take(&mut test_definition.input);
141 if let Some(input) = legacy_input {
142 test_definition.inputs.push(input);
143 }
144 match build_unit_test(&metadata, test_definition, config_builder.clone()).await {
145 Ok(test) => tests.push(test),
146 Err(errors) => {
147 let mut test_error = errors.join("\n");
148 test_error = test_error.replace('\n', "\n ");
150 test_error.insert_str(0, &format!("Failed to build test '{test_name}':\n "));
151 build_errors.push(test_error);
152 }
153 }
154 }
155
156 if build_errors.is_empty() {
157 Ok(tests)
158 } else {
159 Err(build_errors)
160 }
161}
162
/// Data computed once from the configured transforms and shared by every
/// unit test built from the same configuration.
pub struct UnitTestBuildMetadata {
    /// Keys of all transforms; the only valid `insert_at` targets for inputs.
    available_insert_targets: HashSet<ComponentKey>,
    /// Generated test source id for each transform key.
    source_ids: HashMap<ComponentKey, String>,
    /// A default test source config for each transform key.
    template_sources: IndexMap<ComponentKey, UnitTestSourceConfig>,
    /// Generated test sink id for each transform output id.
    sink_ids: HashMap<OutputId, String>,
}
174
175impl UnitTestBuildMetadata {
176 pub fn initialize(config_builder: &mut ConfigBuilder) -> Result<Self, Vec<String>> {
177 let random_id = Uuid::new_v4().to_string();
179
180 let available_insert_targets = config_builder
181 .transforms
182 .keys()
183 .cloned()
184 .collect::<HashSet<_>>();
185
186 let source_ids = available_insert_targets
187 .iter()
188 .map(|key| (key.clone(), format!("{}-{}-{}", key, "source", random_id)))
189 .collect::<HashMap<_, _>>();
190
191 let mut template_sources = IndexMap::new();
193 for (key, transform) in config_builder.transforms.iter_mut() {
194 let test_source_id = source_ids
195 .get(key)
196 .expect("Missing test source for a transform")
197 .clone();
198 transform.inputs.extend(Some(test_source_id));
199
200 template_sources.insert(key.clone(), UnitTestSourceConfig::default());
201 }
202
203 let builder = config_builder.clone();
204 let available_extract_targets = builder
205 .transforms
206 .iter()
207 .flat_map(|(key, transform)| {
208 get_transform_output_ids(
209 transform.inner.as_ref(),
210 key.clone(),
211 builder.schema.log_namespace(),
212 )
213 })
214 .collect::<HashSet<_>>();
215
216 let sink_ids = available_extract_targets
217 .iter()
218 .map(|key| {
219 (
220 key.clone(),
221 format!(
222 "{}-{}-{}",
223 key.to_string().replace('.', "-"),
224 "sink",
225 random_id
226 ),
227 )
228 })
229 .collect::<HashMap<_, _>>();
230
231 Ok(Self {
232 available_insert_targets,
233 source_ids,
234 template_sources,
235 sink_ids,
236 })
237 }
238
239 pub fn hydrate_into_sources(
241 &self,
242 inputs: &[TestInput],
243 ) -> Result<IndexMap<ComponentKey, SourceOuter>, Vec<String>> {
244 let inputs = build_and_validate_inputs(inputs, &self.available_insert_targets)?;
245 let mut template_sources = self.template_sources.clone();
246 Ok(inputs
247 .into_iter()
248 .map(|(insert_at, events)| {
249 let mut source_config =
250 template_sources
251 .shift_remove(&insert_at)
252 .unwrap_or_else(|| {
253 panic!(
257 "Invalid input: cannot insert at {:?}",
258 insert_at.to_string()
259 )
260 });
261 source_config.events.extend(events);
262 let id: &str = self
263 .source_ids
264 .get(&insert_at)
265 .expect("Corresponding source must exist")
266 .as_ref();
267 (ComponentKey::from(id), SourceOuter::new(source_config))
268 })
269 .collect::<IndexMap<_, _>>())
270 }
271
272 pub fn hydrate_into_sinks(
274 &self,
275 test_name: &str,
276 outputs: &[TestOutput],
277 no_outputs_from: &[OutputId],
278 ) -> Result<
279 (
280 Vec<Receiver<UnitTestSinkResult>>,
281 IndexMap<ComponentKey, SinkOuter<String>>,
282 ),
283 Vec<String>,
284 > {
285 if outputs.is_empty() && no_outputs_from.is_empty() {
286 return Err(vec![
287 "unit test must contain at least one of `outputs` or `no_outputs_from`."
288 .to_string(),
289 ]);
290 }
291 let outputs = build_outputs(outputs)?;
292
293 let mut template_sinks = IndexMap::new();
294 let mut test_result_rxs = Vec::new();
295 for (ids, built) in outputs {
297 let (tx, rx) = oneshot::channel();
298 let sink_ids = ids.clone();
299 let sink_config = UnitTestSinkConfig {
300 test_name: test_name.to_string(),
301 transform_ids: ids.iter().map(|id| id.to_string()).collect(),
302 result_tx: Arc::new(Mutex::new(Some(tx))),
303 check: UnitTestSinkCheck::Checks {
304 conditions: built.conditions,
305 expected_event_count: built.expected_event_count,
306 },
307 };
308
309 test_result_rxs.push(rx);
310 template_sinks.insert(sink_ids, sink_config);
311 }
312
313 for id in no_outputs_from {
315 let (tx, rx) = oneshot::channel();
316 let sink_config = UnitTestSinkConfig {
317 test_name: test_name.to_string(),
318 transform_ids: vec![id.to_string()],
319 result_tx: Arc::new(Mutex::new(Some(tx))),
320 check: UnitTestSinkCheck::NoOutputs,
321 };
322
323 test_result_rxs.push(rx);
324 template_sinks.insert(vec![id.clone()], sink_config);
325 }
326
327 let sinks = template_sinks
328 .into_iter()
329 .map(|(transform_ids, sink_config)| {
330 let transform_ids_str = transform_ids
331 .iter()
332 .map(|s| s.to_string())
333 .collect::<Vec<_>>();
334 let sink_ids = transform_ids
335 .iter()
336 .map(|transform_id| {
337 self.sink_ids
338 .get(transform_id)
339 .expect("Sink does not exist")
340 .as_str()
341 })
342 .collect::<Vec<_>>();
343 let sink_id = sink_ids.join(",");
344 (
345 ComponentKey::from(sink_id),
346 SinkOuter::new(transform_ids_str, sink_config),
347 )
348 })
349 .collect::<IndexMap<_, _>>();
350
351 Ok((test_result_rxs, sinks))
352 }
353}
354
355fn get_relevant_test_components(
357 sources: &[&ComponentKey],
358 graph: &Graph,
359) -> Result<HashSet<String>, Vec<String>> {
360 graph.check_for_cycles().map_err(|error| vec![error])?;
361 let mut errors = Vec::new();
362 let mut components = HashSet::new();
363 for source in sources {
364 let paths = graph.paths_to_sink_from(source);
365 if paths.is_empty() {
366 errors.push(format!(
367 "Unable to complete topology between input target '{}' and output target(s)",
368 source
369 .to_string()
370 .rsplit_once("-source-")
371 .unwrap_or(("", ""))
372 .0
373 ));
374 } else {
375 for path in paths {
376 components.extend(path.into_iter().map(|key| key.to_string()));
377 }
378 }
379 }
380
381 if errors.is_empty() {
382 Ok(components)
383 } else {
384 Err(errors)
385 }
386}
387
388async fn build_unit_test(
389 metadata: &UnitTestBuildMetadata,
390 test: TestDefinition<String>,
391 mut config_builder: ConfigBuilder,
392) -> Result<UnitTest, Vec<String>> {
393 let transform_only_config = config_builder.clone();
394 let transform_only_graph = Graph::new_unchecked(
395 &transform_only_config.sources,
396 &transform_only_config.transforms,
397 &transform_only_config.sinks,
398 transform_only_config.schema,
399 transform_only_config
400 .global
401 .wildcard_matching
402 .unwrap_or_default(),
403 );
404 let test = test.resolve_outputs(&transform_only_graph)?;
405
406 let sources = metadata.hydrate_into_sources(&test.inputs)?;
407 let (test_result_rxs, sinks) =
408 metadata.hydrate_into_sinks(&test.name, &test.outputs, &test.no_outputs_from)?;
409
410 config_builder.sources = sources;
411 config_builder.sinks = sinks;
412 expand_globs(&mut config_builder);
413
414 let graph = Graph::new_unchecked(
415 &config_builder.sources,
416 &config_builder.transforms,
417 &config_builder.sinks,
418 config_builder.schema,
419 config_builder.global.wildcard_matching.unwrap_or_default(),
420 );
421
422 let mut valid_components = get_relevant_test_components(
423 config_builder.sources.keys().collect::<Vec<_>>().as_ref(),
424 &graph,
425 )?;
426
427 let unexpanded_transforms = valid_components
429 .iter()
430 .filter_map(|component| {
431 component
432 .split_once('.')
433 .map(|(original_name, _)| original_name.to_string())
434 })
435 .collect::<Vec<_>>();
436 valid_components.extend(unexpanded_transforms);
437
438 config_builder
442 .enrichment_tables
443 .iter()
444 .filter_map(|(key, c)| c.as_sink(key).map(|(_, sink)| sink.inputs))
445 .for_each(|i| valid_components.extend(i.into_iter()));
446
447 config_builder.transforms = config_builder
449 .transforms
450 .into_iter()
451 .filter(|(key, _)| valid_components.contains(&key.to_string()))
452 .collect();
453
454 let graph = Graph::new_unchecked(
456 &config_builder.sources,
457 &config_builder.transforms,
458 &config_builder.sinks,
459 config_builder.schema,
460 config_builder.global.wildcard_matching.unwrap_or_default(),
461 );
462 let valid_inputs = graph.input_map()?;
463 for (_, transform) in config_builder.transforms.iter_mut() {
464 let inputs = std::mem::take(&mut transform.inputs);
465 transform.inputs = inputs
466 .into_iter()
467 .filter(|input| valid_inputs.contains_key(input))
468 .collect();
469 }
470
471 if let Some(sink) = get_loose_end_outputs_sink(&config_builder) {
472 config_builder
473 .sinks
474 .insert(ComponentKey::from(Uuid::new_v4().to_string()), sink);
475 }
476 let config = config_builder.build()?;
477 let diff = config::ConfigDiff::initial(&config);
478 let pieces = TopologyPiecesBuilder::new(&config, &diff).build().await?;
479
480 Ok(UnitTest {
481 name: test.name,
482 config,
483 pieces,
484 test_result_rxs,
485 })
486}
487
488fn get_loose_end_outputs_sink(config: &ConfigBuilder) -> Option<SinkOuter<String>> {
495 let config = config.clone();
496 let transform_ids = config.transforms.iter().flat_map(|(key, transform)| {
497 get_transform_output_ids(
498 transform.inner.as_ref(),
499 key.clone(),
500 config.schema.log_namespace(),
501 )
502 .map(|output| output.to_string())
503 .collect::<Vec<_>>()
504 });
505
506 let mut loose_end_outputs = Vec::new();
507 for id in transform_ids {
508 if !config
509 .transforms
510 .iter()
511 .any(|(_, transform)| transform.inputs.contains(&id))
512 && !config
513 .sinks
514 .iter()
515 .any(|(_, sink)| sink.inputs.contains(&id))
516 {
517 loose_end_outputs.push(id);
518 }
519 }
520
521 if loose_end_outputs.is_empty() {
522 None
523 } else {
524 let noop_sink = UnitTestSinkConfig {
525 test_name: "".to_string(),
526 transform_ids: vec![],
527 result_tx: Arc::new(Mutex::new(None)),
528 check: UnitTestSinkCheck::NoOp,
529 };
530 Some(SinkOuter::new(loose_end_outputs, noop_sink))
531 }
532}
533
534fn build_and_validate_inputs(
535 test_inputs: &[TestInput],
536 available_insert_targets: &HashSet<ComponentKey>,
537) -> Result<HashMap<ComponentKey, Vec<Event>>, Vec<String>> {
538 let mut inputs = HashMap::new();
539 let mut errors = Vec::new();
540 if test_inputs.is_empty() {
541 errors.push("must specify at least one input.".to_string());
542 return Err(errors);
543 }
544
545 for (index, input) in test_inputs.iter().enumerate() {
546 if available_insert_targets.contains(&input.insert_at) {
547 match build_input_event(input) {
548 Ok(input_event) => {
549 inputs
550 .entry(input.insert_at.clone())
551 .and_modify(|events: &mut Vec<Event>| {
552 events.push(input_event.clone());
553 })
554 .or_insert_with(|| vec![input_event]);
555 }
556 Err(error) => errors.push(error),
557 }
558 } else {
559 errors.push(format!(
560 "inputs[{}]: unable to locate target transform '{}'",
561 index, input.insert_at
562 ))
563 }
564 }
565
566 if errors.is_empty() {
567 Ok(inputs)
568 } else {
569 Err(errors)
570 }
571}
572
/// The checks built for one set of `extract_from` targets, merged across all
/// test outputs that share that set.
#[derive(Default)]
pub(super) struct BuiltOutput {
    /// Expected number of events, if any of the merged outputs specified one.
    pub(super) expected_event_count: Option<usize>,
    /// One list of built conditions per merged test output.
    pub(super) conditions: Vec<Vec<Condition>>,
}
578
579fn build_outputs(
580 test_outputs: &[TestOutput],
581) -> Result<IndexMap<Vec<OutputId>, BuiltOutput>, Vec<String>> {
582 let mut outputs: IndexMap<Vec<OutputId>, BuiltOutput> = IndexMap::new();
583 let mut errors = Vec::new();
584
585 for output in test_outputs {
586 let mut conditions = Vec::new();
587 for (index, condition) in output
588 .conditions
589 .clone()
590 .unwrap_or_default()
591 .iter()
592 .enumerate()
593 {
594 match condition.build(&Default::default(), &Default::default()) {
595 Ok(condition) => conditions.push(condition),
596 Err(error) => errors.push(format!(
597 "failed to create test condition '{index}': {error}"
598 )),
599 }
600 }
601
602 let expected_event_count = output.expected_event_count;
603 if expected_event_count == Some(0) && !conditions.is_empty() {
604 errors.push(format!(
605 "output for {:?} has expected_event_count of 0 but also defines conditions; \
606 conditions cannot be evaluated when no events are expected",
607 output.extract_from
608 ));
609 }
610 outputs
611 .entry(output.extract_from.clone().to_vec())
612 .and_modify(|existing| {
613 if let (Some(prev), Some(new)) =
614 (existing.expected_event_count, expected_event_count)
615 {
616 if prev != new {
617 errors.push(format!(
618 "conflicting expected_event_count for extract_from {:?}: {} vs {}",
619 output.extract_from, prev, new
620 ));
621 }
622 } else if existing.expected_event_count.is_none() {
623 existing.expected_event_count = expected_event_count;
624 }
625 existing.conditions.push(conditions.clone());
626 })
627 .or_insert_with(|| BuiltOutput {
628 expected_event_count,
629 conditions: vec![conditions.clone()],
630 });
631 }
632
633 for (extract_from, built) in &outputs {
637 if built.expected_event_count == Some(0) && built.conditions.iter().any(|c| !c.is_empty()) {
638 errors.push(format!(
639 "output for {extract_from:?} has expected_event_count of 0 but also defines conditions; \
640 conditions cannot be evaluated when no events are expected",
641 ));
642 }
643 }
644
645 if errors.is_empty() {
646 Ok(outputs)
647 } else {
648 Err(errors)
649 }
650}
651
652fn build_input_event(input: &TestInput) -> Result<Event, String> {
653 match input.type_str.as_ref() {
654 "raw" => match input.value.as_ref() {
655 Some(v) => Ok(Event::Log(LogEvent::from_str_legacy(v.clone()))),
656 None => Err("input type 'raw' requires the field 'value'".to_string()),
657 },
658 "vrl" => {
659 if let Some(source) = &input.source {
660 let result = vrl::compiler::compile(source, &vector_vrl_functions::all())
661 .map_err(|e| Formatter::new(source, e.clone()).to_string())?;
662
663 let mut target = TargetValue {
664 value: value!({}),
665 metadata: value::Value::Object(BTreeMap::new()),
666 secrets: value::Secrets::default(),
667 };
668
669 let mut state = RuntimeState::default();
670 let timezone = TimeZone::default();
671 let mut ctx = Context::new(&mut target, &mut state, &timezone);
672
673 result
674 .program
675 .resolve(&mut ctx)
676 .map(|_| {
677 Event::Log(LogEvent::from_parts(
678 target.value.clone(),
679 EventMetadata::default_with_value(target.metadata.clone()),
680 ))
681 })
682 .map_err(|e| e.to_string())
683 } else {
684 Err("input type 'vrl' requires the field 'source'".to_string())
685 }
686 }
687 "log" => {
688 if let Some(log_fields) = &input.log_fields {
689 let mut event = LogEvent::from_str_legacy("");
690 for (path, value) in log_fields {
691 event
692 .parse_path_and_insert(path, value.clone())
693 .map_err(|e| e.to_string())?;
694 }
695 Ok(event.into())
696 } else {
697 Err("input type 'log' requires the field 'log_fields'".to_string())
698 }
699 }
700 "metric" => {
701 if let Some(metric) = &input.metric {
702 Ok(Event::Metric(metric.clone()))
703 } else {
704 Err("input type 'metric' requires the field 'metric'".to_string())
705 }
706 }
707 _ => Err(format!(
708 "unrecognized input type '{}', expected one of: 'raw', 'log' or 'metric'",
709 input.type_str
710 )),
711 }
712}