codecs/
internal_events.rs

1//! Internal events for codecs.
2
3use tracing::error;
4use vector_common::{
5    counter,
6    internal_event::{
7        ComponentEventsDropped, CounterName, InternalEvent, UNINTENTIONAL, emit, error_stage,
8        error_type,
9    },
10};
11use vector_common_macros::NamedInternalEvent;
12
/// Emitted when a decoder framing error occurs.
///
/// Generic over the error type `E` so that any framing error can be carried;
/// emitting the event requires `E` to implement `Display`.
#[derive(Debug, NamedInternalEvent)]
pub struct DecoderFramingError<E> {
    /// The framing error that occurred.
    pub error: E,
}
19
20impl<E: std::fmt::Display> InternalEvent for DecoderFramingError<E> {
21    fn emit(self) {
22        error!(
23            message = "Failed framing bytes.",
24            error = %self.error,
25            error_code = "decoder_frame",
26            error_type = error_type::PARSER_FAILED,
27            stage = error_stage::PROCESSING,
28        );
29        counter!(
30            CounterName::ComponentErrorsTotal,
31            "error_code" => "decoder_frame",
32            "error_type" => error_type::PARSER_FAILED,
33            "stage" => error_stage::PROCESSING,
34        )
35        .increment(1);
36    }
37}
38
/// Emitted when a decoder fails to deserialize a frame.
///
/// Borrows the error, so the caller keeps ownership after emitting.
#[derive(Debug, NamedInternalEvent)]
pub struct DecoderDeserializeError<'a> {
    /// The deserialize error that occurred.
    pub error: &'a vector_common::Error,
}
45
46impl InternalEvent for DecoderDeserializeError<'_> {
47    fn emit(self) {
48        error!(
49            message = "Failed deserializing frame.",
50            error = %self.error,
51            error_code = "decoder_deserialize",
52            error_type = error_type::PARSER_FAILED,
53            stage = error_stage::PROCESSING,
54        );
55        counter!(
56            CounterName::ComponentErrorsTotal,
57            "error_code" => "decoder_deserialize",
58            "error_type" => error_type::PARSER_FAILED,
59            "stage" => error_stage::PROCESSING,
60        )
61        .increment(1);
62    }
63}
64
/// Emitted when an encoder framing error occurs.
///
/// Borrows the error, so the caller keeps ownership after emitting.
#[derive(Debug, NamedInternalEvent)]
pub struct EncoderFramingError<'a> {
    /// The framing error that occurred.
    pub error: &'a crate::encoding::BoxedFramingError,
}
71
72impl InternalEvent for EncoderFramingError<'_> {
73    fn emit(self) {
74        let reason = "Failed framing bytes.";
75        error!(
76            message = reason,
77            error = %self.error,
78            error_code = "encoder_frame",
79            error_type = error_type::ENCODER_FAILED,
80            stage = error_stage::SENDING,
81        );
82        counter!(
83            CounterName::ComponentErrorsTotal,
84            "error_code" => "encoder_frame",
85            "error_type" => error_type::ENCODER_FAILED,
86            "stage" => error_stage::SENDING,
87        )
88        .increment(1);
89        emit(ComponentEventsDropped::<UNINTENTIONAL> { count: 1, reason });
90    }
91}
92
/// Emitted when an encoder fails to serialize a frame.
///
/// Borrows the error, so the caller keeps ownership after emitting.
#[derive(Debug, NamedInternalEvent)]
pub struct EncoderSerializeError<'a> {
    /// The serialization error that occurred.
    pub error: &'a vector_common::Error,
}
99
100impl InternalEvent for EncoderSerializeError<'_> {
101    fn emit(self) {
102        const SERIALIZE_REASON: &str = "Failed serializing frame.";
103        error!(
104            message = SERIALIZE_REASON,
105            error = %self.error,
106            error_code = "encoder_serialize",
107            error_type = error_type::ENCODER_FAILED,
108            stage = error_stage::SENDING,
109        );
110        counter!(
111            CounterName::ComponentErrorsTotal,
112            "error_code" => "encoder_serialize",
113            "error_type" => error_type::ENCODER_FAILED,
114            "stage" => error_stage::SENDING,
115        )
116        .increment(1);
117        emit(ComponentEventsDropped::<UNINTENTIONAL> {
118            count: 1,
119            reason: SERIALIZE_REASON,
120        });
121    }
122}
123
/// Emitted when writing encoded bytes fails.
///
/// Generic over the error type `E`; emitting the event requires `E` to
/// implement `Display`.
#[derive(Debug, NamedInternalEvent)]
pub struct EncoderWriteError<'a, E> {
    /// The write error that occurred.
    pub error: &'a E,
    /// The number of events dropped by the failed write. When this is zero,
    /// no dropped-events report is emitted.
    pub count: usize,
}
132
133impl<E: std::fmt::Display> InternalEvent for EncoderWriteError<'_, E> {
134    fn emit(self) {
135        let reason = "Failed writing bytes.";
136        error!(
137            message = reason,
138            error = %self.error,
139            error_type = error_type::IO_FAILED,
140            stage = error_stage::SENDING,
141        );
142        counter!(
143            CounterName::ComponentErrorsTotal,
144            "error_type" => error_type::ENCODER_FAILED,
145            "stage" => error_stage::SENDING,
146        )
147        .increment(1);
148        if self.count > 0 {
149            emit(ComponentEventsDropped::<UNINTENTIONAL> {
150                count: self.count,
151                reason,
152            });
153        }
154    }
155}
156
/// Emitted when encoding violates a schema constraint.
///
/// Only compiled when the `arrow` feature is enabled.
#[cfg(feature = "arrow")]
#[derive(Debug, NamedInternalEvent)]
pub struct EncoderNullConstraintError<'a> {
    /// The schema constraint error that occurred.
    pub error: &'a vector_common::Error,
}
164
#[cfg(feature = "arrow")]
impl InternalEvent for EncoderNullConstraintError<'_> {
    /// Logs the schema constraint violation at error level, then bumps the
    /// component error counter with the same `encoding_null_constraint` tags.
    fn emit(self) {
        // One definition feeds both the log record and the metric labels.
        const ERROR_CODE: &str = "encoding_null_constraint";

        let Self { error: violation } = self;

        error!(
            message = "Schema constraint violation.",
            error = %violation,
            error_code = ERROR_CODE,
            error_type = error_type::ENCODER_FAILED,
            stage = error_stage::SENDING,
        );

        let errors_total = counter!(
            CounterName::ComponentErrorsTotal,
            "error_code" => ERROR_CODE,
            "error_type" => error_type::ENCODER_FAILED,
            "stage" => error_stage::SENDING,
        );
        errors_total.increment(1);
    }
}
184
/// Emitted when Arrow record batch construction fails, for example while
/// building the schema decoder or during JSON-to-Arrow decoding (such as on a
/// type mismatch).
///
/// Only compiled when the `arrow` feature is enabled.
#[cfg(feature = "arrow")]
#[derive(Debug, NamedInternalEvent)]
pub struct EncoderRecordBatchError<'a, E> {
    /// The encoding error that occurred.
    pub error: &'a E,
    /// Stable error code identifying the failure mode; attached to both the
    /// log record and the error counter when the event is emitted.
    pub error_code: &'static str,
}
195
#[cfg(feature = "arrow")]
impl<E> InternalEvent for EncoderRecordBatchError<'_, E>
where
    E: std::fmt::Display,
{
    /// Logs the record batch failure at error level, then bumps the component
    /// error counter, tagging both with the caller-supplied error code.
    fn emit(self) {
        let Self {
            error: cause,
            error_code: code,
        } = self;

        error!(
            message = "Failed to build Arrow record batch.",
            error = %cause,
            error_code = code,
            error_type = error_type::ENCODER_FAILED,
            stage = error_stage::SENDING,
        );

        let errors_total = counter!(
            CounterName::ComponentErrorsTotal,
            "error_code" => code,
            "error_type" => error_type::ENCODER_FAILED,
            "stage" => error_stage::SENDING,
        );
        errors_total.increment(1);
    }
}
215
/// Emitted when a schema could not be generated for batched events.
///
/// Only compiled when the `parquet` feature is enabled.
#[cfg(feature = "parquet")]
#[derive(NamedInternalEvent)]
pub(crate) struct SchemaGenerationError<'a> {
    /// The Arrow error that occurred.
    pub error: &'a arrow::error::ArrowError,
}
221
#[cfg(feature = "parquet")]
impl InternalEvent for SchemaGenerationError<'_> {
    /// Logs the schema generation failure at error level, then bumps the
    /// component error counter with the same
    /// `parquet_schema_generation_failed` tags.
    fn emit(self) {
        // One definition feeds both the log record and the metric labels.
        const ERROR_CODE: &str = "parquet_schema_generation_failed";

        let Self { error: cause } = self;

        error!(
            message = "Could not generate schema for batched events",
            error = %cause,
            error_code = ERROR_CODE,
            error_type = error_type::ENCODER_FAILED,
            stage = error_stage::SENDING,
            internal_log_rate_limit = false,
        );

        let errors_total = counter!(
            CounterName::ComponentErrorsTotal,
            "error_code" => ERROR_CODE,
            "error_type" => error_type::ENCODER_FAILED,
            "stage" => error_stage::SENDING,
        );
        errors_total.increment(1);
    }
}
242
/// Emitted when writing a record batch with `ArrowWriter` fails.
///
/// Only compiled when the `parquet` feature is enabled.
#[cfg(feature = "parquet")]
#[derive(NamedInternalEvent)]
pub(crate) struct ArrowWriterError<'a> {
    /// The Parquet error that occurred.
    pub error: &'a parquet::errors::ParquetError,
}
248
#[cfg(feature = "parquet")]
impl InternalEvent for ArrowWriterError<'_> {
    /// Logs the record batch write failure at error level, then bumps the
    /// component error counter with the same `parquet_arrow_writer_failed`
    /// tags.
    fn emit(self) {
        // One definition feeds both the log record and the metric labels.
        const ERROR_CODE: &str = "parquet_arrow_writer_failed";

        let cause = self.error;

        error!(
            message = "Failed to write record batch with ArrowWriter.",
            error = %cause,
            error_code = ERROR_CODE,
            error_type = error_type::ENCODER_FAILED,
            stage = error_stage::SENDING,
            internal_log_rate_limit = false,
        );

        let errors_total = counter!(
            CounterName::ComponentErrorsTotal,
            "error_code" => ERROR_CODE,
            "error_type" => error_type::ENCODER_FAILED,
            "stage" => error_stage::SENDING,
        );
        errors_total.increment(1);
    }
}
269
/// Emitted when an event could not be serialized to JSON.
///
/// Only compiled when the `parquet` feature is enabled.
#[cfg(feature = "parquet")]
#[derive(NamedInternalEvent)]
pub(crate) struct JsonSerializationError<'a> {
    /// The JSON serialization error that occurred.
    pub error: &'a serde_json::Error,
}
275
#[cfg(feature = "parquet")]
impl InternalEvent for JsonSerializationError<'_> {
    /// Logs the JSON serialization failure at error level, then bumps the
    /// component error counter. No `error_code` tag is attached to either.
    fn emit(self) {
        let Self { error: cause } = self;

        error!(
            message = "Could not serialize event to JSON.",
            error = %cause,
            error_type = error_type::ENCODER_FAILED,
            stage = error_stage::SENDING,
            internal_log_rate_limit = true,
        );

        let errors_total = counter!(
            CounterName::ComponentErrorsTotal,
            "error_type" => error_type::ENCODER_FAILED,
            "stage" => error_stage::SENDING,
        );
        errors_total.increment(1);
    }
}