codecs/decoding/framing/
newline_delimited.rs1use bytes::{Bytes, BytesMut};
2use tokio_util::codec::Decoder;
3use vector_config::configurable_component;
4
5use super::{BoxedFramingError, CharacterDelimitedDecoder};
6
/// Configuration from which a [`NewlineDelimitedDecoder`] is built.
///
/// Its only setting is the nested `newline_delimited` options value.
// NOTE(review): `#[configurable_component]` presumably consumes these doc comments as
// schema descriptions — confirm against the `vector_config` documentation.
#[configurable_component]
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct NewlineDelimitedDecoderConfig {
    /// Options for the newline-delimited decoder.
    // `serde(default)` fills this field from `Default` when it is absent from the input;
    // it is skipped on serialization when `vector_core::serde::is_default` returns true.
    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
    pub newline_delimited: NewlineDelimitedDecoderOptions,
}
15
/// Options used when building a [`NewlineDelimitedDecoder`].
#[configurable_component]
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct NewlineDelimitedDecoderOptions {
    /// The maximum frame length, if any.
    ///
    /// When unset (the default), the decoder is built without a length limit.
    // NOTE(review): the tests in this file expect a line longer than the limit to be
    // skipped rather than returned; the exact counting rule lives in
    // `CharacterDelimitedDecoder`, which is not visible here — confirm before documenting
    // it for users.
    // Skipped on serialization when `vector_core::serde::is_default` returns true.
    #[serde(skip_serializing_if = "vector_core::serde::is_default")]
    pub max_length: Option<usize>,
}
34
35impl NewlineDelimitedDecoderOptions {
36 pub const fn new_with_max_length(max_length: usize) -> Self {
38 Self {
39 max_length: Some(max_length),
40 }
41 }
42}
43
44impl NewlineDelimitedDecoderConfig {
45 pub fn new() -> Self {
47 Default::default()
48 }
49
50 pub const fn new_with_max_length(max_length: usize) -> Self {
52 Self {
53 newline_delimited: { NewlineDelimitedDecoderOptions::new_with_max_length(max_length) },
54 }
55 }
56
57 pub const fn build(&self) -> NewlineDelimitedDecoder {
59 if let Some(max_length) = self.newline_delimited.max_length {
60 NewlineDelimitedDecoder::new_with_max_length(max_length)
61 } else {
62 NewlineDelimitedDecoder::new()
63 }
64 }
65}
66
/// A decoder that frames a byte stream on the newline byte (`b'\n'`).
///
/// This is a thin newtype over [`CharacterDelimitedDecoder`]; its constructors pass
/// `b'\n'` as the delimiter and its `Decoder` implementation delegates to the wrapped value.
#[derive(Debug, Clone)]
pub struct NewlineDelimitedDecoder(CharacterDelimitedDecoder);
70
71impl NewlineDelimitedDecoder {
72 pub const fn new() -> Self {
74 Self(CharacterDelimitedDecoder::new(b'\n'))
75 }
76
77 pub const fn new_with_max_length(max_length: usize) -> Self {
81 Self(CharacterDelimitedDecoder::new_with_max_length(
82 b'\n', max_length,
83 ))
84 }
85}
86
87impl Default for NewlineDelimitedDecoder {
88 fn default() -> Self {
89 Self::new()
90 }
91}
92
93impl Decoder for NewlineDelimitedDecoder {
94 type Item = Bytes;
95 type Error = BoxedFramingError;
96
97 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
98 self.0.decode(src)
99 }
100
101 fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
102 self.0.decode_eof(src)
103 }
104}
105
#[cfg(test)]
mod tests {
    use super::*;

    // Input without a trailing newline: `decode` is expected to yield the two terminated
    // lines and then `None`.
    #[test]
    fn decode_bytes_with_newlines() {
        let mut buffer = BytesMut::from("foo\nbar\nbaz");
        let mut framer = NewlineDelimitedDecoder::new();

        for expected in ["foo", "bar"].iter() {
            assert_eq!(framer.decode(&mut buffer).unwrap().unwrap(), *expected);
        }
        assert_eq!(framer.decode(&mut buffer).unwrap(), None);
    }

    // Input with a trailing newline: all three lines are expected, then `None`.
    #[test]
    fn decode_bytes_with_newlines_trailing() {
        let mut buffer = BytesMut::from("foo\nbar\nbaz\n");
        let mut framer = NewlineDelimitedDecoder::new();

        for expected in ["foo", "bar", "baz"].iter() {
            assert_eq!(framer.decode(&mut buffer).unwrap().unwrap(), *expected);
        }
        assert_eq!(framer.decode(&mut buffer).unwrap(), None);
    }

    // With a limit of 3, the over-long `barbara` line is expected to be skipped.
    #[test]
    fn decode_bytes_with_newlines_and_max_length() {
        let mut buffer = BytesMut::from("foo\nbarbara\nbaz\n");
        let mut framer = NewlineDelimitedDecoder::new_with_max_length(3);

        for expected in ["foo", "baz"].iter() {
            assert_eq!(framer.decode(&mut buffer).unwrap().unwrap(), *expected);
        }
        assert_eq!(framer.decode(&mut buffer).unwrap(), None);
    }

    // At EOF the unterminated tail `baz` is expected to be returned as a frame too.
    #[test]
    fn decode_eof_bytes_with_newlines() {
        let mut buffer = BytesMut::from("foo\nbar\nbaz");
        let mut framer = NewlineDelimitedDecoder::new();

        for expected in ["foo", "bar", "baz"].iter() {
            assert_eq!(framer.decode_eof(&mut buffer).unwrap().unwrap(), *expected);
        }
    }

    // At EOF with a trailing newline: three frames, then `None`.
    #[test]
    fn decode_eof_bytes_with_newlines_trailing() {
        let mut buffer = BytesMut::from("foo\nbar\nbaz\n");
        let mut framer = NewlineDelimitedDecoder::new();

        for expected in ["foo", "bar", "baz"].iter() {
            assert_eq!(framer.decode_eof(&mut buffer).unwrap().unwrap(), *expected);
        }
        assert_eq!(framer.decode_eof(&mut buffer).unwrap(), None);
    }

    // At EOF with a limit of 3, the over-long `barbara` line is expected to be skipped.
    #[test]
    fn decode_eof_bytes_with_newlines_and_max_length() {
        let mut buffer = BytesMut::from("foo\nbarbara\nbaz\n");
        let mut framer = NewlineDelimitedDecoder::new_with_max_length(3);

        for expected in ["foo", "baz"].iter() {
            assert_eq!(framer.decode_eof(&mut buffer).unwrap().unwrap(), *expected);
        }
        assert_eq!(framer.decode_eof(&mut buffer).unwrap(), None);
    }
}