vector/sinks/databricks_zerobus/
config.rs1use vector_lib::configurable::configurable_component;
4use vector_lib::sensitive_string::SensitiveString;
5
6use crate::config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext};
7use crate::sinks::{
8 prelude::*,
9 util::{BatchConfig, RealtimeSizeBasedDefaultBatchSettings},
10};
11
12use vector_lib::codecs::encoding::{
13 BatchEncoder, BatchSerializerConfig, ProtoBatchSerializerConfig,
14};
15
16use super::{
17 error::ZerobusSinkError,
18 service::{StreamMode, ZerobusService},
19 sink::ZerobusSink,
20};
21
22#[configurable_component]
24#[derive(Clone, Debug)]
25#[serde(tag = "strategy", rename_all = "snake_case")]
26#[configurable(metadata(
27 docs::enum_tag_description = "The authentication strategy to use for Databricks."
28))]
29pub enum DatabricksAuthentication {
30 #[serde(rename = "oauth")]
32 OAuth {
33 #[configurable(metadata(docs::examples = "${DATABRICKS_CLIENT_ID}"))]
35 #[configurable(metadata(docs::examples = "abc123..."))]
36 client_id: SensitiveString,
37
38 #[configurable(metadata(docs::examples = "${DATABRICKS_CLIENT_SECRET}"))]
40 #[configurable(metadata(docs::examples = "secret123..."))]
41 client_secret: SensitiveString,
42 },
43}
44
45impl DatabricksAuthentication {
46 pub fn credentials(&self) -> (&str, &str) {
48 match self {
49 DatabricksAuthentication::OAuth {
50 client_id,
51 client_secret,
52 } => (client_id.inner(), client_secret.inner()),
53 }
54 }
55}
56
57#[configurable_component]
62#[derive(Clone, Debug)]
63#[serde(deny_unknown_fields)]
64pub struct ZerobusStreamOptions {
65 #[serde(default = "default_flush_timeout_ms")]
67 #[configurable(metadata(docs::examples = 30000))]
68 pub flush_timeout_ms: u64,
69
70 #[serde(default = "default_server_ack_timeout_ms")]
72 #[configurable(metadata(docs::examples = 60000))]
73 pub server_lack_of_ack_timeout_ms: u64,
74}
75
76impl Default for ZerobusStreamOptions {
77 fn default() -> Self {
78 Self {
79 flush_timeout_ms: default_flush_timeout_ms(),
80 server_lack_of_ack_timeout_ms: default_server_ack_timeout_ms(),
81 }
82 }
83}
84
85#[configurable_component(sink(
87 "databricks_zerobus",
88 "Stream observability data to Databricks Unity Catalog via Zerobus."
89))]
90#[derive(Clone, Debug)]
91#[serde(deny_unknown_fields)]
92pub struct ZerobusSinkConfig {
93 #[configurable(metadata(docs::examples = "https://ingest.dev.databricks.com"))]
97 #[configurable(metadata(docs::examples = "https://ingest.prod.databricks.com"))]
98 pub ingestion_endpoint: String,
99
100 #[configurable(metadata(docs::examples = "logging_platform.my_team.logs"))]
104 #[configurable(metadata(docs::examples = "main.default.vector_logs"))]
105 pub table_name: String,
106
107 #[configurable(metadata(
111 docs::examples = "https://dbc-e2f0eb31-2b0e.staging.cloud.databricks.com"
112 ))]
113 #[configurable(metadata(docs::examples = "https://your-workspace.cloud.databricks.com"))]
114 pub unity_catalog_endpoint: String,
115
116 #[configurable(derived)]
118 pub auth: DatabricksAuthentication,
119
120 #[configurable(derived)]
121 #[serde(default)]
122 pub stream_options: ZerobusStreamOptions,
123
124 #[configurable(derived)]
125 #[serde(default)]
126 pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
127
128 #[configurable(derived)]
129 #[serde(default)]
130 pub request: TowerRequestConfig,
131
132 #[configurable(derived)]
133 #[serde(
134 default,
135 deserialize_with = "crate::serde::bool_or_struct",
136 skip_serializing_if = "crate::serde::is_default"
137 )]
138 pub acknowledgements: AcknowledgementsConfig,
139}
140
141impl GenerateConfig for ZerobusSinkConfig {
142 fn generate_config() -> toml::Value {
143 toml::Value::try_from(Self {
144 ingestion_endpoint: "https://ingest.dev.databricks.com".to_string(),
145 table_name: "catalog.schema.table".to_string(),
146 unity_catalog_endpoint: "https://your-workspace.cloud.databricks.com".to_string(),
147 auth: DatabricksAuthentication::OAuth {
148 client_id: SensitiveString::from("${DATABRICKS_CLIENT_ID}".to_string()),
149 client_secret: SensitiveString::from("${DATABRICKS_CLIENT_SECRET}".to_string()),
150 },
151 stream_options: ZerobusStreamOptions::default(),
152 batch: BatchConfig::default(),
153 request: TowerRequestConfig::default(),
154 acknowledgements: AcknowledgementsConfig::default(),
155 })
156 .unwrap()
157 }
158}
159
160#[async_trait::async_trait]
161#[typetag::serde(name = "databricks_zerobus")]
162impl SinkConfig for ZerobusSinkConfig {
163 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
164 self.validate()?;
165
166 let descriptor = ZerobusService::resolve_descriptor(self, cx.proxy()).await?;
167
168 let descriptor_proto = std::sync::Arc::new(descriptor.descriptor_proto().clone());
171 let stream_mode = StreamMode::Proto { descriptor_proto };
172
173 let proto_config = ProtoBatchSerializerConfig {
174 descriptor: Some(descriptor),
175 };
176 let batch_serializer = BatchSerializerConfig::ProtoBatch(proto_config)
177 .build_batch_serializer()
178 .map_err(|e| format!("Failed to build batch serializer: {}", e))?;
179 let encoder = BatchEncoder::new(batch_serializer);
180
181 let service = ZerobusService::new(self.clone(), stream_mode, cx.proxy()).await?;
182 let healthcheck_service = service.clone();
183
184 let request_limits = self.request.into_settings();
185
186 let sink = ZerobusSink::new(service, request_limits, self.batch, encoder)?;
187
188 let healthcheck = async move {
189 healthcheck_service
190 .ensure_stream()
191 .await
192 .map_err(|e| e.into())
193 };
194
195 Ok((
196 VectorSink::from_event_streamsink(sink),
197 Box::pin(healthcheck),
198 ))
199 }
200
201 fn input(&self) -> Input {
202 Input::log()
203 }
204
205 fn acknowledgements(&self) -> &AcknowledgementsConfig {
206 &self.acknowledgements
207 }
208}
209
210impl ZerobusSinkConfig {
211 pub fn validate(&self) -> Result<(), ZerobusSinkError> {
212 if self.ingestion_endpoint.is_empty() {
213 return Err(ZerobusSinkError::ConfigError {
214 message: "ingestion_endpoint cannot be empty".to_string(),
215 });
216 }
217
218 if self.table_name.is_empty() {
219 return Err(ZerobusSinkError::ConfigError {
220 message: "table_name cannot be empty".to_string(),
221 });
222 }
223
224 let parts: Vec<&str> = self.table_name.split('.').collect();
225 if parts.len() != 3 || parts.iter().any(|p| p.is_empty()) {
226 return Err(ZerobusSinkError::ConfigError {
227 message: "table_name must be in format 'catalog.schema.table' (exactly 3 non-empty parts)"
228 .to_string(),
229 });
230 }
231
232 if self.unity_catalog_endpoint.is_empty() {
233 return Err(ZerobusSinkError::ConfigError {
234 message: "unity_catalog_endpoint cannot be empty".to_string(),
235 });
236 }
237
238 match &self.auth {
240 DatabricksAuthentication::OAuth {
241 client_id,
242 client_secret,
243 } => {
244 if client_id.inner().is_empty() {
245 return Err(ZerobusSinkError::ConfigError {
246 message: "OAuth client_id cannot be empty".to_string(),
247 });
248 }
249 if client_secret.inner().is_empty() {
250 return Err(ZerobusSinkError::ConfigError {
251 message: "OAuth client_secret cannot be empty".to_string(),
252 });
253 }
254 }
255 }
256
257 if let Some(max_bytes) = self.batch.max_bytes {
258 if max_bytes > 10_000_000 {
264 return Err(ZerobusSinkError::ConfigError {
265 message: "max_bytes must be less than or equal to 10MB".to_string(),
266 });
267 }
268 }
269
270 Ok(())
271 }
272}
273
/// Default for `stream_options.flush_timeout_ms`: 30 seconds, in milliseconds.
const fn default_flush_timeout_ms() -> u64 {
    30 * 1_000
}
278
/// Default for `stream_options.server_lack_of_ack_timeout_ms`: 60 seconds, in
/// milliseconds.
const fn default_server_ack_timeout_ms() -> u64 {
    60 * 1_000
}
282
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sinks::databricks_zerobus::error::ZerobusSinkError;
    use vector_lib::sensitive_string::SensitiveString;

    /// Returns a configuration that passes `validate()`; each test breaks
    /// exactly one field of it.
    fn create_test_config() -> ZerobusSinkConfig {
        ZerobusSinkConfig {
            ingestion_endpoint: "https://test.databricks.com".to_string(),
            table_name: "test.default.logs".to_string(),
            unity_catalog_endpoint: "https://test-workspace.databricks.com".to_string(),
            auth: DatabricksAuthentication::OAuth {
                client_id: SensitiveString::from("test-client-id".to_string()),
                client_secret: SensitiveString::from("test-client-secret".to_string()),
            },
            stream_options: ZerobusStreamOptions::default(),
            batch: Default::default(),
            request: Default::default(),
            acknowledgements: Default::default(),
        }
    }

    /// Asserts that `result` is a `ZerobusSinkError::ConfigError` whose message
    /// contains `needle`.
    #[track_caller]
    fn assert_config_error(result: Result<(), ZerobusSinkError>, needle: &str) {
        match result {
            Err(ZerobusSinkError::ConfigError { message }) => assert!(
                message.contains(needle),
                "expected ConfigError mentioning {needle:?}, got {message:?}"
            ),
            other => panic!("expected ConfigError mentioning {needle:?}, got {other:?}"),
        }
    }

    #[test]
    fn test_config_validation_success() {
        assert!(create_test_config().validate().is_ok());
    }

    #[test]
    fn test_config_validation_empty_endpoint() {
        let mut config = create_test_config();
        config.ingestion_endpoint = "".to_string();
        assert_config_error(config.validate(), "ingestion_endpoint cannot be empty");
    }

    #[test]
    fn test_config_validation_empty_table_name() {
        let mut config = create_test_config();
        config.table_name = "".to_string();
        assert_config_error(config.validate(), "table_name cannot be empty");
    }

    #[test]
    fn test_config_validation_invalid_table_name() {
        let mut config = create_test_config();
        config.table_name = "invalid_table".to_string();
        assert_config_error(config.validate(), "catalog.schema.table");
    }

    #[test]
    fn test_config_validation_table_name_empty_segments() {
        for bad in [
            "catalog..table",
            ".schema.table",
            "catalog.schema.",
            "..",
            "catalog.schema.table.extra",
        ] {
            let mut config = create_test_config();
            config.table_name = bad.to_string();
            assert_config_error(config.validate(), "catalog.schema.table");
        }
    }

    #[test]
    fn test_config_validation_empty_unity_catalog_endpoint() {
        let mut config = create_test_config();
        config.unity_catalog_endpoint = "".to_string();
        assert_config_error(config.validate(), "unity_catalog_endpoint cannot be empty");
    }

    #[test]
    fn test_config_validation_empty_oauth_credentials() {
        let mut config = create_test_config();
        config.auth = DatabricksAuthentication::OAuth {
            client_id: SensitiveString::from("".to_string()),
            client_secret: SensitiveString::from("test-secret".to_string()),
        };
        assert_config_error(config.validate(), "OAuth client_id cannot be empty");
    }

    #[test]
    fn test_config_validation_empty_oauth_client_secret() {
        let mut config = create_test_config();
        config.auth = DatabricksAuthentication::OAuth {
            client_id: SensitiveString::from("test-client-id".to_string()),
            client_secret: SensitiveString::from("".to_string()),
        };
        assert_config_error(config.validate(), "OAuth client_secret cannot be empty");
    }

    #[test]
    fn test_config_validation_max_bytes_limit() {
        let mut config = create_test_config();

        // Exactly 10 MB is the largest accepted batch size.
        config.batch.max_bytes = Some(10_000_000);
        assert!(config.validate().is_ok());

        config.batch.max_bytes = Some(10_000_001);
        assert_config_error(
            config.validate(),
            "max_bytes must be less than or equal to 10MB",
        );
    }

    // With `max_bytes` unset, the batch size falls back to the
    // `RealtimeSizeBasedDefaultBatchSettings` default, which this test pins to
    // the same 10 MB ceiling that `validate()` enforces.
    #[test]
    fn test_batch_max_bytes_none_defaults_to_10mb() {
        let mut config = create_test_config();
        config.batch.max_bytes = None;

        let settings = config
            .batch
            .into_batcher_settings()
            .expect("batch settings should build");

        assert_eq!(settings.size_limit, 10_000_000);
    }
}