1use tracing::error;
4use vector_common::{
5 counter,
6 internal_event::{
7 ComponentEventsDropped, CounterName, InternalEvent, UNINTENTIONAL, emit, error_stage,
8 error_type,
9 },
10};
11use vector_common_macros::NamedInternalEvent;
12
13#[derive(Debug, NamedInternalEvent)]
14pub struct DecoderFramingError<E> {
16 pub error: E,
18}
19
20impl<E: std::fmt::Display> InternalEvent for DecoderFramingError<E> {
21 fn emit(self) {
22 error!(
23 message = "Failed framing bytes.",
24 error = %self.error,
25 error_code = "decoder_frame",
26 error_type = error_type::PARSER_FAILED,
27 stage = error_stage::PROCESSING,
28 );
29 counter!(
30 CounterName::ComponentErrorsTotal,
31 "error_code" => "decoder_frame",
32 "error_type" => error_type::PARSER_FAILED,
33 "stage" => error_stage::PROCESSING,
34 )
35 .increment(1);
36 }
37}
38
39#[derive(Debug, NamedInternalEvent)]
40pub struct DecoderDeserializeError<'a> {
42 pub error: &'a vector_common::Error,
44}
45
46impl InternalEvent for DecoderDeserializeError<'_> {
47 fn emit(self) {
48 error!(
49 message = "Failed deserializing frame.",
50 error = %self.error,
51 error_code = "decoder_deserialize",
52 error_type = error_type::PARSER_FAILED,
53 stage = error_stage::PROCESSING,
54 );
55 counter!(
56 CounterName::ComponentErrorsTotal,
57 "error_code" => "decoder_deserialize",
58 "error_type" => error_type::PARSER_FAILED,
59 "stage" => error_stage::PROCESSING,
60 )
61 .increment(1);
62 }
63}
64
65#[derive(Debug, NamedInternalEvent)]
66pub struct EncoderFramingError<'a> {
68 pub error: &'a crate::encoding::BoxedFramingError,
70}
71
72impl InternalEvent for EncoderFramingError<'_> {
73 fn emit(self) {
74 let reason = "Failed framing bytes.";
75 error!(
76 message = reason,
77 error = %self.error,
78 error_code = "encoder_frame",
79 error_type = error_type::ENCODER_FAILED,
80 stage = error_stage::SENDING,
81 );
82 counter!(
83 CounterName::ComponentErrorsTotal,
84 "error_code" => "encoder_frame",
85 "error_type" => error_type::ENCODER_FAILED,
86 "stage" => error_stage::SENDING,
87 )
88 .increment(1);
89 emit(ComponentEventsDropped::<UNINTENTIONAL> { count: 1, reason });
90 }
91}
92
93#[derive(Debug, NamedInternalEvent)]
94pub struct EncoderSerializeError<'a> {
96 pub error: &'a vector_common::Error,
98}
99
100impl InternalEvent for EncoderSerializeError<'_> {
101 fn emit(self) {
102 const SERIALIZE_REASON: &str = "Failed serializing frame.";
103 error!(
104 message = SERIALIZE_REASON,
105 error = %self.error,
106 error_code = "encoder_serialize",
107 error_type = error_type::ENCODER_FAILED,
108 stage = error_stage::SENDING,
109 );
110 counter!(
111 CounterName::ComponentErrorsTotal,
112 "error_code" => "encoder_serialize",
113 "error_type" => error_type::ENCODER_FAILED,
114 "stage" => error_stage::SENDING,
115 )
116 .increment(1);
117 emit(ComponentEventsDropped::<UNINTENTIONAL> {
118 count: 1,
119 reason: SERIALIZE_REASON,
120 });
121 }
122}
123
124#[derive(Debug, NamedInternalEvent)]
125pub struct EncoderWriteError<'a, E> {
127 pub error: &'a E,
129 pub count: usize,
131}
132
133impl<E: std::fmt::Display> InternalEvent for EncoderWriteError<'_, E> {
134 fn emit(self) {
135 let reason = "Failed writing bytes.";
136 error!(
137 message = reason,
138 error = %self.error,
139 error_type = error_type::IO_FAILED,
140 stage = error_stage::SENDING,
141 );
142 counter!(
143 CounterName::ComponentErrorsTotal,
144 "error_type" => error_type::ENCODER_FAILED,
145 "stage" => error_stage::SENDING,
146 )
147 .increment(1);
148 if self.count > 0 {
149 emit(ComponentEventsDropped::<UNINTENTIONAL> {
150 count: self.count,
151 reason,
152 });
153 }
154 }
155}
156
157#[cfg(feature = "arrow")]
158#[derive(Debug, NamedInternalEvent)]
159pub struct EncoderNullConstraintError<'a> {
161 pub error: &'a vector_common::Error,
163}
164
165#[cfg(feature = "arrow")]
166impl InternalEvent for EncoderNullConstraintError<'_> {
167 fn emit(self) {
168 error!(
169 message = "Schema constraint violation.",
170 error = %self.error,
171 error_code = "encoding_null_constraint",
172 error_type = error_type::ENCODER_FAILED,
173 stage = error_stage::SENDING,
174 );
175 counter!(
176 CounterName::ComponentErrorsTotal,
177 "error_code" => "encoding_null_constraint",
178 "error_type" => error_type::ENCODER_FAILED,
179 "stage" => error_stage::SENDING,
180 )
181 .increment(1);
182 }
183}
184
185#[cfg(feature = "arrow")]
186#[derive(Debug, NamedInternalEvent)]
187pub struct EncoderRecordBatchError<'a, E> {
190 pub error: &'a E,
192 pub error_code: &'static str,
194}
195
196#[cfg(feature = "arrow")]
197impl<E: std::fmt::Display> InternalEvent for EncoderRecordBatchError<'_, E> {
198 fn emit(self) {
199 error!(
200 message = "Failed to build Arrow record batch.",
201 error = %self.error,
202 error_code = self.error_code,
203 error_type = error_type::ENCODER_FAILED,
204 stage = error_stage::SENDING,
205 );
206 counter!(
207 CounterName::ComponentErrorsTotal,
208 "error_code" => self.error_code,
209 "error_type" => error_type::ENCODER_FAILED,
210 "stage" => error_stage::SENDING,
211 )
212 .increment(1);
213 }
214}
215
216#[cfg(feature = "parquet")]
217#[derive(NamedInternalEvent)]
218pub(crate) struct SchemaGenerationError<'a> {
219 pub error: &'a arrow::error::ArrowError,
220}
221
222#[cfg(feature = "parquet")]
223impl InternalEvent for SchemaGenerationError<'_> {
224 fn emit(self) {
225 error!(
226 message = "Could not generate schema for batched events",
227 error = %self.error,
228 error_code = "parquet_schema_generation_failed",
229 error_type = error_type::ENCODER_FAILED,
230 stage = error_stage::SENDING,
231 internal_log_rate_limit = false,
232 );
233 counter!(
234 CounterName::ComponentErrorsTotal,
235 "error_code" => "parquet_schema_generation_failed",
236 "error_type" => error_type::ENCODER_FAILED,
237 "stage" => error_stage::SENDING,
238 )
239 .increment(1);
240 }
241}
242
243#[cfg(feature = "parquet")]
244#[derive(NamedInternalEvent)]
245pub(crate) struct ArrowWriterError<'a> {
246 pub error: &'a parquet::errors::ParquetError,
247}
248
249#[cfg(feature = "parquet")]
250impl InternalEvent for ArrowWriterError<'_> {
251 fn emit(self) {
252 error!(
253 message = "Failed to write record batch with ArrowWriter.",
254 error = %self.error,
255 error_code = "parquet_arrow_writer_failed",
256 error_type = error_type::ENCODER_FAILED,
257 stage = error_stage::SENDING,
258 internal_log_rate_limit = false,
259 );
260 counter!(
261 CounterName::ComponentErrorsTotal,
262 "error_code" => "parquet_arrow_writer_failed",
263 "error_type" => error_type::ENCODER_FAILED,
264 "stage" => error_stage::SENDING,
265 )
266 .increment(1);
267 }
268}
269
270#[cfg(feature = "parquet")]
271#[derive(NamedInternalEvent)]
272pub(crate) struct JsonSerializationError<'a> {
273 pub error: &'a serde_json::Error,
274}
275
276#[cfg(feature = "parquet")]
277impl InternalEvent for JsonSerializationError<'_> {
278 fn emit(self) {
279 error!(
280 message = "Could not serialize event to JSON.",
281 error = %self.error,
282 error_type = error_type::ENCODER_FAILED,
283 stage = error_stage::SENDING,
284 internal_log_rate_limit = true,
285 );
286 counter!(
287 CounterName::ComponentErrorsTotal,
288 "error_type" => error_type::ENCODER_FAILED,
289 "stage" => error_stage::SENDING,
290 )
291 .increment(1);
292 }
293}