codecs/encoding/format/
otlp.rs1use crate::encoding::ProtobufSerializer;
2use bytes::BytesMut;
3use opentelemetry_proto::proto::{
4 DESCRIPTOR_BYTES, LOGS_REQUEST_MESSAGE_TYPE, METRICS_REQUEST_MESSAGE_TYPE,
5 RESOURCE_LOGS_JSON_FIELD, RESOURCE_METRICS_JSON_FIELD, RESOURCE_SPANS_JSON_FIELD,
6 TRACES_REQUEST_MESSAGE_TYPE,
7};
8use tokio_util::codec::Encoder;
9use vector_config_macros::configurable_component;
10use vector_core::{config::DataType, event::Event, schema};
11use vrl::protobuf::encode::Options;
12
13#[configurable_component]
15#[derive(Debug, Clone, Default)]
16pub struct OtlpSerializerConfig {
17 }
19
20impl OtlpSerializerConfig {
21 pub fn build(&self) -> Result<OtlpSerializer, crate::encoding::BuildError> {
23 OtlpSerializer::new()
24 }
25
26 pub fn input_type(&self) -> DataType {
28 DataType::Log | DataType::Trace
29 }
30
31 pub fn schema_requirement(&self) -> schema::Requirement {
33 schema::Requirement::empty()
34 }
35}
36
37#[derive(Debug, Clone)]
54#[allow(dead_code)] pub struct OtlpSerializer {
56 logs_descriptor: ProtobufSerializer,
57 metrics_descriptor: ProtobufSerializer,
58 traces_descriptor: ProtobufSerializer,
59 options: Options,
60}
61
62impl OtlpSerializer {
63 pub fn new() -> vector_common::Result<Self> {
65 let options = Options {
66 use_json_names: true,
67 allow_lossy_string_coercion: true,
68 };
69
70 let logs_descriptor = ProtobufSerializer::new_from_bytes(
71 DESCRIPTOR_BYTES,
72 LOGS_REQUEST_MESSAGE_TYPE,
73 &options,
74 )?;
75
76 let metrics_descriptor = ProtobufSerializer::new_from_bytes(
77 DESCRIPTOR_BYTES,
78 METRICS_REQUEST_MESSAGE_TYPE,
79 &options,
80 )?;
81
82 let traces_descriptor = ProtobufSerializer::new_from_bytes(
83 DESCRIPTOR_BYTES,
84 TRACES_REQUEST_MESSAGE_TYPE,
85 &options,
86 )?;
87
88 Ok(Self {
89 logs_descriptor,
90 metrics_descriptor,
91 traces_descriptor,
92 options,
93 })
94 }
95}
96
97impl Encoder<Event> for OtlpSerializer {
98 type Error = vector_common::Error;
99
100 fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
101 match &event {
105 Event::Log(log) => {
106 if log.contains(RESOURCE_LOGS_JSON_FIELD) {
107 self.logs_descriptor.encode(event, buffer)
108 } else if log.contains(RESOURCE_METRICS_JSON_FIELD) {
109 self.metrics_descriptor.encode(event, buffer)
111 } else {
112 Err(format!(
113 "Log event does not contain OTLP top-level fields ({RESOURCE_LOGS_JSON_FIELD} or {RESOURCE_METRICS_JSON_FIELD})",
114 )
115 .into())
116 }
117 }
118 Event::Trace(trace) => {
119 if trace.contains(RESOURCE_SPANS_JSON_FIELD) {
120 self.traces_descriptor.encode(event, buffer)
121 } else {
122 Err(format!(
123 "Trace event does not contain OTLP top-level field ({RESOURCE_SPANS_JSON_FIELD})",
124 )
125 .into())
126 }
127 }
128 Event::Metric(_) => {
129 Err("OTLP serializer does not support native Vector metrics yet.".into())
130 }
131 }
132 }
133}