1use std::{
2 collections::HashMap,
3 fmt,
4 hash::{Hash, Hasher},
5 num::NonZeroU64,
6};
7
8use vector_lib::{
9 config::LegacyKey,
10 lookup::{OwnedTargetPath, lookup_v2::OptionalValuePath},
11};
12
13use crate::{
14 conditions::Condition,
15 event::{Event, Value},
16 internal_events::SampleEventDiscarded,
17 sinks::prelude::TemplateRenderingError,
18 template::Template,
19 transforms::{FunctionTransform, OutputBuffer},
20};
21
/// Statically configured sampling strategy together with its per-group state.
#[derive(Clone, Debug)]
pub enum SampleMode {
    /// Counter-based sampling driven by an integer `rate`.
    Rate {
        /// Configured rate; also used as the divisor when sampling by hash.
        rate: u64,
        /// Number of events seen so far, tracked per group key (`None` when
        /// no group key applies).
        counters: HashMap<Option<String>, u64>,
    },
    /// Accumulator-based sampling driven by a fractional `ratio`.
    Ratio {
        /// Configured ratio.
        ratio: f64,
        /// Running accumulator, tracked per group key (`None` when no group
        /// key applies).
        values: HashMap<Option<String>, f64>,
        /// `ratio` scaled onto the `u64` range; hashes at or below this value
        /// are kept when sampling by hash.
        hash_ratio_threshold: u64,
    },
}
37
38impl SampleMode {
39 pub fn new_rate(rate: u64) -> Self {
40 Self::Rate {
41 rate,
42 counters: HashMap::default(),
43 }
44 }
45
46 pub fn new_ratio(ratio: f64) -> Self {
47 Self::Ratio {
48 ratio,
49 values: HashMap::default(),
50 hash_ratio_threshold: (ratio * (u64::MAX as u128) as f64) as u64,
58 }
59 }
60
61 fn increment(&mut self, group_by_key: Option<String>, value: Option<&Value>) -> bool {
62 let threshold_exceeded = match self {
63 Self::Rate { rate, counters } => {
64 let counter_value = counters.entry(group_by_key).or_default();
65 let old_counter_value = *counter_value;
66 *counter_value += 1;
67 old_counter_value % *rate == 0
68 }
69 Self::Ratio { ratio, values, .. } => {
70 let value = values.entry(group_by_key).or_insert(1.0 - *ratio);
71 let increment: f64 = *value + *ratio;
72 *value = if increment >= 1.0 {
73 increment - 1.0
74 } else {
75 increment
76 };
77 increment >= 1.0
78 }
79 };
80 if let Some(value) = value {
81 self.hash_within_ratio(value.to_string_lossy().as_bytes())
82 } else {
83 threshold_exceeded
84 }
85 }
86
87 fn hash_within_ratio(&self, value: &[u8]) -> bool {
88 let hash = seahash::hash(value);
89 match self {
90 Self::Rate { rate, .. } => hash.is_multiple_of(*rate),
91 Self::Ratio {
92 hash_ratio_threshold,
93 ..
94 } => hash <= *hash_ratio_threshold,
95 }
96 }
97}
98
/// Sampling parameters taken from an individual event rather than from the
/// static configuration.
#[derive(Clone, Copy, Debug, PartialEq)]
enum EventSampleMode {
    /// Fractional ratio carried by the event.
    Ratio(f64),
    /// Non-zero integer rate carried by the event.
    Rate(NonZeroU64),
}

impl EventSampleMode {
    /// Renders the numeric value with its `Display` formatting (for example
    /// `0.25` or `10`).
    fn sample_rate_label(&self) -> String {
        match self {
            Self::Ratio(ratio) => ratio.to_string(),
            Self::Rate(rate) => rate.to_string(),
        }
    }
}
112
/// Names of the event fields that may carry per-event sampling parameters.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct DynamicSampleFields {
    /// Path of the field holding a per-event ratio, if any.
    pub ratio_field: Option<String>,
    /// Path of the field holding a per-event rate, if any.
    pub rate_field: Option<String>,
}
118
119impl fmt::Display for SampleMode {
120 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
121 match self {
124 Self::Rate { rate, .. } => write!(f, "{rate}"),
125 Self::Ratio { ratio, .. } => write!(f, "{ratio}"),
126 }
127 }
128}
129
130#[derive(Clone)]
131pub enum SampleKeySource {
132 Static {
133 key_field: Option<String>,
134 group_by: Option<Template>,
135 },
136 Dynamic {
137 fields: DynamicSampleFields,
138 group_by: Option<Template>,
139 },
140}
141
142#[derive(Clone)]
143pub struct Sample {
144 name: String,
145 static_mode: SampleMode,
146 key_source: SampleKeySource,
147 dynamic_event_counters: HashMap<Option<String>, u64>,
148 exclude: Option<Condition>,
149 sample_rate_key: OptionalValuePath,
150}
151
152impl Sample {
153 #![allow(dead_code)]
156 pub fn new(
157 name: String,
158 static_mode: SampleMode,
159 key_field: Option<String>,
160 group_by: Option<Template>,
161 exclude: Option<Condition>,
162 sample_rate_key: OptionalValuePath,
163 ) -> Self {
164 Self::new_with_source(
165 name,
166 static_mode,
167 SampleKeySource::Static {
168 key_field,
169 group_by,
170 },
171 exclude,
172 sample_rate_key,
173 )
174 }
175
176 pub fn new_with_dynamic(
177 name: String,
178 static_mode: SampleMode,
179 fields: DynamicSampleFields,
180 group_by: Option<Template>,
181 exclude: Option<Condition>,
182 sample_rate_key: OptionalValuePath,
183 ) -> Self {
184 Self::new_with_source(
185 name,
186 static_mode,
187 SampleKeySource::Dynamic { fields, group_by },
188 exclude,
189 sample_rate_key,
190 )
191 }
192
193 fn new_with_source(
194 name: String,
195 static_mode: SampleMode,
196 key_source: SampleKeySource,
197 exclude: Option<Condition>,
198 sample_rate_key: OptionalValuePath,
199 ) -> Self {
200 Self {
201 name,
202 static_mode,
203 key_source,
204 dynamic_event_counters: HashMap::default(),
205 exclude,
206 sample_rate_key,
207 }
208 }
209
210 #[cfg(test)]
211 pub fn ratio(&self) -> f64 {
212 match &self.static_mode {
213 SampleMode::Rate { rate, .. } => 1.0f64 / *rate as f64,
214 SampleMode::Ratio { ratio, .. } => *ratio,
215 }
216 }
217
218 fn dynamic_sample_hash(group_by_key: Option<&str>, counter: u64) -> u64 {
219 let mut hasher = seahash::SeaHasher::new();
220 group_by_key.hash(&mut hasher);
221 counter.hash(&mut hasher);
222 hasher.finish()
223 }
224
225 fn sample_with_dynamic_ratio(&mut self, ratio: f64, group_by_key: Option<String>) -> bool {
226 let counter_value = self
227 .dynamic_event_counters
228 .entry(group_by_key.clone())
229 .or_default();
230 let old_counter_value = *counter_value;
231 *counter_value += 1;
232
233 let hash = Self::dynamic_sample_hash(group_by_key.as_deref(), old_counter_value);
234 let hash_ratio_threshold = (ratio * (u64::MAX as u128) as f64) as u64;
235 hash <= hash_ratio_threshold
236 }
237
238 fn event_ratio(&self, event: &Event) -> Option<f64> {
239 let ratio_field = match &self.key_source {
240 SampleKeySource::Dynamic { fields, .. } => fields.ratio_field.as_ref()?,
241 SampleKeySource::Static { .. } => return None,
242 };
243
244 let value = self.get_event_value(event, ratio_field.as_str())?;
245
246 let ratio = match value {
247 Value::Integer(value) => *value as f64,
248 Value::Float(value) => value.into_inner(),
249 Value::Bytes(bytes) => std::str::from_utf8(bytes).ok()?.parse::<f64>().ok()?,
250 _ => return None,
251 };
252
253 (ratio > 0.0 && ratio <= 1.0).then_some(ratio)
254 }
255
256 fn event_rate(&self, event: &Event) -> Option<NonZeroU64> {
257 let rate_field = match &self.key_source {
258 SampleKeySource::Dynamic { fields, .. } => fields.rate_field.as_ref()?,
259 SampleKeySource::Static { .. } => return None,
260 };
261
262 let value = self.get_event_value(event, rate_field.as_str())?;
263
264 match value {
265 Value::Integer(value) => u64::try_from(*value).ok().and_then(NonZeroU64::new),
266 Value::Bytes(bytes) => std::str::from_utf8(bytes).ok()?.parse::<NonZeroU64>().ok(),
267 _ => None,
268 }
269 }
270
271 fn get_event_value<'a>(&self, event: &'a Event, path: &str) -> Option<&'a Value> {
272 match event {
273 Event::Log(event) => event.parse_path_and_get_value(path).ok().flatten(),
274 Event::Trace(event) => event.parse_path_and_get_value(path).ok().flatten(),
275 Event::Metric(_) => panic!("component can never receive metric events"),
276 }
277 }
278
279 fn event_sample_mode(&self, event: &Event) -> Option<EventSampleMode> {
280 self.event_ratio(event)
281 .map(EventSampleMode::Ratio)
282 .or_else(|| self.event_rate(event).map(EventSampleMode::Rate))
283 }
284
285 fn sample_with_dynamic_rate(&mut self, rate: NonZeroU64, group_by_key: Option<String>) -> bool {
286 let counter_value = self
287 .dynamic_event_counters
288 .entry(group_by_key.clone())
289 .or_default();
290 let old_counter_value = *counter_value;
291 *counter_value += 1;
292 let hash = Self::dynamic_sample_hash(group_by_key.as_deref(), old_counter_value);
293
294 hash.is_multiple_of(rate.get())
295 }
296
297 fn group_by_key(&self, event: &Event) -> Option<String> {
298 let group_by = match &self.key_source {
299 SampleKeySource::Static { group_by, .. } => group_by.as_ref()?,
300 SampleKeySource::Dynamic { group_by, .. } => group_by.as_ref()?,
301 };
302
303 match event {
304 Event::Log(event) => group_by.render_string(event),
305 Event::Trace(event) => group_by.render_string(event),
306 Event::Metric(_) => panic!("component can never receive metric events"),
307 }
308 .map_err(|error| {
309 emit!(TemplateRenderingError {
310 error,
311 field: Some("group_by"),
312 drop_event: false,
313 })
314 })
315 .ok()
316 }
317
318 fn static_key_value<'a>(&self, event: &'a Event) -> Option<&'a Value> {
319 let key_field = match &self.key_source {
320 SampleKeySource::Static { key_field, .. } => key_field.as_ref()?,
321 SampleKeySource::Dynamic { .. } => return None,
322 };
323
324 self.get_event_value(event, key_field)
325 }
326}
327
328impl FunctionTransform for Sample {
329 fn transform(&mut self, output: &mut OutputBuffer, event: Event) {
330 let mut event = {
331 if let Some(condition) = self.exclude.as_ref() {
332 let (result, event) = condition.check(event);
333 if result {
334 output.push(event);
335 return;
336 } else {
337 event
338 }
339 } else {
340 event
341 }
342 };
343
344 let group_by_key = self.group_by_key(&event);
345 let value = self.static_key_value(&event);
346
347 let event_sample_mode = self.event_sample_mode(&event);
348 let sample_rate = event_sample_mode
349 .as_ref()
350 .map(EventSampleMode::sample_rate_label)
351 .unwrap_or_else(|| self.static_mode.to_string());
352
353 let should_sample = match event_sample_mode {
354 Some(EventSampleMode::Ratio(ratio)) => {
355 self.sample_with_dynamic_ratio(ratio, group_by_key)
356 }
357 Some(EventSampleMode::Rate(rate)) => self.sample_with_dynamic_rate(rate, group_by_key),
358 None => self.static_mode.increment(group_by_key, value),
359 };
360
361 if should_sample {
362 if let Some(path) = &self.sample_rate_key.path {
363 match event {
364 Event::Log(ref mut event) => {
365 event.namespace().insert_source_metadata(
366 self.name.as_str(),
367 event,
368 Some(LegacyKey::Overwrite(path)),
369 path,
370 sample_rate.clone(),
371 );
372 }
373 Event::Trace(ref mut event) => {
374 event.insert(&OwnedTargetPath::event(path.clone()), sample_rate);
375 }
376 Event::Metric(_) => panic!("component can never receive metric events"),
377 };
378 }
379 output.push(event);
380 } else {
381 emit!(SampleEventDiscarded);
382 }
383 }
384}