vector_buffers/
config.rs

1use std::{
2    fmt,
3    num::{NonZeroU64, NonZeroUsize},
4    path::{Path, PathBuf},
5    slice,
6};
7
8use serde::{Deserialize, Deserializer, Serialize, de};
9use snafu::{ResultExt, Snafu};
10use tracing::Span;
11use vector_common::{config::ComponentKey, finalization::Finalizable};
12use vector_config::configurable_component;
13
14use crate::{
15    Bufferable, WhenFull,
16    topology::{
17        builder::{TopologyBuilder, TopologyError},
18        channel::{BufferReceiver, BufferSender},
19    },
20    variants::{DiskV2Buffer, MemoryBuffer},
21};
22
/// Errors that can occur while turning a buffer configuration into a buffer topology.
#[derive(Debug, Snafu)]
pub enum BufferBuildError {
    /// A disk buffer stage was configured but no data directory was supplied to build it with.
    #[snafu(display("the configured buffer type requires `data_dir` be specified"))]
    RequiresDataDir,
    /// The topology builder failed; the underlying error is carried in `source`.
    #[snafu(display("error occurred when building buffer: {}", source))]
    FailedToBuildTopology { source: TopologyError },
    /// A `max_events` value of zero was supplied.
    // NOTE(review): nothing in this file constructs this variant — confirm where it is raised.
    #[snafu(display("`max_events` must be greater than zero"))]
    InvalidMaxEvents,
}
32
/// Discriminant read from the `type` key while deserializing a [`BufferType`] by hand.
///
/// Only the tag is captured here; the other keys are collected separately and checked against
/// the selected kind once the whole map has been read.
// NOTE(review): `Serialize` is derived but not used anywhere in this file — confirm it is needed.
#[derive(Deserialize, Serialize)]
enum BufferTypeKind {
    /// Selected by `type: memory`; also used when the `type` key is absent.
    #[serde(rename = "memory")]
    Memory,
    /// Selected by `type: disk`.
    #[serde(rename = "disk")]
    DiskV2,
}
40
/// Every key the hand-written `BufferType` deserializer accepts; reported as the list of expected
/// fields when an unknown key is encountered.
const ALL_FIELDS: [&str; 4] = ["type", "max_events", "max_size", "when_full"];
42
43struct BufferTypeVisitor;
44
45impl BufferTypeVisitor {
46    fn visit_map_impl<'de, A>(mut map: A) -> Result<BufferType, A::Error>
47    where
48        A: de::MapAccess<'de>,
49    {
50        let mut kind: Option<BufferTypeKind> = None;
51        let mut max_events: Option<NonZeroUsize> = None;
52        let mut max_size: Option<NonZeroU64> = None;
53        let mut when_full: Option<WhenFull> = None;
54        while let Some(key) = map.next_key::<String>()? {
55            match key.as_str() {
56                "type" => {
57                    if kind.is_some() {
58                        return Err(de::Error::duplicate_field("type"));
59                    }
60                    kind = Some(map.next_value()?);
61                }
62                "max_events" => {
63                    if max_events.is_some() {
64                        return Err(de::Error::duplicate_field("max_events"));
65                    }
66                    max_events = Some(map.next_value()?);
67                }
68                "max_size" => {
69                    if max_size.is_some() {
70                        return Err(de::Error::duplicate_field("max_size"));
71                    }
72                    max_size = Some(map.next_value()?);
73                }
74                "when_full" => {
75                    if when_full.is_some() {
76                        return Err(de::Error::duplicate_field("when_full"));
77                    }
78                    when_full = Some(map.next_value()?);
79                }
80                other => {
81                    return Err(de::Error::unknown_field(other, &ALL_FIELDS));
82                }
83            }
84        }
85        let kind = kind.unwrap_or(BufferTypeKind::Memory);
86        let when_full = when_full.unwrap_or_default();
87        match kind {
88            BufferTypeKind::Memory => {
89                let size = match (max_events, max_size) {
90                    (Some(_), Some(_)) => {
91                        return Err(de::Error::unknown_field(
92                            "max_events",
93                            &["type", "max_size", "when_full"],
94                        ));
95                    }
96                    (_, Some(max_size)) => {
97                        if let Ok(bounded_max_bytes) = usize::try_from(max_size.get()) {
98                            MemoryBufferSize::MaxSize(NonZeroUsize::new(bounded_max_bytes).unwrap())
99                        } else {
100                            return Err(de::Error::invalid_value(
101                                de::Unexpected::Unsigned(max_size.into()),
102                                &format!(
103                                    "Value for max_bytes must be a positive integer <= {}",
104                                    usize::MAX
105                                )
106                                .as_str(),
107                            ));
108                        }
109                    }
110                    _ => MemoryBufferSize::MaxEvents(
111                        max_events.unwrap_or_else(memory_buffer_default_max_events),
112                    ),
113                };
114                Ok(BufferType::Memory { size, when_full })
115            }
116            BufferTypeKind::DiskV2 => {
117                if max_events.is_some() {
118                    return Err(de::Error::unknown_field(
119                        "max_events",
120                        &["type", "max_size", "when_full"],
121                    ));
122                }
123                Ok(BufferType::DiskV2 {
124                    max_size: max_size.ok_or_else(|| de::Error::missing_field("max_size"))?,
125                    when_full,
126                })
127            }
128        }
129    }
130}
131
132impl<'de> de::Visitor<'de> for BufferTypeVisitor {
133    type Value = BufferType;
134
135    fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
136        formatter.write_str("enum BufferType")
137    }
138
139    fn visit_map<A>(self, map: A) -> Result<Self::Value, A::Error>
140    where
141        A: de::MapAccess<'de>,
142    {
143        BufferTypeVisitor::visit_map_impl(map)
144    }
145}
146
147impl<'de> Deserialize<'de> for BufferType {
148    fn deserialize<D>(deserializer: D) -> Result<BufferType, D::Error>
149    where
150        D: Deserializer<'de>,
151    {
152        deserializer.deserialize_map(BufferTypeVisitor)
153    }
154}
155
/// Returns the default `max_events` limit applied to memory buffers: 500.
pub const fn memory_buffer_default_max_events() -> NonZeroUsize {
    // The literal is non-zero, so the `None` arm can never be taken; matching on the checked
    // constructor keeps this `const fn` free of `unsafe`.
    match NonZeroUsize::new(500) {
        Some(max_events) => max_events,
        None => panic!("default max_events must be non-zero"),
    }
}
159
160/// Disk usage configuration for disk-backed buffers.
161#[derive(Debug)]
162pub struct DiskUsage {
163    id: ComponentKey,
164    data_dir: PathBuf,
165    max_size: NonZeroU64,
166}
167
168impl DiskUsage {
169    /// Creates a new `DiskUsage` with the given usage configuration.
170    pub fn new(id: ComponentKey, data_dir: PathBuf, max_size: NonZeroU64) -> Self {
171        Self {
172            id,
173            data_dir,
174            max_size,
175        }
176    }
177
178    /// Gets the component key for the component this buffer is attached to.
179    pub fn id(&self) -> &ComponentKey {
180        &self.id
181    }
182
183    /// Gets the maximum size, in bytes, that this buffer can consume on disk.
184    pub fn max_size(&self) -> u64 {
185        self.max_size.get()
186    }
187
188    /// Gets the data directory path that this buffer will store its files on disk.
189    pub fn data_dir(&self) -> &Path {
190        self.data_dir.as_path()
191    }
192}
193
/// Enumeration to define exactly what terms the bounds of the buffer is expressed in: length, or
/// `byte_size`.
// Values of this type are produced by `BufferTypeVisitor::visit_map_impl`, which picks the variant
// from whichever of `max_events` / `max_size` is present in the configuration.
// NOTE(review): `no_deser` presumably suppresses the generated `Deserialize` impl — confirm against
// `vector_config`.
#[configurable_component(no_deser)]
#[serde(rename_all = "snake_case")]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum MemoryBufferSize {
    /// The maximum number of events allowed in the buffer.
    MaxEvents(#[serde(default = "memory_buffer_default_max_events")] NonZeroUsize),

    // Doc string is duplicated here as a workaround due to a name collision with the `max_size`
    // field with the DiskV2 variant of `BufferType`.
    /// The maximum allowed amount of allocated memory the buffer can hold.
    ///
    /// If `type = "disk"` then must be at least ~256 megabytes (268435488 bytes).
    MaxSize(#[configurable(metadata(docs::type_unit = "bytes"))] NonZeroUsize),
}
210
/// A specific type of buffer stage.
// Deserialization of this type is hand-written in this file (`impl Deserialize for BufferType`,
// driven by `BufferTypeVisitor`), which is where the allowed key combinations are enforced.
#[configurable_component(no_deser)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[serde(rename_all = "snake_case", tag = "type")]
#[configurable(metadata(docs::enum_tag_description = "The type of buffer to use."))]
pub enum BufferType {
    /// A buffer stage backed by an in-memory channel provided by `tokio`.
    ///
    /// This is more performant, but less durable. Data will be lost if Vector is restarted
    /// forcefully or crashes.
    #[configurable(title = "Events are buffered in memory.")]
    Memory {
        /// The terms around how to express buffering limits, can be in size or `bytes_size`.
        #[serde(flatten)]
        size: MemoryBufferSize,

        #[configurable(derived)]
        #[serde(default)]
        when_full: WhenFull,
    },

    /// A buffer stage backed by disk.
    ///
    /// This is less performant, but more durable. Data that has been synchronized to disk will not
    /// be lost if Vector is restarted forcefully or crashes.
    ///
    /// Data is synchronized to disk every 500ms.
    #[configurable(title = "Events are buffered on disk.")]
    #[serde(rename = "disk")]
    DiskV2 {
        /// The maximum size of the buffer on disk.
        ///
        /// Must be at least ~256 megabytes (268435488 bytes).
        // NOTE(review): the hand-written deserializer in this file only requires `max_size` to be
        // present and non-zero; the minimum below is presumably enforced by schema validation
        // elsewhere — confirm.
        #[configurable(
            validation(range(min = 268435488)),
            metadata(docs::type_unit = "bytes")
        )]
        max_size: NonZeroU64,

        #[configurable(derived)]
        #[serde(default)]
        when_full: WhenFull,
    },
}
255
256impl BufferType {
257    /// Gets the metadata around disk usage by the buffer, if supported.
258    ///
259    /// For buffer types that write to disk, `Some(value)` is returned with their usage metadata,
260    /// such as maximum size and data directory path.
261    ///
262    /// Otherwise, `None` is returned.
263    pub fn disk_usage(
264        &self,
265        global_data_dir: Option<PathBuf>,
266        id: &ComponentKey,
267    ) -> Option<DiskUsage> {
268        // All disk-backed buffers require the global data directory to be specified, and
269        // non-disk-backed buffers do not require it to be set... so if it's not set here, we ignore
270        // it because either:
271        // - it's a non-disk-backed buffer, in which case we can just ignore, or
272        // - this method is being called at a point before we actually check that a global data
273        //   directory is specified because we have a disk buffer present
274        //
275        // Since we're not able to emit/surface errors about a lack of a global data directory from
276        // where this method is called, we simply return `None` to let it reach the code that _does_
277        // emit/surface those errors... and once those errors are fixed, this code can return valid
278        // disk usage information, which will then be validated and emit any errors for _that_
279        // aspect.
280        match global_data_dir {
281            None => None,
282            Some(global_data_dir) => match self {
283                Self::Memory { .. } => None,
284                Self::DiskV2 { max_size, .. } => {
285                    let data_dir = crate::variants::disk_v2::get_disk_v2_data_dir_path(
286                        &global_data_dir,
287                        id.id(),
288                    );
289
290                    Some(DiskUsage::new(id.clone(), data_dir, *max_size))
291                }
292            },
293        }
294    }
295
296    /// Adds this buffer type as a stage to an existing [`TopologyBuilder`].
297    ///
298    /// # Errors
299    ///
300    /// If a required parameter is missing, or if there is an error building the topology itself, an
301    /// error variant will be returned describing the error
302    pub fn add_to_builder<T>(
303        &self,
304        builder: &mut TopologyBuilder<T>,
305        data_dir: Option<PathBuf>,
306        id: String,
307    ) -> Result<(), BufferBuildError>
308    where
309        T: Bufferable + Clone + Finalizable,
310    {
311        match *self {
312            BufferType::Memory { size, when_full } => {
313                builder.stage(MemoryBuffer::new(size), when_full);
314            }
315            BufferType::DiskV2 {
316                when_full,
317                max_size,
318            } => {
319                let data_dir = data_dir.ok_or(BufferBuildError::RequiresDataDir)?;
320                builder.stage(DiskV2Buffer::new(id, data_dir, max_size), when_full);
321            }
322        }
323
324        Ok(())
325    }
326}
327
/// Buffer configuration.
///
/// Buffers are composed of stages(*) that form a buffer _topology_, with input items being
/// subject to configurable behavior when each stage reaches configured limits.  Buffers are
/// configured for sinks, where backpressure from the sink can be handled by the buffer.  This
/// allows absorbing temporary load, or potentially adding write-ahead-log behavior to a sink to
/// increase the durability of a given Vector pipeline.
///
/// While we use the term "buffer topology" here, a buffer topology is referred to by the more
/// common "buffer" or "buffers" shorthand.  This is related to buffers originally being a single
/// component, where you could only choose which buffer type to use.  As we expand buffer
/// functionality to allow chaining buffers together, you'll see "buffer topology" used in internal
/// documentation to correctly reflect the internal structure.
///
// TODO: We need to limit chained buffers to only allowing a single copy of each buffer type to be
// defined, otherwise, for example, two instances of the same disk buffer type in a single chained
// buffer topology would try to both open the same buffer files on disk, which wouldn't work or
// would go horribly wrong.
#[configurable_component]
#[derive(Clone, Debug, PartialEq, Eq)]
#[serde(untagged)]
#[configurable(
    title = "Configures the buffering behavior for this sink.",
    description = r#"More information about the individual buffer types, and buffer behavior, can be found in the
[Buffering Model][buffering_model] section.

[buffering_model]: /docs/architecture/buffering-model/"#
)]
pub enum BufferConfig {
    /// A single stage buffer topology.
    Single(BufferType),

    /// A chained buffer topology.
    Chained(Vec<BufferType>),
}
363
364impl Default for BufferConfig {
365    fn default() -> Self {
366        Self::Single(BufferType::Memory {
367            size: MemoryBufferSize::MaxEvents(memory_buffer_default_max_events()),
368            when_full: WhenFull::default(),
369        })
370    }
371}
372
373impl BufferConfig {
374    /// Returns true if any stage in this buffer configuration uses disk-based storage.
375    pub fn has_disk_stage(&self) -> bool {
376        self.stages()
377            .iter()
378            .any(|stage| matches!(stage, BufferType::DiskV2 { .. }))
379    }
380
381    /// Gets all of the configured stages for this buffer.
382    pub fn stages(&self) -> &[BufferType] {
383        match self {
384            Self::Single(stage) => slice::from_ref(stage),
385            Self::Chained(stages) => stages.as_slice(),
386        }
387    }
388
389    /// Builds the buffer components represented by this configuration.
390    ///
391    /// The caller gets back a `Sink` and `Stream` implementation that represent a way to push items
392    /// into the buffer, as well as pop items out of the buffer, respectively.
393    ///
394    /// # Errors
395    ///
396    /// If the buffer is configured with anything other than a single stage, an error variant will
397    /// be thrown.
398    ///
399    /// If a disk buffer stage is configured and the data directory provided is `None`, an error
400    /// variant will be thrown.
401    #[allow(clippy::needless_pass_by_value)]
402    pub async fn build<T>(
403        &self,
404        data_dir: Option<PathBuf>,
405        buffer_id: String,
406        span: Span,
407    ) -> Result<(BufferSender<T>, BufferReceiver<T>), BufferBuildError>
408    where
409        T: Bufferable + Clone + Finalizable,
410    {
411        let mut builder = TopologyBuilder::default();
412
413        for stage in self.stages() {
414            stage.add_to_builder(&mut builder, data_dir.clone(), buffer_id.clone())?;
415        }
416
417        builder
418            .build(buffer_id, span)
419            .await
420            .context(FailedToBuildTopologySnafu)
421    }
422}
423
// Deserialization tests for `BufferConfig`: each one parses a YAML snippet and checks either the
// resulting stages or the reported error message.
#[cfg(test)]
mod test {
    use std::num::{NonZeroU64, NonZeroUsize};

    use crate::{BufferConfig, BufferType, MemoryBufferSize, WhenFull};

    // Parses `source` and asserts that it yields exactly one stage equal to `expected`.
    fn check_single_stage(source: &str, expected: BufferType) {
        let config: BufferConfig = serde_yaml::from_str(source).unwrap();
        assert_eq!(config.stages().len(), 1);
        let actual = config.stages().first().unwrap();
        assert_eq!(actual, &expected);
    }

    // Parses `source` and asserts that its stages match `expected_stages` in number and order.
    fn check_multiple_stages(source: &str, expected_stages: &[BufferType]) {
        let config: BufferConfig = serde_yaml::from_str(source).unwrap();
        assert_eq!(config.stages().len(), expected_stages.len());
        for (actual, expected) in config.stages().iter().zip(expected_stages) {
            assert_eq!(actual, expected);
        }
    }

    // Message expected whenever the input matches neither variant of the untagged `BufferConfig`;
    // the more specific error from the `BufferType` deserializer is not what gets reported.
    const BUFFER_CONFIG_NO_MATCH_ERR: &str =
        "data did not match any variant of untagged enum BufferConfig";

    // An empty document is rejected.
    #[test]
    fn parse_empty() {
        let source = "";
        let error = serde_yaml::from_str::<BufferConfig>(source).unwrap_err();
        assert_eq!(error.to_string(), BUFFER_CONFIG_NO_MATCH_ERR);
    }

    // A map containing only an unknown key is rejected.
    #[test]
    fn parse_only_invalid_keys() {
        let source = "foo: 314";
        let error = serde_yaml::from_str::<BufferConfig>(source).unwrap_err();
        assert_eq!(error.to_string(), BUFFER_CONFIG_NO_MATCH_ERR);
    }

    // `max_size` and `max_events` together, with no `type`, are rejected.
    #[test]
    fn parse_partial_invalid_keys() {
        let source = r"max_size: 100
max_events: 42
";
        let error = serde_yaml::from_str::<BufferConfig>(source).unwrap_err();
        assert_eq!(error.to_string(), BUFFER_CONFIG_NO_MATCH_ERR);
    }

    // Without a `type` key the stage is a memory buffer.
    #[test]
    fn parse_without_type_tag() {
        check_single_stage(
            r"
          max_events: 100
          ",
            BufferType::Memory {
                size: MemoryBufferSize::MaxEvents(NonZeroUsize::new(100).unwrap()),
                when_full: WhenFull::Block,
            },
        );
    }

    // `max_size` without a `type` key yields a size-bounded memory buffer.
    #[test]
    fn parse_memory_with_byte_size_option() {
        check_single_stage(
            r"
        max_size: 4096
        ",
            BufferType::Memory {
                size: MemoryBufferSize::MaxSize(NonZeroUsize::new(4096).unwrap()),
                when_full: WhenFull::Block,
            },
        );
    }

    // A YAML sequence is parsed as a chained configuration, preserving stage order.
    #[test]
    fn parse_multiple_stages() {
        check_multiple_stages(
            r"
          - max_events: 42
          - max_events: 100
            when_full: drop_newest
          ",
            &[
                BufferType::Memory {
                    size: MemoryBufferSize::MaxEvents(NonZeroUsize::new(42).unwrap()),
                    when_full: WhenFull::Block,
                },
                BufferType::Memory {
                    size: MemoryBufferSize::MaxEvents(NonZeroUsize::new(100).unwrap()),
                    when_full: WhenFull::DropNewest,
                },
            ],
        );
    }

    // Omitted fields take their defaults: 500 events for memory buffers and `WhenFull::Block`.
    #[test]
    fn ensure_field_defaults_for_all_types() {
        check_single_stage(
            r"
          type: memory
          ",
            BufferType::Memory {
                size: MemoryBufferSize::MaxEvents(NonZeroUsize::new(500).unwrap()),
                when_full: WhenFull::Block,
            },
        );

        check_single_stage(
            r"
          type: memory
          max_events: 100
          ",
            BufferType::Memory {
                size: MemoryBufferSize::MaxEvents(NonZeroUsize::new(100).unwrap()),
                when_full: WhenFull::Block,
            },
        );

        check_single_stage(
            r"
          type: memory
          when_full: drop_newest
          ",
            BufferType::Memory {
                size: MemoryBufferSize::MaxEvents(NonZeroUsize::new(500).unwrap()),
                when_full: WhenFull::DropNewest,
            },
        );

        check_single_stage(
            r"
          type: memory
          when_full: overflow
          ",
            BufferType::Memory {
                size: MemoryBufferSize::MaxEvents(NonZeroUsize::new(500).unwrap()),
                when_full: WhenFull::Overflow,
            },
        );

        // Disk buffers have no default size, so `max_size` is given explicitly.
        check_single_stage(
            r"
          type: disk
          max_size: 1024
          ",
            BufferType::DiskV2 {
                max_size: NonZeroU64::new(1024).unwrap(),
                when_full: WhenFull::Block,
            },
        );
    }
}