vector/sinks/gcs_common/
config.rs1use std::marker::PhantomData;
2
3use futures::FutureExt;
4use http::{StatusCode, Uri};
5use hyper::Body;
6use snafu::Snafu;
7use vector_lib::configurable::configurable_component;
8
9use crate::{
10 gcp::{GcpAuthenticator, GcpError},
11 http::HttpClient,
12 sinks::{
13 Healthcheck, HealthcheckError,
14 gcs_common::service::GcsResponse,
15 util::retries::{RetryAction, RetryLogic},
16 },
17};
18
19pub fn default_endpoint() -> String {
20 "https://storage.googleapis.com".to_string()
21}
22
23#[configurable_component]
29#[derive(Clone, Copy, Debug, Default)]
30#[serde(rename_all = "kebab-case")]
31pub enum GcsPredefinedAcl {
32 AuthenticatedRead,
37
38 BucketOwnerFullControl,
45
46 BucketOwnerRead,
54
55 Private,
60
61 #[default]
68 ProjectPrivate,
69
70 PublicRead,
75}
76
77#[configurable_component]
83#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
84#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
85pub enum GcsStorageClass {
86 #[default]
90 Standard,
91
92 Nearline,
94
95 Coldline,
97
98 Archive,
100}
101
102#[derive(Debug, Snafu)]
103pub enum GcsError {
104 #[snafu(display("Bucket {:?} not found", bucket))]
105 BucketNotFound { bucket: String },
106}
107
108pub fn build_healthcheck(
109 bucket: String,
110 client: HttpClient,
111 base_url: String,
112 auth: GcpAuthenticator,
113) -> crate::Result<Healthcheck> {
114 let healthcheck = async move {
115 let uri = base_url.parse::<Uri>()?;
116 let mut request = http::Request::head(uri).body(Body::empty())?;
117
118 auth.apply(&mut request);
119
120 let not_found_error = GcsError::BucketNotFound { bucket }.into();
121
122 let response = client.send(request).await?;
123 healthcheck_response(response, not_found_error)
124 };
125
126 Ok(healthcheck.boxed())
127}
128
129pub fn healthcheck_response(
130 response: http::Response<hyper::Body>,
131 not_found_error: crate::Error,
132) -> crate::Result<()> {
133 match response.status() {
134 StatusCode::OK => Ok(()),
135 StatusCode::FORBIDDEN => Err(GcpError::HealthcheckForbidden.into()),
136 StatusCode::NOT_FOUND => Err(not_found_error),
137 status => Err(HealthcheckError::UnexpectedStatus { status }.into()),
138 }
139}
140
141pub struct GcsRetryLogic<Request> {
142 request: PhantomData<Request>,
143}
144
145impl<Request> Default for GcsRetryLogic<Request> {
146 fn default() -> Self {
147 Self {
148 request: PhantomData,
149 }
150 }
151}
152
153impl<Request> Clone for GcsRetryLogic<Request> {
154 fn clone(&self) -> Self {
155 Self {
156 request: PhantomData,
157 }
158 }
159}
160
161impl<Request: Clone + Send + Sync + 'static> RetryLogic for GcsRetryLogic<Request> {
163 type Error = hyper::Error;
164 type Request = Request;
165 type Response = GcsResponse;
166
167 fn is_retriable_error(&self, _error: &Self::Error) -> bool {
168 true
169 }
170
171 fn should_retry_response(&self, response: &Self::Response) -> RetryAction<Self::Request> {
172 let status = response.inner.status();
173
174 match status {
175 StatusCode::UNAUTHORIZED => RetryAction::Retry("unauthorized".into()),
176 StatusCode::REQUEST_TIMEOUT => RetryAction::Retry("request timeout".into()),
177 StatusCode::TOO_MANY_REQUESTS => RetryAction::Retry("too many requests".into()),
178 StatusCode::NOT_IMPLEMENTED => {
179 RetryAction::DontRetry("endpoint not implemented".into())
180 }
181 _ if status.is_server_error() => RetryAction::Retry(status.to_string().into()),
182 _ if status.is_success() => RetryAction::Successful,
183 _ => RetryAction::DontRetry(format!("response status: {status}").into()),
184 }
185 }
186}