1use std::fs::File;
2use std::io::Read;
3use std::path::PathBuf;
4use std::sync::Arc;
5
6use base64::prelude::*;
7
8use azure_core::error::Error as AzureCoreError;
9
10use crate::sinks::azure_common::connection_string::{Auth, ParsedConnectionString};
11use crate::sinks::azure_common::shared_key_policy::SharedKeyAuthorizationPolicy;
12use azure_core::http::{ClientMethodOptions, StatusCode, Url};
13
14use azure_core::credentials::{TokenCredential, TokenRequestOptions};
15use azure_core::{Error, error::ErrorKind};
16
17use azure_identity::{
18 AzureCliCredential, ClientAssertion, ClientAssertionCredential, ClientCertificateCredential,
19 ClientCertificateCredentialOptions, ClientSecretCredential, ManagedIdentityCredential,
20 ManagedIdentityCredentialOptions, UserAssignedId, WorkloadIdentityCredential,
21 WorkloadIdentityCredentialOptions,
22};
23
24use azure_storage_blob::{BlobContainerClient, BlobContainerClientOptions};
25
26use bytes::Bytes;
27use futures::FutureExt;
28use snafu::Snafu;
29use vector_lib::{
30 configurable::configurable_component,
31 json_size::JsonSize,
32 request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata},
33 sensitive_string::SensitiveString,
34 stream::DriverResponse,
35};
36
37use crate::{
38 event::{EventFinalizers, EventStatus, Finalizable},
39 sinks::{Healthcheck, util::retries::RetryLogic},
40};
41
#[configurable_component]
/// TLS settings for the Azure Blob Storage HTTP client.
#[configurable(metadata(docs::advanced))]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct AzureBlobTlsConfig {
    /// Path to a PEM-encoded CA certificate to trust in addition to the default root certificates.
    ///
    /// The file is read when the client is built and added as a root certificate of the
    /// underlying HTTP client.
    #[serde(alias = "ca_path")]
    #[configurable(metadata(docs::examples = "/path/to/certificate_authority.crt"))]
    #[configurable(metadata(docs::human_name = "CA File Path"))]
    pub ca_file: Option<PathBuf>,
}
56
#[configurable_component]
/// Azure credential-based authentication.
///
/// Only usable when the connection string carries no credentials of its own; combining it with a
/// SAS token or a shared key in the connection string is rejected.
#[derive(Clone, Debug, Eq, PartialEq)]
#[serde(deny_unknown_fields, untagged)]
pub enum AzureAuthentication {
    /// An explicitly selected kind of Azure credential.
    #[configurable(metadata(docs::enum_tag_description = "The kind of Azure credential to use."))]
    Specific(SpecificAzureCredential),

    #[cfg(test)]
    /// Test-only mock credential; never read from configuration (`serde(skip)`).
    #[serde(skip)]
    MockCredential,
}
70
71impl Default for AzureAuthentication {
72 fn default() -> Self {
76 Self::Specific(SpecificAzureCredential::ManagedIdentity {
77 user_assigned_managed_identity_id: None,
78 user_assigned_managed_identity_id_type: None,
79 })
80 }
81}
82
#[configurable_component]
/// How a user-assigned managed identity ID is interpreted.
#[derive(Clone, Debug, Eq, PartialEq)]
#[serde(deny_unknown_fields, rename_all = "snake_case")]
#[derive(Default)]
pub enum UserAssignedManagedIdentityIdType {
    /// The ID is the identity's client ID (the default).
    #[default]
    ClientId,
    /// The ID is the identity's object ID.
    ObjectId,
    /// The ID is the identity's Azure resource ID.
    ResourceId,
}
97
#[configurable_component]
/// An explicitly selected kind of Azure credential, chosen by `azure_credential_kind`.
#[derive(Clone, Debug, Eq, PartialEq)]
#[serde(
    tag = "azure_credential_kind",
    rename_all = "snake_case",
    deny_unknown_fields
)]
pub enum SpecificAzureCredential {
    /// Use Azure CLI credentials.
    ///
    /// Not available when targeting `wasm32`.
    #[cfg(not(target_arch = "wasm32"))]
    AzureCli {},

    /// Use a client certificate together with a tenant ID and client ID.
    ClientCertificateCredential {
        /// The Azure tenant ID.
        #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
        #[configurable(metadata(docs::examples = "${AZURE_TENANT_ID:?err}"))]
        azure_tenant_id: String,

        /// The Azure client ID.
        #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
        #[configurable(metadata(docs::examples = "${AZURE_CLIENT_ID:?err}"))]
        azure_client_id: String,

        /// Path to the client certificate file.
        ///
        /// The file's raw bytes are base64-encoded and handed to the Azure SDK as the certificate.
        #[configurable(metadata(docs::examples = "path/to/certificate.pfx"))]
        #[configurable(metadata(docs::examples = "${AZURE_CLIENT_CERTIFICATE_PATH:?err}"))]
        certificate_path: PathBuf,

        /// Optional password protecting the client certificate.
        #[configurable(metadata(docs::examples = "${AZURE_CLIENT_CERTIFICATE_PASSWORD}"))]
        certificate_password: Option<SensitiveString>,
    },

    /// Use a client ID and client secret together with a tenant ID.
    ///
    /// All three values must be non-empty.
    ClientSecretCredential {
        /// The Azure tenant ID.
        #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
        #[configurable(metadata(docs::examples = "${AZURE_TENANT_ID:?err}"))]
        azure_tenant_id: String,

        /// The Azure client ID.
        #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
        #[configurable(metadata(docs::examples = "${AZURE_CLIENT_ID:?err}"))]
        azure_client_id: String,

        /// The Azure client secret.
        #[configurable(metadata(docs::examples = "00-00~000000-0000000~0000000000000000000"))]
        #[configurable(metadata(docs::examples = "${AZURE_CLIENT_SECRET:?err}"))]
        azure_client_secret: SensitiveString,
    },

    /// Use a managed identity.
    ///
    /// Without `user_assigned_managed_identity_id`, no user-assigned identity is requested.
    ManagedIdentity {
        /// The ID of the user-assigned managed identity to use.
        #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
        #[serde(default, skip_serializing_if = "Option::is_none")]
        user_assigned_managed_identity_id: Option<String>,

        /// How `user_assigned_managed_identity_id` is interpreted; defaults to `client_id`.
        user_assigned_managed_identity_id_type: Option<UserAssignedManagedIdentityIdType>,
    },

    /// Use a managed identity token as the client assertion for another app registration.
    ///
    /// A token for the `api://AzureADTokenExchange/.default` scope is obtained from the managed
    /// identity and presented as the client assertion for `client_assertion_client_id` in
    /// `client_assertion_tenant_id`.
    ManagedIdentityClientAssertion {
        /// The ID of the user-assigned managed identity to use.
        #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
        #[configurable(metadata(
            docs::examples = "/subscriptions/00000000-0000-0000-0000-000000000000/resourceGroups/rg-vector/providers/Microsoft.ManagedIdentity/userAssignedIdentities/id-vector-uami"
        ))]
        #[serde(default, skip_serializing_if = "Option::is_none")]
        user_assigned_managed_identity_id: Option<String>,

        /// How `user_assigned_managed_identity_id` is interpreted; defaults to `client_id`.
        user_assigned_managed_identity_id_type: Option<UserAssignedManagedIdentityIdType>,

        /// The tenant ID of the app registration that accepts the client assertion.
        #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
        client_assertion_tenant_id: String,

        /// The client ID of the app registration that accepts the client assertion.
        #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
        client_assertion_client_id: String,
    },

    /// Use workload identity federation.
    WorkloadIdentity {
        /// The Azure tenant ID. If unset, it is left for the Azure SDK to determine.
        #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
        #[configurable(metadata(docs::examples = "${AZURE_TENANT_ID}"))]
        tenant_id: Option<String>,

        /// The Azure client ID. If unset, it is left for the Azure SDK to determine.
        #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
        #[configurable(metadata(docs::examples = "${AZURE_CLIENT_ID}"))]
        client_id: Option<String>,

        /// Path to the federated token file. If unset, it is left for the Azure SDK to determine.
        #[configurable(metadata(
            docs::examples = "/var/run/secrets/azure/tokens/azure-identity-token"
        ))]
        #[configurable(metadata(docs::examples = "${AZURE_FEDERATED_TOKEN_FILE}"))]
        token_file_path: Option<PathBuf>,
    },
}
219
/// Client assertion source backed by another token credential.
///
/// Each assertion is a fresh token obtained from `credential` for `scope`.
#[derive(Debug)]
struct ManagedIdentityClientAssertion {
    // Credential whose tokens are presented as the assertion (a managed identity in this file).
    credential: Arc<dyn TokenCredential>,
    // Scope requested from `credential` for every assertion.
    scope: String,
}
225
226#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
227#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
228impl ClientAssertion for ManagedIdentityClientAssertion {
229 async fn secret(&self, options: Option<ClientMethodOptions<'_>>) -> azure_core::Result<String> {
230 Ok(self
231 .credential
232 .get_token(
233 &[&self.scope],
234 Some(TokenRequestOptions {
235 method_options: options.unwrap_or_default(),
236 }),
237 )
238 .await?
239 .token
240 .secret()
241 .to_string())
242 }
243}
244
245impl AzureAuthentication {
246 pub async fn credential(&self) -> azure_core::Result<Arc<dyn TokenCredential>> {
248 match self {
249 Self::Specific(specific) => specific.credential().await,
250
251 #[cfg(test)]
252 Self::MockCredential => Ok(Arc::new(MockTokenCredential) as Arc<dyn TokenCredential>),
253 }
254 }
255}
256
257impl SpecificAzureCredential {
258 pub async fn credential(&self) -> azure_core::Result<Arc<dyn TokenCredential>> {
260 let credential: Arc<dyn TokenCredential> = match self {
261 #[cfg(not(target_arch = "wasm32"))]
262 Self::AzureCli {} => AzureCliCredential::new(None)?,
263
264 Self::ClientCertificateCredential {
266 azure_tenant_id,
267 azure_client_id,
268 certificate_path,
269 certificate_password,
270 } => {
271 let certificate_bytes: Vec<u8> = std::fs::read(certificate_path).map_err(|e| {
272 Error::with_message(
273 ErrorKind::Credential,
274 format!(
275 "Failed to read certificate file {}: {e}",
276 certificate_path.display()
277 ),
278 )
279 })?;
280
281 let certificate_base64: azure_core::credentials::Secret =
283 BASE64_STANDARD.encode(&certificate_bytes).into();
284
285 let mut options: ClientCertificateCredentialOptions =
286 ClientCertificateCredentialOptions::default();
287 if let Some(password) = certificate_password {
288 options.password = Some(password.inner().to_string().into());
289 }
290
291 ClientCertificateCredential::new(
292 azure_tenant_id.clone(),
293 azure_client_id.clone(),
294 certificate_base64,
295 Some(options),
296 )?
297 }
298
299 Self::ClientSecretCredential {
300 azure_tenant_id,
301 azure_client_id,
302 azure_client_secret,
303 } => {
304 if azure_tenant_id.is_empty() {
305 return Err(Error::with_message(ErrorKind::Credential,
306 "`auth.azure_tenant_id` is blank; either use `auth.azure_credential_kind`, or provide tenant ID, client ID, and secret.".to_string()
307 ));
308 }
309 if azure_client_id.is_empty() {
310 return Err(Error::with_message(ErrorKind::Credential,
311 "`auth.azure_client_id` is blank; either use `auth.azure_credential_kind`, or provide tenant ID, client ID, and secret.".to_string()
312 ));
313 }
314 if azure_client_secret.inner().is_empty() {
315 return Err(Error::with_message(ErrorKind::Credential,
316 "`auth.azure_client_secret` is blank; either use `auth.azure_credential_kind`, or provide tenant ID, client ID, and secret.".to_string()
317 ));
318 }
319
320 let secret: String = azure_client_secret.inner().into();
321 ClientSecretCredential::new(
322 &azure_tenant_id.clone(),
323 azure_client_id.clone(),
324 secret.into(),
325 None,
326 )?
327 }
328
329 Self::ManagedIdentity {
330 user_assigned_managed_identity_id,
331 user_assigned_managed_identity_id_type,
332 } => {
333 let mut options = ManagedIdentityCredentialOptions::default();
334 if let Some(id) = user_assigned_managed_identity_id {
335 options.user_assigned_id = match user_assigned_managed_identity_id_type
336 .as_ref()
337 .unwrap_or(&Default::default())
338 {
339 UserAssignedManagedIdentityIdType::ClientId => {
340 Some(UserAssignedId::ClientId(id.clone()))
341 }
342 UserAssignedManagedIdentityIdType::ObjectId => {
343 Some(UserAssignedId::ObjectId(id.clone()))
344 }
345 UserAssignedManagedIdentityIdType::ResourceId => {
346 Some(UserAssignedId::ResourceId(id.clone()))
347 }
348 };
349 }
350 ManagedIdentityCredential::new(Some(options))?
351 }
352
353 Self::ManagedIdentityClientAssertion {
354 user_assigned_managed_identity_id,
355 user_assigned_managed_identity_id_type,
356 client_assertion_tenant_id,
357 client_assertion_client_id,
358 } => {
359 let mut options = ManagedIdentityCredentialOptions::default();
360 if let Some(id) = user_assigned_managed_identity_id {
361 options.user_assigned_id = match user_assigned_managed_identity_id_type
362 .as_ref()
363 .unwrap_or(&Default::default())
364 {
365 UserAssignedManagedIdentityIdType::ClientId => {
366 Some(UserAssignedId::ClientId(id.clone()))
367 }
368 UserAssignedManagedIdentityIdType::ObjectId => {
369 Some(UserAssignedId::ObjectId(id.clone()))
370 }
371 UserAssignedManagedIdentityIdType::ResourceId => {
372 Some(UserAssignedId::ResourceId(id.clone()))
373 }
374 };
375 }
376 let msi: Arc<dyn TokenCredential> = ManagedIdentityCredential::new(Some(options))?;
377 let assertion = ManagedIdentityClientAssertion {
378 credential: msi,
379 scope: "api://AzureADTokenExchange/.default".to_string(),
381 };
382
383 ClientAssertionCredential::new(
384 client_assertion_tenant_id.clone(),
385 client_assertion_client_id.clone(),
386 assertion,
387 None,
388 )?
389 }
390
391 Self::WorkloadIdentity {
392 tenant_id,
393 client_id,
394 token_file_path,
395 } => {
396 let options = WorkloadIdentityCredentialOptions {
397 tenant_id: tenant_id.clone(),
398 client_id: client_id.clone(),
399 token_file_path: token_file_path.clone(),
400 ..Default::default()
401 };
402
403 WorkloadIdentityCredential::new(Some(options))?
404 }
405 };
406 Ok(credential)
407 }
408}
409
/// A single blob upload request produced by the sink.
#[derive(Debug, Clone)]
pub struct AzureBlobRequest {
    /// The payload bytes for the blob.
    pub blob_data: Bytes,
    /// Content encoding for the blob, if any (presumably sent as the blob's content-encoding header — confirm in the service).
    pub content_encoding: Option<&'static str>,
    /// Content type for the blob (presumably sent as the blob's content-type header — confirm in the service).
    pub content_type: &'static str,
    /// Per-request bookkeeping; its finalizers are handed out by `take_finalizers`.
    pub metadata: AzureBlobMetadata,
    /// Request metadata exposed through `MetaDescriptive`.
    pub request_metadata: RequestMetadata,
}
418
419impl Finalizable for AzureBlobRequest {
420 fn take_finalizers(&mut self) -> EventFinalizers {
421 std::mem::take(&mut self.metadata.finalizers)
422 }
423}
424
425impl MetaDescriptive for AzureBlobRequest {
426 fn get_metadata(&self) -> &RequestMetadata {
427 &self.request_metadata
428 }
429
430 fn metadata_mut(&mut self) -> &mut RequestMetadata {
431 &mut self.request_metadata
432 }
433}
434
/// Bookkeeping carried alongside an `AzureBlobRequest`.
#[derive(Clone, Debug)]
pub struct AzureBlobMetadata {
    /// Partition key of the batch (presumably used to derive the blob name — confirm with the request builder).
    pub partition_key: String,
    /// Number of events in the request (presumably; set by the request builder).
    pub count: usize,
    /// JSON-estimated size of the events in the request (presumably; set by the request builder).
    pub byte_size: JsonSize,
    /// Finalizers for the events; moved out by `AzureBlobRequest::take_finalizers`.
    pub finalizers: EventFinalizers,
}
442
/// Retry policy for Azure Blob requests: retries on HTTP 5xx and 429 responses.
#[derive(Debug, Clone)]
pub struct AzureBlobRetryLogic;
445
446impl RetryLogic for AzureBlobRetryLogic {
447 type Error = AzureCoreError;
448 type Request = AzureBlobRequest;
449 type Response = AzureBlobResponse;
450
451 fn is_retriable_error(&self, error: &Self::Error) -> bool {
452 match error.http_status() {
453 Some(code) => code.is_server_error() || code == StatusCode::TooManyRequests,
454 None => false,
455 }
456 }
457}
458
/// Result of a blob upload, reported back to the sink driver.
#[derive(Debug)]
pub struct AzureBlobResponse {
    /// Event sizes returned by `DriverResponse::events_sent`.
    pub events_byte_size: GroupedCountByteSize,
    /// Byte count returned by `DriverResponse::bytes_sent`.
    pub byte_size: usize,
}
464
465impl DriverResponse for AzureBlobResponse {
466 fn event_status(&self) -> EventStatus {
467 EventStatus::Delivered
468 }
469
470 fn events_sent(&self) -> &GroupedCountByteSize {
471 &self.events_byte_size
472 }
473
474 fn bytes_sent(&self) -> Option<usize> {
475 Some(self.byte_size)
476 }
477}
478
/// Errors reported by the Azure Blob Storage healthcheck.
#[derive(Debug, Snafu)]
pub enum HealthcheckError {
    /// Used by the healthcheck when the service answers HTTP 403 Forbidden.
    ///
    /// NOTE(review): the message blames the connection string, but a 403 may also come from an
    /// `auth` credential that lacks access to the container — consider rewording.
    #[snafu(display("Invalid connection string specified"))]
    InvalidCredentials,
    /// Used by the healthcheck when the service answers HTTP 404 Not Found.
    #[snafu(display("Container: {:?} not found", container))]
    UnknownContainer { container: String },
    /// Used by the healthcheck for any other HTTP status.
    #[snafu(display("Unknown status code: {}", status))]
    Unknown { status: StatusCode },
}
488
489pub fn build_healthcheck(
490 container_name: String,
491 client: Arc<BlobContainerClient>,
492) -> crate::Result<Healthcheck> {
493 let healthcheck = async move {
494 let resp: crate::Result<()> = match client.get_properties(None).await {
495 Ok(_) => Ok(()),
496 Err(error) => {
497 let code = error.http_status();
498 Err(match code {
499 Some(StatusCode::Forbidden) => Box::new(HealthcheckError::InvalidCredentials),
500 Some(StatusCode::NotFound) => Box::new(HealthcheckError::UnknownContainer {
501 container: container_name,
502 }),
503 Some(status) => Box::new(HealthcheckError::Unknown { status }),
504 None => "unknown status code".into(),
505 })
506 }
507 };
508 resp
509 };
510
511 Ok(healthcheck.boxed())
512}
513
514pub async fn build_client(
515 auth: Option<AzureAuthentication>,
516 connection_string: String,
517 container_name: String,
518 proxy: &crate::config::ProxyConfig,
519 tls: Option<AzureBlobTlsConfig>,
520) -> crate::Result<Arc<BlobContainerClient>> {
521 let parsed = ParsedConnectionString::parse(&connection_string)
523 .map_err(|e| format!("Invalid connection string: {e}"))?;
524 let container_url = parsed
526 .container_url(&container_name)
527 .map_err(|e| format!("Failed to build container URL: {e}"))?;
528 let url = Url::parse(&container_url).map_err(|e| format!("Invalid container URL: {e}"))?;
529
530 let mut credential: Option<Arc<dyn TokenCredential>> = None;
531
532 let mut options = BlobContainerClientOptions::default();
534 match (parsed.auth(), &auth) {
535 (Auth::None, None) => {
536 warn!("No authentication method provided, requests will be anonymous.");
537 }
538 (Auth::Sas { .. }, None) => {
539 info!("Using SAS token authentication.");
540 }
541 (
542 Auth::SharedKey {
543 account_name,
544 account_key,
545 },
546 None,
547 ) => {
548 info!("Using Shared Key authentication.");
549
550 let policy = SharedKeyAuthorizationPolicy::new(
551 account_name,
552 account_key,
553 String::from("2025-11-05"),
555 )
556 .map_err(|e| format!("Failed to create SharedKey policy: {e}"))?;
557 options
558 .client_options
559 .per_call_policies
560 .push(Arc::new(policy));
561 }
562 (Auth::None, Some(AzureAuthentication::Specific(..))) => {
563 info!("Using Azure Authentication method.");
564 let credential_result: Arc<dyn TokenCredential> =
565 auth.unwrap().credential().await.map_err(|e| {
566 Error::with_message(
567 ErrorKind::Credential,
568 format!("Failed to configure Azure Authentication: {e}"),
569 )
570 })?;
571 credential = Some(credential_result);
572 }
573 (Auth::Sas { .. }, Some(AzureAuthentication::Specific(..))) => {
574 return Err(Box::new(Error::with_message(
575 ErrorKind::Credential,
576 "Cannot use both SAS token and another Azure Authentication method at the same time",
577 )));
578 }
579 (Auth::SharedKey { .. }, Some(AzureAuthentication::Specific(..))) => {
580 return Err(Box::new(Error::with_message(
581 ErrorKind::Credential,
582 "Cannot use both Shared Key and another Azure Authentication method at the same time",
583 )));
584 }
585 #[cfg(test)]
586 (Auth::None, Some(AzureAuthentication::MockCredential)) => {
587 warn!("Using mock token credential authentication.");
588 credential = Some(auth.unwrap().credential().await.unwrap());
589 }
590 #[cfg(test)]
591 (_, Some(AzureAuthentication::MockCredential)) => {
592 return Err(Box::new(Error::with_message(
593 ErrorKind::Credential,
594 "Cannot use both connection string auth and mock credential at the same time",
595 )));
596 }
597 }
598
599 let mut reqwest_builder = reqwest_12::ClientBuilder::new();
601 let bypass_proxy = {
602 let host = url.host_str().unwrap_or("");
603 let port = url.port();
604 proxy.no_proxy.matches(host)
605 || port
606 .map(|p| proxy.no_proxy.matches(&format!("{}:{}", host, p)))
607 .unwrap_or(false)
608 };
609 if bypass_proxy || !proxy.enabled {
610 reqwest_builder = reqwest_builder.no_proxy();
612 } else {
613 if let Some(http) = &proxy.http {
614 let p = reqwest_12::Proxy::http(http)
615 .map_err(|e| format!("Invalid HTTP proxy URL: {e}"))?;
616 reqwest_builder = reqwest_builder.proxy(p);
618 }
619 if let Some(https) = &proxy.https {
620 let p = reqwest_12::Proxy::https(https)
621 .map_err(|e| format!("Invalid HTTPS proxy URL: {e}"))?;
622 reqwest_builder = reqwest_builder.proxy(p);
624 }
625 }
626
627 if let Some(AzureBlobTlsConfig { ca_file }) = &tls
628 && let Some(ca_file) = ca_file
629 {
630 let mut buf = Vec::new();
631 File::open(ca_file)?.read_to_end(&mut buf)?;
632 let cert = reqwest_12::Certificate::from_pem(&buf)?;
633
634 warn!("Adding TLS root certificate from {}", ca_file.display());
635 reqwest_builder = reqwest_builder.add_root_certificate(cert);
636 }
637
638 options.client_options.transport = Some(azure_core::http::Transport::new(std::sync::Arc::new(
639 reqwest_builder
640 .build()
641 .map_err(|e| format!("Failed to build reqwest client: {e}"))?,
642 )));
643 let client = BlobContainerClient::from_url(url, credential, Some(options))
644 .map_err(|e| format!("{e}"))?;
645 Ok(Arc::new(client))
646}
647
#[cfg(test)]
/// Test-only credential that returns an unsigned JWT without contacting Azure.
#[derive(Debug)]
struct MockTokenCredential;
651
#[cfg(test)]
#[async_trait::async_trait]
impl TokenCredential for MockTokenCredential {
    /// Returns an unsigned JWT whose `aud` claim is the first scope without its `/.default` suffix.
    ///
    /// The token expires one hour after the call.
    ///
    /// # Errors
    ///
    /// Returns an `ErrorKind::Credential` error when `scopes` is empty.
    async fn get_token(
        &self,
        scopes: &[&str],
        _options: Option<azure_core::credentials::TokenRequestOptions<'_>>,
    ) -> azure_core::Result<azure_core::credentials::AccessToken> {
        let Some(scope) = scopes.first() else {
            return Err(Error::with_message(
                ErrorKind::Credential,
                "no scopes were provided",
            ));
        };

        let claims = serde_json::json!({
            "aud": scope.strip_suffix("/.default").unwrap_or(*scope),
            "exp": 2147483647,
            "iat": 0,
            "iss": "https://sts.windows.net/",
            "nbf": 0,
        });
        // `Display` for `serde_json::Value` is the same compact encoding as `serde_json::to_string`.
        let claims_json = claims.to_string();

        // JWT segments are base64url without padding (RFC 7515); standard base64 would emit
        // `+`/`/` for some payloads. `e30` encodes the empty header `{}`; the signature is empty.
        let jwt_base64 = format!("e30.{}.", BASE64_URL_SAFE_NO_PAD.encode(&claims_json));

        warn!(
            "Using mock token credential, JWT: {}, base64: {}",
            claims_json, jwt_base64
        );

        Ok(azure_core::credentials::AccessToken::new(
            jwt_base64,
            azure_core::time::OffsetDateTime::now_utc() + std::time::Duration::from_secs(3600),
        ))
    }
}
699
#[cfg(test)]
#[tokio::test]
async fn azure_mock_token_credential_test() {
    // Expected JWT for audience `https://example.com`: empty header, fixed claims, empty signature.
    let expected = "e30.eyJhdWQiOiJodHRwczovL2V4YW1wbGUuY29tIiwiZXhwIjoyMTQ3NDgzNjQ3LCJpYXQiOjAsImlzcyI6Imh0dHBzOi8vc3RzLndpbmRvd3MubmV0LyIsIm5iZiI6MH0.";

    let mock = MockTokenCredential;
    let token = mock
        .get_token(&["https://example.com/.default"], None)
        .await
        .expect("valid credential should return a token");

    assert_eq!(token.token.secret(), expected);
}