codecs/decoding/
decoder.rs

1use bytes::{Bytes, BytesMut};
2use smallvec::SmallVec;
3use vector_common::internal_event::emit;
4use vector_core::{
5    config::LogNamespace,
6    event::{Event, EventMetadata},
7};
8
9use crate::{
10    decoding::format::Deserializer as _,
11    decoding::{
12        BoxedFramingError, BytesDeserializer, Deserializer, Error, Framer, NewlineDelimitedDecoder,
13    },
14    internal_events::{DecoderDeserializeError, DecoderFramingError},
15};
16
/// The events deserialized from one frame, paired with the length in bytes of
/// the frame they were parsed from.
type DecodedFrame = (SmallVec<[Event; 1]>, usize);
18
19/// A decoder that can decode structured events from a byte stream / byte
20/// messages.
21#[derive(Clone)]
22pub struct Decoder {
23    /// The framer being used.
24    pub framer: Framer,
25    /// The deserializer being used.
26    pub deserializer: Deserializer,
27    /// The `log_namespace` being used.
28    pub log_namespace: LogNamespace,
29}
30
31impl Default for Decoder {
32    fn default() -> Self {
33        Self {
34            framer: Framer::NewlineDelimited(NewlineDelimitedDecoder::new()),
35            deserializer: Deserializer::Bytes(BytesDeserializer),
36            log_namespace: LogNamespace::Legacy,
37        }
38    }
39}
40
41impl Decoder {
42    /// Creates a new `Decoder` with the specified `Framer` to produce byte
43    /// frames from the byte stream / byte messages and `Deserializer` to parse
44    /// structured events from a byte frame.
45    pub const fn new(framer: Framer, deserializer: Deserializer) -> Self {
46        Self {
47            framer,
48            deserializer,
49            log_namespace: LogNamespace::Legacy,
50        }
51    }
52
53    /// Sets the log namespace that will be used when decoding.
54    pub const fn with_log_namespace(mut self, log_namespace: LogNamespace) -> Self {
55        self.log_namespace = log_namespace;
56        self
57    }
58
59    /// Attaches a per-decode-call metadata template to the inner deserializer,
60    /// allowing deserializers to read from and write to event metadata.
61    pub fn with_metadata_template(mut self, metadata: EventMetadata) -> Self {
62        self.deserializer = self.deserializer.with_metadata_template(metadata);
63        self
64    }
65
66    /// Handles the framing result and parses it into a structured event, if
67    /// possible.
68    ///
69    /// Emits logs if either framing or parsing failed.
70    fn handle_framing_result(
71        &mut self,
72        frame: Result<Option<Bytes>, BoxedFramingError>,
73    ) -> Result<Option<DecodedFrame>, Error> {
74        let frame = frame.map_err(|error| {
75            emit(DecoderFramingError { error: &error });
76            Error::FramingError(error)
77        })?;
78
79        frame
80            .map(|frame| self.deserializer_parse(frame))
81            .transpose()
82    }
83
84    /// Parses a frame using the included deserializer, and handles any errors by logging.
85    pub fn deserializer_parse(&self, frame: Bytes) -> Result<DecodedFrame, Error> {
86        let byte_size = frame.len();
87
88        // Parse structured events from the byte frame.
89        self.deserializer
90            .parse(frame, self.log_namespace)
91            .map(|events| (events, byte_size))
92            .map_err(|error| {
93                emit(DecoderDeserializeError { error: &error });
94                Error::ParsingError(error)
95            })
96    }
97}
98
99impl tokio_util::codec::Decoder for Decoder {
100    type Item = DecodedFrame;
101    type Error = Error;
102
103    fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
104        let frame = self.framer.decode(buf);
105        self.handle_framing_result(frame)
106    }
107
108    fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
109        let frame = self.framer.decode_eof(buf);
110        self.handle_framing_result(frame)
111    }
112}
113
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use futures::{StreamExt, stream};
    use tokio_util::io::StreamReader;
    use vrl::value::Value;

    use super::Decoder;
    use crate::{
        DecoderFramedRead, JsonDeserializer, NewlineDelimitedDecoder, StreamDecodingError,
        decoding::{Deserializer, Framer},
    };

    /// An undecodable frame in the middle of the stream must surface as a
    /// continuable error, and the frame after it must still decode.
    #[tokio::test]
    async fn framed_read_recover_from_error() {
        // Three newline-terminated chunks: valid JSON, garbage, valid JSON.
        let chunks = ["{ \"foo\": 1 }\n", "invalid\n", "{ \"bar\": 2 }\n"]
            .into_iter()
            .map(Bytes::from)
            .map(Ok::<_, std::io::Error>);
        let reader = StreamReader::new(stream::iter(chunks));

        let framer = Framer::NewlineDelimited(NewlineDelimitedDecoder::new());
        let deserializer = Deserializer::Json(JsonDeserializer::default());
        let mut stream = DecoderFramedRead::new(reader, Decoder::new(framer, deserializer));

        // First frame decodes normally.
        let (mut events, _byte_size) = stream.next().await.unwrap().unwrap();
        let log = events.pop().unwrap().into_log();
        assert_eq!(log.get("foo").unwrap(), &Value::from(1));

        // Second frame fails, but the error allows the stream to continue.
        let error = stream.next().await.unwrap().unwrap_err();
        assert!(error.can_continue());

        // Third frame decodes normally after the failure.
        let (mut events, _byte_size) = stream.next().await.unwrap().unwrap();
        let log = events.pop().unwrap().into_log();
        assert_eq!(log.get("bar").unwrap(), &Value::from(2));
    }
}