vector/sinks/azure_monitor_logs/
config.rs

1use openssl::{base64, pkey};
2use vector_lib::{
3    config::log_schema,
4    configurable::configurable_component,
5    lookup::{OwnedValuePath, lookup_v2::OptionalValuePath},
6    schema,
7    sensitive_string::SensitiveString,
8};
9use vrl::value::Kind;
10
11use super::{
12    service::{AzureMonitorLogsResponse, AzureMonitorLogsService},
13    sink::AzureMonitorLogsSink,
14};
15use crate::{
16    http::{HttpClient, get_http_scheme_from_uri},
17    sinks::{
18        prelude::*,
19        util::{
20            RealtimeSizeBasedDefaultBatchSettings, UriSerde,
21            http::{HttpStatusRetryLogic, RetryStrategy},
22        },
23    },
24};
25
/// Upper bound, in bytes, on the body of a single request (30 MiB).
///
/// Applied to the user-supplied batch settings via `limit_max_bytes` when the sink is built.
const MAX_BATCH_SIZE: usize = 30 << 20;
28
29pub(super) fn default_host() -> String {
30    "ods.opinsights.azure.com".into()
31}
32
33/// Configuration for the `azure_monitor_logs` sink.
34#[configurable_component(sink(
35    "azure_monitor_logs",
36    "Publish log events to the Azure Monitor Data Collector API."
37))]
38#[derive(Clone, Debug)]
39#[serde(deny_unknown_fields)]
40pub struct AzureMonitorLogsConfig {
41    /// The [unique identifier][uniq_id] for the Log Analytics workspace.
42    ///
43    /// [uniq_id]: https://docs.microsoft.com/en-us/azure/azure-monitor/platform/data-collector-api#request-uri-parameters
44    #[configurable(metadata(docs::examples = "5ce893d9-2c32-4b6c-91a9-b0887c2de2d6"))]
45    #[configurable(metadata(docs::examples = "97ce69d9-b4be-4241-8dbd-d265edcf06c4"))]
46    pub customer_id: String,
47
48    /// The [primary or the secondary key][shared_key] for the Log Analytics workspace.
49    ///
50    /// [shared_key]: https://docs.microsoft.com/en-us/azure/azure-monitor/platform/data-collector-api#authorization
51    #[configurable(metadata(
52        docs::examples = "SERsIYhgMVlJB6uPsq49gCxNiruf6v0vhMYE+lfzbSGcXjdViZdV/e5pEMTYtw9f8SkVLf4LFlLCc2KxtRZfCA=="
53    ))]
54    #[configurable(metadata(docs::examples = "${AZURE_MONITOR_SHARED_KEY_ENV_VAR}"))]
55    pub shared_key: SensitiveString,
56
57    /// The [record type][record_type] of the data that is being submitted.
58    ///
59    /// Can only contain letters, numbers, and underscores (_), and may not exceed 100 characters.
60    ///
61    /// [record_type]: https://docs.microsoft.com/en-us/azure/azure-monitor/platform/data-collector-api#request-headers
62    #[configurable(validation(pattern = "[a-zA-Z0-9_]{1,100}"))]
63    #[configurable(metadata(docs::examples = "MyTableName"))]
64    #[configurable(metadata(docs::examples = "MyRecordType"))]
65    pub log_type: String,
66
67    /// The [Resource ID][resource_id] of the Azure resource the data should be associated with.
68    ///
69    /// [resource_id]: https://docs.microsoft.com/en-us/azure/azure-monitor/platform/data-collector-api#request-headers
70    #[configurable(metadata(
71        docs::examples = "/subscriptions/11111111-1111-1111-1111-111111111111/resourceGroups/otherResourceGroup/providers/Microsoft.Storage/storageAccounts/examplestorage"
72    ))]
73    #[configurable(metadata(
74        docs::examples = "/subscriptions/11111111-1111-1111-1111-111111111111/resourceGroups/examplegroup/providers/Microsoft.SQL/servers/serverName/databases/databaseName"
75    ))]
76    pub azure_resource_id: Option<String>,
77
78    /// [Alternative host][alt_host] for dedicated Azure regions.
79    ///
80    /// [alt_host]: https://docs.azure.cn/en-us/articles/guidance/developerdifferences#check-endpoints-in-azure
81    #[configurable(metadata(docs::examples = "ods.opinsights.azure.us"))]
82    #[configurable(metadata(docs::examples = "ods.opinsights.azure.cn"))]
83    #[serde(default = "default_host")]
84    pub(super) host: String,
85
86    #[configurable(derived)]
87    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
88    pub encoding: Transformer,
89
90    #[configurable(derived)]
91    #[serde(default)]
92    pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
93
94    #[configurable(derived)]
95    #[serde(default)]
96    pub request: TowerRequestConfig,
97
98    /// Use this option to customize the log field used as [`TimeGenerated`][1] in Azure.
99    ///
100    /// The setting of `log_schema.timestamp_key`, usually `timestamp`, is used here by default.
101    /// This field should be used in rare cases where `TimeGenerated` should point to a specific log
102    /// field. For example, use this field to set the log field `source_timestamp` as holding the
103    /// value that should be used as `TimeGenerated` on the Azure side.
104    ///
105    /// [1]: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/log-standard-columns#timegenerated
106    #[configurable(metadata(docs::examples = "time_generated"))]
107    pub time_generated_key: Option<OptionalValuePath>,
108
109    #[configurable(derived)]
110    pub tls: Option<TlsConfig>,
111
112    #[configurable(derived)]
113    #[serde(
114        default,
115        deserialize_with = "crate::serde::bool_or_struct",
116        skip_serializing_if = "crate::serde::is_default"
117    )]
118    pub acknowledgements: AcknowledgementsConfig,
119
120    #[configurable(derived)]
121    #[serde(default)]
122    pub retry_strategy: RetryStrategy,
123}
124
125impl Default for AzureMonitorLogsConfig {
126    fn default() -> Self {
127        Self {
128            customer_id: "my-customer-id".to_string(),
129            shared_key: Default::default(),
130            log_type: "MyRecordType".to_string(),
131            azure_resource_id: None,
132            host: default_host(),
133            encoding: Default::default(),
134            batch: Default::default(),
135            request: Default::default(),
136            time_generated_key: None,
137            tls: None,
138            acknowledgements: Default::default(),
139            retry_strategy: Default::default(),
140        }
141    }
142}
143
144impl AzureMonitorLogsConfig {
145    pub(super) fn build_shared_key(&self) -> crate::Result<pkey::PKey<pkey::Private>> {
146        if self.shared_key.inner().is_empty() {
147            return Err("shared_key cannot be an empty string".into());
148        }
149        let shared_key_bytes = base64::decode_block(self.shared_key.inner())?;
150        let shared_key = pkey::PKey::hmac(&shared_key_bytes)?;
151        Ok(shared_key)
152    }
153
154    fn get_time_generated_key(&self) -> Option<OwnedValuePath> {
155        self.time_generated_key
156            .clone()
157            .and_then(|k| k.path)
158            .or_else(|| log_schema().timestamp_key().cloned())
159    }
160
161    pub(super) async fn build_inner(
162        &self,
163        cx: SinkContext,
164        endpoint: UriSerde,
165    ) -> crate::Result<(VectorSink, Healthcheck)> {
166        let endpoint = endpoint.with_default_parts().uri;
167        let protocol = get_http_scheme_from_uri(&endpoint).to_string();
168
169        let batch_settings = self
170            .batch
171            .validate()?
172            .limit_max_bytes(MAX_BATCH_SIZE)?
173            .into_batcher_settings()?;
174
175        let shared_key = self.build_shared_key()?;
176        let time_generated_key = self.get_time_generated_key();
177
178        let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
179        let client = HttpClient::new(Some(tls_settings), &cx.proxy)?;
180
181        let service = AzureMonitorLogsService::new(
182            client,
183            endpoint,
184            self.customer_id.clone(),
185            self.azure_resource_id.as_deref(),
186            &self.log_type,
187            time_generated_key.clone(),
188            shared_key,
189        )?;
190        let healthcheck = service.healthcheck();
191
192        let retry_logic = HttpStatusRetryLogic::new(
193            |res: &AzureMonitorLogsResponse| res.http_status,
194            self.retry_strategy.clone(),
195        );
196        let request_settings = self.request.into_settings();
197        let service = ServiceBuilder::new()
198            .settings(request_settings, retry_logic)
199            .service(service);
200
201        let sink = AzureMonitorLogsSink::new(
202            batch_settings,
203            self.encoding.clone(),
204            service,
205            time_generated_key,
206            protocol,
207        );
208
209        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
210    }
211}
212
213impl_generate_config_from_default!(AzureMonitorLogsConfig);
214
215#[async_trait::async_trait]
216#[typetag::serde(name = "azure_monitor_logs")]
217impl SinkConfig for AzureMonitorLogsConfig {
218    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
219        let endpoint = format!("https://{}.{}", self.customer_id, self.host).parse()?;
220        self.build_inner(cx, endpoint).await
221    }
222
223    fn input(&self) -> Input {
224        let requirements =
225            schema::Requirement::empty().optional_meaning("timestamp", Kind::timestamp());
226
227        Input::log().with_schema_requirement(requirements)
228    }
229
230    fn acknowledgements(&self) -> &AcknowledgementsConfig {
231        &self.acknowledgements
232    }
233}