vector/sinks/util/
http.rs

1#[cfg(feature = "aws-core")]
2use aws_credential_types::provider::SharedCredentialsProvider;
3#[cfg(feature = "aws-core")]
4use aws_types::region::Region;
5use bytes::{Buf, Bytes};
6use futures::{Sink, future::BoxFuture};
7use headers::HeaderName;
8use http::{HeaderValue, Request, Response, StatusCode, header};
9use http_body::Body as _;
10use tracing::debug;
11
/// Newtype wrapper around [`HeaderName`].
///
/// The wrapper exists so that header names can carry `Ord`/`PartialOrd`
/// implementations (defined in this file by comparing the names as strings),
/// which allows them to be used as keys of a `BTreeMap`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct OrderedHeaderName(HeaderName);
14
15impl OrderedHeaderName {
16    pub const fn new(header_name: HeaderName) -> Self {
17        Self(header_name)
18    }
19
20    pub const fn inner(&self) -> &HeaderName {
21        &self.0
22    }
23}
24
25impl From<HeaderName> for OrderedHeaderName {
26    fn from(header_name: HeaderName) -> Self {
27        Self(header_name)
28    }
29}
30
31impl Ord for OrderedHeaderName {
32    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
33        self.0.as_str().cmp(other.0.as_str())
34    }
35}
36
37impl PartialOrd for OrderedHeaderName {
38    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
39        Some(self.cmp(other))
40    }
41}
42use std::{
43    collections::BTreeMap,
44    fmt,
45    future::Future,
46    hash::Hash,
47    marker::PhantomData,
48    pin::Pin,
49    sync::Arc,
50    task::{Context, Poll, ready},
51    time::Duration,
52};
53
54use hyper::Body;
55use pin_project::pin_project;
56use snafu::{ResultExt, Snafu};
57use tower::{Service, ServiceBuilder};
58use tower_http::decompression::DecompressionLayer;
59use vector_lib::{
60    ByteSizeOf, EstimatedJsonEncodedSizeOf, configurable::configurable_component,
61    stream::batcher::limiter::ItemBatchSize,
62};
63
64use super::{
65    Batch, EncodedEvent, Partition, TowerBatchedSink, TowerPartitionSink, TowerRequestConfig,
66    TowerRequestSettings,
67    retries::{RetryAction, RetryLogic},
68    sink::{self, Response as _},
69    uri,
70};
71#[cfg(feature = "aws-core")]
72use crate::aws::sign_request;
73use crate::{
74    event::Event,
75    http::{HttpClient, HttpError},
76    internal_events::{EndpointBytesSent, SinkRequestBuildError},
77    sinks::prelude::*,
78    template::Template,
79};
80
/// Turns a single [`Event`] into a value of type `Output`.
///
/// Returning `None` means no item was produced for the event; the sinks in
/// this file then simply do not buffer anything for it.
pub trait HttpEventEncoder<Output> {
    // The encoder handles internal event emission for Error and EventsDropped.
    fn encode_event(&mut self, event: Event) -> Option<Output>;
}
85
/// Describes how an HTTP sink encodes events and builds requests.
///
/// * `Input` is the per-event item produced by `Encoder`.
/// * `Output` is the value handed to `build_request`; the batched sinks in
///   this file bind it to the batch's output type.
/// * `Encoder` is the [`HttpEventEncoder`] returned by `build_encoder`.
pub trait HttpSink: Send + Sync + 'static {
    type Input;
    type Output;
    type Encoder: HttpEventEncoder<Self::Input>;

    /// Creates the encoder used to convert events into `Input` items.
    fn build_encoder(&self) -> Self::Encoder;
    /// Asynchronously builds the HTTP request for `events`, or fails with
    /// `crate::Error`.
    fn build_request(
        &self,
        events: Self::Output,
    ) -> impl Future<Output = crate::Result<http::Request<Bytes>>> + Send;
}
97
/// Provides a simple wrapper around internal tower and
/// batching sinks for http.
///
/// This type wraps some `HttpSink` and some `Batch` type
/// and will apply request, batch and tls settings. Internally,
/// it holds an Arc reference to the `HttpSink`. It then exposes
/// a `Sink` interface that can be returned from `SinkConfig`.
///
/// Implementation detail: because of how `Sink` works, a single item has to
/// be buffered. An event must be "encoded" before it can be sent to the inner
/// batch type and sink, so one buffer slot is provided for the encoded item.
/// To ensure the buffer is fully flushed, make sure `poll_flush` returns ready.
///
/// Note: This has been deprecated, please do not use when creating new Sinks.
#[pin_project]
pub struct BatchedHttpSink<T, B, RL = HttpRetryLogic<<B as Batch>::Output>>
where
    B: Batch,
    B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
    T: HttpSink<Input = B::Input, Output = B::Output>,
    RL: RetryLogic<Request = <B as Batch>::Output, Response = http::Response<Bytes>>
        + Send
        + 'static,
{
    // Shared handle to the user-provided sink; a clone is captured by the request builder.
    sink: Arc<T>,
    // Inner batching sink that receives the encoded items.
    #[pin]
    inner: TowerBatchedSink<
        HttpBatchService<BoxFuture<'static, crate::Result<hyper::Request<Bytes>>>, B::Output>,
        B,
        RL,
    >,
    // Encoder built once from `sink` and reused for every event.
    encoder: T::Encoder,
    // An empty slot is needed to buffer an item where we encoded it but
    // the inner sink is applying back pressure. This trick is used in the `WithFlatMap`
    // sink combinator. https://docs.rs/futures/0.1.29/src/futures/sink/with_flat_map.rs.html#20
    slot: Option<EncodedEvent<B::Input>>,
}
136
137impl<T, B> BatchedHttpSink<T, B>
138where
139    B: Batch,
140    B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
141    T: HttpSink<Input = B::Input, Output = B::Output>,
142{
143    pub fn new(
144        sink: T,
145        batch: B,
146        request_settings: TowerRequestSettings,
147        batch_timeout: Duration,
148        client: HttpClient,
149    ) -> Self {
150        Self::with_logic(
151            sink,
152            batch,
153            HttpRetryLogic::default(),
154            request_settings,
155            batch_timeout,
156            client,
157        )
158    }
159}
160
161impl<T, B, RL> BatchedHttpSink<T, B, RL>
162where
163    B: Batch,
164    B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
165    RL: RetryLogic<Request = B::Output, Response = http::Response<Bytes>, Error = HttpError>
166        + Send
167        + 'static,
168    T: HttpSink<Input = B::Input, Output = B::Output>,
169{
170    pub fn with_logic(
171        sink: T,
172        batch: B,
173        retry_logic: RL,
174        request_settings: TowerRequestSettings,
175        batch_timeout: Duration,
176        client: HttpClient,
177    ) -> Self {
178        let sink = Arc::new(sink);
179
180        let sink1 = Arc::clone(&sink);
181        let request_builder = move |b| -> BoxFuture<'static, crate::Result<http::Request<Bytes>>> {
182            let sink = Arc::clone(&sink1);
183            Box::pin(async move { sink.build_request(b).await })
184        };
185
186        let svc = HttpBatchService::new(client, request_builder);
187        let inner = request_settings.batch_sink(retry_logic, svc, batch, batch_timeout);
188        let encoder = sink.build_encoder();
189
190        Self {
191            sink,
192            inner,
193            encoder,
194            slot: None,
195        }
196    }
197}
198
199impl<T, B, RL> Sink<Event> for BatchedHttpSink<T, B, RL>
200where
201    B: Batch,
202    B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
203    T: HttpSink<Input = B::Input, Output = B::Output>,
204    RL: RetryLogic<Request = <B as Batch>::Output, Response = http::Response<Bytes>>
205        + Send
206        + 'static,
207{
208    type Error = crate::Error;
209
210    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
211        if self.slot.is_some() {
212            match self.as_mut().poll_flush(cx) {
213                Poll::Ready(Ok(())) => {}
214                Poll::Ready(Err(error)) => return Poll::Ready(Err(error)),
215                Poll::Pending => {
216                    if self.slot.is_some() {
217                        return Poll::Pending;
218                    }
219                }
220            }
221        }
222
223        Poll::Ready(Ok(()))
224    }
225
226    fn start_send(mut self: Pin<&mut Self>, mut event: Event) -> Result<(), Self::Error> {
227        let byte_size = event.size_of();
228        let json_byte_size = event.estimated_json_encoded_size_of();
229        let finalizers = event.metadata_mut().take_finalizers();
230        if let Some(item) = self.encoder.encode_event(event) {
231            *self.project().slot = Some(EncodedEvent {
232                item,
233                finalizers,
234                byte_size,
235                json_byte_size,
236            });
237        }
238
239        Ok(())
240    }
241
242    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
243        let mut this = self.project();
244        if this.slot.is_some() {
245            ready!(this.inner.as_mut().poll_ready(cx))?;
246            this.inner.as_mut().start_send(this.slot.take().unwrap())?;
247        }
248
249        this.inner.poll_flush(cx)
250    }
251
252    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
253        ready!(self.as_mut().poll_flush(cx))?;
254        self.project().inner.poll_close(cx)
255    }
256}
257
/// HTTP sink wrapper that feeds encoded events into a [`TowerPartitionSink`],
/// whose batch input type must implement `Partition<K>`.
///
/// Like `BatchedHttpSink`, it keeps a single-item slot for an event that has
/// been encoded but not yet handed to the inner sink.
///
/// Note: This has been deprecated, please do not use when creating new Sinks.
#[pin_project]
pub struct PartitionHttpSink<T, B, K, RL = HttpRetryLogic<<B as Batch>::Output>>
where
    B: Batch,
    B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
    B::Input: Partition<K>,
    K: Hash + Eq + Clone + Send + 'static,
    T: HttpSink<Input = B::Input, Output = B::Output>,
    RL: RetryLogic<Request = B::Output, Response = http::Response<Bytes>> + Send + 'static,
{
    // Shared handle to the user-provided sink; a clone is captured by the request builder.
    sink: Arc<T>,
    // Inner partitioning sink that receives the encoded items.
    #[pin]
    inner: TowerPartitionSink<
        HttpBatchService<BoxFuture<'static, crate::Result<hyper::Request<Bytes>>>, B::Output>,
        B,
        RL,
        K,
    >,
    // Encoder built once from `sink` and reused for every event.
    encoder: T::Encoder,
    // Holds one encoded event until the inner sink accepts it.
    slot: Option<EncodedEvent<B::Input>>,
}
280
281impl<T, B, K> PartitionHttpSink<T, B, K, HttpRetryLogic<<B as Batch>::Output>>
282where
283    B: Batch,
284    B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
285    B::Input: Partition<K>,
286    K: Hash + Eq + Clone + Send + 'static,
287    T: HttpSink<Input = B::Input, Output = B::Output>,
288{
289    pub fn new(
290        sink: T,
291        batch: B,
292        request_settings: TowerRequestSettings,
293        batch_timeout: Duration,
294        client: HttpClient,
295    ) -> Self {
296        Self::with_retry_logic(
297            sink,
298            batch,
299            HttpRetryLogic::default(),
300            request_settings,
301            batch_timeout,
302            client,
303        )
304    }
305}
306
307impl<T, B, K, RL> PartitionHttpSink<T, B, K, RL>
308where
309    B: Batch,
310    B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
311    B::Input: Partition<K>,
312    K: Hash + Eq + Clone + Send + 'static,
313    T: HttpSink<Input = B::Input, Output = B::Output>,
314    RL: RetryLogic<Request = B::Output, Response = http::Response<Bytes>, Error = HttpError>
315        + Send
316        + 'static,
317{
318    pub fn with_retry_logic(
319        sink: T,
320        batch: B,
321        retry_logic: RL,
322        request_settings: TowerRequestSettings,
323        batch_timeout: Duration,
324        client: HttpClient,
325    ) -> Self {
326        let sink = Arc::new(sink);
327
328        let sink1 = Arc::clone(&sink);
329        let request_builder = move |b| -> BoxFuture<'static, crate::Result<http::Request<Bytes>>> {
330            let sink = Arc::clone(&sink1);
331            Box::pin(async move { sink.build_request(b).await })
332        };
333
334        let svc = HttpBatchService::new(client, request_builder);
335        let inner = request_settings.partition_sink(retry_logic, svc, batch, batch_timeout);
336        let encoder = sink.build_encoder();
337
338        Self {
339            sink,
340            inner,
341            encoder,
342            slot: None,
343        }
344    }
345
346    /// Enforces per partition ordering of request.
347    pub fn ordered(mut self) -> Self {
348        self.inner.ordered();
349        self
350    }
351}
352
353impl<T, B, K, RL> Sink<Event> for PartitionHttpSink<T, B, K, RL>
354where
355    B: Batch,
356    B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
357    B::Input: Partition<K>,
358    K: Hash + Eq + Clone + Send + 'static,
359    T: HttpSink<Input = B::Input, Output = B::Output>,
360    RL: RetryLogic<Request = B::Output, Response = http::Response<Bytes>> + Send + 'static,
361{
362    type Error = crate::Error;
363
364    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
365        if self.slot.is_some() {
366            match self.as_mut().poll_flush(cx) {
367                Poll::Ready(Ok(())) => {}
368                Poll::Ready(Err(error)) => return Poll::Ready(Err(error)),
369                Poll::Pending => {
370                    if self.slot.is_some() {
371                        return Poll::Pending;
372                    }
373                }
374            }
375        }
376
377        Poll::Ready(Ok(()))
378    }
379
380    fn start_send(mut self: Pin<&mut Self>, mut event: Event) -> Result<(), Self::Error> {
381        let finalizers = event.metadata_mut().take_finalizers();
382        let byte_size = event.size_of();
383        let json_byte_size = event.estimated_json_encoded_size_of();
384
385        if let Some(item) = self.encoder.encode_event(event) {
386            *self.project().slot = Some(EncodedEvent {
387                item,
388                finalizers,
389                byte_size,
390                json_byte_size,
391            });
392        }
393
394        Ok(())
395    }
396
397    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
398        let mut this = self.project();
399        if this.slot.is_some() {
400            ready!(this.inner.as_mut().poll_ready(cx))?;
401            this.inner.as_mut().start_send(this.slot.take().unwrap())?;
402        }
403
404        this.inner.poll_flush(cx)
405    }
406
407    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
408        ready!(self.as_mut().poll_flush(cx))?;
409        self.project().inner.poll_close(cx)
410    }
411}
412
/// Settings passed to `sign_request` when a request is signed before sending.
///
/// Only available with the `aws-core` feature.
#[cfg(feature = "aws-core")]
#[derive(Clone)]
pub struct SigV4Config {
    // Credentials provider handed to `sign_request`.
    pub(crate) shared_credentials_provider: SharedCredentialsProvider,
    // Region handed to `sign_request`.
    pub(crate) region: Region,
    // Service name handed to `sign_request`.
    pub(crate) service: String,
}
420
/// @struct HttpBatchService
///
/// A `tower::Service` that turns a value of type `B` into an HTTP request via
/// `request_builder`, sends it with the wrapped `HttpClient`, and returns the
/// response with its body collected into `Bytes`.
///
/// NOTE: This has been deprecated, please do not use directly when creating new sinks.
///       The `HttpService` currently wraps this structure. Eventually all sinks currently using the
///       HttpBatchService directly should be updated to use `HttpService`. At which time we can
///       remove this struct and inline the functionality into the `HttpService` directly.
pub struct HttpBatchService<F, B = Bytes> {
    // Client used to send the built request.
    inner: HttpClient<Body>,
    // Shared closure producing the request future; cloned cheaply per call.
    request_builder: Arc<dyn Fn(B) -> F + Send + Sync>,
    // When set, requests are signed before being sent.
    #[cfg(feature = "aws-core")]
    sig_v4_config: Option<SigV4Config>,
}
433
434impl<F, B> HttpBatchService<F, B> {
435    pub fn new(
436        inner: HttpClient,
437        request_builder: impl Fn(B) -> F + Send + Sync + 'static,
438    ) -> Self {
439        HttpBatchService {
440            inner,
441            request_builder: Arc::new(Box::new(request_builder)),
442            #[cfg(feature = "aws-core")]
443            sig_v4_config: None,
444        }
445    }
446
447    #[cfg(feature = "aws-core")]
448    pub fn new_with_sig_v4(
449        inner: HttpClient,
450        request_builder: impl Fn(B) -> F + Send + Sync + 'static,
451        sig_v4_config: SigV4Config,
452    ) -> Self {
453        HttpBatchService {
454            inner,
455            request_builder: Arc::new(Box::new(request_builder)),
456            sig_v4_config: Some(sig_v4_config),
457        }
458    }
459}
460
461impl<F, B> Service<B> for HttpBatchService<F, B>
462where
463    F: Future<Output = crate::Result<hyper::Request<Bytes>>> + Send + 'static,
464    B: ByteSizeOf + Send + 'static,
465{
466    type Response = http::Response<Bytes>;
467    type Error = crate::Error;
468    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
469
470    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
471        Poll::Ready(Ok(()))
472    }
473
474    fn call(&mut self, body: B) -> Self::Future {
475        let request_builder = Arc::clone(&self.request_builder);
476        #[cfg(feature = "aws-core")]
477        let sig_v4_config = self.sig_v4_config.clone();
478        let http_client = self.inner.clone();
479
480        Box::pin(async move {
481            let request = request_builder(body).await.inspect_err(|error| {
482                emit!(SinkRequestBuildError { error });
483            })?;
484
485            #[cfg(feature = "aws-core")]
486            let request = match sig_v4_config {
487                None => request,
488                Some(sig_v4_config) => {
489                    let mut signed_request = request;
490                    sign_request(
491                        sig_v4_config.service.as_str(),
492                        &mut signed_request,
493                        &sig_v4_config.shared_credentials_provider,
494                        Some(&sig_v4_config.region),
495                        false,
496                    )
497                    .await?;
498
499                    signed_request
500                }
501            };
502            let byte_size = request.body().len();
503            let request = request.map(Body::from);
504            let (protocol, endpoint) = uri::protocol_endpoint(request.uri().clone());
505
506            let mut decompression_service = ServiceBuilder::new()
507                .layer(DecompressionLayer::new())
508                .service(http_client);
509
510            // Any errors raised in `http_client.call` results in a `GotHttpWarning` event being emitted
511            // in `HttpClient::send`. This does not result in incrementing `component_errors_total` however,
512            // because that is incremented by the driver when retries have been exhausted.
513            let response = decompression_service.call(request).await?;
514
515            if response.status().is_success() {
516                emit!(EndpointBytesSent {
517                    byte_size,
518                    protocol: &protocol,
519                    endpoint: &endpoint
520                });
521            }
522
523            let (parts, body) = response.into_parts();
524            let mut body = body.collect().await?.aggregate();
525            Ok(hyper::Response::from_parts(
526                parts,
527                body.copy_to_bytes(body.remaining()),
528            ))
529        })
530    }
531}
532
533impl<F, B> Clone for HttpBatchService<F, B> {
534    fn clone(&self) -> Self {
535        Self {
536            inner: self.inner.clone(),
537            request_builder: Arc::clone(&self.request_builder),
538            #[cfg(feature = "aws-core")]
539            sig_v4_config: self.sig_v4_config.clone(),
540        }
541    }
542}
543
544impl<T: fmt::Debug> sink::Response for http::Response<T> {
545    fn is_successful(&self) -> bool {
546        self.status().is_success()
547    }
548
549    fn is_transient(&self) -> bool {
550        self.status().is_server_error()
551    }
552}
553
554/// Serializes and deserializes a [`Vec<StatusCode>`]
555mod status_code_vec {
556    use http::StatusCode;
557    use serde::{Deserialize, Deserializer, Serialize, Serializer, de::Error};
558
559    /// Deserializes a [`Vec<StatusCode>`]
560    pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<StatusCode>, D::Error>
561    where
562        D: Deserializer<'de>,
563    {
564        Vec::<u16>::deserialize(deserializer)?
565            .into_iter()
566            .map(StatusCode::from_u16)
567            .collect::<Result<Vec<_>, _>>()
568            .map_err(Error::custom)
569    }
570
571    /// Serializes a [`Vec<StatusCode>`]
572    pub fn serialize<S>(status_codes: &[StatusCode], serializer: S) -> Result<S::Ok, S::Error>
573    where
574        S: Serializer,
575    {
576        status_codes
577            .iter()
578            .map(StatusCode::as_u16)
579            .collect::<Vec<_>>()
580            .serialize(serializer)
581    }
582}
583
// NOTE(review): the `///` comments below presumably feed the generated
// configuration documentation via `configurable_component` — confirm before
// rewording them.
/// Configurable retry strategy for `http` based sinks.
///
/// For more information about error responses, see [Client Error Responses][error_responses].
///
/// [error_responses]: https://developer.mozilla.org/en-US/docs/Web/HTTP/Reference/Status#client_error_responses
#[configurable_component]
#[derive(Debug, Clone, Default, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The retry strategy enum."))]
pub enum RetryStrategy {
    /// Don't retry any errors, including request timeouts.
    None,

    /// Default strategy. See [`RetryStrategy::retry_action`] for more details.
    #[default]
    Default,

    /// Retry on *all* HTTP status codes except for success codes (2xx)
    All,

    /// Custom retry strategy
    Custom {
        /// Retry on these specific HTTP status codes
        #[serde(with = "status_code_vec")]
        status_codes: Vec<StatusCode>,
    },
}
611
612impl RetryStrategy {
613    /// Returns the name of the retry strategy.
614    #[must_use]
615    const fn name(&self) -> &str {
616        match self {
617            Self::None => "Never retry strategy",
618            Self::Default => "Default retry strategy",
619            Self::All => "Retry all strategy",
620            Self::Custom { .. } => "Custom retry strategy",
621        }
622    }
623
624    /// Determines if the given status code should be retried.
625    ///
626    /// For the `Default` strategy, the following status codes will be retried:
627    /// - 429 (Too Many Requests)
628    /// - 408 (Request Timeout)
629    /// - 5xx (Server Error)
630    ///
631    /// For the `Custom` strategy, the status codes specified in the `status_codes` field will be retried.
632    ///
633    /// For the `All` strategy, all non-success status codes will be retried.
634    #[must_use]
635    pub fn retry_action<Req>(&self, status: http::StatusCode) -> RetryAction<Req> {
636        if status.is_success() {
637            return RetryAction::Successful;
638        }
639
640        let reason = format!(
641            "{}: {}",
642            self.name(),
643            status.canonical_reason().unwrap_or_else(|| status.as_str())
644        )
645        .into();
646
647        match self {
648            Self::None => RetryAction::DontRetry(reason),
649            Self::Default => match status {
650                StatusCode::TOO_MANY_REQUESTS | StatusCode::REQUEST_TIMEOUT => {
651                    RetryAction::Retry(reason)
652                }
653                StatusCode::NOT_IMPLEMENTED => RetryAction::DontRetry(reason),
654                _ => {
655                    if status.is_server_error() {
656                        RetryAction::Retry(reason)
657                    } else {
658                        RetryAction::DontRetry(reason)
659                    }
660                }
661            },
662            Self::All => RetryAction::Retry(reason),
663            Self::Custom { status_codes } => {
664                if status_codes.contains(&status) {
665                    RetryAction::Retry(reason)
666                } else {
667                    RetryAction::DontRetry(reason)
668                }
669            }
670        }
671    }
672}
673
/// Retry logic for `hyper::Response<Bytes>` responses, driven by a [`RetryStrategy`].
#[derive(Debug, Clone)]
pub struct HttpRetryLogic<Req> {
    // Ties the logic to its request type without storing a request.
    request: PhantomData<Req>,
    // Strategy consulted for errors, timeouts and response statuses.
    retry_strategy: RetryStrategy,
}
679
680impl<Req> Default for HttpRetryLogic<Req> {
681    fn default() -> Self {
682        Self {
683            request: PhantomData,
684            retry_strategy: RetryStrategy::Default,
685        }
686    }
687}
688
689impl<Req: Clone + Send + Sync + 'static> RetryLogic for HttpRetryLogic<Req> {
690    type Error = HttpError;
691    type Request = Req;
692    type Response = hyper::Response<Bytes>;
693
694    fn is_retriable_error(&self, error: &Self::Error) -> bool {
695        if self.retry_strategy == RetryStrategy::None {
696            false
697        } else {
698            error.is_retriable()
699        }
700    }
701
702    fn is_retriable_timeout(&self) -> bool {
703        self.retry_strategy != RetryStrategy::None
704    }
705
706    fn should_retry_response(&self, response: &Self::Response) -> RetryAction<Self::Request> {
707        let status = response.status();
708        if !status.is_success() {
709            debug!(
710                message = "HTTP response.",
711                %status,
712                body = %String::from_utf8_lossy(response.body()),
713            );
714        }
715        self.retry_strategy.retry_action(status)
716    }
717}
718
/// A more generic version of `HttpRetryLogic` that accepts anything that can be converted
/// to a status code
#[derive(Debug)]
pub struct HttpStatusRetryLogic<F, Req, Res> {
    // Extracts the status code from a response.
    func: F,
    // Markers tying the logic to its request/response types.
    request: PhantomData<Req>,
    response: PhantomData<Res>,
    // Strategy consulted for errors, timeouts and response statuses.
    retry_strategy: RetryStrategy,
}
728
729impl<F, Req, Res> HttpStatusRetryLogic<F, Req, Res>
730where
731    F: Fn(&Res) -> StatusCode + Clone + Send + Sync + 'static,
732    Req: Send + Sync + 'static,
733    Res: Send + Sync + 'static,
734{
735    pub const fn new(func: F, retry_strategy: RetryStrategy) -> HttpStatusRetryLogic<F, Req, Res> {
736        HttpStatusRetryLogic {
737            func,
738            request: PhantomData,
739            response: PhantomData,
740            retry_strategy,
741        }
742    }
743}
744
745impl<F, Req, Res> RetryLogic for HttpStatusRetryLogic<F, Req, Res>
746where
747    F: Fn(&Res) -> StatusCode + Clone + Send + Sync + 'static,
748    Req: Send + Sync + 'static,
749    Res: Send + Sync + 'static,
750{
751    type Error = HttpError;
752    type Request = Req;
753    type Response = Res;
754
755    fn is_retriable_error(&self, error: &Self::Error) -> bool {
756        if self.retry_strategy == RetryStrategy::None {
757            false
758        } else {
759            error.is_retriable()
760        }
761    }
762
763    fn is_retriable_timeout(&self) -> bool {
764        self.retry_strategy != RetryStrategy::None
765    }
766
767    fn should_retry_response(&self, response: &Res) -> RetryAction<Req> {
768        let status = (self.func)(response);
769        self.retry_strategy.retry_action(status)
770    }
771}
772
773impl<F, Req, Res> Clone for HttpStatusRetryLogic<F, Req, Res>
774where
775    F: Clone,
776{
777    fn clone(&self) -> Self {
778        Self {
779            func: self.func.clone(),
780            request: PhantomData,
781            response: PhantomData,
782            retry_strategy: self.retry_strategy.clone(),
783        }
784    }
785}
786
// NOTE(review): the `///` comments below presumably feed the generated
// configuration documentation via `configurable_component` — confirm before
// rewording them.
/// Outbound HTTP request settings.
#[configurable_component]
#[derive(Clone, Debug, Default)]
pub struct RequestConfig {
    #[serde(flatten)]
    pub tower: TowerRequestConfig,

    /// Additional HTTP headers to add to every HTTP request.
    #[serde(default)]
    #[configurable(metadata(
        docs::additional_props_description = "An HTTP request header and its value. Both header names and values support templating with event data."
    ))]
    #[configurable(metadata(docs::examples = "headers_examples()"))]
    pub headers: BTreeMap<String, String>,
}
802
/// Example header map referenced by the `docs::examples` metadata on
/// `RequestConfig::headers`; it mixes fixed values with `{{...}}` template values.
fn headers_examples() -> BTreeMap<String, String> {
    btreemap! {
        "Accept" => "text/plain",
        "X-My-Custom-Header" => "A-Value",
        "X-Event-Level" => "{{level}}",
        "X-Event-Timestamp" => "{{timestamp}}",
    }
}
811
812impl RequestConfig {
813    pub fn split_headers(&self) -> (BTreeMap<String, String>, BTreeMap<String, Template>) {
814        let mut static_headers = BTreeMap::new();
815        let mut template_headers = BTreeMap::new();
816
817        for (name, value) in &self.headers {
818            match Template::try_from(value.as_str()) {
819                Ok(template) if !template.is_dynamic() => {
820                    static_headers.insert(name.clone(), value.clone());
821                }
822                Ok(template) => {
823                    template_headers.insert(name.clone(), template);
824                }
825                Err(_) => {
826                    static_headers.insert(name.clone(), value.clone());
827                }
828            }
829        }
830
831        (static_headers, template_headers)
832    }
833}
834
// Errors produced while validating configured header names and values.
#[derive(Debug, Snafu)]
pub enum HeaderValidationError {
    // The header name could not be parsed; displayed as "<source>: <name>".
    #[snafu(display("{}: {}", source, name))]
    InvalidHeaderName {
        name: String,
        source: header::InvalidHeaderName,
    },
    // The header value could not be parsed; displayed as "<source>: <value>".
    #[snafu(display("{}: {}", source, value))]
    InvalidHeaderValue {
        value: String,
        source: header::InvalidHeaderValue,
    },
}
848
849pub fn validate_headers(
850    headers: &BTreeMap<String, String>,
851) -> crate::Result<BTreeMap<OrderedHeaderName, HeaderValue>> {
852    let mut validated_headers = BTreeMap::new();
853    for (name, value) in headers {
854        let name = HeaderName::from_bytes(name.as_bytes())
855            .with_context(|_| InvalidHeaderNameSnafu { name })?;
856        let value = HeaderValue::from_bytes(value.as_bytes())
857            .with_context(|_| InvalidHeaderValueSnafu { value })?;
858
859        validated_headers.insert(name.into(), value);
860    }
861
862    Ok(validated_headers)
863}
864
/// Request type for use in the `Service` implementation of HTTP stream sinks.
#[derive(Debug, Clone)]
pub struct HttpRequest<T: Send> {
    // Request body bytes; can be moved out with `take_payload`.
    payload: Bytes,
    // Finalizers of the events carried by this request.
    finalizers: EventFinalizers,
    // Metadata exposed through the `MetaDescriptive` impl.
    request_metadata: RequestMetadata,
    // Caller-defined extra data, readable via `get_additional_metadata`.
    additional_metadata: T,
}
873
874impl<T: Send> HttpRequest<T> {
875    /// Creates a new `HttpRequest`.
876    pub const fn new(
877        payload: Bytes,
878        finalizers: EventFinalizers,
879        request_metadata: RequestMetadata,
880        additional_metadata: T,
881    ) -> Self {
882        Self {
883            payload,
884            finalizers,
885            request_metadata,
886            additional_metadata,
887        }
888    }
889
890    pub const fn get_additional_metadata(&self) -> &T {
891        &self.additional_metadata
892    }
893
894    pub fn take_payload(&mut self) -> Bytes {
895        std::mem::take(&mut self.payload)
896    }
897}
898
899impl<T: Send> Finalizable for HttpRequest<T> {
900    fn take_finalizers(&mut self) -> EventFinalizers {
901        self.finalizers.take_finalizers()
902    }
903}
904
905impl<T: Send> MetaDescriptive for HttpRequest<T> {
906    fn get_metadata(&self) -> &RequestMetadata {
907        &self.request_metadata
908    }
909
910    fn metadata_mut(&mut self) -> &mut RequestMetadata {
911        &mut self.request_metadata
912    }
913}
914
915impl<T: Send> ByteSizeOf for HttpRequest<T> {
916    fn allocated_bytes(&self) -> usize {
917        self.payload.allocated_bytes() + self.finalizers.allocated_bytes()
918    }
919}
920
/// Response type for use in the `Service` implementation of HTTP stream sinks.
pub struct HttpResponse {
    // The HTTP response with its body as `Bytes`.
    pub http_response: Response<Bytes>,
    // Returned by `DriverResponse::events_sent`.
    pub events_byte_size: GroupedCountByteSize,
    // Returned (wrapped in `Some`) by `DriverResponse::bytes_sent`.
    pub raw_byte_size: usize,
}
927
928impl DriverResponse for HttpResponse {
929    fn event_status(&self) -> EventStatus {
930        if self.http_response.is_successful() {
931            EventStatus::Delivered
932        } else if self.http_response.is_transient() {
933            EventStatus::Errored
934        } else {
935            EventStatus::Rejected
936        }
937    }
938
939    fn events_sent(&self) -> &GroupedCountByteSize {
940        &self.events_byte_size
941    }
942
943    fn bytes_sent(&self) -> Option<usize> {
944        Some(self.raw_byte_size)
945    }
946}
947
948/// Creates a `RetryLogic` for use with `HttpResponse`.
949pub fn http_response_retry_logic<Request: Clone + Send + Sync + 'static>(
950    retry_strategy: RetryStrategy,
951) -> HttpStatusRetryLogic<
952    impl Fn(&HttpResponse) -> StatusCode + Clone + Send + Sync + 'static,
953    Request,
954    HttpResponse,
955> {
956    HttpStatusRetryLogic::new(
957        |req: &HttpResponse| req.http_response.status(),
958        retry_strategy,
959    )
960}
961
/// Uses the estimated JSON-encoded size to determine batch sizing.
///
/// Its `ItemBatchSize<Event>` implementation reports each event's estimated
/// JSON-encoded size as that event's contribution to a batch.
#[derive(Default)]
pub struct HttpJsonBatchSizer;
965
966impl ItemBatchSize<Event> for HttpJsonBatchSizer {
967    fn size(&self, item: &Event) -> usize {
968        item.estimated_json_encoded_size_of().get()
969    }
970}
971
/// HTTP request builder for HTTP stream sinks using the generic `HttpService`.
///
/// `T` is the type of the additional metadata carried by each `HttpRequest<T>`.
pub trait HttpServiceRequestBuilder<T: Send> {
    /// Consumes `request` and turns it into an `http::Request` with a `Bytes` body.
    ///
    /// # Errors
    ///
    /// Returns a `crate::Error` when the request cannot be built.
    fn build(&self, request: HttpRequest<T>) -> Result<Request<Bytes>, crate::Error>;
}
976
/// Generic 'Service' implementation for HTTP stream sinks.
///
/// `B` is the request builder type and `T` is the additional metadata type of the
/// `HttpRequest<T>` values this service accepts.
// NOTE(review): `#[derive(Clone)]` adds `B: Clone` and `T: Clone` bounds to the generated
// impl, even though `B` only appears inside `PhantomData` here.
#[derive(Clone)]
pub struct HttpService<B, T: Send> {
    // Inner batch service; its request-builder future type is a boxed future yielding the
    // built `http::Request`.
    batch_service:
        HttpBatchService<BoxFuture<'static, Result<Request<Bytes>, crate::Error>>, HttpRequest<T>>,
    // Records the builder type `B` without storing a value of it in this struct.
    _phantom: PhantomData<B>,
}
984
985impl<B, T: Send + 'static> HttpService<B, T>
986where
987    B: HttpServiceRequestBuilder<T> + std::marker::Sync + std::marker::Send + 'static,
988{
989    pub fn new(http_client: HttpClient<Body>, http_request_builder: B) -> Self {
990        let http_request_builder = Arc::new(http_request_builder);
991
992        let batch_service = HttpBatchService::new(http_client, move |req: HttpRequest<T>| {
993            let request_builder = Arc::clone(&http_request_builder);
994
995            let fut: BoxFuture<'static, Result<http::Request<Bytes>, crate::Error>> =
996                Box::pin(async move { request_builder.build(req) });
997
998            fut
999        });
1000        Self {
1001            batch_service,
1002            _phantom: PhantomData,
1003        }
1004    }
1005
1006    #[cfg(feature = "aws-core")]
1007    pub fn new_with_sig_v4(
1008        http_client: HttpClient<Body>,
1009        http_request_builder: B,
1010        sig_v4_config: SigV4Config,
1011    ) -> Self {
1012        let http_request_builder = Arc::new(http_request_builder);
1013
1014        let batch_service = HttpBatchService::new_with_sig_v4(
1015            http_client,
1016            move |req: HttpRequest<T>| {
1017                let request_builder = Arc::clone(&http_request_builder);
1018
1019                let fut: BoxFuture<'static, Result<http::Request<Bytes>, crate::Error>> =
1020                    Box::pin(async move { request_builder.build(req) });
1021
1022                fut
1023            },
1024            sig_v4_config,
1025        );
1026        Self {
1027            batch_service,
1028            _phantom: PhantomData,
1029        }
1030    }
1031}
1032
1033impl<B, T: Send + 'static> Service<HttpRequest<T>> for HttpService<B, T>
1034where
1035    B: HttpServiceRequestBuilder<T> + std::marker::Sync + std::marker::Send + 'static,
1036{
1037    type Response = HttpResponse;
1038    type Error = crate::Error;
1039    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
1040
1041    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1042        Poll::Ready(Ok(()))
1043    }
1044
1045    fn call(&mut self, mut request: HttpRequest<T>) -> Self::Future {
1046        let mut http_service = self.batch_service.clone();
1047
1048        // NOTE: By taking the metadata here, when passing the request to `call()` below,
1049        //       that function does not have access to the metadata anymore.
1050        let metadata = std::mem::take(request.metadata_mut());
1051        let raw_byte_size = metadata.request_encoded_size();
1052        let events_byte_size = metadata.into_events_estimated_json_encoded_byte_size();
1053
1054        Box::pin(async move {
1055            let http_response = http_service.call(request).await?;
1056
1057            Ok(HttpResponse {
1058                http_response,
1059                events_byte_size,
1060                raw_byte_size,
1061            })
1062        })
1063    }
1064}
1065
// Unit tests for the retry logic and retry strategies, plus one end-to-end check that
// `HttpBatchService` delivers a request body to a local HTTP server.
#[cfg(test)]
mod test {
    #![allow(clippy::print_stderr)] // tests: server errors are reported with `eprintln!`

    use futures::{StreamExt, future::ready};
    use hyper::{
        Response, Server, Uri,
        service::{make_service_fn, service_fn},
    };

    use super::*;
    use crate::{config::ProxyConfig, test_util::addr::next_addr};

    // The default `HttpRetryLogic` retries 408, 429 and 500 responses, but not 400 or 501.
    #[test]
    fn util_http_retry_logic() {
        let logic = HttpRetryLogic::<()>::default();

        let response_408 = Response::builder().status(408).body(Bytes::new()).unwrap();
        let response_429 = Response::builder().status(429).body(Bytes::new()).unwrap();
        let response_500 = Response::builder().status(500).body(Bytes::new()).unwrap();
        let response_400 = Response::builder().status(400).body(Bytes::new()).unwrap();
        let response_501 = Response::builder().status(501).body(Bytes::new()).unwrap();
        assert!(logic.should_retry_response(&response_429).is_retryable());
        assert!(logic.should_retry_response(&response_500).is_retryable());
        assert!(logic.should_retry_response(&response_408).is_retryable());
        assert!(
            logic
                .should_retry_response(&response_400)
                .is_not_retryable()
        );
        assert!(
            logic
                .should_retry_response(&response_501)
                .is_not_retryable()
        );
    }

    // `RetryStrategy::None` keeps 200 successful and marks a 500 as not retryable.
    #[test]
    fn retry_strategy_none_preserves_success_and_rejects_failures() {
        let strategy = RetryStrategy::None;

        assert!(strategy.retry_action::<()>(StatusCode::OK).is_successful());
        assert!(
            strategy
                .retry_action::<()>(StatusCode::INTERNAL_SERVER_ERROR)
                .is_not_retryable()
        );
    }

    // With `RetryStrategy::None`, neither retry logic type treats timeouts as retriable.
    #[test]
    fn retry_strategy_none_disables_timeout_retries() {
        let logic = HttpRetryLogic::<()> {
            request: PhantomData,
            retry_strategy: RetryStrategy::None,
        };
        let status_logic =
            HttpStatusRetryLogic::<_, (), ()>::new(|_: &()| StatusCode::OK, RetryStrategy::None);

        assert!(!logic.is_retriable_timeout());
        assert!(!status_logic.is_retriable_timeout());
    }

    // `RetryStrategy::All` keeps 200 successful and retries both 400 and 500.
    #[test]
    fn retry_strategy_all_preserves_success_and_retries_failures() {
        let strategy = RetryStrategy::All;

        assert!(strategy.retry_action::<()>(StatusCode::OK).is_successful());
        assert!(
            strategy
                .retry_action::<()>(StatusCode::BAD_REQUEST)
                .is_retryable()
        );
        assert!(
            strategy
                .retry_action::<()>(StatusCode::INTERNAL_SERVER_ERROR)
                .is_retryable()
        );
    }

    // `RetryStrategy::Custom` retries only the listed status (400 here); 500 is not retried.
    #[test]
    fn retry_strategy_custom_only_retries_configured_statuses() {
        let strategy = RetryStrategy::Custom {
            status_codes: vec![StatusCode::BAD_REQUEST],
        };

        assert!(strategy.retry_action::<()>(StatusCode::OK).is_successful());
        assert!(
            strategy
                .retry_action::<()>(StatusCode::BAD_REQUEST)
                .is_retryable()
        );
        assert!(
            strategy
                .retry_action::<()>(StatusCode::INTERNAL_SERVER_ERROR)
                .is_not_retryable()
        );
    }

    // A custom strategy deserializes from numeric status codes and survives a
    // serialize/deserialize round trip unchanged.
    #[test]
    fn retry_strategy_custom_serde_roundtrips_status_codes() {
        let json = r#"{"type":"custom","status_codes":[400,503]}"#;
        let strategy: RetryStrategy = serde_json::from_str(json).unwrap();
        assert_eq!(
            strategy,
            RetryStrategy::Custom {
                status_codes: vec![StatusCode::BAD_REQUEST, StatusCode::SERVICE_UNAVAILABLE],
            }
        );
        let encoded = serde_json::to_string(&strategy).unwrap();
        let roundtrip: RetryStrategy = serde_json::from_str(&encoded).unwrap();
        assert_eq!(roundtrip, strategy);
    }

    // Deserialization must fail when a configured status code is not a valid HTTP status.
    #[test]
    fn retry_strategy_custom_serde_rejects_invalid_status_codes() {
        // `http::StatusCode::from_u16` only accepts 100–999; 1000 is out of range.
        let json = r#"{"type":"custom","status_codes":[1000]}"#;
        let result = serde_json::from_str::<RetryStrategy>(json);
        assert!(
            result.is_err(),
            "expected invalid status code to fail deserialization"
        );
    }

    // End-to-end check: a body sent through `HttpBatchService` reaches a local hyper server.
    #[tokio::test]
    async fn util_http_it_makes_http_requests() {
        let (_guard, addr) = next_addr();

        let uri = format!("http://{}:{}/", addr.ip(), addr.port())
            .parse::<Uri>()
            .unwrap();

        let request = Bytes::from("hello");
        let proxy = ProxyConfig::default();
        let client = HttpClient::new(None, &proxy).unwrap();
        // The request builder POSTs the given body to the test server's URI.
        let mut service = HttpBatchService::new(client, move |body: Bytes| {
            Box::pin(ready(
                http::Request::post(&uri).body(body).map_err(Into::into),
            ))
        });

        // The server forwards every request body it receives over this channel.
        let (tx, rx) = futures::channel::mpsc::channel(10);

        let new_service = make_service_fn(move |_| {
            let tx = tx.clone();

            let svc = service_fn(move |req: http::Request<Body>| {
                let mut tx = tx.clone();

                async move {
                    // Collect the whole request body and decode it as UTF-8.
                    let mut body = http_body::Body::collect(req.into_body())
                        .await
                        .map_err(|error| format!("error: {error}"))?
                        .aggregate();
                    let string = String::from_utf8(body.copy_to_bytes(body.remaining()).to_vec())
                        .map_err(|_| "Wasn't UTF-8".to_string())?;
                    tx.try_send(string).map_err(|_| "Send error".to_string())?;

                    Ok::<_, crate::Error>(Response::new(Body::from("")))
                }
            });

            async move { Ok::<_, std::convert::Infallible>(svc) }
        });

        tokio::spawn(async move {
            if let Err(error) = Server::bind(&addr).serve(new_service).await {
                eprintln!("Server error: {error}");
            }
        });

        // Presumably gives the spawned server time to start listening before the request
        // is sent.
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        service.call(request).await.unwrap();

        // The first message on the channel must be the body that was sent.
        let (body, _rest) = StreamExt::into_future(rx).await;
        assert_eq!(body.unwrap(), "hello");
    }
}