1use std::{convert::TryInto, fmt::Debug, sync::Arc};
2
3pub use array::{EventArray, EventContainer, LogArray, MetricArray, TraceArray, into_event_stream};
4pub use estimated_json_encoded_size_of::EstimatedJsonEncodedSizeOf;
5pub use finalization::{
6 BatchNotifier, BatchStatus, BatchStatusReceiver, EventFinalizer, EventFinalizers, EventStatus,
7 Finalizable,
8};
9pub use log_event::LogEvent;
10pub use metadata::{DatadogMetricOriginMetadata, EventMetadata, Secrets, WithMetadata};
11pub use metric::{Metric, MetricKind, MetricTags, MetricValue, StatisticKind};
12pub use r#ref::{EventMutRef, EventRef};
13use serde::{Deserialize, Serialize};
14pub use trace::TraceEvent;
15use vector_buffers::EventCount;
16use vector_common::{
17 EventDataEq, byte_size_of::ByteSizeOf, config::ComponentKey, finalization,
18 internal_event::TaggedEventsSent, json_size::JsonSize, request_metadata::GetEventCountTags,
19};
20pub use vrl::value::{KeyString, ObjectMap, Value};
21#[cfg(feature = "vrl")]
22pub use vrl_target::{TargetEvents, VrlTarget};
23
24use crate::config::{LogNamespace, OutputId};
25
26#[cfg(any(test, feature = "generate-fixtures"))]
27pub(crate) mod arbitrary_impl;
28pub mod array;
29pub mod discriminant;
30mod estimated_json_encoded_size_of;
31mod log_event;
32#[cfg(feature = "lua")]
33pub mod lua;
34pub mod merge_state;
35mod metadata;
36pub mod metric;
37pub mod proto;
38mod r#ref;
39mod ser;
40
41#[cfg(test)]
42mod test;
43mod trace;
44pub mod util;
45#[cfg(feature = "vrl")]
46mod vrl_target;
47
/// The literal string `"_partial"`.
///
/// NOTE(review): presumably the field name used to mark partial events; how it is
/// consumed is not visible in this module — confirm against callers.
pub const PARTIAL: &str = "_partial";
49
/// A single event, which is exactly one of a log, a metric, or a trace.
///
/// Serde (de)serialization uses `snake_case` names for the variants.
#[derive(PartialEq, Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
// The variant payloads are stored inline (not boxed), so the size-difference lint is
// silenced here.
#[allow(clippy::large_enum_variant)]
pub enum Event {
    /// A log event.
    Log(LogEvent),
    /// A metric.
    Metric(Metric),
    /// A trace event.
    Trace(TraceEvent),
}
58
59impl ByteSizeOf for Event {
60 fn allocated_bytes(&self) -> usize {
61 match self {
62 Event::Log(log_event) => log_event.allocated_bytes(),
63 Event::Metric(metric_event) => metric_event.allocated_bytes(),
64 Event::Trace(trace_event) => trace_event.allocated_bytes(),
65 }
66 }
67}
68
69impl EstimatedJsonEncodedSizeOf for Event {
70 fn estimated_json_encoded_size_of(&self) -> JsonSize {
71 match self {
72 Event::Log(log_event) => log_event.estimated_json_encoded_size_of(),
73 Event::Metric(metric_event) => metric_event.estimated_json_encoded_size_of(),
74 Event::Trace(trace_event) => trace_event.estimated_json_encoded_size_of(),
75 }
76 }
77}
78
79impl EventCount for Event {
80 fn event_count(&self) -> usize {
81 1
82 }
83}
84
85impl Finalizable for Event {
86 fn take_finalizers(&mut self) -> EventFinalizers {
87 match self {
88 Event::Log(log_event) => log_event.take_finalizers(),
89 Event::Metric(metric) => metric.take_finalizers(),
90 Event::Trace(trace_event) => trace_event.take_finalizers(),
91 }
92 }
93}
94
95impl GetEventCountTags for Event {
96 fn get_tags(&self) -> TaggedEventsSent {
97 match self {
98 Event::Log(log) => log.get_tags(),
99 Event::Metric(metric) => metric.get_tags(),
100 Event::Trace(trace) => trace.get_tags(),
101 }
102 }
103}
104
105impl Event {
106 pub fn as_log(&self) -> &LogEvent {
112 match self {
113 Event::Log(log) => log,
114 _ => panic!("Failed type coercion, {self:?} is not a log event"),
115 }
116 }
117
118 pub fn as_mut_log(&mut self) -> &mut LogEvent {
124 match self {
125 Event::Log(log) => log,
126 _ => panic!("Failed type coercion, {self:?} is not a log event"),
127 }
128 }
129
130 pub fn into_log(self) -> LogEvent {
136 match self {
137 Event::Log(log) => log,
138 _ => panic!("Failed type coercion, {self:?} is not a log event"),
139 }
140 }
141
142 pub fn try_into_log(self) -> Option<LogEvent> {
146 match self {
147 Event::Log(log) => Some(log),
148 _ => None,
149 }
150 }
151
152 pub fn maybe_as_log(&self) -> Option<&LogEvent> {
156 match self {
157 Event::Log(log) => Some(log),
158 _ => None,
159 }
160 }
161
162 pub fn as_metric(&self) -> &Metric {
168 match self {
169 Event::Metric(metric) => metric,
170 _ => panic!("Failed type coercion, {self:?} is not a metric"),
171 }
172 }
173
174 pub fn as_mut_metric(&mut self) -> &mut Metric {
180 match self {
181 Event::Metric(metric) => metric,
182 _ => panic!("Failed type coercion, {self:?} is not a metric"),
183 }
184 }
185
186 pub fn into_metric(self) -> Metric {
192 match self {
193 Event::Metric(metric) => metric,
194 _ => panic!("Failed type coercion, {self:?} is not a metric"),
195 }
196 }
197
198 pub fn try_into_metric(self) -> Option<Metric> {
202 match self {
203 Event::Metric(metric) => Some(metric),
204 _ => None,
205 }
206 }
207
208 pub fn as_trace(&self) -> &TraceEvent {
214 match self {
215 Event::Trace(trace) => trace,
216 _ => panic!("Failed type coercion, {self:?} is not a trace event"),
217 }
218 }
219
220 pub fn as_mut_trace(&mut self) -> &mut TraceEvent {
226 match self {
227 Event::Trace(trace) => trace,
228 _ => panic!("Failed type coercion, {self:?} is not a trace event"),
229 }
230 }
231
232 pub fn into_trace(self) -> TraceEvent {
238 match self {
239 Event::Trace(trace) => trace,
240 _ => panic!("Failed type coercion, {self:?} is not a trace event"),
241 }
242 }
243
244 pub fn try_into_trace(self) -> Option<TraceEvent> {
248 match self {
249 Event::Trace(trace) => Some(trace),
250 _ => None,
251 }
252 }
253
254 pub fn metadata(&self) -> &EventMetadata {
255 match self {
256 Self::Log(log) => log.metadata(),
257 Self::Metric(metric) => metric.metadata(),
258 Self::Trace(trace) => trace.metadata(),
259 }
260 }
261
262 pub fn metadata_mut(&mut self) -> &mut EventMetadata {
263 match self {
264 Self::Log(log) => log.metadata_mut(),
265 Self::Metric(metric) => metric.metadata_mut(),
266 Self::Trace(trace) => trace.metadata_mut(),
267 }
268 }
269
270 pub fn into_metadata(self) -> EventMetadata {
272 match self {
273 Self::Log(log) => log.into_parts().1,
274 Self::Metric(metric) => metric.into_parts().2,
275 Self::Trace(trace) => trace.into_parts().1,
276 }
277 }
278
279 #[must_use]
280 pub fn with_batch_notifier(self, batch: &BatchNotifier) -> Self {
281 match self {
282 Self::Log(log) => log.with_batch_notifier(batch).into(),
283 Self::Metric(metric) => metric.with_batch_notifier(batch).into(),
284 Self::Trace(trace) => trace.with_batch_notifier(batch).into(),
285 }
286 }
287
288 #[must_use]
289 pub fn with_batch_notifier_option(self, batch: &Option<BatchNotifier>) -> Self {
290 match self {
291 Self::Log(log) => log.with_batch_notifier_option(batch).into(),
292 Self::Metric(metric) => metric.with_batch_notifier_option(batch).into(),
293 Self::Trace(trace) => trace.with_batch_notifier_option(batch).into(),
294 }
295 }
296
297 #[must_use]
299 pub fn source_id(&self) -> Option<&Arc<ComponentKey>> {
300 self.metadata().source_id()
301 }
302
303 pub fn set_source_id(&mut self, source_id: Arc<ComponentKey>) {
305 self.metadata_mut().set_source_id(source_id);
306 }
307
308 pub fn set_upstream_id(&mut self, upstream_id: Arc<OutputId>) {
310 self.metadata_mut().set_upstream_id(upstream_id);
311 }
312
313 pub fn set_source_type(&mut self, source_type: &'static str) {
315 self.metadata_mut().set_source_type(source_type);
316 }
317
318 #[must_use]
320 pub fn with_source_id(mut self, source_id: Arc<ComponentKey>) -> Self {
321 self.metadata_mut().set_source_id(source_id);
322 self
323 }
324
325 #[must_use]
327 pub fn with_source_type(mut self, source_type: &'static str) -> Self {
328 self.metadata_mut().set_source_type(source_type);
329 self
330 }
331
332 #[must_use]
334 pub fn with_upstream_id(mut self, upstream_id: Arc<OutputId>) -> Self {
335 self.metadata_mut().set_upstream_id(upstream_id);
336 self
337 }
338
339 pub fn from_json_value(
344 value: serde_json::Value,
345 log_namespace: LogNamespace,
346 ) -> crate::Result<Self> {
347 match log_namespace {
348 LogNamespace::Vector => Ok(LogEvent::from(Value::from(value)).into()),
349 LogNamespace::Legacy => match value {
350 serde_json::Value::Object(fields) => Ok(LogEvent::from(
351 fields
352 .into_iter()
353 .map(|(k, v)| (k.into(), v.into()))
354 .collect::<ObjectMap>(),
355 )
356 .into()),
357 _ => Err(crate::Error::from(
358 "Attempted to convert non-Object JSON into an Event.",
359 )),
360 },
361 }
362 }
363}
364
365impl EventDataEq for Event {
366 fn event_data_eq(&self, other: &Self) -> bool {
367 match (self, other) {
368 (Self::Log(a), Self::Log(b)) => a.event_data_eq(b),
369 (Self::Metric(a), Self::Metric(b)) => a.event_data_eq(b),
370 (Self::Trace(a), Self::Trace(b)) => a.event_data_eq(b),
371 _ => false,
372 }
373 }
374}
375
376impl finalization::AddBatchNotifier for Event {
377 fn add_batch_notifier(&mut self, batch: BatchNotifier) {
378 let finalizer = EventFinalizer::new(batch);
379 match self {
380 Self::Log(log) => log.add_finalizer(finalizer),
381 Self::Metric(metric) => metric.add_finalizer(finalizer),
382 Self::Trace(trace) => trace.add_finalizer(finalizer),
383 }
384 }
385}
386
387impl TryInto<serde_json::Value> for Event {
388 type Error = serde_json::Error;
389
390 fn try_into(self) -> Result<serde_json::Value, Self::Error> {
391 match self {
392 Event::Log(fields) => serde_json::to_value(fields),
393 Event::Metric(metric) => serde_json::to_value(metric),
394 Event::Trace(fields) => serde_json::to_value(fields),
395 }
396 }
397}
398
399impl From<proto::StatisticKind> for StatisticKind {
400 fn from(kind: proto::StatisticKind) -> Self {
401 match kind {
402 proto::StatisticKind::Histogram => StatisticKind::Histogram,
403 proto::StatisticKind::Summary => StatisticKind::Summary,
404 }
405 }
406}
407
408impl From<metric::Sample> for proto::DistributionSample {
409 fn from(sample: metric::Sample) -> Self {
410 Self {
411 value: sample.value,
412 rate: sample.rate,
413 }
414 }
415}
416
417impl From<proto::DistributionSample> for metric::Sample {
418 fn from(sample: proto::DistributionSample) -> Self {
419 Self {
420 value: sample.value,
421 rate: sample.rate,
422 }
423 }
424}
425
426impl From<proto::HistogramBucket> for metric::Bucket {
427 fn from(bucket: proto::HistogramBucket) -> Self {
428 Self {
429 upper_limit: bucket.upper_limit,
430 count: u64::from(bucket.count),
431 }
432 }
433}
434
435impl From<metric::Bucket> for proto::HistogramBucket3 {
436 fn from(bucket: metric::Bucket) -> Self {
437 Self {
438 upper_limit: bucket.upper_limit,
439 count: bucket.count,
440 }
441 }
442}
443
444impl From<proto::HistogramBucket3> for metric::Bucket {
445 fn from(bucket: proto::HistogramBucket3) -> Self {
446 Self {
447 upper_limit: bucket.upper_limit,
448 count: bucket.count,
449 }
450 }
451}
452
453impl From<metric::Quantile> for proto::SummaryQuantile {
454 fn from(quantile: metric::Quantile) -> Self {
455 Self {
456 quantile: quantile.quantile,
457 value: quantile.value,
458 }
459 }
460}
461
462impl From<proto::SummaryQuantile> for metric::Quantile {
463 fn from(quantile: proto::SummaryQuantile) -> Self {
464 Self {
465 quantile: quantile.quantile,
466 value: quantile.value,
467 }
468 }
469}
470
471impl From<LogEvent> for Event {
472 fn from(log: LogEvent) -> Self {
473 Event::Log(log)
474 }
475}
476
477impl From<Metric> for Event {
478 fn from(metric: Metric) -> Self {
479 Event::Metric(metric)
480 }
481}
482
483impl From<TraceEvent> for Event {
484 fn from(trace: TraceEvent) -> Self {
485 Event::Trace(trace)
486 }
487}
488
/// Optional mutable access to a [`LogEvent`].
pub trait MaybeAsLogMut {
    /// Returns a mutable reference to a [`LogEvent`], or `None` when the implementor has
    /// no log event to expose.
    fn maybe_as_log_mut(&mut self) -> Option<&mut LogEvent>;
}
492
493impl MaybeAsLogMut for Event {
494 fn maybe_as_log_mut(&mut self) -> Option<&mut LogEvent> {
495 match self {
496 Event::Log(log) => Some(log),
497 _ => None,
498 }
499 }
500}