codecs/encoding/framing/
length_delimited.rs1use bytes::BytesMut;
2use tokio_util::codec::{Encoder, LengthDelimitedCodec};
3use vector_config::configurable_component;
4
5use super::BoxedFramingError;
6use crate::common::length_delimited::LengthDelimitedCoderOptions;
7
8#[configurable_component]
10#[derive(Debug, Clone, Default, Eq, PartialEq)]
11pub struct LengthDelimitedEncoderConfig {
12 #[serde(skip_serializing_if = "vector_core::serde::is_default")]
14 pub length_delimited: LengthDelimitedCoderOptions,
15}
16
17impl LengthDelimitedEncoderConfig {
18 pub fn build(&self) -> LengthDelimitedEncoder {
20 LengthDelimitedEncoder::new(&self.length_delimited)
21 }
22}
23
24#[derive(Debug, Clone)]
26pub struct LengthDelimitedEncoder {
27 codec: LengthDelimitedCodec,
28 inner_buffer: BytesMut,
29}
30
31impl LengthDelimitedEncoder {
32 pub fn new(config: &LengthDelimitedCoderOptions) -> Self {
34 Self {
35 codec: config.build_codec(),
36 inner_buffer: BytesMut::new(),
37 }
38 }
39}
40
41impl Default for LengthDelimitedEncoder {
42 fn default() -> Self {
43 Self {
44 codec: LengthDelimitedCodec::new(),
45 inner_buffer: BytesMut::new(),
46 }
47 }
48}
49
50impl Encoder<()> for LengthDelimitedEncoder {
51 type Error = BoxedFramingError;
52
53 fn encode(&mut self, _: (), buffer: &mut BytesMut) -> Result<(), BoxedFramingError> {
54 self.inner_buffer.clear();
55 self.inner_buffer.extend_from_slice(buffer);
56 buffer.clear();
57 let bytes = self.inner_buffer.split().freeze();
58 self.codec.encode(bytes, buffer)?;
59 Ok(())
60 }
61}
62
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs `payload` through `encoder` and returns the resulting framed bytes.
    fn frame(mut encoder: LengthDelimitedEncoder, payload: &str) -> BytesMut {
        let mut bytes = BytesMut::from(payload);
        encoder.encode((), &mut bytes).unwrap();
        bytes
    }

    // The default encoder is expected to emit a 4-byte length header.
    #[test]
    fn encode() {
        let framed = frame(LengthDelimitedEncoder::default(), "abc");

        assert_eq!(&framed[..], b"\0\0\0\x03abc");
    }

    // With `length_field_length: 2` the header is expected to be 2 bytes.
    #[test]
    fn encode_2byte_length() {
        let options = LengthDelimitedCoderOptions {
            length_field_length: 2,
            ..Default::default()
        };
        let framed = frame(LengthDelimitedEncoder::new(&options), "abc");

        assert_eq!(&framed[..], b"\0\x03abc");
    }
}