vector/config/
mod.rs

1#![allow(missing_docs)]
2use std::{
3    collections::{HashMap, HashSet},
4    fmt::{self, Display, Formatter},
5    fs,
6    hash::Hash,
7    net::SocketAddr,
8    path::PathBuf,
9    time::Duration,
10};
11
12use indexmap::IndexMap;
13use serde::Serialize;
14use vector_config::configurable_component;
15pub use vector_lib::{
16    config::{
17        AcknowledgementsConfig, DataType, GlobalOptions, Input, LogNamespace,
18        SourceAcknowledgementsConfig, SourceOutput, TransformOutput, WildcardMatching,
19    },
20    configurable::component::{GenerateConfig, SinkDescription, TransformDescription},
21};
22
23use crate::{
24    conditions,
25    event::{Metric, Value},
26    secrets::SecretBackends,
27    serde::OneOrMany,
28};
29
30pub mod api;
31mod builder;
32mod cmd;
33mod compiler;
34mod diff;
35pub mod dot_graph;
36mod enrichment_table;
37pub mod format;
38mod graph;
39pub mod loading;
40pub mod provider;
41pub mod schema;
42mod secret;
43mod sink;
44mod source;
45mod transform;
46pub mod unit_test;
47mod validation;
48mod vars;
49pub mod watcher;
50
51pub use builder::ConfigBuilder;
52pub use cmd::{Opts, cmd};
53pub use diff::ConfigDiff;
54pub use enrichment_table::{EnrichmentTableConfig, EnrichmentTableOuter};
55pub use format::{Format, FormatHint};
56pub use loading::{
57    COLLECTOR, CONFIG_PATHS, load, load_from_paths, load_from_paths_with_provider_and_secrets,
58    load_from_str, load_from_str_with_secrets, load_source_from_paths, merge_path_lists,
59    process_paths,
60};
61pub use provider::ProviderConfig;
62pub use secret::SecretBackend;
63pub use sink::{BoxedSink, SinkConfig, SinkContext, SinkHealthcheckOptions, SinkOuter};
64pub use source::{BoxedSource, SourceConfig, SourceContext, SourceOuter};
65pub use transform::{
66    BoxedTransform, TransformConfig, TransformContext, TransformOuter, get_transform_output_ids,
67};
68pub use unit_test::{UnitTestResult, build_unit_tests, build_unit_tests_main};
69pub use validation::warnings;
70pub use vars::{ENVIRONMENT_VARIABLE_INTERPOLATION_REGEX, interpolate};
71pub use vector_lib::{
72    config::{
73        ComponentKey, LogSchema, OutputId, init_log_schema, init_telemetry, log_schema,
74        proxy::ProxyConfig, telemetry,
75    },
76    id::Inputs,
77};
78
/// The category of component that a [`ComponentConfig`] entry describes.
///
/// This is not a comprehensive set; variants are added as needed.
// The derived `Ord`/`PartialOrd` follow declaration order, so reordering the variants changes how
// values compare.
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub enum ComponentType {
    Transform,
    Sink,
    EnrichmentTable,
}
86
/// A component's key and type together with a list of configuration paths associated with it.
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub struct ComponentConfig {
    /// Paths associated with the component. Values built through [`ComponentConfig::new`] are
    /// stored in canonicalized form.
    pub config_paths: Vec<PathBuf>,
    /// Key identifying the component.
    pub component_key: ComponentKey,
    /// Which kind of component `component_key` refers to.
    pub component_type: ComponentType,
}
93
94impl ComponentConfig {
95    pub fn new(
96        config_paths: Vec<PathBuf>,
97        component_key: ComponentKey,
98        component_type: ComponentType,
99    ) -> Self {
100        let canonicalized_paths = config_paths
101            .into_iter()
102            .filter_map(|p| fs::canonicalize(p).ok())
103            .collect();
104
105        Self {
106            config_paths: canonicalized_paths,
107            component_key,
108            component_type,
109        }
110    }
111
112    pub fn contains(
113        &self,
114        config_paths: &HashSet<PathBuf>,
115    ) -> Option<(ComponentKey, ComponentType)> {
116        if config_paths.iter().any(|p| self.config_paths.contains(p)) {
117            return Some((self.component_key.clone(), self.component_type.clone()));
118        }
119        None
120    }
121}
122
/// A configuration location: either a single file or a directory.
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub enum ConfigPath {
    /// A file path, paired with a hint about its format.
    File(PathBuf, FormatHint),
    /// A directory path.
    Dir(PathBuf),
}
128
129impl<'a> From<&'a ConfigPath> for &'a PathBuf {
130    fn from(config_path: &'a ConfigPath) -> &'a PathBuf {
131        match config_path {
132            ConfigPath::File(path, _) => path,
133            ConfigPath::Dir(path) => path,
134        }
135    }
136}
137
138impl ConfigPath {
139    pub const fn as_dir(&self) -> Option<&PathBuf> {
140        match self {
141            Self::Dir(path) => Some(path),
142            _ => None,
143        }
144    }
145}
146
/// The assembled configuration: option groups plus the component maps keyed by `ComponentKey`.
///
/// The `sources`, `sinks` and `transforms` maps are private and are read through the accessor
/// methods on `Config`.
#[derive(Debug, Default, Serialize)]
pub struct Config {
    /// API options; this field only exists when the `api` feature is enabled.
    #[cfg(feature = "api")]
    pub api: api::Options,
    /// Schema options.
    pub schema: schema::Options,
    /// Global options (for example the global acknowledgements setting consulted by
    /// `propagate_acknowledgements`).
    pub global: GlobalOptions,
    /// Healthcheck options.
    pub healthchecks: HealthcheckOptions,
    // Insertion-ordered maps of the configured components.
    sources: IndexMap<ComponentKey, SourceOuter>,
    sinks: IndexMap<ComponentKey, SinkOuter<OutputId>>,
    transforms: IndexMap<ComponentKey, TransformOuter<OutputId>>,
    /// Enrichment tables, keyed by component key.
    pub enrichment_tables: IndexMap<ComponentKey, EnrichmentTableOuter<OutputId>>,
    // Unit test definitions carried along with the configuration.
    tests: Vec<TestDefinition>,
    // Secret backends, keyed by component key.
    secret: IndexMap<ComponentKey, SecretBackends>,
    /// Optional graceful shutdown duration.
    pub graceful_shutdown_duration: Option<Duration>,
}
162
163impl Config {
164    pub fn builder() -> builder::ConfigBuilder {
165        Default::default()
166    }
167
168    pub fn is_empty(&self) -> bool {
169        self.sources.is_empty()
170    }
171
172    pub fn sources(&self) -> impl Iterator<Item = (&ComponentKey, &SourceOuter)> {
173        self.sources.iter()
174    }
175
176    pub fn source(&self, id: &ComponentKey) -> Option<&SourceOuter> {
177        self.sources.get(id)
178    }
179
180    pub fn transforms(&self) -> impl Iterator<Item = (&ComponentKey, &TransformOuter<OutputId>)> {
181        self.transforms.iter()
182    }
183
184    pub fn transform(&self, id: &ComponentKey) -> Option<&TransformOuter<OutputId>> {
185        self.transforms.get(id)
186    }
187
188    pub fn sinks(&self) -> impl Iterator<Item = (&ComponentKey, &SinkOuter<OutputId>)> {
189        self.sinks.iter()
190    }
191
192    pub fn sink(&self, id: &ComponentKey) -> Option<&SinkOuter<OutputId>> {
193        self.sinks.get(id)
194    }
195
196    pub fn enrichment_tables(
197        &self,
198    ) -> impl Iterator<Item = (&ComponentKey, &EnrichmentTableOuter<OutputId>)> {
199        self.enrichment_tables.iter()
200    }
201
202    pub fn enrichment_table(&self, id: &ComponentKey) -> Option<&EnrichmentTableOuter<OutputId>> {
203        self.enrichment_tables.get(id)
204    }
205
206    pub fn inputs_for_node(&self, id: &ComponentKey) -> Option<&[OutputId]> {
207        self.transforms
208            .get(id)
209            .map(|t| &t.inputs[..])
210            .or_else(|| self.sinks.get(id).map(|s| &s.inputs[..]))
211            .or_else(|| self.enrichment_tables.get(id).map(|s| &s.inputs[..]))
212    }
213
214    pub fn propagate_acknowledgements(&mut self) -> Result<(), Vec<String>> {
215        let inputs: Vec<_> = self
216            .sinks
217            .iter()
218            .filter(|(_, sink)| {
219                sink.inner
220                    .acknowledgements()
221                    .merge_default(&self.global.acknowledgements)
222                    .enabled()
223            })
224            .flat_map(|(name, sink)| {
225                sink.inputs
226                    .iter()
227                    .map(|input| (name.clone(), input.clone()))
228            })
229            .collect();
230        self.propagate_acks_rec(inputs);
231        Ok(())
232    }
233
234    fn propagate_acks_rec(&mut self, sink_inputs: Vec<(ComponentKey, OutputId)>) {
235        for (sink, input) in sink_inputs {
236            let component = &input.component;
237            if let Some(source) = self.sources.get_mut(component) {
238                if source.inner.can_acknowledge() {
239                    source.sink_acknowledgements = true;
240                } else {
241                    warn!(
242                        message = "Source has acknowledgements enabled by a sink, but acknowledgements are not supported by this source. Silent data loss could occur.",
243                        source = component.id(),
244                        sink = sink.id(),
245                    );
246                }
247            } else if let Some(transform) = self.transforms.get(component) {
248                let inputs = transform
249                    .inputs
250                    .iter()
251                    .map(|input| (sink.clone(), input.clone()))
252                    .collect();
253                self.propagate_acks_rec(inputs);
254            }
255        }
256    }
257
258    pub fn transform_keys_with_external_files(&self) -> HashSet<ComponentKey> {
259        self.transforms
260            .iter()
261            .filter_map(|(name, transform_outer)| {
262                if !transform_outer.inner.files_to_watch().is_empty() {
263                    Some(name.clone())
264                } else {
265                    None
266                }
267            })
268            .collect()
269    }
270}
271
/// Healthcheck options.
// `#[serde(default)]` below means fields omitted from the configuration take their values from
// the `Default` implementation of this type.
#[configurable_component]
#[derive(Clone, Copy, Debug)]
#[serde(default)]
pub struct HealthcheckOptions {
    /// Whether or not healthchecks are enabled for all sinks.
    ///
    /// Can be overridden on a per-sink basis.
    pub enabled: bool,

    /// Whether or not to require a sink to report as being healthy during startup.
    ///
    /// When enabled and a sink reports not being healthy, Vector will exit during start-up.
    ///
    /// Can be alternatively set, and overridden by, the `--require-healthy` command-line flag.
    pub require_healthy: bool,
}
289
290impl HealthcheckOptions {
291    pub fn set_require_healthy(&mut self, require_healthy: impl Into<Option<bool>>) {
292        if let Some(require_healthy) = require_healthy.into() {
293            self.require_healthy = require_healthy;
294        }
295    }
296
297    const fn merge(&mut self, other: Self) {
298        self.enabled &= other.enabled;
299        self.require_healthy |= other.require_healthy;
300    }
301}
302
impl Default for HealthcheckOptions {
    /// Healthchecks are enabled by default, but an unhealthy sink is not required to be healthy.
    fn default() -> Self {
        Self {
            enabled: true,
            require_healthy: false,
        }
    }
}

// NOTE(review): macro defined elsewhere; presumably derives `GenerateConfig` for
// `HealthcheckOptions` from its `Default` value -- confirm against the macro definition.
impl_generate_config_from_default!(HealthcheckOptions);
313
/// A resource, such as a network port, that at most one component may own at a time.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Ord, PartialOrd)]
pub enum Resource {
    /// A socket address bound with the given transport protocol.
    Port(SocketAddr, Protocol),
    /// A systemd socket, identified by its zero-based offset.
    SystemFdOffset(usize),
    /// A file descriptor number.
    Fd(u32),
    /// A disk buffer, identified by name.
    DiskBuffer(String),
}

/// Transport protocol of a [`Resource::Port`].
#[derive(Clone, Debug, Eq, PartialEq, Hash, Ord, PartialOrd, Copy)]
pub enum Protocol {
    Tcp,
    Udp,
}

impl Resource {
    /// Shorthand for a TCP [`Resource::Port`] on `addr`.
    pub const fn tcp(addr: SocketAddr) -> Self {
        Self::Port(addr, Protocol::Tcp)
    }

    /// Shorthand for a UDP [`Resource::Port`] on `addr`.
    pub const fn udp(addr: SocketAddr) -> Self {
        Self::Port(addr, Protocol::Udp)
    }

    /// From given components returns all that have a resource conflict with any other component.
    ///
    /// The result maps each contested resource to the set of component keys claiming it; resources
    /// claimed by a single component are omitted.
    ///
    /// Two rules produce conflicts:
    /// * equality: two components list the very same resource;
    /// * wildcard ports: a port bound on an unspecified address (`0.0.0.0` / `::`) listens on all
    ///   interfaces, so it also conflicts with every other port resource of the same IP version,
    ///   port number and protocol, whatever its IP address.
    pub fn conflicts<K: Eq + Hash + Clone>(
        components: impl IntoIterator<Item = (K, Vec<Resource>)>,
    ) -> HashMap<Resource, HashSet<K>> {
        let mut resource_map = HashMap::<Resource, HashSet<K>>::new();
        let mut unspecified = Vec::new();

        // Find equality based conflicts, remembering wildcard-address ports for the second pass.
        for (key, resources) in components {
            unspecified.extend(resources.iter().filter_map(|resource| match resource {
                Resource::Port(address, protocol) if address.ip().is_unspecified() => {
                    Some((key.clone(), *address, *protocol))
                }
                _ => None,
            }));

            for resource in resources {
                resource_map
                    .entry(resource)
                    .or_default()
                    .insert(key.clone());
            }
        }

        // Port with unspecified address will bind to all network interfaces
        // so we have to check for all Port resources if they share the same
        // port.
        for (key, wildcard, wildcard_protocol) in unspecified {
            for (resource, owners) in &mut resource_map {
                let Resource::Port(address, protocol) = resource else {
                    continue;
                };
                // IP addresses can either be v4 or v6, so the IP version, the port and the
                // protocol (TCP/UDP) must all match. Comparing whole socket addresses here would
                // only ever match the wildcard's own entry and miss real conflicts.
                if wildcard.is_ipv4() == address.is_ipv4()
                    && wildcard.port() == address.port()
                    && wildcard_protocol == *protocol
                {
                    owners.insert(key.clone());
                }
            }
        }

        resource_map.retain(|_, owners| owners.len() > 1);

        resource_map
    }
}
382
383impl Display for Protocol {
384    fn fmt(&self, fmt: &mut Formatter<'_>) -> Result<(), fmt::Error> {
385        match self {
386            Protocol::Udp => write!(fmt, "udp"),
387            Protocol::Tcp => write!(fmt, "tcp"),
388        }
389    }
390}
391
392impl Display for Resource {
393    fn fmt(&self, fmt: &mut Formatter<'_>) -> Result<(), fmt::Error> {
394        match self {
395            Resource::Port(address, protocol) => write!(fmt, "{protocol} {address}"),
396            Resource::SystemFdOffset(offset) => write!(fmt, "systemd {}th socket", offset + 1),
397            Resource::Fd(fd) => write!(fmt, "file descriptor: {fd}"),
398            Resource::DiskBuffer(name) => write!(fmt, "disk buffer {name:?}"),
399        }
400    }
401}
402
/// A unit test definition.
// `T` is the type used to refer to component outputs: `String` for the unresolved form and
// `OutputId` (the default) once `resolve_outputs` has mapped the names; `stringify` converts back.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct TestDefinition<T: 'static = OutputId> {
    /// The name of the unit test.
    pub name: String,

    /// An input event to test against.
    pub input: Option<TestInput>,

    /// A set of input events to test against.
    #[serde(default)]
    pub inputs: Vec<TestInput>,

    /// A set of expected output events after the test has run.
    #[serde(default)]
    pub outputs: Vec<TestOutput<T>>,

    /// A set of component outputs that should not have emitted any events.
    #[serde(default)]
    pub no_outputs_from: Vec<T>,
}
426
427impl TestDefinition<String> {
428    fn resolve_outputs(
429        self,
430        graph: &graph::Graph,
431    ) -> Result<TestDefinition<OutputId>, Vec<String>> {
432        let TestDefinition {
433            name,
434            input,
435            inputs,
436            outputs,
437            no_outputs_from,
438        } = self;
439        let mut errors = Vec::new();
440
441        let output_map = graph.input_map().expect("ambiguous outputs");
442
443        let outputs = outputs
444            .into_iter()
445            .map(|old| {
446                let TestOutput {
447                    extract_from,
448                    conditions,
449                    expected_event_count,
450                } = old;
451
452                (extract_from.to_vec(), conditions, expected_event_count)
453            })
454            .filter_map(|(extract_from, conditions, expected_event_count)| {
455                let mut outputs = Vec::new();
456                for from in extract_from {
457                    if no_outputs_from.contains(&from) {
458                        errors.push(format!(
459                            r#"Invalid extract_from target in test '{name}': '{from}' listed in no_outputs_from"#
460                        ));
461                    } else if let Some(output_id) = output_map.get(&from) {
462                        outputs.push(output_id.clone());
463                    } else {
464                        errors.push(format!(
465                            r#"Invalid extract_from target in test '{name}': '{from}' does not exist"#
466                        ));
467                    }
468                }
469                if outputs.is_empty() {
470                    None
471                } else {
472                    Some(TestOutput {
473                        extract_from: outputs.into(),
474                        conditions,
475                        expected_event_count,
476                    })
477                }
478            })
479            .collect();
480
481        let no_outputs_from = no_outputs_from
482            .into_iter()
483            .filter_map(|o| {
484                if let Some(output_id) = output_map.get(&o) {
485                    Some(output_id.clone())
486                } else {
487                    errors.push(format!(
488                        r#"Invalid no_outputs_from target in test '{name}': '{o}' does not exist"#
489                    ));
490                    None
491                }
492            })
493            .collect();
494
495        if errors.is_empty() {
496            Ok(TestDefinition {
497                name,
498                input,
499                inputs,
500                outputs,
501                no_outputs_from,
502            })
503        } else {
504            Err(errors)
505        }
506    }
507}
508
509impl TestDefinition<OutputId> {
510    fn stringify(self) -> TestDefinition<String> {
511        let TestDefinition {
512            name,
513            input,
514            inputs,
515            outputs,
516            no_outputs_from,
517        } = self;
518
519        let outputs = outputs
520            .into_iter()
521            .map(|old| TestOutput {
522                extract_from: old
523                    .extract_from
524                    .to_vec()
525                    .into_iter()
526                    .map(|item| item.to_string())
527                    .collect::<Vec<_>>()
528                    .into(),
529                conditions: old.conditions,
530                expected_event_count: old.expected_event_count,
531            })
532            .collect();
533
534        let no_outputs_from = no_outputs_from.iter().map(ToString::to_string).collect();
535
536        TestDefinition {
537            name,
538            input,
539            inputs,
540            outputs,
541            no_outputs_from,
542        }
543    }
544}
545
/// A unit test input.
///
/// An input describes not only the type of event to insert, but also which transform within the
/// configuration to insert it to.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct TestInput {
    /// The name of the transform to insert the input event to.
    pub insert_at: ComponentKey,

    /// The type of the input event.
    ///
    /// Can be either `raw`, `vrl`, `log`, or `metric`.
    // Serialized as `type`; when omitted, `default_test_input_type` supplies the value.
    #[serde(default = "default_test_input_type", rename = "type")]
    pub type_str: String,

    /// The raw string value to use as the input event.
    ///
    /// Use this only when the input event should be a raw event (i.e. unprocessed/undecoded log
    /// event) and when the input type is set to `raw`.
    pub value: Option<String>,

    /// The vrl expression to generate the input event.
    ///
    /// Only relevant when `type` is `vrl`.
    pub source: Option<String>,

    /// The set of log fields to use when creating a log input event.
    ///
    /// Only relevant when `type` is `log`.
    pub log_fields: Option<IndexMap<String, Value>>,

    /// The metric to use as an input event.
    ///
    /// Only relevant when `type` is `metric`.
    pub metric: Option<Metric>,
}
584
/// Default for `TestInput::type_str` when the `type` key is omitted: `"raw"`.
fn default_test_input_type() -> String {
    String::from("raw")
}
588
/// A unit test output.
///
/// An output describes what we expect a transform to emit when fed a certain event, or events, when
/// running a unit test.
// `T` is the type used to refer to the outputs being inspected; it defaults to `OutputId`.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct TestOutput<T: 'static = OutputId> {
    /// The transform outputs to extract events from.
    pub extract_from: OneOrMany<T>,

    /// The conditions to run against the output to validate that they were transformed as expected.
    pub conditions: Option<Vec<conditions::AnyCondition>>,

    /// The expected number of events to be produced by the transform.
    ///
    /// If specified, the test will fail if the number of events emitted by the
    /// transform does not match this value. This check is independent of
    /// `conditions` -- the count is verified first, then each condition is
    /// evaluated against the output events separately. This is useful for
    /// transforms that may emit multiple events.
    pub expected_event_count: Option<usize>,
}
612
613#[cfg(all(test, feature = "sources-file", feature = "sinks-console"))]
614mod tests {
615    use std::path::PathBuf;
616
617    use indoc::indoc;
618
619    use super::{ComponentKey, ConfigDiff, Format, builder::ConfigBuilder, format, load_from_str};
620    use crate::{config, topology::builder::TopologyPiecesBuilder};
621
622    async fn load(config: &str, format: config::Format) -> Result<Vec<String>, Vec<String>> {
623        match config::load_from_str(config, format) {
624            Ok(c) => {
625                let diff = ConfigDiff::initial(&c);
626                let c2 = config::load_from_str(config, format).unwrap();
627                match (
628                    config::warnings(&c2),
629                    TopologyPiecesBuilder::new(&c, &diff).build().await,
630                ) {
631                    (warnings, Ok(_pieces)) => Ok(warnings),
632                    (_, Err(errors)) => Err(errors),
633                }
634            }
635            Err(error) => Err(error),
636        }
637    }
638
    // Invalid `inputs` lists (empty, unknown names, duplicates) must each produce an error, in
    // the order asserted below.
    #[tokio::test]
    async fn bad_inputs() {
        let err = load(
            r#"
            [sources.in]
            type = "test_basic"

            [transforms.sample]
            type = "test_basic"
            inputs = []
            suffix = "foo"
            increase = 1.25

            [transforms.sample2]
            type = "test_basic"
            inputs = ["qwerty"]
            suffix = "foo"
            increase = 1.25

            [sinks.out]
            type = "test_basic"
            inputs = ["asdf", "in", "in"]
            "#,
            Format::Toml,
        )
        .await
        .unwrap_err();

        assert_eq!(
            vec![
                "Sink \"out\" has input \"in\" duplicated 2 times",
                "Transform \"sample\" has no inputs",
                "Input \"qwerty\" for transform \"sample2\" doesn't match any components.",
                "Input \"asdf\" for sink \"out\" doesn't match any components.",
            ],
            err,
        );
    }
677
    // A source and a transform sharing the name `foo` must be rejected with a single error.
    #[tokio::test]
    async fn duplicate_name() {
        let err = load(
            r#"
            [sources.foo]
            type = "test_basic"

            [sources.bar]
            type = "test_basic"

            [transforms.foo]
            type = "test_basic"
            inputs = ["bar"]
            suffix = "foo"
            increase = 1.25

            [sinks.out]
            type = "test_basic"
            inputs = ["foo"]
            "#,
            Format::Toml,
        )
        .await
        .unwrap_err();

        assert_eq!(
            err,
            vec!["More than one component with name \"foo\" (source, transform).",]
        );
    }
708
    // A `stdin` source and a `file_descriptor` source with `fd = 0` must be reported as claiming
    // the same file descriptor resource. Unix only.
    #[tokio::test]
    #[cfg(unix)]
    async fn conflicting_stdin_and_fd_resources() {
        let errors = load(
            r#"
            [sources.stdin]
            type = "stdin"

            [sources.file_descriptor]
            type = "file_descriptor"
            fd = 0

            [sinks.out]
            type = "test_basic"
            inputs = ["stdin", "file_descriptor"]
            "#,
            Format::Toml,
        )
        .await
        .unwrap_err();

        assert_eq!(errors.len(), 1);
        // Only the prefix is checked; the remainder of the message is not asserted here.
        let expected_prefix = "Resource `file descriptor: 0` is claimed by multiple components:";
        assert!(errors[0].starts_with(expected_prefix));
    }
734
    // Two `file_descriptor` sources configured with the same `fd` must be reported as a single
    // resource conflict. Unix only.
    #[tokio::test]
    #[cfg(unix)]
    async fn conflicting_fd_resources() {
        let errors = load(
            r#"
            [sources.file_descriptor1]
            type = "file_descriptor"
            fd = 10
            [sources.file_descriptor2]
            type = "file_descriptor"
            fd = 10
            [sinks.out]
            type = "test_basic"
            inputs = ["file_descriptor1", "file_descriptor2"]
            "#,
            Format::Toml,
        )
        .await
        .unwrap_err();

        assert_eq!(errors.len(), 1);
        // Only the prefix is checked; the remainder of the message is not asserted here.
        let expected_prefix = "Resource `file descriptor: 10` is claimed by multiple components:";
        assert!(errors[0].starts_with(expected_prefix));
    }
759
    // Two `file_descriptor` sources using the two descriptors returned by separate `null_fd()`
    // calls must load without errors or warnings. Unix only, and requires the
    // `sources-file_descriptor` feature.
    #[tokio::test]
    #[cfg(all(unix, feature = "sources-file_descriptor"))]
    async fn no_conflict_fd_resources() {
        use crate::sources::file_descriptors::file_descriptor::null_fd;
        let fd1 = null_fd().unwrap();
        let fd2 = null_fd().unwrap();
        let result = load(
            &format!(
                r#"
            [sources.file_descriptor1]
            type = "file_descriptor"
            fd = {fd1}

            [sources.file_descriptor2]
            type = "file_descriptor"
            fd = {fd2}

            [sinks.out]
            type = "test_basic"
            inputs = ["file_descriptor1", "file_descriptor2"]
            "#
            ),
            Format::Toml,
        )
        .await;

        let expected = Ok(vec![]);
        assert_eq!(result, expected);
    }
789
    // A transform and a source that nothing consumes must load successfully but produce one
    // "has no consumers" warning each.
    #[tokio::test]
    async fn warnings() {
        let warnings = load(
            r#"
            [sources.in1]
            type = "test_basic"

            [sources.in2]
            type = "test_basic"

            [transforms.sample1]
            type = "test_basic"
            inputs = ["in1"]
            suffix = "foo"
            increase = 1.25

            [transforms.sample2]
            type = "test_basic"
            inputs = ["in1"]
            suffix = "foo"
            increase = 1.25

            [sinks.out]
            type = "test_basic"
            inputs = ["sample1"]
            "#,
            Format::Toml,
        )
        .await
        .unwrap();

        assert_eq!(
            warnings,
            vec![
                "Transform \"sample2\" has no consumers",
                "Source \"in2\" has no consumers",
            ]
        )
    }
829
    // Transforms `two -> three -> four -> two` form a loop; loading must fail with a single
    // cyclic-dependency error naming that chain.
    #[tokio::test]
    async fn cycle() {
        let errors = load(
            r#"
            [sources.in]
            type = "test_basic"

            [transforms.one]
            type = "test_basic"
            inputs = ["in"]
            suffix = "foo"
            increase = 1.25

            [transforms.two]
            type = "test_basic"
            inputs = ["one", "four"]
            suffix = "foo"
            increase = 1.25

            [transforms.three]
            type = "test_basic"
            inputs = ["two"]
            suffix = "foo"
            increase = 1.25

            [transforms.four]
            type = "test_basic"
            inputs = ["three"]
            suffix = "foo"
            increase = 1.25

            [sinks.out]
            type = "test_basic"
            inputs = ["four"]
            "#,
            Format::Toml,
        )
        .await
        .unwrap_err();

        assert_eq!(
            errors,
            vec!["Cyclic dependency detected in the chain [ four -> two -> three -> four ]"]
        )
    }
875
    // When the config does not set `data_dir`, the loaded global options must report
    // `/var/lib/vector`.
    #[test]
    fn default_data_dir() {
        let config = load_from_str(
            indoc! {r#"
                [sources.in]
                type = "test_basic"

                [sinks.out]
                type = "test_basic"
                inputs = ["in"]
            "#},
            Format::Toml,
        )
        .unwrap();

        assert_eq!(
            Some(PathBuf::from("/var/lib/vector")),
            config.global.data_dir
        )
    }
896
    // Without a `[log_schema]` table, the host, message and timestamp keys must be `host`,
    // `message` and `timestamp`.
    #[test]
    fn default_schema() {
        let config = load_from_str(
            indoc! {r#"
            [sources.in]
            type = "test_basic"

            [sinks.out]
            type = "test_basic"
            inputs = ["in"]
            "#},
            Format::Toml,
        )
        .unwrap();

        assert_eq!(
            "host",
            config.global.log_schema.host_key().unwrap().to_string()
        );
        assert_eq!(
            "message",
            config.global.log_schema.message_key().unwrap().to_string()
        );
        assert_eq!(
            "timestamp",
            config
                .global
                .log_schema
                .timestamp_key()
                .unwrap()
                .to_string()
        );
    }
930
    // Keys set in a `[log_schema]` table must override the default host, message and timestamp
    // keys.
    #[test]
    fn custom_schema() {
        let config = load_from_str(
            indoc! {r#"
                [log_schema]
                  host_key = "this"
                  message_key = "that"
                  timestamp_key = "then"

                [sources.in]
                  type = "test_basic"

                [sinks.out]
                  type = "test_basic"
                  inputs = ["in"]
            "#},
            Format::Toml,
        )
        .unwrap();

        assert_eq!(
            "this",
            config.global.log_schema.host_key().unwrap().to_string()
        );
        assert_eq!(
            "that",
            config.global.log_schema.message_key().unwrap().to_string()
        );
        assert_eq!(
            "then",
            config
                .global
                .log_schema
                .timestamp_key()
                .unwrap()
                .to_string()
        );
    }
969
    // Appending a second builder with non-overlapping content (global options, a transform and a
    // unit test) must succeed and leave the merged builder containing everything from both.
    #[test]
    fn config_append() {
        let mut config: ConfigBuilder = format::deserialize(
            indoc! {r#"
                [sources.in]
                  type = "test_basic"

                [sinks.out]
                  type = "test_basic"
                  inputs = ["in"]
            "#},
            Format::Toml,
        )
        .unwrap();

        assert_eq!(
            config.append(
                format::deserialize(
                    indoc! {r#"
                        data_dir = "/foobar"

                        [proxy]
                          http = "http://proxy.inc:3128"

                        [transforms.foo]
                          type = "test_basic"
                          inputs = [ "in" ]
                          suffix = "foo"
                          increase = 1.25

                        [[tests]]
                          name = "check_simple_log"
                          [tests.input]
                            insert_at = "foo"
                            type = "raw"
                            value = "2019-11-28T12:00:00+00:00 info Sorry, I'm busy this week Cecil"
                          [[tests.outputs]]
                            extract_from = "foo"
                            [[tests.outputs.conditions]]
                              type = "vrl"
                              source = ".message == \"Sorry, I'm busy this week Cecil\""
                    "#},
                    Format::Toml,
                )
                .unwrap()
            ),
            Ok(())
        );

        // Values from the appended builder are present alongside the original components.
        assert!(config.global.proxy.http.is_some());
        assert!(config.global.proxy.https.is_none());
        assert_eq!(Some(PathBuf::from("/foobar")), config.global.data_dir);
        assert!(config.sources.contains_key(&ComponentKey::from("in")));
        assert!(config.sinks.contains_key(&ComponentKey::from("out")));
        assert!(config.transforms.contains_key(&ComponentKey::from("foo")));
        assert_eq!(config.tests.len(), 1);
    }
1027
1028    #[test]
1029    fn config_append_collisions() {
1030        let mut config: ConfigBuilder = format::deserialize(
1031            indoc! {r#"
1032                [sources.in]
1033                  type = "test_basic"
1034
1035                [sinks.out]
1036                  type = "test_basic"
1037                  inputs = ["in"]
1038            "#},
1039            Format::Toml,
1040        )
1041        .unwrap();
1042
1043        assert_eq!(
1044            config.append(
1045                format::deserialize(
1046                    indoc! {r#"
1047                        [sources.in]
1048                          type = "test_basic"
1049
1050                        [transforms.foo]
1051                          type = "test_basic"
1052                          inputs = [ "in" ]
1053                          suffix = "foo"
1054                          increase = 1.25
1055
1056                        [sinks.out]
1057                          type = "test_basic"
1058                          inputs = ["in"]
1059                    "#},
1060                    Format::Toml,
1061                )
1062                .unwrap()
1063            ),
1064            Err(vec![
1065                "duplicate source id found: in".into(),
1066                "duplicate sink id found: out".into(),
1067            ])
1068        );
1069    }
1070
1071    #[test]
1072    fn with_proxy() {
1073        let config: ConfigBuilder = format::deserialize(
1074            indoc! {r#"
1075                [proxy]
1076                  http = "http://server:3128"
1077                  https = "http://other:3128"
1078                  no_proxy = ["localhost", "127.0.0.1"]
1079
1080                [sources.in]
1081                  type = "nginx_metrics"
1082                  endpoints = ["http://localhost:8000/basic_status"]
1083                  proxy.http = "http://server:3128"
1084                  proxy.https = "http://other:3128"
1085                  proxy.no_proxy = ["localhost", "127.0.0.1"]
1086
1087                [sinks.out]
1088                  type = "console"
1089                  inputs = ["in"]
1090                  encoding.codec = "json"
1091            "#},
1092            Format::Toml,
1093        )
1094        .unwrap();
1095        assert_eq!(config.global.proxy.http, Some("http://server:3128".into()));
1096        assert_eq!(config.global.proxy.https, Some("http://other:3128".into()));
1097        assert!(config.global.proxy.no_proxy.matches("localhost"));
1098        let source = config.sources.get(&ComponentKey::from("in")).unwrap();
1099        assert_eq!(source.proxy.http, Some("http://server:3128".into()));
1100        assert_eq!(source.proxy.https, Some("http://other:3128".into()));
1101        assert!(source.proxy.no_proxy.matches("localhost"));
1102    }
1103
1104    #[test]
1105    fn with_partial_global_proxy() {
1106        let config: ConfigBuilder = format::deserialize(
1107            indoc! {r#"
1108                [proxy]
1109                  http = "http://server:3128"
1110
1111                [sources.in]
1112                  type = "nginx_metrics"
1113                  endpoints = ["http://localhost:8000/basic_status"]
1114
1115                [sources.in.proxy]
1116                  http = "http://server:3129"
1117                  https = "http://other:3129"
1118                  no_proxy = ["localhost", "127.0.0.1"]
1119
1120                [sinks.out]
1121                  type = "console"
1122                  inputs = ["in"]
1123                  encoding.codec = "json"
1124            "#},
1125            Format::Toml,
1126        )
1127        .unwrap();
1128        assert_eq!(config.global.proxy.http, Some("http://server:3128".into()));
1129        assert_eq!(config.global.proxy.https, None);
1130        let source = config.sources.get(&ComponentKey::from("in")).unwrap();
1131        assert_eq!(source.proxy.http, Some("http://server:3129".into()));
1132        assert_eq!(source.proxy.https, Some("http://other:3129".into()));
1133        assert!(source.proxy.no_proxy.matches("localhost"));
1134    }
1135
1136    #[test]
1137    fn with_partial_source_proxy() {
1138        let config: ConfigBuilder = format::deserialize(
1139            indoc! {r#"
1140                [proxy]
1141                  http = "http://server:3128"
1142                  https = "http://other:3128"
1143
1144                [sources.in]
1145                  type = "nginx_metrics"
1146                  endpoints = ["http://localhost:8000/basic_status"]
1147
1148                [sources.in.proxy]
1149                  http = "http://server:3129"
1150                  no_proxy = ["localhost", "127.0.0.1"]
1151
1152                [sinks.out]
1153                  type = "console"
1154                  inputs = ["in"]
1155                  encoding.codec = "json"
1156            "#},
1157            Format::Toml,
1158        )
1159        .unwrap();
1160        assert_eq!(config.global.proxy.http, Some("http://server:3128".into()));
1161        assert_eq!(config.global.proxy.https, Some("http://other:3128".into()));
1162        let source = config.sources.get(&ComponentKey::from("in")).unwrap();
1163        assert_eq!(source.proxy.http, Some("http://server:3129".into()));
1164        assert_eq!(source.proxy.https, None);
1165        assert!(source.proxy.no_proxy.matches("localhost"));
1166    }
1167}
1168
#[cfg(all(test, feature = "sources-file", feature = "sinks-file"))]
mod acknowledgements_tests {
    use indoc::indoc;

    use super::*;

    /// Building the configuration must flip `sink_acknowledgements` on every
    /// source that feeds a sink with acknowledgements enabled, and leave it
    /// off on the others.
    #[test]
    fn propagates_settings() {
        // The topology:
        // in1 => out1
        // in2 => out2 (acks enabled)
        // in3 => parse3 => out3 (acks enabled)
        let toml = indoc! {r#"
            data_dir = "/tmp"
            [sources.in1]
                type = "file"
                include = ["/var/log/**/*.log"]
            [sources.in2]
                type = "file"
                include = ["/var/log/**/*.log"]
            [sources.in3]
                type = "file"
                include = ["/var/log/**/*.log"]
            [transforms.parse3]
                type = "test_basic"
                inputs = ["in3"]
                increase = 0.0
                suffix = ""
            [sinks.out1]
                type = "file"
                inputs = ["in1"]
                encoding.codec = "text"
                path = "/path/to/out1"
            [sinks.out2]
                type = "file"
                inputs = ["in2"]
                encoding.codec = "text"
                path = "/path/to/out2"
                acknowledgements = true
            [sinks.out3]
                type = "file"
                inputs = ["parse3"]
                encoding.codec = "text"
                path = "/path/to/out3"
                acknowledgements.enabled = true
        "#};
        let builder: ConfigBuilder = format::deserialize(toml, Format::Toml).unwrap();

        // Nothing has been propagated yet on the freshly deserialized builder.
        assert!(
            builder
                .sources
                .values()
                .all(|source| !source.sink_acknowledgements),
            "Source `sink_acknowledgements` should be `false` before propagation"
        );

        let built = builder.build().unwrap();

        // Looks up a source by id and reports its propagated flag.
        let acks_enabled = |id: &str| {
            built
                .sources
                .get(&ComponentKey::from(id))
                .unwrap()
                .sink_acknowledgements
        };
        assert!(!acks_enabled("in1"));
        assert!(acks_enabled("in2"));
        assert!(acks_enabled("in3"));
    }
}
1235
#[cfg(test)]
mod resource_tests {
    use std::{
        collections::{HashMap, HashSet},
        net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
    };

    use proptest::prelude::*;

    use super::Resource;

    /// Builds a TCP [`Resource`] from an address and a port.
    fn tcp(addr: impl Into<IpAddr>, port: u16) -> Resource {
        Resource::tcp(SocketAddr::new(addr.into(), port))
    }

    /// Builds a UDP [`Resource`] from an address and a port.
    fn udp(addr: impl Into<IpAddr>, port: u16) -> Resource {
        Resource::udp(SocketAddr::new(addr.into(), port))
    }

    /// Strategy yielding either the IPv4 or the IPv6 unspecified address.
    fn unspecified() -> impl Strategy<Value = IpAddr> {
        prop_oneof![
            Just(Ipv4Addr::UNSPECIFIED.into()),
            Just(Ipv6Addr::UNSPECIFIED.into()),
        ]
    }

    /// Strategy yielding any address except the unspecified ones.
    fn specaddr() -> impl Strategy<Value = IpAddr> {
        any::<IpAddr>().prop_filter("Must be specific address", |addr| !addr.is_unspecified())
    }

    /// Strategy yielding any non-zero port.
    fn specport() -> impl Strategy<Value = u16> {
        any::<u16>().prop_filter("Must be specific port", |&port| port > 0)
    }

    /// Turns `(resource, component names)` pairs into the map shape that the
    /// tests compare against the result of `Resource::conflicts`.
    fn hashmap(conflicts: Vec<(Resource, Vec<&str>)>) -> HashMap<Resource, HashSet<&str>> {
        conflicts
            .into_iter()
            .map(|(key, values)| (key, values.into_iter().collect()))
            .collect()
    }

    // Inside `proptest!` the assertions use `prop_assert_eq!` rather than
    // `assert_eq!`: a failed property is then reported as a test-case failure
    // instead of a panic on every shrinking iteration.
    proptest! {
        // Distinct ports on the same address are not reported as conflicts.
        #[test]
        fn valid(addr: IpAddr, port1 in specport(), port2 in specport()) {
            prop_assume!(port1 != port2);
            let components = vec![
                ("sink_0", vec![tcp(addr, 0)]),
                ("sink_1", vec![tcp(addr, port1)]),
                ("sink_2", vec![tcp(addr, port2)]),
            ];
            let conflicting = Resource::conflicts(components);
            prop_assert_eq!(conflicting, HashMap::new());
        }

        // Two components on the same address and port are reported together.
        #[test]
        fn conflicting_pair(addr: IpAddr, port in specport()) {
            let components = vec![
                ("sink_0", vec![tcp(addr, 0)]),
                ("sink_1", vec![tcp(addr, port)]),
                ("sink_2", vec![tcp(addr, port)]),
            ];
            let conflicting = Resource::conflicts(components);
            prop_assert_eq!(
                conflicting,
                hashmap(vec![(tcp(addr, port), vec!["sink_1", "sink_2"])])
            );
        }

        // A component holding several resources can take part in several
        // conflicts at once; each contested resource gets its own entry.
        #[test]
        fn conflicting_multi(addr: IpAddr, port in specport()) {
            let components = vec![
                ("sink_0", vec![tcp(addr, 0)]),
                ("sink_1", vec![tcp(addr, port), tcp(addr, 0)]),
                ("sink_2", vec![tcp(addr, port)]),
            ];
            let conflicting = Resource::conflicts(components);
            prop_assert_eq!(
                conflicting,
                hashmap(vec![
                    (tcp(addr, 0), vec!["sink_0", "sink_1"]),
                    (tcp(addr, port), vec!["sink_1", "sink_2"])
                ])
            );
        }

        // The same port on two different addresses is not a conflict.
        #[test]
        fn different_network_interface(addr1: IpAddr, addr2: IpAddr, port: u16) {
            prop_assume!(addr1 != addr2);
            let components = vec![
                ("sink_0", vec![tcp(addr1, port)]),
                ("sink_1", vec![tcp(addr2, port)]),
            ];
            let conflicting = Resource::conflicts(components);
            prop_assert_eq!(conflicting, HashMap::new());
        }

        // A specific address and an unspecified address sharing a port are
        // expected not to be reported as conflicting.
        #[test]
        fn unspecified_network_interface(addr in specaddr(), unspec in unspecified(), port: u16) {
            let components = vec![
                ("sink_0", vec![tcp(addr, port)]),
                ("sink_1", vec![tcp(unspec, port)]),
            ];
            let conflicting = Resource::conflicts(components);
            prop_assert_eq!(conflicting, HashMap::new());
        }

        // TCP and UDP on the same address and port do not conflict.
        #[test]
        fn different_protocol(addr: IpAddr) {
            let components = vec![
                ("sink_0", vec![tcp(addr, 0)]),
                ("sink_1", vec![udp(addr, 0)]),
            ];
            let conflicting = Resource::conflicts(components);
            prop_assert_eq!(conflicting, HashMap::new());
        }
    }

    /// The IPv4 and IPv6 unspecified addresses on the same port are expected
    /// not to conflict with each other.
    #[test]
    fn different_unspecified_ip_version() {
        let components = vec![
            ("sink_0", vec![tcp(Ipv4Addr::UNSPECIFIED, 0)]),
            ("sink_1", vec![tcp(Ipv6Addr::UNSPECIFIED, 0)]),
        ];
        let conflicting = Resource::conflicts(components);
        assert_eq!(conflicting, HashMap::new());
    }
}
1363
#[cfg(all(test, feature = "sources-stdin", feature = "sinks-console"))]
mod resource_config_tests {
    use indoc::indoc;
    use vector_lib::configurable::schema::generate_root_schema;

    use super::{Format, load_from_str};

    /// Loading a configuration with two `stdin` sources must be rejected.
    #[test]
    fn config_conflict_detected() {
        let toml = indoc! {r#"
            [sources.in0]
              type = "stdin"

            [sources.in1]
              type = "stdin"

            [sinks.out]
              type = "console"
              inputs = ["in0","in1"]
              encoding.codec = "json"
        "#};

        let outcome = load_from_str(toml, Format::Toml);
        assert!(outcome.is_err());
    }

    /// Prints the JSON schema of a components-only configuration to stdout.
    /// Ignored by default; a generation error is reported on stderr rather
    /// than failing the test.
    #[test]
    #[ignore]
    #[allow(clippy::print_stdout, clippy::print_stderr)]
    fn generate_component_config_schema() {
        use indexmap::IndexMap;
        use vector_lib::{config::ComponentKey, configurable::configurable_component};

        use crate::config::{SinkOuter, SourceOuter, TransformOuter};

        /// Top-level Vector configuration.
        #[configurable_component]
        #[derive(Clone)]
        struct ComponentsOnlyConfig {
            /// Configured sources.
            #[serde(default)]
            pub sources: IndexMap<ComponentKey, SourceOuter>,

            /// Configured transforms.
            #[serde(default)]
            pub transforms: IndexMap<ComponentKey, TransformOuter<String>>,

            /// Configured sinks.
            #[serde(default)]
            pub sinks: IndexMap<ComponentKey, SinkOuter<String>>,
        }

        // Bail out early on failure so the happy path reads straight through.
        let schema = match generate_root_schema::<ComponentsOnlyConfig>() {
            Ok(schema) => schema,
            Err(e) => {
                eprintln!("error while generating schema: {e:?}");
                return;
            }
        };

        let json = serde_json::to_string_pretty(&schema)
            .expect("rendering root schema to JSON should not fail");
        println!("{json}");
    }
}