codecs/decoding/
mod.rs

1//! A collection of support structures that are used in the process of decoding
2//! bytes into events.
3
4mod config;
5mod decoder;
6mod error;
7pub mod format;
8pub mod framing;
9
10use std::fmt::Debug;
11
12use bytes::{Bytes, BytesMut};
13pub use config::DecodingConfig;
14pub use decoder::Decoder;
15pub use error::StreamDecodingError;
16pub use format::{
17    BoxedDeserializer, BytesDeserializer, BytesDeserializerConfig, GelfDeserializer,
18    GelfDeserializerConfig, GelfDeserializerOptions, InfluxdbDeserializer,
19    InfluxdbDeserializerConfig, JsonDeserializer, JsonDeserializerConfig, JsonDeserializerOptions,
20    NativeDeserializer, NativeDeserializerConfig, NativeJsonDeserializer,
21    NativeJsonDeserializerConfig, NativeJsonDeserializerOptions, ProtobufDeserializer,
22    ProtobufDeserializerConfig, ProtobufDeserializerOptions,
23};
24#[cfg(feature = "opentelemetry")]
25pub use format::{OtlpDeserializer, OtlpDeserializerConfig, OtlpSignalType};
26#[cfg(feature = "syslog")]
27pub use format::{SyslogDeserializer, SyslogDeserializerConfig, SyslogDeserializerOptions};
28pub use framing::{
29    BoxedFramer, BoxedFramingError, BytesDecoder, BytesDecoderConfig, CharacterDelimitedDecoder,
30    CharacterDelimitedDecoderConfig, CharacterDelimitedDecoderOptions, ChunkedGelfDecoder,
31    ChunkedGelfDecoderConfig, ChunkedGelfDecoderOptions, FramingError, LengthDelimitedDecoder,
32    LengthDelimitedDecoderConfig, NewlineDelimitedDecoder, NewlineDelimitedDecoderConfig,
33    NewlineDelimitedDecoderOptions, OctetCountingDecoder, OctetCountingDecoderConfig,
34    OctetCountingDecoderOptions, VarintLengthDelimitedDecoder, VarintLengthDelimitedDecoderConfig,
35};
36use smallvec::SmallVec;
37use vector_config::configurable_component;
38use vector_core::{
39    config::{DataType, LogNamespace},
40    event::{Event, EventMetadata},
41    schema,
42};
43
44use self::format::{AvroDeserializer, AvroDeserializerConfig, AvroDeserializerOptions};
45use crate::decoding::format::{VrlDeserializer, VrlDeserializerConfig};
46
/// An error that occurred while decoding structured events from a byte stream /
/// byte messages.
///
/// The variant identifies which stage failed: splitting the input into frames,
/// or parsing a frame into events.
#[derive(Debug)]
pub enum Error {
    /// The error occurred while producing byte frames from the byte stream /
    /// byte messages.
    FramingError(BoxedFramingError),
    /// The error occurred while parsing structured events from a byte frame.
    ParsingError(vector_common::Error),
}
57
58impl std::fmt::Display for Error {
59    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60        match self {
61            Self::FramingError(error) => write!(formatter, "FramingError({error})"),
62            Self::ParsingError(error) => write!(formatter, "ParsingError({error})"),
63        }
64    }
65}
66
// No trait methods are overridden here; the trait's provided defaults are used as-is.
impl std::error::Error for Error {}
68
69impl From<std::io::Error> for Error {
70    fn from(error: std::io::Error) -> Self {
71        Self::FramingError(Box::new(error))
72    }
73}
74
75impl StreamDecodingError for Error {
76    fn can_continue(&self) -> bool {
77        match self {
78            Self::FramingError(error) => error.can_continue(),
79            Self::ParsingError(_) => true,
80        }
81    }
82}
83
/// Framing configuration.
///
/// Framing handles how events are separated when encoded in a raw byte form, where each event is
/// a frame that must be prefixed, or delimited, in a way that marks where an event begins and
/// ends within the byte stream.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "method", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The framing method."))]
pub enum FramingConfig {
    /// Byte frames are passed through as-is according to the underlying I/O boundaries (for example, split between messages or stream segments).
    Bytes,

    /// Byte frames which are delimited by a chosen character.
    CharacterDelimited(CharacterDelimitedDecoderConfig),

    /// Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length.
    LengthDelimited(LengthDelimitedDecoderConfig),

    /// Byte frames which are delimited by a newline character.
    NewlineDelimited(NewlineDelimitedDecoderConfig),

    /// Byte frames according to the [octet counting][octet_counting] format.
    ///
    /// [octet_counting]: https://tools.ietf.org/html/rfc6587#section-3.4.1
    OctetCounting(OctetCountingDecoderConfig),

    /// Byte frames which are [chunked GELF][chunked_gelf] messages.
    ///
    /// [chunked_gelf]: https://go2docs.graylog.org/current/getting_in_log_data/gelf.html
    ChunkedGelf(ChunkedGelfDecoderConfig),

    /// Byte frames which are prefixed by a varint indicating the length.
    /// This is compatible with protobuf's length-delimited encoding.
    VarintLengthDelimited(VarintLengthDelimitedDecoderConfig),
}
120
121impl From<BytesDecoderConfig> for FramingConfig {
122    fn from(_: BytesDecoderConfig) -> Self {
123        Self::Bytes
124    }
125}
126
127impl From<CharacterDelimitedDecoderConfig> for FramingConfig {
128    fn from(config: CharacterDelimitedDecoderConfig) -> Self {
129        Self::CharacterDelimited(config)
130    }
131}
132
133impl From<LengthDelimitedDecoderConfig> for FramingConfig {
134    fn from(config: LengthDelimitedDecoderConfig) -> Self {
135        Self::LengthDelimited(config)
136    }
137}
138
139impl From<NewlineDelimitedDecoderConfig> for FramingConfig {
140    fn from(config: NewlineDelimitedDecoderConfig) -> Self {
141        Self::NewlineDelimited(config)
142    }
143}
144
145impl From<OctetCountingDecoderConfig> for FramingConfig {
146    fn from(config: OctetCountingDecoderConfig) -> Self {
147        Self::OctetCounting(config)
148    }
149}
150
151impl From<ChunkedGelfDecoderConfig> for FramingConfig {
152    fn from(config: ChunkedGelfDecoderConfig) -> Self {
153        Self::ChunkedGelf(config)
154    }
155}
156
157impl From<VarintLengthDelimitedDecoderConfig> for FramingConfig {
158    fn from(config: VarintLengthDelimitedDecoderConfig) -> Self {
159        Self::VarintLengthDelimited(config)
160    }
161}
162
163impl FramingConfig {
164    /// Build the `Framer` from this configuration.
165    pub fn build(&self) -> Framer {
166        match self {
167            FramingConfig::Bytes => Framer::Bytes(BytesDecoderConfig.build()),
168            FramingConfig::CharacterDelimited(config) => Framer::CharacterDelimited(config.build()),
169            FramingConfig::LengthDelimited(config) => Framer::LengthDelimited(config.build()),
170            FramingConfig::NewlineDelimited(config) => Framer::NewlineDelimited(config.build()),
171            FramingConfig::OctetCounting(config) => Framer::OctetCounting(config.build()),
172            FramingConfig::ChunkedGelf(config) => Framer::ChunkedGelf(config.build()),
173            FramingConfig::VarintLengthDelimited(config) => {
174                Framer::VarintLengthDelimited(config.build())
175            }
176        }
177    }
178}
179
/// Produce byte frames from a byte stream / byte message.
///
/// Each variant wraps one concrete framing decoder; `Boxed` holds an opaque
/// implementation instead.
#[derive(Debug, Clone)]
pub enum Framer {
    /// Uses a `BytesDecoder` for framing.
    Bytes(BytesDecoder),
    /// Uses a `CharacterDelimitedDecoder` for framing.
    CharacterDelimited(CharacterDelimitedDecoder),
    /// Uses a `LengthDelimitedDecoder` for framing.
    LengthDelimited(LengthDelimitedDecoder),
    /// Uses a `NewlineDelimitedDecoder` for framing.
    NewlineDelimited(NewlineDelimitedDecoder),
    /// Uses an `OctetCountingDecoder` for framing.
    OctetCounting(OctetCountingDecoder),
    /// Uses an opaque `Framer` implementation for framing.
    Boxed(BoxedFramer),
    /// Uses a `ChunkedGelfDecoder` for framing.
    ChunkedGelf(ChunkedGelfDecoder),
    /// Uses a `VarintLengthDelimitedDecoder` for framing.
    VarintLengthDelimited(VarintLengthDelimitedDecoder),
}
200
201impl tokio_util::codec::Decoder for Framer {
202    type Item = Bytes;
203    type Error = BoxedFramingError;
204
205    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
206        match self {
207            Framer::Bytes(framer) => framer.decode(src),
208            Framer::CharacterDelimited(framer) => framer.decode(src),
209            Framer::LengthDelimited(framer) => framer.decode(src),
210            Framer::NewlineDelimited(framer) => framer.decode(src),
211            Framer::OctetCounting(framer) => framer.decode(src),
212            Framer::Boxed(framer) => framer.decode(src),
213            Framer::ChunkedGelf(framer) => framer.decode(src),
214            Framer::VarintLengthDelimited(framer) => framer.decode(src),
215        }
216    }
217
218    fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
219        match self {
220            Framer::Bytes(framer) => framer.decode_eof(src),
221            Framer::CharacterDelimited(framer) => framer.decode_eof(src),
222            Framer::LengthDelimited(framer) => framer.decode_eof(src),
223            Framer::NewlineDelimited(framer) => framer.decode_eof(src),
224            Framer::OctetCounting(framer) => framer.decode_eof(src),
225            Framer::Boxed(framer) => framer.decode_eof(src),
226            Framer::ChunkedGelf(framer) => framer.decode_eof(src),
227            Framer::VarintLengthDelimited(framer) => framer.decode_eof(src),
228        }
229    }
230}
231
/// Configures how events are decoded from raw bytes. Note some decoders can also determine the event output
/// type (log, metric, trace).
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "codec", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The codec to use for decoding events."))]
pub enum DeserializerConfig {
    /// Uses the raw bytes as-is.
    Bytes,

    /// Decodes the raw bytes as [JSON][json].
    ///
    /// [json]: https://www.json.org/
    Json(JsonDeserializerConfig),

    /// Decodes the raw bytes as [protobuf][protobuf].
    ///
    /// [protobuf]: https://protobuf.dev/
    Protobuf(ProtobufDeserializerConfig),

    #[cfg(feature = "opentelemetry")]
    /// Decodes the raw bytes as [OTLP (OpenTelemetry Protocol)][otlp] protobuf format.
    ///
    /// This decoder handles the three OTLP signal types: logs, metrics, and traces.
    /// It automatically detects which type of OTLP message is being decoded.
    ///
    /// [otlp]: https://opentelemetry.io/docs/specs/otlp/
    Otlp(OtlpDeserializerConfig),

    #[cfg(feature = "syslog")]
    /// Decodes the raw bytes as a Syslog message.
    ///
    /// Decodes either as the [RFC 3164][rfc3164]-style format ("old" style) or the
    /// [RFC 5424][rfc5424]-style format ("new" style, includes structured data).
    ///
    /// [rfc3164]: https://www.ietf.org/rfc/rfc3164.txt
    /// [rfc5424]: https://www.ietf.org/rfc/rfc5424.txt
    Syslog(SyslogDeserializerConfig),

    /// Decodes the raw bytes as [native Protocol Buffers format][vector_native_protobuf].
    ///
    /// This decoder can output all types of events: logs, metrics, and traces.
    ///
    /// This codec is **[experimental][experimental]**.
    ///
    /// [vector_native_protobuf]: https://github.com/vectordotdev/vector/blob/master/lib/vector-core/proto/event.proto
    /// [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs
    Native,

    /// Decodes the raw bytes as [native JSON format][vector_native_json].
    ///
    /// This decoder can output all types of events: logs, metrics, and traces.
    ///
    /// This codec is **[experimental][experimental]**.
    ///
    /// [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue
    /// [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs
    NativeJson(NativeJsonDeserializerConfig),

    /// Decodes the raw bytes as a [GELF][gelf] message.
    ///
    /// This codec is experimental for the following reason:
    ///
    /// The GELF specification is more strict than the actual Graylog receiver.
    /// Vector's decoder adheres more strictly to the GELF spec, with
    /// the exception that some characters such as `@` are allowed in field names.
    ///
    /// Other GELF codecs, such as Loki's, use a [Go SDK][implementation] that is maintained
    /// by Graylog and is much more relaxed than the GELF spec.
    ///
    /// Going forward, Vector will use the [Go SDK][implementation] as the reference implementation, which means
    /// the codec may continue to relax the enforcement of the specification.
    ///
    /// [gelf]: https://docs.graylog.org/docs/gelf
    /// [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go
    Gelf(GelfDeserializerConfig),

    /// Decodes the raw bytes as an [Influxdb Line Protocol][influxdb] message.
    ///
    /// [influxdb]: https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol
    Influxdb(InfluxdbDeserializerConfig),

    /// Decodes the raw bytes as an [Apache Avro][apache_avro] message.
    ///
    /// [apache_avro]: https://avro.apache.org/
    Avro {
        /// Apache Avro-specific decoder options.
        avro: AvroDeserializerOptions,
    },

    /// Decodes the raw bytes as a string and passes them as input to a [VRL][vrl] program.
    ///
    /// [vrl]: https://vector.dev/docs/reference/vrl
    Vrl(VrlDeserializerConfig),
}
327
328impl From<BytesDeserializerConfig> for DeserializerConfig {
329    fn from(_: BytesDeserializerConfig) -> Self {
330        Self::Bytes
331    }
332}
333
334impl From<JsonDeserializerConfig> for DeserializerConfig {
335    fn from(config: JsonDeserializerConfig) -> Self {
336        Self::Json(config)
337    }
338}
339
/// Wraps a Syslog deserializer config in the matching codec variant.
#[cfg(feature = "syslog")]
impl From<SyslogDeserializerConfig> for DeserializerConfig {
    fn from(inner: SyslogDeserializerConfig) -> DeserializerConfig {
        DeserializerConfig::Syslog(inner)
    }
}
346
347impl From<GelfDeserializerConfig> for DeserializerConfig {
348    fn from(config: GelfDeserializerConfig) -> Self {
349        Self::Gelf(config)
350    }
351}
352
353impl From<NativeDeserializerConfig> for DeserializerConfig {
354    fn from(_: NativeDeserializerConfig) -> Self {
355        Self::Native
356    }
357}
358
359impl From<NativeJsonDeserializerConfig> for DeserializerConfig {
360    fn from(config: NativeJsonDeserializerConfig) -> Self {
361        Self::NativeJson(config)
362    }
363}
364
365impl From<InfluxdbDeserializerConfig> for DeserializerConfig {
366    fn from(config: InfluxdbDeserializerConfig) -> Self {
367        Self::Influxdb(config)
368    }
369}
370
371impl DeserializerConfig {
372    /// Build the `Deserializer` from this configuration.
373    pub fn build(&self) -> vector_common::Result<Deserializer> {
374        match self {
375            DeserializerConfig::Avro { avro } => Ok(Deserializer::Avro(
376                AvroDeserializerConfig {
377                    avro_options: avro.clone(),
378                }
379                .build()?,
380            )),
381            DeserializerConfig::Bytes => Ok(Deserializer::Bytes(BytesDeserializerConfig.build())),
382            DeserializerConfig::Json(config) => Ok(Deserializer::Json(config.build())),
383            DeserializerConfig::Protobuf(config) => Ok(Deserializer::Protobuf(config.build()?)),
384            #[cfg(feature = "opentelemetry")]
385            DeserializerConfig::Otlp(config) => Ok(Deserializer::Otlp(config.build())),
386            #[cfg(feature = "syslog")]
387            DeserializerConfig::Syslog(config) => Ok(Deserializer::Syslog(config.build())),
388            DeserializerConfig::Native => {
389                Ok(Deserializer::Native(NativeDeserializerConfig.build()))
390            }
391            DeserializerConfig::NativeJson(config) => Ok(Deserializer::NativeJson(config.build())),
392            DeserializerConfig::Gelf(config) => Ok(Deserializer::Gelf(config.build())),
393            DeserializerConfig::Influxdb(config) => Ok(Deserializer::Influxdb(config.build())),
394            DeserializerConfig::Vrl(config) => Ok(Deserializer::Vrl(config.build()?)),
395        }
396    }
397
398    /// Return an appropriate default framer for the given deserializer
399    pub fn default_stream_framing(&self) -> FramingConfig {
400        match self {
401            DeserializerConfig::Avro { .. } => FramingConfig::Bytes,
402            DeserializerConfig::Native => FramingConfig::LengthDelimited(Default::default()),
403            DeserializerConfig::Bytes
404            | DeserializerConfig::Json(_)
405            | DeserializerConfig::Influxdb(_)
406            | DeserializerConfig::NativeJson(_) => {
407                FramingConfig::NewlineDelimited(Default::default())
408            }
409            DeserializerConfig::Protobuf(_) => FramingConfig::Bytes,
410            #[cfg(feature = "opentelemetry")]
411            DeserializerConfig::Otlp(_) => FramingConfig::Bytes,
412            #[cfg(feature = "syslog")]
413            DeserializerConfig::Syslog(_) => FramingConfig::NewlineDelimited(Default::default()),
414            DeserializerConfig::Vrl(_) => FramingConfig::Bytes,
415            DeserializerConfig::Gelf(_) => {
416                FramingConfig::CharacterDelimited(CharacterDelimitedDecoderConfig::new(0))
417            }
418        }
419    }
420
421    /// Returns an appropriate default framing config for the given deserializer with message based inputs.
422    pub fn default_message_based_framing(&self) -> FramingConfig {
423        match self {
424            DeserializerConfig::Gelf(_) => FramingConfig::ChunkedGelf(Default::default()),
425            _ => FramingConfig::Bytes,
426        }
427    }
428
429    /// Returns `true` when this is a VRL deserializer.
430    /// Sources can use this to decide whether to call `Decoder::with_metadata_template`.
431    pub fn is_vrl(&self) -> bool {
432        matches!(self, DeserializerConfig::Vrl(_))
433    }
434
435    /// Return the type of event build by this deserializer.
436    pub fn output_type(&self) -> DataType {
437        match self {
438            DeserializerConfig::Avro { avro } => AvroDeserializerConfig {
439                avro_options: avro.clone(),
440            }
441            .output_type(),
442            DeserializerConfig::Bytes => BytesDeserializerConfig.output_type(),
443            DeserializerConfig::Json(config) => config.output_type(),
444            DeserializerConfig::Protobuf(config) => config.output_type(),
445            #[cfg(feature = "opentelemetry")]
446            DeserializerConfig::Otlp(config) => config.output_type(),
447            #[cfg(feature = "syslog")]
448            DeserializerConfig::Syslog(config) => config.output_type(),
449            DeserializerConfig::Native => NativeDeserializerConfig.output_type(),
450            DeserializerConfig::NativeJson(config) => config.output_type(),
451            DeserializerConfig::Gelf(config) => config.output_type(),
452            DeserializerConfig::Vrl(config) => config.output_type(),
453            DeserializerConfig::Influxdb(config) => config.output_type(),
454        }
455    }
456
457    /// The schema produced by the deserializer.
458    pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
459        match self {
460            DeserializerConfig::Avro { avro } => AvroDeserializerConfig {
461                avro_options: avro.clone(),
462            }
463            .schema_definition(log_namespace),
464            DeserializerConfig::Bytes => BytesDeserializerConfig.schema_definition(log_namespace),
465            DeserializerConfig::Json(config) => config.schema_definition(log_namespace),
466            DeserializerConfig::Protobuf(config) => config.schema_definition(log_namespace),
467            #[cfg(feature = "opentelemetry")]
468            DeserializerConfig::Otlp(config) => config.schema_definition(log_namespace),
469            #[cfg(feature = "syslog")]
470            DeserializerConfig::Syslog(config) => config.schema_definition(log_namespace),
471            DeserializerConfig::Native => NativeDeserializerConfig.schema_definition(log_namespace),
472            DeserializerConfig::NativeJson(config) => config.schema_definition(log_namespace),
473            DeserializerConfig::Gelf(config) => config.schema_definition(log_namespace),
474            DeserializerConfig::Influxdb(config) => config.schema_definition(log_namespace),
475            DeserializerConfig::Vrl(config) => config.schema_definition(log_namespace),
476        }
477    }
478
479    /// Get the HTTP content type.
480    pub const fn content_type(&self, framer: &FramingConfig) -> &'static str {
481        match (&self, framer) {
482            (
483                DeserializerConfig::Json(_) | DeserializerConfig::NativeJson(_),
484                FramingConfig::NewlineDelimited(_),
485            ) => "application/x-ndjson",
486            (
487                DeserializerConfig::Gelf(_)
488                | DeserializerConfig::Json(_)
489                | DeserializerConfig::NativeJson(_),
490                FramingConfig::CharacterDelimited(CharacterDelimitedDecoderConfig {
491                    character_delimited:
492                        CharacterDelimitedDecoderOptions {
493                            delimiter: b',',
494                            max_length: Some(usize::MAX),
495                        },
496                }),
497            ) => "application/json",
498            (DeserializerConfig::Native, _) | (DeserializerConfig::Avro { .. }, _) => {
499                "application/octet-stream"
500            }
501            (DeserializerConfig::Protobuf(_), _) => "application/octet-stream",
502            #[cfg(feature = "opentelemetry")]
503            (DeserializerConfig::Otlp(_), _) => "application/x-protobuf",
504            (
505                DeserializerConfig::Json(_)
506                | DeserializerConfig::NativeJson(_)
507                | DeserializerConfig::Bytes
508                | DeserializerConfig::Gelf(_)
509                | DeserializerConfig::Influxdb(_)
510                | DeserializerConfig::Vrl(_),
511                _,
512            ) => "text/plain",
513            #[cfg(feature = "syslog")]
514            (DeserializerConfig::Syslog(_), _) => "text/plain",
515        }
516    }
517}
518
/// Parse structured events from bytes.
///
/// Each variant wraps one concrete deserializer; `Boxed` holds an opaque
/// implementation instead.
#[allow(clippy::large_enum_variant)]
#[derive(Clone)]
pub enum Deserializer {
    /// Uses an `AvroDeserializer` for deserialization.
    Avro(AvroDeserializer),
    /// Uses a `BytesDeserializer` for deserialization.
    Bytes(BytesDeserializer),
    /// Uses a `JsonDeserializer` for deserialization.
    Json(JsonDeserializer),
    /// Uses a `ProtobufDeserializer` for deserialization.
    Protobuf(ProtobufDeserializer),
    #[cfg(feature = "opentelemetry")]
    /// Uses an `OtlpDeserializer` for deserialization.
    Otlp(OtlpDeserializer),
    #[cfg(feature = "syslog")]
    /// Uses a `SyslogDeserializer` for deserialization.
    Syslog(SyslogDeserializer),
    /// Uses a `NativeDeserializer` for deserialization.
    Native(NativeDeserializer),
    /// Uses a `NativeJsonDeserializer` for deserialization.
    NativeJson(NativeJsonDeserializer),
    /// Uses an opaque `Deserializer` implementation for deserialization.
    Boxed(BoxedDeserializer),
    /// Uses a `GelfDeserializer` for deserialization.
    Gelf(GelfDeserializer),
    /// Uses an `InfluxdbDeserializer` for deserialization.
    Influxdb(InfluxdbDeserializer),
    /// Uses a `VrlDeserializer` for deserialization.
    Vrl(VrlDeserializer),
}
550
551impl Deserializer {
552    /// Attaches a metadata template to the inner deserializer, if it supports
553    /// one.
554    pub fn with_metadata_template(self, metadata: EventMetadata) -> Self {
555        match self {
556            Deserializer::Vrl(d) => Deserializer::Vrl(d.with_metadata_template(metadata)),
557            other => other,
558        }
559    }
560}
561
562impl format::Deserializer for Deserializer {
563    fn parse(
564        &self,
565        bytes: Bytes,
566        log_namespace: LogNamespace,
567    ) -> vector_common::Result<SmallVec<[Event; 1]>> {
568        match self {
569            Deserializer::Avro(deserializer) => deserializer.parse(bytes, log_namespace),
570            Deserializer::Bytes(deserializer) => deserializer.parse(bytes, log_namespace),
571            Deserializer::Json(deserializer) => deserializer.parse(bytes, log_namespace),
572            Deserializer::Protobuf(deserializer) => deserializer.parse(bytes, log_namespace),
573            #[cfg(feature = "opentelemetry")]
574            Deserializer::Otlp(deserializer) => deserializer.parse(bytes, log_namespace),
575            #[cfg(feature = "syslog")]
576            Deserializer::Syslog(deserializer) => deserializer.parse(bytes, log_namespace),
577            Deserializer::Native(deserializer) => deserializer.parse(bytes, log_namespace),
578            Deserializer::NativeJson(deserializer) => deserializer.parse(bytes, log_namespace),
579            Deserializer::Boxed(deserializer) => deserializer.parse(bytes, log_namespace),
580            Deserializer::Gelf(deserializer) => deserializer.parse(bytes, log_namespace),
581            Deserializer::Influxdb(deserializer) => deserializer.parse(bytes, log_namespace),
582            Deserializer::Vrl(deserializer) => deserializer.parse(bytes, log_namespace),
583        }
584    }
585}
586
#[cfg(test)]
mod tests {
    use super::*;

    /// The default stream framing for GELF must be character-delimited on byte `0`
    /// with no maximum length.
    #[test]
    fn gelf_stream_default_framing_is_null_delimited() {
        let gelf_codec: DeserializerConfig = GelfDeserializerConfig::default().into();
        assert!(matches!(
            gelf_codec.default_stream_framing(),
            FramingConfig::CharacterDelimited(CharacterDelimitedDecoderConfig {
                character_delimited: CharacterDelimitedDecoderOptions {
                    delimiter: 0,
                    max_length: None,
                }
            })
        ));
    }
}