Skip to content
3 changes: 3 additions & 0 deletions changelog.d/25194_windows_event_log_lost_wakeup.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The `windows_event_log` source no longer freezes after periods of inactivity.

authors: tot19
280 changes: 191 additions & 89 deletions src/sources/windows_event_log/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ cfg_if::cfg_if! {
use vector_lib::EstimatedJsonEncodedSizeOf;
use vector_lib::finalizer::OrderedFinalizer;
use vector_lib::internal_event::{
ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol,
ByteSize, BytesReceived, CountByteSize, InternalEventHandle, Protocol,
};
use windows::Win32::Foundation::{DUPLICATE_SAME_ACCESS, DuplicateHandle, HANDLE};
use windows::Win32::System::Threading::GetCurrentProcess;
Expand All @@ -48,6 +48,7 @@ cfg_if::cfg_if! {
error::WindowsEventLogError,
parser::EventLogParser,
subscription::{EventLogSubscription, WaitResult},
xml_parser::WindowsEvent,
};
}
}
Expand Down Expand Up @@ -157,6 +158,107 @@ impl Finalizer {
}
}

/// Parse, emit metrics for, send, and finalize a non-empty batch of pulled Windows events.
///
/// Both the `EventsAvailable` path and the speculative-timeout path share this
/// logic. Returns `true` if the downstream pipeline closed and the caller
/// should break out of the main event loop.
/// Parse, emit metrics for, send, and finalize a non-empty batch of pulled Windows events.
///
/// Both the `EventsAvailable` path and the speculative-timeout path share this
/// logic. Returns `true` if the downstream pipeline closed and the caller
/// should break out of the main event loop.
async fn process_event_batch(
    events: Vec<WindowsEvent>,
    parser: &EventLogParser,
    acknowledgements: bool,
    subscription: &EventLogSubscription,
    out: &mut SourceSender,
    finalizer: &Finalizer,
    events_received: &impl InternalEventHandle<Data = CountByteSize>,
    bytes_received: &impl InternalEventHandle<Data = ByteSize>,
) -> bool {
    // Throttle between batches without blocking the async runtime.
    if let Some(limiter) = subscription.rate_limiter() {
        limiter.until_ready().await;
    }

    // Optional batch notifier: present only when acknowledgements are enabled.
    let (notifier, ack_rx) = BatchNotifier::maybe_new_with_receiver(acknowledgements);
    let mut parsed = Vec::new();
    let mut byte_total = 0usize;
    let mut seen_channels = std::collections::HashSet::new();

    for raw in events {
        let channel = raw.channel.clone();
        seen_channels.insert(channel.clone());
        let event_id = raw.event_id;
        match parser.parse_event(raw) {
            Ok(mut record) => {
                byte_total += record.estimated_json_encoded_size_of().get();
                if let Some(ref notifier) = notifier {
                    record = record.with_batch_notifier(notifier);
                }
                parsed.push(record);
            }
            Err(err) => {
                emit!(WindowsEventLogParseError {
                    error: err.to_string(),
                    channel,
                    event_id: Some(event_id),
                });
            }
        }
    }

    // Nothing parsable in this batch: no metrics, no send, pipeline still open.
    if parsed.is_empty() {
        return false;
    }

    let count = parsed.len();
    events_received.emit(CountByteSize(count, byte_total.into()));
    bytes_received.emit(ByteSize(byte_total));

    // BACK PRESSURE: wait until the pipeline accepts the batch.
    // The caller does not issue another EvtNext until this completes.
    if out.send_batch(parsed).await.is_err() {
        emit!(StreamClosedError { count });
        return true; // signal: break the main loop
    }

    // Collect one bookmark per channel observed in this batch and register
    // the checkpoint entry with the finalizer.
    let bookmarks: Vec<(String, String)> = seen_channels
        .into_iter()
        .filter_map(|channel| {
            subscription
                .get_bookmark_xml(&channel)
                .map(|xml| (channel, xml))
        })
        .collect();

    if !bookmarks.is_empty() {
        finalizer.finalize(FinalizerEntry { bookmarks }, ack_rx).await;
    }

    false // pipeline still open
}

/// Transfer ownership of `subscription` into a `spawn_blocking` task, run `f`
/// on it, then return both the subscription and the result.
///
/// All blocking Windows APIs (`WaitForMultipleObjects`, `EvtNext`, `EvtRender`)
/// must run in `spawn_blocking` to avoid stalling the async runtime. The
/// ownership-transfer pattern ensures only one thread holds the subscription
/// at a time, preventing data races without requiring locks.
/// Move `subscription` into a `spawn_blocking` task, run `f` on it, and hand
/// back both the subscription and `f`'s result.
///
/// All blocking Windows APIs (`WaitForMultipleObjects`, `EvtNext`, `EvtRender`)
/// must run in `spawn_blocking` to avoid stalling the async runtime. Passing
/// ownership through the closure guarantees that exactly one thread holds the
/// subscription at any moment, so no locks are required.
async fn with_subscription_blocking<F, R>(
    subscription: EventLogSubscription,
    f: F,
) -> Result<(EventLogSubscription, R), WindowsEventLogError>
where
    F: FnOnce(EventLogSubscription) -> (EventLogSubscription, R) + Send + 'static,
    R: Send + 'static,
{
    let handle = tokio::task::spawn_blocking(move || f(subscription));
    match handle.await {
        Ok(pair) => Ok(pair),
        // A JoinError here means the closure panicked on the blocking thread.
        Err(e) => Err(WindowsEventLogError::ConfigError {
            message: format!("Blocking subscription task panicked: {e}"),
        }),
    }
}

/// Windows Event Log source implementation
pub struct WindowsEventLogSource {
config: WindowsEventLogConfig,
Expand Down Expand Up @@ -281,42 +383,25 @@ impl WindowsEventLogSource {
// Ownership transfer ensures no data races between the blocking thread
// and async code. The shutdown watcher uses a raw HANDLE value (just an
// integer) to signal shutdown without needing access to the subscription.
let (returned_sub, wait_result) = tokio::task::spawn_blocking({
let sub = subscription;
move || {
let (returned_sub, wait_result) =
with_subscription_blocking(subscription, move |sub| {
let result = sub.wait_for_events_blocking(timeout_ms);
(sub, result)
}
})
.await
.map_err(|e| WindowsEventLogError::ConfigError {
message: format!("Wait task panicked: {e}"),
})?;

})
.await?;
subscription = returned_sub;

match wait_result {
WaitResult::EventsAvailable => {
// Pull events via spawn_blocking (EvtNext/EvtRender are blocking APIs)
let (returned_sub, events_result) = tokio::task::spawn_blocking({
let mut sub = subscription;
move || {
let (returned_sub, events_result) =
with_subscription_blocking(subscription, move |mut sub| {
let result = sub.pull_events(batch_size);
(sub, result)
}
})
.await
.map_err(|e| WindowsEventLogError::ConfigError {
message: format!("Pull task panicked: {e}"),
})?;

})
.await?;
subscription = returned_sub;

// Rate limiting between batches (async-compatible)
if let Some(limiter) = subscription.rate_limiter() {
limiter.until_ready().await;
}

match events_result {
Ok(events) if events.is_empty() => {
error_backoff = std::time::Duration::from_millis(100);
Expand All @@ -328,65 +413,19 @@ impl WindowsEventLogSource {
message = "Pulled Windows Event Log events.",
event_count = events.len()
);

let (batch, receiver) =
BatchNotifier::maybe_new_with_receiver(acknowledgements);

let mut log_events = Vec::new();
let mut total_byte_size = 0;
let mut channels_in_batch = std::collections::HashSet::new();

for event in events {
let channel = event.channel.clone();
channels_in_batch.insert(channel.clone());
let event_id = event.event_id;
match parser.parse_event(event) {
Ok(mut log_event) => {
let byte_size = log_event.estimated_json_encoded_size_of();
total_byte_size += byte_size.get();

if let Some(ref batch) = batch {
log_event = log_event.with_batch_notifier(batch);
}

log_events.push(log_event);
}
Err(e) => {
emit!(WindowsEventLogParseError {
error: e.to_string(),
channel,
event_id: Some(event_id),
});
}
}
}

if !log_events.is_empty() {
let count = log_events.len();
events_received.emit(CountByteSize(count, total_byte_size.into()));
bytes_received.emit(ByteSize(total_byte_size));

// BACK PRESSURE: block here until the pipeline accepts
// the batch. We don't call EvtNext again until this completes.
if let Err(_error) = out.send_batch(log_events).await {
emit!(StreamClosedError { count });
break;
}

// Register checkpoint entry with finalizer
let bookmarks: Vec<(String, String)> = channels_in_batch
.into_iter()
.filter_map(|channel| {
subscription
.get_bookmark_xml(&channel)
.map(|xml| (channel, xml))
})
.collect();

if !bookmarks.is_empty() {
let entry = FinalizerEntry { bookmarks };
finalizer.finalize(entry, receiver).await;
}
if process_event_batch(
events,
&parser,
acknowledgements,
&subscription,
&mut out,
&finalizer,
&events_received,
&bytes_received,
)
.await
{
break;
}
}
Err(e) => {
Expand Down Expand Up @@ -415,10 +454,6 @@ impl WindowsEventLogSource {
}

WaitResult::Timeout => {
// A full wait cycle without errors means the system is healthy;
// reset backoff so the next transient error starts fresh.
error_backoff = std::time::Duration::from_millis(100);

// Periodic checkpoint flush (sync mode only)
if !acknowledgements && last_checkpoint.elapsed() >= checkpoint_interval {
if let Err(e) = subscription.flush_bookmarks().await {
Expand Down Expand Up @@ -448,6 +483,73 @@ impl WindowsEventLogSource {
);
}
}

// Speculative pull: self-heal against any lost-wakeup scenario,
// regardless of root cause. If the OS signal was lost through any
// mechanism (not just the pre-drain race fixed in #25194), this
// ensures the source recovers within one timeout period.
// EvtNext returns ERROR_NO_MORE_ITEMS on an empty channel, which
// is near-zero cost, so it is safe to attempt every cycle.
let (returned_sub, speculative_result) =
with_subscription_blocking(subscription, move |mut sub| {
let result = sub.pull_events(batch_size);
(sub, result)
})
.await?;
subscription = returned_sub;

match speculative_result {
Ok(events) if events.is_empty() => {
// Healthy cycle: reset backoff so the next transient
// error starts fresh.
error_backoff = std::time::Duration::from_millis(100);
}
Ok(events) => {
// Healthy cycle: reset backoff so the next transient
// error starts fresh.
error_backoff = std::time::Duration::from_millis(100);
warn!(
message = "Speculative timeout pull recovered events; possible lost wakeup detected.",
event_count = events.len(),
);
if process_event_batch(
events,
&parser,
acknowledgements,
&subscription,
&mut out,
&finalizer,
&events_received,
&bytes_received,
)
.await
{
break;
}
}
Err(e) => {
emit!(WindowsEventLogQueryError {
channel: "all".to_string(),
query: None,
error: e.to_string(),
});
if !e.is_recoverable() {
error!(
message = "Non-recoverable speculative pull error, shutting down.",
error = %e
);
break;
}
// Exponential backoff mirrors the EventsAvailable error path.
warn!(
message = "Recoverable speculative pull error, backing off.",
backoff_ms = error_backoff.as_millis() as u64,
error = %e
);
tokio::time::sleep(error_backoff).await;
error_backoff = (error_backoff * 2).min(MAX_ERROR_BACKOFF);
}
}
}

WaitResult::Shutdown => {
Expand Down
Loading
Loading