vector/sinks/azure_logs_ingestion/
config.rs

1use std::sync::Arc;
2
3use azure_core::credentials::TokenCredential;
4
5use vector_lib::{configurable::configurable_component, schema};
6use vrl::value::Kind;
7
8use crate::{
9    http::{HttpClient, get_http_scheme_from_uri},
10    sinks::{
11        azure_common::config::AzureAuthentication,
12        prelude::*,
13        util::{
14            RealtimeSizeBasedDefaultBatchSettings, UriSerde,
15            http::{HttpStatusRetryLogic, RetryStrategy},
16        },
17    },
18};
19
20use super::{
21    service::{AzureLogsIngestionResponse, AzureLogsIngestionService},
22    sink::AzureLogsIngestionSink,
23};
24
/// Largest batch, in bytes, allowed for a single request body (30 MiB).
///
/// `build_inner` hands this to `limit_max_bytes` while validating the `batch` settings.
///
/// NOTE(review): Azure's service-limits documentation lists a much smaller maximum size
/// (1 MB) per Logs Ingestion API call; larger requests are presumably rejected by the
/// service. Confirm whether this should be lowered (the batch default would need to follow).
const MAX_BATCH_SIZE: usize = 30 * 1_048_576;
27
28pub(super) fn default_scope() -> String {
29    "https://monitor.azure.com/.default".into()
30}
31
32pub(super) fn default_timestamp_field() -> String {
33    "TimeGenerated".into()
34}
35
/// Configuration for the `azure_logs_ingestion` sink.
// NOTE(review): the `///` field docs below are presumably consumed by `configurable_component`
// to generate user-facing option docs, so they are written for end users. Fields marked
// `#[configurable(derived)]` only carry `//` maintainer notes here.
#[configurable_component(sink(
    "azure_logs_ingestion",
    "Publish log events to the Azure Monitor Logs Ingestion API."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct AzureLogsIngestionConfig {
    /// The [Data collection endpoint URI][endpoint] associated with the Log Analytics workspace.
    ///
    /// [endpoint]: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/logs-ingestion-api-overview
    #[configurable(metadata(
        docs::examples = "https://my-dce-5kyl.eastus-1.ingest.monitor.azure.com"
    ))]
    pub endpoint: String,

    /// The [Data collection rule immutable ID][dcr_immutable_id] for the Data collection endpoint.
    ///
    /// [dcr_immutable_id]: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/logs-ingestion-api-overview
    #[configurable(metadata(docs::examples = "dcr-000a00a000a00000a000000aa000a0aa"))]
    pub dcr_immutable_id: String,

    /// The [Stream name][stream_name] for the Data collection rule.
    ///
    /// [stream_name]: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/logs-ingestion-api-overview
    #[configurable(metadata(docs::examples = "Custom-MyTable"))]
    pub stream_name: String,

    // Credential source; resolved with `self.auth.credential()` when the sink is built.
    #[configurable(derived)]
    pub auth: AzureAuthentication,

    /// [Token scope][token_scope] used together with the configured credential.
    ///
    /// Defaults to `https://monitor.azure.com/.default`; set this for dedicated Azure regions.
    ///
    /// [token_scope]: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/logs-ingestion-api-overview
    #[configurable(metadata(docs::examples = "https://monitor.azure.us/.default"))]
    #[configurable(metadata(docs::examples = "https://monitor.azure.cn/.default"))]
    #[serde(default = "default_scope")]
    pub(super) token_scope: String,

    /// The destination field (column) for the timestamp.
    ///
    /// The setting of `log_schema.timestamp_key`, usually `timestamp`, is used as the source.
    /// Most schemas use `TimeGenerated`, but some use `Timestamp` (legacy) or `EventStartTime`
    /// (ASIM); see the [standard columns][std_columns] reference.
    ///
    /// [std_columns]: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/log-standard-columns#timegenerated
    #[configurable(metadata(docs::examples = "EventStartTime"))]
    #[configurable(metadata(docs::examples = "Timestamp"))]
    #[serde(default = "default_timestamp_field")]
    pub timestamp_field: String,

    // Event transformer handed to the sink; omitted from serialized output when left at default.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    pub encoding: Transformer,

    // Batching limits; validated and checked against `MAX_BATCH_SIZE` via `limit_max_bytes`
    // when the sink is built.
    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,

    // Converted with `into_settings()` and applied through `ServiceBuilder::settings`.
    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,

    // Optional; passed as `self.tls.as_ref()` to `TlsSettings::from_options` at build time.
    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    // Deserialized with `crate::serde::bool_or_struct` (presumably a bare bool or the full
    // struct); omitted from serialized output when left at default.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,

    // Passed to `HttpStatusRetryLogic` together with each response's `http_status`.
    #[configurable(derived)]
    #[serde(default)]
    pub retry_strategy: RetryStrategy,
}
113
114impl Default for AzureLogsIngestionConfig {
115    fn default() -> Self {
116        Self {
117            endpoint: Default::default(),
118            dcr_immutable_id: Default::default(),
119            stream_name: Default::default(),
120            auth: Default::default(),
121            token_scope: default_scope(),
122            timestamp_field: default_timestamp_field(),
123            encoding: Default::default(),
124            batch: Default::default(),
125            request: Default::default(),
126            tls: None,
127            acknowledgements: Default::default(),
128            retry_strategy: Default::default(),
129        }
130    }
131}
132
133impl AzureLogsIngestionConfig {
134    #[allow(clippy::too_many_arguments)]
135    pub(super) async fn build_inner(
136        &self,
137        cx: SinkContext,
138        endpoint: UriSerde,
139        dcr_immutable_id: String,
140        stream_name: String,
141        credential: Arc<dyn TokenCredential>,
142        token_scope: String,
143        timestamp_field: String,
144    ) -> crate::Result<(VectorSink, Healthcheck)> {
145        let endpoint = endpoint.with_default_parts().uri;
146        let protocol = get_http_scheme_from_uri(&endpoint).to_string();
147
148        let batch_settings = self
149            .batch
150            .validate()?
151            .limit_max_bytes(MAX_BATCH_SIZE)?
152            .into_batcher_settings()?;
153
154        let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
155        let client = HttpClient::new(Some(tls_settings), &cx.proxy)?;
156
157        let service = AzureLogsIngestionService::new(
158            client,
159            endpoint,
160            dcr_immutable_id,
161            stream_name,
162            credential,
163            token_scope,
164        )?;
165        let healthcheck = service.healthcheck();
166
167        let retry_logic = HttpStatusRetryLogic::new(
168            |res: &AzureLogsIngestionResponse| res.http_status,
169            self.retry_strategy.clone(),
170        );
171        let request_settings = self.request.into_settings();
172        let service = ServiceBuilder::new()
173            .settings(request_settings, retry_logic)
174            .service(service);
175
176        let sink = AzureLogsIngestionSink::new(
177            batch_settings,
178            self.encoding.clone(),
179            service,
180            timestamp_field,
181            protocol,
182        );
183
184        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
185    }
186}
187
// Generate this sink's example config from the `Default` impl.
// NOTE(review): the macro is defined elsewhere in the crate; presumably it serializes
// `AzureLogsIngestionConfig::default()` — confirm.
impl_generate_config_from_default!(AzureLogsIngestionConfig);
189
190#[async_trait::async_trait]
191#[typetag::serde(name = "azure_logs_ingestion")]
192impl SinkConfig for AzureLogsIngestionConfig {
193    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
194        let endpoint: UriSerde = self.endpoint.parse()?;
195
196        let credential: Arc<dyn TokenCredential> = self.auth.credential().await?;
197
198        self.build_inner(
199            cx,
200            endpoint,
201            self.dcr_immutable_id.clone(),
202            self.stream_name.clone(),
203            credential,
204            self.token_scope.clone(),
205            self.timestamp_field.clone(),
206        )
207        .await
208    }
209
210    fn input(&self) -> Input {
211        let requirements =
212            schema::Requirement::empty().optional_meaning("timestamp", Kind::timestamp());
213
214        Input::log().with_schema_requirement(requirements)
215    }
216
217    fn acknowledgements(&self) -> &AcknowledgementsConfig {
218        &self.acknowledgements
219    }
220}