vector/internal_events/
http_client.rs

1use std::time::Duration;
2
3use http::{
4    Request, Response,
5    header::{self, HeaderMap, HeaderValue},
6};
7use hyper::{Error, body::HttpBody};
8use vector_lib::{
9    NamedInternalEvent, counter, histogram,
10    internal_event::{CounterName, HistogramName, InternalEvent, error_stage, error_type},
11};
12
/// Internal event describing an outgoing HTTP request.
///
/// Emitting it writes a debug-level log line with the request's URI, method,
/// version, headers and a body-size summary, and increments the
/// sent-requests counter labeled with the request method.
#[derive(Debug, NamedInternalEvent)]
pub struct AboutToSendHttpRequest<'a, T> {
    /// The request being reported. It is only borrowed; the event never
    /// takes ownership of the request or its body.
    pub request: &'a Request<T>,
}
17
18fn remove_sensitive(headers: &HeaderMap<HeaderValue>) -> HeaderMap<HeaderValue> {
19    let mut headers = headers.clone();
20    for name in &[
21        header::AUTHORIZATION,
22        header::PROXY_AUTHORIZATION,
23        header::COOKIE,
24        header::SET_COOKIE,
25    ] {
26        if let Some(value) = headers.get_mut(name) {
27            value.set_sensitive(true);
28        }
29    }
30    headers
31}
32
33impl<T: HttpBody> InternalEvent for AboutToSendHttpRequest<'_, T> {
34    fn emit(self) {
35        debug!(
36            message = "Sending HTTP request.",
37            uri = %self.request.uri(),
38            method = %self.request.method(),
39            version = ?self.request.version(),
40            headers = ?remove_sensitive(self.request.headers()),
41            body = %FormatBody(self.request.body()),
42        );
43        counter!(CounterName::HttpClientRequestsSentTotal, "method" => self.request.method().to_string())
44            .increment(1);
45    }
46}
47
/// Internal event describing a received HTTP response.
///
/// Emitting it writes a debug-level log line with the response's status,
/// version, headers and a body-size summary, increments the responses counter
/// labeled with the numeric status code, and records `roundtrip` in the
/// round-trip-time histograms.
#[derive(Debug, NamedInternalEvent)]
pub struct GotHttpResponse<'a, T> {
    /// The response being reported; only borrowed.
    pub response: &'a Response<T>,
    /// Duration recorded into the round-trip-time histograms.
    /// NOTE(review): presumably the time between sending the request and
    /// receiving this response, measured by the caller — confirm at call site.
    pub roundtrip: Duration,
}
53
54impl<T: HttpBody> InternalEvent for GotHttpResponse<'_, T> {
55    fn emit(self) {
56        debug!(
57            message = "HTTP response.",
58            status = %self.response.status(),
59            version = ?self.response.version(),
60            headers = ?remove_sensitive(self.response.headers()),
61            body = %FormatBody(self.response.body()),
62        );
63        counter!(
64            CounterName::HttpClientResponsesTotal,
65            "status" => self.response.status().as_u16().to_string(),
66        )
67        .increment(1);
68        histogram!(HistogramName::HttpClientRttSeconds).record(self.roundtrip);
69        histogram!(
70            HistogramName::HttpClientResponseRttSeconds,
71            "status" => self.response.status().as_u16().to_string(),
72        )
73        .record(self.roundtrip);
74    }
75}
76
/// Internal event describing a failed HTTP exchange.
///
/// Emitting it writes a warn-level log line containing the error, increments
/// the client errors counter, and records `roundtrip` in the round-trip-time
/// histograms. The labeled metrics use the error's display text as the
/// `error_kind` label.
#[derive(Debug, NamedInternalEvent)]
pub struct GotHttpWarning<'a> {
    /// The `hyper` error being reported; only borrowed.
    pub error: &'a Error,
    /// Duration recorded into the round-trip-time histograms.
    /// NOTE(review): presumably the time elapsed until the failure was
    /// observed, measured by the caller — confirm at call site.
    pub roundtrip: Duration,
}
82
83impl InternalEvent for GotHttpWarning<'_> {
84    fn emit(self) {
85        warn!(
86            message = "HTTP error.",
87            error = %self.error,
88            error_type = error_type::REQUEST_FAILED,
89            stage = error_stage::PROCESSING,
90        );
91        counter!(CounterName::HttpClientErrorsTotal, "error_kind" => self.error.to_string())
92            .increment(1);
93        histogram!(HistogramName::HttpClientRttSeconds).record(self.roundtrip);
94        histogram!(HistogramName::HttpClientErrorRttSeconds, "error_kind" => self.error.to_string())
95            .record(self.roundtrip);
96    }
97}
98
/// Newtype wrapper around a borrowed request or response body that exists
/// solely to give it a `Display` implementation for logging.
///
/// The `Display` output is a bracketed summary derived from the body's size
/// hint (for example `[empty]` or `[42 bytes]`); the body's contents are never
/// read or printed.
struct FormatBody<'a, B>(&'a B);
101
102impl<B: HttpBody> std::fmt::Display for FormatBody<'_, B> {
103    fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
104        let size = self.0.size_hint();
105        match (size.lower(), size.upper()) {
106            (0, None) => write!(fmt, "[unknown]"),
107            (lower, None) => write!(fmt, "[>={lower} bytes]"),
108
109            (0, Some(0)) => write!(fmt, "[empty]"),
110            (0, Some(upper)) => write!(fmt, "[<={upper} bytes]"),
111
112            (lower, Some(upper)) if lower == upper => write!(fmt, "[{lower} bytes]"),
113            (lower, Some(upper)) => write!(fmt, "[{lower}..={upper} bytes]"),
114        }
115    }
116}