vector/transforms/sample/
config.rs1use snafu::Snafu;
2use vector_lib::{
3 config::LegacyKey,
4 configurable::configurable_component,
5 lookup::{lookup_v2::OptionalValuePath, owned_value_path},
6};
7use vrl::value::Kind;
8
9use super::transform::{DynamicSampleFields, Sample, SampleMode};
10use crate::{
11 conditions::AnyCondition,
12 config::{
13 DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext,
14 TransformOutput,
15 },
16 schema,
17 template::Template,
18 transforms::Transform,
19};
20
21#[derive(Debug, Snafu)]
22pub enum SampleError {
23 #[snafu(display(
25 "Only positive, non-zero numbers are allowed values for `ratio`, value: {ratio}"
26 ))]
27 InvalidRatio { ratio: f64 },
28
29 #[snafu(display("Only non-zero numbers are allowed values for `rate`"))]
30 InvalidRate,
31
32 #[snafu(display("Only one value can be provided for either 'rate' or 'ratio', but not both"))]
33 InvalidStaticConfiguration,
34
35 #[snafu(display(
36 "Only one value can be provided for either 'ratio_field' or 'rate_field', but not both"
37 ))]
38 InvalidDynamicConfiguration,
39
40 #[snafu(display(
41 "Exactly one value must be provided for either 'rate' or 'ratio' to configure static sampling"
42 ))]
43 MissingStaticConfiguration,
44
45 #[snafu(display(
46 "'key_field' cannot be combined with 'ratio_field' or 'rate_field' because dynamic values can vary per event and break key-based coherence"
47 ))]
48 InvalidKeyFieldDynamicCombination,
49}
50
51#[configurable_component(transform(
53 "sample",
54 "Sample events from an event stream based on supplied criteria and at a configurable rate."
55))]
56#[derive(Clone, Debug)]
57#[serde(deny_unknown_fields)]
58pub struct SampleConfig {
59 #[configurable(metadata(docs::examples = 1500))]
65 pub rate: Option<u64>,
66
67 #[configurable(metadata(docs::examples = 0.13))]
74 #[configurable(validation(range(min = 0.0, max = 1.0)))]
75 pub ratio: Option<f64>,
76
77 #[configurable(metadata(docs::examples = "sample_rate"))]
84 pub ratio_field: Option<String>,
85
86 #[configurable(metadata(docs::examples = "sample_rate_n"))]
93 pub rate_field: Option<String>,
94
95 #[configurable(metadata(docs::examples = "message"))]
109 pub key_field: Option<String>,
110
111 #[configurable(metadata(docs::examples = "sample_rate"))]
113 #[serde(default = "default_sample_rate_key")]
114 pub sample_rate_key: OptionalValuePath,
115
116 #[configurable(metadata(
124 docs::examples = "{{ service }}",
125 docs::examples = "{{ hostname }}-{{ service }}"
126 ))]
127 pub group_by: Option<Template>,
128
129 pub exclude: Option<AnyCondition>,
131}
132
133impl SampleConfig {
134 fn sample_rate(&self) -> Result<SampleMode, SampleError> {
135 if self.ratio_field.is_some() && self.rate_field.is_some() {
136 return Err(SampleError::InvalidDynamicConfiguration);
137 }
138
139 if self.key_field.is_some() && (self.ratio_field.is_some() || self.rate_field.is_some()) {
140 return Err(SampleError::InvalidKeyFieldDynamicCombination);
141 }
142
143 if self.rate.is_some() && self.ratio.is_some() {
144 return Err(SampleError::InvalidStaticConfiguration);
145 }
146
147 match (self.rate, self.ratio) {
148 (None, Some(ratio)) => {
149 if ratio <= 0.0 {
150 Err(SampleError::InvalidRatio { ratio })
151 } else {
152 Ok(SampleMode::new_ratio(ratio))
153 }
154 }
155 (Some(rate), None) => {
156 if rate == 0 {
157 Err(SampleError::InvalidRate)
158 } else {
159 Ok(SampleMode::new_rate(rate))
160 }
161 }
162 (None, None) => Err(SampleError::MissingStaticConfiguration),
163 _ => Err(SampleError::InvalidStaticConfiguration),
164 }
165 }
166}
167
168impl GenerateConfig for SampleConfig {
169 fn generate_config() -> toml::Value {
170 toml::Value::try_from(Self {
171 rate: None,
172 ratio: Some(0.1),
173 ratio_field: None,
174 rate_field: None,
175 key_field: None,
176 group_by: None,
177 exclude: None::<AnyCondition>,
178 sample_rate_key: default_sample_rate_key(),
179 })
180 .unwrap()
181 }
182}
183
184#[async_trait::async_trait]
185#[typetag::serde(name = "sample")]
186impl TransformConfig for SampleConfig {
187 async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
188 let sample_mode = self.sample_rate()?;
189 let exclude = self
190 .exclude
191 .as_ref()
192 .map(|condition| condition.build(&context.enrichment_tables, &context.metrics_storage))
193 .transpose()?;
194
195 let sample = if self.ratio_field.is_some() || self.rate_field.is_some() {
196 Sample::new_with_dynamic(
197 Self::NAME.to_string(),
198 sample_mode,
199 DynamicSampleFields {
200 ratio_field: self.ratio_field.clone(),
201 rate_field: self.rate_field.clone(),
202 },
203 self.group_by.clone(),
204 exclude,
205 self.sample_rate_key.clone(),
206 )
207 } else {
208 Sample::new(
209 Self::NAME.to_string(),
210 sample_mode,
211 self.key_field.clone(),
212 self.group_by.clone(),
213 exclude,
214 self.sample_rate_key.clone(),
215 )
216 };
217
218 Ok(Transform::function(sample))
219 }
220
221 fn input(&self) -> Input {
222 Input::new(DataType::Log | DataType::Trace)
223 }
224
225 fn validate(&self, _: &schema::Definition) -> Result<(), Vec<String>> {
226 self.sample_rate()
227 .map(|_| ())
228 .map_err(|e| vec![e.to_string()])
229 }
230
231 fn outputs(
232 &self,
233 _: &TransformContext,
234 input_definitions: &[(OutputId, schema::Definition)],
235 ) -> Vec<TransformOutput> {
236 vec![TransformOutput::new(
237 DataType::Log | DataType::Trace,
238 input_definitions
239 .iter()
240 .map(|(output, definition)| {
241 (
242 output.clone(),
243 definition.clone().with_source_metadata(
244 SampleConfig::NAME,
245 Some(LegacyKey::Overwrite(owned_value_path!("sample_rate"))),
246 &owned_value_path!("sample_rate"),
247 Kind::bytes(),
248 None,
249 ),
250 )
251 })
252 .collect(),
253 )]
254 }
255}
256
257pub fn default_sample_rate_key() -> OptionalValuePath {
258 OptionalValuePath::from(owned_value_path!("sample_rate"))
259}
260
#[cfg(test)]
mod tests {
    use crate::{
        config::TransformConfig,
        transforms::sample::config::{SampleConfig, SampleError},
    };

    /// A config with every option unset and the default `sample_rate_key`.
    /// Individual tests override only the fields they exercise.
    fn blank_config() -> SampleConfig {
        SampleConfig {
            rate: None,
            ratio: None,
            ratio_field: None,
            rate_field: None,
            key_field: None,
            sample_rate_key: super::default_sample_rate_key(),
            group_by: None,
            exclude: None,
        }
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<SampleConfig>();
    }

    #[test]
    fn rejects_dynamic_ratio_only_configuration() {
        let config = SampleConfig {
            ratio_field: Some("sample_rate".to_string()),
            ..blank_config()
        };

        assert!(matches!(
            config.sample_rate(),
            Err(SampleError::MissingStaticConfiguration)
        ));
    }

    #[test]
    fn rejects_dynamic_rate_only_configuration() {
        let config = SampleConfig {
            rate_field: Some("sample_rate_n".to_string()),
            ..blank_config()
        };

        assert!(matches!(
            config.sample_rate(),
            Err(SampleError::MissingStaticConfiguration)
        ));
    }

    #[test]
    fn validates_static_with_dynamic_configuration() {
        let config = SampleConfig {
            rate: Some(10),
            rate_field: Some("sample_rate_n".to_string()),
            ..blank_config()
        };

        let definition = crate::schema::Definition::any();
        assert!(config.validate(&definition).is_ok());
    }

    #[test]
    fn rejects_both_dynamic_fields_configuration() {
        let config = SampleConfig {
            rate: Some(10),
            ratio_field: Some("sample_rate".to_string()),
            rate_field: Some("sample_rate_n".to_string()),
            ..blank_config()
        };

        assert!(matches!(
            config.sample_rate(),
            Err(SampleError::InvalidDynamicConfiguration)
        ));
    }

    #[test]
    fn rejects_key_field_with_dynamic_configuration() {
        let config = SampleConfig {
            rate: Some(10),
            ratio_field: Some("sample_ratio".to_string()),
            key_field: Some("trace_id".to_string()),
            ..blank_config()
        };

        assert!(matches!(
            config.sample_rate(),
            Err(SampleError::InvalidKeyFieldDynamicCombination)
        ));
    }
}