codecs/encoding/format/
protobuf.rs1use std::path::PathBuf;
2
3use crate::encoding::BuildError;
4use bytes::BytesMut;
5use prost_reflect::{MessageDescriptor, prost::Message as _};
6use tokio_util::codec::Encoder;
7use vector_config_macros::configurable_component;
8use vector_core::{
9 config::DataType,
10 event::{Event, Value},
11 schema,
12};
13use vrl::protobuf::{
14 descriptor::{get_message_descriptor, get_message_descriptor_from_bytes},
15 encode::{Options, encode_message},
16};
17
18#[configurable_component]
20#[derive(Debug, Clone)]
21pub struct ProtobufSerializerConfig {
22 pub protobuf: ProtobufSerializerOptions,
24}
25
26impl ProtobufSerializerConfig {
27 pub fn build(&self) -> Result<ProtobufSerializer, BuildError> {
29 let message_descriptor =
30 get_message_descriptor(&self.protobuf.desc_file, &self.protobuf.message_type)?;
31 Ok(ProtobufSerializer {
32 message_descriptor,
33 options: Options {
34 use_json_names: self.protobuf.use_json_names,
35 allow_lossy_string_coercion: true,
36 },
37 })
38 }
39
40 pub fn input_type(&self) -> DataType {
42 DataType::Log | DataType::Trace
43 }
44
45 pub fn schema_requirement(&self) -> schema::Requirement {
47 schema::Requirement::empty()
50 }
51}
52
53#[configurable_component]
55#[derive(Debug, Clone)]
56pub struct ProtobufSerializerOptions {
57 #[configurable(metadata(docs::examples = "/etc/vector/protobuf_descriptor_set.desc"))]
63 pub desc_file: PathBuf,
64
65 #[configurable(metadata(docs::examples = "package.Message"))]
67 pub message_type: String,
68
69 #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
77 pub use_json_names: bool,
78}
79
80#[derive(Debug, Clone)]
82pub struct ProtobufSerializer {
83 message_descriptor: MessageDescriptor,
85 options: Options,
86}
87
88impl ProtobufSerializer {
89 pub fn new(message_descriptor: MessageDescriptor) -> Self {
91 Self {
92 message_descriptor,
93 options: Options::default(),
94 }
95 }
96
97 pub fn new_from_bytes(
99 desc_bytes: &[u8],
100 message_type: &str,
101 options: &Options,
102 ) -> vector_common::Result<Self> {
103 let message_descriptor = get_message_descriptor_from_bytes(desc_bytes, message_type)?;
104 Ok(Self {
105 message_descriptor,
106 options: options.clone(),
107 })
108 }
109
110 pub fn descriptor_proto(&self) -> &prost_reflect::prost_types::DescriptorProto {
112 self.message_descriptor.descriptor_proto()
113 }
114}
115
116impl Encoder<Event> for ProtobufSerializer {
117 type Error = vector_common::Error;
118
119 fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
120 let message = match event {
121 Event::Log(log) => {
122 encode_message(&self.message_descriptor, log.into_parts().0, &self.options)
123 }
124 Event::Metric(_) => unimplemented!(),
125 Event::Trace(trace) => encode_message(
126 &self.message_descriptor,
127 Value::Object(trace.into_parts().0),
128 &self.options,
129 ),
130 }?;
131 message.encode(buffer).map_err(Into::into)
132 }
133}