// vector/sources/windows_event_log/mod.rs

1use async_trait::async_trait;
2use vector_lib::config::LogNamespace;
3use vrl::value::{Kind, kind::Collection};
4
5use vector_config::component::SourceDescription;
6
7use crate::config::{DataType, SourceConfig, SourceContext, SourceOutput};
8
9// Cross-platform: config types (pure serde structs, no Windows dependencies)
10mod config;
11pub use self::config::*;
12
13cfg_if::cfg_if! {
14    if #[cfg(windows)] {
15        mod bookmark;
16        mod checkpoint;
17        pub mod error;
18        mod metadata;
19        mod parser;
20        mod render;
21        mod sid_resolver;
22        mod subscription;
23        mod xml_parser;
24
25        use std::path::PathBuf;
26        use std::sync::Arc;
27
28        use chrono::Utc;
29        use futures::StreamExt;
30        use vector_lib::EstimatedJsonEncodedSizeOf;
31        use vector_lib::finalizer::OrderedFinalizer;
32        use vector_lib::internal_event::{
33            ByteSize, BytesReceived, CountByteSize, InternalEventHandle, Protocol,
34        };
35        use windows::Win32::Foundation::{DUPLICATE_SAME_ACCESS, DuplicateHandle, HANDLE};
36        use windows::Win32::System::Threading::GetCurrentProcess;
37
38        use crate::{
39            SourceSender,
40            event::{BatchNotifier, BatchStatus, BatchStatusReceiver},
41            internal_events::{
42                EventsReceived, StreamClosedError, WindowsEventLogParseError, WindowsEventLogQueryError,
43            },
44            shutdown::ShutdownSignal,
45        };
46
47        use self::{
48            checkpoint::Checkpointer,
49            error::WindowsEventLogError,
50            parser::EventLogParser,
51            subscription::{EventLogSubscription, WaitResult},
52            xml_parser::WindowsEvent,
53        };
54    }
55}
56
57#[cfg(all(test, windows))]
58mod tests;
59
60// Integration tests are feature-gated to avoid requiring Windows Event Log service.
61// To run integration tests on Windows: cargo test --features sources-windows_event_log-integration-tests
62#[cfg(all(test, windows, feature = "sources-windows_event_log-integration-tests"))]
63mod integration_tests;
64
65cfg_if::cfg_if! {
66if #[cfg(windows)] {
67
68/// Entry for the acknowledgment finalizer containing checkpoint information.
69/// Each entry represents a batch of events that need to be acknowledged before
70/// the checkpoint can be safely updated. Contains all channel bookmarks from
71/// the batch since a single batch may span multiple channels.
72#[derive(Debug, Clone)]
73struct FinalizerEntry {
74    /// Channel bookmarks: (channel_name, bookmark_xml) pairs
75    bookmarks: Vec<(String, String)>,
76}
77
78/// Shared checkpointer type for use with the finalizer
79type SharedCheckpointer = Arc<Checkpointer>;
80
81/// Finalizer for handling acknowledgments.
82/// Supports both synchronous (immediate checkpoint) and asynchronous (deferred checkpoint) modes.
83enum Finalizer {
84    /// Synchronous mode: checkpoints are updated immediately after reading events.
85    /// Used when acknowledgements are disabled.
86    Sync(SharedCheckpointer),
87    /// Asynchronous mode: checkpoints are updated only after downstream sinks acknowledge receipt.
88    /// Used when acknowledgements are enabled.
89    Async(OrderedFinalizer<FinalizerEntry>),
90}
91
92impl Finalizer {
93    /// Create a new finalizer based on acknowledgement configuration.
94    fn new(
95        acknowledgements: bool,
96        checkpointer: SharedCheckpointer,
97        shutdown: ShutdownSignal,
98    ) -> Self {
99        if acknowledgements {
100            let (finalizer, mut ack_stream) =
101                OrderedFinalizer::<FinalizerEntry>::new(Some(shutdown.clone()));
102
103            // Spawn background task to process acknowledgments and update checkpoints
104            tokio::spawn(async move {
105                while let Some((status, entry)) = ack_stream.next().await {
106                    if status == BatchStatus::Delivered {
107                        if let Err(e) = checkpointer.set_batch(entry.bookmarks.clone()).await {
108                            warn!(
109                                message = "Failed to update checkpoint after acknowledgement.",
110                                error = %e
111                            );
112                        } else {
113                            debug!(
114                                message = "Checkpoint updated after acknowledgement.",
115                                channels = entry.bookmarks.len()
116                            );
117                        }
118                    } else {
119                        debug!(
120                            message = "Events not delivered, checkpoint not updated.",
121                            status = ?status
122                        );
123                    }
124                }
125                debug!(message = "Acknowledgement stream completed.");
126            });
127
128            Self::Async(finalizer)
129        } else {
130            Self::Sync(checkpointer)
131        }
132    }
133
134    /// Finalize a batch of events.
135    /// In sync mode, immediately updates the checkpoint.
136    /// In async mode, registers the entry for deferred checkpoint update.
137    async fn finalize(&self, entry: FinalizerEntry, receiver: Option<BatchStatusReceiver>) {
138        match (self, receiver) {
139            (Self::Sync(checkpointer), None) => {
140                if let Err(e) = checkpointer.set_batch(entry.bookmarks.clone()).await {
141                    warn!(
142                        message = "Failed to update checkpoint.",
143                        error = %e
144                    );
145                }
146            }
147            (Self::Async(finalizer), Some(receiver)) => {
148                finalizer.add(entry, receiver);
149            }
150            (Self::Sync(_), Some(_)) => {
151                warn!(message = "Received acknowledgement receiver in sync mode, ignoring.");
152            }
153            (Self::Async(_), None) => {
154                warn!(
155                    message = "No acknowledgement receiver in async mode, checkpoint may be lost."
156                );
157            }
158        }
159    }
160}
161
162/// Parse, emit metrics for, send, and finalize a non-empty batch of pulled Windows events.
163///
164/// Both the `EventsAvailable` path and the speculative-timeout path share this
165/// logic. Returns `true` if the downstream pipeline closed and the caller
166/// should break out of the main event loop.
167async fn process_event_batch(
168    events: Vec<WindowsEvent>,
169    parser: &EventLogParser,
170    log_namespace: LogNamespace,
171    acknowledgements: bool,
172    subscription: &EventLogSubscription,
173    out: &mut SourceSender,
174    finalizer: &Finalizer,
175    events_received: &impl InternalEventHandle<Data = CountByteSize>,
176    bytes_received: &impl InternalEventHandle<Data = ByteSize>,
177) -> bool {
178    // Rate limiting between batches (async-compatible).
179    if let Some(limiter) = subscription.rate_limiter() {
180        limiter.until_ready().await;
181    }
182
183    let (batch, receiver) = BatchNotifier::maybe_new_with_receiver(acknowledgements);
184    let mut log_events = Vec::new();
185    let mut total_byte_size = 0usize;
186    let mut channels_in_batch = std::collections::HashSet::new();
187
188    for event in events {
189        let channel = event.channel.clone();
190        channels_in_batch.insert(channel.clone());
191        let event_id = event.event_id;
192        match parser.parse_event(event) {
193            Ok(mut log_event) => {
194                log_namespace.insert_standard_vector_source_metadata(
195                    &mut log_event,
196                    WindowsEventLogConfig::NAME,
197                    Utc::now(),
198                );
199
200                let byte_size = log_event.estimated_json_encoded_size_of();
201                total_byte_size += byte_size.get();
202                if let Some(ref batch) = batch {
203                    log_event = log_event.with_batch_notifier(batch);
204                }
205                log_events.push(log_event);
206            }
207            Err(e) => {
208                emit!(WindowsEventLogParseError {
209                    error: e.to_string(),
210                    channel,
211                    event_id: Some(event_id),
212                });
213            }
214        }
215    }
216
217    if !log_events.is_empty() {
218        let count = log_events.len();
219        events_received.emit(CountByteSize(count, total_byte_size.into()));
220        bytes_received.emit(ByteSize(total_byte_size));
221
222        // BACK PRESSURE: block until the pipeline accepts the batch.
223        // We don't call EvtNext again until this completes.
224        if let Err(_error) = out.send_batch(log_events).await {
225            emit!(StreamClosedError { count });
226            return true; // signal: break the main loop
227        }
228
229        // Register checkpoint entry with the finalizer.
230        let bookmarks: Vec<(String, String)> = channels_in_batch
231            .into_iter()
232            .filter_map(|channel| {
233                subscription
234                    .get_bookmark_xml(&channel)
235                    .map(|xml| (channel, xml))
236            })
237            .collect();
238
239        if !bookmarks.is_empty() {
240            let entry = FinalizerEntry { bookmarks };
241            finalizer.finalize(entry, receiver).await;
242        }
243    }
244
245    false // pipeline still open
246}
247
248/// Transfer ownership of `subscription` into a `spawn_blocking` task, run `f`
249/// on it, then return both the subscription and the result.
250///
251/// All blocking Windows APIs (`WaitForMultipleObjects`, `EvtNext`, `EvtRender`)
252/// must run in `spawn_blocking` to avoid stalling the async runtime. The
253/// ownership-transfer pattern ensures only one thread holds the subscription
254/// at a time, preventing data races without requiring locks.
255async fn with_subscription_blocking<F, R>(
256    subscription: EventLogSubscription,
257    f: F,
258) -> Result<(EventLogSubscription, R), WindowsEventLogError>
259where
260    F: FnOnce(EventLogSubscription) -> (EventLogSubscription, R) + Send + 'static,
261    R: Send + 'static,
262{
263    tokio::task::spawn_blocking(move || f(subscription))
264        .await
265        .map_err(|e| WindowsEventLogError::ConfigError {
266            message: format!("Blocking subscription task panicked: {e}"),
267        })
268}
269
270/// Windows Event Log source implementation
271pub struct WindowsEventLogSource {
272    config: WindowsEventLogConfig,
273    data_dir: PathBuf,
274    acknowledgements: bool,
275    log_namespace: LogNamespace,
276}
277
278impl WindowsEventLogSource {
279    pub fn new(
280        config: WindowsEventLogConfig,
281        data_dir: PathBuf,
282        acknowledgements: bool,
283        log_namespace: LogNamespace,
284    ) -> crate::Result<Self> {
285        config.validate()?;
286
287        Ok(Self {
288            config,
289            data_dir,
290            acknowledgements,
291            log_namespace,
292        })
293    }
294
295    async fn run_internal(
296        &mut self,
297        mut out: SourceSender,
298        shutdown: ShutdownSignal,
299    ) -> Result<(), WindowsEventLogError> {
300        let checkpointer = Arc::new(Checkpointer::new(&self.data_dir).await?);
301
302        let finalizer = Finalizer::new(
303            self.acknowledgements,
304            Arc::clone(&checkpointer),
305            shutdown.clone(),
306        );
307
308        let mut subscription = EventLogSubscription::new(
309            &self.config,
310            Arc::clone(&checkpointer),
311            self.acknowledgements,
312        )
313        .await?;
314        let parser = EventLogParser::new(&self.config, self.log_namespace);
315
316        let events_received = register!(EventsReceived);
317        let bytes_received = register!(BytesReceived::from(Protocol::from("windows_event_log")));
318
319        let timeout_ms = self.config.event_timeout_ms as u32;
320        let batch_size = self.config.batch_size as usize;
321        let acknowledgements = self.acknowledgements;
322
323        info!(
324            message = "Starting Windows Event Log source (pull mode).",
325            acknowledgements = acknowledgements,
326        );
327
328        // Spawn async shutdown watcher that signals the Windows shutdown event
329        // when the Vector shutdown signal fires. This wakes WaitForMultipleObjects
330        // while subscription is moved into spawn_blocking.
331        //
332        // We duplicate the handle so the watcher owns an independent kernel reference.
333        // This prevents use-after-close if the subscription panics and drops before
334        // the watcher fires — the duplicate remains valid until explicitly closed.
335        let (watcher_handle_raw, watcher_owns_handle): (isize, bool) = {
336            unsafe {
337                let src = HANDLE(subscription.shutdown_event_raw());
338                let process = GetCurrentProcess();
339                let mut dup = HANDLE::default();
340                if DuplicateHandle(
341                    process,
342                    src,
343                    process,
344                    &mut dup,
345                    0,
346                    false,
347                    DUPLICATE_SAME_ACCESS,
348                )
349                .is_ok()
350                {
351                    (dup.0 as isize, true)
352                } else {
353                    // Fallback: use the original handle without ownership.
354                    // The watcher will signal but NOT close — EventLogSubscription::drop
355                    // owns the handle and will close it.
356                    warn!(
357                        message = "Failed to duplicate shutdown event handle, falling back to shared handle."
358                    );
359                    (src.0 as isize, false)
360                }
361            }
362        };
363        let shutdown_watcher = shutdown.clone();
364        tokio::spawn(async move {
365            shutdown_watcher.await;
366            unsafe {
367                let handle =
368                    windows::Win32::Foundation::HANDLE(watcher_handle_raw as *mut std::ffi::c_void);
369                let _ = windows::Win32::System::Threading::SetEvent(handle);
370                if watcher_owns_handle {
371                    let _ = windows::Win32::Foundation::CloseHandle(handle);
372                }
373            }
374        });
375
376        // Track when we last flushed checkpoints
377        let mut last_checkpoint = std::time::Instant::now();
378        let checkpoint_interval =
379            std::time::Duration::from_secs(self.config.checkpoint_interval_secs);
380
381        // Exponential backoff on consecutive recoverable errors
382        let mut error_backoff = std::time::Duration::from_millis(100);
383        const MAX_ERROR_BACKOFF: std::time::Duration = std::time::Duration::from_secs(5);
384
385        // Health heartbeat: log every ~30s regardless of checkpoint interval
386        let mut timeout_count: u32 = 0;
387        let health_interval_timeouts = (30_000 / self.config.event_timeout_ms).max(1) as u32;
388
389        loop {
390            // Move subscription into blocking thread for WaitForMultipleObjects.
391            // Ownership transfer ensures no data races between the blocking thread
392            // and async code. The shutdown watcher uses a raw HANDLE value (just an
393            // integer) to signal shutdown without needing access to the subscription.
394            let (returned_sub, wait_result) =
395                with_subscription_blocking(subscription, move |sub| {
396                    let result = sub.wait_for_events_blocking(timeout_ms);
397                    (sub, result)
398                })
399                .await?;
400            subscription = returned_sub;
401
402            match wait_result {
403                WaitResult::EventsAvailable => {
404                    // Pull events via spawn_blocking (EvtNext/EvtRender are blocking APIs)
405                    let (returned_sub, events_result) =
406                        with_subscription_blocking(subscription, move |mut sub| {
407                            let result = sub.pull_events(batch_size);
408                            (sub, result)
409                        })
410                        .await?;
411                    subscription = returned_sub;
412
413                    match events_result {
414                        Ok(events) if events.is_empty() => {
415                            error_backoff = std::time::Duration::from_millis(100);
416                            continue;
417                        }
418                        Ok(events) => {
419                            error_backoff = std::time::Duration::from_millis(100);
420                            debug!(
421                                message = "Pulled Windows Event Log events.",
422                                event_count = events.len()
423                            );
424                            if process_event_batch(
425                                events,
426                                &parser,
427                                self.log_namespace,
428                                acknowledgements,
429                                &subscription,
430                                &mut out,
431                                &finalizer,
432                                &events_received,
433                                &bytes_received,
434                            )
435                            .await
436                            {
437                                break;
438                            }
439                        }
440                        Err(e) => {
441                            emit!(WindowsEventLogQueryError {
442                                channel: "all".to_string(),
443                                query: None,
444                                error: e.to_string(),
445                            });
446                            if !e.is_recoverable() {
447                                error!(
448                                    message = "Non-recoverable pull error, shutting down.",
449                                    error = %e
450                                );
451                                break;
452                            }
453                            // Exponential backoff on consecutive recoverable errors
454                            warn!(
455                                message = "Recoverable pull error, backing off.",
456                                backoff_ms = error_backoff.as_millis() as u64,
457                                error = %e
458                            );
459                            tokio::time::sleep(error_backoff).await;
460                            error_backoff = (error_backoff * 2).min(MAX_ERROR_BACKOFF);
461                        }
462                    }
463                }
464
465                WaitResult::Timeout => {
466                    // Periodic checkpoint flush (sync mode only)
467                    if !acknowledgements && last_checkpoint.elapsed() >= checkpoint_interval {
468                        if let Err(e) = subscription.flush_bookmarks().await {
469                            warn!(
470                                message = "Failed to flush bookmarks during periodic checkpoint.",
471                                error = %e
472                            );
473                        }
474                        last_checkpoint = std::time::Instant::now();
475                    }
476
477                    // Health heartbeat on a separate ~30s cadence
478                    timeout_count += 1;
479                    if timeout_count >= health_interval_timeouts {
480                        timeout_count = 0;
481                        let (total, active) = subscription.channel_health_summary();
482                        if active < total {
483                            warn!(
484                                message = "Some channel subscriptions are inactive.",
485                                total_channels = total,
486                                active_channels = active,
487                            );
488                        } else {
489                            debug!(
490                                message = "All channel subscriptions healthy.",
491                                total_channels = total,
492                            );
493                        }
494                    }
495
496                    // Speculative pull: self-heal against any lost-wakeup scenario,
497                    // regardless of root cause. If the OS signal was lost through any
498                    // mechanism (not just the pre-drain race fixed in #25194), this
499                    // ensures the source recovers within one timeout period.
500                    // Use the speculative pull variant so idle timeout cycles don't
501                    // refresh per-channel record-count gauges via EvtOpenLog /
502                    // EvtGetLogInfo on every configured channel.
503                    let (returned_sub, speculative_result) =
504                        with_subscription_blocking(subscription, move |mut sub| {
505                            let result = sub.pull_events_speculative(batch_size);
506                            (sub, result)
507                        })
508                        .await?;
509                    subscription = returned_sub;
510
511                    match speculative_result {
512                        Ok(events) if events.is_empty() => {
513                            // Healthy cycle: reset backoff so the next transient
514                            // error starts fresh.
515                            error_backoff = std::time::Duration::from_millis(100);
516                        }
517                        Ok(events) => {
518                            // Healthy cycle: reset backoff so the next transient
519                            // error starts fresh.
520                            error_backoff = std::time::Duration::from_millis(100);
521                            warn!(
522                                message = "Speculative timeout pull recovered events; possible lost wakeup detected.",
523                                event_count = events.len(),
524                            );
525                            if process_event_batch(
526                                events,
527                                &parser,
528                                self.log_namespace,
529                                acknowledgements,
530                                &subscription,
531                                &mut out,
532                                &finalizer,
533                                &events_received,
534                                &bytes_received,
535                            )
536                            .await
537                            {
538                                break;
539                            }
540                        }
541                        Err(e) => {
542                            emit!(WindowsEventLogQueryError {
543                                channel: "all".to_string(),
544                                query: None,
545                                error: e.to_string(),
546                            });
547                            if !e.is_recoverable() {
548                                error!(
549                                    message = "Non-recoverable speculative pull error, shutting down.",
550                                    error = %e
551                                );
552                                break;
553                            }
554                            // Exponential backoff mirrors the EventsAvailable error path.
555                            warn!(
556                                message = "Recoverable speculative pull error, backing off.",
557                                backoff_ms = error_backoff.as_millis() as u64,
558                                error = %e
559                            );
560                            tokio::time::sleep(error_backoff).await;
561                            error_backoff = (error_backoff * 2).min(MAX_ERROR_BACKOFF);
562                        }
563                    }
564                }
565
566                WaitResult::Shutdown => {
567                    info!(message = "Windows Event Log wait received shutdown signal.");
568                    if !acknowledgements {
569                        info!(message = "Flushing bookmarks before shutdown.");
570                        if let Err(e) = subscription.flush_bookmarks().await {
571                            warn!(message = "Failed to flush bookmarks on shutdown.", error = %e);
572                        }
573                    }
574                    break;
575                }
576            }
577        }
578
579        Ok(())
580    }
581}
582
583} // if #[cfg(windows)]
584} // cfg_if!
585
586#[async_trait]
587#[typetag::serde(name = "windows_event_log")]
588impl SourceConfig for WindowsEventLogConfig {
589    async fn build(&self, _cx: SourceContext) -> crate::Result<super::Source> {
590        #[cfg(not(windows))]
591        {
592            Err("The windows_event_log source is only supported on Windows.".into())
593        }
594
595        #[cfg(windows)]
596        {
597            let data_dir = _cx
598                .globals
599                .resolve_and_make_data_subdir(self.data_dir.as_ref(), _cx.key.id())?;
600
601            let acknowledgements = _cx.do_acknowledgements(self.acknowledgements);
602
603            let log_namespace = _cx.log_namespace(self.log_namespace);
604            let source = WindowsEventLogSource::new(
605                self.clone(),
606                data_dir,
607                acknowledgements,
608                log_namespace,
609            )?;
610            Ok(Box::pin(async move {
611                let mut source = source;
612                if let Err(error) = source.run_internal(_cx.out, _cx.shutdown).await {
613                    error!(message = "Windows Event Log source failed.", %error);
614                }
615                Ok(())
616            }))
617        }
618    }
619
620    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
621        let log_namespace = self
622            .log_namespace
623            .map(|b| {
624                if b {
625                    LogNamespace::Vector
626                } else {
627                    LogNamespace::Legacy
628                }
629            })
630            .unwrap_or(global_log_namespace);
631
632        let schema_definition = match log_namespace {
633            LogNamespace::Vector => vector_lib::schema::Definition::new_with_default_metadata(
634                Kind::object(std::collections::BTreeMap::from([
635                    ("timestamp".into(), Kind::timestamp().or_undefined()),
636                    ("message".into(), Kind::bytes().or_undefined()),
637                    ("level".into(), Kind::bytes().or_undefined()),
638                    ("source".into(), Kind::bytes().or_undefined()),
639                    ("event_id".into(), Kind::integer().or_undefined()),
640                    ("provider_name".into(), Kind::bytes().or_undefined()),
641                    ("computer".into(), Kind::bytes().or_undefined()),
642                    ("user_id".into(), Kind::bytes().or_undefined()),
643                    ("user_name".into(), Kind::bytes().or_undefined()),
644                    ("record_id".into(), Kind::integer().or_undefined()),
645                    ("activity_id".into(), Kind::bytes().or_undefined()),
646                    ("related_activity_id".into(), Kind::bytes().or_undefined()),
647                    ("process_id".into(), Kind::integer().or_undefined()),
648                    ("thread_id".into(), Kind::integer().or_undefined()),
649                    ("channel".into(), Kind::bytes().or_undefined()),
650                    ("opcode".into(), Kind::integer().or_undefined()),
651                    ("task".into(), Kind::integer().or_undefined()),
652                    ("keywords".into(), Kind::bytes().or_undefined()),
653                    ("level_value".into(), Kind::integer().or_undefined()),
654                    ("provider_guid".into(), Kind::bytes().or_undefined()),
655                    ("version".into(), Kind::integer().or_undefined()),
656                    ("qualifiers".into(), Kind::integer().or_undefined()),
657                    (
658                        "string_inserts".into(),
659                        Kind::array(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
660                    ),
661                    (
662                        "event_data".into(),
663                        Kind::object(std::collections::BTreeMap::new()).or_undefined(),
664                    ),
665                    (
666                        "user_data".into(),
667                        Kind::object(std::collections::BTreeMap::new()).or_undefined(),
668                    ),
669                    ("task_name".into(), Kind::bytes().or_undefined()),
670                    ("opcode_name".into(), Kind::bytes().or_undefined()),
671                    (
672                        "keyword_names".into(),
673                        Kind::array(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
674                    ),
675                ])),
676                [LogNamespace::Vector],
677            )
678            .with_standard_vector_source_metadata(),
679            LogNamespace::Legacy => {
680                vector_lib::schema::Definition::any().with_standard_vector_source_metadata()
681            }
682        };
683
684        vec![SourceOutput::new_maybe_logs(
685            DataType::Log,
686            schema_definition,
687        )]
688    }
689
690    fn resources(&self) -> Vec<crate::config::Resource> {
691        self.channels
692            .iter()
693            .map(|channel| crate::config::Resource::DiskBuffer(channel.clone()))
694            .collect()
695    }
696
697    fn can_acknowledge(&self) -> bool {
698        true
699    }
700}
701
702inventory::submit! {
703    SourceDescription::new::<WindowsEventLogConfig>(
704        "windows_event_log",
705        "Collect logs from Windows Event Log channels",
706        "A Windows-specific source that subscribes to Windows Event Log channels and streams events in real-time using the Windows Event Log API.",
707        "https://vector.dev/docs/reference/configuration/sources/windows_event_log/"
708    )
709}