codecs/decoding/framing/
varint_length_delimited.rs

1use bytes::{Buf, Bytes, BytesMut};
2use snafu::Snafu;
3use tokio_util::codec::Decoder;
4use vector_config::configurable_component;
5
6use super::{BoxedFramingError, FramingError, StreamDecodingError};
7
8/// Errors that can occur during varint length delimited framing.
9#[derive(Debug, Snafu)]
10pub enum VarintFramingError {
11    #[snafu(display("Varint too large"))]
12    VarintOverflow,
13
14    #[snafu(display("Frame too large: {length} bytes (max: {max})"))]
15    FrameTooLarge { length: usize, max: usize },
16
17    #[snafu(display("Trailing data at EOF"))]
18    TrailingData,
19}
20
21impl StreamDecodingError for VarintFramingError {
22    fn can_continue(&self) -> bool {
23        match self {
24            // Varint overflow and frame too large are not recoverable
25            Self::VarintOverflow | Self::FrameTooLarge { .. } => false,
26            // Trailing data at EOF is not recoverable
27            Self::TrailingData => false,
28        }
29    }
30}
31
32impl FramingError for VarintFramingError {
33    fn as_any(&self) -> &dyn std::any::Any {
34        self as &dyn std::any::Any
35    }
36}
37
38/// Config used to build a `VarintLengthDelimitedDecoder`.
39#[configurable_component]
40#[derive(Debug, Clone, Default)]
41pub struct VarintLengthDelimitedDecoderConfig {
42    /// Maximum frame length
43    #[serde(default = "default_max_frame_length")]
44    pub max_frame_length: usize,
45}
46
/// Default maximum frame length: 8 MiB.
const fn default_max_frame_length() -> usize {
    const BYTES_PER_MIB: usize = 1 << 20;
    8 * BYTES_PER_MIB
}
50
51impl VarintLengthDelimitedDecoderConfig {
52    /// Build the `VarintLengthDelimitedDecoder` from this configuration.
53    pub fn build(&self) -> VarintLengthDelimitedDecoder {
54        VarintLengthDelimitedDecoder::new(self.max_frame_length)
55    }
56}
57
/// Decoder for byte sequences that are each preceded by a varint-encoded length.
///
/// This is compatible with protobuf's length-delimited encoding.
#[derive(Clone, Debug)]
pub struct VarintLengthDelimitedDecoder {
    /// Largest frame length, in bytes, that the decoder accepts.
    max_frame_length: usize,
}
64
65impl VarintLengthDelimitedDecoder {
66    /// Creates a new `VarintLengthDelimitedDecoder`.
67    pub fn new(max_frame_length: usize) -> Self {
68        Self { max_frame_length }
69    }
70
71    /// Decode a varint from the buffer
72    fn decode_varint(&self, buf: &mut BytesMut) -> Result<Option<u64>, BoxedFramingError> {
73        if buf.is_empty() {
74            return Ok(None);
75        }
76
77        let mut value: u64 = 0;
78        let mut shift: u8 = 0;
79        let mut bytes_read = 0;
80
81        for byte in buf.iter() {
82            bytes_read += 1;
83            let byte_value = (*byte & 0x7F) as u64;
84            value |= byte_value << shift;
85
86            if *byte & 0x80 == 0 {
87                // Last byte of varint
88                buf.advance(bytes_read);
89                return Ok(Some(value));
90            }
91
92            shift += 7;
93            if shift >= 64 {
94                return Err(VarintFramingError::VarintOverflow.into());
95            }
96        }
97
98        // Incomplete varint
99        Ok(None)
100    }
101}
102
103impl Default for VarintLengthDelimitedDecoder {
104    fn default() -> Self {
105        Self::new(default_max_frame_length())
106    }
107}
108
109impl Decoder for VarintLengthDelimitedDecoder {
110    type Item = Bytes;
111    type Error = BoxedFramingError;
112
113    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
114        // First, try to decode the varint length
115        let length = match self.decode_varint(src)? {
116            Some(len) => len as usize,
117            None => return Ok(None), // Incomplete varint
118        };
119
120        // Check if the length is reasonable
121        if length > self.max_frame_length {
122            return Err(VarintFramingError::FrameTooLarge {
123                length,
124                max: self.max_frame_length,
125            }
126            .into());
127        }
128
129        // Check if we have enough data for the complete frame
130        if src.len() < length {
131            return Ok(None); // Incomplete frame
132        }
133
134        // Extract the frame
135        let frame = src.split_to(length).freeze();
136        Ok(Some(frame))
137    }
138
139    fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
140        if src.is_empty() {
141            Ok(None)
142        } else {
143            // Try to decode what we have, even if incomplete
144            match self.decode(src)? {
145                Some(frame) => Ok(Some(frame)),
146                None => {
147                    // If we have data but couldn't decode it, it's trailing data
148                    if !src.is_empty() {
149                        Err(VarintFramingError::TrailingData.into())
150                    } else {
151                        Ok(None)
152                    }
153                }
154            }
155        }
156    }
157}
158
159#[cfg(test)]
160mod tests {
161    use super::*;
162
163    #[test]
164    fn decode_single_byte_varint() {
165        let mut input = BytesMut::from(&[0x03, b'f', b'o', b'o'][..]);
166        let mut decoder = VarintLengthDelimitedDecoder::default();
167
168        assert_eq!(
169            decoder.decode(&mut input).unwrap().unwrap(),
170            Bytes::from("foo")
171        );
172        assert_eq!(decoder.decode(&mut input).unwrap(), None);
173    }
174
175    #[test]
176    fn decode_multi_byte_varint() {
177        // 300 in varint encoding: 0xAC 0x02
178        let mut input = BytesMut::from(&[0xAC, 0x02][..]);
179        // Add 300 bytes of data
180        input.extend_from_slice(&vec![b'x'; 300]);
181        let mut decoder = VarintLengthDelimitedDecoder::default();
182
183        let result = decoder.decode(&mut input).unwrap().unwrap();
184        assert_eq!(result.len(), 300);
185        assert_eq!(decoder.decode(&mut input).unwrap(), None);
186    }
187
188    #[test]
189    fn decode_incomplete_varint() {
190        let mut input = BytesMut::from(&[0x80][..]); // Incomplete varint
191        let mut decoder = VarintLengthDelimitedDecoder::default();
192
193        assert_eq!(decoder.decode(&mut input).unwrap(), None);
194    }
195
196    #[test]
197    fn decode_incomplete_frame() {
198        let mut input = BytesMut::from(&[0x05, b'f', b'o'][..]); // Length 5, but only 2 bytes
199        let mut decoder = VarintLengthDelimitedDecoder::default();
200
201        assert_eq!(decoder.decode(&mut input).unwrap(), None);
202    }
203
204    #[test]
205    fn decode_frame_too_large() {
206        let mut input =
207            BytesMut::from(&[0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x01][..]);
208        let mut decoder = VarintLengthDelimitedDecoder::new(1000);
209
210        assert!(decoder.decode(&mut input).is_err());
211    }
212
213    #[test]
214    fn decode_trailing_data_at_eof() {
215        let mut input = BytesMut::from(&[0x03, b'f', b'o', b'o', b'e', b'x', b't', b'r', b'a'][..]);
216        let mut decoder = VarintLengthDelimitedDecoder::default();
217
218        // First decode should succeed
219        assert_eq!(
220            decoder.decode(&mut input).unwrap().unwrap(),
221            Bytes::from("foo")
222        );
223
224        // Second decode should fail with trailing data
225        assert!(decoder.decode_eof(&mut input).is_err());
226    }
227}