1use async_trait::async_trait;
2use vector_lib::config::LogNamespace;
3use vrl::value::{Kind, kind::Collection};
4
5use vector_config::component::SourceDescription;
6
7use crate::config::{DataType, SourceConfig, SourceContext, SourceOutput};
8
9mod config;
11pub use self::config::*;
12
13cfg_if::cfg_if! {
14 if #[cfg(windows)] {
15 mod bookmark;
16 mod checkpoint;
17 pub mod error;
18 mod metadata;
19 mod parser;
20 mod render;
21 mod sid_resolver;
22 mod subscription;
23 mod xml_parser;
24
25 use std::path::PathBuf;
26 use std::sync::Arc;
27
28 use chrono::Utc;
29 use futures::StreamExt;
30 use vector_lib::EstimatedJsonEncodedSizeOf;
31 use vector_lib::finalizer::OrderedFinalizer;
32 use vector_lib::internal_event::{
33 ByteSize, BytesReceived, CountByteSize, InternalEventHandle, Protocol,
34 };
35 use windows::Win32::Foundation::{DUPLICATE_SAME_ACCESS, DuplicateHandle, HANDLE};
36 use windows::Win32::System::Threading::GetCurrentProcess;
37
38 use crate::{
39 SourceSender,
40 event::{BatchNotifier, BatchStatus, BatchStatusReceiver},
41 internal_events::{
42 EventsReceived, StreamClosedError, WindowsEventLogParseError, WindowsEventLogQueryError,
43 },
44 shutdown::ShutdownSignal,
45 };
46
47 use self::{
48 checkpoint::Checkpointer,
49 error::WindowsEventLogError,
50 parser::EventLogParser,
51 subscription::{EventLogSubscription, WaitResult},
52 xml_parser::WindowsEvent,
53 };
54 }
55}
56
57#[cfg(all(test, windows))]
58mod tests;
59
60#[cfg(all(test, windows, feature = "sources-windows_event_log-integration-tests"))]
63mod integration_tests;
64
65cfg_if::cfg_if! {
66if #[cfg(windows)] {
67
/// Payload handed to the [`Finalizer`] for a single batch of events.
///
/// `process_event_batch` builds one of these after a successful send by
/// pairing every channel seen in the batch with the bookmark XML returned by
/// `EventLogSubscription::get_bookmark_xml`.
#[derive(Clone, Debug)]
struct FinalizerEntry {
    /// `(channel, bookmark_xml)` pairs forwarded to `Checkpointer::set_batch`.
    bookmarks: Vec<(String, String)>,
}
77
78type SharedCheckpointer = Arc<Checkpointer>;
80
81enum Finalizer {
84 Sync(SharedCheckpointer),
87 Async(OrderedFinalizer<FinalizerEntry>),
90}
91
92impl Finalizer {
93 fn new(
95 acknowledgements: bool,
96 checkpointer: SharedCheckpointer,
97 shutdown: ShutdownSignal,
98 ) -> Self {
99 if acknowledgements {
100 let (finalizer, mut ack_stream) =
101 OrderedFinalizer::<FinalizerEntry>::new(Some(shutdown.clone()));
102
103 tokio::spawn(async move {
105 while let Some((status, entry)) = ack_stream.next().await {
106 if status == BatchStatus::Delivered {
107 if let Err(e) = checkpointer.set_batch(entry.bookmarks.clone()).await {
108 warn!(
109 message = "Failed to update checkpoint after acknowledgement.",
110 error = %e
111 );
112 } else {
113 debug!(
114 message = "Checkpoint updated after acknowledgement.",
115 channels = entry.bookmarks.len()
116 );
117 }
118 } else {
119 debug!(
120 message = "Events not delivered, checkpoint not updated.",
121 status = ?status
122 );
123 }
124 }
125 debug!(message = "Acknowledgement stream completed.");
126 });
127
128 Self::Async(finalizer)
129 } else {
130 Self::Sync(checkpointer)
131 }
132 }
133
134 async fn finalize(&self, entry: FinalizerEntry, receiver: Option<BatchStatusReceiver>) {
138 match (self, receiver) {
139 (Self::Sync(checkpointer), None) => {
140 if let Err(e) = checkpointer.set_batch(entry.bookmarks.clone()).await {
141 warn!(
142 message = "Failed to update checkpoint.",
143 error = %e
144 );
145 }
146 }
147 (Self::Async(finalizer), Some(receiver)) => {
148 finalizer.add(entry, receiver);
149 }
150 (Self::Sync(_), Some(_)) => {
151 warn!(message = "Received acknowledgement receiver in sync mode, ignoring.");
152 }
153 (Self::Async(_), None) => {
154 warn!(
155 message = "No acknowledgement receiver in async mode, checkpoint may be lost."
156 );
157 }
158 }
159 }
160}
161
162async fn process_event_batch(
168 events: Vec<WindowsEvent>,
169 parser: &EventLogParser,
170 log_namespace: LogNamespace,
171 acknowledgements: bool,
172 subscription: &EventLogSubscription,
173 out: &mut SourceSender,
174 finalizer: &Finalizer,
175 events_received: &impl InternalEventHandle<Data = CountByteSize>,
176 bytes_received: &impl InternalEventHandle<Data = ByteSize>,
177) -> bool {
178 if let Some(limiter) = subscription.rate_limiter() {
180 limiter.until_ready().await;
181 }
182
183 let (batch, receiver) = BatchNotifier::maybe_new_with_receiver(acknowledgements);
184 let mut log_events = Vec::new();
185 let mut total_byte_size = 0usize;
186 let mut channels_in_batch = std::collections::HashSet::new();
187
188 for event in events {
189 let channel = event.channel.clone();
190 channels_in_batch.insert(channel.clone());
191 let event_id = event.event_id;
192 match parser.parse_event(event) {
193 Ok(mut log_event) => {
194 log_namespace.insert_standard_vector_source_metadata(
195 &mut log_event,
196 WindowsEventLogConfig::NAME,
197 Utc::now(),
198 );
199
200 let byte_size = log_event.estimated_json_encoded_size_of();
201 total_byte_size += byte_size.get();
202 if let Some(ref batch) = batch {
203 log_event = log_event.with_batch_notifier(batch);
204 }
205 log_events.push(log_event);
206 }
207 Err(e) => {
208 emit!(WindowsEventLogParseError {
209 error: e.to_string(),
210 channel,
211 event_id: Some(event_id),
212 });
213 }
214 }
215 }
216
217 if !log_events.is_empty() {
218 let count = log_events.len();
219 events_received.emit(CountByteSize(count, total_byte_size.into()));
220 bytes_received.emit(ByteSize(total_byte_size));
221
222 if let Err(_error) = out.send_batch(log_events).await {
225 emit!(StreamClosedError { count });
226 return true; }
228
229 let bookmarks: Vec<(String, String)> = channels_in_batch
231 .into_iter()
232 .filter_map(|channel| {
233 subscription
234 .get_bookmark_xml(&channel)
235 .map(|xml| (channel, xml))
236 })
237 .collect();
238
239 if !bookmarks.is_empty() {
240 let entry = FinalizerEntry { bookmarks };
241 finalizer.finalize(entry, receiver).await;
242 }
243 }
244
245 false }
247
248async fn with_subscription_blocking<F, R>(
256 subscription: EventLogSubscription,
257 f: F,
258) -> Result<(EventLogSubscription, R), WindowsEventLogError>
259where
260 F: FnOnce(EventLogSubscription) -> (EventLogSubscription, R) + Send + 'static,
261 R: Send + 'static,
262{
263 tokio::task::spawn_blocking(move || f(subscription))
264 .await
265 .map_err(|e| WindowsEventLogError::ConfigError {
266 message: format!("Blocking subscription task panicked: {e}"),
267 })
268}
269
270pub struct WindowsEventLogSource {
272 config: WindowsEventLogConfig,
273 data_dir: PathBuf,
274 acknowledgements: bool,
275 log_namespace: LogNamespace,
276}
277
278impl WindowsEventLogSource {
279 pub fn new(
280 config: WindowsEventLogConfig,
281 data_dir: PathBuf,
282 acknowledgements: bool,
283 log_namespace: LogNamespace,
284 ) -> crate::Result<Self> {
285 config.validate()?;
286
287 Ok(Self {
288 config,
289 data_dir,
290 acknowledgements,
291 log_namespace,
292 })
293 }
294
295 async fn run_internal(
296 &mut self,
297 mut out: SourceSender,
298 shutdown: ShutdownSignal,
299 ) -> Result<(), WindowsEventLogError> {
300 let checkpointer = Arc::new(Checkpointer::new(&self.data_dir).await?);
301
302 let finalizer = Finalizer::new(
303 self.acknowledgements,
304 Arc::clone(&checkpointer),
305 shutdown.clone(),
306 );
307
308 let mut subscription = EventLogSubscription::new(
309 &self.config,
310 Arc::clone(&checkpointer),
311 self.acknowledgements,
312 )
313 .await?;
314 let parser = EventLogParser::new(&self.config, self.log_namespace);
315
316 let events_received = register!(EventsReceived);
317 let bytes_received = register!(BytesReceived::from(Protocol::from("windows_event_log")));
318
319 let timeout_ms = self.config.event_timeout_ms as u32;
320 let batch_size = self.config.batch_size as usize;
321 let acknowledgements = self.acknowledgements;
322
323 info!(
324 message = "Starting Windows Event Log source (pull mode).",
325 acknowledgements = acknowledgements,
326 );
327
328 let (watcher_handle_raw, watcher_owns_handle): (isize, bool) = {
336 unsafe {
337 let src = HANDLE(subscription.shutdown_event_raw());
338 let process = GetCurrentProcess();
339 let mut dup = HANDLE::default();
340 if DuplicateHandle(
341 process,
342 src,
343 process,
344 &mut dup,
345 0,
346 false,
347 DUPLICATE_SAME_ACCESS,
348 )
349 .is_ok()
350 {
351 (dup.0 as isize, true)
352 } else {
353 warn!(
357 message = "Failed to duplicate shutdown event handle, falling back to shared handle."
358 );
359 (src.0 as isize, false)
360 }
361 }
362 };
363 let shutdown_watcher = shutdown.clone();
364 tokio::spawn(async move {
365 shutdown_watcher.await;
366 unsafe {
367 let handle =
368 windows::Win32::Foundation::HANDLE(watcher_handle_raw as *mut std::ffi::c_void);
369 let _ = windows::Win32::System::Threading::SetEvent(handle);
370 if watcher_owns_handle {
371 let _ = windows::Win32::Foundation::CloseHandle(handle);
372 }
373 }
374 });
375
376 let mut last_checkpoint = std::time::Instant::now();
378 let checkpoint_interval =
379 std::time::Duration::from_secs(self.config.checkpoint_interval_secs);
380
381 let mut error_backoff = std::time::Duration::from_millis(100);
383 const MAX_ERROR_BACKOFF: std::time::Duration = std::time::Duration::from_secs(5);
384
385 let mut timeout_count: u32 = 0;
387 let health_interval_timeouts = (30_000 / self.config.event_timeout_ms).max(1) as u32;
388
389 loop {
390 let (returned_sub, wait_result) =
395 with_subscription_blocking(subscription, move |sub| {
396 let result = sub.wait_for_events_blocking(timeout_ms);
397 (sub, result)
398 })
399 .await?;
400 subscription = returned_sub;
401
402 match wait_result {
403 WaitResult::EventsAvailable => {
404 let (returned_sub, events_result) =
406 with_subscription_blocking(subscription, move |mut sub| {
407 let result = sub.pull_events(batch_size);
408 (sub, result)
409 })
410 .await?;
411 subscription = returned_sub;
412
413 match events_result {
414 Ok(events) if events.is_empty() => {
415 error_backoff = std::time::Duration::from_millis(100);
416 continue;
417 }
418 Ok(events) => {
419 error_backoff = std::time::Duration::from_millis(100);
420 debug!(
421 message = "Pulled Windows Event Log events.",
422 event_count = events.len()
423 );
424 if process_event_batch(
425 events,
426 &parser,
427 self.log_namespace,
428 acknowledgements,
429 &subscription,
430 &mut out,
431 &finalizer,
432 &events_received,
433 &bytes_received,
434 )
435 .await
436 {
437 break;
438 }
439 }
440 Err(e) => {
441 emit!(WindowsEventLogQueryError {
442 channel: "all".to_string(),
443 query: None,
444 error: e.to_string(),
445 });
446 if !e.is_recoverable() {
447 error!(
448 message = "Non-recoverable pull error, shutting down.",
449 error = %e
450 );
451 break;
452 }
453 warn!(
455 message = "Recoverable pull error, backing off.",
456 backoff_ms = error_backoff.as_millis() as u64,
457 error = %e
458 );
459 tokio::time::sleep(error_backoff).await;
460 error_backoff = (error_backoff * 2).min(MAX_ERROR_BACKOFF);
461 }
462 }
463 }
464
465 WaitResult::Timeout => {
466 if !acknowledgements && last_checkpoint.elapsed() >= checkpoint_interval {
468 if let Err(e) = subscription.flush_bookmarks().await {
469 warn!(
470 message = "Failed to flush bookmarks during periodic checkpoint.",
471 error = %e
472 );
473 }
474 last_checkpoint = std::time::Instant::now();
475 }
476
477 timeout_count += 1;
479 if timeout_count >= health_interval_timeouts {
480 timeout_count = 0;
481 let (total, active) = subscription.channel_health_summary();
482 if active < total {
483 warn!(
484 message = "Some channel subscriptions are inactive.",
485 total_channels = total,
486 active_channels = active,
487 );
488 } else {
489 debug!(
490 message = "All channel subscriptions healthy.",
491 total_channels = total,
492 );
493 }
494 }
495
496 let (returned_sub, speculative_result) =
504 with_subscription_blocking(subscription, move |mut sub| {
505 let result = sub.pull_events_speculative(batch_size);
506 (sub, result)
507 })
508 .await?;
509 subscription = returned_sub;
510
511 match speculative_result {
512 Ok(events) if events.is_empty() => {
513 error_backoff = std::time::Duration::from_millis(100);
516 }
517 Ok(events) => {
518 error_backoff = std::time::Duration::from_millis(100);
521 warn!(
522 message = "Speculative timeout pull recovered events; possible lost wakeup detected.",
523 event_count = events.len(),
524 );
525 if process_event_batch(
526 events,
527 &parser,
528 self.log_namespace,
529 acknowledgements,
530 &subscription,
531 &mut out,
532 &finalizer,
533 &events_received,
534 &bytes_received,
535 )
536 .await
537 {
538 break;
539 }
540 }
541 Err(e) => {
542 emit!(WindowsEventLogQueryError {
543 channel: "all".to_string(),
544 query: None,
545 error: e.to_string(),
546 });
547 if !e.is_recoverable() {
548 error!(
549 message = "Non-recoverable speculative pull error, shutting down.",
550 error = %e
551 );
552 break;
553 }
554 warn!(
556 message = "Recoverable speculative pull error, backing off.",
557 backoff_ms = error_backoff.as_millis() as u64,
558 error = %e
559 );
560 tokio::time::sleep(error_backoff).await;
561 error_backoff = (error_backoff * 2).min(MAX_ERROR_BACKOFF);
562 }
563 }
564 }
565
566 WaitResult::Shutdown => {
567 info!(message = "Windows Event Log wait received shutdown signal.");
568 if !acknowledgements {
569 info!(message = "Flushing bookmarks before shutdown.");
570 if let Err(e) = subscription.flush_bookmarks().await {
571 warn!(message = "Failed to flush bookmarks on shutdown.", error = %e);
572 }
573 }
574 break;
575 }
576 }
577 }
578
579 Ok(())
580 }
581}
582
583} } #[async_trait]
587#[typetag::serde(name = "windows_event_log")]
588impl SourceConfig for WindowsEventLogConfig {
589 async fn build(&self, _cx: SourceContext) -> crate::Result<super::Source> {
590 #[cfg(not(windows))]
591 {
592 Err("The windows_event_log source is only supported on Windows.".into())
593 }
594
595 #[cfg(windows)]
596 {
597 let data_dir = _cx
598 .globals
599 .resolve_and_make_data_subdir(self.data_dir.as_ref(), _cx.key.id())?;
600
601 let acknowledgements = _cx.do_acknowledgements(self.acknowledgements);
602
603 let log_namespace = _cx.log_namespace(self.log_namespace);
604 let source = WindowsEventLogSource::new(
605 self.clone(),
606 data_dir,
607 acknowledgements,
608 log_namespace,
609 )?;
610 Ok(Box::pin(async move {
611 let mut source = source;
612 if let Err(error) = source.run_internal(_cx.out, _cx.shutdown).await {
613 error!(message = "Windows Event Log source failed.", %error);
614 }
615 Ok(())
616 }))
617 }
618 }
619
620 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
621 let log_namespace = self
622 .log_namespace
623 .map(|b| {
624 if b {
625 LogNamespace::Vector
626 } else {
627 LogNamespace::Legacy
628 }
629 })
630 .unwrap_or(global_log_namespace);
631
632 let schema_definition = match log_namespace {
633 LogNamespace::Vector => vector_lib::schema::Definition::new_with_default_metadata(
634 Kind::object(std::collections::BTreeMap::from([
635 ("timestamp".into(), Kind::timestamp().or_undefined()),
636 ("message".into(), Kind::bytes().or_undefined()),
637 ("level".into(), Kind::bytes().or_undefined()),
638 ("source".into(), Kind::bytes().or_undefined()),
639 ("event_id".into(), Kind::integer().or_undefined()),
640 ("provider_name".into(), Kind::bytes().or_undefined()),
641 ("computer".into(), Kind::bytes().or_undefined()),
642 ("user_id".into(), Kind::bytes().or_undefined()),
643 ("user_name".into(), Kind::bytes().or_undefined()),
644 ("record_id".into(), Kind::integer().or_undefined()),
645 ("activity_id".into(), Kind::bytes().or_undefined()),
646 ("related_activity_id".into(), Kind::bytes().or_undefined()),
647 ("process_id".into(), Kind::integer().or_undefined()),
648 ("thread_id".into(), Kind::integer().or_undefined()),
649 ("channel".into(), Kind::bytes().or_undefined()),
650 ("opcode".into(), Kind::integer().or_undefined()),
651 ("task".into(), Kind::integer().or_undefined()),
652 ("keywords".into(), Kind::bytes().or_undefined()),
653 ("level_value".into(), Kind::integer().or_undefined()),
654 ("provider_guid".into(), Kind::bytes().or_undefined()),
655 ("version".into(), Kind::integer().or_undefined()),
656 ("qualifiers".into(), Kind::integer().or_undefined()),
657 (
658 "string_inserts".into(),
659 Kind::array(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
660 ),
661 (
662 "event_data".into(),
663 Kind::object(std::collections::BTreeMap::new()).or_undefined(),
664 ),
665 (
666 "user_data".into(),
667 Kind::object(std::collections::BTreeMap::new()).or_undefined(),
668 ),
669 ("task_name".into(), Kind::bytes().or_undefined()),
670 ("opcode_name".into(), Kind::bytes().or_undefined()),
671 (
672 "keyword_names".into(),
673 Kind::array(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
674 ),
675 ])),
676 [LogNamespace::Vector],
677 )
678 .with_standard_vector_source_metadata(),
679 LogNamespace::Legacy => {
680 vector_lib::schema::Definition::any().with_standard_vector_source_metadata()
681 }
682 };
683
684 vec![SourceOutput::new_maybe_logs(
685 DataType::Log,
686 schema_definition,
687 )]
688 }
689
690 fn resources(&self) -> Vec<crate::config::Resource> {
691 self.channels
692 .iter()
693 .map(|channel| crate::config::Resource::DiskBuffer(channel.clone()))
694 .collect()
695 }
696
697 fn can_acknowledge(&self) -> bool {
698 true
699 }
700}
701
702inventory::submit! {
703 SourceDescription::new::<WindowsEventLogConfig>(
704 "windows_event_log",
705 "Collect logs from Windows Event Log channels",
706 "A Windows-specific source that subscribes to Windows Event Log channels and streams events in real-time using the Windows Event Log API.",
707 "https://vector.dev/docs/reference/configuration/sources/windows_event_log/"
708 )
709}