codecs/encoding/
serializer.rs

1//! Serializer configuration and implementation for encoding structured events as bytes.
2
3use bytes::BytesMut;
4use vector_config::configurable_component;
5use vector_core::{config::DataType, event::Event, schema};
6
7#[cfg(feature = "arrow")]
8use super::format::{ArrowStreamSerializer, ArrowStreamSerializerConfig};
9#[cfg(feature = "opentelemetry")]
10use super::format::{OtlpSerializer, OtlpSerializerConfig};
11#[cfg(feature = "parquet")]
12use super::format::{ParquetSerializer, ParquetSerializerConfig};
13use super::format::{ProtoBatchSerializer, ProtoBatchSerializerConfig};
14#[cfg(feature = "syslog")]
15use super::format::{SyslogSerializer, SyslogSerializerConfig};
16use super::{
17    chunking::Chunker,
18    format::{
19        AvroSerializer, AvroSerializerConfig, AvroSerializerOptions, CefSerializer,
20        CefSerializerConfig, CsvSerializer, CsvSerializerConfig, GelfSerializer,
21        GelfSerializerConfig, JsonSerializer, JsonSerializerConfig, LogfmtSerializer,
22        LogfmtSerializerConfig, NativeJsonSerializer, NativeJsonSerializerConfig, NativeSerializer,
23        NativeSerializerConfig, ProtobufSerializer, ProtobufSerializerConfig, RawMessageSerializer,
24        RawMessageSerializerConfig, TextSerializer, TextSerializerConfig,
25    },
26    framing::{
27        CharacterDelimitedEncoderConfig, FramingConfig, LengthDelimitedEncoderConfig,
28        VarintLengthDelimitedEncoderConfig,
29    },
30};
31
32/// Serializer configuration.
33#[configurable_component]
34#[derive(Clone, Debug)]
35#[serde(tag = "codec", rename_all = "snake_case")]
36#[configurable(metadata(docs::enum_tag_description = "The codec to use for encoding events."))]
37pub enum SerializerConfig {
38    /// Encodes an event as an [Apache Avro][apache_avro] message.
39    ///
40    /// [apache_avro]: https://avro.apache.org/
41    Avro {
42        /// Apache Avro-specific encoder options.
43        avro: AvroSerializerOptions,
44    },
45
46    /// Encodes an event as a CEF (Common Event Format) formatted message.
47    ///
48    Cef(
49        /// Options for the CEF encoder.
50        CefSerializerConfig,
51    ),
52
53    /// Encodes an event as a CSV message.
54    ///
55    /// This codec must be configured with fields to encode.
56    ///
57    Csv(CsvSerializerConfig),
58
59    /// Encodes an event as a [GELF][gelf] message.
60    ///
61    /// This codec is experimental for the following reason:
62    ///
63    /// The GELF specification is more strict than the actual Graylog receiver.
64    /// Vector's encoder currently adheres more strictly to the GELF spec, with
65    /// the exception that some characters such as `@`  are allowed in field names.
66    ///
67    /// Other GELF codecs, such as Loki's, use a [Go SDK][implementation] that is maintained
68    /// by Graylog and is much more relaxed than the GELF spec.
69    ///
70    /// Going forward, Vector will use that [Go SDK][implementation] as the reference implementation, which means
71    /// the codec might continue to relax the enforcement of the specification.
72    ///
73    /// [gelf]: https://docs.graylog.org/docs/gelf
74    /// [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go
75    Gelf(GelfSerializerConfig),
76
77    /// Encodes an event as [JSON][json].
78    ///
79    /// [json]: https://www.json.org/
80    Json(JsonSerializerConfig),
81
82    /// Encodes an event as a [logfmt][logfmt] message.
83    ///
84    /// [logfmt]: https://brandur.org/logfmt
85    Logfmt,
86
87    /// Encodes an event in the [native Protocol Buffers format][vector_native_protobuf].
88    ///
89    /// This codec is **[experimental][experimental]**.
90    ///
91    /// [vector_native_protobuf]: https://github.com/vectordotdev/vector/blob/master/lib/vector-core/proto/event.proto
92    /// [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs
93    Native,
94
95    /// Encodes an event in the [native JSON format][vector_native_json].
96    ///
97    /// This codec is **[experimental][experimental]**.
98    ///
99    /// [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue
100    /// [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs
101    NativeJson,
102
103    /// Encodes an event in the [OTLP (OpenTelemetry Protocol)][otlp] format.
104    ///
105    /// This codec uses protobuf encoding, which is the recommended format for OTLP.
106    /// The output is suitable for sending to OTLP-compatible endpoints with
107    /// `content-type: application/x-protobuf`.
108    ///
109    /// [otlp]: https://opentelemetry.io/docs/specs/otlp/
110    #[cfg(feature = "opentelemetry")]
111    Otlp,
112
113    /// Encodes an event as a [Protobuf][protobuf] message.
114    ///
115    /// [protobuf]: https://protobuf.dev/
116    Protobuf(ProtobufSerializerConfig),
117
118    /// No encoding.
119    ///
120    /// This encoding uses the `message` field of a log event.
121    ///
122    /// Be careful if you are modifying your log events (for example, by using a `remap`
123    /// transform) and removing the message field while doing additional parsing on it, as this
124    /// could lead to the encoding emitting empty strings for the given event.
125    RawMessage,
126
127    /// Plain text encoding.
128    ///
129    /// This encoding uses the `message` field of a log event. For metrics, it uses an
130    /// encoding that resembles the Prometheus export format.
131    ///
132    /// Be careful if you are modifying your log events (for example, by using a `remap`
133    /// transform) and removing the message field while doing additional parsing on it, as this
134    /// could lead to the encoding emitting empty strings for the given event.
135    Text(TextSerializerConfig),
136
137    /// Syslog encoding
138    /// RFC 3164 and 5424 are supported
139    #[cfg(feature = "syslog")]
140    Syslog(SyslogSerializerConfig),
141}
142
143impl Default for SerializerConfig {
144    fn default() -> Self {
145        Self::Json(JsonSerializerConfig::default())
146    }
147}
148
149/// Batch serializer configuration.
150#[configurable_component]
151#[derive(Clone, Debug)]
152#[serde(tag = "codec", rename_all = "snake_case")]
153#[configurable(metadata(
154    docs::enum_tag_description = "The codec to use for batch encoding events."
155))]
156pub enum BatchSerializerConfig {
157    /// Encodes events in [Apache Arrow][apache_arrow] IPC streaming format.
158    ///
159    /// This is the streaming variant of the Arrow IPC format, which writes
160    /// a continuous stream of record batches.
161    ///
162    /// [apache_arrow]: https://arrow.apache.org/
163    #[cfg(feature = "arrow")]
164    #[serde(rename = "arrow_stream")]
165    ArrowStream(ArrowStreamSerializerConfig),
166    /// Encodes events in [Apache Parquet][apache_parquet] columnar format.
167    ///
168    /// [apache_parquet]: https://parquet.apache.org/
169    #[cfg(feature = "parquet")]
170    #[serde(rename = "parquet")]
171    Parquet(ParquetSerializerConfig),
172
173    /// Encodes each event individually as a [Protocol Buffers][protobuf] message.
174    ///
175    /// Each event in the batch is serialized to protobuf bytes independently,
176    /// producing a list of byte buffers (one per event).
177    ///
178    /// [protobuf]: https://protobuf.dev/
179    #[serde(rename = "proto_batch")]
180    ProtoBatch(ProtoBatchSerializerConfig),
181}
182
183impl BatchSerializerConfig {
184    /// Build the batch serializer from this configuration.
185    pub fn build_batch_serializer(
186        &self,
187    ) -> Result<super::BatchSerializer, Box<dyn std::error::Error + Send + Sync + 'static>> {
188        match self {
189            #[cfg(feature = "arrow")]
190            BatchSerializerConfig::ArrowStream(arrow_config) => {
191                let serializer = ArrowStreamSerializer::new(arrow_config.clone())?;
192                Ok(super::BatchSerializer::Arrow(serializer))
193            }
194            #[cfg(feature = "parquet")]
195            BatchSerializerConfig::Parquet(parquet_config) => {
196                let serializer = ParquetSerializer::new(parquet_config.clone())?;
197                Ok(super::BatchSerializer::Parquet(Box::new(serializer)))
198            }
199            BatchSerializerConfig::ProtoBatch(proto_config) => {
200                let serializer = ProtoBatchSerializer::new(proto_config.clone())?;
201                Ok(super::BatchSerializer::ProtoBatch(serializer))
202            }
203        }
204    }
205
206    /// The data type of events that are accepted by this batch serializer.
207    pub fn input_type(&self) -> DataType {
208        match self {
209            #[cfg(feature = "arrow")]
210            BatchSerializerConfig::ArrowStream(arrow_config) => arrow_config.input_type(),
211            #[cfg(feature = "parquet")]
212            BatchSerializerConfig::Parquet(parquet_config) => parquet_config.input_type(),
213            BatchSerializerConfig::ProtoBatch(proto_config) => proto_config.input_type(),
214        }
215    }
216
217    /// The schema required by the batch serializer.
218    pub fn schema_requirement(&self) -> schema::Requirement {
219        match self {
220            #[cfg(feature = "arrow")]
221            BatchSerializerConfig::ArrowStream(arrow_config) => arrow_config.schema_requirement(),
222            #[cfg(feature = "parquet")]
223            BatchSerializerConfig::Parquet(parquet_config) => parquet_config.schema_requirement(),
224            BatchSerializerConfig::ProtoBatch(proto_config) => proto_config.schema_requirement(),
225        }
226    }
227}
228
229impl From<AvroSerializerConfig> for SerializerConfig {
230    fn from(config: AvroSerializerConfig) -> Self {
231        Self::Avro { avro: config.avro }
232    }
233}
234
235impl From<CefSerializerConfig> for SerializerConfig {
236    fn from(config: CefSerializerConfig) -> Self {
237        Self::Cef(config)
238    }
239}
240
241impl From<CsvSerializerConfig> for SerializerConfig {
242    fn from(config: CsvSerializerConfig) -> Self {
243        Self::Csv(config)
244    }
245}
246
247impl From<GelfSerializerConfig> for SerializerConfig {
248    fn from(config: GelfSerializerConfig) -> Self {
249        Self::Gelf(config)
250    }
251}
252
253impl From<JsonSerializerConfig> for SerializerConfig {
254    fn from(config: JsonSerializerConfig) -> Self {
255        Self::Json(config)
256    }
257}
258
259impl From<LogfmtSerializerConfig> for SerializerConfig {
260    fn from(_: LogfmtSerializerConfig) -> Self {
261        Self::Logfmt
262    }
263}
264
265impl From<NativeSerializerConfig> for SerializerConfig {
266    fn from(_: NativeSerializerConfig) -> Self {
267        Self::Native
268    }
269}
270
271impl From<NativeJsonSerializerConfig> for SerializerConfig {
272    fn from(_: NativeJsonSerializerConfig) -> Self {
273        Self::NativeJson
274    }
275}
276
277#[cfg(feature = "opentelemetry")]
278impl From<OtlpSerializerConfig> for SerializerConfig {
279    fn from(_: OtlpSerializerConfig) -> Self {
280        Self::Otlp
281    }
282}
283
284impl From<ProtobufSerializerConfig> for SerializerConfig {
285    fn from(config: ProtobufSerializerConfig) -> Self {
286        Self::Protobuf(config)
287    }
288}
289
290impl From<RawMessageSerializerConfig> for SerializerConfig {
291    fn from(_: RawMessageSerializerConfig) -> Self {
292        Self::RawMessage
293    }
294}
295
296impl From<TextSerializerConfig> for SerializerConfig {
297    fn from(config: TextSerializerConfig) -> Self {
298        Self::Text(config)
299    }
300}
301
302impl SerializerConfig {
303    /// Build the `Serializer` from this configuration.
304    pub fn build(&self) -> Result<Serializer, Box<dyn std::error::Error + Send + Sync + 'static>> {
305        match self {
306            SerializerConfig::Avro { avro } => Ok(Serializer::Avro(
307                AvroSerializerConfig::new(avro.schema.clone()).build()?,
308            )),
309            SerializerConfig::Cef(config) => Ok(Serializer::Cef(config.build()?)),
310            SerializerConfig::Csv(config) => Ok(Serializer::Csv(config.build()?)),
311            SerializerConfig::Gelf(config) => Ok(Serializer::Gelf(config.build())),
312            SerializerConfig::Json(config) => Ok(Serializer::Json(config.build())),
313            SerializerConfig::Logfmt => Ok(Serializer::Logfmt(LogfmtSerializerConfig.build())),
314            SerializerConfig::Native => Ok(Serializer::Native(NativeSerializerConfig.build())),
315            SerializerConfig::NativeJson => {
316                Ok(Serializer::NativeJson(NativeJsonSerializerConfig.build()))
317            }
318            #[cfg(feature = "opentelemetry")]
319            SerializerConfig::Otlp => {
320                Ok(Serializer::Otlp(OtlpSerializerConfig::default().build()?))
321            }
322            SerializerConfig::Protobuf(config) => Ok(Serializer::Protobuf(config.build()?)),
323            SerializerConfig::RawMessage => {
324                Ok(Serializer::RawMessage(RawMessageSerializerConfig.build()))
325            }
326            SerializerConfig::Text(config) => Ok(Serializer::Text(config.build())),
327            #[cfg(feature = "syslog")]
328            SerializerConfig::Syslog(config) => Ok(Serializer::Syslog(config.build())),
329        }
330    }
331
332    /// Return an appropriate default framer for the given serializer.
333    pub fn default_stream_framing(&self) -> FramingConfig {
334        match self {
335            // TODO: Technically, Avro messages are supposed to be framed[1] as a vector of
336            // length-delimited buffers -- `len` as big-endian 32-bit unsigned integer, followed by
337            // `len` bytes -- with a "zero-length buffer" to terminate the overall message... which
338            // our length delimited framer obviously will not do.
339            //
340            // This is OK for now, because the Avro serializer is more ceremonial than anything
341            // else, existing to curry serializer config options to Pulsar's native client, not to
342            // actually serialize the bytes themselves... but we're still exposing this method and
343            // we should do so accurately, even if practically it doesn't need to be.
344            //
345            // [1]: https://avro.apache.org/docs/1.11.1/specification/_print/#message-framing
346            SerializerConfig::Avro { .. } | SerializerConfig::Native => {
347                FramingConfig::LengthDelimited(LengthDelimitedEncoderConfig::default())
348            }
349            #[cfg(feature = "opentelemetry")]
350            SerializerConfig::Otlp => FramingConfig::Bytes,
351            SerializerConfig::Protobuf(_) => {
352                FramingConfig::VarintLengthDelimited(VarintLengthDelimitedEncoderConfig::default())
353            }
354            SerializerConfig::Cef(_)
355            | SerializerConfig::Csv(_)
356            | SerializerConfig::Json(_)
357            | SerializerConfig::Logfmt
358            | SerializerConfig::NativeJson
359            | SerializerConfig::RawMessage
360            | SerializerConfig::Text(_) => FramingConfig::NewlineDelimited,
361            #[cfg(feature = "syslog")]
362            SerializerConfig::Syslog(_) => FramingConfig::NewlineDelimited,
363            SerializerConfig::Gelf(_) => {
364                FramingConfig::CharacterDelimited(CharacterDelimitedEncoderConfig::new(0))
365            }
366        }
367    }
368
369    /// The data type of events that are accepted by this `Serializer`.
370    pub fn input_type(&self) -> DataType {
371        match self {
372            SerializerConfig::Avro { avro } => {
373                AvroSerializerConfig::new(avro.schema.clone()).input_type()
374            }
375            SerializerConfig::Cef(config) => config.input_type(),
376            SerializerConfig::Csv(config) => config.input_type(),
377            SerializerConfig::Gelf(config) => config.input_type(),
378            SerializerConfig::Json(config) => config.input_type(),
379            SerializerConfig::Logfmt => LogfmtSerializerConfig.input_type(),
380            SerializerConfig::Native => NativeSerializerConfig.input_type(),
381            SerializerConfig::NativeJson => NativeJsonSerializerConfig.input_type(),
382            #[cfg(feature = "opentelemetry")]
383            SerializerConfig::Otlp => OtlpSerializerConfig::default().input_type(),
384            SerializerConfig::Protobuf(config) => config.input_type(),
385            SerializerConfig::RawMessage => RawMessageSerializerConfig.input_type(),
386            SerializerConfig::Text(config) => config.input_type(),
387            #[cfg(feature = "syslog")]
388            SerializerConfig::Syslog(config) => config.input_type(),
389        }
390    }
391
392    /// The schema required by the serializer.
393    pub fn schema_requirement(&self) -> schema::Requirement {
394        match self {
395            SerializerConfig::Avro { avro } => {
396                AvroSerializerConfig::new(avro.schema.clone()).schema_requirement()
397            }
398            SerializerConfig::Cef(config) => config.schema_requirement(),
399            SerializerConfig::Csv(config) => config.schema_requirement(),
400            SerializerConfig::Gelf(config) => config.schema_requirement(),
401            SerializerConfig::Json(config) => config.schema_requirement(),
402            SerializerConfig::Logfmt => LogfmtSerializerConfig.schema_requirement(),
403            SerializerConfig::Native => NativeSerializerConfig.schema_requirement(),
404            SerializerConfig::NativeJson => NativeJsonSerializerConfig.schema_requirement(),
405            #[cfg(feature = "opentelemetry")]
406            SerializerConfig::Otlp => OtlpSerializerConfig::default().schema_requirement(),
407            SerializerConfig::Protobuf(config) => config.schema_requirement(),
408            SerializerConfig::RawMessage => RawMessageSerializerConfig.schema_requirement(),
409            SerializerConfig::Text(config) => config.schema_requirement(),
410            #[cfg(feature = "syslog")]
411            SerializerConfig::Syslog(config) => config.schema_requirement(),
412        }
413    }
414}
415
416/// Serialize structured events as bytes.
417#[derive(Debug, Clone)]
418pub enum Serializer {
419    /// Uses an `AvroSerializer` for serialization.
420    Avro(AvroSerializer),
421    /// Uses a `CefSerializer` for serialization.
422    Cef(CefSerializer),
423    /// Uses a `CsvSerializer` for serialization.
424    Csv(CsvSerializer),
425    /// Uses a `GelfSerializer` for serialization.
426    Gelf(GelfSerializer),
427    /// Uses a `JsonSerializer` for serialization.
428    Json(JsonSerializer),
429    /// Uses a `LogfmtSerializer` for serialization.
430    Logfmt(LogfmtSerializer),
431    /// Uses a `NativeSerializer` for serialization.
432    Native(NativeSerializer),
433    /// Uses a `NativeJsonSerializer` for serialization.
434    NativeJson(NativeJsonSerializer),
435    /// Uses an `OtlpSerializer` for serialization.
436    #[cfg(feature = "opentelemetry")]
437    Otlp(OtlpSerializer),
438    /// Uses a `ProtobufSerializer` for serialization.
439    Protobuf(ProtobufSerializer),
440    /// Uses a `RawMessageSerializer` for serialization.
441    RawMessage(RawMessageSerializer),
442    /// Uses a `TextSerializer` for serialization.
443    Text(TextSerializer),
444    /// Uses a `SyslogSerializer` for serialization.
445    #[cfg(feature = "syslog")]
446    Syslog(SyslogSerializer),
447}
448
449impl Serializer {
450    /// Check if the serializer supports encoding an event to JSON via `Serializer::to_json_value`.
451    pub fn supports_json(&self) -> bool {
452        match self {
453            Serializer::Json(_) | Serializer::NativeJson(_) | Serializer::Gelf(_) => true,
454            Serializer::Avro(_)
455            | Serializer::Cef(_)
456            | Serializer::Csv(_)
457            | Serializer::Logfmt(_)
458            | Serializer::Text(_)
459            | Serializer::Native(_)
460            | Serializer::Protobuf(_)
461            | Serializer::RawMessage(_) => false,
462            #[cfg(feature = "syslog")]
463            Serializer::Syslog(_) => false,
464            #[cfg(feature = "opentelemetry")]
465            Serializer::Otlp(_) => false,
466        }
467    }
468
469    /// Encode event and represent it as JSON value.
470    ///
471    /// # Panics
472    ///
473    /// Panics if the serializer does not support encoding to JSON. Call `Serializer::supports_json`
474    /// if you need to determine the capability to encode to JSON at runtime.
475    pub fn to_json_value(&self, event: Event) -> Result<serde_json::Value, vector_common::Error> {
476        match self {
477            Serializer::Gelf(serializer) => serializer.to_json_value(event),
478            Serializer::Json(serializer) => serializer.to_json_value(event),
479            Serializer::NativeJson(serializer) => serializer.to_json_value(event),
480            Serializer::Avro(_)
481            | Serializer::Cef(_)
482            | Serializer::Csv(_)
483            | Serializer::Logfmt(_)
484            | Serializer::Text(_)
485            | Serializer::Native(_)
486            | Serializer::Protobuf(_)
487            | Serializer::RawMessage(_) => {
488                panic!("Serializer does not support JSON")
489            }
490            #[cfg(feature = "syslog")]
491            Serializer::Syslog(_) => {
492                panic!("Serializer does not support JSON")
493            }
494            #[cfg(feature = "opentelemetry")]
495            Serializer::Otlp(_) => {
496                panic!("Serializer does not support JSON")
497            }
498        }
499    }
500
501    /// Returns the chunking implementation for the serializer, if any is supported.
502    pub fn chunker(&self) -> Option<Chunker> {
503        match self {
504            Serializer::Gelf(gelf) => Some(Chunker::Gelf(gelf.chunker())),
505            _ => None,
506        }
507    }
508
509    /// Returns whether the serializer produces binary output.
510    ///
511    /// Binary serializers produce raw bytes that should not be interpreted as text,
512    /// while text serializers produce UTF-8 encoded strings.
513    pub const fn is_binary(&self) -> bool {
514        match self {
515            Serializer::RawMessage(_)
516            | Serializer::Avro(_)
517            | Serializer::Native(_)
518            | Serializer::Protobuf(_) => true,
519            #[cfg(feature = "opentelemetry")]
520            Serializer::Otlp(_) => true,
521            #[cfg(feature = "syslog")]
522            Serializer::Syslog(_) => false,
523            Serializer::Cef(_)
524            | Serializer::Csv(_)
525            | Serializer::Logfmt(_)
526            | Serializer::Gelf(_)
527            | Serializer::Json(_)
528            | Serializer::Text(_)
529            | Serializer::NativeJson(_) => false,
530        }
531    }
532}
533
534impl From<AvroSerializer> for Serializer {
535    fn from(serializer: AvroSerializer) -> Self {
536        Self::Avro(serializer)
537    }
538}
539
540impl From<CefSerializer> for Serializer {
541    fn from(serializer: CefSerializer) -> Self {
542        Self::Cef(serializer)
543    }
544}
545
546impl From<CsvSerializer> for Serializer {
547    fn from(serializer: CsvSerializer) -> Self {
548        Self::Csv(serializer)
549    }
550}
551
552impl From<GelfSerializer> for Serializer {
553    fn from(serializer: GelfSerializer) -> Self {
554        Self::Gelf(serializer)
555    }
556}
557
558impl From<JsonSerializer> for Serializer {
559    fn from(serializer: JsonSerializer) -> Self {
560        Self::Json(serializer)
561    }
562}
563
564impl From<LogfmtSerializer> for Serializer {
565    fn from(serializer: LogfmtSerializer) -> Self {
566        Self::Logfmt(serializer)
567    }
568}
569
570impl From<NativeSerializer> for Serializer {
571    fn from(serializer: NativeSerializer) -> Self {
572        Self::Native(serializer)
573    }
574}
575
576impl From<NativeJsonSerializer> for Serializer {
577    fn from(serializer: NativeJsonSerializer) -> Self {
578        Self::NativeJson(serializer)
579    }
580}
581
582#[cfg(feature = "opentelemetry")]
583impl From<OtlpSerializer> for Serializer {
584    fn from(serializer: OtlpSerializer) -> Self {
585        Self::Otlp(serializer)
586    }
587}
588
589impl From<ProtobufSerializer> for Serializer {
590    fn from(serializer: ProtobufSerializer) -> Self {
591        Self::Protobuf(serializer)
592    }
593}
594
595impl From<RawMessageSerializer> for Serializer {
596    fn from(serializer: RawMessageSerializer) -> Self {
597        Self::RawMessage(serializer)
598    }
599}
600
601impl From<TextSerializer> for Serializer {
602    fn from(serializer: TextSerializer) -> Self {
603        Self::Text(serializer)
604    }
605}
606#[cfg(feature = "syslog")]
607impl From<SyslogSerializer> for Serializer {
608    fn from(serializer: SyslogSerializer) -> Self {
609        Self::Syslog(serializer)
610    }
611}
612
613impl tokio_util::codec::Encoder<Event> for Serializer {
614    type Error = vector_common::Error;
615
616    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
617        match self {
618            Serializer::Avro(serializer) => serializer.encode(event, buffer),
619            Serializer::Cef(serializer) => serializer.encode(event, buffer),
620            Serializer::Csv(serializer) => serializer.encode(event, buffer),
621            Serializer::Gelf(serializer) => serializer.encode(event, buffer),
622            Serializer::Json(serializer) => serializer.encode(event, buffer),
623            Serializer::Logfmt(serializer) => serializer.encode(event, buffer),
624            Serializer::Native(serializer) => serializer.encode(event, buffer),
625            Serializer::NativeJson(serializer) => serializer.encode(event, buffer),
626            #[cfg(feature = "opentelemetry")]
627            Serializer::Otlp(serializer) => serializer.encode(event, buffer),
628            Serializer::Protobuf(serializer) => serializer.encode(event, buffer),
629            Serializer::RawMessage(serializer) => serializer.encode(event, buffer),
630            Serializer::Text(serializer) => serializer.encode(event, buffer),
631            #[cfg(feature = "syslog")]
632            Serializer::Syslog(serializer) => serializer.encode(event, buffer),
633        }
634    }
635}
636
637#[cfg(test)]
638mod tests {
639    use super::*;
640
641    #[test]
642    fn test_serializer_config_default() {
643        // SerializerConfig should default to Json
644        let config = SerializerConfig::default();
645        assert!(matches!(config, SerializerConfig::Json(_)));
646    }
647
648    #[test]
649    fn test_serializer_is_binary() {
650        // Test that is_binary correctly identifies binary serializers
651        let json_config = JsonSerializerConfig::default();
652        let json_serializer = Serializer::Json(json_config.build());
653        assert!(!json_serializer.is_binary());
654
655        let native_serializer = Serializer::Native(NativeSerializerConfig.build());
656        assert!(native_serializer.is_binary());
657
658        let raw_message_serializer = Serializer::RawMessage(RawMessageSerializerConfig.build());
659        assert!(raw_message_serializer.is_binary());
660    }
661
662    #[test]
663    fn test_serializer_supports_json() {
664        // Test that supports_json correctly identifies JSON-capable serializers
665        let json_config = JsonSerializerConfig::default();
666        let json_serializer = Serializer::Json(json_config.build());
667        assert!(json_serializer.supports_json());
668
669        let text_config = TextSerializerConfig::default();
670        let text_serializer = Serializer::Text(text_config.build());
671        assert!(!text_serializer.supports_json());
672    }
673
674    #[test]
675    fn test_serializer_config_build() {
676        // Test that SerializerConfig can be built successfully
677        let config = SerializerConfig::Json(JsonSerializerConfig::default());
678        let serializer = config.build();
679        assert!(serializer.is_ok());
680        assert!(matches!(serializer.unwrap(), Serializer::Json(_)));
681    }
682
683    #[test]
684    fn test_serializer_config_default_framing() {
685        // Test that default framing is appropriate for each serializer type
686        let json_config = SerializerConfig::Json(JsonSerializerConfig::default());
687        assert!(matches!(
688            json_config.default_stream_framing(),
689            FramingConfig::NewlineDelimited
690        ));
691
692        let native_config = SerializerConfig::Native;
693        assert!(matches!(
694            native_config.default_stream_framing(),
695            FramingConfig::LengthDelimited(_)
696        ));
697    }
698}