vector/config/unit_test/
unit_test_components.rs

1use std::sync::Arc;
2
3use futures::{Sink, Stream, stream};
4use futures_util::{FutureExt, StreamExt, future, stream::BoxStream};
5use tokio::sync::{Mutex, oneshot};
6use vector_lib::{
7    config::{DataType, Input, LogNamespace},
8    configurable::configurable_component,
9    event::Event,
10    schema,
11    sink::{StreamSink, VectorSink},
12};
13
14use crate::{
15    conditions::Condition,
16    config::{
17        AcknowledgementsConfig, SinkConfig, SinkContext, SourceConfig, SourceContext, SourceOutput,
18    },
19    sinks::Healthcheck,
20    sources,
21};
22
23/// Configuration for the `unit_test` source.
24#[configurable_component(source("unit_test", "Unit test."))]
25#[derive(Clone, Debug, Default)]
26pub struct UnitTestSourceConfig {
27    /// List of events sent from this source as part of the test.
28    #[serde(skip)]
29    pub events: Vec<Event>,
30}
31
32impl_generate_config_from_default!(UnitTestSourceConfig);
33
34#[async_trait::async_trait]
35#[typetag::serde(name = "unit_test")]
36impl SourceConfig for UnitTestSourceConfig {
37    async fn build(&self, cx: SourceContext) -> crate::Result<sources::Source> {
38        let events = self.events.clone().into_iter();
39
40        Ok(Box::pin(async move {
41            let mut out = cx.out;
42            let _shutdown = cx.shutdown;
43            out.send_batch(events).await.map_err(|_| ())?;
44            Ok(())
45        }))
46    }
47
48    fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
49        vec![SourceOutput::new_maybe_logs(
50            DataType::all_bits(),
51            schema::Definition::default_legacy_namespace(),
52        )]
53    }
54
55    fn can_acknowledge(&self) -> bool {
56        false
57    }
58}
59
60/// Configuration for the `unit_test_stream` source.
61#[configurable_component(source("unit_test_stream", "Unit test stream."))]
62#[derive(Clone)]
63pub struct UnitTestStreamSourceConfig {
64    #[serde(skip)]
65    stream: Arc<Mutex<Option<stream::BoxStream<'static, Event>>>>,
66}
67
68impl_generate_config_from_default!(UnitTestStreamSourceConfig);
69
70impl UnitTestStreamSourceConfig {
71    pub fn new(stream: impl Stream<Item = Event> + Send + 'static) -> Self {
72        Self {
73            stream: Arc::new(Mutex::new(Some(stream.boxed()))),
74        }
75    }
76}
77
78impl Default for UnitTestStreamSourceConfig {
79    fn default() -> Self {
80        Self::new(stream::empty().boxed())
81    }
82}
83
84impl std::fmt::Debug for UnitTestStreamSourceConfig {
85    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86        formatter
87            .debug_struct("UnitTestStreamSourceConfig")
88            .finish()
89    }
90}
91
92#[async_trait::async_trait]
93#[typetag::serde(name = "unit_test_stream")]
94impl SourceConfig for UnitTestStreamSourceConfig {
95    async fn build(&self, cx: SourceContext) -> crate::Result<sources::Source> {
96        let stream = self.stream.lock().await.take().unwrap();
97        Ok(Box::pin(async move {
98            let mut out = cx.out;
99            let _shutdown = cx.shutdown;
100            out.send_event_stream(stream).await.map_err(|_| ())?;
101            Ok(())
102        }))
103    }
104
105    fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
106        vec![SourceOutput::new_maybe_logs(
107            DataType::all_bits(),
108            schema::Definition::default_legacy_namespace(),
109        )]
110    }
111
112    fn can_acknowledge(&self) -> bool {
113        false
114    }
115}
116
117#[derive(Clone, Default)]
118pub enum UnitTestSinkCheck {
119    /// Check all events that are received against the list of conditions.
120    Checks {
121        conditions: Vec<Vec<Condition>>,
122        expected_event_count: Option<usize>,
123    },
124
125    /// Check that no events were received.
126    NoOutputs,
127
128    /// Do nothing.
129    #[default]
130    NoOp,
131}
132
/// Outcome reported by a `UnitTestSink` after it has consumed its whole input.
#[derive(Debug)]
pub struct UnitTestSinkResult {
    /// Name of the test the reporting sink belongs to.
    pub test_name: String,
    /// Failure messages collected by the sink; left empty when nothing failed.
    pub test_errors: Vec<String>,
}
138
139/// Configuration for the `unit_test` sink.
140#[configurable_component(sink("unit_test", "Unit test."))]
141#[derive(Clone, Default, Derivative)]
142#[derivative(Debug)]
143pub struct UnitTestSinkConfig {
144    /// Name of the test that this sink is being used for.
145    pub test_name: String,
146
147    /// List of names of the transform/branch associated with this sink.
148    pub transform_ids: Vec<String>,
149
150    /// Sender side of the test result channel.
151    #[serde(skip)]
152    pub result_tx: Arc<Mutex<Option<oneshot::Sender<UnitTestSinkResult>>>>,
153
154    /// Predicate applied to each event that reaches the sink.
155    #[serde(skip)]
156    #[derivative(Debug = "ignore")]
157    pub check: UnitTestSinkCheck,
158}
159
160impl_generate_config_from_default!(UnitTestSinkConfig);
161
162#[async_trait::async_trait]
163#[typetag::serde(name = "unit_test")]
164impl SinkConfig for UnitTestSinkConfig {
165    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
166        let tx = self.result_tx.lock().await.take();
167        let sink = UnitTestSink {
168            test_name: self.test_name.clone(),
169            transform_ids: self.transform_ids.clone(),
170            result_tx: tx,
171            check: self.check.clone(),
172        };
173        let healthcheck = future::ok(()).boxed();
174
175        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
176    }
177
178    fn input(&self) -> Input {
179        Input::all()
180    }
181
182    fn acknowledgements(&self) -> &AcknowledgementsConfig {
183        &AcknowledgementsConfig::DEFAULT
184    }
185}
186
187pub struct UnitTestSink {
188    pub test_name: String,
189    pub transform_ids: Vec<String>,
190    // None for NoOp test sinks
191    pub result_tx: Option<oneshot::Sender<UnitTestSinkResult>>,
192    pub check: UnitTestSinkCheck,
193}
194
195#[async_trait::async_trait]
196impl StreamSink<Event> for UnitTestSink {
197    async fn run(mut self: Box<Self>, mut input: BoxStream<'_, Event>) -> Result<(), ()> {
198        let mut output_events = Vec::new();
199        let mut result = UnitTestSinkResult {
200            test_name: self.test_name,
201            test_errors: Vec::new(),
202        };
203
204        while let Some(event) = input.next().await {
205            output_events.push(event);
206        }
207
208        match self.check {
209            UnitTestSinkCheck::Checks {
210                conditions: checks,
211                expected_event_count,
212            } => {
213                if let Some(expected) = expected_event_count {
214                    let actual = output_events.len();
215                    if actual != expected {
216                        result.test_errors.push(format!(
217                            "expected {} events from transforms {:?}, but received {}",
218                            expected, self.transform_ids, actual
219                        ));
220                    }
221                }
222
223                if output_events.is_empty() && expected_event_count != Some(0) {
224                    result
225                        .test_errors
226                        .push(format!("checks for transforms {:?} failed: no events received. Topology may be disconnected or transform is missing inputs.", self.transform_ids));
227                } else {
228                    for (i, check) in checks.iter().enumerate() {
229                        let mut check_errors = Vec::new();
230                        for (j, condition) in check.iter().enumerate() {
231                            let mut condition_errors = Vec::new();
232                            for event in output_events.iter() {
233                                match condition.check_with_context(event.clone()).0 {
234                                    Ok(_) => {
235                                        condition_errors.clear();
236                                        break;
237                                    }
238                                    Err(error) => {
239                                        condition_errors.push(format!("  condition[{j}]: {error}"));
240                                    }
241                                }
242                            }
243                            check_errors.extend(condition_errors);
244                        }
245                        // If there are errors, add a preamble to the output
246                        if !check_errors.is_empty() {
247                            check_errors.insert(
248                                0,
249                                format!(
250                                    "check[{}] for transforms {:?} failed conditions:",
251                                    i, self.transform_ids
252                                ),
253                            );
254                        }
255
256                        result.test_errors.extend(check_errors);
257                    }
258
259                    // If there are errors, add a summary of events received
260                    if !result.test_errors.is_empty() {
261                        result.test_errors.push(format!(
262                            "output payloads from {:?} (events encoded as JSON):\n  {}",
263                            self.transform_ids,
264                            events_to_string(&output_events)
265                        ));
266                    }
267                }
268            }
269            UnitTestSinkCheck::NoOutputs => {
270                if !output_events.is_empty() {
271                    result.test_errors.push(format!(
272                        "check for transforms {:?} failed: expected no outputs",
273                        self.transform_ids
274                    ));
275                }
276            }
277            UnitTestSinkCheck::NoOp => {}
278        }
279
280        if let Some(tx) = self.result_tx
281            && tx.send(result).is_err()
282        {
283            error!(message = "Sending unit test results failed in unit test sink.");
284        }
285        Ok(())
286    }
287}
288
289/// Configuration for the `unit_test_stream` sink.
290#[configurable_component(sink("unit_test_stream", "Unit test stream."))]
291#[derive(Clone, Default)]
292pub struct UnitTestStreamSinkConfig {
293    /// Sink that receives the processed events.
294    #[serde(skip)]
295    sink: Arc<Mutex<Option<Box<dyn Sink<Event, Error = ()> + Send + Unpin>>>>,
296}
297
298impl_generate_config_from_default!(UnitTestStreamSinkConfig);
299
300impl UnitTestStreamSinkConfig {
301    pub fn new(sink: impl Sink<Event, Error = ()> + Send + Unpin + 'static) -> Self {
302        Self {
303            sink: Arc::new(Mutex::new(Some(Box::new(sink)))),
304        }
305    }
306}
307
308impl std::fmt::Debug for UnitTestStreamSinkConfig {
309    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
310        formatter.debug_struct("UnitTestStreamSinkConfig").finish()
311    }
312}
313
314#[async_trait::async_trait]
315#[typetag::serde(name = "unit_test_stream")]
316impl SinkConfig for UnitTestStreamSinkConfig {
317    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
318        let sink = self.sink.lock().await.take().unwrap();
319        let healthcheck = future::ok(()).boxed();
320
321        #[allow(deprecated)]
322        Ok((VectorSink::from_event_sink(sink), healthcheck))
323    }
324
325    fn input(&self) -> Input {
326        Input::all()
327    }
328
329    fn acknowledgements(&self) -> &AcknowledgementsConfig {
330        &AcknowledgementsConfig::DEFAULT
331    }
332}
333
334fn events_to_string(events: &[Event]) -> String {
335    events
336        .iter()
337        .map(|event| match event {
338            Event::Log(log) => serde_json::to_string(log).unwrap_or_else(|_| "{}".to_string()),
339            Event::Metric(metric) => {
340                serde_json::to_string(metric).unwrap_or_else(|_| "{}".to_string())
341            }
342            Event::Trace(trace) => {
343                serde_json::to_string(trace).unwrap_or_else(|_| "{}".to_string())
344            }
345        })
346        .collect::<Vec<_>>()
347        .join("\n  ")
348}