1#![allow(missing_docs)]
2use std::{
3 collections::{HashMap, HashSet},
4 fmt::{self, Display, Formatter},
5 fs,
6 hash::Hash,
7 net::SocketAddr,
8 path::PathBuf,
9 time::Duration,
10};
11
12use indexmap::IndexMap;
13use serde::Serialize;
14use vector_config::configurable_component;
15pub use vector_lib::{
16 config::{
17 AcknowledgementsConfig, DataType, GlobalOptions, Input, LogNamespace,
18 SourceAcknowledgementsConfig, SourceOutput, TransformOutput, WildcardMatching,
19 },
20 configurable::component::{GenerateConfig, SinkDescription, TransformDescription},
21};
22
23use crate::{
24 conditions,
25 event::{Metric, Value},
26 secrets::SecretBackends,
27 serde::OneOrMany,
28};
29
30pub mod api;
31mod builder;
32mod cmd;
33mod compiler;
34mod diff;
35pub mod dot_graph;
36mod enrichment_table;
37pub mod format;
38mod graph;
39pub mod loading;
40pub mod provider;
41pub mod schema;
42mod secret;
43mod sink;
44mod source;
45mod transform;
46pub mod unit_test;
47mod validation;
48mod vars;
49pub mod watcher;
50
51pub use builder::ConfigBuilder;
52pub use cmd::{Opts, cmd};
53pub use diff::ConfigDiff;
54pub use enrichment_table::{EnrichmentTableConfig, EnrichmentTableOuter};
55pub use format::{Format, FormatHint};
56pub use loading::{
57 COLLECTOR, CONFIG_PATHS, load, load_from_paths, load_from_paths_with_provider_and_secrets,
58 load_from_str, load_from_str_with_secrets, load_source_from_paths, merge_path_lists,
59 process_paths,
60};
61pub use provider::ProviderConfig;
62pub use secret::SecretBackend;
63pub use sink::{BoxedSink, SinkConfig, SinkContext, SinkHealthcheckOptions, SinkOuter};
64pub use source::{BoxedSource, SourceConfig, SourceContext, SourceOuter};
65pub use transform::{
66 BoxedTransform, TransformConfig, TransformContext, TransformOuter, get_transform_output_ids,
67};
68pub use unit_test::{UnitTestResult, build_unit_tests, build_unit_tests_main};
69pub use validation::warnings;
70pub use vars::{ENVIRONMENT_VARIABLE_INTERPOLATION_REGEX, interpolate};
71pub use vector_lib::{
72 config::{
73 ComponentKey, LogSchema, OutputId, init_log_schema, init_telemetry, log_schema,
74 proxy::ProxyConfig, telemetry,
75 },
76 id::Inputs,
77};
78
/// The kind of topology component that a [`ComponentConfig`] entry refers to.
///
/// The derived ordering follows declaration order, i.e.
/// `Transform < Sink < EnrichmentTable`.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum ComponentType {
    /// A transform component.
    Transform,
    /// A sink component.
    Sink,
    /// An enrichment table component.
    EnrichmentTable,
}
86
87#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
88pub struct ComponentConfig {
89 pub config_paths: Vec<PathBuf>,
90 pub component_key: ComponentKey,
91 pub component_type: ComponentType,
92}
93
94impl ComponentConfig {
95 pub fn new(
96 config_paths: Vec<PathBuf>,
97 component_key: ComponentKey,
98 component_type: ComponentType,
99 ) -> Self {
100 let canonicalized_paths = config_paths
101 .into_iter()
102 .filter_map(|p| fs::canonicalize(p).ok())
103 .collect();
104
105 Self {
106 config_paths: canonicalized_paths,
107 component_key,
108 component_type,
109 }
110 }
111
112 pub fn contains(
113 &self,
114 config_paths: &HashSet<PathBuf>,
115 ) -> Option<(ComponentKey, ComponentType)> {
116 if config_paths.iter().any(|p| self.config_paths.contains(p)) {
117 return Some((self.component_key.clone(), self.component_type.clone()));
118 }
119 None
120 }
121}
122
123#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
124pub enum ConfigPath {
125 File(PathBuf, FormatHint),
126 Dir(PathBuf),
127}
128
129impl<'a> From<&'a ConfigPath> for &'a PathBuf {
130 fn from(config_path: &'a ConfigPath) -> &'a PathBuf {
131 match config_path {
132 ConfigPath::File(path, _) => path,
133 ConfigPath::Dir(path) => path,
134 }
135 }
136}
137
138impl ConfigPath {
139 pub const fn as_dir(&self) -> Option<&PathBuf> {
140 match self {
141 Self::Dir(path) => Some(path),
142 _ => None,
143 }
144 }
145}
146
147#[derive(Debug, Default, Serialize)]
148pub struct Config {
149 #[cfg(feature = "api")]
150 pub api: api::Options,
151 pub schema: schema::Options,
152 pub global: GlobalOptions,
153 pub healthchecks: HealthcheckOptions,
154 sources: IndexMap<ComponentKey, SourceOuter>,
155 sinks: IndexMap<ComponentKey, SinkOuter<OutputId>>,
156 transforms: IndexMap<ComponentKey, TransformOuter<OutputId>>,
157 pub enrichment_tables: IndexMap<ComponentKey, EnrichmentTableOuter<OutputId>>,
158 tests: Vec<TestDefinition>,
159 secret: IndexMap<ComponentKey, SecretBackends>,
160 pub graceful_shutdown_duration: Option<Duration>,
161}
162
163impl Config {
164 pub fn builder() -> builder::ConfigBuilder {
165 Default::default()
166 }
167
168 pub fn is_empty(&self) -> bool {
169 self.sources.is_empty()
170 }
171
172 pub fn sources(&self) -> impl Iterator<Item = (&ComponentKey, &SourceOuter)> {
173 self.sources.iter()
174 }
175
176 pub fn source(&self, id: &ComponentKey) -> Option<&SourceOuter> {
177 self.sources.get(id)
178 }
179
180 pub fn transforms(&self) -> impl Iterator<Item = (&ComponentKey, &TransformOuter<OutputId>)> {
181 self.transforms.iter()
182 }
183
184 pub fn transform(&self, id: &ComponentKey) -> Option<&TransformOuter<OutputId>> {
185 self.transforms.get(id)
186 }
187
188 pub fn sinks(&self) -> impl Iterator<Item = (&ComponentKey, &SinkOuter<OutputId>)> {
189 self.sinks.iter()
190 }
191
192 pub fn sink(&self, id: &ComponentKey) -> Option<&SinkOuter<OutputId>> {
193 self.sinks.get(id)
194 }
195
196 pub fn enrichment_tables(
197 &self,
198 ) -> impl Iterator<Item = (&ComponentKey, &EnrichmentTableOuter<OutputId>)> {
199 self.enrichment_tables.iter()
200 }
201
202 pub fn enrichment_table(&self, id: &ComponentKey) -> Option<&EnrichmentTableOuter<OutputId>> {
203 self.enrichment_tables.get(id)
204 }
205
206 pub fn inputs_for_node(&self, id: &ComponentKey) -> Option<&[OutputId]> {
207 self.transforms
208 .get(id)
209 .map(|t| &t.inputs[..])
210 .or_else(|| self.sinks.get(id).map(|s| &s.inputs[..]))
211 .or_else(|| self.enrichment_tables.get(id).map(|s| &s.inputs[..]))
212 }
213
214 pub fn propagate_acknowledgements(&mut self) -> Result<(), Vec<String>> {
215 let inputs: Vec<_> = self
216 .sinks
217 .iter()
218 .filter(|(_, sink)| {
219 sink.inner
220 .acknowledgements()
221 .merge_default(&self.global.acknowledgements)
222 .enabled()
223 })
224 .flat_map(|(name, sink)| {
225 sink.inputs
226 .iter()
227 .map(|input| (name.clone(), input.clone()))
228 })
229 .collect();
230 self.propagate_acks_rec(inputs);
231 Ok(())
232 }
233
234 fn propagate_acks_rec(&mut self, sink_inputs: Vec<(ComponentKey, OutputId)>) {
235 for (sink, input) in sink_inputs {
236 let component = &input.component;
237 if let Some(source) = self.sources.get_mut(component) {
238 if source.inner.can_acknowledge() {
239 source.sink_acknowledgements = true;
240 } else {
241 warn!(
242 message = "Source has acknowledgements enabled by a sink, but acknowledgements are not supported by this source. Silent data loss could occur.",
243 source = component.id(),
244 sink = sink.id(),
245 );
246 }
247 } else if let Some(transform) = self.transforms.get(component) {
248 let inputs = transform
249 .inputs
250 .iter()
251 .map(|input| (sink.clone(), input.clone()))
252 .collect();
253 self.propagate_acks_rec(inputs);
254 }
255 }
256 }
257
258 pub fn transform_keys_with_external_files(&self) -> HashSet<ComponentKey> {
259 self.transforms
260 .iter()
261 .filter_map(|(name, transform_outer)| {
262 if !transform_outer.inner.files_to_watch().is_empty() {
263 Some(name.clone())
264 } else {
265 None
266 }
267 })
268 .collect()
269 }
270}
271
272#[configurable_component]
274#[derive(Clone, Copy, Debug)]
275#[serde(default)]
276pub struct HealthcheckOptions {
277 pub enabled: bool,
281
282 pub require_healthy: bool,
288}
289
290impl HealthcheckOptions {
291 pub fn set_require_healthy(&mut self, require_healthy: impl Into<Option<bool>>) {
292 if let Some(require_healthy) = require_healthy.into() {
293 self.require_healthy = require_healthy;
294 }
295 }
296
297 const fn merge(&mut self, other: Self) {
298 self.enabled &= other.enabled;
299 self.require_healthy |= other.require_healthy;
300 }
301}
302
303impl Default for HealthcheckOptions {
304 fn default() -> Self {
305 Self {
306 enabled: true,
307 require_healthy: false,
308 }
309 }
310}
311
312impl_generate_config_from_default!(HealthcheckOptions);
313
/// A unique resource, such as a port, that only one component may claim.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Ord, PartialOrd)]
pub enum Resource {
    /// A socket address bound with the given protocol.
    Port(SocketAddr, Protocol),
    /// A systemd-provided socket, identified by its zero-based offset.
    SystemFdOffset(usize),
    /// A raw file descriptor.
    Fd(u32),
    /// A disk buffer, identified by name.
    DiskBuffer(String),
}

/// Transport protocol of a [`Resource::Port`].
#[derive(Clone, Debug, Eq, PartialEq, Hash, Ord, PartialOrd, Copy)]
pub enum Protocol {
    /// TCP.
    Tcp,
    /// UDP.
    Udp,
}

impl Resource {
    /// Creates a TCP port resource for `addr`.
    pub const fn tcp(addr: SocketAddr) -> Self {
        Self::Port(addr, Protocol::Tcp)
    }

    /// Creates a UDP port resource for `addr`.
    pub const fn udp(addr: SocketAddr) -> Self {
        Self::Port(addr, Protocol::Udp)
    }

    /// Returns every resource claimed by more than one component, mapped to
    /// the set of components claiming it.
    ///
    /// Resources conflict only when they are exactly equal. In particular, a
    /// port on an unspecified address (`0.0.0.0` / `::`) conflicts only with
    /// an identical address, port and protocol — not with a specific address
    /// on the same port, and not with the unspecified address of the other IP
    /// version. A component listing the same resource twice does not conflict
    /// with itself.
    pub fn conflicts<K: Eq + Hash + Clone>(
        components: impl IntoIterator<Item = (K, Vec<Resource>)>,
    ) -> HashMap<Resource, HashSet<K>> {
        let mut resource_map = HashMap::<Resource, HashSet<K>>::new();

        // Group claimants by resource. Since conflicts are purely equality
        // based, this single pass is all that is needed: re-scanning the map
        // for ports on unspecified addresses can only find the entry each key
        // was already inserted into.
        for (key, resources) in components {
            for resource in resources {
                resource_map
                    .entry(resource)
                    .or_default()
                    .insert(key.clone());
            }
        }

        // Anything with a single claimant is not a conflict.
        resource_map.retain(|_, claimants| claimants.len() > 1);

        resource_map
    }
}

impl Display for Protocol {
    /// Writes the lowercase protocol name (`tcp` or `udp`).
    fn fmt(&self, fmt: &mut Formatter<'_>) -> Result<(), fmt::Error> {
        match self {
            Protocol::Udp => write!(fmt, "udp"),
            Protocol::Tcp => write!(fmt, "tcp"),
        }
    }
}

impl Display for Resource {
    /// Writes a human-readable description of the resource.
    fn fmt(&self, fmt: &mut Formatter<'_>) -> Result<(), fmt::Error> {
        match self {
            Resource::Port(address, protocol) => write!(fmt, "{protocol} {address}"),
            // Offsets are zero-based; they are displayed one-based.
            Resource::SystemFdOffset(offset) => write!(fmt, "systemd {}th socket", offset + 1),
            Resource::Fd(fd) => write!(fmt, "file descriptor: {fd}"),
            Resource::DiskBuffer(name) => write!(fmt, "disk buffer {name:?}"),
        }
    }
}
402
403#[configurable_component]
405#[derive(Clone, Debug)]
406#[serde(deny_unknown_fields)]
407pub struct TestDefinition<T: 'static = OutputId> {
408 pub name: String,
410
411 pub input: Option<TestInput>,
413
414 #[serde(default)]
416 pub inputs: Vec<TestInput>,
417
418 #[serde(default)]
420 pub outputs: Vec<TestOutput<T>>,
421
422 #[serde(default)]
424 pub no_outputs_from: Vec<T>,
425}
426
427impl TestDefinition<String> {
428 fn resolve_outputs(
429 self,
430 graph: &graph::Graph,
431 ) -> Result<TestDefinition<OutputId>, Vec<String>> {
432 let TestDefinition {
433 name,
434 input,
435 inputs,
436 outputs,
437 no_outputs_from,
438 } = self;
439 let mut errors = Vec::new();
440
441 let output_map = graph.input_map().expect("ambiguous outputs");
442
443 let outputs = outputs
444 .into_iter()
445 .map(|old| {
446 let TestOutput {
447 extract_from,
448 conditions,
449 expected_event_count,
450 } = old;
451
452 (extract_from.to_vec(), conditions, expected_event_count)
453 })
454 .filter_map(|(extract_from, conditions, expected_event_count)| {
455 let mut outputs = Vec::new();
456 for from in extract_from {
457 if no_outputs_from.contains(&from) {
458 errors.push(format!(
459 r#"Invalid extract_from target in test '{name}': '{from}' listed in no_outputs_from"#
460 ));
461 } else if let Some(output_id) = output_map.get(&from) {
462 outputs.push(output_id.clone());
463 } else {
464 errors.push(format!(
465 r#"Invalid extract_from target in test '{name}': '{from}' does not exist"#
466 ));
467 }
468 }
469 if outputs.is_empty() {
470 None
471 } else {
472 Some(TestOutput {
473 extract_from: outputs.into(),
474 conditions,
475 expected_event_count,
476 })
477 }
478 })
479 .collect();
480
481 let no_outputs_from = no_outputs_from
482 .into_iter()
483 .filter_map(|o| {
484 if let Some(output_id) = output_map.get(&o) {
485 Some(output_id.clone())
486 } else {
487 errors.push(format!(
488 r#"Invalid no_outputs_from target in test '{name}': '{o}' does not exist"#
489 ));
490 None
491 }
492 })
493 .collect();
494
495 if errors.is_empty() {
496 Ok(TestDefinition {
497 name,
498 input,
499 inputs,
500 outputs,
501 no_outputs_from,
502 })
503 } else {
504 Err(errors)
505 }
506 }
507}
508
509impl TestDefinition<OutputId> {
510 fn stringify(self) -> TestDefinition<String> {
511 let TestDefinition {
512 name,
513 input,
514 inputs,
515 outputs,
516 no_outputs_from,
517 } = self;
518
519 let outputs = outputs
520 .into_iter()
521 .map(|old| TestOutput {
522 extract_from: old
523 .extract_from
524 .to_vec()
525 .into_iter()
526 .map(|item| item.to_string())
527 .collect::<Vec<_>>()
528 .into(),
529 conditions: old.conditions,
530 expected_event_count: old.expected_event_count,
531 })
532 .collect();
533
534 let no_outputs_from = no_outputs_from.iter().map(ToString::to_string).collect();
535
536 TestDefinition {
537 name,
538 input,
539 inputs,
540 outputs,
541 no_outputs_from,
542 }
543 }
544}
545
546#[configurable_component]
551#[derive(Clone, Debug)]
552#[serde(deny_unknown_fields)]
553pub struct TestInput {
554 pub insert_at: ComponentKey,
556
557 #[serde(default = "default_test_input_type", rename = "type")]
561 pub type_str: String,
562
563 pub value: Option<String>,
568
569 pub source: Option<String>,
573
574 pub log_fields: Option<IndexMap<String, Value>>,
578
579 pub metric: Option<Metric>,
583}
584
/// Serde default for the `type` field of [`TestInput`]: `"raw"`.
fn default_test_input_type() -> String {
    String::from("raw")
}
588
589#[configurable_component]
594#[derive(Clone, Debug)]
595#[serde(deny_unknown_fields)]
596pub struct TestOutput<T: 'static = OutputId> {
597 pub extract_from: OneOrMany<T>,
599
600 pub conditions: Option<Vec<conditions::AnyCondition>>,
602
603 pub expected_event_count: Option<usize>,
611}
612
/// Tests for loading, validating and merging configurations.
#[cfg(all(test, feature = "sources-file", feature = "sinks-console"))]
mod tests {
    use std::path::PathBuf;

    use indoc::indoc;

    use super::{ComponentKey, ConfigDiff, Format, builder::ConfigBuilder, format, load_from_str};
    use crate::{config, topology::builder::TopologyPiecesBuilder};

    /// Loads `config`, then builds the topology pieces for it.
    ///
    /// Returns the configuration warnings when both steps succeed, otherwise
    /// the errors of whichever step failed first.
    async fn load(config: &str, format: config::Format) -> Result<Vec<String>, Vec<String>> {
        let loaded = config::load_from_str(config, format)?;
        let diff = ConfigDiff::initial(&loaded);
        // Warnings are computed from a second, independent load of the same text.
        let reloaded = config::load_from_str(config, format).unwrap();
        let warnings = config::warnings(&reloaded);

        TopologyPiecesBuilder::new(&loaded, &diff)
            .build()
            .await
            .map(|_pieces| warnings)
    }

    /// Missing, unknown and duplicated inputs are all reported.
    #[tokio::test]
    async fn bad_inputs() {
        let errors = load(
            r#"
            [sources.in]
            type = "test_basic"

            [transforms.sample]
            type = "test_basic"
            inputs = []
            suffix = "foo"
            increase = 1.25

            [transforms.sample2]
            type = "test_basic"
            inputs = ["qwerty"]
            suffix = "foo"
            increase = 1.25

            [sinks.out]
            type = "test_basic"
            inputs = ["asdf", "in", "in"]
            "#,
            Format::Toml,
        )
        .await
        .unwrap_err();

        let expected = vec![
            "Sink \"out\" has input \"in\" duplicated 2 times",
            "Transform \"sample\" has no inputs",
            "Input \"qwerty\" for transform \"sample2\" doesn't match any components.",
            "Input \"asdf\" for sink \"out\" doesn't match any components.",
        ];
        assert_eq!(expected, errors);
    }

    /// A source and a transform sharing a name is an error.
    #[tokio::test]
    async fn duplicate_name() {
        let errors = load(
            r#"
            [sources.foo]
            type = "test_basic"

            [sources.bar]
            type = "test_basic"

            [transforms.foo]
            type = "test_basic"
            inputs = ["bar"]
            suffix = "foo"
            increase = 1.25

            [sinks.out]
            type = "test_basic"
            inputs = ["foo"]
            "#,
            Format::Toml,
        )
        .await
        .unwrap_err();

        let expected = vec!["More than one component with name \"foo\" (source, transform)."];
        assert_eq!(errors, expected);
    }

    /// `stdin` and a `file_descriptor` source on fd 0 claim the same resource.
    #[tokio::test]
    #[cfg(unix)]
    async fn conflicting_stdin_and_fd_resources() {
        let errors = load(
            r#"
            [sources.stdin]
            type = "stdin"

            [sources.file_descriptor]
            type = "file_descriptor"
            fd = 0

            [sinks.out]
            type = "test_basic"
            inputs = ["stdin", "file_descriptor"]
            "#,
            Format::Toml,
        )
        .await
        .unwrap_err();

        assert_eq!(errors.len(), 1);
        let expected_prefix = "Resource `file descriptor: 0` is claimed by multiple components:";
        assert!(errors[0].starts_with(expected_prefix));
    }

    /// Two `file_descriptor` sources on the same fd conflict.
    #[tokio::test]
    #[cfg(unix)]
    async fn conflicting_fd_resources() {
        let errors = load(
            r#"
            [sources.file_descriptor1]
            type = "file_descriptor"
            fd = 10
            [sources.file_descriptor2]
            type = "file_descriptor"
            fd = 10
            [sinks.out]
            type = "test_basic"
            inputs = ["file_descriptor1", "file_descriptor2"]
            "#,
            Format::Toml,
        )
        .await
        .unwrap_err();

        assert_eq!(errors.len(), 1);
        let expected_prefix = "Resource `file descriptor: 10` is claimed by multiple components:";
        assert!(errors[0].starts_with(expected_prefix));
    }

    /// Two `file_descriptor` sources on distinct fds load without warnings.
    #[tokio::test]
    #[cfg(all(unix, feature = "sources-file_descriptor"))]
    async fn no_conflict_fd_resources() {
        use crate::sources::file_descriptors::file_descriptor::null_fd;

        let fd1 = null_fd().unwrap();
        let fd2 = null_fd().unwrap();
        let config_text = format!(
            r#"
            [sources.file_descriptor1]
            type = "file_descriptor"
            fd = {fd1}

            [sources.file_descriptor2]
            type = "file_descriptor"
            fd = {fd2}

            [sinks.out]
            type = "test_basic"
            inputs = ["file_descriptor1", "file_descriptor2"]
            "#
        );

        let result = load(&config_text, Format::Toml).await;

        let expected: Result<Vec<String>, Vec<String>> = Ok(Vec::new());
        assert_eq!(result, expected);
    }

    /// Components without consumers produce warnings, not errors.
    #[tokio::test]
    async fn warnings() {
        let reported = load(
            r#"
            [sources.in1]
            type = "test_basic"

            [sources.in2]
            type = "test_basic"

            [transforms.sample1]
            type = "test_basic"
            inputs = ["in1"]
            suffix = "foo"
            increase = 1.25

            [transforms.sample2]
            type = "test_basic"
            inputs = ["in1"]
            suffix = "foo"
            increase = 1.25

            [sinks.out]
            type = "test_basic"
            inputs = ["sample1"]
            "#,
            Format::Toml,
        )
        .await
        .unwrap();

        let expected = vec![
            "Transform \"sample2\" has no consumers",
            "Source \"in2\" has no consumers",
        ];
        assert_eq!(reported, expected)
    }

    /// A cycle between transforms is reported with the offending chain.
    #[tokio::test]
    async fn cycle() {
        let errors = load(
            r#"
            [sources.in]
            type = "test_basic"

            [transforms.one]
            type = "test_basic"
            inputs = ["in"]
            suffix = "foo"
            increase = 1.25

            [transforms.two]
            type = "test_basic"
            inputs = ["one", "four"]
            suffix = "foo"
            increase = 1.25

            [transforms.three]
            type = "test_basic"
            inputs = ["two"]
            suffix = "foo"
            increase = 1.25

            [transforms.four]
            type = "test_basic"
            inputs = ["three"]
            suffix = "foo"
            increase = 1.25

            [sinks.out]
            type = "test_basic"
            inputs = ["four"]
            "#,
            Format::Toml,
        )
        .await
        .unwrap_err();

        let expected =
            vec!["Cyclic dependency detected in the chain [ four -> two -> three -> four ]"];
        assert_eq!(errors, expected)
    }

    /// Without an explicit `data_dir`, the default is `/var/lib/vector`.
    #[test]
    fn default_data_dir() {
        let loaded = load_from_str(
            indoc! {r#"
                [sources.in]
                type = "test_basic"

                [sinks.out]
                type = "test_basic"
                inputs = ["in"]
            "#},
            Format::Toml,
        )
        .unwrap();

        assert_eq!(
            Some(PathBuf::from("/var/lib/vector")),
            loaded.global.data_dir
        )
    }

    /// Without a `[log_schema]` table, the default key names are used.
    #[test]
    fn default_schema() {
        let loaded = load_from_str(
            indoc! {r#"
                [sources.in]
                type = "test_basic"

                [sinks.out]
                type = "test_basic"
                inputs = ["in"]
            "#},
            Format::Toml,
        )
        .unwrap();
        let log_schema = &loaded.global.log_schema;

        assert_eq!("host", log_schema.host_key().unwrap().to_string());
        assert_eq!("message", log_schema.message_key().unwrap().to_string());
        assert_eq!(
            "timestamp",
            log_schema.timestamp_key().unwrap().to_string()
        );
    }

    /// Key names from a `[log_schema]` table override the defaults.
    #[test]
    fn custom_schema() {
        let loaded = load_from_str(
            indoc! {r#"
                [log_schema]
                host_key = "this"
                message_key = "that"
                timestamp_key = "then"

                [sources.in]
                type = "test_basic"

                [sinks.out]
                type = "test_basic"
                inputs = ["in"]
            "#},
            Format::Toml,
        )
        .unwrap();
        let log_schema = &loaded.global.log_schema;

        assert_eq!("this", log_schema.host_key().unwrap().to_string());
        assert_eq!("that", log_schema.message_key().unwrap().to_string());
        assert_eq!("then", log_schema.timestamp_key().unwrap().to_string());
    }

    /// Appending a non-overlapping builder merges globals, components and tests.
    #[test]
    fn config_append() {
        let mut base: ConfigBuilder = format::deserialize(
            indoc! {r#"
                [sources.in]
                type = "test_basic"

                [sinks.out]
                type = "test_basic"
                inputs = ["in"]
            "#},
            Format::Toml,
        )
        .unwrap();

        let addition: ConfigBuilder = format::deserialize(
            indoc! {r#"
                data_dir = "/foobar"

                [proxy]
                http = "http://proxy.inc:3128"

                [transforms.foo]
                type = "test_basic"
                inputs = [ "in" ]
                suffix = "foo"
                increase = 1.25

                [[tests]]
                name = "check_simple_log"
                [tests.input]
                insert_at = "foo"
                type = "raw"
                value = "2019-11-28T12:00:00+00:00 info Sorry, I'm busy this week Cecil"
                [[tests.outputs]]
                extract_from = "foo"
                [[tests.outputs.conditions]]
                type = "vrl"
                source = ".message == \"Sorry, I'm busy this week Cecil\""
            "#},
            Format::Toml,
        )
        .unwrap();

        assert_eq!(base.append(addition), Ok(()));

        assert!(base.global.proxy.http.is_some());
        assert!(base.global.proxy.https.is_none());
        assert_eq!(Some(PathBuf::from("/foobar")), base.global.data_dir);
        assert!(base.sources.contains_key(&ComponentKey::from("in")));
        assert!(base.sinks.contains_key(&ComponentKey::from("out")));
        assert!(base.transforms.contains_key(&ComponentKey::from("foo")));
        assert_eq!(base.tests.len(), 1);
    }

    /// Appending a builder that redefines existing ids reports each collision.
    #[test]
    fn config_append_collisions() {
        let mut base: ConfigBuilder = format::deserialize(
            indoc! {r#"
                [sources.in]
                type = "test_basic"

                [sinks.out]
                type = "test_basic"
                inputs = ["in"]
            "#},
            Format::Toml,
        )
        .unwrap();

        let colliding: ConfigBuilder = format::deserialize(
            indoc! {r#"
                [sources.in]
                type = "test_basic"

                [transforms.foo]
                type = "test_basic"
                inputs = [ "in" ]
                suffix = "foo"
                increase = 1.25

                [sinks.out]
                type = "test_basic"
                inputs = ["in"]
            "#},
            Format::Toml,
        )
        .unwrap();

        assert_eq!(
            base.append(colliding),
            Err(vec![
                "duplicate source id found: in".into(),
                "duplicate sink id found: out".into(),
            ])
        );
    }

    /// Proxy settings are parsed both globally and per source.
    #[test]
    fn with_proxy() {
        let builder: ConfigBuilder = format::deserialize(
            indoc! {r#"
                [proxy]
                http = "http://server:3128"
                https = "http://other:3128"
                no_proxy = ["localhost", "127.0.0.1"]

                [sources.in]
                type = "nginx_metrics"
                endpoints = ["http://localhost:8000/basic_status"]
                proxy.http = "http://server:3128"
                proxy.https = "http://other:3128"
                proxy.no_proxy = ["localhost", "127.0.0.1"]

                [sinks.out]
                type = "console"
                inputs = ["in"]
                encoding.codec = "json"
            "#},
            Format::Toml,
        )
        .unwrap();

        let global_proxy = &builder.global.proxy;
        assert_eq!(global_proxy.http, Some("http://server:3128".into()));
        assert_eq!(global_proxy.https, Some("http://other:3128".into()));
        assert!(global_proxy.no_proxy.matches("localhost"));

        let source = builder.sources.get(&ComponentKey::from("in")).unwrap();
        assert_eq!(source.proxy.http, Some("http://server:3128".into()));
        assert_eq!(source.proxy.https, Some("http://other:3128".into()));
        assert!(source.proxy.no_proxy.matches("localhost"));
    }

    /// A partial global proxy does not leak into a fully specified source proxy.
    #[test]
    fn with_partial_global_proxy() {
        let builder: ConfigBuilder = format::deserialize(
            indoc! {r#"
                [proxy]
                http = "http://server:3128"

                [sources.in]
                type = "nginx_metrics"
                endpoints = ["http://localhost:8000/basic_status"]

                [sources.in.proxy]
                http = "http://server:3129"
                https = "http://other:3129"
                no_proxy = ["localhost", "127.0.0.1"]

                [sinks.out]
                type = "console"
                inputs = ["in"]
                encoding.codec = "json"
            "#},
            Format::Toml,
        )
        .unwrap();

        let global_proxy = &builder.global.proxy;
        assert_eq!(global_proxy.http, Some("http://server:3128".into()));
        assert_eq!(global_proxy.https, None);

        let source = builder.sources.get(&ComponentKey::from("in")).unwrap();
        assert_eq!(source.proxy.http, Some("http://server:3129".into()));
        assert_eq!(source.proxy.https, Some("http://other:3129".into()));
        assert!(source.proxy.no_proxy.matches("localhost"));
    }

    /// At deserialization time, a partial source proxy is not filled in from
    /// the global proxy.
    #[test]
    fn with_partial_source_proxy() {
        let builder: ConfigBuilder = format::deserialize(
            indoc! {r#"
                [proxy]
                http = "http://server:3128"
                https = "http://other:3128"

                [sources.in]
                type = "nginx_metrics"
                endpoints = ["http://localhost:8000/basic_status"]

                [sources.in.proxy]
                http = "http://server:3129"
                no_proxy = ["localhost", "127.0.0.1"]

                [sinks.out]
                type = "console"
                inputs = ["in"]
                encoding.codec = "json"
            "#},
            Format::Toml,
        )
        .unwrap();

        let global_proxy = &builder.global.proxy;
        assert_eq!(global_proxy.http, Some("http://server:3128".into()));
        assert_eq!(global_proxy.https, Some("http://other:3128".into()));

        let source = builder.sources.get(&ComponentKey::from("in")).unwrap();
        assert_eq!(source.proxy.http, Some("http://server:3129".into()));
        assert_eq!(source.proxy.https, None);
        assert!(source.proxy.no_proxy.matches("localhost"));
    }
}
1168
/// Tests for propagating sink acknowledgement settings to sources.
#[cfg(all(test, feature = "sources-file", feature = "sinks-file"))]
mod acknowledgements_tests {
    use indoc::indoc;

    use super::*;

    /// Sources feeding an acknowledging sink — directly (`in2`) or through a
    /// transform (`in3`) — are flagged after the build; `in1` is not.
    #[test]
    fn propagates_settings() {
        let builder: ConfigBuilder = format::deserialize(
            indoc! {r#"
                data_dir = "/tmp"
                [sources.in1]
                type = "file"
                include = ["/var/log/**/*.log"]
                [sources.in2]
                type = "file"
                include = ["/var/log/**/*.log"]
                [sources.in3]
                type = "file"
                include = ["/var/log/**/*.log"]
                [transforms.parse3]
                type = "test_basic"
                inputs = ["in3"]
                increase = 0.0
                suffix = ""
                [sinks.out1]
                type = "file"
                inputs = ["in1"]
                encoding.codec = "text"
                path = "/path/to/out1"
                [sinks.out2]
                type = "file"
                inputs = ["in2"]
                encoding.codec = "text"
                path = "/path/to/out2"
                acknowledgements = true
                [sinks.out3]
                type = "file"
                inputs = ["parse3"]
                encoding.codec = "text"
                path = "/path/to/out3"
                acknowledgements.enabled = true
            "#},
            Format::Toml,
        )
        .unwrap();

        // Nothing is flagged until the configuration has been built.
        for source in builder.sources.values() {
            assert!(
                !source.sink_acknowledgements,
                "Source `sink_acknowledgements` should be `false` before propagation"
            );
        }

        let built = builder.build().unwrap();

        let source_named = |key: &str| built.sources.get(&ComponentKey::from(key)).unwrap();
        assert!(!source_named("in1").sink_acknowledgements);
        assert!(source_named("in2").sink_acknowledgements);
        assert!(source_named("in3").sink_acknowledgements);
    }
}
1235
/// Property tests for [`Resource::conflicts`].
#[cfg(test)]
mod resource_tests {
    use std::{
        collections::{HashMap, HashSet},
        net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
    };

    use proptest::prelude::*;

    use super::Resource;

    /// Builds a TCP port resource.
    fn tcp(addr: impl Into<IpAddr>, port: u16) -> Resource {
        let socket = SocketAddr::new(addr.into(), port);
        Resource::tcp(socket)
    }

    /// Builds a UDP port resource.
    fn udp(addr: impl Into<IpAddr>, port: u16) -> Resource {
        let socket = SocketAddr::new(addr.into(), port);
        Resource::udp(socket)
    }

    /// Generates either the IPv4 or the IPv6 unspecified address.
    fn unspecified() -> impl Strategy<Value = IpAddr> {
        prop_oneof![
            Just(Ipv4Addr::UNSPECIFIED.into()),
            Just(Ipv6Addr::UNSPECIFIED.into()),
        ]
    }

    /// Generates any address that is not unspecified.
    fn specaddr() -> impl Strategy<Value = IpAddr> {
        any::<IpAddr>().prop_filter("Must be specific address", |addr| !addr.is_unspecified())
    }

    /// Generates any non-zero port.
    fn specport() -> impl Strategy<Value = u16> {
        any::<u16>().prop_filter("Must be specific port", |port| *port > 0)
    }

    /// Converts a list of expected conflicts into the shape returned by
    /// [`Resource::conflicts`].
    fn hashmap(conflicts: Vec<(Resource, Vec<&str>)>) -> HashMap<Resource, HashSet<&str>> {
        let mut expected = HashMap::new();
        for (resource, owners) in conflicts {
            expected.insert(resource, owners.into_iter().collect());
        }
        expected
    }

    proptest! {
        /// Distinct ports on one address never conflict.
        #[test]
        fn valid(addr: IpAddr, port1 in specport(), port2 in specport()) {
            prop_assume!(port1 != port2);
            let conflicting = Resource::conflicts(vec![
                ("sink_0", vec![tcp(addr, 0)]),
                ("sink_1", vec![tcp(addr, port1)]),
                ("sink_2", vec![tcp(addr, port2)]),
            ]);
            assert_eq!(conflicting, HashMap::new());
        }

        /// Two components on the same address and port conflict.
        #[test]
        fn conflicting_pair(addr: IpAddr, port in specport()) {
            let conflicting = Resource::conflicts(vec![
                ("sink_0", vec![tcp(addr, 0)]),
                ("sink_1", vec![tcp(addr, port)]),
                ("sink_2", vec![tcp(addr, port)]),
            ]);
            let expected = hashmap(vec![(tcp(addr, port), vec!["sink_1", "sink_2"])]);
            assert_eq!(conflicting, expected);
        }

        /// One component can take part in several conflicts at once.
        #[test]
        fn conflicting_multi(addr: IpAddr, port in specport()) {
            let conflicting = Resource::conflicts(vec![
                ("sink_0", vec![tcp(addr, 0)]),
                ("sink_1", vec![tcp(addr, port), tcp(addr, 0)]),
                ("sink_2", vec![tcp(addr, port)]),
            ]);
            let expected = hashmap(vec![
                (tcp(addr, 0), vec!["sink_0", "sink_1"]),
                (tcp(addr, port), vec!["sink_1", "sink_2"]),
            ]);
            assert_eq!(conflicting, expected);
        }

        /// The same port on different addresses does not conflict.
        #[test]
        fn different_network_interface(addr1: IpAddr, addr2: IpAddr, port: u16) {
            prop_assume!(addr1 != addr2);
            let conflicting = Resource::conflicts(vec![
                ("sink_0", vec![tcp(addr1, port)]),
                ("sink_1", vec![tcp(addr2, port)]),
            ]);
            assert_eq!(conflicting, HashMap::new());
        }

        /// A specific address and an unspecified one on the same port do not conflict.
        #[test]
        fn unspecified_network_interface(addr in specaddr(), unspec in unspecified(), port: u16) {
            let conflicting = Resource::conflicts(vec![
                ("sink_0", vec![tcp(addr, port)]),
                ("sink_1", vec![tcp(unspec, port)]),
            ]);
            assert_eq!(conflicting, HashMap::new());
        }

        /// TCP and UDP on the same socket address do not conflict.
        #[test]
        fn different_protocol(addr: IpAddr) {
            let conflicting = Resource::conflicts(vec![
                ("sink_0", vec![tcp(addr, 0)]),
                ("sink_1", vec![udp(addr, 0)]),
            ]);
            assert_eq!(conflicting, HashMap::new());
        }
    }

    /// The IPv4 and IPv6 unspecified addresses do not conflict with each other.
    #[test]
    fn different_unspecified_ip_version() {
        let conflicting = Resource::conflicts(vec![
            ("sink_0", vec![tcp(Ipv4Addr::UNSPECIFIED, 0)]),
            ("sink_1", vec![tcp(Ipv6Addr::UNSPECIFIED, 0)]),
        ]);
        assert_eq!(conflicting, HashMap::new());
    }
}
1363
/// Tests for resource conflicts detected while loading a configuration, plus
/// an ignored helper that prints the component configuration schema.
#[cfg(all(test, feature = "sources-stdin", feature = "sinks-console"))]
mod resource_config_tests {
    use indoc::indoc;
    use vector_lib::configurable::schema::generate_root_schema;

    use super::{Format, load_from_str};

    /// Two `stdin` sources cannot be loaded together.
    #[test]
    fn config_conflict_detected() {
        let result = load_from_str(
            indoc! {r#"
                [sources.in0]
                type = "stdin"

                [sources.in1]
                type = "stdin"

                [sinks.out]
                type = "console"
                inputs = ["in0","in1"]
                encoding.codec = "json"
            "#},
            Format::Toml,
        );

        assert!(result.is_err());
    }

    /// Prints the JSON schema of a components-only configuration to stdout.
    ///
    /// Ignored by default; schema generation errors are printed to stderr
    /// rather than failing the test.
    #[test]
    #[ignore]
    #[allow(clippy::print_stdout)]
    #[allow(clippy::print_stderr)]
    fn generate_component_config_schema() {
        use indexmap::IndexMap;
        use vector_lib::{config::ComponentKey, configurable::configurable_component};

        use crate::config::{SinkOuter, SourceOuter, TransformOuter};

        /// A configuration containing only sources, transforms and sinks.
        #[configurable_component]
        #[derive(Clone)]
        struct ComponentsOnlyConfig {
            /// Configured sources.
            #[serde(default)]
            pub sources: IndexMap<ComponentKey, SourceOuter>,

            /// Configured transforms.
            #[serde(default)]
            pub transforms: IndexMap<ComponentKey, TransformOuter<String>>,

            /// Configured sinks.
            #[serde(default)]
            pub sinks: IndexMap<ComponentKey, SinkOuter<String>>,
        }

        let schema = match generate_root_schema::<ComponentsOnlyConfig>() {
            Ok(schema) => schema,
            Err(e) => {
                eprintln!("error while generating schema: {e:?}");
                return;
            }
        };

        let json = serde_json::to_string_pretty(&schema)
            .expect("rendering root schema to JSON should not fail");

        println!("{json}");
    }
}