vector/sinks/azure_common/
config.rs

1use std::fs::File;
2use std::io::Read;
3use std::path::PathBuf;
4use std::sync::Arc;
5
6use base64::prelude::*;
7
8use azure_core::error::Error as AzureCoreError;
9
10use crate::sinks::azure_common::connection_string::{Auth, ParsedConnectionString};
11use crate::sinks::azure_common::shared_key_policy::SharedKeyAuthorizationPolicy;
12use azure_core::http::{ClientMethodOptions, StatusCode, Url};
13
14use azure_core::credentials::{TokenCredential, TokenRequestOptions};
15use azure_core::{Error, error::ErrorKind};
16
17use azure_identity::{
18    AzureCliCredential, ClientAssertion, ClientAssertionCredential, ClientCertificateCredential,
19    ClientCertificateCredentialOptions, ClientSecretCredential, ManagedIdentityCredential,
20    ManagedIdentityCredentialOptions, UserAssignedId, WorkloadIdentityCredential,
21    WorkloadIdentityCredentialOptions,
22};
23
24use azure_storage_blob::{BlobContainerClient, BlobContainerClientOptions};
25
26use bytes::Bytes;
27use futures::FutureExt;
28use snafu::Snafu;
29use vector_lib::{
30    configurable::configurable_component,
31    json_size::JsonSize,
32    request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata},
33    sensitive_string::SensitiveString,
34    stream::DriverResponse,
35};
36
37use crate::{
38    event::{EventFinalizers, EventStatus, Finalizable},
39    sinks::{Healthcheck, util::retries::RetryLogic},
40};
41
42/// TLS configuration.
43#[configurable_component]
44#[configurable(metadata(docs::advanced))]
45#[derive(Clone, Debug, Default)]
46#[serde(deny_unknown_fields)]
47pub struct AzureBlobTlsConfig {
48    /// Absolute path to an additional CA certificate file.
49    ///
50    /// The certificate must be in PEM (X.509) format.
51    #[serde(alias = "ca_path")]
52    #[configurable(metadata(docs::examples = "/path/to/certificate_authority.crt"))]
53    #[configurable(metadata(docs::human_name = "CA File Path"))]
54    pub ca_file: Option<PathBuf>,
55}
56
57/// Azure service principal authentication.
58#[configurable_component]
59#[derive(Clone, Debug, Eq, PartialEq)]
60#[serde(deny_unknown_fields, untagged)]
61pub enum AzureAuthentication {
62    #[configurable(metadata(docs::enum_tag_description = "The kind of Azure credential to use."))]
63    Specific(SpecificAzureCredential),
64
65    /// Mock credential for testing — returns a static fake token
66    #[cfg(test)]
67    #[serde(skip)]
68    MockCredential,
69}
70
71impl Default for AzureAuthentication {
72    // This should never be actually used.
73    // This is only needed when using Default::default() (such as unit tests),
74    // as serde requires `azure_credential_kind` to be specified.
75    fn default() -> Self {
76        Self::Specific(SpecificAzureCredential::ManagedIdentity {
77            user_assigned_managed_identity_id: None,
78            user_assigned_managed_identity_id_type: None,
79        })
80    }
81}
82
83#[configurable_component]
84#[derive(Clone, Debug, Eq, PartialEq)]
85#[serde(deny_unknown_fields, rename_all = "snake_case")]
86#[derive(Default)]
87/// User Assigned Managed Identity Types.
88pub enum UserAssignedManagedIdentityIdType {
89    #[default]
90    /// Client ID
91    ClientId,
92    /// Object ID
93    ObjectId,
94    /// Resource ID
95    ResourceId,
96}
97
98/// Specific Azure credential types.
99#[configurable_component]
100#[derive(Clone, Debug, Eq, PartialEq)]
101#[serde(
102    tag = "azure_credential_kind",
103    rename_all = "snake_case",
104    deny_unknown_fields
105)]
106pub enum SpecificAzureCredential {
107    /// Use Azure CLI credentials
108    #[cfg(not(target_arch = "wasm32"))]
109    AzureCli {},
110
111    /// Use certificate credentials
112    ClientCertificateCredential {
113        /// The [Azure Tenant ID][azure_tenant_id].
114        ///
115        /// [azure_tenant_id]: https://learn.microsoft.com/entra/identity-platform/howto-create-service-principal-portal
116        #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
117        #[configurable(metadata(docs::examples = "${AZURE_TENANT_ID:?err}"))]
118        azure_tenant_id: String,
119
120        /// The [Azure Client ID][azure_client_id].
121        ///
122        /// [azure_client_id]: https://learn.microsoft.com/entra/identity-platform/howto-create-service-principal-portal
123        #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
124        #[configurable(metadata(docs::examples = "${AZURE_CLIENT_ID:?err}"))]
125        azure_client_id: String,
126
127        /// PKCS12 certificate with RSA private key.
128        #[configurable(metadata(docs::examples = "path/to/certificate.pfx"))]
129        #[configurable(metadata(docs::examples = "${AZURE_CLIENT_CERTIFICATE_PATH:?err}"))]
130        certificate_path: PathBuf,
131
132        /// The password for the client certificate, if applicable.
133        #[configurable(metadata(docs::examples = "${AZURE_CLIENT_CERTIFICATE_PASSWORD}"))]
134        certificate_password: Option<SensitiveString>,
135    },
136
137    /// Use client ID/secret credentials
138    ClientSecretCredential {
139        /// The [Azure Tenant ID][azure_tenant_id].
140        ///
141        /// [azure_tenant_id]: https://learn.microsoft.com/entra/identity-platform/howto-create-service-principal-portal
142        #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
143        #[configurable(metadata(docs::examples = "${AZURE_TENANT_ID:?err}"))]
144        azure_tenant_id: String,
145
146        /// The [Azure Client ID][azure_client_id].
147        ///
148        /// [azure_client_id]: https://learn.microsoft.com/entra/identity-platform/howto-create-service-principal-portal
149        #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
150        #[configurable(metadata(docs::examples = "${AZURE_CLIENT_ID:?err}"))]
151        azure_client_id: String,
152
153        /// The [Azure Client Secret][azure_client_secret].
154        ///
155        /// [azure_client_secret]: https://learn.microsoft.com/entra/identity-platform/howto-create-service-principal-portal
156        #[configurable(metadata(docs::examples = "00-00~000000-0000000~0000000000000000000"))]
157        #[configurable(metadata(docs::examples = "${AZURE_CLIENT_SECRET:?err}"))]
158        azure_client_secret: SensitiveString,
159    },
160
161    /// Use Managed Identity credentials
162    ManagedIdentity {
163        /// The User Assigned Managed Identity to use.
164        #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
165        #[serde(default, skip_serializing_if = "Option::is_none")]
166        user_assigned_managed_identity_id: Option<String>,
167
168        /// The type of the User Assigned Managed Identity ID provided (Client ID, Object ID,
169        /// or Resource ID). Defaults to Client ID.
170        user_assigned_managed_identity_id_type: Option<UserAssignedManagedIdentityIdType>,
171    },
172
173    /// Use Managed Identity with Client Assertion credentials
174    ManagedIdentityClientAssertion {
175        /// The User Assigned Managed Identity to use for the managed identity.
176        #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
177        #[configurable(metadata(
178            docs::examples = "/subscriptions/00000000-0000-0000-0000-000000000000/resourceGroups/rg-vector/providers/Microsoft.ManagedIdentity/userAssignedIdentities/id-vector-uami"
179        ))]
180        #[serde(default, skip_serializing_if = "Option::is_none")]
181        user_assigned_managed_identity_id: Option<String>,
182
183        /// The type of the User Assigned Managed Identity ID provided (Client ID, Object ID, or Resource ID). Defaults to Client ID.
184        user_assigned_managed_identity_id_type: Option<UserAssignedManagedIdentityIdType>,
185
186        /// The target Tenant ID to use.
187        #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
188        client_assertion_tenant_id: String,
189
190        /// The target Client ID to use.
191        #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
192        client_assertion_client_id: String,
193    },
194
195    /// Use Workload Identity credentials
196    WorkloadIdentity {
197        /// The [Azure Tenant ID][azure_tenant_id]. Defaults to the value of the environment variable `AZURE_TENANT_ID`.
198        ///
199        /// [azure_tenant_id]: https://learn.microsoft.com/entra/identity-platform/howto-create-service-principal-portal
200        #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
201        #[configurable(metadata(docs::examples = "${AZURE_TENANT_ID}"))]
202        tenant_id: Option<String>,
203
204        /// The [Azure Client ID][azure_client_id]. Defaults to the value of the environment variable `AZURE_CLIENT_ID`.
205        ///
206        /// [azure_client_id]: https://learn.microsoft.com/entra/identity-platform/howto-create-service-principal-portal
207        #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
208        #[configurable(metadata(docs::examples = "${AZURE_CLIENT_ID}"))]
209        client_id: Option<String>,
210
211        /// Path of a file containing a Kubernetes service account token. Defaults to the value of the environment variable `AZURE_FEDERATED_TOKEN_FILE`.
212        #[configurable(metadata(
213            docs::examples = "/var/run/secrets/azure/tokens/azure-identity-token"
214        ))]
215        #[configurable(metadata(docs::examples = "${AZURE_FEDERATED_TOKEN_FILE}"))]
216        token_file_path: Option<PathBuf>,
217    },
218}
219
220#[derive(Debug)]
221struct ManagedIdentityClientAssertion {
222    credential: Arc<dyn TokenCredential>,
223    scope: String,
224}
225
226#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
227#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
228impl ClientAssertion for ManagedIdentityClientAssertion {
229    async fn secret(&self, options: Option<ClientMethodOptions<'_>>) -> azure_core::Result<String> {
230        Ok(self
231            .credential
232            .get_token(
233                &[&self.scope],
234                Some(TokenRequestOptions {
235                    method_options: options.unwrap_or_default(),
236                }),
237            )
238            .await?
239            .token
240            .secret()
241            .to_string())
242    }
243}
244
245impl AzureAuthentication {
246    /// Returns the provider for the credentials based on the authentication mechanism chosen.
247    pub async fn credential(&self) -> azure_core::Result<Arc<dyn TokenCredential>> {
248        match self {
249            Self::Specific(specific) => specific.credential().await,
250
251            #[cfg(test)]
252            Self::MockCredential => Ok(Arc::new(MockTokenCredential) as Arc<dyn TokenCredential>),
253        }
254    }
255}
256
257impl SpecificAzureCredential {
258    /// Returns the provider for the credentials based on the specific credential type.
259    pub async fn credential(&self) -> azure_core::Result<Arc<dyn TokenCredential>> {
260        let credential: Arc<dyn TokenCredential> = match self {
261            #[cfg(not(target_arch = "wasm32"))]
262            Self::AzureCli {} => AzureCliCredential::new(None)?,
263
264            // requires azure_identity feature 'client_certificate'
265            Self::ClientCertificateCredential {
266                azure_tenant_id,
267                azure_client_id,
268                certificate_path,
269                certificate_password,
270            } => {
271                let certificate_bytes: Vec<u8> = std::fs::read(certificate_path).map_err(|e| {
272                    Error::with_message(
273                        ErrorKind::Credential,
274                        format!(
275                            "Failed to read certificate file {}: {e}",
276                            certificate_path.display()
277                        ),
278                    )
279                })?;
280
281                // Note: in azure_identity 0.33.0+, this changes to SecretBytes, and the base64 encoding is no longer needed
282                let certificate_base64: azure_core::credentials::Secret =
283                    BASE64_STANDARD.encode(&certificate_bytes).into();
284
285                let mut options: ClientCertificateCredentialOptions =
286                    ClientCertificateCredentialOptions::default();
287                if let Some(password) = certificate_password {
288                    options.password = Some(password.inner().to_string().into());
289                }
290
291                ClientCertificateCredential::new(
292                    azure_tenant_id.clone(),
293                    azure_client_id.clone(),
294                    certificate_base64,
295                    Some(options),
296                )?
297            }
298
299            Self::ClientSecretCredential {
300                azure_tenant_id,
301                azure_client_id,
302                azure_client_secret,
303            } => {
304                if azure_tenant_id.is_empty() {
305                    return Err(Error::with_message(ErrorKind::Credential,
306                        "`auth.azure_tenant_id` is blank; either use `auth.azure_credential_kind`, or provide tenant ID, client ID, and secret.".to_string()
307                    ));
308                }
309                if azure_client_id.is_empty() {
310                    return Err(Error::with_message(ErrorKind::Credential,
311                        "`auth.azure_client_id` is blank; either use `auth.azure_credential_kind`, or provide tenant ID, client ID, and secret.".to_string()
312                    ));
313                }
314                if azure_client_secret.inner().is_empty() {
315                    return Err(Error::with_message(ErrorKind::Credential,
316                        "`auth.azure_client_secret` is blank; either use `auth.azure_credential_kind`, or provide tenant ID, client ID, and secret.".to_string()
317                    ));
318                }
319
320                let secret: String = azure_client_secret.inner().into();
321                ClientSecretCredential::new(
322                    &azure_tenant_id.clone(),
323                    azure_client_id.clone(),
324                    secret.into(),
325                    None,
326                )?
327            }
328
329            Self::ManagedIdentity {
330                user_assigned_managed_identity_id,
331                user_assigned_managed_identity_id_type,
332            } => {
333                let mut options = ManagedIdentityCredentialOptions::default();
334                if let Some(id) = user_assigned_managed_identity_id {
335                    options.user_assigned_id = match user_assigned_managed_identity_id_type
336                        .as_ref()
337                        .unwrap_or(&Default::default())
338                    {
339                        UserAssignedManagedIdentityIdType::ClientId => {
340                            Some(UserAssignedId::ClientId(id.clone()))
341                        }
342                        UserAssignedManagedIdentityIdType::ObjectId => {
343                            Some(UserAssignedId::ObjectId(id.clone()))
344                        }
345                        UserAssignedManagedIdentityIdType::ResourceId => {
346                            Some(UserAssignedId::ResourceId(id.clone()))
347                        }
348                    };
349                }
350                ManagedIdentityCredential::new(Some(options))?
351            }
352
353            Self::ManagedIdentityClientAssertion {
354                user_assigned_managed_identity_id,
355                user_assigned_managed_identity_id_type,
356                client_assertion_tenant_id,
357                client_assertion_client_id,
358            } => {
359                let mut options = ManagedIdentityCredentialOptions::default();
360                if let Some(id) = user_assigned_managed_identity_id {
361                    options.user_assigned_id = match user_assigned_managed_identity_id_type
362                        .as_ref()
363                        .unwrap_or(&Default::default())
364                    {
365                        UserAssignedManagedIdentityIdType::ClientId => {
366                            Some(UserAssignedId::ClientId(id.clone()))
367                        }
368                        UserAssignedManagedIdentityIdType::ObjectId => {
369                            Some(UserAssignedId::ObjectId(id.clone()))
370                        }
371                        UserAssignedManagedIdentityIdType::ResourceId => {
372                            Some(UserAssignedId::ResourceId(id.clone()))
373                        }
374                    };
375                }
376                let msi: Arc<dyn TokenCredential> = ManagedIdentityCredential::new(Some(options))?;
377                let assertion = ManagedIdentityClientAssertion {
378                    credential: msi,
379                    // Future: make this configurable for sovereign clouds? (no way to test...)
380                    scope: "api://AzureADTokenExchange/.default".to_string(),
381                };
382
383                ClientAssertionCredential::new(
384                    client_assertion_tenant_id.clone(),
385                    client_assertion_client_id.clone(),
386                    assertion,
387                    None,
388                )?
389            }
390
391            Self::WorkloadIdentity {
392                tenant_id,
393                client_id,
394                token_file_path,
395            } => {
396                let options = WorkloadIdentityCredentialOptions {
397                    tenant_id: tenant_id.clone(),
398                    client_id: client_id.clone(),
399                    token_file_path: token_file_path.clone(),
400                    ..Default::default()
401                };
402
403                WorkloadIdentityCredential::new(Some(options))?
404            }
405        };
406        Ok(credential)
407    }
408}
409
410#[derive(Debug, Clone)]
411pub struct AzureBlobRequest {
412    pub blob_data: Bytes,
413    pub content_encoding: Option<&'static str>,
414    pub content_type: &'static str,
415    pub metadata: AzureBlobMetadata,
416    pub request_metadata: RequestMetadata,
417}
418
419impl Finalizable for AzureBlobRequest {
420    fn take_finalizers(&mut self) -> EventFinalizers {
421        std::mem::take(&mut self.metadata.finalizers)
422    }
423}
424
425impl MetaDescriptive for AzureBlobRequest {
426    fn get_metadata(&self) -> &RequestMetadata {
427        &self.request_metadata
428    }
429
430    fn metadata_mut(&mut self) -> &mut RequestMetadata {
431        &mut self.request_metadata
432    }
433}
434
435#[derive(Clone, Debug)]
436pub struct AzureBlobMetadata {
437    pub partition_key: String,
438    pub count: usize,
439    pub byte_size: JsonSize,
440    pub finalizers: EventFinalizers,
441}
442
443#[derive(Debug, Clone)]
444pub struct AzureBlobRetryLogic;
445
446impl RetryLogic for AzureBlobRetryLogic {
447    type Error = AzureCoreError;
448    type Request = AzureBlobRequest;
449    type Response = AzureBlobResponse;
450
451    fn is_retriable_error(&self, error: &Self::Error) -> bool {
452        match error.http_status() {
453            Some(code) => code.is_server_error() || code == StatusCode::TooManyRequests,
454            None => false,
455        }
456    }
457}
458
459#[derive(Debug)]
460pub struct AzureBlobResponse {
461    pub events_byte_size: GroupedCountByteSize,
462    pub byte_size: usize,
463}
464
465impl DriverResponse for AzureBlobResponse {
466    fn event_status(&self) -> EventStatus {
467        EventStatus::Delivered
468    }
469
470    fn events_sent(&self) -> &GroupedCountByteSize {
471        &self.events_byte_size
472    }
473
474    fn bytes_sent(&self) -> Option<usize> {
475        Some(self.byte_size)
476    }
477}
478
479#[derive(Debug, Snafu)]
480pub enum HealthcheckError {
481    #[snafu(display("Invalid connection string specified"))]
482    InvalidCredentials,
483    #[snafu(display("Container: {:?} not found", container))]
484    UnknownContainer { container: String },
485    #[snafu(display("Unknown status code: {}", status))]
486    Unknown { status: StatusCode },
487}
488
489pub fn build_healthcheck(
490    container_name: String,
491    client: Arc<BlobContainerClient>,
492) -> crate::Result<Healthcheck> {
493    let healthcheck = async move {
494        let resp: crate::Result<()> = match client.get_properties(None).await {
495            Ok(_) => Ok(()),
496            Err(error) => {
497                let code = error.http_status();
498                Err(match code {
499                    Some(StatusCode::Forbidden) => Box::new(HealthcheckError::InvalidCredentials),
500                    Some(StatusCode::NotFound) => Box::new(HealthcheckError::UnknownContainer {
501                        container: container_name,
502                    }),
503                    Some(status) => Box::new(HealthcheckError::Unknown { status }),
504                    None => "unknown status code".into(),
505                })
506            }
507        };
508        resp
509    };
510
511    Ok(healthcheck.boxed())
512}
513
514pub async fn build_client(
515    auth: Option<AzureAuthentication>,
516    connection_string: String,
517    container_name: String,
518    proxy: &crate::config::ProxyConfig,
519    tls: Option<AzureBlobTlsConfig>,
520) -> crate::Result<Arc<BlobContainerClient>> {
521    // Parse connection string without legacy SDK
522    let parsed = ParsedConnectionString::parse(&connection_string)
523        .map_err(|e| format!("Invalid connection string: {e}"))?;
524    // Compose container URL (SAS appended if present)
525    let container_url = parsed
526        .container_url(&container_name)
527        .map_err(|e| format!("Failed to build container URL: {e}"))?;
528    let url = Url::parse(&container_url).map_err(|e| format!("Invalid container URL: {e}"))?;
529
530    let mut credential: Option<Arc<dyn TokenCredential>> = None;
531
532    // Prepare options; attach Shared Key policy if needed
533    let mut options = BlobContainerClientOptions::default();
534    match (parsed.auth(), &auth) {
535        (Auth::None, None) => {
536            warn!("No authentication method provided, requests will be anonymous.");
537        }
538        (Auth::Sas { .. }, None) => {
539            info!("Using SAS token authentication.");
540        }
541        (
542            Auth::SharedKey {
543                account_name,
544                account_key,
545            },
546            None,
547        ) => {
548            info!("Using Shared Key authentication.");
549
550            let policy = SharedKeyAuthorizationPolicy::new(
551                account_name,
552                account_key,
553                // Use an Azurite-supported storage service version
554                String::from("2025-11-05"),
555            )
556            .map_err(|e| format!("Failed to create SharedKey policy: {e}"))?;
557            options
558                .client_options
559                .per_call_policies
560                .push(Arc::new(policy));
561        }
562        (Auth::None, Some(AzureAuthentication::Specific(..))) => {
563            info!("Using Azure Authentication method.");
564            let credential_result: Arc<dyn TokenCredential> =
565                auth.unwrap().credential().await.map_err(|e| {
566                    Error::with_message(
567                        ErrorKind::Credential,
568                        format!("Failed to configure Azure Authentication: {e}"),
569                    )
570                })?;
571            credential = Some(credential_result);
572        }
573        (Auth::Sas { .. }, Some(AzureAuthentication::Specific(..))) => {
574            return Err(Box::new(Error::with_message(
575                ErrorKind::Credential,
576                "Cannot use both SAS token and another Azure Authentication method at the same time",
577            )));
578        }
579        (Auth::SharedKey { .. }, Some(AzureAuthentication::Specific(..))) => {
580            return Err(Box::new(Error::with_message(
581                ErrorKind::Credential,
582                "Cannot use both Shared Key and another Azure Authentication method at the same time",
583            )));
584        }
585        #[cfg(test)]
586        (Auth::None, Some(AzureAuthentication::MockCredential)) => {
587            warn!("Using mock token credential authentication.");
588            credential = Some(auth.unwrap().credential().await.unwrap());
589        }
590        #[cfg(test)]
591        (_, Some(AzureAuthentication::MockCredential)) => {
592            return Err(Box::new(Error::with_message(
593                ErrorKind::Credential,
594                "Cannot use both connection string auth and mock credential at the same time",
595            )));
596        }
597    }
598
599    // Use reqwest v0.12 since Azure SDK only implements HttpClient for reqwest::Client v0.12
600    let mut reqwest_builder = reqwest_12::ClientBuilder::new();
601    let bypass_proxy = {
602        let host = url.host_str().unwrap_or("");
603        let port = url.port();
604        proxy.no_proxy.matches(host)
605            || port
606                .map(|p| proxy.no_proxy.matches(&format!("{}:{}", host, p)))
607                .unwrap_or(false)
608    };
609    if bypass_proxy || !proxy.enabled {
610        // Ensure no proxy (and disable any potential system proxy auto-detection)
611        reqwest_builder = reqwest_builder.no_proxy();
612    } else {
613        if let Some(http) = &proxy.http {
614            let p = reqwest_12::Proxy::http(http)
615                .map_err(|e| format!("Invalid HTTP proxy URL: {e}"))?;
616            // If credentials are embedded in the proxy URL, reqwest will handle them.
617            reqwest_builder = reqwest_builder.proxy(p);
618        }
619        if let Some(https) = &proxy.https {
620            let p = reqwest_12::Proxy::https(https)
621                .map_err(|e| format!("Invalid HTTPS proxy URL: {e}"))?;
622            // If credentials are embedded in the proxy URL, reqwest will handle them.
623            reqwest_builder = reqwest_builder.proxy(p);
624        }
625    }
626
627    if let Some(AzureBlobTlsConfig { ca_file }) = &tls
628        && let Some(ca_file) = ca_file
629    {
630        let mut buf = Vec::new();
631        File::open(ca_file)?.read_to_end(&mut buf)?;
632        let cert = reqwest_12::Certificate::from_pem(&buf)?;
633
634        warn!("Adding TLS root certificate from {}", ca_file.display());
635        reqwest_builder = reqwest_builder.add_root_certificate(cert);
636    }
637
638    options.client_options.transport = Some(azure_core::http::Transport::new(std::sync::Arc::new(
639        reqwest_builder
640            .build()
641            .map_err(|e| format!("Failed to build reqwest client: {e}"))?,
642    )));
643    let client = BlobContainerClient::from_url(url, credential, Some(options))
644        .map_err(|e| format!("{e}"))?;
645    Ok(Arc::new(client))
646}
647
648#[cfg(test)]
649#[derive(Debug)]
650struct MockTokenCredential;
651
652#[cfg(test)]
653#[async_trait::async_trait]
654impl TokenCredential for MockTokenCredential {
655    async fn get_token(
656        &self,
657        scopes: &[&str],
658        _options: Option<azure_core::credentials::TokenRequestOptions<'_>>,
659    ) -> azure_core::Result<azure_core::credentials::AccessToken> {
660        let Some(scope) = scopes.first() else {
661            return Err(Error::with_message(
662                ErrorKind::Credential,
663                "no scopes were provided",
664            ));
665        };
666
667        // serde_json sometimes does and sometimes doesn't preserve order, be careful to sort
668        // the claims in alphabetical order to ensure a consistent base64 encoding for testing
669        let jwt = serde_json::json!({
670            "aud": scope.strip_suffix("/.default").unwrap_or(*scope),
671            "exp": 2147483647,
672            "iat": 0,
673            "iss": "https://sts.windows.net/",
674            "nbf": 0,
675        });
676
677        // JWTs do not include standard base64 padding.
678        // this seemed cleaner than importing a new crates just for this function
679        let jwt_base64 = format!(
680            "e30.{}.",
681            BASE64_STANDARD
682                .encode(serde_json::to_string(&jwt).unwrap())
683                .trim_end_matches("=")
684        )
685        .to_string();
686
687        warn!(
688            "Using mock token credential, JWT: {}, base64: {}",
689            serde_json::to_string(&jwt).unwrap(),
690            jwt_base64
691        );
692
693        Ok(azure_core::credentials::AccessToken::new(
694            jwt_base64,
695            azure_core::time::OffsetDateTime::now_utc() + std::time::Duration::from_secs(3600),
696        ))
697    }
698}
699
700#[cfg(test)]
701#[tokio::test]
702async fn azure_mock_token_credential_test() {
703    let credential = MockTokenCredential;
704    let access_token = credential
705        .get_token(&["https://example.com/.default"], None)
706        .await
707        .expect("valid credential should return a token");
708    assert_eq!(
709        access_token.token.secret(),
710        "e30.eyJhdWQiOiJodHRwczovL2V4YW1wbGUuY29tIiwiZXhwIjoyMTQ3NDgzNjQ3LCJpYXQiOjAsImlzcyI6Imh0dHBzOi8vc3RzLndpbmRvd3MubmV0LyIsIm5iZiI6MH0."
711    );
712}