vector_core/event/
mod.rs

1use std::{convert::TryInto, fmt::Debug, sync::Arc};
2
3pub use array::{EventArray, EventContainer, LogArray, MetricArray, TraceArray, into_event_stream};
4pub use estimated_json_encoded_size_of::EstimatedJsonEncodedSizeOf;
5pub use finalization::{
6    BatchNotifier, BatchStatus, BatchStatusReceiver, EventFinalizer, EventFinalizers, EventStatus,
7    Finalizable,
8};
9pub use log_event::LogEvent;
10pub use metadata::{DatadogMetricOriginMetadata, EventMetadata, Secrets, WithMetadata};
11pub use metric::{Metric, MetricKind, MetricTags, MetricValue, StatisticKind};
12pub use r#ref::{EventMutRef, EventRef};
13use serde::{Deserialize, Serialize};
14pub use trace::TraceEvent;
15use vector_buffers::EventCount;
16use vector_common::{
17    EventDataEq, byte_size_of::ByteSizeOf, config::ComponentKey, finalization,
18    internal_event::TaggedEventsSent, json_size::JsonSize, request_metadata::GetEventCountTags,
19};
20pub use vrl::value::{KeyString, ObjectMap, Value};
21#[cfg(feature = "vrl")]
22pub use vrl_target::{TargetEvents, VrlTarget};
23
24use crate::config::{LogNamespace, OutputId};
25
26#[cfg(any(test, feature = "generate-fixtures"))]
27pub(crate) mod arbitrary_impl;
28pub mod array;
29pub mod discriminant;
30mod estimated_json_encoded_size_of;
31mod log_event;
32#[cfg(feature = "lua")]
33pub mod lua;
34pub mod merge_state;
35mod metadata;
36pub mod metric;
37pub mod proto;
38mod r#ref;
39mod ser;
40
41#[cfg(test)]
42mod test;
43mod trace;
44pub mod util;
45#[cfg(feature = "vrl")]
46mod vrl_target;
47
48pub const PARTIAL: &str = "_partial";
49
50#[derive(PartialEq, Debug, Clone, Serialize, Deserialize)]
51#[serde(rename_all = "snake_case")]
52#[allow(clippy::large_enum_variant)]
53pub enum Event {
54    Log(LogEvent),
55    Metric(Metric),
56    Trace(TraceEvent),
57}
58
59impl ByteSizeOf for Event {
60    fn allocated_bytes(&self) -> usize {
61        match self {
62            Event::Log(log_event) => log_event.allocated_bytes(),
63            Event::Metric(metric_event) => metric_event.allocated_bytes(),
64            Event::Trace(trace_event) => trace_event.allocated_bytes(),
65        }
66    }
67}
68
69impl EstimatedJsonEncodedSizeOf for Event {
70    fn estimated_json_encoded_size_of(&self) -> JsonSize {
71        match self {
72            Event::Log(log_event) => log_event.estimated_json_encoded_size_of(),
73            Event::Metric(metric_event) => metric_event.estimated_json_encoded_size_of(),
74            Event::Trace(trace_event) => trace_event.estimated_json_encoded_size_of(),
75        }
76    }
77}
78
79impl EventCount for Event {
80    fn event_count(&self) -> usize {
81        1
82    }
83}
84
85impl Finalizable for Event {
86    fn take_finalizers(&mut self) -> EventFinalizers {
87        match self {
88            Event::Log(log_event) => log_event.take_finalizers(),
89            Event::Metric(metric) => metric.take_finalizers(),
90            Event::Trace(trace_event) => trace_event.take_finalizers(),
91        }
92    }
93}
94
95impl GetEventCountTags for Event {
96    fn get_tags(&self) -> TaggedEventsSent {
97        match self {
98            Event::Log(log) => log.get_tags(),
99            Event::Metric(metric) => metric.get_tags(),
100            Event::Trace(trace) => trace.get_tags(),
101        }
102    }
103}
104
105impl Event {
106    /// Return self as a `LogEvent`
107    ///
108    /// # Panics
109    ///
110    /// This function panics if self is anything other than an `Event::Log`.
111    pub fn as_log(&self) -> &LogEvent {
112        match self {
113            Event::Log(log) => log,
114            _ => panic!("Failed type coercion, {self:?} is not a log event"),
115        }
116    }
117
118    /// Return self as a mutable `LogEvent`
119    ///
120    /// # Panics
121    ///
122    /// This function panics if self is anything other than an `Event::Log`.
123    pub fn as_mut_log(&mut self) -> &mut LogEvent {
124        match self {
125            Event::Log(log) => log,
126            _ => panic!("Failed type coercion, {self:?} is not a log event"),
127        }
128    }
129
130    /// Coerces self into a `LogEvent`
131    ///
132    /// # Panics
133    ///
134    /// This function panics if self is anything other than an `Event::Log`.
135    pub fn into_log(self) -> LogEvent {
136        match self {
137            Event::Log(log) => log,
138            _ => panic!("Failed type coercion, {self:?} is not a log event"),
139        }
140    }
141
142    /// Fallibly coerces self into a `LogEvent`
143    ///
144    /// If the event is a `LogEvent`, then `Some(log_event)` is returned, otherwise `None`.
145    pub fn try_into_log(self) -> Option<LogEvent> {
146        match self {
147            Event::Log(log) => Some(log),
148            _ => None,
149        }
150    }
151
152    /// Return self as a `LogEvent` if possible
153    ///
154    /// If the event is a `LogEvent`, then `Some(&log_event)` is returned, otherwise `None`.
155    pub fn maybe_as_log(&self) -> Option<&LogEvent> {
156        match self {
157            Event::Log(log) => Some(log),
158            _ => None,
159        }
160    }
161
162    /// Return self as a `Metric`
163    ///
164    /// # Panics
165    ///
166    /// This function panics if self is anything other than an `Event::Metric`.
167    pub fn as_metric(&self) -> &Metric {
168        match self {
169            Event::Metric(metric) => metric,
170            _ => panic!("Failed type coercion, {self:?} is not a metric"),
171        }
172    }
173
174    /// Return self as a mutable `Metric`
175    ///
176    /// # Panics
177    ///
178    /// This function panics if self is anything other than an `Event::Metric`.
179    pub fn as_mut_metric(&mut self) -> &mut Metric {
180        match self {
181            Event::Metric(metric) => metric,
182            _ => panic!("Failed type coercion, {self:?} is not a metric"),
183        }
184    }
185
186    /// Coerces self into `Metric`
187    ///
188    /// # Panics
189    ///
190    /// This function panics if self is anything other than an `Event::Metric`.
191    pub fn into_metric(self) -> Metric {
192        match self {
193            Event::Metric(metric) => metric,
194            _ => panic!("Failed type coercion, {self:?} is not a metric"),
195        }
196    }
197
198    /// Fallibly coerces self into a `Metric`
199    ///
200    /// If the event is a `Metric`, then `Some(metric)` is returned, otherwise `None`.
201    pub fn try_into_metric(self) -> Option<Metric> {
202        match self {
203            Event::Metric(metric) => Some(metric),
204            _ => None,
205        }
206    }
207
208    /// Return self as a `TraceEvent`
209    ///
210    /// # Panics
211    ///
212    /// This function panics if self is anything other than an `Event::Trace`.
213    pub fn as_trace(&self) -> &TraceEvent {
214        match self {
215            Event::Trace(trace) => trace,
216            _ => panic!("Failed type coercion, {self:?} is not a trace event"),
217        }
218    }
219
220    /// Return self as a mutable `TraceEvent`
221    ///
222    /// # Panics
223    ///
224    /// This function panics if self is anything other than an `Event::Trace`.
225    pub fn as_mut_trace(&mut self) -> &mut TraceEvent {
226        match self {
227            Event::Trace(trace) => trace,
228            _ => panic!("Failed type coercion, {self:?} is not a trace event"),
229        }
230    }
231
232    /// Coerces self into a `TraceEvent`
233    ///
234    /// # Panics
235    ///
236    /// This function panics if self is anything other than an `Event::Trace`.
237    pub fn into_trace(self) -> TraceEvent {
238        match self {
239            Event::Trace(trace) => trace,
240            _ => panic!("Failed type coercion, {self:?} is not a trace event"),
241        }
242    }
243
244    /// Fallibly coerces self into a `TraceEvent`
245    ///
246    /// If the event is a `TraceEvent`, then `Some(trace)` is returned, otherwise `None`.
247    pub fn try_into_trace(self) -> Option<TraceEvent> {
248        match self {
249            Event::Trace(trace) => Some(trace),
250            _ => None,
251        }
252    }
253
254    pub fn metadata(&self) -> &EventMetadata {
255        match self {
256            Self::Log(log) => log.metadata(),
257            Self::Metric(metric) => metric.metadata(),
258            Self::Trace(trace) => trace.metadata(),
259        }
260    }
261
262    pub fn metadata_mut(&mut self) -> &mut EventMetadata {
263        match self {
264            Self::Log(log) => log.metadata_mut(),
265            Self::Metric(metric) => metric.metadata_mut(),
266            Self::Trace(trace) => trace.metadata_mut(),
267        }
268    }
269
270    /// Destroy the event and return the metadata.
271    pub fn into_metadata(self) -> EventMetadata {
272        match self {
273            Self::Log(log) => log.into_parts().1,
274            Self::Metric(metric) => metric.into_parts().2,
275            Self::Trace(trace) => trace.into_parts().1,
276        }
277    }
278
279    #[must_use]
280    pub fn with_batch_notifier(self, batch: &BatchNotifier) -> Self {
281        match self {
282            Self::Log(log) => log.with_batch_notifier(batch).into(),
283            Self::Metric(metric) => metric.with_batch_notifier(batch).into(),
284            Self::Trace(trace) => trace.with_batch_notifier(batch).into(),
285        }
286    }
287
288    #[must_use]
289    pub fn with_batch_notifier_option(self, batch: &Option<BatchNotifier>) -> Self {
290        match self {
291            Self::Log(log) => log.with_batch_notifier_option(batch).into(),
292            Self::Metric(metric) => metric.with_batch_notifier_option(batch).into(),
293            Self::Trace(trace) => trace.with_batch_notifier_option(batch).into(),
294        }
295    }
296
297    /// Returns a reference to the event metadata source.
298    #[must_use]
299    pub fn source_id(&self) -> Option<&Arc<ComponentKey>> {
300        self.metadata().source_id()
301    }
302
303    /// Sets the `source_id` in the event metadata to the provided value.
304    pub fn set_source_id(&mut self, source_id: Arc<ComponentKey>) {
305        self.metadata_mut().set_source_id(source_id);
306    }
307
308    /// Sets the `upstream_id` in the event metadata to the provided value.
309    pub fn set_upstream_id(&mut self, upstream_id: Arc<OutputId>) {
310        self.metadata_mut().set_upstream_id(upstream_id);
311    }
312
313    /// Sets the `source_type` in the event metadata to the provided value.
314    pub fn set_source_type(&mut self, source_type: &'static str) {
315        self.metadata_mut().set_source_type(source_type);
316    }
317
318    /// Sets the `source_id` in the event metadata to the provided value.
319    #[must_use]
320    pub fn with_source_id(mut self, source_id: Arc<ComponentKey>) -> Self {
321        self.metadata_mut().set_source_id(source_id);
322        self
323    }
324
325    /// Sets the `source_type` in the event metadata to the provided value.
326    #[must_use]
327    pub fn with_source_type(mut self, source_type: &'static str) -> Self {
328        self.metadata_mut().set_source_type(source_type);
329        self
330    }
331
332    /// Sets the `upstream_id` in the event metadata to the provided value.
333    #[must_use]
334    pub fn with_upstream_id(mut self, upstream_id: Arc<OutputId>) -> Self {
335        self.metadata_mut().set_upstream_id(upstream_id);
336        self
337    }
338
339    /// Creates an Event from a JSON value.
340    ///
341    /// # Errors
342    /// If a non-object JSON value is passed in with the `Legacy` namespace, this will return an error.
343    pub fn from_json_value(
344        value: serde_json::Value,
345        log_namespace: LogNamespace,
346    ) -> crate::Result<Self> {
347        match log_namespace {
348            LogNamespace::Vector => Ok(LogEvent::from(Value::from(value)).into()),
349            LogNamespace::Legacy => match value {
350                serde_json::Value::Object(fields) => Ok(LogEvent::from(
351                    fields
352                        .into_iter()
353                        .map(|(k, v)| (k.into(), v.into()))
354                        .collect::<ObjectMap>(),
355                )
356                .into()),
357                _ => Err(crate::Error::from(
358                    "Attempted to convert non-Object JSON into an Event.",
359                )),
360            },
361        }
362    }
363}
364
365impl EventDataEq for Event {
366    fn event_data_eq(&self, other: &Self) -> bool {
367        match (self, other) {
368            (Self::Log(a), Self::Log(b)) => a.event_data_eq(b),
369            (Self::Metric(a), Self::Metric(b)) => a.event_data_eq(b),
370            (Self::Trace(a), Self::Trace(b)) => a.event_data_eq(b),
371            _ => false,
372        }
373    }
374}
375
376impl finalization::AddBatchNotifier for Event {
377    fn add_batch_notifier(&mut self, batch: BatchNotifier) {
378        let finalizer = EventFinalizer::new(batch);
379        match self {
380            Self::Log(log) => log.add_finalizer(finalizer),
381            Self::Metric(metric) => metric.add_finalizer(finalizer),
382            Self::Trace(trace) => trace.add_finalizer(finalizer),
383        }
384    }
385}
386
387impl TryInto<serde_json::Value> for Event {
388    type Error = serde_json::Error;
389
390    fn try_into(self) -> Result<serde_json::Value, Self::Error> {
391        match self {
392            Event::Log(fields) => serde_json::to_value(fields),
393            Event::Metric(metric) => serde_json::to_value(metric),
394            Event::Trace(fields) => serde_json::to_value(fields),
395        }
396    }
397}
398
399impl From<proto::StatisticKind> for StatisticKind {
400    fn from(kind: proto::StatisticKind) -> Self {
401        match kind {
402            proto::StatisticKind::Histogram => StatisticKind::Histogram,
403            proto::StatisticKind::Summary => StatisticKind::Summary,
404        }
405    }
406}
407
408impl From<metric::Sample> for proto::DistributionSample {
409    fn from(sample: metric::Sample) -> Self {
410        Self {
411            value: sample.value,
412            rate: sample.rate,
413        }
414    }
415}
416
417impl From<proto::DistributionSample> for metric::Sample {
418    fn from(sample: proto::DistributionSample) -> Self {
419        Self {
420            value: sample.value,
421            rate: sample.rate,
422        }
423    }
424}
425
426impl From<proto::HistogramBucket> for metric::Bucket {
427    fn from(bucket: proto::HistogramBucket) -> Self {
428        Self {
429            upper_limit: bucket.upper_limit,
430            count: u64::from(bucket.count),
431        }
432    }
433}
434
435impl From<metric::Bucket> for proto::HistogramBucket3 {
436    fn from(bucket: metric::Bucket) -> Self {
437        Self {
438            upper_limit: bucket.upper_limit,
439            count: bucket.count,
440        }
441    }
442}
443
444impl From<proto::HistogramBucket3> for metric::Bucket {
445    fn from(bucket: proto::HistogramBucket3) -> Self {
446        Self {
447            upper_limit: bucket.upper_limit,
448            count: bucket.count,
449        }
450    }
451}
452
453impl From<metric::Quantile> for proto::SummaryQuantile {
454    fn from(quantile: metric::Quantile) -> Self {
455        Self {
456            quantile: quantile.quantile,
457            value: quantile.value,
458        }
459    }
460}
461
462impl From<proto::SummaryQuantile> for metric::Quantile {
463    fn from(quantile: proto::SummaryQuantile) -> Self {
464        Self {
465            quantile: quantile.quantile,
466            value: quantile.value,
467        }
468    }
469}
470
471impl From<LogEvent> for Event {
472    fn from(log: LogEvent) -> Self {
473        Event::Log(log)
474    }
475}
476
477impl From<Metric> for Event {
478    fn from(metric: Metric) -> Self {
479        Event::Metric(metric)
480    }
481}
482
483impl From<TraceEvent> for Event {
484    fn from(trace: TraceEvent) -> Self {
485        Event::Trace(trace)
486    }
487}
488
489pub trait MaybeAsLogMut {
490    fn maybe_as_log_mut(&mut self) -> Option<&mut LogEvent>;
491}
492
493impl MaybeAsLogMut for Event {
494    fn maybe_as_log_mut(&mut self) -> Option<&mut LogEvent> {
495        match self {
496            Event::Log(log) => Some(log),
497            _ => None,
498        }
499    }
500}