codecs/decoding/framing/newline_delimited.rs

1use bytes::{Bytes, BytesMut};
2use tokio_util::codec::Decoder;
3use vector_config::configurable_component;
4
5use super::{BoxedFramingError, CharacterDelimitedDecoder};
6
/// Config used to build a `NewlineDelimitedDecoder`.
// NOTE(review): `#[configurable_component]` presumably turns the `///` doc comments on this
// struct and its fields into user-facing configuration documentation, so their wording is left
// untouched here. Plain `//` comments like this one are stripped by the lexer and never reach
// the macro.
#[configurable_component]
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct NewlineDelimitedDecoderConfig {
    /// Options for the newline delimited decoder.
    // `default`: the field may be omitted when deserializing; it then falls back to
    // `NewlineDelimitedDecoderOptions::default()`.
    // `skip_serializing_if`: the field is left out of serialized output whenever
    // `vector_core::serde::is_default` returns true for it (presumably: equal to its
    // `Default` value — confirm in vector_core).
    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
    pub newline_delimited: NewlineDelimitedDecoderOptions,
}
15
/// Options for building a `NewlineDelimitedDecoder`.
// NOTE(review): the `///` text on this struct and its field appears to double as user-facing
// configuration documentation via `#[configurable_component]`; edit it with that in mind.
#[configurable_component]
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct NewlineDelimitedDecoderOptions {
    /// The maximum length of the byte buffer.
    ///
    /// This length does *not* include the trailing delimiter.
    ///
    /// By default, no maximum length is enforced. If events are malformed, this can lead to
    /// additional resource usage as events continue to be buffered in memory, and can potentially
    /// lead to memory exhaustion in extreme cases.
    ///
    /// If there is a risk of processing malformed data, such as logs with user-controlled input,
    /// consider setting the maximum length to a reasonably large value as a safety net. This
    /// prevents processing from being unbounded.
    // `None` (the derived `Default`) means no limit. `Some(n)` is forwarded to
    // `NewlineDelimitedDecoder::new_with_max_length` by `NewlineDelimitedDecoderConfig::build`.
    // The field is omitted from serialized output when `vector_core::serde::is_default` returns
    // true for it.
    #[serde(skip_serializing_if = "vector_core::serde::is_default")]
    pub max_length: Option<usize>,
}
34
35impl NewlineDelimitedDecoderOptions {
36    /// Creates a `NewlineDelimitedDecoderOptions` with a maximum frame length limit.
37    pub const fn new_with_max_length(max_length: usize) -> Self {
38        Self {
39            max_length: Some(max_length),
40        }
41    }
42}
43
44impl NewlineDelimitedDecoderConfig {
45    /// Creates a new `NewlineDelimitedDecoderConfig`.
46    pub fn new() -> Self {
47        Default::default()
48    }
49
50    /// Creates a `NewlineDelimitedDecoder` with a maximum frame length limit.
51    pub const fn new_with_max_length(max_length: usize) -> Self {
52        Self {
53            newline_delimited: { NewlineDelimitedDecoderOptions::new_with_max_length(max_length) },
54        }
55    }
56
57    /// Build the `NewlineDelimitedDecoder` from this configuration.
58    pub const fn build(&self) -> NewlineDelimitedDecoder {
59        if let Some(max_length) = self.newline_delimited.max_length {
60            NewlineDelimitedDecoder::new_with_max_length(max_length)
61        } else {
62            NewlineDelimitedDecoder::new()
63        }
64    }
65}
66
67/// A codec for handling bytes that are delimited by (a) newline(s).
68#[derive(Debug, Clone)]
69pub struct NewlineDelimitedDecoder(CharacterDelimitedDecoder);
70
71impl NewlineDelimitedDecoder {
72    /// Creates a new `NewlineDelimitedDecoder`.
73    pub const fn new() -> Self {
74        Self(CharacterDelimitedDecoder::new(b'\n'))
75    }
76
77    /// Creates a `NewlineDelimitedDecoder` with a maximum frame length limit.
78    ///
79    /// Any frames longer than `max_length` bytes will be discarded entirely.
80    pub const fn new_with_max_length(max_length: usize) -> Self {
81        Self(CharacterDelimitedDecoder::new_with_max_length(
82            b'\n', max_length,
83        ))
84    }
85}
86
87impl Default for NewlineDelimitedDecoder {
88    fn default() -> Self {
89        Self::new()
90    }
91}
92
93impl Decoder for NewlineDelimitedDecoder {
94    type Item = Bytes;
95    type Error = BoxedFramingError;
96
97    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
98        self.0.decode(src)
99    }
100
101    fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
102        self.0.decode_eof(src)
103    }
104}
105
#[cfg(test)]
mod tests {
    use super::*;

    // `decode` only returns frames terminated by the delimiter, so the unterminated trailing
    // "baz" is not returned.
    #[test]
    fn decode_bytes_with_newlines() {
        let mut input = BytesMut::from("foo\nbar\nbaz");
        let mut decoder = NewlineDelimitedDecoder::new();

        assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "foo");
        assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "bar");
        assert_eq!(decoder.decode(&mut input).unwrap(), None);
    }

    #[test]
    fn decode_bytes_with_newlines_trailing() {
        let mut input = BytesMut::from("foo\nbar\nbaz\n");
        let mut decoder = NewlineDelimitedDecoder::new();

        assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "foo");
        assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "bar");
        assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "baz");
        assert_eq!(decoder.decode(&mut input).unwrap(), None);
    }

    // The over-long "barbara" frame is dropped entirely rather than truncated.
    #[test]
    fn decode_bytes_with_newlines_and_max_length() {
        let mut input = BytesMut::from("foo\nbarbara\nbaz\n");
        let mut decoder = NewlineDelimitedDecoder::new_with_max_length(3);

        assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "foo");
        assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "baz");
        assert_eq!(decoder.decode(&mut input).unwrap(), None);
    }

    // At end of stream, the unterminated remainder is emitted as a final frame and the decoder
    // then reports that nothing is left.
    #[test]
    fn decode_eof_bytes_with_newlines() {
        let mut input = BytesMut::from("foo\nbar\nbaz");
        let mut decoder = NewlineDelimitedDecoder::new();

        assert_eq!(decoder.decode_eof(&mut input).unwrap().unwrap(), "foo");
        assert_eq!(decoder.decode_eof(&mut input).unwrap().unwrap(), "bar");
        assert_eq!(decoder.decode_eof(&mut input).unwrap().unwrap(), "baz");
        assert_eq!(decoder.decode_eof(&mut input).unwrap(), None);
    }

    #[test]
    fn decode_eof_bytes_with_newlines_trailing() {
        let mut input = BytesMut::from("foo\nbar\nbaz\n");
        let mut decoder = NewlineDelimitedDecoder::new();

        assert_eq!(decoder.decode_eof(&mut input).unwrap().unwrap(), "foo");
        assert_eq!(decoder.decode_eof(&mut input).unwrap().unwrap(), "bar");
        assert_eq!(decoder.decode_eof(&mut input).unwrap().unwrap(), "baz");
        assert_eq!(decoder.decode_eof(&mut input).unwrap(), None);
    }

    #[test]
    fn decode_eof_bytes_with_newlines_and_max_length() {
        let mut input = BytesMut::from("foo\nbarbara\nbaz\n");
        let mut decoder = NewlineDelimitedDecoder::new_with_max_length(3);

        assert_eq!(decoder.decode_eof(&mut input).unwrap().unwrap(), "foo");
        assert_eq!(decoder.decode_eof(&mut input).unwrap().unwrap(), "baz");
        assert_eq!(decoder.decode_eof(&mut input).unwrap(), None);
    }

    // An empty buffer yields no frame from either entry point.
    #[test]
    fn decode_empty_input() {
        let mut input = BytesMut::new();
        let mut decoder = NewlineDelimitedDecoder::new();

        assert_eq!(decoder.decode(&mut input).unwrap(), None);
        assert_eq!(decoder.decode_eof(&mut input).unwrap(), None);
    }

    // A decoder built from the default config behaves like `NewlineDelimitedDecoder::new()`.
    #[test]
    fn config_build_without_max_length() {
        let mut input = BytesMut::from("foo\nbar\n");
        let mut decoder = NewlineDelimitedDecoderConfig::new().build();

        assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "foo");
        assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "bar");
        assert_eq!(decoder.decode(&mut input).unwrap(), None);
    }

    // A config carrying `max_length` must produce a decoder that enforces it.
    #[test]
    fn config_build_with_max_length() {
        let mut input = BytesMut::from("foo\nbarbara\nbaz\n");
        let mut decoder = NewlineDelimitedDecoderConfig::new_with_max_length(3).build();

        assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "foo");
        assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "baz");
        assert_eq!(decoder.decode(&mut input).unwrap(), None);
    }
}