vector/sinks/
opendal_common.rs

//! `opendal_common` provides the sink implementation shared by all
//! OpenDAL-based services.
//!
//! # TODO
//!
//! The OpenDAL service currently supports only very basic sink features. To
//! make it useful, we need to add the following features:
//!
//! - Error handling
//! - Limitation
10
11use std::{fmt, task::Poll};
12
13use bytes::Bytes;
14use opendal::Operator;
15use snafu::Snafu;
16use tracing::Instrument;
17use vector_lib::codecs::encoding::Framer;
18
19use crate::sinks::{prelude::*, util::partitioner::KeyPartitioner};
20
21/// OpenDalSink provides generic a service upon OpenDAL.
22///
23/// # Notes
24///
25/// OpenDAL based service only need to provide a `<Service>Config`, and
26/// implement `build_processor` like `WebHdfs` does.
27pub struct OpenDalSink<Svc> {
28    service: Svc,
29    request_builder: OpenDalRequestBuilder,
30    partitioner: KeyPartitioner,
31    batcher_settings: BatcherSettings,
32}
33
34impl<Svc> OpenDalSink<Svc> {
35    /// Build a new OpenDalSink via given input
36    pub const fn new(
37        service: Svc,
38        request_builder: OpenDalRequestBuilder,
39        partitioner: KeyPartitioner,
40        batcher_settings: BatcherSettings,
41    ) -> Self {
42        Self {
43            service,
44            request_builder,
45            partitioner,
46            batcher_settings,
47        }
48    }
49}
50
51#[async_trait::async_trait]
52impl<Svc> StreamSink<Event> for OpenDalSink<Svc>
53where
54    Svc: Service<OpenDalRequest> + Send + 'static,
55    Svc::Future: Send + 'static,
56    Svc::Response: DriverResponse + Send + 'static,
57    Svc::Error: fmt::Debug + Into<crate::Error> + Send,
58{
59    async fn run(
60        self: Box<Self>,
61        input: futures_util::stream::BoxStream<'_, Event>,
62    ) -> Result<(), ()> {
63        self.run_inner(input).await
64    }
65}
66
67impl<Svc> OpenDalSink<Svc>
68where
69    Svc: Service<OpenDalRequest> + Send + 'static,
70    Svc::Future: Send + 'static,
71    Svc::Response: DriverResponse + Send + 'static,
72    Svc::Error: fmt::Debug + Into<crate::Error> + Send,
73{
74    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
75        let partitioner = self.partitioner;
76        let settings = self.batcher_settings;
77
78        let request_builder = self.request_builder;
79
80        input
81            .batched_partitioned(partitioner, settings.timeout, |_| {
82                settings.as_byte_size_config()
83            })
84            .filter_map(|(key, batch)| async move {
85                // We don't need to emit an error here if the event is dropped since this will occur if the template
86                // couldn't be rendered during the partitioning. A `TemplateRenderingError` is already emitted when
87                // that occurs.
88                key.map(move |k| (k, batch))
89            })
90            .request_builder(default_request_builder_concurrency_limit(), request_builder)
91            .filter_map(|request| async move {
92                match request {
93                    Err(error) => {
94                        emit!(SinkRequestBuildError { error });
95                        None
96                    }
97                    Ok(req) => Some(req),
98                }
99            })
100            .into_driver(self.service)
101            // TODO: set protocol with services scheme instead hardcoded file
102            .protocol("file")
103            .run()
104            .await
105    }
106}
107
108/// OpenDalService is just a simple wrapper of `opendal::Operator` to
109/// implement traits we needed.
110#[derive(Debug, Clone)]
111pub struct OpenDalService {
112    op: Operator,
113}
114
115impl OpenDalService {
116    pub const fn new(op: Operator) -> OpenDalService {
117        OpenDalService { op }
118    }
119}
120
121/// OpenDalRequest is request will be handled by opendal services.
122///
123/// It will carry all information that opendal needed, like payload and
124/// metadata.
125#[derive(Clone)]
126pub struct OpenDalRequest {
127    pub payload: Bytes,
128    pub metadata: OpenDalMetadata,
129    pub request_metadata: RequestMetadata,
130}
131
132impl MetaDescriptive for OpenDalRequest {
133    fn get_metadata(&self) -> &RequestMetadata {
134        &self.request_metadata
135    }
136
137    fn metadata_mut(&mut self) -> &mut RequestMetadata {
138        &mut self.request_metadata
139    }
140}
141
142impl Finalizable for OpenDalRequest {
143    fn take_finalizers(&mut self) -> EventFinalizers {
144        std::mem::take(&mut self.metadata.finalizers)
145    }
146}
147
148/// OpenDalMetadata carries metadata that opendal service needed to write.
149#[derive(Clone)]
150pub struct OpenDalMetadata {
151    pub partition_key: String,
152    pub count: usize,
153    pub byte_size: JsonSize,
154    pub finalizers: EventFinalizers,
155}
156
157/// OpenDalRequestBuilder will collect and encode input events to build a
158/// valid [`OpenDalRequest`].
159pub struct OpenDalRequestBuilder {
160    pub encoder: (Transformer, Encoder<Framer>),
161    pub compression: Compression,
162}
163
164impl RequestBuilder<(String, Vec<Event>)> for OpenDalRequestBuilder {
165    type Metadata = OpenDalMetadata;
166    type Events = Vec<Event>;
167    type Encoder = (Transformer, Encoder<Framer>);
168    type Payload = Bytes;
169    type Request = OpenDalRequest;
170    type Error = std::io::Error;
171
172    fn compression(&self) -> Compression {
173        self.compression
174    }
175
176    fn encoder(&self) -> &Self::Encoder {
177        &self.encoder
178    }
179
180    fn split_input(
181        &self,
182        input: (String, Vec<Event>),
183    ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
184        let (partition_key, mut events) = input;
185        let finalizers = events.take_finalizers();
186        let opendal_metadata = OpenDalMetadata {
187            partition_key,
188            count: events.len(),
189            byte_size: events.estimated_json_encoded_size_of(),
190            finalizers,
191        };
192
193        let builder = RequestMetadataBuilder::from_events(&events);
194
195        (opendal_metadata, builder, events)
196    }
197
198    fn build_request(
199        &self,
200        mut metadata: Self::Metadata,
201        request_metadata: RequestMetadata,
202        payload: EncodeResult<Self::Payload>,
203    ) -> Self::Request {
204        // TODO: we can support time format later.
205        let name = uuid::Uuid::new_v4().to_string();
206        let extension = self.compression.extension();
207
208        metadata.partition_key = format!("{}{}.{}", metadata.partition_key, name, extension);
209
210        OpenDalRequest {
211            metadata,
212            payload: payload.into_payload(),
213            request_metadata,
214        }
215    }
216}
217
218/// OpenDalResponse is the response returned by OpenDAL services.
219#[derive(Debug)]
220pub struct OpenDalResponse {
221    pub events_byte_size: GroupedCountByteSize,
222    pub byte_size: usize,
223}
224
225impl DriverResponse for OpenDalResponse {
226    fn event_status(&self) -> EventStatus {
227        EventStatus::Delivered
228    }
229
230    fn events_sent(&self) -> &GroupedCountByteSize {
231        &self.events_byte_size
232    }
233
234    fn bytes_sent(&self) -> Option<usize> {
235        Some(self.byte_size)
236    }
237}
238
239impl Service<OpenDalRequest> for OpenDalService {
240    type Response = OpenDalResponse;
241    type Error = opendal::Error;
242    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
243
244    // Emission of an internal event in case of errors is handled upstream by the caller.
245    fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
246        Poll::Ready(Ok(()))
247    }
248
249    // Emission of internal events for errors and dropped events is handled upstream by the caller.
250    fn call(&mut self, request: OpenDalRequest) -> Self::Future {
251        let byte_size = request.payload.len();
252        let op = self.op.clone();
253
254        Box::pin(async move {
255            let result = op
256                .write(&request.metadata.partition_key, request.payload)
257                .in_current_span()
258                .await;
259            result.map(|_| OpenDalResponse {
260                events_byte_size: request
261                    .request_metadata
262                    .into_events_estimated_json_encoded_byte_size(),
263                byte_size,
264            })
265        })
266    }
267}
268
269/// OpenDalError is the error returned by opendal services.
270///
271/// # TODO
272///
273/// We need to provide more context about opendal errors.
274#[derive(Debug, Snafu)]
275pub enum OpenDalError {
276    #[snafu(display("Failed to call OpenDal: {}", source))]
277    OpenDal { source: opendal::Error },
278}
279
280impl From<opendal::Error> for OpenDalError {
281    fn from(source: opendal::Error) -> Self {
282        Self::OpenDal { source }
283    }
284}