vector/sources/splunk_hec/
mod.rs

1use std::{
2    collections::HashMap,
3    convert::Infallible,
4    io::Read,
5    net::{Ipv4Addr, SocketAddr},
6    sync::Arc,
7    time::Duration,
8};
9
10use bytes::{Buf, Bytes, BytesMut};
11use chrono::{DateTime, TimeZone, Utc};
12use flate2::read::MultiGzDecoder;
13use futures::FutureExt;
14use http::StatusCode;
15use hyper::{Server, service::make_service_fn};
16use serde::{Serialize, de::DeserializeOwned};
17use serde_json::{
18    Deserializer, Value as JsonValue,
19    de::{Read as JsonRead, StrRead},
20};
21use snafu::Snafu;
22use tokio::net::TcpStream;
23use tokio_util::codec::Decoder as _;
24use tower::ServiceBuilder;
25use tracing::Span;
26use vector_lib::{
27    EstimatedJsonEncodedSizeOf,
28    codecs::{
29        Decoder, StreamDecodingError,
30        decoding::{DeserializerConfig, FramingConfig},
31    },
32    config::{LegacyKey, LogNamespace},
33    configurable::configurable_component,
34    event::{BatchNotifier, BatchStatusReceiver, EventMetadata},
35    internal_event::{
36        ComponentEventsDropped, CountByteSize, InternalEventHandle as _, Registered, UNINTENTIONAL,
37    },
38    lookup::{
39        self, OwnedValuePath, event_path, lookup_v2::OptionalValuePath, metadata_path,
40        owned_value_path,
41    },
42    schema::meaning,
43    sensitive_string::SensitiveString,
44    source_sender::SendError,
45    tls::MaybeTlsIncomingStream,
46};
47use vrl::{
48    path::{OwnedTargetPath, PathPrefix, ValuePath as _},
49    value::{Kind, kind::Collection},
50};
51use warp::{
52    Filter, Reply,
53    filters::BoxedFilter,
54    http::header::{CONTENT_TYPE, HeaderValue},
55    path,
56    reject::Rejection,
57    reply::Response,
58};
59
60use self::{
61    acknowledgements::{
62        HecAckStatusRequest, HecAckStatusResponse, HecAcknowledgementsConfig,
63        IndexerAcknowledgement,
64    },
65    splunk_response::{HecResponse, HecResponseMetadata, HecStatusCode},
66};
67use crate::{
68    SourceSender,
69    codecs::DecodingConfig,
70    config::{DataType, Resource, SourceConfig, SourceContext, SourceOutput, log_schema},
71    event::{Event, LogEvent, Value},
72    http::{KeepaliveConfig, MaxConnectionAgeLayer, build_http_trace_layer},
73    internal_events::{
74        EventsReceived, HttpBytesReceived, SplunkHecRequestBodyInvalidError, SplunkHecRequestError,
75    },
76    serde::bool_or_struct,
77    tls::{MaybeTlsSettings, TlsEnableableConfig},
78};
79
80mod acknowledgements;
81
// Event fields unique to splunk_hec source
/// Legacy-namespace event field that receives the Splunk `channel` metadata.
pub const CHANNEL: &str = "splunk_channel";
/// Legacy-namespace event field that receives the Splunk `index` metadata.
pub const INDEX: &str = "splunk_index";
/// Legacy-namespace event field that receives the Splunk `source` metadata.
pub const SOURCE: &str = "splunk_source";
/// Legacy-namespace event field that receives the Splunk `sourcetype` metadata.
///
/// Not to be confused with `source_type`.
pub const SOURCETYPE: &str = "splunk_sourcetype";

/// Request header the event endpoint reads the Splunk channel from.
const X_SPLUNK_REQUEST_CHANNEL: &str = "x-splunk-request-channel";
89
/// Configuration for the `splunk_hec` source.
#[configurable_component(source("splunk_hec", "Receive logs from Splunk."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields, default)]
pub struct SplunkConfig {
    /// The socket address to listen for connections on.
    ///
    /// The address _must_ include a port.
    #[serde(default = "default_socket_address")]
    pub address: SocketAddr,

    /// Optional authorization token.
    ///
    /// If supplied, incoming requests must supply this token in the `Authorization` header, just as a client would if
    /// it was communicating with the Splunk HEC endpoint directly.
    ///
    /// If _not_ supplied, the `Authorization` header is ignored and requests are not authenticated.
    #[configurable(deprecated = "This option has been deprecated, use `valid_tokens` instead.")]
    token: Option<SensitiveString>,

    /// A list of valid authorization tokens.
    ///
    /// If supplied, incoming requests must supply one of these tokens in the `Authorization` header, just as a client
    /// would if it was communicating with the Splunk HEC endpoint directly.
    ///
    /// If _not_ supplied, the `Authorization` header is ignored and requests are not authenticated.
    #[configurable(metadata(docs::examples = "A94A8FE5CCB19BA61C4C08"))]
    valid_tokens: Option<Vec<SensitiveString>>,

    /// Whether or not to forward the Splunk HEC authentication token with events.
    ///
    /// If set to `true`, when incoming requests contain a Splunk HEC token, the token used is kept in the
    /// event metadata and preferentially used if the event is sent to a Splunk HEC sink.
    store_hec_token: bool,

    #[configurable(derived)]
    tls: Option<TlsEnableableConfig>,

    #[configurable(derived)]
    #[serde(deserialize_with = "bool_or_struct")]
    acknowledgements: HecAcknowledgementsConfig,

    /// The namespace to use for logs. This overrides the global settings.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    #[configurable(derived)]
    #[serde(default)]
    keepalive: KeepaliveConfig,

    /// Codec configuration applied to events received on `/services/collector/event`.
    ///
    /// When `decoding` is set, Vector applies a second decoding pass after parsing the
    /// HEC envelope. The envelope's `event` field is passed through the codec,
    /// and a single envelope can fan out to multiple events. Decode failures are
    /// swallowed and do not return an error to the Splunk client.
    ///
    /// The VRL codec can access HEC envelope metadata, such as host, sourcetype, and
    /// channel, and the authentication token via `%splunk_hec.*` paths and
    /// `get_secret!("splunk_hec_token")` before the program executes.
    #[configurable(derived)]
    #[configurable(metadata(docs::advanced))]
    #[serde(default)]
    pub event: CodecConfig,

    /// Codec configuration applied to events received on `/services/collector/raw`.
    ///
    /// When `decoding` is set, the (decompressed) request body is fed through the
    /// codec instead of being emitted as a single event. Decode failures are
    /// swallowed and do not return an error to the Splunk client. When unset, the
    /// endpoint preserves its existing behavior of one event per request body.
    #[configurable(derived)]
    #[configurable(metadata(docs::advanced))]
    #[serde(default)]
    pub raw: CodecConfig,
}
167
/// Codec configuration applied to one of the `splunk_hec` endpoints.
#[configurable_component]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields, default)]
pub struct CodecConfig {
    /// Framing configuration applied to the payload.
    ///
    /// Only used when `decoding` is also set. When unset, the default
    /// message-based framing of the configured `decoding` codec is used
    /// (typically `bytes`), which produces one event per payload.
    #[configurable(derived)]
    #[configurable(metadata(docs::advanced))]
    #[serde(default)]
    pub framing: Option<FramingConfig>,

    /// Decoding configuration applied to the payload.
    ///
    /// When unset, the endpoint preserves its existing per-endpoint default
    /// behavior. When set, the endpoint-selected payload is processed through
    /// `framing` and `decoding`, and a single payload can fan out to multiple
    /// events.
    #[configurable(derived)]
    #[configurable(metadata(docs::advanced))]
    #[serde(default)]
    pub decoding: Option<DeserializerConfig>,
}
193
194impl CodecConfig {
195    fn build_decoder(&self, log_namespace: LogNamespace) -> crate::Result<Option<Decoder>> {
196        match &self.decoding {
197            Some(decoding) => {
198                let framing = self
199                    .framing
200                    .clone()
201                    .unwrap_or_else(|| decoding.default_message_based_framing());
202                Ok(Some(
203                    DecodingConfig::new(framing, decoding.clone(), log_namespace).build()?,
204                ))
205            }
206            None => Ok(None),
207        }
208    }
209}
210
211impl_generate_config_from_default!(SplunkConfig);
212
213impl Default for SplunkConfig {
214    fn default() -> Self {
215        SplunkConfig {
216            address: default_socket_address(),
217            token: None,
218            valid_tokens: None,
219            tls: None,
220            acknowledgements: Default::default(),
221            store_hec_token: false,
222            log_namespace: None,
223            keepalive: Default::default(),
224            event: CodecConfig::default(),
225            raw: CodecConfig::default(),
226        }
227    }
228}
229
/// Default listen address: every IPv4 interface (`0.0.0.0`) on port 8088.
fn default_socket_address() -> SocketAddr {
    SocketAddr::from((Ipv4Addr::UNSPECIFIED, 8088))
}
233
234#[async_trait::async_trait]
235#[typetag::serde(name = "splunk_hec")]
236impl SourceConfig for SplunkConfig {
237    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
238        let tls = MaybeTlsSettings::from_config(self.tls.as_ref(), true)?;
239        let shutdown = cx.shutdown.clone();
240        let out = cx.out.clone();
241        let log_namespace = cx.log_namespace(self.log_namespace);
242        let event_decoder = self.event.build_decoder(log_namespace)?;
243        let raw_decoder = self.raw.build_decoder(log_namespace)?;
244        let source = SplunkSource::new(
245            self,
246            tls.http_protocol_name(),
247            event_decoder,
248            raw_decoder,
249            cx,
250        );
251
252        let event_service = source.event_service(out.clone());
253        let raw_service = source.raw_service(out);
254        let health_service = source.health_service();
255        let ack_service = source.ack_service();
256        let options = SplunkSource::options();
257
258        let services = path!("services" / "collector" / ..)
259            .and(
260                event_service
261                    .or(raw_service)
262                    .unify()
263                    .or(health_service)
264                    .unify()
265                    .or(ack_service)
266                    .unify()
267                    .or(options)
268                    .unify(),
269            )
270            .or_else(finish_err);
271
272        let listener = tls.bind(&self.address).await?;
273
274        let keepalive_settings = self.keepalive.clone();
275        Ok(Box::pin(async move {
276            let span = Span::current();
277            let make_svc = make_service_fn(move |conn: &MaybeTlsIncomingStream<TcpStream>| {
278                let svc = ServiceBuilder::new()
279                    .layer(build_http_trace_layer(span.clone()))
280                    .option_layer(keepalive_settings.max_connection_age_secs.map(|secs| {
281                        MaxConnectionAgeLayer::new(
282                            Duration::from_secs(secs),
283                            keepalive_settings.max_connection_age_jitter_factor,
284                            conn.peer_addr(),
285                        )
286                    }))
287                    .service(warp::service(services.clone()));
288                futures_util::future::ok::<_, Infallible>(svc)
289            });
290
291            Server::builder(hyper::server::accept::from_stream(listener.accept_stream()))
292                .serve(make_svc)
293                .with_graceful_shutdown(shutdown.map(|_| ()))
294                .await
295                .map_err(|err| {
296                    error!("An error occurred: {:?}.", err);
297                })?;
298
299            Ok(())
300        }))
301    }
302
303    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
304        let log_namespace = global_log_namespace.merge(self.log_namespace);
305
306        // Build schemas per endpoint, then merge them. Each endpoint decides at
307        // runtime whether source metadata overwrites event fields or defers to a
308        // decoder-produced value, so applying one global strategy would make mixed
309        // decoder/no-decoder configurations advertise the wrong contract.
310        let legacy_base = || match log_namespace {
311            LogNamespace::Legacy => {
312                let definition = vector_lib::schema::Definition::empty_legacy_namespace()
313                    .with_event_field(
314                        &owned_value_path!("line"),
315                        Kind::object(Collection::empty())
316                            .or_array(Collection::empty())
317                            .or_undefined(),
318                        None,
319                    );
320
321                if let Some(message_key) = log_schema().message_key() {
322                    definition.with_event_field(
323                        message_key,
324                        Kind::bytes().or_undefined(),
325                        Some(meaning::MESSAGE),
326                    )
327                } else {
328                    definition
329                }
330            }
331            LogNamespace::Vector => vector_lib::schema::Definition::new_with_default_metadata(
332                Kind::bytes().or_object(Collection::empty()),
333                [log_namespace],
334            )
335            .with_meaning(OwnedTargetPath::event_root(), meaning::MESSAGE),
336        };
337
338        let endpoint_base = |decoding: &Option<DeserializerConfig>| match decoding {
339            Some(decoding) => decoding.schema_definition(log_namespace),
340            None => legacy_base(),
341        };
342
343        let splunk_legacy_key = |path: OwnedValuePath, has_decoder: bool| {
344            if has_decoder {
345                LegacyKey::InsertIfEmpty(path)
346            } else {
347                LegacyKey::Overwrite(path)
348            }
349        };
350
351        let add_common_metadata = |definition: vector_lib::schema::Definition| {
352            definition
353                .with_standard_vector_source_metadata()
354                .with_source_metadata(
355                    SplunkConfig::NAME,
356                    log_schema()
357                        .host_key()
358                        .cloned()
359                        .map(LegacyKey::InsertIfEmpty),
360                    &owned_value_path!("host"),
361                    Kind::bytes(),
362                    Some(meaning::HOST),
363                )
364        };
365
366        let add_channel_metadata = |definition: vector_lib::schema::Definition,
367                                    has_decoder: bool| {
368            definition.with_source_metadata(
369                SplunkConfig::NAME,
370                Some(splunk_legacy_key(owned_value_path!(CHANNEL), has_decoder)),
371                &owned_value_path!("channel"),
372                Kind::bytes(),
373                None,
374            )
375        };
376
377        let event_has_decoder = self.event.decoding.is_some();
378        let raw_has_decoder = self.raw.decoding.is_some();
379
380        // Merge the per-endpoint base schemas (event root kind + standard Vector
381        // metadata). Splunk-specific fields are added once afterward with
382        // per-field decoder flags, avoiding the widening that occurs when the
383        // raw schema's open `metadata_kind.unknown` overrides specific fields
384        // from the event schema during merge.
385        let merged_base = add_common_metadata(
386            endpoint_base(&self.event.decoding).merge(endpoint_base(&self.raw.decoding)),
387        );
388
389        // `index`, `source`, `sourcetype` are only written by the /event endpoint.
390        // `channel` is written by both; use Overwrite if either endpoint has no
391        // decoder (some events still overwrite it).
392        let channel_has_decoder = event_has_decoder && raw_has_decoder;
393        let schema_definition = add_channel_metadata(
394            merged_base
395                .with_source_metadata(
396                    SplunkConfig::NAME,
397                    Some(splunk_legacy_key(
398                        owned_value_path!(INDEX),
399                        event_has_decoder,
400                    )),
401                    &owned_value_path!("index"),
402                    Kind::bytes(),
403                    None,
404                )
405                .with_source_metadata(
406                    SplunkConfig::NAME,
407                    Some(splunk_legacy_key(
408                        owned_value_path!(SOURCE),
409                        event_has_decoder,
410                    )),
411                    &owned_value_path!("source"),
412                    Kind::bytes(),
413                    Some(meaning::SERVICE),
414                )
415                // Not to be confused with `source_type`.
416                .with_source_metadata(
417                    SplunkConfig::NAME,
418                    Some(splunk_legacy_key(
419                        owned_value_path!(SOURCETYPE),
420                        event_has_decoder,
421                    )),
422                    &owned_value_path!("sourcetype"),
423                    Kind::bytes(),
424                    None,
425                ),
426            channel_has_decoder,
427        );
428
429        // Output type is the union of both endpoints' decoder output types
430        // (logs from a JSON codec, metrics from native, etc.). The legacy path
431        // always emits logs, so when an endpoint has no decoder we OR `Log` in.
432        let output_type = match (&self.event.decoding, &self.raw.decoding) {
433            (None, None) => DataType::Log,
434            (Some(d), None) | (None, Some(d)) => d.output_type() | DataType::Log,
435            (Some(de), Some(dr)) => de.output_type() | dr.output_type(),
436        };
437        vec![SourceOutput::new_maybe_logs(output_type, schema_definition)]
438    }
439
    /// Declares the TCP resource for the configured listen `address`.
    fn resources(&self) -> Vec<Resource> {
        vec![Resource::tcp(self.address)]
    }
443
    /// Always returns `true`.
    fn can_acknowledge(&self) -> bool {
        true
    }
447}
448
/// Shared data for responding to requests.
struct SplunkSource {
    /// Configured tokens, each pre-formatted as `Splunk <token>`.
    // NOTE(review): presumably compared against the `Authorization` header by
    // `authorization()`, which is not visible in this part of the file.
    valid_credentials: Vec<String>,
    /// Protocol name reported in `HttpBytesReceived` events.
    protocol: &'static str,
    /// Indexer acknowledgement state; `Some` only when acknowledgements are enabled.
    idx_ack: Option<Arc<IndexerAcknowledgement>>,
    /// Whether the request's HEC token is attached to the events it produces.
    store_hec_token: bool,
    /// Log namespace resolved from the source context and the source config.
    log_namespace: LogNamespace,
    /// Registered handle for the `EventsReceived` internal event.
    events_received: Registered<EventsReceived>,
    /// Optional codec applied by the event endpoint.
    event_decoder: Option<Decoder>,
    /// Optional codec applied by the raw endpoint.
    raw_decoder: Option<Decoder>,
}
460
461impl SplunkSource {
462    fn new(
463        config: &SplunkConfig,
464        protocol: &'static str,
465        event_decoder: Option<Decoder>,
466        raw_decoder: Option<Decoder>,
467        cx: SourceContext,
468    ) -> Self {
469        let log_namespace = cx.log_namespace(config.log_namespace);
470        let acknowledgements = cx.do_acknowledgements(config.acknowledgements.enabled.into());
471        let shutdown = cx.shutdown;
472        let valid_tokens = config
473            .valid_tokens
474            .iter()
475            .flatten()
476            .chain(config.token.iter());
477
478        let idx_ack = acknowledgements.then(|| {
479            Arc::new(IndexerAcknowledgement::new(
480                config.acknowledgements.clone(),
481                shutdown,
482            ))
483        });
484
485        SplunkSource {
486            valid_credentials: valid_tokens
487                .map(|token| format!("Splunk {}", token.inner()))
488                .collect(),
489            protocol,
490            idx_ack,
491            store_hec_token: config.store_hec_token,
492            log_namespace,
493            events_received: register!(EventsReceived),
494            event_decoder,
495            raw_decoder,
496        }
497    }
498
    /// Builds the filter that handles the HEC event endpoint.
    ///
    /// Matches `POST` requests whose remaining path is `event`, `event/1.0`,
    /// or empty. The Splunk channel is read from the
    /// `x-splunk-request-channel` header, falling back to the `channel` query
    /// parameter. The body is gunzipped when the `gzip` filter reports `true`,
    /// converted to text lossily, and parsed as a stream of JSON values by
    /// `EventIterator`. The resulting events are forwarded with `send_batch`,
    /// and the optional ack id is handed to `finish_ok`.
    fn event_service(&self, out: SourceSender) -> BoxedFilter<(Response,)> {
        let splunk_channel_query_param = warp::query::<HashMap<String, String>>()
            .map(|qs: HashMap<String, String>| qs.get("channel").map(|v| v.to_owned()));
        let splunk_channel_header = warp::header::optional::<String>(X_SPLUNK_REQUEST_CHANNEL);

        // The header takes precedence over the query parameter.
        let splunk_channel = splunk_channel_header
            .and(splunk_channel_query_param)
            .map(|header: Option<String>, query_param| header.or(query_param));

        let protocol = self.protocol;
        let idx_ack = self.idx_ack.clone();
        let store_hec_token = self.store_hec_token;
        let log_namespace = self.log_namespace;
        let events_received = self.events_received.clone();
        let decoder = self.event_decoder.clone();

        warp::post()
            .and(
                path!("event")
                    .or(path!("event" / "1.0"))
                    .or(warp::path::end()),
            )
            .and(self.authorization())
            .and(splunk_channel)
            .and(warp::addr::remote())
            .and(warp::header::optional::<String>("X-Forwarded-For"))
            .and(self.gzip())
            .and(warp::body::bytes())
            .and(warp::path::full())
            .and_then(
                move |_,
                      token: Option<String>,
                      channel: Option<String>,
                      remote: Option<SocketAddr>,
                      remote_addr: Option<String>,
                      gzip: bool,
                      body: Bytes,
                      path: warp::path::FullPath| {
                    // Per-request copies of the shared state captured above.
                    let mut out = out.clone();
                    let idx_ack = idx_ack.clone();
                    let events_received = events_received.clone();
                    let decoder = decoder.clone();

                    async move {
                        // A channel is mandatory when indexer acknowledgements
                        // are enabled.
                        if idx_ack.is_some() && channel.is_none() {
                            return Err(Rejection::from(ApiError::MissingChannel));
                        }

                        // `data` outlives the branch so the decompressed text
                        // can be borrowed from it; `byte_size` is the size
                        // after decompression when `gzip` is set.
                        let mut data = Vec::new();
                        let (byte_size, body) = if gzip {
                            MultiGzDecoder::new(body.reader())
                                .read_to_end(&mut data)
                                .map_err(|_| Rejection::from(ApiError::BadRequest))?;
                            (data.len(), String::from_utf8_lossy(data.as_slice()))
                        } else {
                            (body.len(), String::from_utf8_lossy(body.as_ref()))
                        };
                        emit!(HttpBytesReceived {
                            byte_size,
                            http_path: path.as_str(),
                            protocol,
                        });

                        let (batch, mut receiver) =
                            BatchNotifier::maybe_new_with_receiver(idx_ack.is_some());
                        let decoder_in_use = decoder.is_some();

                        // Without a decoder, register the ack id BEFORE iteration so
                        // capacity-exhaustion (`ServiceUnavailable`) short-circuits
                        // the request without parsing the body - byte-for-byte parity
                        // with the pre-decoder behavior.
                        let mut maybe_ack_id = None;
                        if !decoder_in_use {
                            maybe_ack_id =
                                register_ack(idx_ack.clone(), receiver.take(), channel.clone())
                                    .await?;
                        }

                        let mut error = None;
                        let mut events = Vec::new();
                        let mut had_decode_errors = false;

                        let iter: EventIterator<'_, StrRead<'_>> = EventIteratorGenerator {
                            deserializer: Deserializer::from_str(&body).into_iter::<JsonValue>(),
                            channel: channel.clone(),
                            remote,
                            remote_addr,
                            batch,
                            // The token is only carried forward when
                            // `store_hec_token` is enabled.
                            token: token.filter(|_| store_hec_token).map(Into::into),
                            log_namespace,
                            events_received,
                            decoder,
                        }
                        .into();

                        // Stop at the first failing envelope; events collected
                        // before it are still forwarded below.
                        for result in iter {
                            match result {
                                Ok((chunk, errored)) => {
                                    events.extend(chunk);
                                    had_decode_errors |= errored;
                                }
                                Err(err) => {
                                    error = Some(err);
                                    break;
                                }
                            }
                        }

                        // With a decoder, defer ack registration until we know whether
                        // the codec emitted anything *and* whether it dropped any
                        // frames. Also skip ack registration when a later envelope
                        // errored even if earlier ones produced events: the client
                        // gets a 400 and never sees the ack id, so registering it
                        // only leaks pending-ack capacity.
                        if decoder_in_use {
                            maybe_ack_id =
                                if events.is_empty() || had_decode_errors || error.is_some() {
                                    drop(receiver);
                                    None
                                } else {
                                    register_ack(idx_ack, receiver, channel).await?
                                };
                        }

                        if !events.is_empty() {
                            match out.send_batch(events).await {
                                Ok(()) => (),
                                Err(SendError::Closed) => {
                                    return Err(Rejection::from(ApiError::ServerShutdown));
                                }
                                Err(SendError::Timeout) => {
                                    unreachable!("No timeout is configured for this source.")
                                }
                            }
                        }

                        // The envelope error, if any, is reported only after
                        // the events gathered so far have been sent.
                        if let Some(error) = error {
                            Err(error)
                        } else {
                            Ok(maybe_ack_id)
                        }
                    }
                },
            )
            .map(finish_ok)
            .boxed()
    }
646
    /// Builds the filter that handles the HEC raw endpoint.
    ///
    /// Matches `POST` requests whose remaining path is `raw/1.0` or `raw` and
    /// requires a channel via `SplunkSource::required_channel()`. Without a
    /// decoder the request body becomes exactly one event, sent with
    /// `send_event`. With a decoder the body is passed through `raw_event`
    /// and the decoded events are sent with `send_batch`; an ack id is only
    /// registered when events were produced and no frame failed to decode.
    fn raw_service(&self, out: SourceSender) -> BoxedFilter<(Response,)> {
        let protocol = self.protocol;
        let idx_ack = self.idx_ack.clone();
        let store_hec_token = self.store_hec_token;
        let events_received = self.events_received.clone();
        let log_namespace = self.log_namespace;
        let decoder = self.raw_decoder.clone();

        warp::post()
            .and(path!("raw" / "1.0").or(path!("raw")))
            .and(self.authorization())
            .and(SplunkSource::required_channel())
            .and(warp::addr::remote())
            .and(warp::header::optional::<String>("X-Forwarded-For"))
            .and(self.gzip())
            .and(warp::body::bytes())
            .and(warp::path::full())
            .and_then(
                move |_,
                      token: Option<String>,
                      channel_id: String,
                      remote: Option<SocketAddr>,
                      xff: Option<String>,
                      gzip: bool,
                      body: Bytes,
                      path: warp::path::FullPath| {
                    // Per-request copies of the shared state captured above.
                    let mut out = out.clone();
                    let idx_ack = idx_ack.clone();
                    let events_received = events_received.clone();
                    let decoder = decoder.clone();
                    // Reported with the size of the body as received; `gzip`
                    // is only handed to `raw_event` further down.
                    emit!(HttpBytesReceived {
                        byte_size: body.len(),
                        http_path: path.as_str(),
                        protocol,
                    });

                    async move {
                        let (batch, receiver) =
                            BatchNotifier::maybe_new_with_receiver(idx_ack.is_some());

                        // No-decoder path: byte-for-byte identical to the pre-decoder
                        // code - register ack first (fast-fail under capacity
                        // exhaustion), build a single event, send via `send_event`
                        // (avoids `send_batch_latency` emission).
                        let Some(decoder) = decoder else {
                            let maybe_ack_id =
                                register_ack(idx_ack, receiver, Some(channel_id.clone())).await?;
                            let (mut events, _) = raw_event(
                                body,
                                gzip,
                                channel_id,
                                remote,
                                xff,
                                batch,
                                log_namespace,
                                &events_received,
                                None,
                                None,
                            )?;
                            // raw_event with no decoder always produces exactly one
                            // event.
                            let mut event = events.pop().expect(
                                "raw_event always produces a single event when no decoder is set",
                            );
                            // The token is only attached when `store_hec_token`
                            // is enabled.
                            if let Some(token) = token.filter(|_| store_hec_token) {
                                event.metadata_mut().set_splunk_hec_token(token.into());
                            }
                            let res = out.send_event(event).await;
                            return res
                                .map(|_| maybe_ack_id)
                                .map_err(|_| Rejection::from(ApiError::ServerShutdown));
                        };

                        // Decoder path: pass the optional HEC token into raw_event so
                        // it's stamped on each event the moment it leaves the codec
                        // (rather than after the whole payload is decoded).
                        let token: Option<Arc<str>> =
                            token.filter(|_| store_hec_token).map(Arc::from);
                        let (events, had_decode_errors) = raw_event(
                            body,
                            gzip,
                            channel_id.clone(),
                            remote,
                            xff,
                            batch,
                            log_namespace,
                            &events_received,
                            Some(decoder),
                            token,
                        )?;

                        if events.is_empty() || had_decode_errors {
                            // With newline framing, `valid \n invalid \n valid`
                            // decodes to two events plus one dropped frame; returning
                            // an ack id there would let `/services/collector/ack`
                            // report success for data Vector silently lost.
                            drop(receiver);
                            if events.is_empty() {
                                return Ok(None);
                            }
                            // Forward the partial events with no ack so the source's
                            // existing partial-delivery semantics still apply.
                            let res = out.send_batch(events).await;
                            return res
                                .map(|_| None)
                                .map_err(|_| Rejection::from(ApiError::ServerShutdown));
                        }

                        // Clean decode with at least one event: register the
                        // ack id before sending the batch.
                        let maybe_ack_id =
                            register_ack(idx_ack, receiver, Some(channel_id)).await?;

                        let res = out.send_batch(events).await;
                        res.map(|_| maybe_ack_id)
                            .map_err(|_| Rejection::from(ApiError::ServerShutdown))
                    }
                },
            )
            .map(finish_ok)
            .boxed()
    }
767
768    fn health_service(&self) -> BoxedFilter<(Response,)> {
769        // The Splunk docs document this endpoint as returning a 400 if given an invalid Splunk
770        // token, but, in practice, it seems to ignore the token altogether
771        //
772        // The response body was taken from Splunk 8.2.4
773        //
774        // https://docs.splunk.com/Documentation/Splunk/8.2.5/RESTREF/RESTinput#services.2Fcollector.2Fhealth
775        warp::get()
776            .and(path!("health" / "1.0").or(path!("health")))
777            .map(move |_| {
778                http::Response::builder()
779                    .header(http::header::CONTENT_TYPE, "application/json")
780                    .body(hyper::Body::from(r#"{"text":"HEC is healthy","code":17}"#))
781                    .expect("static response")
782            })
783            .boxed()
784    }
785
786    fn lenient_json_content_type_check<T>() -> impl Filter<Extract = (T,), Error = Rejection> + Clone
787    where
788        T: Send + DeserializeOwned + 'static,
789    {
790        warp::header::optional::<HeaderValue>(CONTENT_TYPE.as_str())
791            .and(warp::body::bytes())
792            .and_then(
793                |ctype: Option<HeaderValue>, body: bytes::Bytes| async move {
794                    let ok = ctype
795                        .as_ref()
796                        .and_then(|v| v.to_str().ok())
797                        .map(|h| h.to_ascii_lowercase().contains("application/json"))
798                        .unwrap_or(true);
799
800                    if !ok {
801                        return Err(warp::reject::custom(ApiError::UnsupportedContentType));
802                    }
803
804                    let value = serde_json::from_slice::<T>(&body)
805                        .map_err(|_| warp::reject::custom(ApiError::BadRequest))?;
806
807                    Ok(value)
808                },
809            )
810    }
811
812    fn ack_service(&self) -> BoxedFilter<(Response,)> {
813        let idx_ack = self.idx_ack.clone();
814
815        warp::post()
816            .and(warp::path!("ack"))
817            .and(self.authorization())
818            .and(SplunkSource::required_channel())
819            .and(Self::lenient_json_content_type_check::<HecAckStatusRequest>())
820            .and_then(move |_, channel: String, req: HecAckStatusRequest| {
821                let idx_ack = idx_ack.clone();
822                async move {
823                    if let Some(idx_ack) = idx_ack {
824                        let acks = idx_ack
825                            .get_acks_status_from_channel(channel, &req.acks)
826                            .await?;
827                        Ok(warp::reply::json(&HecAckStatusResponse { acks }).into_response())
828                    } else {
829                        Err(warp::reject::custom(ApiError::AckIsDisabled))
830                    }
831                }
832            })
833            .boxed()
834    }
835
836    fn options() -> BoxedFilter<(Response,)> {
837        let post = warp::options()
838            .and(
839                path!("event")
840                    .or(path!("event" / "1.0"))
841                    .or(path!("raw" / "1.0"))
842                    .or(path!("raw")),
843            )
844            .map(|_| warp::reply::with_header(warp::reply(), "Allow", "POST").into_response());
845
846        let get = warp::options()
847            .and(path!("health").or(path!("health" / "1.0")))
848            .map(|_| warp::reply::with_header(warp::reply(), "Allow", "GET").into_response());
849
850        post.or(get).unify().boxed()
851    }
852
853    /// Authorize request
854    fn authorization(&self) -> BoxedFilter<(Option<String>,)> {
855        let valid_credentials = self.valid_credentials.clone();
856        warp::header::optional("Authorization")
857            .and_then(move |token: Option<String>| {
858                let valid_credentials = valid_credentials.clone();
859                async move {
860                    match (token, valid_credentials.is_empty()) {
861                        // Remove the "Splunk " prefix if present as it is not
862                        // part of the token itself
863                        (token, true) => {
864                            Ok(token
865                                .map(|t| t.strip_prefix("Splunk ").map(Into::into).unwrap_or(t)))
866                        }
867                        (Some(token), false) if valid_credentials.contains(&token) => Ok(Some(
868                            token
869                                .strip_prefix("Splunk ")
870                                .map(Into::into)
871                                .unwrap_or(token),
872                        )),
873                        (Some(_), false) => Err(Rejection::from(ApiError::InvalidAuthorization)),
874                        (None, false) => Err(Rejection::from(ApiError::MissingAuthorization)),
875                    }
876                }
877            })
878            .boxed()
879    }
880
881    /// Is body encoded with gzip
882    fn gzip(&self) -> BoxedFilter<(bool,)> {
883        warp::header::optional::<String>("Content-Encoding")
884            .and_then(|encoding: Option<String>| async move {
885                match encoding {
886                    Some(s) if s.as_bytes() == b"gzip" => Ok(true),
887                    Some(_) => Err(Rejection::from(ApiError::UnsupportedEncoding)),
888                    None => Ok(false),
889                }
890            })
891            .boxed()
892    }
893
894    fn required_channel() -> BoxedFilter<(String,)> {
895        let splunk_channel_query_param = warp::query::<HashMap<String, String>>()
896            .map(|qs: HashMap<String, String>| qs.get("channel").map(|v| v.to_owned()));
897        let splunk_channel_header = warp::header::optional::<String>(X_SPLUNK_REQUEST_CHANNEL);
898
899        splunk_channel_header
900            .and(splunk_channel_query_param)
901            .and_then(|header: Option<String>, query_param| async move {
902                header
903                    .or(query_param)
904                    .ok_or_else(|| Rejection::from(ApiError::MissingChannel))
905            })
906            .boxed()
907    }
908}
/// Constructs one or more events from the JSON envelopes coming from the reader.
///
/// Each iteration consumes one HEC envelope from `deserializer` and yields either the
/// events built from it or a rejection describing why it could not be processed.
/// If an error is yielded, the iterator is considered done with the input
/// (NOTE(review): stopping is the caller's responsibility — confirm at the call site).
struct EventIterator<'de, R: JsonRead<'de>> {
    /// Remaining request with JSON events
    deserializer: serde_json::StreamDeserializer<'de, R, JsonValue>,
    /// Count of HEC envelopes (not fan-out events) processed so far. Used both as the
    /// `InvalidEventNumber` index in Splunk error responses (zero-indexed: subtract 1
    /// for build-time errors, use as-is for parse errors that haven't entered build)
    /// and as the "did we see any envelope?" check that gates the `NoData` error.
    envelopes_processed: usize,
    /// Optional channel from headers; used when an envelope carries no `channel`
    /// string of its own.
    channel: Option<Value>,
    /// Default time. Starts as the construction time and is replaced whenever an
    /// envelope provides a valid `time` field, after which it sticks for later
    /// envelopes.
    time: Time,
    /// Remaining extracted default values, in order: `host`, `index`, `source`,
    /// `sourcetype`.
    extractors: [DefaultExtractor; 4],
    /// Event finalization
    batch: Option<BatchNotifier>,
    /// Splunk HEC Token for passthrough
    token: Option<Arc<str>>,
    /// Lognamespace to put the events in
    log_namespace: LogNamespace,
    /// handle to EventsReceived registry
    events_received: Registered<EventsReceived>,
    /// Optional second-stage decoder applied to the envelope payload after HEC
    /// envelope parsing.
    decoder: Option<Decoder>,
}
937
/// Intermediate struct to generate an `EventIterator`
///
/// Holds the per-request inputs; the `From` conversion turns them into the
/// iterator's initial state (default time, extractors, envelope counter).
struct EventIteratorGenerator<'de, R: JsonRead<'de>> {
    /// Stream of JSON envelopes making up the request body.
    deserializer: serde_json::StreamDeserializer<'de, R, JsonValue>,
    /// Channel supplied with the request, if any.
    channel: Option<String>,
    /// Batch notifier attached to produced events, if any.
    batch: Option<BatchNotifier>,
    /// Splunk HEC token to pass through on produced events, if any.
    token: Option<Arc<str>>,
    /// Log namespace the events are built for.
    log_namespace: LogNamespace,
    /// Handle to the EventsReceived registry.
    events_received: Registered<EventsReceived>,
    /// Peer socket address; last-resort default for the `host` field.
    remote: Option<SocketAddr>,
    /// Preferred default for the `host` field; takes precedence over `remote`.
    /// Presumably the `X-Forwarded-For` header value — confirm against the caller.
    remote_addr: Option<String>,
    /// Optional second-stage decoder. Its presence also switches the extractors'
    /// legacy-key strategy from overwrite to insert-if-empty.
    decoder: Option<Decoder>,
}
950
951impl<'de, R: JsonRead<'de>> From<EventIteratorGenerator<'de, R>> for EventIterator<'de, R> {
952    fn from(f: EventIteratorGenerator<'de, R>) -> Self {
953        // The host field can collide with decoder-produced output in legacy namespace
954        // (its legacy key is `log_schema().host_key()`, typically `"host"`). When a
955        // decoder is configured, prefer the decoder's value over the envelope's so the
956        // user's parsed view wins on conflict. With no decoder configured, behavior is
957        // unchanged: every extractor uses `Overwrite`.
958        let extractor_strategy = if f.decoder.is_some() {
959            LegacyKeyStrategy::InsertIfEmpty
960        } else {
961            LegacyKeyStrategy::Overwrite
962        };
963        Self {
964            deserializer: f.deserializer,
965            envelopes_processed: 0,
966            channel: f.channel.map(Value::from),
967            time: Time::Now(Utc::now()),
968            extractors: [
969                // Extract the host field with the given priority:
970                // 1. The host field is present in the event payload
971                // 2. The x-forwarded-for header is present in the incoming request
972                // 3. Use the `remote`: SocketAddr value provided by warp
973                DefaultExtractor::new_with(
974                    "host",
975                    log_schema().host_key().cloned().into(),
976                    f.remote_addr
977                        .or_else(|| f.remote.map(|addr| addr.to_string()))
978                        .map(Value::from),
979                    f.log_namespace,
980                )
981                .with_legacy_key_strategy(extractor_strategy),
982                DefaultExtractor::new("index", OptionalValuePath::new(INDEX), f.log_namespace)
983                    .with_legacy_key_strategy(extractor_strategy),
984                DefaultExtractor::new("source", OptionalValuePath::new(SOURCE), f.log_namespace)
985                    .with_legacy_key_strategy(extractor_strategy),
986                DefaultExtractor::new(
987                    "sourcetype",
988                    OptionalValuePath::new(SOURCETYPE),
989                    f.log_namespace,
990                )
991                .with_legacy_key_strategy(extractor_strategy),
992            ],
993            batch: f.batch,
994            token: f.token,
995            log_namespace: f.log_namespace,
996            events_received: f.events_received,
997            decoder: f.decoder,
998        }
999    }
1000}
1001
1002impl<'de, R: JsonRead<'de>> EventIterator<'de, R> {
1003    /// Process the envelope's `time` field, updating `self.time` (sticky across envelopes
1004    /// when not explicitly provided).
1005    fn process_time(&mut self, json: &mut JsonValue) -> Result<(), Rejection> {
1006        let parsed_time = match json.get_mut("time").map(JsonValue::take) {
1007            Some(JsonValue::Number(time)) => Some(Some(time)),
1008            Some(JsonValue::String(time)) => Some(time.parse::<serde_json::Number>().ok()),
1009            _ => None,
1010        };
1011
1012        match parsed_time {
1013            None => Ok(()),
1014            Some(Some(t)) => {
1015                if let Some(t) = t.as_u64() {
1016                    let time = parse_timestamp(t as i64).ok_or(ApiError::InvalidDataFormat {
1017                        event: self.envelopes_processed.saturating_sub(1),
1018                    })?;
1019                    self.time = Time::Provided(time);
1020                    Ok(())
1021                } else if let Some(t) = t.as_f64() {
1022                    self.time = Time::Provided(
1023                        Utc.timestamp_opt(
1024                            t.floor() as i64,
1025                            (t.fract() * 1000.0 * 1000.0 * 1000.0) as u32,
1026                        )
1027                        .single()
1028                        .expect("invalid timestamp"),
1029                    );
1030                    Ok(())
1031                } else {
1032                    Err(ApiError::InvalidDataFormat {
1033                        event: self.envelopes_processed.saturating_sub(1),
1034                    }
1035                    .into())
1036                }
1037            }
1038            Some(None) => Err(ApiError::InvalidDataFormat {
1039                event: self.envelopes_processed.saturating_sub(1),
1040            }
1041            .into()),
1042        }
1043    }
1044
1045    fn build_event(&mut self, mut json: JsonValue) -> Result<Event, Rejection> {
1046        self.envelopes_processed += 1;
1047        // Construct Event from parsed json event
1048        let mut log = match self.log_namespace {
1049            LogNamespace::Vector => self.build_log_vector(&mut json)?,
1050            LogNamespace::Legacy => self.build_log_legacy(&mut json)?,
1051        };
1052
1053        // Add source type
1054        self.log_namespace.insert_vector_metadata(
1055            &mut log,
1056            log_schema().source_type_key(),
1057            &owned_value_path!("source_type"),
1058            SplunkConfig::NAME,
1059        );
1060
1061        // Process channel field
1062        let channel_path = owned_value_path!(CHANNEL);
1063        if let Some(JsonValue::String(guid)) = json.get_mut("channel").map(JsonValue::take) {
1064            self.log_namespace.insert_source_metadata(
1065                SplunkConfig::NAME,
1066                &mut log,
1067                Some(LegacyKey::Overwrite(&channel_path)),
1068                lookup::path!(CHANNEL),
1069                guid,
1070            );
1071        } else if let Some(guid) = self.channel.as_ref() {
1072            self.log_namespace.insert_source_metadata(
1073                SplunkConfig::NAME,
1074                &mut log,
1075                Some(LegacyKey::Overwrite(&channel_path)),
1076                lookup::path!(CHANNEL),
1077                guid.clone(),
1078            );
1079        }
1080
1081        // Process fields field
1082        if let Some(JsonValue::Object(object)) = json.get_mut("fields").map(JsonValue::take) {
1083            for (key, value) in object {
1084                self.log_namespace.insert_source_metadata(
1085                    SplunkConfig::NAME,
1086                    &mut log,
1087                    Some(LegacyKey::Overwrite(&owned_value_path!(key.as_str()))),
1088                    lookup::path!(key.as_str()),
1089                    value,
1090                );
1091            }
1092        }
1093
1094        self.process_time(&mut json)?;
1095
1096        // Add time field
1097        let timestamp = match self.time.clone() {
1098            Time::Provided(time) => time,
1099            Time::Now(time) => time,
1100        };
1101
1102        self.log_namespace.insert_source_metadata(
1103            SplunkConfig::NAME,
1104            &mut log,
1105            log_schema().timestamp_key().map(LegacyKey::Overwrite),
1106            lookup::path!("timestamp"),
1107            timestamp,
1108        );
1109
1110        // Extract default extracted fields
1111        for de in self.extractors.iter_mut() {
1112            de.extract(&mut log, &mut json);
1113        }
1114
1115        // Add passthrough token if present
1116        if let Some(token) = &self.token {
1117            log.metadata_mut().set_splunk_hec_token(Arc::clone(token));
1118        }
1119
1120        if let Some(batch) = self.batch.clone() {
1121            log = log.with_batch_notifier(&batch);
1122        }
1123
1124        Ok(log.into())
1125    }
1126
1127    /// Build an `EventMetadata` template from the current envelope context so
1128    /// that VRL decoders can read source-supplied values via `%`-prefixed paths
1129    /// before the decoder program executes.
1130    ///
1131    /// Peeks at the envelope `json` without consuming any fields (consumption
1132    /// happens later in `build_events_decoded`). Falls back to sticky extractor
1133    /// state for fields not present in the current envelope.
1134    fn build_vrl_metadata(&self, json: &JsonValue) -> EventMetadata {
1135        let mut metadata = EventMetadata::default();
1136
1137        // Splunk HEC token as a secret so VRL can read it via get_secret!()
1138        if let Some(token) = &self.token {
1139            metadata.set_splunk_hec_token(Arc::clone(token));
1140        }
1141
1142        // Envelope host/source/sourcetype/index: peek current value; fall back
1143        // to sticky extractor state.
1144        let fields: &[(&str, &str)] = &[
1145            ("host", "splunk_hec.host"),
1146            ("source", "splunk_hec.source"),
1147            ("sourcetype", "splunk_hec.sourcetype"),
1148            ("index", "splunk_hec.index"),
1149        ];
1150        for (json_key, meta_path) in fields {
1151            let val = json
1152                .get(json_key)
1153                .and_then(|v| v.as_str())
1154                .map(|s| Value::from(s.to_string()))
1155                .or_else(|| {
1156                    self.extractors
1157                        .iter()
1158                        .find(|e| e.field == *json_key)
1159                        .and_then(|e| e.value.clone())
1160                });
1161            if let Some(v) = val {
1162                metadata.value_mut().insert(*meta_path, v);
1163            }
1164        }
1165
1166        // Channel: envelope field or header default
1167        let channel = json
1168            .get("channel")
1169            .and_then(|v| v.as_str())
1170            .map(|s| Value::from(s.to_string()))
1171            .or_else(|| self.channel.clone());
1172        if let Some(ch) = channel {
1173            metadata.value_mut().insert("splunk_hec.channel", ch);
1174        }
1175
1176        metadata
1177    }
1178
    /// Decoded path: extract the envelope's `event` field as bytes (preserving shape),
    /// run it through the second-stage decoder, and overlay envelope metadata so that
    /// decoder-produced fields win on conflict. Returns the events along with a flag
    /// indicating whether the codec hit any errors (so the caller can refuse to ack
    /// a request that lost data).
    ///
    /// Only `Event::Log` events receive the envelope overlay; any other event kind the
    /// decoder produces is forwarded untouched.
    ///
    /// # Errors
    ///
    /// Returns a rejection when the `event` field fails validation or when the
    /// envelope's `time` field is invalid. A failure to serialize the `event` value is
    /// not a rejection: it is reported through internal events and surfaces as
    /// `Ok((vec![], true))`.
    fn build_events_decoded(
        &mut self,
        mut json: JsonValue,
        decoder: Decoder,
    ) -> Result<(Vec<Event>, bool), Rejection> {
        // Count the envelope first so error indices computed below refer to it.
        self.envelopes_processed += 1;
        let event = self.validate_event_field(&json)?;
        // Strings are passed as raw bytes so decoders see the bare content
        // (e.g. a JSON string event containing `{"foo":"bar"}` arrives at the
        // decoder as `{"foo":"bar"}`, not `"{\"foo\":\"bar\"}"` ). All other
        // JSON values (objects, arrays, numbers, bools) are serialized to JSON.
        let payload = if let Some(s) = event.as_str() {
            s.as_bytes().to_vec()
        } else {
            match serde_json::to_vec(event) {
                Ok(bytes) => bytes,
                Err(error) => {
                    // Treat as a decode failure: nothing is produced for this
                    // envelope and the error flag is raised for the caller.
                    let error: vector_lib::Error = Box::new(error);
                    emit!(
                        vector_lib::codecs::internal_events::DecoderDeserializeError {
                            error: &error
                        }
                    );
                    emit!(ComponentEventsDropped::<UNINTENTIONAL> {
                        count: 1,
                        reason: "Failed to serialize event field to bytes.",
                    });
                    return Ok((vec![], true));
                }
            }
        };

        self.process_time(&mut json)?;

        // Always forward a fallback timestamp so events without an explicit envelope
        // `time` field still get one (matches the legacy /event behavior, which always
        // wrote a timestamp). `decode_message` uses `try_insert`, so a decoder-supplied
        // timestamp still wins on conflict.
        let fallback_time = match self.time {
            Time::Provided(t) | Time::Now(t) => t,
        };

        // Build a metadata template so VRL decoders can read envelope context
        // via `%`-prefixed paths (e.g. `%splunk_hec.host`, `%vector.secrets.*`).
        // For non-VRL decoders `with_metadata_template` is a no-op.
        // This must happen before `channel`/`fields` are taken out of `json` below,
        // because the template is built by peeking at the envelope.
        let decoder = decoder.with_metadata_template(self.build_vrl_metadata(&json));

        let (decoded, had_decode_errors) = decode_payload(
            decoder,
            &payload,
            Some(fallback_time),
            true, // /event: write %splunk_hec.timestamp
            DecodePayloadContext {
                batch: &self.batch,
                log_namespace: self.log_namespace,
                events_received: &self.events_received,
                splunk_hec_token: self.token.as_ref(),
            },
        );

        // Snapshot envelope metadata that has to apply uniformly to every decoded event.
        let envelope_channel: Option<Value> = match json.get_mut("channel").map(JsonValue::take) {
            Some(JsonValue::String(guid)) => Some(guid.into()),
            _ => None,
        };
        let envelope_fields: Option<serde_json::Map<String, JsonValue>> =
            match json.get_mut("fields").map(JsonValue::take) {
                Some(JsonValue::Object(object)) => Some(object),
                _ => None,
            };
        let channel_path = owned_value_path!(CHANNEL);

        let mut out = Vec::with_capacity(decoded.len());
        for mut event in decoded {
            if let Event::Log(log) = &mut event {
                // channel: envelope value beats header default. Use `InsertIfEmpty`
                // for legacy event fields, and `try_insert` for the Vector metadata
                // path so a decoder-produced `%splunk_hec.channel` survives.
                if let Some(channel_val) = envelope_channel.clone().or_else(|| self.channel.clone())
                {
                    match self.log_namespace {
                        LogNamespace::Legacy => {
                            self.log_namespace.insert_source_metadata(
                                SplunkConfig::NAME,
                                log,
                                Some(LegacyKey::InsertIfEmpty(&channel_path)),
                                lookup::path!(CHANNEL),
                                channel_val,
                            );
                        }
                        LogNamespace::Vector => {
                            log.try_insert(
                                metadata_path!(SplunkConfig::NAME, CHANNEL),
                                channel_val,
                            );
                        }
                    }
                }

                // Top-level envelope fields (host/index/source/sourcetype) must be
                // applied before `fields.*` so top-level beats `fields.*` when both
                // are present — matching the non-decoder runtime precedence. Both
                // still use InsertIfEmpty so the decoder's output wins over all
                // envelope metadata. Order: decoder > top-level > fields.
                //
                // NOTE(review): `extract` receives `&mut json` on every iteration of
                // the outer loop; presumably it consumes the envelope key on the first
                // event and reuses its stored value for the rest — confirm against
                // `DefaultExtractor::extract`.
                for de in self.extractors.iter_mut() {
                    de.extract(log, &mut json);
                }

                // fields: use `InsertIfEmpty` / `try_insert` to preserve decoder-wins
                // and extractor-wins semantics (fields fill only what neither the
                // decoder nor the top-level envelope keys have already set).
                if let Some(ref fields) = envelope_fields {
                    for (key, value) in fields {
                        match self.log_namespace {
                            LogNamespace::Legacy => {
                                self.log_namespace.insert_source_metadata(
                                    SplunkConfig::NAME,
                                    log,
                                    Some(LegacyKey::InsertIfEmpty(&owned_value_path!(
                                        key.as_str()
                                    ))),
                                    lookup::path!(key.as_str()),
                                    value.clone(),
                                );
                            }
                            LogNamespace::Vector => {
                                log.try_insert(
                                    metadata_path!(SplunkConfig::NAME, key.as_str()),
                                    value.clone(),
                                );
                            }
                        }
                    }
                }
            }
            // `splunk_hec_token` is set inside `decode_payload` so the metadata is
            // attached at the moment each event leaves the codec. Don't overwrite it
            // here.
            out.push(event);
        }

        Ok((out, had_decode_errors))
    }
1327
1328    /// Validate the `event` field of a HEC envelope, returning a reference to the
1329    /// validated value or an error if it is missing, null, or (for string values)
1330    /// empty. Shared between the decoder path and the legacy/vector construction
1331    /// paths so they all enforce the same HEC protocol contract.
1332    fn validate_event_field<'a>(&self, json: &'a JsonValue) -> Result<&'a JsonValue, Rejection> {
1333        let event_idx = self.envelopes_processed.saturating_sub(1);
1334        match json.get("event") {
1335            None | Some(JsonValue::Null) => {
1336                Err(ApiError::MissingEventField { event: event_idx }.into())
1337            }
1338            Some(JsonValue::String(s)) if s.is_empty() => {
1339                Err(ApiError::EmptyEventField { event: event_idx }.into())
1340            }
1341            Some(event) => Ok(event),
1342        }
1343    }
1344
1345    /// Build the log event for the vector namespace.
1346    /// In this namespace the log event is created entirely from the event field.
1347    /// No renaming of the `line` field is done.
1348    fn build_log_vector(&mut self, json: &mut JsonValue) -> Result<LogEvent, Rejection> {
1349        let event: Value = self.validate_event_field(json)?.into();
1350        let mut log = LogEvent::from(event);
1351
1352        // EstimatedJsonSizeOf must be calculated before enrichment
1353        self.events_received
1354            .emit(CountByteSize(1, log.estimated_json_encoded_size_of()));
1355
1356        // The timestamp is extracted from the message for the Legacy namespace.
1357        self.log_namespace.insert_vector_metadata(
1358            &mut log,
1359            log_schema().timestamp_key(),
1360            lookup::path!("ingest_timestamp"),
1361            chrono::Utc::now(),
1362        );
1363
1364        Ok(log)
1365    }
1366
1367    /// Build the log event for the legacy namespace.
1368    /// If the event is a string, or the event contains a field called `line` that is a string
1369    /// (the docker splunk logger places the message in the event.line field) that string
1370    /// is placed in the message field.
1371    fn build_log_legacy(&mut self, json: &mut JsonValue) -> Result<LogEvent, Rejection> {
1372        // validate_event_field checks for missing/null/empty-string
1373        self.validate_event_field(json)?;
1374        let mut log = LogEvent::default();
1375        match json["event"].take() {
1376            JsonValue::String(string) => {
1377                log.maybe_insert(log_schema().message_key_target_path(), string);
1378            }
1379            JsonValue::Object(mut object) => {
1380                if object.is_empty() {
1381                    return Err(ApiError::EmptyEventField {
1382                        event: self.envelopes_processed.saturating_sub(1),
1383                    }
1384                    .into());
1385                }
1386
1387                // Add 'line' value as 'event::schema().message_key'
1388                if let Some(line) = object.remove("line") {
1389                    match line {
1390                        // This don't quite fit the meaning of a event::schema().message_key
1391                        JsonValue::Array(_) | JsonValue::Object(_) => {
1392                            log.insert(event_path!("line"), line);
1393                        }
1394                        _ => {
1395                            log.maybe_insert(log_schema().message_key_target_path(), line);
1396                        }
1397                    }
1398                }
1399
1400                for (key, value) in object {
1401                    log.insert(event_path!(key.as_str()), value);
1402                }
1403            }
1404            _ => {
1405                return Err(ApiError::InvalidDataFormat {
1406                    event: self.envelopes_processed.saturating_sub(1),
1407                }
1408                .into());
1409            }
1410        }
1411
1412        // EstimatedJsonSizeOf must be calculated before enrichment
1413        self.events_received
1414            .emit(CountByteSize(1, log.estimated_json_encoded_size_of()));
1415
1416        Ok(log)
1417    }
1418}
1419
1420impl<'de, R: JsonRead<'de>> Iterator for EventIterator<'de, R> {
1421    /// Each item is `(events, had_decode_errors)` for one envelope - the boolean is
1422    /// only ever `true` in the decoder path. Callers OR these together across the
1423    /// whole request to decide whether ack registration is safe.
1424    type Item = Result<(Vec<Event>, bool), Rejection>;
1425
1426    fn next(&mut self) -> Option<Self::Item> {
1427        match self.deserializer.next() {
1428            Some(Ok(json)) => {
1429                let result = if let Some(decoder) = self.decoder.clone() {
1430                    self.build_events_decoded(json, decoder)
1431                } else {
1432                    self.build_event(json).map(|event| (vec![event], false))
1433                };
1434                Some(result)
1435            }
1436            None => {
1437                if self.envelopes_processed == 0 {
1438                    Some(Err(ApiError::NoData.into()))
1439                } else {
1440                    None
1441                }
1442            }
1443            Some(Err(error)) => {
1444                emit!(SplunkHecRequestBodyInvalidError {
1445                    error: error.into()
1446                });
1447                // The deserializer failed to parse the next envelope, so the failing
1448                // envelope's index is the count of envelopes already processed (not
1449                // `envelopes_processed - 1`, which is what build-time errors use).
1450                Some(Err(ApiError::InvalidDataFormat {
1451                    event: self.envelopes_processed,
1452                }
1453                .into()))
1454            }
1455        }
1456    }
1457}
1458
1459struct DecodePayloadContext<'a> {
1460    batch: &'a Option<BatchNotifier>,
1461    log_namespace: LogNamespace,
1462    events_received: &'a Registered<EventsReceived>,
1463    splunk_hec_token: Option<&'a Arc<str>>,
1464}
1465
1466/// Run a payload through the configured `framing` + `decoding` codec.
1467///
1468/// Returns the decoded events along with a flag indicating whether any decode error
1469/// occurred. The shared `crate::sources::util::decode_message` helper swallows
1470/// decode errors silently, which is fine for sources without ack semantics, but for
1471/// `splunk_hec` we need to know about errors so we can refuse to acknowledge a
1472/// request that lost data mid-stream.
1473///
1474/// On each decoded event this helper sets `source_type`, `vector.ingest_timestamp`,
1475/// the optional `splunk_hec.timestamp` (only when `set_source_timestamp` is `true`,
1476/// i.e. for the `/event` endpoint which carries an HEC envelope `time` field), and
1477/// the optional Splunk HEC token. Pass `set_source_timestamp = false` for `/raw`,
1478/// which has no envelope timestamp and should only receive `%vector.ingest_timestamp`.
1479fn decode_payload(
1480    mut decoder: Decoder,
1481    payload: &[u8],
1482    fallback_timestamp: Option<DateTime<Utc>>,
1483    set_source_timestamp: bool,
1484    ctx: DecodePayloadContext<'_>,
1485) -> (Vec<Event>, bool) {
1486    let DecodePayloadContext {
1487        batch,
1488        log_namespace,
1489        events_received,
1490        splunk_hec_token,
1491    } = ctx;
1492    let mut buffer = BytesMut::with_capacity(payload.len());
1493    buffer.extend_from_slice(payload);
1494    let now = Utc::now();
1495    let mut events: Vec<Event> = Vec::new();
1496    let mut had_errors = false;
1497
1498    loop {
1499        match decoder.decode_eof(&mut buffer) {
1500            Ok(Some((decoded, _))) => {
1501                for mut event in decoded {
1502                    if let Event::Log(log) = &mut event {
1503                        log_namespace.insert_vector_metadata(
1504                            log,
1505                            log_schema().source_type_key(),
1506                            lookup::path!("source_type"),
1507                            Bytes::from_static(SplunkConfig::NAME.as_bytes()),
1508                        );
1509                        match log_namespace {
1510                            LogNamespace::Vector => {
1511                                // Only write %splunk_hec.timestamp for the /event
1512                                // endpoint, which has a real HEC envelope timestamp.
1513                                // /raw has no envelope time and should only get the
1514                                // standard %vector.ingest_timestamp below.
1515                                if set_source_timestamp && let Some(timestamp) = fallback_timestamp
1516                                {
1517                                    log.try_insert(
1518                                        metadata_path!(SplunkConfig::NAME, "timestamp"),
1519                                        timestamp,
1520                                    );
1521                                }
1522                                log.insert(metadata_path!("vector", "ingest_timestamp"), now);
1523                            }
1524                            LogNamespace::Legacy => {
1525                                if let Some(timestamp) = fallback_timestamp
1526                                    && let Some(timestamp_key) = log_schema().timestamp_key()
1527                                {
1528                                    log.try_insert((PathPrefix::Event, timestamp_key), timestamp);
1529                                }
1530                            }
1531                        }
1532                    }
1533                    if let Some(token) = splunk_hec_token {
1534                        event.metadata_mut().set_splunk_hec_token(Arc::clone(token));
1535                    }
1536                    events_received.emit(CountByteSize(1, event.estimated_json_encoded_size_of()));
1537                    events.push(event.with_batch_notifier_option(batch));
1538                }
1539            }
1540            Ok(None) => break,
1541            Err(error) => {
1542                // The decoder logs its own error; record that one occurred so the
1543                // caller can refuse to ack a request that lost data.
1544                had_errors = true;
1545                if !error.can_continue() {
1546                    break;
1547                }
1548            }
1549        }
1550    }
1551
1552    (events, had_errors)
1553}
1554
1555/// Parse a `i64` unix timestamp that can either be in seconds, milliseconds or
1556/// nanoseconds.
1557///
1558/// This attempts to parse timestamps based on what cutoff range they fall into.
1559/// For seconds to be parsed the timestamp must be less than the unix epoch of
1560/// the year `2400`. For this to parse milliseconds the time must be smaller
1561/// than the year `10,000` in unix epoch milliseconds. If the value is larger
1562/// than both we attempt to parse it as nanoseconds.
1563///
1564/// Returns `None` if `t` is negative.
1565fn parse_timestamp(t: i64) -> Option<DateTime<Utc>> {
1566    // Utc.ymd(2400, 1, 1).and_hms(0,0,0).timestamp();
1567    const SEC_CUTOFF: i64 = 13569465600;
1568    // Utc.ymd(10_000, 1, 1).and_hms(0,0,0).timestamp_millis();
1569    const MILLISEC_CUTOFF: i64 = 253402300800000;
1570
1571    // Timestamps can't be negative!
1572    if t < 0 {
1573        return None;
1574    }
1575
1576    let ts = if t < SEC_CUTOFF {
1577        Utc.timestamp_opt(t, 0).single().expect("invalid timestamp")
1578    } else if t < MILLISEC_CUTOFF {
1579        Utc.timestamp_millis_opt(t)
1580            .single()
1581            .expect("invalid timestamp")
1582    } else {
1583        Utc.timestamp_nanos(t)
1584    };
1585
1586    Some(ts)
1587}
1588
/// Selects how `DefaultExtractor::extract` writes the legacy key when it applies a
/// value.
#[derive(Clone, Copy)]
enum LegacyKeyStrategy {
    /// Write through `LegacyKey::Overwrite`.
    Overwrite,
    /// Write through `LegacyKey::InsertIfEmpty`. In the Vector namespace the
    /// extractor uses `try_insert` on the metadata path instead.
    InsertIfEmpty,
}
1595
1596/// Maintains last known extracted value of field and uses it in the absence of field.
1597struct DefaultExtractor {
1598    field: &'static str,
1599    to_field: OptionalValuePath,
1600    value: Option<Value>,
1601    log_namespace: LogNamespace,
1602    legacy_key_strategy: LegacyKeyStrategy,
1603}
1604
1605impl DefaultExtractor {
1606    const fn new(
1607        field: &'static str,
1608        to_field: OptionalValuePath,
1609        log_namespace: LogNamespace,
1610    ) -> Self {
1611        DefaultExtractor {
1612            field,
1613            to_field,
1614            value: None,
1615            log_namespace,
1616            legacy_key_strategy: LegacyKeyStrategy::Overwrite,
1617        }
1618    }
1619
1620    fn new_with(
1621        field: &'static str,
1622        to_field: OptionalValuePath,
1623        value: impl Into<Option<Value>>,
1624        log_namespace: LogNamespace,
1625    ) -> Self {
1626        DefaultExtractor {
1627            field,
1628            to_field,
1629            value: value.into(),
1630            log_namespace,
1631            legacy_key_strategy: LegacyKeyStrategy::Overwrite,
1632        }
1633    }
1634
1635    /// Set the strategy used when writing this extractor's legacy key. Defaults to
1636    /// `Overwrite`; the decoder path uses `InsertIfEmpty` for fields that may collide
1637    /// with decoder-produced output (e.g. `host`).
1638    const fn with_legacy_key_strategy(mut self, strategy: LegacyKeyStrategy) -> Self {
1639        self.legacy_key_strategy = strategy;
1640        self
1641    }
1642
1643    fn extract(&mut self, log: &mut LogEvent, value: &mut JsonValue) {
1644        // Process json_field
1645        if let Some(JsonValue::String(new_value)) = value.get_mut(self.field).map(JsonValue::take) {
1646            self.value = Some(new_value.into());
1647        }
1648
1649        // Add data field
1650        if let Some(index) = self.value.as_ref()
1651            && let Some(metadata_key) = self.to_field.path.as_ref()
1652        {
1653            // For Vector namespace + InsertIfEmpty (decoder mode): check the metadata
1654            // value tree before inserting so VRL-produced values aren't overwritten.
1655            // `insert_source_metadata` for Vector ns always calls `insert`, not
1656            // `try_insert`, so we replicate its path construction here.
1657            if matches!(self.log_namespace, LogNamespace::Vector)
1658                && matches!(self.legacy_key_strategy, LegacyKeyStrategy::InsertIfEmpty)
1659            {
1660                log.try_insert(
1661                    (
1662                        PathPrefix::Metadata,
1663                        lookup::path!(SplunkConfig::NAME).concat(metadata_key),
1664                    ),
1665                    index.clone(),
1666                );
1667            } else {
1668                let legacy_key = match self.legacy_key_strategy {
1669                    LegacyKeyStrategy::Overwrite => LegacyKey::Overwrite(metadata_key),
1670                    LegacyKeyStrategy::InsertIfEmpty => LegacyKey::InsertIfEmpty(metadata_key),
1671                };
1672                self.log_namespace.insert_source_metadata(
1673                    SplunkConfig::NAME,
1674                    log,
1675                    Some(legacy_key),
1676                    &self.to_field.path.clone().unwrap_or(owned_value_path!("")),
1677                    index.clone(),
1678                );
1679            }
1680        }
1681    }
1682}
1683
1684/// For tracking origin of the timestamp
1685#[derive(Clone, Debug)]
1686enum Time {
1687    /// Backup
1688    Now(DateTime<Utc>),
1689    /// Provided in the request
1690    Provided(DateTime<Utc>),
1691}
1692
1693/// Creates events from a raw HEC request body.
1694///
1695/// Without a decoder, returns a single event whose message is the (decompressed)
1696/// request body. With a decoder, the body is fed through the configured framing +
1697/// decoding pipeline and one or more events are returned. The boolean second tuple
1698/// element is `true` when the decoder hit any (recoverable or non-recoverable)
1699/// errors during the request, so the caller can refuse to acknowledge the request.
1700#[allow(clippy::too_many_arguments)]
1701fn raw_event(
1702    bytes: Bytes,
1703    gzip: bool,
1704    channel: String,
1705    remote: Option<SocketAddr>,
1706    xff: Option<String>,
1707    batch: Option<BatchNotifier>,
1708    log_namespace: LogNamespace,
1709    events_received: &Registered<EventsReceived>,
1710    decoder: Option<Decoder>,
1711    splunk_hec_token: Option<Arc<str>>,
1712) -> Result<(Vec<Event>, bool), Rejection> {
1713    // Process gzip
1714    let body_bytes: Bytes = if gzip {
1715        let mut data = Vec::new();
1716        match MultiGzDecoder::new(bytes.reader()).read_to_end(&mut data) {
1717            Ok(0) => return Err(ApiError::NoData.into()),
1718            Ok(_) => Bytes::from(data),
1719            Err(error) => {
1720                emit!(SplunkHecRequestBodyInvalidError { error });
1721                return Err(ApiError::InvalidDataFormat { event: 0 }.into());
1722            }
1723        }
1724    } else {
1725        bytes
1726    };
1727
1728    // host-field priority for raw endpoint:
1729    // - x-forwarded-for is set to `host` field first, if present. If not present:
1730    // - set remote addr to host field
1731    let host = if let Some(remote_address) = xff {
1732        Some(remote_address)
1733    } else {
1734        remote.map(|remote| remote.to_string())
1735    };
1736
1737    let decoder_in_use = decoder.is_some();
1738    let (mut events, had_decode_errors): (Vec<Event>, bool) = if let Some(decoder) = decoder {
1739        // Build a metadata template so VRL decoders can read raw-endpoint context
1740        // via `%`-prefixed paths (e.g. `%splunk_hec.channel`, `%splunk_hec.host`,
1741        // `%vector.secrets.splunk_hec_token`). No-op for non-VRL decoders.
1742        let decoder = {
1743            let mut meta = EventMetadata::default();
1744            if let Some(token) = splunk_hec_token.as_ref() {
1745                meta.set_splunk_hec_token(Arc::clone(token));
1746            }
1747            if let Some(ref h) = host {
1748                meta.value_mut().insert("splunk_hec.host", h.clone());
1749            }
1750            meta.value_mut()
1751                .insert("splunk_hec.channel", channel.clone());
1752            decoder.with_metadata_template(meta)
1753        };
1754
1755        // Pass ingest time as the fallback timestamp so decoded events always have
1756        // one - matches `insert_standard_vector_source_metadata` in the legacy raw
1757        // path. `decode_payload` uses `try_insert`, so a decoder-supplied timestamp
1758        // still wins on conflict.
1759        decode_payload(
1760            decoder,
1761            &body_bytes,
1762            Some(Utc::now()),
1763            false, // /raw: no HEC envelope timestamp; only %vector.ingest_timestamp
1764            DecodePayloadContext {
1765                batch: &batch,
1766                log_namespace,
1767                events_received,
1768                splunk_hec_token: splunk_hec_token.as_ref(),
1769            },
1770        )
1771    } else {
1772        let message: Value = body_bytes.into();
1773        let mut log = match log_namespace {
1774            LogNamespace::Vector => LogEvent::from(message),
1775            LogNamespace::Legacy => {
1776                let mut log = LogEvent::default();
1777                log.maybe_insert(log_schema().message_key_target_path(), message);
1778                log
1779            }
1780        };
1781        // We need to calculate the estimated json size of the event BEFORE enrichment.
1782        events_received.emit(CountByteSize(1, log.estimated_json_encoded_size_of()));
1783
1784        log_namespace.insert_standard_vector_source_metadata(
1785            &mut log,
1786            SplunkConfig::NAME,
1787            Utc::now(),
1788        );
1789
1790        if let Some(batch) = batch.clone() {
1791            log = log.with_batch_notifier(&batch);
1792        }
1793        (vec![Event::from(log)], false)
1794    };
1795
1796    let channel_path = owned_value_path!(CHANNEL);
1797    for event in &mut events {
1798        if let Event::Log(log) = event {
1799            // With a decoder configured, defer to anything it produced at the legacy
1800            // When a decoder is in use, preserve decoder-wins semantics for Vector ns
1801            // by using `try_insert` on the metadata path (insert_source_metadata for
1802            // Vector ns always overwrites). Without a decoder the log is freshly
1803            // constructed so overwriting is correct.
1804            if decoder_in_use && matches!(log_namespace, LogNamespace::Vector) {
1805                log.try_insert(metadata_path!(SplunkConfig::NAME, CHANNEL), channel.clone());
1806                if let Some(ref h) = host {
1807                    log.try_insert(metadata_path!(SplunkConfig::NAME, "host"), h.clone());
1808                }
1809            } else {
1810                let channel_legacy_key = if decoder_in_use {
1811                    LegacyKey::InsertIfEmpty(&channel_path)
1812                } else {
1813                    LegacyKey::Overwrite(&channel_path)
1814                };
1815                log_namespace.insert_source_metadata(
1816                    SplunkConfig::NAME,
1817                    log,
1818                    Some(channel_legacy_key),
1819                    lookup::path!(CHANNEL),
1820                    channel.clone(),
1821                );
1822                if let Some(ref host) = host {
1823                    log_namespace.insert_source_metadata(
1824                        SplunkConfig::NAME,
1825                        log,
1826                        log_schema().host_key().map(LegacyKey::InsertIfEmpty),
1827                        lookup::path!("host"),
1828                        host.clone(),
1829                    );
1830                }
1831            }
1832        }
1833    }
1834
1835    Ok((events, had_decode_errors))
1836}
1837
1838#[derive(Clone, Copy, Debug, Snafu)]
1839pub(crate) enum ApiError {
1840    MissingAuthorization,
1841    InvalidAuthorization,
1842    UnsupportedEncoding,
1843    UnsupportedContentType,
1844    MissingChannel,
1845    NoData,
1846    InvalidDataFormat { event: usize },
1847    ServerShutdown,
1848    EmptyEventField { event: usize },
1849    MissingEventField { event: usize },
1850    BadRequest,
1851    ServiceUnavailable,
1852    AckIsDisabled,
1853}
1854
1855impl warp::reject::Reject for ApiError {}
1856
1857/// Cached bodies for common responses
1858mod splunk_response {
1859    use serde::Serialize;
1860
1861    // https://docs.splunk.com/Documentation/Splunk/8.2.3/Data/TroubleshootHTTPEventCollector#Possible_error_codes
1862    pub enum HecStatusCode {
1863        Success = 0,
1864        TokenIsRequired = 2,
1865        InvalidAuthorization = 3,
1866        NoData = 5,
1867        InvalidDataFormat = 6,
1868        ServerIsBusy = 9,
1869        DataChannelIsMissing = 10,
1870        EventFieldIsRequired = 12,
1871        EventFieldCannotBeBlank = 13,
1872        AckIsDisabled = 14,
1873    }
1874
1875    #[derive(Serialize)]
1876    pub enum HecResponseMetadata {
1877        #[serde(rename = "ackId")]
1878        AckId(u64),
1879        #[serde(rename = "invalid-event-number")]
1880        InvalidEventNumber(usize),
1881    }
1882
1883    #[derive(Serialize)]
1884    pub struct HecResponse {
1885        text: &'static str,
1886        code: u8,
1887        #[serde(skip_serializing_if = "Option::is_none", flatten)]
1888        pub metadata: Option<HecResponseMetadata>,
1889    }
1890
1891    impl HecResponse {
1892        pub const fn new(code: HecStatusCode) -> Self {
1893            let text = match code {
1894                HecStatusCode::Success => "Success",
1895                HecStatusCode::TokenIsRequired => "Token is required",
1896                HecStatusCode::InvalidAuthorization => "Invalid authorization",
1897                HecStatusCode::NoData => "No data",
1898                HecStatusCode::InvalidDataFormat => "Invalid data format",
1899                HecStatusCode::DataChannelIsMissing => "Data channel is missing",
1900                HecStatusCode::EventFieldIsRequired => "Event field is required",
1901                HecStatusCode::EventFieldCannotBeBlank => "Event field cannot be blank",
1902                HecStatusCode::ServerIsBusy => "Server is busy",
1903                HecStatusCode::AckIsDisabled => "Ack is disabled",
1904            };
1905
1906            Self {
1907                text,
1908                code: code as u8,
1909                metadata: None,
1910            }
1911        }
1912
1913        pub const fn with_metadata(mut self, metadata: HecResponseMetadata) -> Self {
1914            self.metadata = Some(metadata);
1915            self
1916        }
1917    }
1918
1919    pub const INVALID_AUTHORIZATION: HecResponse =
1920        HecResponse::new(HecStatusCode::InvalidAuthorization);
1921    pub const TOKEN_IS_REQUIRED: HecResponse = HecResponse::new(HecStatusCode::TokenIsRequired);
1922    pub const NO_DATA: HecResponse = HecResponse::new(HecStatusCode::NoData);
1923    pub const SUCCESS: HecResponse = HecResponse::new(HecStatusCode::Success);
1924    pub const SERVER_IS_BUSY: HecResponse = HecResponse::new(HecStatusCode::ServerIsBusy);
1925    pub const NO_CHANNEL: HecResponse = HecResponse::new(HecStatusCode::DataChannelIsMissing);
1926    pub const ACK_IS_DISABLED: HecResponse = HecResponse::new(HecStatusCode::AckIsDisabled);
1927}
1928
1929async fn register_ack(
1930    idx_ack: Option<Arc<IndexerAcknowledgement>>,
1931    receiver: Option<BatchStatusReceiver>,
1932    channel: Option<String>,
1933) -> Result<Option<u64>, Rejection> {
1934    match (idx_ack, receiver, channel) {
1935        (Some(ack), Some(rx), Some(ch)) => Ok(Some(ack.get_ack_id_from_channel(ch, rx).await?)),
1936        _ => Ok(None),
1937    }
1938}
1939
1940fn finish_ok(maybe_ack_id: Option<u64>) -> Response {
1941    let body = if let Some(ack_id) = maybe_ack_id {
1942        HecResponse::new(HecStatusCode::Success).with_metadata(HecResponseMetadata::AckId(ack_id))
1943    } else {
1944        splunk_response::SUCCESS
1945    };
1946    response_json(StatusCode::OK, body)
1947}
1948
1949fn response_plain(code: StatusCode, msg: &'static str) -> Response {
1950    warp::reply::with_status(
1951        warp::reply::with_header(msg, http::header::CONTENT_TYPE, "text/plain; charset=utf-8"),
1952        code,
1953    )
1954    .into_response()
1955}
1956
1957async fn finish_err(rejection: Rejection) -> Result<(Response,), Rejection> {
1958    if let Some(&error) = rejection.find::<ApiError>() {
1959        emit!(SplunkHecRequestError { error });
1960        Ok((match error {
1961            ApiError::MissingAuthorization => {
1962                response_json(StatusCode::UNAUTHORIZED, splunk_response::TOKEN_IS_REQUIRED)
1963            }
1964            ApiError::InvalidAuthorization => response_json(
1965                StatusCode::UNAUTHORIZED,
1966                splunk_response::INVALID_AUTHORIZATION,
1967            ),
1968            ApiError::UnsupportedEncoding => empty_response(StatusCode::UNSUPPORTED_MEDIA_TYPE),
1969            ApiError::UnsupportedContentType => response_plain(
1970                StatusCode::UNSUPPORTED_MEDIA_TYPE,
1971                "The request's content-type is not supported",
1972            ),
1973            ApiError::MissingChannel => {
1974                response_json(StatusCode::BAD_REQUEST, splunk_response::NO_CHANNEL)
1975            }
1976            ApiError::NoData => response_json(StatusCode::BAD_REQUEST, splunk_response::NO_DATA),
1977            ApiError::ServerShutdown => empty_response(StatusCode::SERVICE_UNAVAILABLE),
1978            ApiError::InvalidDataFormat { event } => response_json(
1979                StatusCode::BAD_REQUEST,
1980                HecResponse::new(HecStatusCode::InvalidDataFormat)
1981                    .with_metadata(HecResponseMetadata::InvalidEventNumber(event)),
1982            ),
1983            ApiError::EmptyEventField { event } => response_json(
1984                StatusCode::BAD_REQUEST,
1985                HecResponse::new(HecStatusCode::EventFieldCannotBeBlank)
1986                    .with_metadata(HecResponseMetadata::InvalidEventNumber(event)),
1987            ),
1988            ApiError::MissingEventField { event } => response_json(
1989                StatusCode::BAD_REQUEST,
1990                HecResponse::new(HecStatusCode::EventFieldIsRequired)
1991                    .with_metadata(HecResponseMetadata::InvalidEventNumber(event)),
1992            ),
1993            ApiError::BadRequest => empty_response(StatusCode::BAD_REQUEST),
1994            ApiError::ServiceUnavailable => response_json(
1995                StatusCode::SERVICE_UNAVAILABLE,
1996                splunk_response::SERVER_IS_BUSY,
1997            ),
1998            ApiError::AckIsDisabled => {
1999                response_json(StatusCode::BAD_REQUEST, splunk_response::ACK_IS_DISABLED)
2000            }
2001        },))
2002    } else {
2003        Err(rejection)
2004    }
2005}
2006
2007/// Response without body
2008fn empty_response(code: StatusCode) -> Response {
2009    let mut res = Response::default();
2010    *res.status_mut() = code;
2011    res
2012}
2013
2014/// Response with body
2015fn response_json(code: StatusCode, body: impl Serialize) -> Response {
2016    warp::reply::with_status(warp::reply::json(&body), code).into_response()
2017}
2018
2019#[cfg(feature = "sinks-splunk_hec")]
2020#[cfg(test)]
2021mod tests {
2022    use std::{net::SocketAddr, num::NonZeroU64};
2023
2024    use chrono::{TimeZone, Utc};
2025    use futures_util::Stream;
2026    use http::Uri;
2027    use reqwest::{RequestBuilder, Response};
2028    use serde::Deserialize;
2029    use vector_lib::{
2030        codecs::{
2031            BytesDecoderConfig, JsonSerializerConfig, TextSerializerConfig,
2032            decoding::{
2033                DeserializerConfig,
2034                format::{VrlDeserializerConfig, VrlDeserializerOptions},
2035            },
2036        },
2037        event::EventStatus,
2038        schema::Definition,
2039        sensitive_string::SensitiveString,
2040    };
2041    use vrl::path::PathPrefix;
2042
2043    use super::*;
2044    use crate::{
2045        SourceSender,
2046        codecs::{DecodingConfig, EncodingConfig},
2047        components::validation::prelude::*,
2048        config::{SinkConfig, SinkContext, SourceConfig, SourceContext, log_schema},
2049        event::{Event, LogEvent},
2050        sinks::{
2051            Healthcheck, VectorSink,
2052            splunk_hec::logs::config::HecLogsSinkConfig,
2053            util::{BatchConfig, Compression, TowerRequestConfig},
2054        },
2055        sources::splunk_hec::acknowledgements::{HecAckStatusRequest, HecAckStatusResponse},
2056        test_util::{
2057            addr::{PortGuard, next_addr},
2058            collect_n,
2059            components::{
2060                COMPONENT_ERROR_TAGS, HTTP_PUSH_SOURCE_TAGS, assert_source_compliance,
2061                assert_source_error,
2062            },
2063            wait_for_tcp,
2064        },
2065    };
2066
2067    #[test]
2068    fn generate_config() {
2069        crate::test_util::test_generate_config::<SplunkConfig>();
2070    }
2071
2072    /// Splunk token
2073    const TOKEN: &str = "token";
2074    const VALID_TOKENS: &[&str; 2] = &[TOKEN, "secondary-token"];
2075
2076    async fn source(
2077        acknowledgements: Option<HecAcknowledgementsConfig>,
2078    ) -> (impl Stream<Item = Event> + Unpin, SocketAddr, PortGuard) {
2079        source_with(Some(TOKEN.to_owned().into()), None, acknowledgements, false).await
2080    }
2081
2082    async fn source_with(
2083        token: Option<SensitiveString>,
2084        valid_tokens: Option<&[&str]>,
2085        acknowledgements: Option<HecAcknowledgementsConfig>,
2086        store_hec_token: bool,
2087    ) -> (
2088        impl Stream<Item = Event> + Unpin + use<>,
2089        SocketAddr,
2090        PortGuard,
2091    ) {
2092        let (sender, recv) = SourceSender::new_test_finalize(EventStatus::Delivered);
2093        let (_guard, address) = next_addr();
2094        let valid_tokens =
2095            valid_tokens.map(|tokens| tokens.iter().map(|v| v.to_string().into()).collect());
2096        let cx = SourceContext::new_test(sender, None);
2097        tokio::spawn(async move {
2098            SplunkConfig {
2099                address,
2100                token,
2101                valid_tokens,
2102                tls: None,
2103                acknowledgements: acknowledgements.unwrap_or_default(),
2104                store_hec_token,
2105                log_namespace: None,
2106                keepalive: Default::default(),
2107                event: CodecConfig::default(),
2108                raw: CodecConfig::default(),
2109            }
2110            .build(cx)
2111            .await
2112            .unwrap()
2113            .await
2114            .unwrap()
2115        });
2116        wait_for_tcp(address).await;
2117        (recv, address, _guard)
2118    }
2119
2120    async fn sink(
2121        address: SocketAddr,
2122        encoding: EncodingConfig,
2123        compression: Compression,
2124    ) -> (VectorSink, Healthcheck) {
2125        HecLogsSinkConfig {
2126            default_token: TOKEN.to_owned().into(),
2127            endpoint: format!("http://{address}"),
2128            host_key: None,
2129            indexed_fields: vec![],
2130            index: None,
2131            sourcetype: None,
2132            source: None,
2133            encoding,
2134            compression,
2135            batch: BatchConfig::default(),
2136            request: TowerRequestConfig::default(),
2137            tls: None,
2138            acknowledgements: Default::default(),
2139            timestamp_nanos_key: None,
2140            timestamp_key: None,
2141            auto_extract_timestamp: None,
2142            endpoint_target: Default::default(),
2143        }
2144        .build(SinkContext::default())
2145        .await
2146        .unwrap()
2147    }
2148
2149    async fn start(
2150        encoding: EncodingConfig,
2151        compression: Compression,
2152        acknowledgements: Option<HecAcknowledgementsConfig>,
2153    ) -> (VectorSink, impl Stream<Item = Event> + Unpin) {
2154        let (source, address, _guard) = source(acknowledgements).await;
2155        let (sink, health) = sink(address, encoding, compression).await;
2156        assert!(health.await.is_ok());
2157        (sink, source)
2158    }
2159
2160    async fn channel_n(
2161        messages: Vec<impl Into<String> + Send + 'static>,
2162        sink: VectorSink,
2163        source: impl Stream<Item = Event> + Unpin,
2164    ) -> Vec<Event> {
2165        let n = messages.len();
2166
2167        tokio::spawn(async move {
2168            sink.run_events(
2169                messages
2170                    .into_iter()
2171                    .map(|s| Event::Log(LogEvent::from(s.into()))),
2172            )
2173            .await
2174            .unwrap();
2175        });
2176
2177        let events = collect_n(source, n).await;
2178        assert_eq!(n, events.len());
2179
2180        events
2181    }
2182
2183    #[derive(Clone, Copy, Debug)]
2184    enum Channel<'a> {
2185        Header(&'a str),
2186        QueryParam(&'a str),
2187    }
2188
2189    #[derive(Default)]
2190    struct SendWithOpts<'a> {
2191        channel: Option<Channel<'a>>,
2192        forwarded_for: Option<String>,
2193    }
2194
2195    async fn post(address: SocketAddr, api: &str, message: &str) -> u16 {
2196        let channel = Channel::Header("channel");
2197        let options = SendWithOpts {
2198            channel: Some(channel),
2199            forwarded_for: None,
2200        };
2201        send_with(address, api, message, TOKEN, &options).await
2202    }
2203
2204    fn build_request(
2205        address: SocketAddr,
2206        api: &str,
2207        message: &str,
2208        token: &str,
2209        opts: &SendWithOpts<'_>,
2210    ) -> RequestBuilder {
2211        let mut b = reqwest::Client::new()
2212            .post(format!("http://{address}/{api}"))
2213            .header("Authorization", format!("Splunk {token}"));
2214
2215        b = match opts.channel {
2216            Some(c) => match c {
2217                Channel::Header(v) => b.header("x-splunk-request-channel", v),
2218                Channel::QueryParam(v) => b.query(&[("channel", v)]),
2219            },
2220            None => b,
2221        };
2222
2223        b = match &opts.forwarded_for {
2224            Some(f) => b.header("X-Forwarded-For", f),
2225            None => b,
2226        };
2227
2228        b.body(message.to_owned())
2229    }
2230
2231    async fn send_with(
2232        address: SocketAddr,
2233        api: &str,
2234        message: &str,
2235        token: &str,
2236        opts: &SendWithOpts<'_>,
2237    ) -> u16 {
2238        let b = build_request(address, api, message, token, opts);
2239        b.send().await.unwrap().status().as_u16()
2240    }
2241
2242    async fn send_with_response(
2243        address: SocketAddr,
2244        api: &str,
2245        message: &str,
2246        token: &str,
2247        opts: &SendWithOpts<'_>,
2248    ) -> Response {
2249        let b = build_request(address, api, message, token, opts);
2250        b.send().await.unwrap()
2251    }
2252
2253    #[tokio::test]
2254    async fn no_compression_text_event() {
2255        let message = "gzip_text_event";
2256        let (sink, source) = start(
2257            TextSerializerConfig::default().into(),
2258            Compression::None,
2259            None,
2260        )
2261        .await;
2262
2263        let event = channel_n(vec![message], sink, source).await.remove(0);
2264
2265        assert_eq!(
2266            event.as_log()[log_schema().message_key().unwrap().to_string()],
2267            message.into()
2268        );
2269        assert!(event.as_log().get_timestamp().is_some());
2270        assert_eq!(
2271            event.as_log()[log_schema().source_type_key().unwrap().to_string()],
2272            "splunk_hec".into()
2273        );
2274        assert!(event.metadata().splunk_hec_token().is_none());
2275    }
2276
2277    #[tokio::test]
2278    async fn one_simple_text_event() {
2279        let message = "one_simple_text_event";
2280        let (sink, source) = start(
2281            TextSerializerConfig::default().into(),
2282            Compression::gzip_default(),
2283            None,
2284        )
2285        .await;
2286
2287        let event = channel_n(vec![message], sink, source).await.remove(0);
2288
2289        assert_eq!(
2290            event.as_log()[log_schema().message_key().unwrap().to_string()],
2291            message.into()
2292        );
2293        assert!(event.as_log().get_timestamp().is_some());
2294        assert_eq!(
2295            event.as_log()[log_schema().source_type_key().unwrap().to_string()],
2296            "splunk_hec".into()
2297        );
2298        assert!(event.metadata().splunk_hec_token().is_none());
2299    }
2300
2301    #[tokio::test]
2302    async fn multiple_simple_text_event() {
2303        let n = 200;
2304        let (sink, source) = start(
2305            TextSerializerConfig::default().into(),
2306            Compression::None,
2307            None,
2308        )
2309        .await;
2310
2311        let messages = (0..n)
2312            .map(|i| format!("multiple_simple_text_event_{i}"))
2313            .collect::<Vec<_>>();
2314        let events = channel_n(messages.clone(), sink, source).await;
2315
2316        for (msg, event) in messages.into_iter().zip(events.into_iter()) {
2317            assert_eq!(
2318                event.as_log()[log_schema().message_key().unwrap().to_string()],
2319                msg.into()
2320            );
2321            assert!(event.as_log().get_timestamp().is_some());
2322            assert_eq!(
2323                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
2324                "splunk_hec".into()
2325            );
2326            assert!(event.metadata().splunk_hec_token().is_none());
2327        }
2328    }
2329
2330    #[tokio::test]
2331    async fn one_simple_json_event() {
2332        let message = "one_simple_json_event";
2333        let (sink, source) = start(
2334            JsonSerializerConfig::default().into(),
2335            Compression::gzip_default(),
2336            None,
2337        )
2338        .await;
2339
2340        let event = channel_n(vec![message], sink, source).await.remove(0);
2341
2342        assert_eq!(
2343            event.as_log()[log_schema().message_key().unwrap().to_string()],
2344            message.into()
2345        );
2346        assert!(event.as_log().get_timestamp().is_some());
2347        assert_eq!(
2348            event.as_log()[log_schema().source_type_key().unwrap().to_string()],
2349            "splunk_hec".into()
2350        );
2351        assert!(event.metadata().splunk_hec_token().is_none());
2352    }
2353
2354    #[tokio::test]
2355    async fn multiple_simple_json_event() {
2356        let n = 200;
2357        let (sink, source) = start(
2358            JsonSerializerConfig::default().into(),
2359            Compression::gzip_default(),
2360            None,
2361        )
2362        .await;
2363
2364        let messages = (0..n)
2365            .map(|i| format!("multiple_simple_json_event{i}"))
2366            .collect::<Vec<_>>();
2367        let events = channel_n(messages.clone(), sink, source).await;
2368
2369        for (msg, event) in messages.into_iter().zip(events.into_iter()) {
2370            assert_eq!(
2371                event.as_log()[log_schema().message_key().unwrap().to_string()],
2372                msg.into()
2373            );
2374            assert!(event.as_log().get_timestamp().is_some());
2375            assert_eq!(
2376                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
2377                "splunk_hec".into()
2378            );
2379            assert!(event.metadata().splunk_hec_token().is_none());
2380        }
2381    }
2382
2383    #[tokio::test]
2384    async fn json_event() {
2385        let (sink, source) = start(
2386            JsonSerializerConfig::default().into(),
2387            Compression::gzip_default(),
2388            None,
2389        )
2390        .await;
2391
2392        let mut log = LogEvent::default();
2393        log.insert("greeting", "hello");
2394        log.insert("name", "bob");
2395        sink.run_events(vec![log.into()]).await.unwrap();
2396
2397        let event = collect_n(source, 1).await.remove(0).into_log();
2398        assert_eq!(event["greeting"], "hello".into());
2399        assert_eq!(event["name"], "bob".into());
2400        assert!(event.get_timestamp().is_some());
2401        assert_eq!(
2402            event[log_schema().source_type_key().unwrap().to_string()],
2403            "splunk_hec".into()
2404        );
2405        assert!(event.metadata().splunk_hec_token().is_none());
2406    }
2407
2408    #[tokio::test]
2409    async fn json_invalid_path_event() {
2410        let (sink, source) = start(
2411            JsonSerializerConfig::default().into(),
2412            Compression::gzip_default(),
2413            None,
2414        )
2415        .await;
2416
2417        let mut log = LogEvent::default();
2418        // Test with a field that would be considered an invalid path if it were to
2419        // be treated as a path and not a simple field name.
2420        log.insert(event_path!("(greeting | thing"), "hello");
2421        sink.run_events(vec![log.into()]).await.unwrap();
2422
2423        let event = collect_n(source, 1).await.remove(0).into_log();
2424        assert_eq!(
2425            event.get(event_path!("(greeting | thing")),
2426            Some(&Value::from("hello"))
2427        );
2428    }
2429
2430    #[tokio::test]
2431    async fn line_to_message() {
2432        let (sink, source) = start(
2433            JsonSerializerConfig::default().into(),
2434            Compression::gzip_default(),
2435            None,
2436        )
2437        .await;
2438
2439        let mut event = LogEvent::default();
2440        event.insert("line", "hello");
2441        sink.run_events(vec![event.into()]).await.unwrap();
2442
2443        let event = collect_n(source, 1).await.remove(0);
2444        assert_eq!(
2445            event.as_log()[log_schema().message_key().unwrap().to_string()],
2446            "hello".into()
2447        );
2448        assert!(event.metadata().splunk_hec_token().is_none());
2449    }
2450
2451    #[tokio::test]
2452    async fn raw() {
2453        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2454            let message = "raw";
2455            let (source, address, _guard) = source(None).await;
2456
2457            assert_eq!(200, post(address, "services/collector/raw", message).await);
2458
2459            let event = collect_n(source, 1).await.remove(0);
2460            assert_eq!(
2461                event.as_log()[log_schema().message_key().unwrap().to_string()],
2462                message.into()
2463            );
2464            assert_eq!(event.as_log()[&super::CHANNEL], "channel".into());
2465            assert!(event.as_log().get_timestamp().is_some());
2466            assert_eq!(
2467                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
2468                "splunk_hec".into()
2469            );
2470            assert!(event.metadata().splunk_hec_token().is_none());
2471        })
2472        .await;
2473    }
2474
2475    #[tokio::test]
2476    async fn root() {
2477        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2478            let message = r#"{ "event": { "message": "root"} }"#;
2479            let (source, address, _guard) = source(None).await;
2480
2481            assert_eq!(200, post(address, "services/collector", message).await);
2482
2483            let event = collect_n(source, 1).await.remove(0);
2484            assert_eq!(
2485                event.as_log()[log_schema().message_key().unwrap().to_string()],
2486                "root".into()
2487            );
2488            assert_eq!(event.as_log()[&super::CHANNEL], "channel".into());
2489            assert!(event.as_log().get_timestamp().is_some());
2490            assert_eq!(
2491                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
2492                "splunk_hec".into()
2493            );
2494            assert!(event.metadata().splunk_hec_token().is_none());
2495        })
2496        .await;
2497    }
2498
2499    #[tokio::test]
2500    async fn channel_header() {
2501        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2502            let message = "raw";
2503            let (source, address, _guard) = source(None).await;
2504
2505            let opts = SendWithOpts {
2506                channel: Some(Channel::Header("guid")),
2507                forwarded_for: None,
2508            };
2509
2510            assert_eq!(
2511                200,
2512                send_with(address, "services/collector/raw", message, TOKEN, &opts).await
2513            );
2514
2515            let event = collect_n(source, 1).await.remove(0);
2516            assert_eq!(event.as_log()[&super::CHANNEL], "guid".into());
2517        })
2518        .await;
2519    }
2520
2521    #[tokio::test]
2522    async fn xff_header_raw() {
2523        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2524            let message = "raw";
2525            let (source, address, _guard) = source(None).await;
2526
2527            let opts = SendWithOpts {
2528                channel: Some(Channel::Header("guid")),
2529                forwarded_for: Some(String::from("10.0.0.1")),
2530            };
2531
2532            assert_eq!(
2533                200,
2534                send_with(address, "services/collector/raw", message, TOKEN, &opts).await
2535            );
2536
2537            let event = collect_n(source, 1).await.remove(0);
2538            assert_eq!(
2539                event.as_log()[log_schema().host_key().unwrap().to_string().as_str()],
2540                "10.0.0.1".into()
2541            );
2542        })
2543        .await;
2544    }
2545
2546    // Test helps to illustrate that a payload's `host` value should override an x-forwarded-for header
2547    #[tokio::test]
2548    async fn xff_header_event_with_host_field() {
2549        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2550            let message = r#"{"event":"first", "host": "10.1.0.2"}"#;
2551            let (source, address, _guard) = source(None).await;
2552
2553            let opts = SendWithOpts {
2554                channel: Some(Channel::Header("guid")),
2555                forwarded_for: Some(String::from("10.0.0.1")),
2556            };
2557
2558            assert_eq!(
2559                200,
2560                send_with(address, "services/collector/event", message, TOKEN, &opts).await
2561            );
2562
2563            let event = collect_n(source, 1).await.remove(0);
2564            assert_eq!(
2565                event.as_log()[log_schema().host_key().unwrap().to_string().as_str()],
2566                "10.1.0.2".into()
2567            );
2568        })
2569        .await;
2570    }
2571
    // Test helps to illustrate that the x-forwarded-for header is used as the host when the payload has no `host` field
2573    #[tokio::test]
2574    async fn xff_header_event_without_host_field() {
2575        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2576            let message = r#"{"event":"first", "color": "blue"}"#;
2577            let (source, address, _guard) = source(None).await;
2578
2579            let opts = SendWithOpts {
2580                channel: Some(Channel::Header("guid")),
2581                forwarded_for: Some(String::from("10.0.0.1")),
2582            };
2583
2584            assert_eq!(
2585                200,
2586                send_with(address, "services/collector/event", message, TOKEN, &opts).await
2587            );
2588
2589            let event = collect_n(source, 1).await.remove(0);
2590            assert_eq!(
2591                event.as_log()[log_schema().host_key().unwrap().to_string().as_str()],
2592                "10.0.0.1".into()
2593            );
2594        })
2595        .await;
2596    }
2597
2598    #[tokio::test]
2599    async fn channel_query_param() {
2600        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2601            let message = "raw";
2602            let (source, address, _guard) = source(None).await;
2603
2604            let opts = SendWithOpts {
2605                channel: Some(Channel::QueryParam("guid")),
2606                forwarded_for: None,
2607            };
2608
2609            assert_eq!(
2610                200,
2611                send_with(address, "services/collector/raw", message, TOKEN, &opts).await
2612            );
2613
2614            let event = collect_n(source, 1).await.remove(0);
2615            assert_eq!(event.as_log()[&super::CHANNEL], "guid".into());
2616        })
2617        .await;
2618    }
2619
2620    #[tokio::test]
2621    async fn no_data() {
2622        let (_source, address, _guard) = source(None).await;
2623
2624        assert_eq!(400, post(address, "services/collector/event", "").await);
2625    }
2626
2627    #[tokio::test]
2628    async fn invalid_token() {
2629        assert_source_error(&COMPONENT_ERROR_TAGS, async {
2630            let (_source, address, _guard) = source(None).await;
2631            let opts = SendWithOpts {
2632                channel: Some(Channel::Header("channel")),
2633                forwarded_for: None,
2634            };
2635
2636            assert_eq!(
2637                401,
2638                send_with(address, "services/collector/event", "", "nope", &opts).await
2639            );
2640        })
2641        .await;
2642    }
2643
2644    #[tokio::test]
2645    async fn health_ignores_token() {
2646        let (_source, address, _guard) = source(None).await;
2647
2648        let res = reqwest::Client::new()
2649            .get(format!("http://{address}/services/collector/health"))
2650            .header("Authorization", format!("Splunk {}", "invalid token"))
2651            .send()
2652            .await
2653            .unwrap();
2654
2655        assert_eq!(200, res.status().as_u16());
2656    }
2657
2658    #[tokio::test]
2659    async fn health() {
2660        let (_source, address, _guard) = source(None).await;
2661
2662        let res = reqwest::Client::new()
2663            .get(format!("http://{address}/services/collector/health"))
2664            .send()
2665            .await
2666            .unwrap();
2667
2668        assert_eq!(200, res.status().as_u16());
2669    }
2670
2671    #[tokio::test]
2672    async fn secondary_token() {
2673        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2674            let message = r#"{"event":"first", "color": "blue"}"#;
2675            let (_source, address, _guard) =
2676                source_with(None, Some(VALID_TOKENS), None, false).await;
2677            let options = SendWithOpts {
2678                channel: None,
2679                forwarded_for: None,
2680            };
2681
2682            assert_eq!(
2683                200,
2684                send_with(
2685                    address,
2686                    "services/collector/event",
2687                    message,
2688                    VALID_TOKENS.get(1).unwrap(),
2689                    &options
2690                )
2691                .await
2692            );
2693        })
2694        .await;
2695    }
2696
2697    #[tokio::test]
2698    async fn event_service_token_passthrough_enabled() {
2699        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2700            let message = "passthrough_token_enabled";
2701            let (source, address, _guard) = source_with(None, Some(VALID_TOKENS), None, true).await;
2702            let (sink, health) = sink(
2703                address,
2704                TextSerializerConfig::default().into(),
2705                Compression::gzip_default(),
2706            )
2707            .await;
2708            assert!(health.await.is_ok());
2709
2710            let event = channel_n(vec![message], sink, source).await.remove(0);
2711
2712            assert_eq!(
2713                event.as_log()[log_schema().message_key().unwrap().to_string()],
2714                message.into()
2715            );
2716            assert_eq!(
2717                &event.metadata().splunk_hec_token().as_ref().unwrap()[..],
2718                TOKEN
2719            );
2720        })
2721        .await;
2722    }
2723
2724    #[tokio::test]
2725    async fn raw_service_token_passthrough_enabled() {
2726        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2727            let message = "raw";
2728            let (source, address, _guard) = source_with(None, Some(VALID_TOKENS), None, true).await;
2729
2730            assert_eq!(200, post(address, "services/collector/raw", message).await);
2731
2732            let event = collect_n(source, 1).await.remove(0);
2733            assert_eq!(
2734                event.as_log()[log_schema().message_key().unwrap().to_string()],
2735                message.into()
2736            );
2737            assert_eq!(event.as_log()[&super::CHANNEL], "channel".into());
2738            assert!(event.as_log().get_timestamp().is_some());
2739            assert_eq!(
2740                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
2741                "splunk_hec".into()
2742            );
2743            assert_eq!(
2744                &event.metadata().splunk_hec_token().as_ref().unwrap()[..],
2745                TOKEN
2746            );
2747        })
2748        .await;
2749    }
2750
2751    #[tokio::test]
2752    async fn no_authorization() {
2753        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2754            let message = "no_authorization";
2755            let (source, address, _guard) = source_with(None, None, None, false).await;
2756            let (sink, health) = sink(
2757                address,
2758                TextSerializerConfig::default().into(),
2759                Compression::gzip_default(),
2760            )
2761            .await;
2762            assert!(health.await.is_ok());
2763
2764            let event = channel_n(vec![message], sink, source).await.remove(0);
2765
2766            assert_eq!(
2767                event.as_log()[log_schema().message_key().unwrap().to_string()],
2768                message.into()
2769            );
2770            assert!(event.metadata().splunk_hec_token().is_none());
2771        })
2772        .await;
2773    }
2774
2775    #[tokio::test]
2776    async fn no_authorization_token_passthrough_enabled() {
2777        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2778            let message = "no_authorization";
2779            let (source, address, _guard) = source_with(None, None, None, true).await;
2780            let (sink, health) = sink(
2781                address,
2782                TextSerializerConfig::default().into(),
2783                Compression::gzip_default(),
2784            )
2785            .await;
2786            assert!(health.await.is_ok());
2787
2788            let event = channel_n(vec![message], sink, source).await.remove(0);
2789
2790            assert_eq!(
2791                event.as_log()[log_schema().message_key().unwrap().to_string()],
2792                message.into()
2793            );
2794            assert_eq!(
2795                &event.metadata().splunk_hec_token().as_ref().unwrap()[..],
2796                TOKEN
2797            );
2798        })
2799        .await;
2800    }
2801
2802    #[tokio::test]
2803    async fn partial() {
2804        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2805            let message = r#"{"event":"first"}{"event":"second""#;
2806            let (source, address, _guard) = source(None).await;
2807
2808            assert_eq!(
2809                400,
2810                post(address, "services/collector/event", message).await
2811            );
2812
2813            let event = collect_n(source, 1).await.remove(0);
2814            assert_eq!(
2815                event.as_log()[log_schema().message_key().unwrap().to_string()],
2816                "first".into()
2817            );
2818            assert!(event.as_log().get_timestamp().is_some());
2819            assert_eq!(
2820                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
2821                "splunk_hec".into()
2822            );
2823        })
2824        .await;
2825    }
2826
2827    #[tokio::test]
2828    async fn handles_newlines() {
2829        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2830            let message = r#"
2831{"event":"first"}
2832        "#;
2833            let (source, address, _guard) = source(None).await;
2834
2835            assert_eq!(
2836                200,
2837                post(address, "services/collector/event", message).await
2838            );
2839
2840            let event = collect_n(source, 1).await.remove(0);
2841            assert_eq!(
2842                event.as_log()[log_schema().message_key().unwrap().to_string()],
2843                "first".into()
2844            );
2845            assert!(event.as_log().get_timestamp().is_some());
2846            assert_eq!(
2847                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
2848                "splunk_hec".into()
2849            );
2850        })
2851        .await;
2852    }
2853
2854    #[tokio::test]
2855    async fn handles_spaces() {
2856        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2857            let message = r#" {"event":"first"} "#;
2858            let (source, address, _guard) = source(None).await;
2859
2860            assert_eq!(
2861                200,
2862                post(address, "services/collector/event", message).await
2863            );
2864
2865            let event = collect_n(source, 1).await.remove(0);
2866            assert_eq!(
2867                event.as_log()[log_schema().message_key().unwrap().to_string()],
2868                "first".into()
2869            );
2870            assert!(event.as_log().get_timestamp().is_some());
2871            assert_eq!(
2872                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
2873                "splunk_hec".into()
2874            );
2875        })
2876        .await;
2877    }
2878
2879    #[tokio::test]
2880    async fn handles_non_utf8() {
2881        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2882        let message = b" {\"event\": { \"non\": \"A non UTF8 character \xE4\", \"number\": 2, \"bool\": true } } ";
2883        let (source, address, _guard) = source(None).await;
2884
2885        let b = reqwest::Client::new()
2886            .post(format!(
2887                "http://{}/{}",
2888                address, "services/collector/event"
2889            ))
2890            .header("Authorization", format!("Splunk {TOKEN}"))
2891            .body::<&[u8]>(message);
2892
2893        assert_eq!(200, b.send().await.unwrap().status().as_u16());
2894
2895        let event = collect_n(source, 1).await.remove(0);
2896        assert_eq!(event.as_log()["non"], "A non UTF8 character �".into());
2897        assert_eq!(event.as_log()["number"], 2.into());
2898        assert_eq!(event.as_log()["bool"], true.into());
2899        assert!(event.as_log().get((lookup::PathPrefix::Event, log_schema().timestamp_key().unwrap())).is_some());
2900        assert_eq!(
2901            event.as_log()[log_schema().source_type_key().unwrap().to_string()],
2902            "splunk_hec".into()
2903        );
2904    }).await;
2905    }
2906
2907    #[tokio::test]
2908    async fn default() {
2909        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2910        let message = r#"{"event":"first","source":"main"}{"event":"second"}{"event":"third","source":"secondary"}"#;
2911        let (source, address, _guard) = source(None).await;
2912
2913        assert_eq!(
2914            200,
2915            post(address, "services/collector/event", message).await
2916        );
2917
2918        let events = collect_n(source, 3).await;
2919
2920        assert_eq!(
2921            events[0].as_log()[log_schema().message_key().unwrap().to_string()],
2922            "first".into()
2923        );
2924        assert_eq!(events[0].as_log()[&super::SOURCE], "main".into());
2925
2926        assert_eq!(
2927            events[1].as_log()[log_schema().message_key().unwrap().to_string()],
2928            "second".into()
2929        );
2930        assert_eq!(events[1].as_log()[&super::SOURCE], "main".into());
2931
2932        assert_eq!(
2933            events[2].as_log()[log_schema().message_key().unwrap().to_string()],
2934            "third".into()
2935        );
2936        assert_eq!(events[2].as_log()[&super::SOURCE], "secondary".into());
2937    }).await;
2938    }
2939
2940    #[test]
2941    fn parse_timestamps() {
2942        let cases = vec![
2943            Utc::now(),
2944            Utc.with_ymd_and_hms(1971, 11, 7, 1, 1, 1)
2945                .single()
2946                .expect("invalid timestamp"),
2947            Utc.with_ymd_and_hms(2011, 8, 5, 1, 1, 1)
2948                .single()
2949                .expect("invalid timestamp"),
2950            Utc.with_ymd_and_hms(2189, 11, 4, 2, 2, 2)
2951                .single()
2952                .expect("invalid timestamp"),
2953        ];
2954
2955        for case in cases {
2956            let sec = case.timestamp();
2957            let millis = case.timestamp_millis();
2958            let nano = case.timestamp_nanos_opt().expect("Timestamp out of range");
2959
2960            assert_eq!(parse_timestamp(sec).unwrap().timestamp(), case.timestamp());
2961            assert_eq!(
2962                parse_timestamp(millis).unwrap().timestamp_millis(),
2963                case.timestamp_millis()
2964            );
2965            assert_eq!(
2966                parse_timestamp(nano)
2967                    .unwrap()
2968                    .timestamp_nanos_opt()
2969                    .unwrap(),
2970                case.timestamp_nanos_opt().expect("Timestamp out of range")
2971            );
2972        }
2973
2974        assert!(parse_timestamp(-1).is_none());
2975    }
2976
2977    /// This test will fail once `warp` crate fixes support for
2978    /// custom connection listener, at that point this test can be
2979    /// modified to pass.
2980    /// https://github.com/vectordotdev/vector/issues/7097
2981    /// https://github.com/seanmonstar/warp/issues/830
2982    /// https://github.com/seanmonstar/warp/pull/713
2983    #[tokio::test]
2984    async fn host_test() {
2985        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2986            let message = "for the host";
2987            let (sink, source) = start(
2988                TextSerializerConfig::default().into(),
2989                Compression::gzip_default(),
2990                None,
2991            )
2992            .await;
2993
2994            let event = channel_n(vec![message], sink, source).await.remove(0);
2995
2996            assert_eq!(
2997                event.as_log()[log_schema().message_key().unwrap().to_string()],
2998                message.into()
2999            );
3000            assert!(
3001                event
3002                    .as_log()
3003                    .get((PathPrefix::Event, log_schema().host_key().unwrap()))
3004                    .is_none()
3005            );
3006        })
3007        .await;
3008    }
3009
    /// JSON body the acknowledgement tests deserialize from the response to an
    /// event or raw submission.
    #[derive(Deserialize)]
    struct HecAckEventResponse {
        /// Status text; the tests expect `"Success"`.
        text: String,
        /// Status code; the tests expect `0`.
        code: u8,
        /// Acknowledgement identifier, read from the `ackId` JSON key and used
        /// by the tests to query the ack endpoint.
        #[serde(rename = "ackId")]
        ack_id: u64,
    }
3017
3018    #[tokio::test]
3019    async fn ack_json_event() {
3020        let ack_config = HecAcknowledgementsConfig {
3021            enabled: Some(true),
3022            ..Default::default()
3023        };
3024        let (source, address, _guard) = source(Some(ack_config)).await;
3025        let event_message = r#"{"event":"first", "color": "blue"}{"event":"second"}"#;
3026        let opts = SendWithOpts {
3027            channel: Some(Channel::Header("guid")),
3028            forwarded_for: None,
3029        };
3030        let event_res = send_with_response(
3031            address,
3032            "services/collector/event",
3033            event_message,
3034            TOKEN,
3035            &opts,
3036        )
3037        .await
3038        .json::<HecAckEventResponse>()
3039        .await
3040        .unwrap();
3041        assert_eq!("Success", event_res.text.as_str());
3042        assert_eq!(0, event_res.code);
3043        _ = collect_n(source, 1).await;
3044
3045        let ack_message = serde_json::to_string(&HecAckStatusRequest {
3046            acks: vec![event_res.ack_id],
3047        })
3048        .unwrap();
3049        let ack_res = send_with_response(
3050            address,
3051            "services/collector/ack",
3052            ack_message.as_str(),
3053            TOKEN,
3054            &opts,
3055        )
3056        .await
3057        .json::<HecAckStatusResponse>()
3058        .await
3059        .unwrap();
3060        assert!(ack_res.acks.get(&event_res.ack_id).unwrap());
3061    }
3062
3063    #[tokio::test]
3064    async fn ack_raw_event() {
3065        let ack_config = HecAcknowledgementsConfig {
3066            enabled: Some(true),
3067            ..Default::default()
3068        };
3069        let (source, address, _guard) = source(Some(ack_config)).await;
3070        let event_message = "raw event message";
3071        let opts = SendWithOpts {
3072            channel: Some(Channel::Header("guid")),
3073            forwarded_for: None,
3074        };
3075        let event_res = send_with_response(
3076            address,
3077            "services/collector/raw",
3078            event_message,
3079            TOKEN,
3080            &opts,
3081        )
3082        .await
3083        .json::<HecAckEventResponse>()
3084        .await
3085        .unwrap();
3086        assert_eq!("Success", event_res.text.as_str());
3087        assert_eq!(0, event_res.code);
3088        _ = collect_n(source, 1).await;
3089
3090        let ack_message = serde_json::to_string(&HecAckStatusRequest {
3091            acks: vec![event_res.ack_id],
3092        })
3093        .unwrap();
3094        let ack_res = send_with_response(
3095            address,
3096            "services/collector/ack",
3097            ack_message.as_str(),
3098            TOKEN,
3099            &opts,
3100        )
3101        .await
3102        .json::<HecAckStatusResponse>()
3103        .await
3104        .unwrap();
3105        assert!(ack_res.acks.get(&event_res.ack_id).unwrap());
3106    }
3107
3108    #[tokio::test]
3109    async fn ack_repeat_ack_query() {
3110        let ack_config = HecAcknowledgementsConfig {
3111            enabled: Some(true),
3112            ..Default::default()
3113        };
3114        let (source, address, _guard) = source(Some(ack_config)).await;
3115        let event_message = "raw event message";
3116        let opts = SendWithOpts {
3117            channel: Some(Channel::Header("guid")),
3118            forwarded_for: None,
3119        };
3120        let event_res = send_with_response(
3121            address,
3122            "services/collector/raw",
3123            event_message,
3124            TOKEN,
3125            &opts,
3126        )
3127        .await
3128        .json::<HecAckEventResponse>()
3129        .await
3130        .unwrap();
3131        _ = collect_n(source, 1).await;
3132
3133        let ack_message = serde_json::to_string(&HecAckStatusRequest {
3134            acks: vec![event_res.ack_id],
3135        })
3136        .unwrap();
3137        let ack_res = send_with_response(
3138            address,
3139            "services/collector/ack",
3140            ack_message.as_str(),
3141            TOKEN,
3142            &opts,
3143        )
3144        .await
3145        .json::<HecAckStatusResponse>()
3146        .await
3147        .unwrap();
3148        assert!(ack_res.acks.get(&event_res.ack_id).unwrap());
3149
3150        let ack_res = send_with_response(
3151            address,
3152            "services/collector/ack",
3153            ack_message.as_str(),
3154            TOKEN,
3155            &opts,
3156        )
3157        .await
3158        .json::<HecAckStatusResponse>()
3159        .await
3160        .unwrap();
3161        assert!(!ack_res.acks.get(&event_res.ack_id).unwrap());
3162    }
3163
3164    #[tokio::test]
3165    async fn ack_exceed_max_number_of_ack_channels() {
3166        let ack_config = HecAcknowledgementsConfig {
3167            enabled: Some(true),
3168            max_number_of_ack_channels: NonZeroU64::new(1).unwrap(),
3169            ..Default::default()
3170        };
3171
3172        let (_source, address, _guard) = source(Some(ack_config)).await;
3173        let mut opts = SendWithOpts {
3174            channel: Some(Channel::Header("guid")),
3175            forwarded_for: None,
3176        };
3177        assert_eq!(
3178            200,
3179            send_with(address, "services/collector/raw", "message", TOKEN, &opts).await
3180        );
3181
3182        opts.channel = Some(Channel::Header("other-guid"));
3183        assert_eq!(
3184            503,
3185            send_with(address, "services/collector/raw", "message", TOKEN, &opts).await
3186        );
3187        assert_eq!(
3188            503,
3189            send_with(
3190                address,
3191                "services/collector/event",
3192                r#"{"event":"first"}"#,
3193                TOKEN,
3194                &opts
3195            )
3196            .await
3197        );
3198    }
3199
3200    #[tokio::test]
3201    async fn ack_exceed_max_pending_acks_per_channel() {
3202        let ack_config = HecAcknowledgementsConfig {
3203            enabled: Some(true),
3204            max_pending_acks_per_channel: NonZeroU64::new(1).unwrap(),
3205            ..Default::default()
3206        };
3207
3208        let (source, address, _guard) = source(Some(ack_config)).await;
3209        let opts = SendWithOpts {
3210            channel: Some(Channel::Header("guid")),
3211            forwarded_for: None,
3212        };
3213        for _ in 0..5 {
3214            send_with(
3215                address,
3216                "services/collector/event",
3217                r#"{"event":"first"}"#,
3218                TOKEN,
3219                &opts,
3220            )
3221            .await;
3222        }
3223        for _ in 0..5 {
3224            send_with(address, "services/collector/raw", "message", TOKEN, &opts).await;
3225        }
3226        let event_res = send_with_response(
3227            address,
3228            "services/collector/event",
3229            r#"{"event":"this will be acked"}"#,
3230            TOKEN,
3231            &opts,
3232        )
3233        .await
3234        .json::<HecAckEventResponse>()
3235        .await
3236        .unwrap();
3237        _ = collect_n(source, 11).await;
3238
3239        let ack_message_dropped = serde_json::to_string(&HecAckStatusRequest {
3240            acks: (0..10).collect::<Vec<u64>>(),
3241        })
3242        .unwrap();
3243        let ack_res = send_with_response(
3244            address,
3245            "services/collector/ack",
3246            ack_message_dropped.as_str(),
3247            TOKEN,
3248            &opts,
3249        )
3250        .await
3251        .json::<HecAckStatusResponse>()
3252        .await
3253        .unwrap();
3254        assert!(ack_res.acks.values().all(|ack_status| !*ack_status));
3255
3256        let ack_message_acked = serde_json::to_string(&HecAckStatusRequest {
3257            acks: vec![event_res.ack_id],
3258        })
3259        .unwrap();
3260        let ack_res = send_with_response(
3261            address,
3262            "services/collector/ack",
3263            ack_message_acked.as_str(),
3264            TOKEN,
3265            &opts,
3266        )
3267        .await
3268        .json::<HecAckStatusResponse>()
3269        .await
3270        .unwrap();
3271        assert!(ack_res.acks.get(&event_res.ack_id).unwrap());
3272    }
3273
3274    #[tokio::test]
3275    async fn ack_service_accepts_parameterized_content_type() {
3276        let ack_config = HecAcknowledgementsConfig {
3277            enabled: Some(true),
3278            ..Default::default()
3279        };
3280        let (source, address, _guard) = source(Some(ack_config)).await;
3281        let opts = SendWithOpts {
3282            channel: Some(Channel::Header("guid")),
3283            forwarded_for: None,
3284        };
3285
3286        let event_res = send_with_response(
3287            address,
3288            "services/collector/event",
3289            r#"{"event":"param-test"}"#,
3290            TOKEN,
3291            &opts,
3292        )
3293        .await
3294        .json::<HecAckEventResponse>()
3295        .await
3296        .unwrap();
3297        let _ = collect_n(source, 1).await;
3298
3299        let body = serde_json::to_string(&HecAckStatusRequest {
3300            acks: vec![event_res.ack_id],
3301        })
3302        .unwrap();
3303
3304        let res = reqwest::Client::new()
3305            .post(format!("http://{address}/services/collector/ack"))
3306            .header("Authorization", format!("Splunk {TOKEN}"))
3307            .header("x-splunk-request-channel", "guid")
3308            .header("Content-Type", "application/json; some-random-text; hello")
3309            .body(body)
3310            .send()
3311            .await
3312            .unwrap();
3313
3314        assert_eq!(200, res.status().as_u16());
3315
3316        let _parsed: HecAckStatusResponse = res.json().await.unwrap();
3317    }
3318
3319    #[tokio::test]
3320    async fn event_service_acknowledgements_enabled_channel_required() {
3321        let message = r#"{"event":"first", "color": "blue"}"#;
3322        let ack_config = HecAcknowledgementsConfig {
3323            enabled: Some(true),
3324            ..Default::default()
3325        };
3326        let (_, address, _guard) = source(Some(ack_config)).await;
3327
3328        let opts = SendWithOpts {
3329            channel: None,
3330            forwarded_for: None,
3331        };
3332
3333        assert_eq!(
3334            400,
3335            send_with(address, "services/collector/event", message, TOKEN, &opts).await
3336        );
3337    }
3338
3339    #[tokio::test]
3340    async fn ack_service_acknowledgements_disabled() {
3341        let message = r#" {"acks":[0]} "#;
3342        let (_, address, _guard) = source(None).await;
3343
3344        let opts = SendWithOpts {
3345            channel: Some(Channel::Header("guid")),
3346            forwarded_for: None,
3347        };
3348
3349        assert_eq!(
3350            400,
3351            send_with(address, "services/collector/ack", message, TOKEN, &opts).await
3352        );
3353    }
3354
3355    async fn source_with_codec(
3356        event: CodecConfig,
3357        raw: CodecConfig,
3358    ) -> (
3359        impl Stream<Item = Event> + Unpin + use<>,
3360        SocketAddr,
3361        PortGuard,
3362    ) {
3363        let (sender, recv) = SourceSender::new_test_finalize(EventStatus::Delivered);
3364        let (_guard, address) = next_addr();
3365        let cx = SourceContext::new_test(sender, None);
3366        tokio::spawn(async move {
3367            SplunkConfig {
3368                address,
3369                token: Some(TOKEN.to_owned().into()),
3370                valid_tokens: None,
3371                tls: None,
3372                acknowledgements: Default::default(),
3373                store_hec_token: false,
3374                log_namespace: None,
3375                keepalive: Default::default(),
3376                event,
3377                raw,
3378            }
3379            .build(cx)
3380            .await
3381            .unwrap()
3382            .await
3383            .unwrap()
3384        });
3385        wait_for_tcp(address).await;
3386        (recv, address, _guard)
3387    }
3388
3389    /// Codec config that just sets `decoding` (default framing).
3390    fn codec_decoding(decoding: DeserializerConfig) -> CodecConfig {
3391        CodecConfig {
3392            framing: None,
3393            decoding: Some(decoding),
3394        }
3395    }
3396
3397    /// Codec config that sets both `framing` and `decoding`.
3398    fn codec_full(
3399        framing: Option<FramingConfig>,
3400        decoding: Option<DeserializerConfig>,
3401    ) -> CodecConfig {
3402        CodecConfig { framing, decoding }
3403    }
3404
3405    #[tokio::test]
3406    async fn decoder_event_endpoint_json_string() {
3407        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
3408            let (source, address, _guard) = source_with_codec(
3409                codec_decoding(vector_lib::codecs::JsonDeserializerConfig::default().into()),
3410                CodecConfig::default(),
3411            )
3412            .await;
3413            let envelope =
3414                r#"{"event":"{\"foo\":\"bar\",\"n\":42}","host":"client-host","sourcetype":"my-app"}"#;
3415            assert_eq!(
3416                200,
3417                post(address, "services/collector/event", envelope).await
3418            );
3419
3420            let event = collect_n(source, 1).await.remove(0);
3421            let log = event.as_log();
3422            assert_eq!(log["foo"], "bar".into());
3423            assert_eq!(log["n"], 42.into());
3424            assert_eq!(
3425                log[log_schema().host_key().unwrap().to_string().as_str()],
3426                "client-host".into()
3427            );
3428            assert_eq!(log[&super::SOURCETYPE], "my-app".into());
3429        })
3430        .await;
3431    }
3432
3433    #[tokio::test]
3434    async fn decoder_event_endpoint_json_object_round_trip() {
3435        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
3436            let (source, address, _guard) = source_with_codec(
3437                codec_decoding(vector_lib::codecs::JsonDeserializerConfig::default().into()),
3438                CodecConfig::default(),
3439            )
3440            .await;
3441            let envelope = r#"{"event":{"foo":"bar","nested":{"k":1}},"host":"h"}"#;
3442            assert_eq!(
3443                200,
3444                post(address, "services/collector/event", envelope).await
3445            );
3446
3447            let event = collect_n(source, 1).await.remove(0);
3448            let log = event.as_log();
3449            assert_eq!(log["foo"], "bar".into());
3450            assert_eq!(*log.get("nested.k").unwrap(), 1.into());
3451            assert_eq!(
3452                log[log_schema().host_key().unwrap().to_string().as_str()],
3453                "h".into()
3454            );
3455        })
3456        .await;
3457    }
3458
3459    #[tokio::test]
3460    async fn decoder_event_endpoint_all_envelope_fields_yield_to_decoder() {
3461        // The decoded path must defer to the codec for `splunk_channel`,
3462        // `splunk_index`, `splunk_source`, and `splunk_sourcetype` in legacy ns -
3463        // not just `host`. Otherwise the changelog's "decoder wins on conflict"
3464        // promise is broken for HEC envelope metadata.
3465        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
3466            let (source, address, _guard) = source_with_codec(
3467                codec_decoding(vector_lib::codecs::JsonDeserializerConfig::default().into()),
3468                CodecConfig::default(),
3469            )
3470            .await;
3471            // The string `event` decodes to a JSON object that pre-populates each
3472            // legacy splunk_* field. The envelope sets conflicting values for the
3473            // same fields and must lose.
3474            let envelope = r#"{
3475                "event":"{\"splunk_channel\":\"decoder-channel\",\"splunk_index\":\"decoder-index\",\"splunk_source\":\"decoder-source\",\"splunk_sourcetype\":\"decoder-sourcetype\"}",
3476                "index":"envelope-index",
3477                "source":"envelope-source",
3478                "sourcetype":"envelope-sourcetype"
3479            }"#;
3480            assert_eq!(
3481                200,
3482                post(address, "services/collector/event", envelope).await
3483            );
3484
3485            let event = collect_n(source, 1).await.remove(0);
3486            let log = event.as_log();
3487            assert_eq!(log[&super::CHANNEL], "decoder-channel".into());
3488            assert_eq!(log[&super::INDEX], "decoder-index".into());
3489            assert_eq!(log[&super::SOURCE], "decoder-source".into());
3490            assert_eq!(log[&super::SOURCETYPE], "decoder-sourcetype".into());
3491        })
3492        .await;
3493    }
3494
3495    #[tokio::test]
3496    async fn decoder_event_endpoint_decoder_field_wins_over_envelope() {
3497        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
3498            let (source, address, _guard) = source_with_codec(
3499                codec_decoding(vector_lib::codecs::JsonDeserializerConfig::default().into()),
3500                CodecConfig::default(),
3501            )
3502            .await;
3503            // The string `event` decodes to {host: "decoder-host"}; the envelope sets
3504            // host: "envelope-host". The decoder's value must win.
3505            let envelope = r#"{"event":"{\"host\":\"decoder-host\"}","host":"envelope-host"}"#;
3506            assert_eq!(
3507                200,
3508                post(address, "services/collector/event", envelope).await
3509            );
3510
3511            let event = collect_n(source, 1).await.remove(0);
3512            let log = event.as_log();
3513            assert_eq!(
3514                log[log_schema().host_key().unwrap().to_string().as_str()],
3515                "decoder-host".into()
3516            );
3517        })
3518        .await;
3519    }
3520
3521    #[tokio::test]
3522    async fn decoder_event_endpoint_decode_failure_returns_200() {
3523        // A malformed inner JSON must not surface as an HTTP error to the Splunk
3524        // client - decode failures are swallowed by the codec like other Vector
3525        // sources do.
3526        let (_source, address, _guard) = source_with_codec(
3527            codec_decoding(vector_lib::codecs::JsonDeserializerConfig::default().into()),
3528            CodecConfig::default(),
3529        )
3530        .await;
3531        let envelope = r#"{"event":"not valid json {","host":"h"}"#;
3532        assert_eq!(
3533            200,
3534            post(address, "services/collector/event", envelope).await
3535        );
3536    }
3537
3538    #[tokio::test]
3539    async fn decoder_raw_endpoint_newline_delimited() {
3540        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
3541            let (source, address, _guard) = source_with_codec(
3542                CodecConfig::default(),
3543                codec_full(
3544                    Some(FramingConfig::NewlineDelimited(Default::default())),
3545                    Some(DeserializerConfig::Bytes),
3546                ),
3547            )
3548            .await;
3549            let body = "line1\nline2\nline3";
3550            assert_eq!(200, post(address, "services/collector/raw", body).await);
3551
3552            let events = collect_n(source, 3).await;
3553            assert_eq!(events.len(), 3);
3554            let messages: Vec<String> = events
3555                .iter()
3556                .map(|e| {
3557                    e.as_log()[log_schema().message_key().unwrap().to_string()]
3558                        .to_string_lossy()
3559                        .into_owned()
3560                })
3561                .collect();
3562            assert!(messages.contains(&"line1".to_string()));
3563            assert!(messages.contains(&"line2".to_string()));
3564            assert!(messages.contains(&"line3".to_string()));
3565
3566            // All events share the channel from the request header.
3567            for event in &events {
3568                assert_eq!(event.as_log()[&super::CHANNEL], "channel".into());
3569            }
3570        })
3571        .await;
3572    }
3573
3574    #[tokio::test]
3575    async fn decoder_event_endpoint_envelope_without_time_has_fallback_timestamp() {
3576        // Regression: with a decoder set, an envelope that omits `time` must still
3577        // produce events with a timestamp (the legacy /event path always wrote one).
3578        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
3579            let (source, address, _guard) = source_with_codec(
3580                codec_decoding(vector_lib::codecs::JsonDeserializerConfig::default().into()),
3581                CodecConfig::default(),
3582            )
3583            .await;
3584            let envelope = r#"{"event":"{\"foo\":\"bar\"}"}"#;
3585            assert_eq!(
3586                200,
3587                post(address, "services/collector/event", envelope).await
3588            );
3589
3590            let event = collect_n(source, 1).await.remove(0);
3591            assert!(
3592                event.as_log().get_timestamp().is_some(),
3593                "decoded event from envelope without `time` field is missing a timestamp"
3594            );
3595        })
3596        .await;
3597    }
3598
3599    #[tokio::test]
3600    async fn decoder_independent_per_endpoint_codecs() {
3601        // /event and /raw can be configured with completely different codecs and
3602        // each endpoint applies only its own. Here /event uses JSON decoding (so a
3603        // string `event` field decodes to fields) and /raw uses newline framing
3604        // with a bytes decoder (so a multi-line body fans out to N events).
3605        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
3606            let (source, address, _guard) = source_with_codec(
3607                codec_decoding(vector_lib::codecs::JsonDeserializerConfig::default().into()),
3608                codec_full(
3609                    Some(FramingConfig::NewlineDelimited(Default::default())),
3610                    Some(DeserializerConfig::Bytes),
3611                ),
3612            )
3613            .await;
3614
3615            // /event: JSON decoder turns the inner string into structured fields.
3616            assert_eq!(
3617                200,
3618                post(
3619                    address,
3620                    "services/collector/event",
3621                    r#"{"event":"{\"foo\":\"bar\"}"}"#
3622                )
3623                .await
3624            );
3625            // /raw: newline framing splits the body into three events.
3626            assert_eq!(
3627                200,
3628                post(address, "services/collector/raw", "a\nb\nc").await
3629            );
3630
3631            let events = collect_n(source, 4).await;
3632            assert_eq!(events.len(), 4);
3633
3634            // The /event request produces one log with `foo=bar`.
3635            let event_log = events
3636                .iter()
3637                .find(|e| e.as_log().contains("foo"))
3638                .expect("expected /event request to produce a log with `foo` set");
3639            assert_eq!(event_log.as_log()["foo"], "bar".into());
3640
3641            // The /raw request produces three logs whose messages are the lines.
3642            let raw_messages: Vec<String> = events
3643                .iter()
3644                .filter(|e| !e.as_log().contains("foo"))
3645                .map(|e| {
3646                    e.as_log()[log_schema().message_key().unwrap().to_string()]
3647                        .to_string_lossy()
3648                        .into_owned()
3649                })
3650                .collect();
3651            assert_eq!(raw_messages.len(), 3);
3652            assert!(raw_messages.contains(&"a".to_string()));
3653            assert!(raw_messages.contains(&"b".to_string()));
3654            assert!(raw_messages.contains(&"c".to_string()));
3655        })
3656        .await;
3657    }
3658
3659    /// End-to-end test for the second-stage VRL decoder on `/services/collector/event`.
3660    ///
3661    /// Validates the core use case from PR #25312: a VRL program decodes the
3662    /// inner `event` payload *and* reads HEC envelope metadata injected before
3663    /// decoding via `%splunk_hec.*` paths.
3664    #[tokio::test]
3665    async fn decoder_vrl_reads_envelope_metadata() {
3666        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
3667            let vrl_source = r#"
3668                # Read envelope metadata injected before this VRL program runs.
3669                .envelope_host = string!(%splunk_hec.host)
3670                .envelope_sourcetype = string!(%splunk_hec.sourcetype)
3671
3672                # Decode the inner JSON payload (the bytes of the `event` string).
3673                . = merge!(parse_json!(string!(.message)), .)
3674            "#;
3675
3676            let event_codec = codec_decoding(
3677                DeserializerConfig::Vrl(VrlDeserializerConfig {
3678                    vrl: VrlDeserializerOptions {
3679                        source: vrl_source.into(),
3680                        timezone: None,
3681                    },
3682                }),
3683            );
3684
3685            let (source, address, _guard) =
3686                source_with_codec(event_codec, CodecConfig::default()).await;
3687
3688            // Send a HEC event whose `event` field is a JSON-encoded string.
3689            // The VRL decoder should parse it and also read the envelope host/sourcetype.
3690            let payload = r#"{"event":"{\"level\":\"info\",\"msg\":\"hello\"}","host":"splunk-host","sourcetype":"my-app"}"#;
3691            assert_eq!(
3692                200,
3693                post(address, "services/collector/event", payload).await
3694            );
3695
3696            let event = collect_n(source, 1).await.remove(0);
3697            let log = event.as_log();
3698
3699            // Inner JSON decoded correctly.
3700            assert_eq!(log["level"], "info".into());
3701            assert_eq!(log["msg"], "hello".into());
3702
3703            // VRL read envelope metadata via %splunk_hec.* and wrote it to event fields.
3704            assert_eq!(log["envelope_host"], "splunk-host".into());
3705            assert_eq!(log["envelope_sourcetype"], "my-app".into());
3706
3707            // Post-decode splunk_hec metadata still applied (host, sourcetype).
3708            assert_eq!(
3709                log[log_schema().host_key().unwrap().to_string().as_str()],
3710                "splunk-host".into()
3711            );
3712            assert_eq!(log[&super::SOURCETYPE], "my-app".into());
3713        })
3714        .await;
3715    }
3716
3717    #[tokio::test]
3718    async fn decoder_raw_endpoint_event_has_fallback_timestamp() {
3719        // Regression: decoded /raw events must carry an ingest timestamp like the
3720        // legacy raw_event path did via `insert_standard_vector_source_metadata`.
3721        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
3722            let (source, address, _guard) = source_with_codec(
3723                CodecConfig::default(),
3724                codec_full(None, Some(DeserializerConfig::Bytes)),
3725            )
3726            .await;
3727            let body = "hello";
3728            assert_eq!(200, post(address, "services/collector/raw", body).await);
3729
3730            let event = collect_n(source, 1).await.remove(0);
3731            assert!(
3732                event.as_log().get_timestamp().is_some(),
3733                "decoded /raw event is missing a timestamp"
3734            );
3735        })
3736        .await;
3737    }
3738
3739    #[tokio::test]
3740    async fn decoder_raw_endpoint_empty_decode_does_not_ack() {
3741        // Regression: when the decoder produces zero events from a raw payload and
3742        // acknowledgements are enabled, the response must not include an `ackId`
3743        // because /services/collector/ack would otherwise report success for data
3744        // Vector silently dropped.
3745        let ack_config = HecAcknowledgementsConfig {
3746            enabled: Some(true),
3747            ..Default::default()
3748        };
3749        let (sender, _recv) = SourceSender::new_test_finalize(EventStatus::Delivered);
3750        let (_guard, address) = next_addr();
3751        let cx = SourceContext::new_test(sender, None);
3752        tokio::spawn(async move {
3753            SplunkConfig {
3754                address,
3755                token: Some(TOKEN.to_owned().into()),
3756                valid_tokens: None,
3757                tls: None,
3758                acknowledgements: ack_config,
3759                store_hec_token: false,
3760                log_namespace: None,
3761                keepalive: Default::default(),
3762                event: CodecConfig::default(),
3763                raw: codec_decoding(vector_lib::codecs::JsonDeserializerConfig::default().into()),
3764            }
3765            .build(cx)
3766            .await
3767            .unwrap()
3768            .await
3769            .unwrap()
3770        });
3771        wait_for_tcp(address).await;
3772
3773        let opts = SendWithOpts {
3774            channel: Some(Channel::Header("guid")),
3775            forwarded_for: None,
3776        };
3777        // A body the JSON decoder cannot parse - codec drops it, no events emitted.
3778        let body = "not json {";
3779        let response = send_with_response(address, "services/collector/raw", body, TOKEN, &opts)
3780            .await
3781            .json::<serde_json::Value>()
3782            .await
3783            .unwrap();
3784
3785        assert_eq!(response["code"].as_u64(), Some(0), "response: {response:?}");
3786        assert!(
3787            response.get("ackId").is_none(),
3788            "expected no ackId in response when decoder produced zero events, got: {response:?}"
3789        );
3790    }
3791
3792    #[tokio::test]
3793    async fn decoder_raw_endpoint_partial_decode_does_not_ack() {
3794        // Regression: a request whose body decodes into some valid frames AND some
3795        // dropped frames (e.g., `valid \n invalid \n valid` under newline framing
3796        // with a JSON decoder) must not return an `ackId`. Otherwise
3797        // /services/collector/ack reports success for data Vector silently dropped.
3798        let ack_config = HecAcknowledgementsConfig {
3799            enabled: Some(true),
3800            ..Default::default()
3801        };
3802        let (sender, _recv) = SourceSender::new_test_finalize(EventStatus::Delivered);
3803        let (_guard, address) = next_addr();
3804        let cx = SourceContext::new_test(sender, None);
3805        tokio::spawn(async move {
3806            SplunkConfig {
3807                address,
3808                token: Some(TOKEN.to_owned().into()),
3809                valid_tokens: None,
3810                tls: None,
3811                acknowledgements: ack_config,
3812                store_hec_token: false,
3813                log_namespace: None,
3814                keepalive: Default::default(),
3815                event: CodecConfig::default(),
3816                raw: codec_full(
3817                    Some(FramingConfig::NewlineDelimited(Default::default())),
3818                    Some(vector_lib::codecs::JsonDeserializerConfig::default().into()),
3819                ),
3820            }
3821            .build(cx)
3822            .await
3823            .unwrap()
3824            .await
3825            .unwrap()
3826        });
3827        wait_for_tcp(address).await;
3828
3829        let opts = SendWithOpts {
3830            channel: Some(Channel::Header("guid")),
3831            forwarded_for: None,
3832        };
3833        // Two valid JSON frames bracketing one invalid frame.
3834        let body = "{\"valid\":1}\nnot json\n{\"valid\":2}";
3835        let response = send_with_response(address, "services/collector/raw", body, TOKEN, &opts)
3836            .await
3837            .json::<serde_json::Value>()
3838            .await
3839            .unwrap();
3840
3841        assert_eq!(response["code"].as_u64(), Some(0), "response: {response:?}");
3842        assert!(
3843            response.get("ackId").is_none(),
3844            "expected no ackId when the decoder dropped a frame mid-request, got: {response:?}"
3845        );
3846    }
3847
3848    #[tokio::test]
3849    async fn decoder_event_endpoint_error_index_matches_envelope_not_fanout() {
3850        // Regression: with the decoder fanning out one envelope into many events,
3851        // `InvalidEventNumber` in error responses must still report the failing
3852        // envelope's zero-indexed position, not the cumulative event count.
3853        let (source, address, _guard) = source_with_codec(
3854            codec_full(
3855                Some(FramingConfig::NewlineDelimited(Default::default())),
3856                Some(DeserializerConfig::Bytes),
3857            ),
3858            CodecConfig::default(),
3859        )
3860        .await;
3861        // Envelope 0 has an `event` string with three lines: with newline framing
3862        // and a bytes decoder, that fans out to three events. Envelope 1 omits
3863        // `event`, so the decoded path returns `MissingEventField { event: 1 }`.
3864        let body = "{\"event\":\"a\\nb\\nc\"}{\"foo\":\"bar\"}";
3865
3866        let opts = SendWithOpts {
3867            channel: Some(Channel::Header("guid")),
3868            forwarded_for: None,
3869        };
3870        let response =
3871            send_with_response(address, "services/collector/event", body, TOKEN, &opts).await;
3872        let status = response.status();
3873        let body: serde_json::Value = response.json().await.unwrap();
3874
3875        assert_eq!(status.as_u16(), 400, "body: {body:?}");
3876        assert_eq!(
3877            body["invalid-event-number"].as_u64(),
3878            Some(1),
3879            "expected envelope index 1 (the failing envelope), not a fan-out event index. body: {body:?}"
3880        );
3881        // Drain the partially-emitted events so the source task doesn't block.
3882        let _ = collect_n(source, 3).await;
3883    }
3884
3885    #[test]
3886    fn output_schema_definition_with_decoder_vector_namespace() {
3887        let config = SplunkConfig {
3888            log_namespace: Some(true),
3889            event: codec_decoding(vector_lib::codecs::JsonDeserializerConfig::default().into()),
3890            ..Default::default()
3891        };
3892        let definition = config
3893            .outputs(LogNamespace::Vector)
3894            .remove(0)
3895            .schema_definition(true);
3896
3897        // The decoder's schema produces `Kind::json()` at the root, the source
3898        // layers its envelope metadata fields on top, and the legacy log shape is
3899        // unioned in (since /raw has no decoder and still emits legacy events) -
3900        // contributing the `message` meaning at root.
3901        let expected_definition =
3902            Definition::new_with_default_metadata(Kind::json(), [LogNamespace::Vector])
3903                .with_meaning(OwnedTargetPath::event_root(), meaning::MESSAGE)
3904                .with_metadata_field(
3905                    &owned_value_path!("vector", "source_type"),
3906                    Kind::bytes(),
3907                    None,
3908                )
3909                .with_metadata_field(
3910                    &owned_value_path!("vector", "ingest_timestamp"),
3911                    Kind::timestamp(),
3912                    None,
3913                )
3914                .with_metadata_field(
3915                    &owned_value_path!("splunk_hec", "host"),
3916                    Kind::bytes(),
3917                    Some("host"),
3918                )
3919                .with_metadata_field(
3920                    &owned_value_path!("splunk_hec", "index"),
3921                    Kind::bytes(),
3922                    None,
3923                )
3924                .with_metadata_field(
3925                    &owned_value_path!("splunk_hec", "source"),
3926                    Kind::bytes(),
3927                    Some("service"),
3928                )
3929                .with_metadata_field(
3930                    &owned_value_path!("splunk_hec", "channel"),
3931                    Kind::bytes(),
3932                    None,
3933                )
3934                .with_metadata_field(
3935                    &owned_value_path!("splunk_hec", "sourcetype"),
3936                    Kind::bytes(),
3937                    None,
3938                );
3939
3940        assert_eq!(definition, Some(expected_definition));
3941    }
3942
3943    #[test]
3944    fn output_schema_definition_vector_namespace() {
3945        let config = SplunkConfig {
3946            log_namespace: Some(true),
3947            ..Default::default()
3948        };
3949
3950        let definition = config
3951            .outputs(LogNamespace::Vector)
3952            .remove(0)
3953            .schema_definition(true);
3954
3955        let expected_definition = Definition::new_with_default_metadata(
3956            Kind::object(Collection::empty()).or_bytes(),
3957            [LogNamespace::Vector],
3958        )
3959        .with_meaning(OwnedTargetPath::event_root(), meaning::MESSAGE)
3960        .with_metadata_field(
3961            &owned_value_path!("vector", "source_type"),
3962            Kind::bytes(),
3963            None,
3964        )
3965        .with_metadata_field(
3966            &owned_value_path!("vector", "ingest_timestamp"),
3967            Kind::timestamp(),
3968            None,
3969        )
3970        .with_metadata_field(
3971            &owned_value_path!("splunk_hec", "host"),
3972            Kind::bytes(),
3973            Some("host"),
3974        )
3975        .with_metadata_field(
3976            &owned_value_path!("splunk_hec", "index"),
3977            Kind::bytes(),
3978            None,
3979        )
3980        .with_metadata_field(
3981            &owned_value_path!("splunk_hec", "source"),
3982            Kind::bytes(),
3983            Some("service"),
3984        )
3985        .with_metadata_field(
3986            &owned_value_path!("splunk_hec", "channel"),
3987            Kind::bytes(),
3988            None,
3989        )
3990        .with_metadata_field(
3991            &owned_value_path!("splunk_hec", "sourcetype"),
3992            Kind::bytes(),
3993            None,
3994        );
3995
3996        assert_eq!(definition, Some(expected_definition));
3997    }
3998
3999    #[test]
4000    fn output_schema_definition_legacy_namespace() {
4001        let config = SplunkConfig::default();
4002        let definitions = config
4003            .outputs(LogNamespace::Legacy)
4004            .remove(0)
4005            .schema_definition(true);
4006
4007        let expected_definition = Definition::new_with_default_metadata(
4008            Kind::object(Collection::empty()),
4009            [LogNamespace::Legacy],
4010        )
4011        .with_event_field(&owned_value_path!("host"), Kind::bytes(), Some("host"))
4012        .with_event_field(
4013            &owned_value_path!("message"),
4014            Kind::bytes().or_undefined(),
4015            Some("message"),
4016        )
4017        .with_event_field(
4018            &owned_value_path!("line"),
4019            Kind::array(Collection::empty())
4020                .or_object(Collection::empty())
4021                .or_undefined(),
4022            None,
4023        )
4024        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
4025        .with_event_field(&owned_value_path!("splunk_channel"), Kind::bytes(), None)
4026        .with_event_field(&owned_value_path!("splunk_index"), Kind::bytes(), None)
4027        .with_event_field(
4028            &owned_value_path!("splunk_source"),
4029            Kind::bytes(),
4030            Some("service"),
4031        )
4032        .with_event_field(&owned_value_path!("splunk_sourcetype"), Kind::bytes(), None)
4033        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None);
4034
4035        assert_eq!(definitions, Some(expected_definition));
4036    }
4037
4038    impl ValidatableComponent for SplunkConfig {
4039        fn validation_configuration() -> ValidationConfiguration {
4040            let config = Self {
4041                address: default_socket_address(),
4042                ..Default::default()
4043            };
4044
4045            let listen_addr_http = format!("http://{}/services/collector/event", config.address);
4046            let uri = Uri::try_from(&listen_addr_http).expect("should not fail to parse URI");
4047
4048            let log_namespace: LogNamespace = config.log_namespace.unwrap_or_default().into();
4049            let framing = BytesDecoderConfig::new().into();
4050            let decoding = DeserializerConfig::Json(Default::default());
4051
4052            let external_resource = ExternalResource::new(
4053                ResourceDirection::Push,
4054                HttpResourceConfig::from_parts(uri, None).with_headers(HashMap::from([(
4055                    X_SPLUNK_REQUEST_CHANNEL.to_string(),
4056                    "channel".to_string(),
4057                )])),
4058                DecodingConfig::new(framing, decoding, false.into()),
4059            );
4060
4061            ValidationConfiguration::from_source(
4062                Self::NAME,
4063                log_namespace,
4064                vec![ComponentTestCaseConfig::from_source(
4065                    config,
4066                    None,
4067                    Some(external_resource),
4068                )],
4069            )
4070        }
4071    }
4072
4073    register_validatable_component!(SplunkConfig);
4074}