codecs/decoding/format/
otlp.rs

1use bytes::Bytes;
2use opentelemetry_proto::proto::{
3    DESCRIPTOR_BYTES, LOGS_REQUEST_MESSAGE_TYPE, METRICS_REQUEST_MESSAGE_TYPE,
4    RESOURCE_LOGS_JSON_FIELD, RESOURCE_METRICS_JSON_FIELD, RESOURCE_SPANS_JSON_FIELD,
5    TRACES_REQUEST_MESSAGE_TYPE,
6};
7use smallvec::{SmallVec, smallvec};
8use vector_config::{configurable_component, indexmap::IndexSet};
9use vector_core::{
10    config::{DataType, LogNamespace},
11    event::Event,
12    schema,
13};
14use vrl::{protobuf::parse::Options, value::Kind};
15
16use super::{Deserializer, ProtobufDeserializer};
17
18/// OTLP signal type for prioritized parsing.
19#[configurable_component]
20#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
21#[serde(rename_all = "snake_case")]
22pub enum OtlpSignalType {
23    /// OTLP logs signal (ExportLogsServiceRequest)
24    Logs,
25    /// OTLP metrics signal (ExportMetricsServiceRequest)
26    Metrics,
27    /// OTLP traces signal (ExportTraceServiceRequest)
28    Traces,
29}
30
31/// Config used to build an `OtlpDeserializer`.
32#[configurable_component]
33#[derive(Debug, Clone)]
34pub struct OtlpDeserializerConfig {
35    /// Signal types to attempt parsing, in priority order.
36    ///
37    /// The deserializer tries to parse signals in the specified order. This allows you to optimize
38    /// performance when you know the expected signal types. For example, if you only receive
39    /// traces, set this to `["traces"]` to avoid attempting to parse as logs or metrics first.
40    ///
41    /// If not specified, defaults to trying all types in this order: logs, metrics, traces.
42    /// Duplicate signal types are automatically removed while preserving order.
43    #[serde(default = "default_signal_types")]
44    pub signal_types: IndexSet<OtlpSignalType>,
45}
46
47fn default_signal_types() -> IndexSet<OtlpSignalType> {
48    IndexSet::from([
49        OtlpSignalType::Logs,
50        OtlpSignalType::Metrics,
51        OtlpSignalType::Traces,
52    ])
53}
54
55impl Default for OtlpDeserializerConfig {
56    fn default() -> Self {
57        Self {
58            signal_types: default_signal_types(),
59        }
60    }
61}
62
63impl OtlpDeserializerConfig {
64    /// Build the `OtlpDeserializer` from this configuration.
65    pub fn build(&self) -> OtlpDeserializer {
66        OtlpDeserializer::new_with_signals(self.signal_types.clone())
67    }
68
69    /// Return the type of event build by this deserializer.
70    pub fn output_type(&self) -> DataType {
71        DataType::Log | DataType::Trace
72    }
73
74    /// The schema produced by the deserializer.
75    pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
76        match log_namespace {
77            LogNamespace::Legacy => {
78                schema::Definition::empty_legacy_namespace().unknown_fields(Kind::any())
79            }
80            LogNamespace::Vector => {
81                schema::Definition::new_with_default_metadata(Kind::any(), [log_namespace])
82            }
83        }
84    }
85}
86
87/// Deserializer that builds `Event`s from a byte frame containing [OTLP](https://opentelemetry.io/docs/specs/otlp/) protobuf data.
88///
89/// This deserializer decodes events using the OTLP protobuf specification. It handles the three
90/// OTLP signal types: logs, metrics, and traces.
91///
92/// The implementation supports three OTLP message types:
93/// - `ExportLogsServiceRequest` → Log events with `resourceLogs` field
94/// - `ExportMetricsServiceRequest` → Log events with `resourceMetrics` field
95/// - `ExportTraceServiceRequest` → Trace events with `resourceSpans` field
96///
97/// One major caveat here is that the incoming metrics will be parsed as logs but they will preserve the OTLP format.
98/// This means that components that work on metrics, will not be compatible with this output.
99/// However, these events can be forwarded directly to a downstream OTEL collector.
100///
101/// This is the inverse of what the OTLP encoder does, ensuring round-trip compatibility
102/// with the `opentelemetry` source when `use_otlp_decoding` is enabled.
103#[derive(Debug, Clone)]
104pub struct OtlpDeserializer {
105    logs_deserializer: ProtobufDeserializer,
106    metrics_deserializer: ProtobufDeserializer,
107    traces_deserializer: ProtobufDeserializer,
108    /// Signal types to parse, in priority order
109    signals: IndexSet<OtlpSignalType>,
110}
111
112impl Default for OtlpDeserializer {
113    fn default() -> Self {
114        Self::new_with_signals(default_signal_types())
115    }
116}
117
118impl OtlpDeserializer {
119    /// Creates a new OTLP deserializer with custom signal support.
120    /// During parsing, each signal type is tried in order until one succeeds.
121    pub fn new_with_signals(signals: IndexSet<OtlpSignalType>) -> Self {
122        let options = Options {
123            use_json_names: true,
124        };
125
126        let logs_deserializer = ProtobufDeserializer::new_from_bytes(
127            DESCRIPTOR_BYTES,
128            LOGS_REQUEST_MESSAGE_TYPE,
129            options.clone(),
130        )
131        .expect("Failed to create logs deserializer");
132
133        let metrics_deserializer = ProtobufDeserializer::new_from_bytes(
134            DESCRIPTOR_BYTES,
135            METRICS_REQUEST_MESSAGE_TYPE,
136            options.clone(),
137        )
138        .expect("Failed to create metrics deserializer");
139
140        let traces_deserializer = ProtobufDeserializer::new_from_bytes(
141            DESCRIPTOR_BYTES,
142            TRACES_REQUEST_MESSAGE_TYPE,
143            options,
144        )
145        .expect("Failed to create traces deserializer");
146
147        Self {
148            logs_deserializer,
149            metrics_deserializer,
150            traces_deserializer,
151            signals,
152        }
153    }
154}
155
156impl Deserializer for OtlpDeserializer {
157    fn parse(
158        &self,
159        bytes: Bytes,
160        log_namespace: LogNamespace,
161    ) -> vector_common::Result<SmallVec<[Event; 1]>> {
162        // Try parsing in the priority order specified
163        for signal_type in &self.signals {
164            match signal_type {
165                OtlpSignalType::Logs => {
166                    if let Ok(events) = self.logs_deserializer.parse(bytes.clone(), log_namespace)
167                        && let Some(Event::Log(log)) = events.first()
168                        && log.get(RESOURCE_LOGS_JSON_FIELD).is_some()
169                    {
170                        return Ok(events);
171                    }
172                }
173                OtlpSignalType::Metrics => {
174                    if let Ok(events) = self
175                        .metrics_deserializer
176                        .parse(bytes.clone(), log_namespace)
177                        && let Some(Event::Log(log)) = events.first()
178                        && log.get(RESOURCE_METRICS_JSON_FIELD).is_some()
179                    {
180                        return Ok(events);
181                    }
182                }
183                OtlpSignalType::Traces => {
184                    // TODO: <https://github.com/vectordotdev/vector/issues/25045>
185                    if let Ok(mut events) =
186                        self.traces_deserializer.parse(bytes.clone(), log_namespace)
187                        && let Some(Event::Log(log)) = events.first()
188                        && log.get(RESOURCE_SPANS_JSON_FIELD).is_some()
189                    {
190                        // Convert the log event to a trace event by taking ownership
191                        if let Some(Event::Log(log)) = events.pop() {
192                            let trace_event = Event::Trace(log.into());
193                            return Ok(smallvec![trace_event]);
194                        }
195                    }
196                }
197            }
198        }
199
200        Err(format!("Invalid OTLP data: expected one of {:?}", self.signals).into())
201    }
202}
203
#[cfg(test)]
mod tests {
    use opentelemetry_proto::proto::{
        collector::{
            logs::v1::ExportLogsServiceRequest, metrics::v1::ExportMetricsServiceRequest,
            trace::v1::ExportTraceServiceRequest,
        },
        logs::v1::{LogRecord, ResourceLogs, ScopeLogs},
        metrics::v1::{Metric, ResourceMetrics, ScopeMetrics},
        resource::v1::Resource,
        trace::v1::{ResourceSpans, ScopeSpans, Span},
    };
    use prost::Message;

    use super::*;

    // trace_id: 0102030405060708090a0b0c0d0e0f10 (16 bytes)
    const TEST_TRACE_ID: [u8; 16] = [
        0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f,
        0x10,
    ];
    // span_id: 0102030405060708 (8 bytes)
    const TEST_SPAN_ID: [u8; 8] = [0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08];

    /// Encodes a logs export request holding a single `INFO` record.
    ///
    /// Fields not listed take their protobuf default (zero, empty, or `None`).
    fn create_logs_request_bytes() -> Bytes {
        let record = LogRecord {
            time_unix_nano: 1234567890,
            severity_number: 9,
            severity_text: "INFO".to_string(),
            ..Default::default()
        };
        let scope_logs = ScopeLogs {
            log_records: vec![record],
            ..Default::default()
        };
        let resource_logs = ResourceLogs {
            resource: Some(Resource::default()),
            scope_logs: vec![scope_logs],
            ..Default::default()
        };
        let request = ExportLogsServiceRequest {
            resource_logs: vec![resource_logs],
        };

        Bytes::from(request.encode_to_vec())
    }

    /// Encodes a metrics export request holding a single metric named `test_metric`.
    ///
    /// Fields not listed take their protobuf default (zero, empty, or `None`).
    fn create_metrics_request_bytes() -> Bytes {
        let metric = Metric {
            name: "test_metric".to_string(),
            ..Default::default()
        };
        let scope_metrics = ScopeMetrics {
            metrics: vec![metric],
            ..Default::default()
        };
        let resource_metrics = ResourceMetrics {
            resource: Some(Resource::default()),
            scope_metrics: vec![scope_metrics],
            ..Default::default()
        };
        let request = ExportMetricsServiceRequest {
            resource_metrics: vec![resource_metrics],
        };

        Bytes::from(request.encode_to_vec())
    }

    /// Encodes a trace export request holding a single span named `test_span` that
    /// carries `TEST_TRACE_ID` and `TEST_SPAN_ID`.
    ///
    /// Fields not listed take their protobuf default (zero, empty, or `None`).
    fn create_traces_request_bytes() -> Bytes {
        let span = Span {
            trace_id: TEST_TRACE_ID.to_vec(),
            span_id: TEST_SPAN_ID.to_vec(),
            name: "test_span".to_string(),
            start_time_unix_nano: 1234567890,
            end_time_unix_nano: 1234567900,
            ..Default::default()
        };
        let scope_spans = ScopeSpans {
            spans: vec![span],
            ..Default::default()
        };
        let resource_spans = ResourceSpans {
            resource: Some(Resource::default()),
            scope_spans: vec![scope_spans],
            ..Default::default()
        };
        let request = ExportTraceServiceRequest {
            resource_spans: vec![resource_spans],
        };

        Bytes::from(request.encode_to_vec())
    }

    /// Asserts that the first span under `resourceSpans[0].scopeSpans[0].spans` holds the
    /// test trace and span IDs as raw bytes.
    fn validate_trace_ids(trace: &vrl::value::Value) {
        // Walk resourceSpans -> scopeSpans -> spans, taking the first entry at each level.
        let span = trace
            .get("resourceSpans")
            .and_then(|v| v.as_array())
            .expect("resourceSpans should be an array")
            .first()
            .expect("should have at least one resource span")
            .get("scopeSpans")
            .and_then(|v| v.as_array())
            .expect("scopeSpans should be an array")
            .first()
            .expect("should have at least one scope span")
            .get("spans")
            .and_then(|v| v.as_array())
            .expect("spans should be an array")
            .first()
            .expect("should have at least one span");

        // Verify traceId - should be raw bytes (16 bytes for trace_id)
        let trace_id = span
            .get("traceId")
            .and_then(|v| v.as_bytes())
            .expect("traceId should exist and be bytes");
        assert_eq!(
            trace_id.as_ref(),
            &TEST_TRACE_ID,
            "traceId should match the expected 16 bytes (0102030405060708090a0b0c0d0e0f10)"
        );

        // Verify spanId - should be raw bytes (8 bytes for span_id)
        let span_id = span
            .get("spanId")
            .and_then(|v| v.as_bytes())
            .expect("spanId should exist and be bytes");
        assert_eq!(
            span_id.as_ref(),
            &TEST_SPAN_ID,
            "spanId should match the expected 8 bytes (0102030405060708)"
        );
    }

    /// Parses `bytes` with the default deserializer and checks that exactly one event
    /// comes back containing `field`; trace events additionally get their IDs validated.
    fn assert_otlp_event(bytes: Bytes, field: &str, is_trace: bool) {
        let events = OtlpDeserializer::default()
            .parse(bytes, LogNamespace::Legacy)
            .unwrap();
        assert_eq!(events.len(), 1);

        let event = &events[0];
        if !is_trace {
            assert!(event.as_log().get(field).is_some());
            return;
        }

        assert!(matches!(event, Event::Trace(_)));
        let trace = event.as_trace();
        assert!(trace.get(field).is_some());
        validate_trace_ids(trace.value());
    }

    #[test]
    fn deserialize_otlp_logs() {
        let payload = create_logs_request_bytes();
        assert_otlp_event(payload, RESOURCE_LOGS_JSON_FIELD, false);
    }

    #[test]
    fn deserialize_otlp_metrics() {
        let payload = create_metrics_request_bytes();
        assert_otlp_event(payload, RESOURCE_METRICS_JSON_FIELD, false);
    }

    #[test]
    fn deserialize_otlp_traces() {
        let payload = create_traces_request_bytes();
        assert_otlp_event(payload, RESOURCE_SPANS_JSON_FIELD, true);
    }

    #[test]
    fn deserialize_invalid_otlp() {
        let garbage = Bytes::from("invalid protobuf data");
        let result = OtlpDeserializer::default().parse(garbage, LogNamespace::Legacy);

        assert!(result.is_err());
        let message = result.unwrap_err().to_string();
        assert!(message.contains("Invalid OTLP data"));
    }

    #[test]
    fn deserialize_with_custom_priority_traces_only() {
        // Configure to only try traces - should succeed for traces, fail for others
        let traces_only = IndexSet::from([OtlpSignalType::Traces]);
        let deserializer = OtlpDeserializer::new_with_signals(traces_only);

        // Traces should work
        let parsed_trace = deserializer.parse(create_traces_request_bytes(), LogNamespace::Legacy);
        assert!(parsed_trace.is_ok());
        assert!(matches!(parsed_trace.unwrap()[0], Event::Trace(_)));

        // Logs should fail since we're not trying to parse logs
        let parsed_log = deserializer.parse(create_logs_request_bytes(), LogNamespace::Legacy);
        assert!(parsed_log.is_err());
    }
}