1use bytes::Bytes;
2use opentelemetry_proto::proto::{
3 DESCRIPTOR_BYTES, LOGS_REQUEST_MESSAGE_TYPE, METRICS_REQUEST_MESSAGE_TYPE,
4 RESOURCE_LOGS_JSON_FIELD, RESOURCE_METRICS_JSON_FIELD, RESOURCE_SPANS_JSON_FIELD,
5 TRACES_REQUEST_MESSAGE_TYPE,
6};
7use smallvec::{SmallVec, smallvec};
8use vector_config::{configurable_component, indexmap::IndexSet};
9use vector_core::{
10 config::{DataType, LogNamespace},
11 event::Event,
12 schema,
13};
14use vrl::{protobuf::parse::Options, value::Kind};
15
16use super::{Deserializer, ProtobufDeserializer};
17
18#[configurable_component]
20#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
21#[serde(rename_all = "snake_case")]
22pub enum OtlpSignalType {
23 Logs,
25 Metrics,
27 Traces,
29}
30
31#[configurable_component]
33#[derive(Debug, Clone)]
34pub struct OtlpDeserializerConfig {
35 #[serde(default = "default_signal_types")]
44 pub signal_types: IndexSet<OtlpSignalType>,
45}
46
47fn default_signal_types() -> IndexSet<OtlpSignalType> {
48 IndexSet::from([
49 OtlpSignalType::Logs,
50 OtlpSignalType::Metrics,
51 OtlpSignalType::Traces,
52 ])
53}
54
55impl Default for OtlpDeserializerConfig {
56 fn default() -> Self {
57 Self {
58 signal_types: default_signal_types(),
59 }
60 }
61}
62
63impl OtlpDeserializerConfig {
64 pub fn build(&self) -> OtlpDeserializer {
66 OtlpDeserializer::new_with_signals(self.signal_types.clone())
67 }
68
69 pub fn output_type(&self) -> DataType {
71 DataType::Log | DataType::Trace
72 }
73
74 pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
76 match log_namespace {
77 LogNamespace::Legacy => {
78 schema::Definition::empty_legacy_namespace().unknown_fields(Kind::any())
79 }
80 LogNamespace::Vector => {
81 schema::Definition::new_with_default_metadata(Kind::any(), [log_namespace])
82 }
83 }
84 }
85}
86
87#[derive(Debug, Clone)]
104pub struct OtlpDeserializer {
105 logs_deserializer: ProtobufDeserializer,
106 metrics_deserializer: ProtobufDeserializer,
107 traces_deserializer: ProtobufDeserializer,
108 signals: IndexSet<OtlpSignalType>,
110}
111
112impl Default for OtlpDeserializer {
113 fn default() -> Self {
114 Self::new_with_signals(default_signal_types())
115 }
116}
117
118impl OtlpDeserializer {
119 pub fn new_with_signals(signals: IndexSet<OtlpSignalType>) -> Self {
122 let options = Options {
123 use_json_names: true,
124 };
125
126 let logs_deserializer = ProtobufDeserializer::new_from_bytes(
127 DESCRIPTOR_BYTES,
128 LOGS_REQUEST_MESSAGE_TYPE,
129 options.clone(),
130 )
131 .expect("Failed to create logs deserializer");
132
133 let metrics_deserializer = ProtobufDeserializer::new_from_bytes(
134 DESCRIPTOR_BYTES,
135 METRICS_REQUEST_MESSAGE_TYPE,
136 options.clone(),
137 )
138 .expect("Failed to create metrics deserializer");
139
140 let traces_deserializer = ProtobufDeserializer::new_from_bytes(
141 DESCRIPTOR_BYTES,
142 TRACES_REQUEST_MESSAGE_TYPE,
143 options,
144 )
145 .expect("Failed to create traces deserializer");
146
147 Self {
148 logs_deserializer,
149 metrics_deserializer,
150 traces_deserializer,
151 signals,
152 }
153 }
154}
155
156impl Deserializer for OtlpDeserializer {
157 fn parse(
158 &self,
159 bytes: Bytes,
160 log_namespace: LogNamespace,
161 ) -> vector_common::Result<SmallVec<[Event; 1]>> {
162 for signal_type in &self.signals {
164 match signal_type {
165 OtlpSignalType::Logs => {
166 if let Ok(events) = self.logs_deserializer.parse(bytes.clone(), log_namespace)
167 && let Some(Event::Log(log)) = events.first()
168 && log.get(RESOURCE_LOGS_JSON_FIELD).is_some()
169 {
170 return Ok(events);
171 }
172 }
173 OtlpSignalType::Metrics => {
174 if let Ok(events) = self
175 .metrics_deserializer
176 .parse(bytes.clone(), log_namespace)
177 && let Some(Event::Log(log)) = events.first()
178 && log.get(RESOURCE_METRICS_JSON_FIELD).is_some()
179 {
180 return Ok(events);
181 }
182 }
183 OtlpSignalType::Traces => {
184 if let Ok(mut events) =
186 self.traces_deserializer.parse(bytes.clone(), log_namespace)
187 && let Some(Event::Log(log)) = events.first()
188 && log.get(RESOURCE_SPANS_JSON_FIELD).is_some()
189 {
190 if let Some(Event::Log(log)) = events.pop() {
192 let trace_event = Event::Trace(log.into());
193 return Ok(smallvec![trace_event]);
194 }
195 }
196 }
197 }
198 }
199
200 Err(format!("Invalid OTLP data: expected one of {:?}", self.signals).into())
201 }
202}
203
#[cfg(test)]
mod tests {
    use opentelemetry_proto::proto::{
        collector::{
            logs::v1::ExportLogsServiceRequest, metrics::v1::ExportMetricsServiceRequest,
            trace::v1::ExportTraceServiceRequest,
        },
        logs::v1::{LogRecord, ResourceLogs, ScopeLogs},
        metrics::v1::{Metric, ResourceMetrics, ScopeMetrics},
        resource::v1::Resource,
        trace::v1::{ResourceSpans, ScopeSpans, Span},
    };
    use prost::Message;

    use super::*;

    // Trace id written into the test span (16 bytes).
    const TEST_TRACE_ID: [u8; 16] = [
        0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f,
        0x10,
    ];
    // Span id written into the test span (8 bytes).
    const TEST_SPAN_ID: [u8; 8] = [0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08];

    /// Encodes a logs export request holding a single `INFO` log record.
    fn create_logs_request_bytes() -> Bytes {
        // Fields that are not listed keep their protobuf default value.
        let record = LogRecord {
            time_unix_nano: 1234567890,
            severity_number: 9,
            severity_text: "INFO".to_string(),
            ..Default::default()
        };
        let scope_logs = ScopeLogs {
            log_records: vec![record],
            ..Default::default()
        };
        let resource_logs = ResourceLogs {
            resource: Some(Resource::default()),
            scope_logs: vec![scope_logs],
            ..Default::default()
        };
        let request = ExportLogsServiceRequest {
            resource_logs: vec![resource_logs],
        };

        Bytes::from(request.encode_to_vec())
    }

    /// Encodes a metrics export request holding a single metric without data.
    fn create_metrics_request_bytes() -> Bytes {
        let metric = Metric {
            name: "test_metric".to_string(),
            ..Default::default()
        };
        let scope_metrics = ScopeMetrics {
            metrics: vec![metric],
            ..Default::default()
        };
        let resource_metrics = ResourceMetrics {
            resource: Some(Resource::default()),
            scope_metrics: vec![scope_metrics],
            ..Default::default()
        };
        let request = ExportMetricsServiceRequest {
            resource_metrics: vec![resource_metrics],
        };

        Bytes::from(request.encode_to_vec())
    }

    /// Encodes a traces export request holding a single span that carries
    /// `TEST_TRACE_ID` and `TEST_SPAN_ID`.
    fn create_traces_request_bytes() -> Bytes {
        let span = Span {
            trace_id: TEST_TRACE_ID.to_vec(),
            span_id: TEST_SPAN_ID.to_vec(),
            name: "test_span".to_string(),
            start_time_unix_nano: 1234567890,
            end_time_unix_nano: 1234567900,
            ..Default::default()
        };
        let scope_spans = ScopeSpans {
            spans: vec![span],
            ..Default::default()
        };
        let resource_spans = ResourceSpans {
            resource: Some(Resource::default()),
            scope_spans: vec![scope_spans],
            ..Default::default()
        };
        let request = ExportTraceServiceRequest {
            resource_spans: vec![resource_spans],
        };

        Bytes::from(request.encode_to_vec())
    }

    /// Walks `resourceSpans[0].scopeSpans[0].spans[0]` and checks that its trace
    /// and span ids are the raw test bytes.
    fn validate_trace_ids(trace: &vrl::value::Value) {
        let resource_span = trace
            .get("resourceSpans")
            .and_then(|value| value.as_array())
            .expect("resourceSpans should be an array")
            .first()
            .expect("should have at least one resource span");

        let scope_span = resource_span
            .get("scopeSpans")
            .and_then(|value| value.as_array())
            .expect("scopeSpans should be an array")
            .first()
            .expect("should have at least one scope span");

        let span = scope_span
            .get("spans")
            .and_then(|value| value.as_array())
            .expect("spans should be an array")
            .first()
            .expect("should have at least one span");

        let trace_id = span
            .get("traceId")
            .and_then(|value| value.as_bytes())
            .expect("traceId should exist and be bytes");
        assert_eq!(
            trace_id.as_ref(),
            &TEST_TRACE_ID,
            "traceId should match the expected 16 bytes (0102030405060708090a0b0c0d0e0f10)"
        );

        let span_id = span
            .get("spanId")
            .and_then(|value| value.as_bytes())
            .expect("spanId should exist and be bytes");
        assert_eq!(
            span_id.as_ref(),
            &TEST_SPAN_ID,
            "spanId should match the expected 8 bytes (0102030405060708)"
        );
    }

    /// Parses `bytes` with the default deserializer and checks that exactly one
    /// event containing `field` is produced; trace events also get their ids checked.
    fn assert_otlp_event(bytes: Bytes, field: &str, is_trace: bool) {
        let events = OtlpDeserializer::default()
            .parse(bytes, LogNamespace::Legacy)
            .unwrap();
        assert_eq!(events.len(), 1);

        let event = &events[0];
        if !is_trace {
            assert!(event.as_log().get(field).is_some());
            return;
        }

        assert!(matches!(event, Event::Trace(_)));
        let trace = event.as_trace();
        assert!(trace.get(field).is_some());
        validate_trace_ids(trace.value());
    }

    #[test]
    fn deserialize_otlp_logs() {
        let payload = create_logs_request_bytes();
        assert_otlp_event(payload, RESOURCE_LOGS_JSON_FIELD, false);
    }

    #[test]
    fn deserialize_otlp_metrics() {
        let payload = create_metrics_request_bytes();
        assert_otlp_event(payload, RESOURCE_METRICS_JSON_FIELD, false);
    }

    #[test]
    fn deserialize_otlp_traces() {
        let payload = create_traces_request_bytes();
        assert_otlp_event(payload, RESOURCE_SPANS_JSON_FIELD, true);
    }

    #[test]
    fn deserialize_invalid_otlp() {
        let deserializer = OtlpDeserializer::default();
        let garbage = Bytes::from("invalid protobuf data");

        let result = deserializer.parse(garbage, LogNamespace::Legacy);
        assert!(result.is_err());

        let message = result.unwrap_err().to_string();
        assert!(message.contains("Invalid OTLP data"));
    }

    #[test]
    fn deserialize_with_custom_priority_traces_only() {
        let traces_only = IndexSet::from([OtlpSignalType::Traces]);
        let deserializer = OtlpDeserializer::new_with_signals(traces_only);

        // A traces payload is accepted and surfaces as a trace event.
        let accepted = deserializer.parse(create_traces_request_bytes(), LogNamespace::Legacy);
        assert!(accepted.is_ok());
        assert!(matches!(accepted.unwrap()[0], Event::Trace(_)));

        // A logs payload is rejected because only traces are enabled.
        let rejected = deserializer.parse(create_logs_request_bytes(), LogNamespace::Legacy);
        assert!(rejected.is_err());
    }
}