codecs/encoding/
encoder.rs

1use bytes::BytesMut;
2use tokio_util::codec::Encoder as _;
3use vector_common::internal_event::emit;
4use vector_core::event::Event;
5
6#[cfg(feature = "arrow")]
7use crate::encoding::ArrowStreamSerializer;
8#[cfg(feature = "parquet")]
9use crate::encoding::ParquetSerializer;
10use crate::{
11    encoding::{Error, Framer, ProtoBatchSerializer, Serializer},
12    internal_events::{EncoderFramingError, EncoderSerializeError},
13};
14
/// Result of encoding a whole batch of events in a single call.
///
/// Which variant is produced depends on the batch serializer in use:
/// - the Arrow serializer yields one columnar `RecordBatch`;
/// - the protobuf batch serializer yields one byte buffer per event.
///
/// `BatchEncoder::encode_batch` returns an error for the Parquet serializer
/// instead of producing a `BatchOutput`.
#[derive(Debug)]
pub enum BatchOutput {
    /// All events of the batch laid out as a single Arrow `RecordBatch`.
    #[cfg(feature = "arrow")]
    Arrow(arrow::record_batch::RecordBatch),
    /// One independently serialized record for each input event, in order.
    Records(Vec<Vec<u8>>),
}
28
29/// Serializers that support batch encoding (encoding all events at once).
30#[derive(Debug, Clone)]
31pub enum BatchSerializer {
32    /// Arrow IPC stream format serializer.
33    #[cfg(feature = "arrow")]
34    Arrow(ArrowStreamSerializer),
35    /// Parquet format serializer.
36    #[cfg(feature = "parquet")]
37    Parquet(Box<ParquetSerializer>),
38    /// Protobuf batch serializer that encodes each event individually.
39    ProtoBatch(ProtoBatchSerializer),
40}
41
42/// An encoder that encodes batches of events.
43#[derive(Debug, Clone)]
44pub struct BatchEncoder {
45    serializer: BatchSerializer,
46}
47
48impl BatchEncoder {
49    /// Creates a new `BatchEncoder` with the specified batch serializer.
50    pub const fn new(serializer: BatchSerializer) -> Self {
51        Self { serializer }
52    }
53
54    /// Get the batch serializer.
55    pub const fn serializer(&self) -> &BatchSerializer {
56        &self.serializer
57    }
58
59    /// Get the HTTP content type.
60    ///
61    /// Returns `None` for serializers that do not produce a single HTTP body
62    /// (e.g. `ProtoBatch`, which emits one record per event for an out-of-band
63    /// transport rather than an HTTP payload).
64    #[cfg(any(feature = "arrow", feature = "parquet"))]
65    pub const fn content_type(&self) -> Option<&'static str> {
66        match &self.serializer {
67            #[cfg(feature = "arrow")]
68            BatchSerializer::Arrow(_) => Some("application/vnd.apache.arrow.stream"),
69            #[cfg(feature = "parquet")]
70            BatchSerializer::Parquet(_) => Some("application/vnd.apache.parquet"),
71            BatchSerializer::ProtoBatch(_) => None,
72        }
73    }
74
75    /// Encode a batch of events into a `BatchOutput`.
76    pub fn encode_batch(&self, events: &[Event]) -> Result<BatchOutput, Error> {
77        match &self.serializer {
78            #[cfg(feature = "arrow")]
79            BatchSerializer::Arrow(serializer) => {
80                let record_batch = serializer.encode_to_record_batch(events).map_err(|err| {
81                    use crate::encoding::ArrowEncodingError;
82                    match err {
83                        ArrowEncodingError::NullConstraint { .. } => {
84                            Error::SchemaConstraintViolation(Box::new(err))
85                        }
86                        _ => Error::SerializingError(Box::new(err)),
87                    }
88                })?;
89                Ok(BatchOutput::Arrow(record_batch))
90            }
91            BatchSerializer::ProtoBatch(serializer) => {
92                let records = serializer
93                    .encode_batch(events)
94                    .map_err(|err| Error::SerializingError(Box::new(err)))?;
95                Ok(BatchOutput::Records(records))
96            }
97            #[cfg(feature = "parquet")]
98            BatchSerializer::Parquet(_) => Err(Error::SerializingError(Box::from(
99                "Parquet serializer does not support encode_batch; use the tokio Encoder interface instead",
100            ))),
101        }
102    }
103}
104
105impl tokio_util::codec::Encoder<Vec<Event>> for BatchEncoder {
106    type Error = Error;
107
108    #[allow(unused_variables)]
109    fn encode(&mut self, events: Vec<Event>, buffer: &mut BytesMut) -> Result<(), Self::Error> {
110        match &mut self.serializer {
111            #[cfg(feature = "arrow")]
112            BatchSerializer::Arrow(serializer) => {
113                serializer.encode(events, buffer).map_err(|err| {
114                    use crate::encoding::ArrowEncodingError;
115                    match err {
116                        ArrowEncodingError::NullConstraint { .. } => {
117                            Error::SchemaConstraintViolation(Box::new(err))
118                        }
119                        _ => Error::SerializingError(Box::new(err)),
120                    }
121                })
122            }
123            #[cfg(feature = "parquet")]
124            BatchSerializer::Parquet(serializer) => serializer
125                .encode(events, buffer)
126                .map_err(Error::SerializingError),
127            BatchSerializer::ProtoBatch(_) => Err(Error::SerializingError(Box::from(
128                "ProtoBatch serializer does not support the tokio Encoder interface; use BatchEncoder::encode_batch() instead",
129            ))),
130        }
131    }
132}
133
134/// A wrapper that supports both framed and batch encoding modes.
135#[derive(Debug, Clone)]
136pub enum EncoderKind {
137    /// Uses framing to encode individual events
138    Framed(Box<Encoder<Framer>>),
139    /// Encodes events in batches without framing
140    Batch(BatchEncoder),
141}
142
143#[derive(Debug, Clone)]
144/// An encoder that can encode structured events into byte frames.
145pub struct Encoder<Framer>
146where
147    Framer: Clone,
148{
149    framer: Framer,
150    serializer: Serializer,
151}
152
153impl Default for Encoder<Framer> {
154    fn default() -> Self {
155        use crate::encoding::{NewlineDelimitedEncoder, TextSerializerConfig};
156
157        Self {
158            framer: NewlineDelimitedEncoder::default().into(),
159            serializer: TextSerializerConfig::default().build().into(),
160        }
161    }
162}
163
164impl Default for Encoder<()> {
165    fn default() -> Self {
166        use crate::encoding::TextSerializerConfig;
167
168        Self {
169            framer: (),
170            serializer: TextSerializerConfig::default().build().into(),
171        }
172    }
173}
174
175impl<Framer> Encoder<Framer>
176where
177    Framer: Clone,
178{
179    /// Serialize the event without applying framing.
180    pub fn serialize(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Error> {
181        let len = buffer.len();
182        let mut payload = buffer.split_off(len);
183
184        self.serialize_at_start(event, &mut payload)?;
185
186        buffer.unsplit(payload);
187
188        Ok(())
189    }
190
191    /// Serialize the event without applying framing, at the start of the provided buffer.
192    fn serialize_at_start(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Error> {
193        self.serializer.encode(event, buffer).map_err(|error| {
194            emit(EncoderSerializeError { error: &error });
195            Error::SerializingError(error)
196        })
197    }
198}
199
200impl Encoder<Framer> {
201    /// Creates a new `Encoder` with the specified `Serializer` to produce bytes
202    /// from a structured event, and the `Framer` to wrap these into a byte
203    /// frame.
204    pub const fn new(framer: Framer, serializer: Serializer) -> Self {
205        Self { framer, serializer }
206    }
207
208    /// Get the framer.
209    pub const fn framer(&self) -> &Framer {
210        &self.framer
211    }
212
213    /// Get the serializer.
214    pub const fn serializer(&self) -> &Serializer {
215        &self.serializer
216    }
217
218    /// Get the prefix that encloses a batch of events.
219    pub const fn batch_prefix(&self) -> &[u8] {
220        match (&self.framer, &self.serializer) {
221            (
222                Framer::CharacterDelimited(crate::encoding::CharacterDelimitedEncoder {
223                    delimiter: b',',
224                }),
225                Serializer::Json(_) | Serializer::NativeJson(_),
226            ) => b"[",
227            _ => &[],
228        }
229    }
230
231    /// Get the suffix that encloses a batch of events.
232    pub const fn batch_suffix(&self, empty: bool) -> &[u8] {
233        match (&self.framer, &self.serializer, empty) {
234            (
235                Framer::CharacterDelimited(crate::encoding::CharacterDelimitedEncoder {
236                    delimiter: b',',
237                }),
238                Serializer::Json(_) | Serializer::NativeJson(_),
239                _,
240            ) => b"]",
241            (Framer::NewlineDelimited(_), _, false) => b"\n",
242            _ => &[],
243        }
244    }
245
246    /// Get the HTTP content type.
247    pub const fn content_type(&self) -> &'static str {
248        match (&self.serializer, &self.framer) {
249            (Serializer::Json(_) | Serializer::NativeJson(_), Framer::NewlineDelimited(_)) => {
250                "application/x-ndjson"
251            }
252            (
253                Serializer::Gelf(_) | Serializer::Json(_) | Serializer::NativeJson(_),
254                Framer::CharacterDelimited(crate::encoding::CharacterDelimitedEncoder {
255                    delimiter: b',',
256                }),
257            ) => "application/json",
258            (Serializer::Native(_), _) | (Serializer::Protobuf(_), _) => "application/octet-stream",
259            (
260                Serializer::Avro(_)
261                | Serializer::Cef(_)
262                | Serializer::Csv(_)
263                | Serializer::Gelf(_)
264                | Serializer::Json(_)
265                | Serializer::Logfmt(_)
266                | Serializer::NativeJson(_)
267                | Serializer::RawMessage(_)
268                | Serializer::Text(_),
269                _,
270            ) => "text/plain",
271            #[cfg(feature = "syslog")]
272            (Serializer::Syslog(_), _) => "text/plain",
273            #[cfg(feature = "opentelemetry")]
274            (Serializer::Otlp(_), _) => "application/x-protobuf",
275        }
276    }
277}
278
279impl Encoder<()> {
280    /// Creates a new `Encoder` with the specified `Serializer` to produce bytes
281    /// from a structured event.
282    pub const fn new(serializer: Serializer) -> Self {
283        Self {
284            framer: (),
285            serializer,
286        }
287    }
288
289    /// Get the serializer.
290    pub const fn serializer(&self) -> &Serializer {
291        &self.serializer
292    }
293}
294
295impl tokio_util::codec::Encoder<Event> for Encoder<Framer> {
296    type Error = Error;
297
298    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
299        let len = buffer.len();
300        let mut payload = buffer.split_off(len);
301
302        self.serialize_at_start(event, &mut payload)?;
303
304        // Frame the serialized event.
305        self.framer.encode((), &mut payload).map_err(|error| {
306            emit(EncoderFramingError { error: &error });
307            Error::FramingError(error)
308        })?;
309
310        buffer.unsplit(payload);
311
312        Ok(())
313    }
314}
315
316impl tokio_util::codec::Encoder<Event> for Encoder<()> {
317    type Error = Error;
318
319    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
320        let len = buffer.len();
321        let mut payload = buffer.split_off(len);
322
323        self.serialize_at_start(event, &mut payload)?;
324
325        buffer.unsplit(payload);
326
327        Ok(())
328    }
329}
330
#[cfg(test)]
mod tests {
    use bytes::BufMut;
    use futures::{SinkExt, StreamExt};
    use tokio_util::codec::FramedWrite;
    use vector_core::event::LogEvent;

    use super::*;
    use crate::encoding::BoxedFramingError;

    /// Test framer that wraps every serialized payload in `(` and `)`.
    #[derive(Debug, Clone)]
    struct ParenEncoder;

    impl ParenEncoder {
        pub(super) const fn new() -> Self {
            Self
        }
    }

    impl tokio_util::codec::Encoder<()> for ParenEncoder {
        type Error = BoxedFramingError;

        fn encode(&mut self, _: (), dst: &mut BytesMut) -> Result<(), Self::Error> {
            dst.reserve(2);
            // Detach the payload so the opening paren can be written in front of it.
            let payload = dst.split();
            dst.put_u8(b'(');
            dst.unsplit(payload);
            dst.put_u8(b')');
            Ok(())
        }
    }

    /// Test framer that delegates to `inner` but reports an error on the call
    /// whose zero-based index equals `fail_at`, after `inner` has already
    /// written its output.
    #[derive(Debug, Clone)]
    struct ErrorNthEncoder<T> {
        inner: T,
        calls: usize,
        fail_at: usize,
    }

    impl<T> ErrorNthEncoder<T>
    where
        T: tokio_util::codec::Encoder<(), Error = BoxedFramingError>,
    {
        pub(super) const fn new(inner: T, fail_at: usize) -> Self {
            Self {
                inner,
                calls: 0,
                fail_at,
            }
        }
    }

    impl<T> tokio_util::codec::Encoder<()> for ErrorNthEncoder<T>
    where
        T: tokio_util::codec::Encoder<(), Error = BoxedFramingError>,
    {
        type Error = BoxedFramingError;

        fn encode(&mut self, _: (), dst: &mut BytesMut) -> Result<(), Self::Error> {
            self.inner.encode((), dst)?;
            let call = self.calls;
            self.calls += 1;
            if call == self.fail_at {
                Err(Box::new(std::io::Error::other("error")) as _)
            } else {
                Ok(())
            }
        }
    }

    /// Builds an encoder that frames the default text serializer's output.
    fn text_encoder(framer: Framer) -> Encoder<Framer> {
        Encoder::<Framer>::new(
            framer,
            crate::encoding::TextSerializerConfig::default()
                .build()
                .into(),
        )
    }

    /// Wraps each message in a log event.
    fn log_events(messages: &[&'static str]) -> Vec<Event> {
        messages
            .iter()
            .map(|&message| Event::Log(LogEvent::from(message)))
            .collect()
    }

    #[tokio::test]
    async fn test_encode_events_sink_empty() {
        let encoder = text_encoder(Framer::Boxed(Box::new(ParenEncoder::new())));
        let mut framed = FramedWrite::new(Vec::<u8>::new(), encoder);

        futures::stream::iter(log_events(&["foo", "bar", "baz"]))
            .map(Ok)
            .forward(&mut framed)
            .await
            .unwrap();

        assert_eq!(framed.into_inner(), b"(foo)(bar)(baz)");
    }

    #[tokio::test]
    async fn test_encode_events_sink_non_empty() {
        let encoder = text_encoder(Framer::Boxed(Box::new(ParenEncoder::new())));
        let mut framed = FramedWrite::new(Vec::from("(foo)"), encoder);

        futures::stream::iter(log_events(&["bar", "baz", "bat"]))
            .map(Ok)
            .forward(&mut framed)
            .await
            .unwrap();

        assert_eq!(framed.into_inner(), b"(foo)(bar)(baz)(bat)");
    }

    #[tokio::test]
    async fn test_encode_events_sink_empty_handle_framing_error() {
        let encoder = text_encoder(Framer::Boxed(Box::new(ErrorNthEncoder::new(
            ParenEncoder::new(),
            1,
        ))));
        let mut framed = FramedWrite::new(Vec::<u8>::new(), encoder);

        let result = futures::stream::iter(log_events(&["foo", "bar", "baz"]))
            .map(Ok)
            .forward(&mut framed)
            .await;
        assert!(result.is_err());

        // Only the frame completed before the failing call reaches the sink.
        framed.flush().await.unwrap();
        assert_eq!(framed.into_inner(), b"(foo)");
    }

    #[tokio::test]
    async fn test_encode_events_sink_non_empty_handle_framing_error() {
        let encoder = text_encoder(Framer::Boxed(Box::new(ErrorNthEncoder::new(
            ParenEncoder::new(),
            1,
        ))));
        let mut framed = FramedWrite::new(Vec::from("(foo)"), encoder);

        let result = futures::stream::iter(log_events(&["bar", "baz", "bat"]))
            .map(Ok)
            .forward(&mut framed)
            .await;
        assert!(result.is_err());

        // Pre-existing bytes plus the one successful frame survive the failure.
        framed.flush().await.unwrap();
        assert_eq!(framed.into_inner(), b"(foo)(bar)");
    }

    #[tokio::test]
    async fn test_encode_batch_newline() {
        let encoder = text_encoder(Framer::NewlineDelimited(
            crate::encoding::NewlineDelimitedEncoder::default(),
        ));
        let mut framed = FramedWrite::new(Vec::<u8>::new(), encoder);

        futures::stream::iter(log_events(&["bar", "baz", "bat"]))
            .map(Ok)
            .forward(&mut framed)
            .await
            .unwrap();

        assert_eq!(framed.into_inner(), b"bar\nbaz\nbat\n");
    }
}