// codecs/encoding/format/proto_batch.rs

//! Protobuf batch serializer for encoding events as individual protobuf records.
//!
//! Encodes each event in a batch independently into protobuf bytes, producing
//! a `Vec<Vec<u8>>` where each element is a single serialized protobuf message.
5
6use prost_reflect::{MessageDescriptor, prost::Message as _};
7use snafu::Snafu;
8use std::sync::Arc;
9use vector_config::configurable_component;
10use vector_core::{config::DataType, event::Event, schema};
11use vrl::protobuf::encode::{Options, encode_message};
12
13/// Errors that can occur during protobuf batch encoding
14#[derive(Debug, Snafu)]
15pub enum ProtoBatchEncodingError {
16    /// No events provided
17    #[snafu(display("Cannot encode an empty batch"))]
18    NoEvents,
19
20    /// Unsupported event type
21    #[snafu(display("Unsupported event type: only Log events are supported"))]
22    UnsupportedEventType,
23
24    /// Protobuf encoding failed
25    #[snafu(display("Protobuf encoding failed: {}", source))]
26    EncodingFailed {
27        /// The underlying encoding error
28        source: vector_common::Error,
29    },
30
31    /// Protobuf prost encoding failed
32    #[snafu(display("Protobuf prost encoding failed: {}", source))]
33    ProstEncodingFailed {
34        /// The underlying prost error
35        source: prost_reflect::prost::EncodeError,
36    },
37}
38
39/// Configuration for protobuf batch serialization
40#[configurable_component]
41#[derive(Clone, Default)]
42pub struct ProtoBatchSerializerConfig {
43    /// The protobuf message descriptor to use for encoding.
44    #[serde(skip)]
45    #[configurable(derived)]
46    pub descriptor: Option<MessageDescriptor>,
47}
48
49impl std::fmt::Debug for ProtoBatchSerializerConfig {
50    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51        f.debug_struct("ProtoBatchSerializerConfig")
52            .field(
53                "descriptor",
54                &self.descriptor.as_ref().map(|d| d.full_name().to_string()),
55            )
56            .finish()
57    }
58}
59
60impl ProtoBatchSerializerConfig {
61    /// Create a new ProtoBatchSerializerConfig with a message descriptor
62    pub fn new(descriptor: MessageDescriptor) -> Self {
63        Self {
64            descriptor: Some(descriptor),
65        }
66    }
67
68    /// The data type of events that are accepted by this serializer.
69    pub fn input_type(&self) -> DataType {
70        DataType::Log
71    }
72
73    /// The schema required by the serializer.
74    pub fn schema_requirement(&self) -> schema::Requirement {
75        schema::Requirement::empty()
76    }
77}
78
79/// Protobuf batch serializer that encodes each event into individual protobuf bytes.
80#[derive(Clone, Debug)]
81pub struct ProtoBatchSerializer {
82    descriptor: Arc<MessageDescriptor>,
83    options: Options,
84}
85
86impl ProtoBatchSerializer {
87    /// Create a new ProtoBatchSerializer with the given configuration.
88    pub fn new(config: ProtoBatchSerializerConfig) -> Result<Self, vector_common::Error> {
89        let descriptor = config.descriptor.ok_or_else(|| {
90            vector_common::Error::from("Proto batch serializer requires a message descriptor.")
91        })?;
92
93        Ok(Self {
94            descriptor: Arc::new(descriptor),
95            options: Options {
96                use_json_names: false,
97                allow_lossy_string_coercion: true,
98            },
99        })
100    }
101
102    /// Encode a batch of events into individual protobuf byte buffers.
103    pub fn encode_batch(&self, events: &[Event]) -> Result<Vec<Vec<u8>>, ProtoBatchEncodingError> {
104        if events.is_empty() {
105            return Err(ProtoBatchEncodingError::NoEvents);
106        }
107
108        let mut records = Vec::with_capacity(events.len());
109
110        for event in events {
111            let dynamic_message = match event {
112                Event::Log(log) => {
113                    encode_message(&self.descriptor, log.value().clone(), &self.options)
114                }
115                Event::Trace(_) | Event::Metric(_) => {
116                    return Err(ProtoBatchEncodingError::UnsupportedEventType);
117                }
118            }
119            .map_err(|source| ProtoBatchEncodingError::EncodingFailed {
120                source: source.into(),
121            })?;
122
123            records.push(dynamic_message.encode_to_vec());
124        }
125
126        Ok(records)
127    }
128}
129
#[cfg(test)]
mod tests {
    use super::*;
    use prost_reflect::{
        DescriptorPool, DynamicMessage, Value as ProstValue,
        prost_types::{
            DescriptorProto, FieldDescriptorProto, FileDescriptorProto, FileDescriptorSet,
            field_descriptor_proto::{Label, Type},
        },
    };
    use vector_core::event::{LogEvent, Metric, MetricKind, MetricValue, TraceEvent, Value};
    use vrl::btreemap;

    /// Builds the `test.Outer` descriptor shared by all tests:
    ///
    /// ```text
    /// message Inner { string label = 1; }
    /// message Outer { string name = 1; int64 count = 2; Inner inner = 3; }
    /// ```
    fn build_descriptor() -> MessageDescriptor {
        let inner = DescriptorProto {
            name: Some("Inner".to_string()),
            field: vec![FieldDescriptorProto {
                name: Some("label".to_string()),
                number: Some(1),
                label: Some(Label::Optional as i32),
                r#type: Some(Type::String as i32),
                ..Default::default()
            }],
            ..Default::default()
        };

        let outer = DescriptorProto {
            name: Some("Outer".to_string()),
            field: vec![
                FieldDescriptorProto {
                    name: Some("name".to_string()),
                    number: Some(1),
                    label: Some(Label::Optional as i32),
                    r#type: Some(Type::String as i32),
                    ..Default::default()
                },
                FieldDescriptorProto {
                    name: Some("count".to_string()),
                    number: Some(2),
                    label: Some(Label::Optional as i32),
                    r#type: Some(Type::Int64 as i32),
                    ..Default::default()
                },
                FieldDescriptorProto {
                    name: Some("inner".to_string()),
                    number: Some(3),
                    label: Some(Label::Optional as i32),
                    r#type: Some(Type::Message as i32),
                    type_name: Some(".test.Inner".to_string()),
                    ..Default::default()
                },
            ],
            ..Default::default()
        };

        let file = FileDescriptorProto {
            name: Some("test.proto".to_string()),
            package: Some("test".to_string()),
            message_type: vec![outer, inner],
            syntax: Some("proto3".to_string()),
            ..Default::default()
        };

        let pool = DescriptorPool::from_file_descriptor_set(FileDescriptorSet { file: vec![file] })
            .expect("descriptor pool builds");
        pool.get_message_by_name("test.Outer")
            .expect("Outer message exists")
    }

    /// Serializer over the shared `test.Outer` descriptor.
    fn make_serializer() -> ProtoBatchSerializer {
        ProtoBatchSerializer::new(ProtoBatchSerializerConfig::new(build_descriptor()))
            .expect("serializer builds")
    }

    /// Log event that only sets `Outer.name`.
    fn named_log(name: &str) -> Event {
        Event::Log(LogEvent::from(btreemap! {
            "name" => Value::from(name),
        }))
    }

    /// Decodes one encoded record back into a dynamic `descriptor` message.
    fn decode(descriptor: &MessageDescriptor, record: &[u8]) -> DynamicMessage {
        DynamicMessage::decode(descriptor.clone(), record).expect("decode succeeds")
    }

    #[test]
    fn missing_descriptor_is_rejected() {
        let result = ProtoBatchSerializer::new(ProtoBatchSerializerConfig::default());
        assert!(result.is_err());
    }

    #[test]
    fn config_debug_shows_descriptor_name() {
        let config = ProtoBatchSerializerConfig::new(build_descriptor());
        let rendered = format!("{config:?}");
        assert!(rendered.contains("\"test.Outer\""), "got {rendered}");
    }

    #[test]
    fn empty_batch_returns_no_events_error() {
        let serializer = make_serializer();
        let err = serializer
            .encode_batch(&[])
            .expect_err("empty batch errors");
        assert!(matches!(err, ProtoBatchEncodingError::NoEvents));
    }

    #[test]
    fn metric_event_is_rejected() {
        let serializer = make_serializer();
        let metric = Event::Metric(Metric::new(
            "test",
            MetricKind::Absolute,
            MetricValue::Counter { value: 1.0 },
        ));
        let err = serializer
            .encode_batch(&[metric])
            .expect_err("metric event errors");
        assert!(matches!(err, ProtoBatchEncodingError::UnsupportedEventType));
    }

    #[test]
    fn trace_event_is_rejected() {
        let serializer = make_serializer();
        let trace = Event::Trace(TraceEvent::default());
        let err = serializer
            .encode_batch(&[trace])
            .expect_err("trace event errors");
        assert!(matches!(err, ProtoBatchEncodingError::UnsupportedEventType));
    }

    #[test]
    fn mixed_batch_with_non_log_event_is_rejected() {
        // A valid log ahead of the trace must not mask the unsupported event.
        let serializer = make_serializer();
        let events = [named_log("ok"), Event::Trace(TraceEvent::default())];
        let err = serializer
            .encode_batch(&events)
            .expect_err("mixed batch errors");
        assert!(matches!(err, ProtoBatchEncodingError::UnsupportedEventType));
    }

    #[test]
    fn multi_event_batch_yields_one_record_per_event_in_order() {
        let descriptor = build_descriptor();
        let serializer =
            ProtoBatchSerializer::new(ProtoBatchSerializerConfig::new(descriptor.clone()))
                .expect("serializer builds");

        let names = ["first", "second", "third"];
        let events: Vec<Event> = names.into_iter().map(named_log).collect();

        let records = serializer
            .encode_batch(&events)
            .expect("encoding succeeds");
        assert_eq!(records.len(), names.len());

        for (record, expected) in records.iter().zip(names) {
            let decoded = decode(&descriptor, record);
            let name = decoded
                .get_field_by_name("name")
                .expect("name field present");
            assert_eq!(name.as_str(), Some(expected));
        }
    }

    #[test]
    fn round_trip_decode_preserves_field_mapping() {
        let descriptor = build_descriptor();
        let serializer =
            ProtoBatchSerializer::new(ProtoBatchSerializerConfig::new(descriptor.clone()))
                .expect("serializer builds");

        let event = Event::Log(LogEvent::from(btreemap! {
            "name" => Value::from("hello"),
            "count" => Value::from(42_i64),
            "inner" => Value::from(btreemap! {
                "label" => Value::from("nested"),
            }),
        }));

        let records = serializer
            .encode_batch(&[event])
            .expect("encoding succeeds");
        assert_eq!(records.len(), 1);

        let decoded = decode(&descriptor, &records[0]);

        let name_field = decoded
            .get_field_by_name("name")
            .expect("name field present");
        assert_eq!(name_field.as_str(), Some("hello"));

        let count_field = decoded
            .get_field_by_name("count")
            .expect("count field present");
        assert_eq!(count_field.as_i64(), Some(42));

        let inner_field = decoded
            .get_field_by_name("inner")
            .expect("inner field present");
        let inner_msg = match &*inner_field {
            ProstValue::Message(m) => m,
            other => panic!("expected nested message, got {other:?}"),
        };
        let label = inner_msg
            .get_field_by_name("label")
            .expect("label field present");
        assert_eq!(label.as_str(), Some("nested"));
    }
}