vector/sinks/aws_s3/
config.rs

1use aws_sdk_s3::Client as S3Client;
2use tower::ServiceBuilder;
3#[cfg(feature = "codecs-parquet")]
4use vector_lib::codecs::BatchEncoder;
5#[cfg(feature = "codecs-parquet")]
6use vector_lib::codecs::encoding::{BatchSerializerConfig, format::ParquetSerializerConfig};
7use vector_lib::{
8    TimeZone,
9    codecs::{
10        EncoderKind, TextSerializerConfig,
11        encoding::{Framer, FramingConfig},
12    },
13    configurable::configurable_component,
14    sink::VectorSink,
15};
16
17use super::sink::S3RequestOptions;
18use crate::{
19    aws::{AwsAuthentication, RegionOrEndpoint},
20    codecs::{Encoder, EncodingConfigWithFraming, SinkType},
21    config::{AcknowledgementsConfig, GenerateConfig, Input, ProxyConfig, SinkConfig, SinkContext},
22    sinks::{
23        Healthcheck,
24        s3_common::{
25            self,
26            config::{RetryStrategy, S3Options},
27            partitioner::S3KeyPartitioner,
28            service::S3Service,
29            sink::S3Sink,
30        },
31        util::{
32            BatchConfig, BulkSizeBasedDefaultBatchSettings, Compression, ServiceBuilderExt,
33            TowerRequestConfig, timezone_to_offset,
34        },
35    },
36    template::Template,
37    tls::TlsConfig,
38};
39
// NOTE(review): the `///` text on this enum and its variant is presumably picked up by
// `configurable_component` for generated documentation — confirm before rewording it.
/// Batch encoding configuration for the `aws_s3` sink.
#[cfg(feature = "codecs-parquet")]
#[configurable_component]
#[derive(Debug, Clone)]
#[configurable(metadata(
    docs::enum_tag_description = "The codec to use for batch encoding events."
))]
#[serde(tag = "codec", rename_all = "snake_case")]
pub enum S3BatchEncoding {
    /// Encodes events in [Apache Parquet][apache_parquet] columnar format.
    ///
    /// [apache_parquet]: https://parquet.apache.org/
    Parquet(ParquetSerializerConfig),
}
54
55/// Configuration for the `aws_s3` sink.
56#[configurable_component(sink(
57    "aws_s3",
58    "Store observability events in the AWS S3 object storage system."
59))]
60#[derive(Clone, Debug)]
61#[serde(deny_unknown_fields)]
62pub struct S3SinkConfig {
63    /// The S3 bucket name.
64    ///
65    /// This must not include a leading `s3://` or a trailing `/`.
66    #[configurable(metadata(docs::examples = "my-bucket"))]
67    pub bucket: String,
68
69    /// A prefix to apply to all object keys.
70    ///
71    /// Prefixes are useful for partitioning objects, such as by creating an object key that
72    /// stores objects under a particular directory. If using a prefix for this purpose, it must end
73    /// in `/` to act as a directory path. A trailing `/` is **not** automatically added.
74    #[serde(default = "default_key_prefix")]
75    #[configurable(metadata(docs::templateable))]
76    #[configurable(metadata(docs::examples = "date=%F/hour=%H"))]
77    #[configurable(metadata(docs::examples = "year=%Y/month=%m/day=%d"))]
78    #[configurable(metadata(docs::examples = "application_id={{ application_id }}/date=%F"))]
79    pub key_prefix: String,
80
81    /// The timestamp format for the time component of the object key.
82    ///
83    /// By default, object keys are appended with a timestamp that reflects when the objects are
84    /// sent to S3, such that the resulting object key is functionally equivalent to joining the key
85    /// prefix with the formatted timestamp, such as `date=2022-07-18/1658176486`.
86    ///
87    /// This would represent a `key_prefix` set to `date=%F/` and the timestamp of Mon Jul 18 2022
88    /// 20:34:44 GMT+0000, with the `filename_time_format` being set to `%s`, which renders
89    /// timestamps in seconds since the Unix epoch.
90    ///
91    /// Supports the common [`strftime`][chrono_strftime_specifiers] specifiers found in most
92    /// languages.
93    ///
94    /// When set to an empty string, no timestamp is appended to the key prefix.
95    ///
96    /// [chrono_strftime_specifiers]: https://docs.rs/chrono/latest/chrono/format/strftime/index.html#specifiers
97    #[serde(default = "default_filename_time_format")]
98    pub filename_time_format: String,
99
100    /// Whether or not to append a UUID v4 token to the end of the object key.
101    ///
102    /// The UUID is appended to the timestamp portion of the object key, such that if the object key
103    /// generated is `date=2022-07-18/1658176486`, setting this field to `true` results
104    /// in an object key that looks like `date=2022-07-18/1658176486-30f6652c-71da-4f9f-800d-a1189c47c547`.
105    ///
106    /// This ensures there are no name collisions, and can be useful in high-volume workloads where
107    /// object keys must be unique.
108    #[serde(default = "crate::serde::default_true")]
109    #[configurable(metadata(docs::human_name = "Append UUID to Filename"))]
110    pub filename_append_uuid: bool,
111
112    /// The filename extension to use in the object key.
113    ///
114    /// This overrides setting the extension based on the configured `compression`.
115    #[configurable(metadata(docs::examples = "json"))]
116    pub filename_extension: Option<String>,
117
118    #[serde(flatten)]
119    pub options: S3Options,
120
121    #[serde(flatten)]
122    pub region: RegionOrEndpoint,
123
124    #[serde(flatten)]
125    pub encoding: EncodingConfigWithFraming,
126
127    /// Batch encoding configuration for columnar formats.
128    ///
129    /// When set, events are encoded together as a batch in a columnar format (Parquet)
130    /// instead of the standard per-event framing-based encoding. The columnar format handles
131    /// its own internal compression, so the top-level `compression` setting is bypassed.
132    #[cfg(feature = "codecs-parquet")]
133    #[configurable(derived)]
134    #[serde(default)]
135    pub batch_encoding: Option<S3BatchEncoding>,
136
137    /// Compression configuration.
138    ///
139    /// All compression algorithms use the default compression level unless otherwise specified.
140    ///
141    /// Some cloud storage API clients and browsers handle decompression transparently, so
142    /// depending on how they are accessed, files may not always appear to be compressed.
143    #[configurable(derived)]
144    #[serde(default = "Compression::gzip_default")]
145    pub compression: Compression,
146
147    #[configurable(derived)]
148    #[serde(default)]
149    pub batch: BatchConfig<BulkSizeBasedDefaultBatchSettings>,
150
151    #[configurable(derived)]
152    #[serde(default)]
153    pub request: TowerRequestConfig,
154
155    #[configurable(derived)]
156    pub tls: Option<TlsConfig>,
157
158    #[configurable(derived)]
159    #[serde(default)]
160    pub auth: AwsAuthentication,
161
162    #[configurable(derived)]
163    #[serde(
164        default,
165        deserialize_with = "crate::serde::bool_or_struct",
166        skip_serializing_if = "crate::serde::is_default"
167    )]
168    pub acknowledgements: AcknowledgementsConfig,
169
170    #[configurable(derived)]
171    #[serde(default)]
172    pub timezone: Option<TimeZone>,
173
174    /// Specifies which addressing style to use.
175    ///
176    /// This controls if the bucket name is in the hostname or part of the URL.
177    #[serde(default = "crate::serde::default_true")]
178    pub force_path_style: bool,
179
180    /// Specifies retry strategy for failed requests.
181    ///
182    /// By default, the sink only retries attempts it deems possible to retry.
183    /// These settings extend the default behavior.
184    #[configurable(derived)]
185    #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
186    pub retry_strategy: RetryStrategy,
187}
188
189pub(super) fn default_key_prefix() -> String {
190    "date=%F".to_string()
191}
192
193pub(super) fn default_filename_time_format() -> String {
194    "%s".to_string()
195}
196
197impl GenerateConfig for S3SinkConfig {
198    fn generate_config() -> toml::Value {
199        toml::Value::try_from(Self {
200            bucket: "".to_owned(),
201            key_prefix: default_key_prefix(),
202            filename_time_format: default_filename_time_format(),
203            filename_append_uuid: true,
204            filename_extension: None,
205            options: S3Options::default(),
206            region: RegionOrEndpoint::default(),
207            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
208            #[cfg(feature = "codecs-parquet")]
209            batch_encoding: None,
210            compression: Compression::gzip_default(),
211            batch: BatchConfig::default(),
212            request: TowerRequestConfig::default(),
213            tls: Some(TlsConfig::default()),
214            auth: AwsAuthentication::default(),
215            acknowledgements: Default::default(),
216            timezone: Default::default(),
217            force_path_style: Default::default(),
218            retry_strategy: Default::default(),
219        })
220        .unwrap()
221    }
222}
223
224#[async_trait::async_trait]
225#[typetag::serde(name = "aws_s3")]
226impl SinkConfig for S3SinkConfig {
227    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
228        let service = self.create_service(&cx.proxy).await?;
229        let healthcheck = self.build_healthcheck(service.client())?;
230        let sink = self.build_processor(service, cx)?;
231        Ok((sink, healthcheck))
232    }
233
234    fn input(&self) -> Input {
235        #[cfg(feature = "codecs-parquet")]
236        if let Some(batch_encoding) = &self.batch_encoding {
237            let S3BatchEncoding::Parquet(parquet_config) = batch_encoding;
238            let resolved = BatchSerializerConfig::Parquet(parquet_config.clone());
239            return Input::new(resolved.input_type());
240        }
241        Input::new(self.encoding.config().1.input_type())
242    }
243
244    fn acknowledgements(&self) -> &AcknowledgementsConfig {
245        &self.acknowledgements
246    }
247}
248
249impl S3SinkConfig {
250    pub fn build_processor(
251        &self,
252        service: S3Service,
253        cx: SinkContext,
254    ) -> crate::Result<VectorSink> {
255        // Build our S3 client/service, which is what we'll ultimately feed
256        // requests into in order to ship files to S3.  We build this here in
257        // order to configure the client/service with retries, concurrency
258        // limits, rate limits, and whatever else the client should have.
259        let request_limits = self.request.into_settings();
260        let retry_strategy = self.retry_strategy.clone();
261        let service = ServiceBuilder::new()
262            .settings(request_limits, retry_strategy)
263            .service(service);
264
265        let offset = self
266            .timezone
267            .or(cx.globals.timezone)
268            .and_then(timezone_to_offset);
269
270        // Configure our partitioning/batching.
271        let batch_settings = self.batch.into_batcher_settings()?;
272
273        let key_prefix = Template::try_from(self.key_prefix.clone())?.with_tz_offset(offset);
274
275        let ssekms_key_id = self
276            .options
277            .ssekms_key_id
278            .as_ref()
279            .cloned()
280            .map(|ssekms_key_id| Template::try_from(ssekms_key_id.as_str()))
281            .transpose()?;
282
283        let partitioner = S3KeyPartitioner::new(key_prefix, ssekms_key_id, None);
284
285        let transformer = self.encoding.transformer();
286
287        // When batch_encoding is configured (e.g., Parquet), use batch mode
288        // with internal compression and appropriate file extension.
289        #[cfg(feature = "codecs-parquet")]
290        if let Some(batch_encoding) = &self.batch_encoding {
291            let S3BatchEncoding::Parquet(parquet_config) = batch_encoding;
292            let resolved_batch_config = BatchSerializerConfig::Parquet(parquet_config.clone());
293
294            let batch_serializer = resolved_batch_config.build_batch_serializer()?;
295            let batch_encoder = BatchEncoder::new(batch_serializer);
296
297            // Auto-detect Content-Type from batch format. Users can still
298            // override via `options.content_type`; we only set it when unset.
299            let mut api_options = self.options.clone();
300            if api_options.content_type.is_none() {
301                api_options.content_type = batch_encoder.content_type().map(|s| s.to_string());
302            }
303
304            let encoder = EncoderKind::Batch(batch_encoder);
305
306            let filename_extension = self.filename_extension.clone().or_else(|| {
307                Some(
308                    match batch_encoding {
309                        S3BatchEncoding::Parquet(_) => "parquet",
310                    }
311                    .to_string(),
312                )
313            });
314
315            if self.compression != Compression::None {
316                warn!("Top level compression setting ignored when batch_encoding set to parquet.")
317            }
318
319            let request_options = S3RequestOptions {
320                bucket: self.bucket.clone(),
321                api_options,
322                filename_extension,
323                filename_time_format: self.filename_time_format.clone(),
324                filename_append_uuid: self.filename_append_uuid,
325                encoder: (transformer, encoder),
326                // Batch formats handle their own compression internally
327                compression: Compression::None,
328                filename_tz_offset: offset,
329            };
330
331            let sink = S3Sink::new(service, request_options, partitioner, batch_settings);
332            return Ok(VectorSink::from_event_streamsink(sink));
333        }
334
335        let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?;
336        let encoder = EncoderKind::Framed(Box::new(Encoder::<Framer>::new(framer, serializer)));
337
338        let request_options = S3RequestOptions {
339            bucket: self.bucket.clone(),
340            api_options: self.options.clone(),
341            filename_extension: self.filename_extension.clone(),
342            filename_time_format: self.filename_time_format.clone(),
343            filename_append_uuid: self.filename_append_uuid,
344            encoder: (transformer, encoder),
345            compression: self.compression,
346            filename_tz_offset: offset,
347        };
348
349        let sink = S3Sink::new(service, request_options, partitioner, batch_settings);
350
351        Ok(VectorSink::from_event_streamsink(sink))
352    }
353
354    pub fn build_healthcheck(&self, client: S3Client) -> crate::Result<Healthcheck> {
355        s3_common::config::build_healthcheck(self.bucket.clone(), client)
356    }
357
358    pub async fn create_service(&self, proxy: &ProxyConfig) -> crate::Result<S3Service> {
359        s3_common::config::create_service(
360            &self.region,
361            &self.auth,
362            proxy,
363            self.tls.as_ref(),
364            self.force_path_style,
365        )
366        .await
367    }
368}
369
#[cfg(test)]
mod tests {
    use super::S3SinkConfig;

    /// Resolves the Content-Type for a Parquet `batch_encoding` config: the user's
    /// `content_type` when present, otherwise the one reported by the batch encoder.
    ///
    /// The encoder is always built, so an invalid Parquet configuration fails the
    /// calling test even when `content_type` is set explicitly.
    #[cfg(feature = "codecs-parquet")]
    fn resolve_parquet_content_type(config: &S3SinkConfig) -> Option<String> {
        use vector_lib::codecs::{BatchEncoder, encoding::BatchSerializerConfig};

        let super::S3BatchEncoding::Parquet(parquet) = config
            .batch_encoding
            .as_ref()
            .expect("batch_encoding should be Some");
        let batch_serializer = BatchSerializerConfig::Parquet(parquet.clone())
            .build_batch_serializer()
            .expect("parquet batch serializer should build");
        let batch_encoder = BatchEncoder::new(batch_serializer);

        let mut api_options = config.options.clone();
        if api_options.content_type.is_none() {
            api_options.content_type = batch_encoder.content_type().map(|s| s.to_string());
        }
        api_options.content_type
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<S3SinkConfig>();
    }

    /// Correct TOML shape: `batch_encoding.codec = "parquet"` with `schema_mode = "auto_infer"`.
    #[cfg(feature = "codecs-parquet")]
    #[test]
    fn parquet_batch_encoding_correct_toml_shape() {
        use vector_lib::codecs::encoding::format::{ParquetCompression, ParquetSchemaMode};

        let config: S3SinkConfig = toml::from_str(
            r#"
            bucket = "test-bucket"
            compression = "none"

            [encoding]
            codec = "text"

            [batch_encoding]
            schema_mode = "auto_infer"
            codec = "parquet"

            [batch_encoding.compression]
            algorithm = "snappy"

            "#,
        )
        .expect("correct batch_encoding shape should parse");

        let batch_enc = config
            .batch_encoding
            .expect("batch_encoding should be Some");
        let super::S3BatchEncoding::Parquet(ref p) = batch_enc;
        assert_eq!(p.schema_mode, ParquetSchemaMode::AutoInfer);
        assert_eq!(p.compression, ParquetCompression::Snappy);
    }

    /// Content-Type must be auto-detected as `application/vnd.apache.parquet`
    /// when `batch_encoding` is set and `content_type` is not explicitly provided.
    #[cfg(feature = "codecs-parquet")]
    #[test]
    fn parquet_content_type_auto_detected() {
        use vector_lib::codecs::TextSerializerConfig;
        use vector_lib::codecs::encoding::FramingConfig;
        use vector_lib::codecs::encoding::format::{
            ParquetCompression, ParquetSchemaMode, ParquetSerializerConfig,
        };

        use crate::sinks::s3_common::config::S3Options;
        use crate::sinks::util::{BatchConfig, BulkSizeBasedDefaultBatchSettings, Compression};

        let parquet_config = ParquetSerializerConfig {
            schema_mode: ParquetSchemaMode::AutoInfer,
            compression: ParquetCompression::Snappy,
            ..Default::default()
        };

        let config = S3SinkConfig {
            bucket: "test".to_string(),
            key_prefix: super::default_key_prefix(),
            filename_time_format: super::default_filename_time_format(),
            filename_append_uuid: true,
            filename_extension: None,
            options: S3Options::default(),
            region: crate::aws::RegionOrEndpoint::with_both("us-east-1", "http://localhost:4566"),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            batch_encoding: Some(super::S3BatchEncoding::Parquet(parquet_config)),
            compression: Compression::None,
            batch: BatchConfig::<BulkSizeBasedDefaultBatchSettings>::default(),
            request: Default::default(),
            tls: Default::default(),
            auth: Default::default(),
            acknowledgements: Default::default(),
            timezone: Default::default(),
            force_path_style: true,
            retry_strategy: Default::default(),
        };

        assert_eq!(
            resolve_parquet_content_type(&config).as_deref(),
            Some("application/vnd.apache.parquet"),
            "Content-Type must be auto-detected for Parquet"
        );
    }

    /// When user explicitly sets `content_type`, the auto-detection must not override it.
    #[cfg(feature = "codecs-parquet")]
    #[test]
    fn parquet_content_type_user_override_preserved() {
        let config: S3SinkConfig = toml::from_str(
            r#"
            bucket = "test-bucket"
            compression = "none"
            content_type = "application/octet-stream"

            [encoding]
            codec = "text"

            [batch_encoding]
            codec = "parquet"
            schema_mode = "auto_infer"

            [batch_encoding.compression]
            algorithm = "gzip"
            level = 9
            "#,
        )
        .unwrap();

        assert_eq!(
            resolve_parquet_content_type(&config).as_deref(),
            Some("application/octet-stream"),
            "User-specified Content-Type must not be overridden"
        );
    }

    /// Codecs other than `parquet` must be rejected at parse time, since
    /// `S3BatchEncoding` only exposes the `parquet` variant.
    #[cfg(feature = "codecs-parquet")]
    #[test]
    fn parquet_batch_encoding_rejects_unsupported_codec() {
        let err = serde_yaml::from_str::<S3SinkConfig>(
            r#"
            bucket: test-bucket
            compression: none
            encoding:
              codec: text
            batch_encoding:
              codec: arrow_stream
            "#,
        )
        .unwrap_err();

        assert!(
            err.to_string().contains("arrow_stream"),
            "expected error to mention the offending codec, got: {err}"
        );
    }

    /// Explicit filename_extension overrides the `.parquet` default.
    #[cfg(feature = "codecs-parquet")]
    #[test]
    fn parquet_filename_extension_user_override() {
        let config: S3SinkConfig = toml::from_str(
            r#"
            bucket = "test-bucket"
            compression = "none"
            filename_extension = "pq"

            [encoding]
            codec = "text"

            [batch_encoding]
            codec = "parquet"
            schema_mode = "auto_infer"
            "#,
        )
        .unwrap();

        assert_eq!(config.filename_extension.as_deref(), Some("pq"));
    }

    /// `schema_mode` defaults to `relaxed` when not specified.
    #[cfg(feature = "codecs-parquet")]
    #[test]
    fn parquet_schema_mode_defaults_to_relaxed() {
        use vector_lib::codecs::encoding::format::ParquetSchemaMode;

        let config: S3SinkConfig = toml::from_str(
            r#"
            bucket = "test-bucket"
            compression = "none"

            [encoding]
            codec = "text"

            [batch_encoding]
            codec = "parquet"
            "#,
        )
        .unwrap();

        let super::S3BatchEncoding::Parquet(p) = config.batch_encoding.unwrap();
        assert_eq!(p.schema_mode, ParquetSchemaMode::Relaxed);
    }

    /// Explicit `schema_mode = "strict"` is correctly parsed.
    #[cfg(feature = "codecs-parquet")]
    #[test]
    fn parquet_schema_mode_strict_parsed() {
        use vector_lib::codecs::encoding::format::ParquetSchemaMode;

        let config: S3SinkConfig = toml::from_str(
            r#"
            bucket = "test-bucket"
            compression = "none"

            [encoding]
            codec = "text"

            [batch_encoding]
            codec = "parquet"
            schema_mode = "strict"
            schema_file = "tmp/something.schema"
            "#,
        )
        .unwrap();

        let super::S3BatchEncoding::Parquet(p) = config.batch_encoding.unwrap();
        assert_eq!(p.schema_mode, ParquetSchemaMode::Strict);
    }
}