From 1d984ebf20fa49067b5327de3d08e8ad0a8086f7 Mon Sep 17 00:00:00 2001 From: tot19 <31141271+tot19@users.noreply.github.com> Date: Wed, 15 Apr 2026 10:18:41 +0000 Subject: [PATCH 1/7] fix(windows_event_log): fix pre-drain ResetEvent race and add lost-wakeup regression tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The Windows Event Log service signals the pull-mode wait handle via SetEvent each time a new matching event is recorded. Because the handle is manual-reset, any SetEvent that fires between the last EvtNext call and the post-drain ResetEvent is silently lost — the subscription then hangs until the next OS event arrives (vectordotdev/vector#25194). Fix: reset the handle *before* entering the drain loop. Signals raised during the drain are preserved because no ResetEvent runs after them: the handle stays signaled and the next wait returns immediately. Re-arm (SetEvent) on early exits so the next pull_events revisits the channel without waiting for a fresh OS notification: - budget exhaustion - bookmark failure mid-batch - transient EvtNext error Regression tests: - test_pull_events_preserves_setevent_during_drain: installs DRAIN_STEP_HOOK to fire SetEvent mid-drain and asserts wait_for_events_blocking returns EventsAvailable, not Timeout. - test_speculative_pull_recovers_without_signal: manually clears the channel signal via ResetEvent, confirms wait times out, then asserts pull_events still returns events — proving the speculative timeout pull in mod.rs self-heals independently of signal state. Also: comment re-subscription break paths (ERROR_EVT_QUERY_RESULT_STALE and INVALID_POSITION) noting the speculative pull as a safety net if the re-subbed channel does not immediately re-signal; add serialization note to DRAIN_STEP_HOOK. 
--- ...25194_windows_event_log_lost_wakeup.fix.md | 7 + src/sources/windows_event_log/subscription.rs | 263 +++++++++++++++++- 2 files changed, 259 insertions(+), 11 deletions(-) create mode 100644 changelog.d/25194_windows_event_log_lost_wakeup.fix.md diff --git a/changelog.d/25194_windows_event_log_lost_wakeup.fix.md b/changelog.d/25194_windows_event_log_lost_wakeup.fix.md new file mode 100644 index 0000000000000..032612be1268e --- /dev/null +++ b/changelog.d/25194_windows_event_log_lost_wakeup.fix.md @@ -0,0 +1,7 @@ +The `windows_event_log` source no longer freezes after periods of inactivity. Two complementary fixes address the root cause and add a recovery path: + +1. **Pre-drain signal reset**: the subscription's wait handle is now reset *before* draining events via `EvtNext`, not after. A signal that fires between the last `EvtNext` and the old post-drain `ResetEvent` was silently lost, leaving the source frozen until the next OS event arrived. Resetting first preserves any notification raised mid-drain. + +2. **Speculative pull on timeout**: on each wait timeout, `pull_events` is called speculatively. `EvtNext` returns `ERROR_NO_MORE_ITEMS` immediately on an empty channel (near-zero cost), so this is safe every cycle. If events are recovered, a warning is logged. This self-heals within one timeout period regardless of why the wakeup signal was lost — covering both the drain-race path and any other lost-wakeup scenario. 
+ +authors: tot19 diff --git a/src/sources/windows_event_log/subscription.rs b/src/sources/windows_event_log/subscription.rs index dc92713f691d9..d4b37c0776fbf 100644 --- a/src/sources/windows_event_log/subscription.rs +++ b/src/sources/windows_event_log/subscription.rs @@ -18,9 +18,9 @@ use windows::Win32::System::EventLog::{ EvtSubscribeStartAfterBookmark, EvtSubscribeStartAtOldestRecord, EvtSubscribeStrict, EvtSubscribeToFutureEvents, }; -#[cfg(test)] -use windows::Win32::System::Threading::SetEvent; -use windows::Win32::System::Threading::{CreateEventW, ResetEvent, WaitForMultipleObjects}; +use windows::Win32::System::Threading::{ + CreateEventW, ResetEvent, SetEvent, WaitForMultipleObjects, +}; use windows::core::HSTRING; use super::{ @@ -30,6 +30,19 @@ use super::{ use crate::internal_events::WindowsEventLogBookmarkError; +/// Test-only hook called inside the `pull_events` drain loop after each +/// `EvtNext` invocation. Used by the lost-wakeup regression test +/// (see `test_pull_events_preserves_setevent_during_drain`) to race a +/// `SetEvent` against the drain without relying on thread-timing. +/// No-op and zero-cost in non-test builds. +/// +/// Only one test should install a hook at a time; tests that install a hook +/// must use `#[serial_test::serial]` or equivalent serialization to prevent +/// concurrent tests from triggering each other's hook. +#[cfg(test)] +static DRAIN_STEP_HOOK: std::sync::Mutex<Option<std::sync::Arc<dyn Fn(HANDLE) + Send + Sync>>> = + std::sync::Mutex::new(None); + /// Maximum number of entries in the EvtFormatMessage result cache. pub const FORMAT_CACHE_CAPACITY: usize = 10_000; /// Maximum number of cached publisher metadata handles. @@ -479,9 +492,25 @@ impl EventLogSubscription { let mut bookmark_failed = false; let mut channel_count = 0usize; - // Drain loop: keep calling EvtNext until ERROR_NO_MORE_ITEMS or channel budget. 
- // Only reset the signal once the channel is fully drained; if we hit the - // budget limit the signal stays set so WaitForMultipleObjects returns immediately. + // Reset the signal BEFORE draining to avoid a lost-wakeup race + // (see vectordotdev/vector#25194). The Windows Event Log service + // signals this manual-reset event via SetEvent each time a new + // matching event is recorded; SetEvent on an already-signaled + // event is a no-op, so if we reset AFTER draining, any signal + // that arrives between our last EvtNext and ResetEvent is lost + // — the subscription then hangs until the next event arrives. + // Resetting first means any signal raised during the drain is + // preserved, causing the next WaitForMultipleObjects to return + // immediately. + // + // If we exit the drain loop early (channel budget exhausted or + // bookmark update failed mid-batch), we re-SetEvent at the end + // of this iteration so the next pull_events call revisits this + // channel without waiting for a fresh OS signal. + unsafe { + let _ = ResetEvent(channel_sub.signal_event); + } + 'drain: loop { if channel_count >= channel_limit { break; @@ -501,6 +530,17 @@ impl EventLogSubscription { ) }; + // Test-only hook: lets the lost-wakeup regression test race + // a SetEvent against the drain without thread-timing. No-op + // and zero-cost in non-test builds. + #[cfg(test)] + { + let hook = DRAIN_STEP_HOOK.lock().unwrap().clone(); + if let Some(h) = hook { + h(channel_sub.signal_event); + } + } + if let Err(err) = result { let code = (err.code().0 as u32) & 0xFFFF; if code == ERROR_NO_MORE_ITEMS { @@ -513,6 +553,8 @@ impl EventLogSubscription { channel = %channel_sub.channel ); channel_drained = true; + // Speculative pull on timeout in mod.rs is a safety net if the + // re-subscribed channel does not immediately re-signal. 
break; } if code == ERROR_EVT_QUERY_RESULT_INVALID_POSITION { @@ -526,7 +568,9 @@ impl EventLogSubscription { message = "Re-subscription succeeded after stale query.", channel = %channel_sub.channel ); - // Retry from fresh subscription — the signal will fire again + // Retry from fresh subscription — the signal will fire again. + // Speculative pull on timeout in mod.rs is a safety net if + // the new subscription does not immediately re-signal. channel_drained = true; break; } @@ -538,10 +582,23 @@ impl EventLogSubscription { ); channel_sub.subscription_active_gauge.set(0.0); channel_drained = true; + // Speculative pull on timeout in mod.rs is a safety net if + // the failed channel does not re-signal after recovery. break; } } } + // Re-arm the signal before returning. We reset it pre-drain + // but are bailing out without confirming the drain completed, + // so if events were left un-drained the next pull_events must + // still revisit this channel without waiting for a fresh OS + // signal. This mirrors the `else` branch below that handles + // budget-exhaustion and bookmark-failure early breaks, and + // avoids the same lost-wakeup symptom (vectordotdev/vector#25194) + // on transient EvtNext failures. + unsafe { + let _ = SetEvent(channel_sub.signal_event); + } return Err(WindowsEventLogError::PullEventsError { channel: channel_sub.channel.clone(), source: err, @@ -697,15 +754,20 @@ impl EventLogSubscription { } if channel_drained && !bookmark_failed { - unsafe { - let _ = ResetEvent(channel_sub.signal_event); - } - // Update channel record count gauge for lag detection. super::render::update_channel_records( &channel_sub.channel, &channel_sub.channel_records_gauge, ); + } else { + // Drain exited early (budget exhausted or bookmark_failed + // mid-batch). Re-arm the signal so the next pull_events + // revisits this channel immediately without waiting for a + // fresh OS notification. Pairs with the pre-drain ResetEvent + // above. 
+ unsafe { + let _ = SetEvent(channel_sub.signal_event); + } } } @@ -816,6 +878,15 @@ impl EventLogSubscription { self.shutdown_event.0 } + /// Test-only accessor for the first channel's signal event handle. Used + /// by the lost-wakeup regression test to scope its drain-loop hook to + /// exactly this subscription, so it does not fire on concurrent + /// `pull_events` calls from other tests in the same process. + #[cfg(test)] + pub(super) fn first_channel_signal_raw(&self) -> isize { + self.channels[0].signal_event.0 as isize + } + /// Returns a reference to the rate limiter, if configured. pub const fn rate_limiter( &self, @@ -1272,4 +1343,174 @@ mod tests { // that the subscription is functional. let _events = subscription.pull_events(100).unwrap_or_default(); } + + /// Proves that `pull_events` works independently of signal state — the + /// invariant the speculative timeout pull in mod.rs relies on. + /// + /// Steps: + /// 1. Subscribe to the Application log with `read_existing_events = true`. + /// 2. Manually clear the channel signal via `ResetEvent`, simulating a lost wakeup. + /// 3. Assert `wait_for_events_blocking` times out (signal cleared, no OS wake-up). + /// 4. Assert `pull_events` still returns events — `EvtNext` fetches from the queue + /// regardless of signal state, so the speculative pull in mod.rs self-heals. + #[tokio::test] + async fn test_speculative_pull_recovers_without_signal() { + let mut config = WindowsEventLogConfig::default(); + config.channels = vec!["Application".to_string()]; + config.read_existing_events = true; + config.event_timeout_ms = 500; + + let (checkpointer, _temp_dir) = create_test_checkpointer().await; + + let mut subscription = EventLogSubscription::new(&config, checkpointer, false) + .await + .expect("Subscription creation should succeed"); + + // Manually clear the signal to simulate a lost wakeup. 
The Application + // log always has existing events on a running Windows system, so even + // with the signal cleared events remain available via EvtNext. + let signal_raw = subscription.first_channel_signal_raw(); + unsafe { + let _ = ResetEvent(HANDLE(signal_raw as *mut std::ffi::c_void)); + } + + // Signal is cleared: wait must time out. + let (mut subscription, wait_result) = tokio::task::spawn_blocking(move || { + let r = subscription.wait_for_events_blocking(500); + (subscription, r) + }) + .await + .unwrap(); + + assert!( + matches!(wait_result, WaitResult::Timeout), + "expected Timeout after manual ResetEvent; signal was not cleared" + ); + + // Despite the cleared signal, pull_events must still return events. + // This is the invariant the speculative timeout pull in mod.rs depends on. + let events = subscription.pull_events(100).unwrap_or_default(); + assert!( + !events.is_empty(), + "pull_events must return events independently of signal state; \ + this is the invariant the speculative timeout pull in mod.rs depends on" + ); + } + + /// Regression test for vectordotdev/vector#25194. + /// + /// The Windows Event Log service signals the pull-mode wait handle via + /// `SetEvent` each time a new matching event is recorded. Because the + /// handle is manual-reset, `SetEvent` on an already-signaled handle is + /// a no-op. If `pull_events` resets the signal *after* draining events + /// via `EvtNext`, any signal that fires between the last `EvtNext` and + /// the `ResetEvent` call is silently lost — the subscription then + /// permanently hangs until a subsequent event arrives. + /// + /// The fix is to reset the signal *before* the drain loop, so signals + /// raised during the drain are preserved and the next wait returns + /// immediately. + /// + /// This test pins that invariant by driving the real `pull_events` + /// against a real `EvtSubscribe` handle. 
It installs a + /// `DRAIN_STEP_HOOK` that runs inside the drain loop after each + /// `EvtNext` and fires `SetEvent` on the subscription's signal + /// handle — simulating the OS signaling a new event arrival during + /// the drain window. After `pull_events` returns, the signal must + /// still be set (the subsequent `wait_for_events_blocking` must + /// return `EventsAvailable`, not `Timeout`). Under the old + /// post-drain `ResetEvent` order, the hook's `SetEvent` would be + /// clobbered by the reset and the wait would time out — which is + /// exactly what #25194 reports. + #[tokio::test] + async fn test_pull_events_preserves_setevent_during_drain() { + use std::sync::Arc as StdArc; + + let mut config = WindowsEventLogConfig::default(); + config.channels = vec!["Application".to_string()]; + config.read_existing_events = true; + config.event_timeout_ms = 1000; + + let (checkpointer, _temp_dir) = create_test_checkpointer().await; + + let mut subscription = EventLogSubscription::new(&config, checkpointer, false) + .await + .expect("Subscription creation should succeed"); + + // Capture THIS subscription's signal handle so the hook can scope + // itself to this test. DRAIN_STEP_HOOK is a process-global, and + // cargo runs tests in parallel by default; without handle-keying, + // a concurrent test's pull_events could trigger our one-shot + // hook first, flip `fired`, and SetEvent on the wrong handle. + let target_signal_raw = subscription.first_channel_signal_raw(); + + // Install the drain-loop hook: every EvtNext call inside + // pull_events fires SetEvent on the subscription's signal + // handle. This simulates the OS signaling a fresh event + // mid-drain, which is exactly the race window #25194 exposes. + // The hook only needs to fire once to prove the invariant; we + // use an AtomicBool to keep it deterministic. The hook is keyed + // to `target_signal_raw` so concurrent pull_events calls from + // other tests no-op here. 
+ let fired = StdArc::new(std::sync::atomic::AtomicBool::new(false)); + { + let fired = StdArc::clone(&fired); + let hook: StdArc<dyn Fn(HANDLE) + Send + Sync> = StdArc::new(move |signal: HANDLE| { + if signal.0 as isize != target_signal_raw { + return; + } + if !fired.swap(true, std::sync::atomic::Ordering::SeqCst) { + unsafe { + let _ = SetEvent(signal); + } + } + }); + *DRAIN_STEP_HOOK.lock().unwrap() = Some(hook); + } + + // Drop-guard: clear the hook even if the test panics, so it + // doesn't contaminate other tests in the same process. + struct HookGuard; + impl Drop for HookGuard { + fn drop(&mut self) { + *DRAIN_STEP_HOOK.lock().unwrap() = None; + } + } + let _guard = HookGuard; + + // Drive pull_events with a very large budget so the drain + // exits via ERROR_NO_MORE_ITEMS (channel_drained = true), + // which is the path that ran the post-drain ResetEvent in the + // old buggy code. Exiting via budget exhaustion would skip + // that reset and cause this test to false-pass against the + // pre-fix code. + let _ = subscription.pull_events(usize::MAX).unwrap_or_default(); + + assert!( + fired.load(std::sync::atomic::Ordering::SeqCst), + "drain-loop hook never ran — pull_events must call EvtNext \ + at least once even on an empty channel" + ); + + // With the fix, the SetEvent fired by the hook during the + // drain is preserved; the next wait returns EventsAvailable + // immediately. Under the old post-drain ResetEvent order, it + // would time out — that is the #25194 freeze. + let (_subscription, result) = tokio::task::spawn_blocking(move || { + let r = subscription.wait_for_events_blocking(500); + (subscription, r) + }) + .await + .unwrap(); + + match result { + WaitResult::EventsAvailable => {} + WaitResult::Timeout => panic!( + "signal set during the drain window was lost — this is the \ + lost-wakeup race from vectordotdev/vector#25194. \ + pull_events must call ResetEvent BEFORE draining, not after." 
+ ), + WaitResult::Shutdown => panic!("unexpected shutdown"), + } + } } From 70fe23071f84215211436e4f6ab7770c55198f21 Mon Sep 17 00:00:00 2001 From: tot19 <31141271+tot19@users.noreply.github.com> Date: Wed, 15 Apr 2026 10:18:42 +0000 Subject: [PATCH 2/7] fix(windows_event_log): add speculative timeout pull, deduplicate processing, fix error handling Four related improvements to mod.rs: 1. Speculative pull on WaitResult::Timeout: call pull_events on every timeout cycle as a belt-and-suspenders self-heal. EvtNext returns ERROR_NO_MORE_ITEMS immediately on an empty channel (near-zero cost). If events are recovered a warning is emitted. Guarantees recovery within one timeout period regardless of the root cause of the lost wakeup. 2. Extract with_subscription_blocking helper: wraps the spawn_blocking ownership-transfer pattern (move subscription in, run blocking fn, return subscription + result). All three blocking calls (wait, normal pull, speculative pull) now use this helper instead of inlining spawn_blocking. 3. Extract process_event_batch helper: the parse/emit/send_batch/finalize sequence was duplicated verbatim between the EventsAvailable arm and the speculative-timeout arm. Extracted into a shared free async function. Rate limiting is applied consistently in both paths via the helper. 4. Fix error-handling asymmetry: the speculative pull Err branch previously only logged warn! and continued, so a non-recoverable error (access denied, channel not found) would spam warnings indefinitely. Now mirrors the EventsAvailable path: emit WindowsEventLogQueryError, break on non-recoverable errors, apply exponential backoff on recoverable ones. 
--- src/sources/windows_event_log/mod.rs | 280 ++++++++++++++++++--------- 1 file changed, 191 insertions(+), 89 deletions(-) diff --git a/src/sources/windows_event_log/mod.rs b/src/sources/windows_event_log/mod.rs index 17d900cc07622..2b1410f60d051 100644 --- a/src/sources/windows_event_log/mod.rs +++ b/src/sources/windows_event_log/mod.rs @@ -29,7 +29,7 @@ cfg_if::cfg_if! { use vector_lib::EstimatedJsonEncodedSizeOf; use vector_lib::finalizer::OrderedFinalizer; use vector_lib::internal_event::{ - ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol, + ByteSize, BytesReceived, CountByteSize, InternalEventHandle, Protocol, }; use windows::Win32::Foundation::{DUPLICATE_SAME_ACCESS, DuplicateHandle, HANDLE}; use windows::Win32::System::Threading::GetCurrentProcess; @@ -48,6 +48,7 @@ cfg_if::cfg_if! { error::WindowsEventLogError, parser::EventLogParser, subscription::{EventLogSubscription, WaitResult}, + xml_parser::WindowsEvent, }; } } @@ -157,6 +158,107 @@ impl Finalizer { } } +/// Parse, emit metrics for, send, and finalize a non-empty batch of pulled Windows events. +/// +/// Both the `EventsAvailable` path and the speculative-timeout path share this +/// logic. Returns `true` if the downstream pipeline closed and the caller +/// should break out of the main event loop. +async fn process_event_batch( + events: Vec<WindowsEvent>, + parser: &EventLogParser, + acknowledgements: bool, + subscription: &EventLogSubscription, + out: &mut SourceSender, + finalizer: &Finalizer, + events_received: &impl InternalEventHandle<Data = CountByteSize>, + bytes_received: &impl InternalEventHandle<Data = ByteSize>, +) -> bool { + // Rate limiting between batches (async-compatible). 
+ if let Some(limiter) = subscription.rate_limiter() { + limiter.until_ready().await; + } + + let (batch, receiver) = BatchNotifier::maybe_new_with_receiver(acknowledgements); + let mut log_events = Vec::new(); + let mut total_byte_size = 0usize; + let mut channels_in_batch = std::collections::HashSet::new(); + + for event in events { + let channel = event.channel.clone(); + channels_in_batch.insert(channel.clone()); + let event_id = event.event_id; + match parser.parse_event(event) { + Ok(mut log_event) => { + let byte_size = log_event.estimated_json_encoded_size_of(); + total_byte_size += byte_size.get(); + if let Some(ref batch) = batch { + log_event = log_event.with_batch_notifier(batch); + } + log_events.push(log_event); + } + Err(e) => { + emit!(WindowsEventLogParseError { + error: e.to_string(), + channel, + event_id: Some(event_id), + }); + } + } + } + + if !log_events.is_empty() { + let count = log_events.len(); + events_received.emit(CountByteSize(count, total_byte_size.into())); + bytes_received.emit(ByteSize(total_byte_size)); + + // BACK PRESSURE: block until the pipeline accepts the batch. + // We don't call EvtNext again until this completes. + if let Err(_error) = out.send_batch(log_events).await { + emit!(StreamClosedError { count }); + return true; // signal: break the main loop + } + + // Register checkpoint entry with the finalizer. + let bookmarks: Vec<(String, String)> = channels_in_batch + .into_iter() + .filter_map(|channel| { + subscription + .get_bookmark_xml(&channel) + .map(|xml| (channel, xml)) + }) + .collect(); + + if !bookmarks.is_empty() { + let entry = FinalizerEntry { bookmarks }; + finalizer.finalize(entry, receiver).await; + } + } + + false // pipeline still open +} + +/// Transfer ownership of `subscription` into a `spawn_blocking` task, run `f` +/// on it, then return both the subscription and the result. 
+/// +/// All blocking Windows APIs (`WaitForMultipleObjects`, `EvtNext`, `EvtRender`) +/// must run in `spawn_blocking` to avoid stalling the async runtime. The +/// ownership-transfer pattern ensures only one thread holds the subscription +/// at a time, preventing data races without requiring locks. +async fn with_subscription_blocking<F, R>( + subscription: EventLogSubscription, + f: F, +) -> Result<(EventLogSubscription, R), WindowsEventLogError> +where + F: FnOnce(EventLogSubscription) -> (EventLogSubscription, R) + Send + 'static, + R: Send + 'static, +{ + tokio::task::spawn_blocking(move || f(subscription)) + .await + .map_err(|e| WindowsEventLogError::ConfigError { + message: format!("Blocking subscription task panicked: {e}"), + }) +} + /// Windows Event Log source implementation pub struct WindowsEventLogSource { config: WindowsEventLogConfig, @@ -281,42 +383,25 @@ impl WindowsEventLogSource { // Ownership transfer ensures no data races between the blocking thread // and async code. The shutdown watcher uses a raw HANDLE value (just an // integer) to signal shutdown without needing access to the subscription. 
- let (returned_sub, wait_result) = tokio::task::spawn_blocking({ - let sub = subscription; - move || { + let (returned_sub, wait_result) = + with_subscription_blocking(subscription, |sub| { let result = sub.wait_for_events_blocking(timeout_ms); (sub, result) - } - }) - .await - .map_err(|e| WindowsEventLogError::ConfigError { - message: format!("Wait task panicked: {e}"), - })?; - + }) + .await?; subscription = returned_sub; match wait_result { WaitResult::EventsAvailable => { // Pull events via spawn_blocking (EvtNext/EvtRender are blocking APIs) - let (returned_sub, events_result) = tokio::task::spawn_blocking({ - let mut sub = subscription; - move || { + let (returned_sub, events_result) = + with_subscription_blocking(subscription, |mut sub| { let result = sub.pull_events(batch_size); (sub, result) - } - }) - .await - .map_err(|e| WindowsEventLogError::ConfigError { - message: format!("Pull task panicked: {e}"), - })?; - + }) + .await?; subscription = returned_sub; - // Rate limiting between batches (async-compatible) - if let Some(limiter) = subscription.rate_limiter() { - limiter.until_ready().await; - } - match events_result { Ok(events) if events.is_empty() => { error_backoff = std::time::Duration::from_millis(100); @@ -328,65 +413,19 @@ impl WindowsEventLogSource { message = "Pulled Windows Event Log events.", event_count = events.len() ); - - let (batch, receiver) = - BatchNotifier::maybe_new_with_receiver(acknowledgements); - - let mut log_events = Vec::new(); - let mut total_byte_size = 0; - let mut channels_in_batch = std::collections::HashSet::new(); - - for event in events { - let channel = event.channel.clone(); - channels_in_batch.insert(channel.clone()); - let event_id = event.event_id; - match parser.parse_event(event) { - Ok(mut log_event) => { - let byte_size = log_event.estimated_json_encoded_size_of(); - total_byte_size += byte_size.get(); - - if let Some(ref batch) = batch { - log_event = log_event.with_batch_notifier(batch); - } - - 
log_events.push(log_event); - } - Err(e) => { - emit!(WindowsEventLogParseError { - error: e.to_string(), - channel, - event_id: Some(event_id), - }); - } - } - } - - if !log_events.is_empty() { - let count = log_events.len(); - events_received.emit(CountByteSize(count, total_byte_size.into())); - bytes_received.emit(ByteSize(total_byte_size)); - - // BACK PRESSURE: block here until the pipeline accepts - // the batch. We don't call EvtNext again until this completes. - if let Err(_error) = out.send_batch(log_events).await { - emit!(StreamClosedError { count }); - break; - } - - // Register checkpoint entry with finalizer - let bookmarks: Vec<(String, String)> = channels_in_batch - .into_iter() - .filter_map(|channel| { - subscription - .get_bookmark_xml(&channel) - .map(|xml| (channel, xml)) - }) - .collect(); - - if !bookmarks.is_empty() { - let entry = FinalizerEntry { bookmarks }; - finalizer.finalize(entry, receiver).await; - } + if process_event_batch( + events, + &parser, + acknowledgements, + &subscription, + &mut out, + &finalizer, + &events_received, + &bytes_received, + ) + .await + { + break; } } Err(e) => { @@ -415,10 +454,6 @@ impl WindowsEventLogSource { } WaitResult::Timeout => { - // A full wait cycle without errors means the system is healthy; - // reset backoff so the next transient error starts fresh. - error_backoff = std::time::Duration::from_millis(100); - // Periodic checkpoint flush (sync mode only) if !acknowledgements && last_checkpoint.elapsed() >= checkpoint_interval { if let Err(e) = subscription.flush_bookmarks().await { @@ -448,6 +483,73 @@ impl WindowsEventLogSource { ); } } + + // Speculative pull: self-heal against any lost-wakeup scenario, + // regardless of root cause. If the OS signal was lost through any + // mechanism (not just the pre-drain race fixed in #25194), this + // ensures the source recovers within one timeout period. 
+ // EvtNext returns ERROR_NO_MORE_ITEMS on an empty channel, which + // is near-zero cost, so it is safe to attempt every cycle. + let (returned_sub, speculative_result) = + with_subscription_blocking(subscription, |mut sub| { + let result = sub.pull_events(batch_size); + (sub, result) + }) + .await?; + subscription = returned_sub; + + match speculative_result { + Ok(events) if events.is_empty() => { + // Healthy cycle: reset backoff so the next transient + // error starts fresh. + error_backoff = std::time::Duration::from_millis(100); + } + Ok(events) => { + // Healthy cycle: reset backoff so the next transient + // error starts fresh. + error_backoff = std::time::Duration::from_millis(100); + warn!( + message = "Speculative timeout pull recovered events; possible lost wakeup detected.", + event_count = events.len(), + ); + if process_event_batch( + events, + &parser, + acknowledgements, + &subscription, + &mut out, + &finalizer, + &events_received, + &bytes_received, + ) + .await + { + break; + } + } + Err(e) => { + emit!(WindowsEventLogQueryError { + channel: "all".to_string(), + query: None, + error: e.to_string(), + }); + if !e.is_recoverable() { + error!( + message = "Non-recoverable speculative pull error, shutting down.", + error = %e + ); + break; + } + // Exponential backoff mirrors the EventsAvailable error path. 
+ warn!( + message = "Recoverable speculative pull error, backing off.", + backoff_ms = error_backoff.as_millis() as u64, + error = %e + ); + tokio::time::sleep(error_backoff).await; + error_backoff = (error_backoff * 2).min(MAX_ERROR_BACKOFF); + } + } } WaitResult::Shutdown => { From 689ea898dfbc7deb4605f5db9eba9acaacde6abe Mon Sep 17 00:00:00 2001 From: tot19 <31141271+tot19@users.noreply.github.com> Date: Fri, 17 Apr 2026 17:05:27 +0900 Subject: [PATCH 3/7] test(windows_event_log source): harden lost-wakeup regression tests against flakiness MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address two independent flakiness sources in the #25194 regression tests so the suite is stable on real Windows CI runners. test_pull_events_preserves_setevent_during_drain: - Replaced a 1000ms blocking wait with an immediate 0ms poll after pull_events returns, so the check measures only the reset/preserve behavior of pull_events and is not contaminated by unrelated Windows system events signaling the handle during a nonzero wait window. - Keyed the DRAIN_STEP_HOOK fire-once to the subscription's own signal handle so concurrent pull_events calls from other tests can't flip the hook first and SetEvent the wrong handle. test_speculative_pull_recovers_without_signal: - Same 500ms→0ms poll change, opposite direction: real events arriving during the wait would have re-signaled the manually-cleared handle and flipped the expected Timeout into a real signal result. - Seed a deterministic record via 'eventcreate' before subscription creation so the non-empty-events assertion is independent of whatever backlog the runner happens to have. Freshly provisioned images can have an empty Application log, which would otherwise cause pull_events(100) to legitimately return empty and false-fail the test. 
--- src/sources/windows_event_log/mod.rs | 6 +- src/sources/windows_event_log/subscription.rs | 86 +++++++++++++------ 2 files changed, 64 insertions(+), 28 deletions(-) diff --git a/src/sources/windows_event_log/mod.rs b/src/sources/windows_event_log/mod.rs index 2b1410f60d051..39d8c8b73a24d 100644 --- a/src/sources/windows_event_log/mod.rs +++ b/src/sources/windows_event_log/mod.rs @@ -384,7 +384,7 @@ impl WindowsEventLogSource { // and async code. The shutdown watcher uses a raw HANDLE value (just an // integer) to signal shutdown without needing access to the subscription. let (returned_sub, wait_result) = - with_subscription_blocking(subscription, |sub| { + with_subscription_blocking(subscription, move |sub| { let result = sub.wait_for_events_blocking(timeout_ms); (sub, result) }) @@ -395,7 +395,7 @@ impl WindowsEventLogSource { WaitResult::EventsAvailable => { // Pull events via spawn_blocking (EvtNext/EvtRender are blocking APIs) let (returned_sub, events_result) = - with_subscription_blocking(subscription, |mut sub| { + with_subscription_blocking(subscription, move |mut sub| { let result = sub.pull_events(batch_size); (sub, result) }) @@ -491,7 +491,7 @@ impl WindowsEventLogSource { // EvtNext returns ERROR_NO_MORE_ITEMS on an empty channel, which // is near-zero cost, so it is safe to attempt every cycle. 
let (returned_sub, speculative_result) = - with_subscription_blocking(subscription, |mut sub| { + with_subscription_blocking(subscription, move |mut sub| { let result = sub.pull_events(batch_size); (sub, result) }) diff --git a/src/sources/windows_event_log/subscription.rs b/src/sources/windows_event_log/subscription.rs index d4b37c0776fbf..5c9522740c059 100644 --- a/src/sources/windows_event_log/subscription.rs +++ b/src/sources/windows_event_log/subscription.rs @@ -1076,6 +1076,7 @@ impl Drop for EventLogSubscription { #[cfg(test)] mod tests { use super::*; + use serial_test::serial; async fn create_test_checkpointer() -> (Arc, tempfile::TempDir) { let temp_dir = tempfile::TempDir::new().unwrap(); @@ -1354,7 +1355,41 @@ mod tests { /// 4. Assert `pull_events` still returns events — `EvtNext` fetches from the queue /// regardless of signal state, so the speculative pull in mod.rs self-heals. #[tokio::test] - async fn test_speculative_pull_recovers_without_signal() { + #[serial] + async fn test_pull_events_works_with_cleared_signal() { + // Seed the Application log with a record so the "events remain + // available despite cleared signal" assertion below does not depend + // on whatever backlog the runner happens to have. Freshly provisioned + // CI images can have an empty Application log, which would otherwise + // make `pull_events` legitimately return empty and produce a spurious + // failure unrelated to the invariant under test. + let seed_output = std::process::Command::new("eventcreate") + .args([ + "/T", + "INFORMATION", + "/ID", + "100", + "/L", + "APPLICATION", + "/SO", + "VectorTestSpeculativePullSeed", + "/D", + "seed event for #25194 speculative-pull regression test", + ]) + .output() + .expect("failed to spawn eventcreate — required for deterministic seeding"); + assert!( + seed_output.status.success(), + "eventcreate failed to seed Application log (exit={:?}): stdout={:?} stderr={:?}. 
\ + This test requires a seeded event to be deterministic; a locked-down runner \ + without the privilege to write to Application cannot run this test reliably.", + seed_output.status.code(), + String::from_utf8_lossy(&seed_output.stdout), + String::from_utf8_lossy(&seed_output.stderr), + ); + // Give the service a moment to persist the record before we subscribe. + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + let mut config = WindowsEventLogConfig::default(); config.channels = vec!["Application".to_string()]; config.read_existing_events = true; @@ -1366,21 +1401,20 @@ mod tests { .await .expect("Subscription creation should succeed"); - // Manually clear the signal to simulate a lost wakeup. The Application - // log always has existing events on a running Windows system, so even - // with the signal cleared events remain available via EvtNext. + // Manually clear the signal to simulate a lost wakeup. The seeded + // event above guarantees at least one record is queued in EvtNext + // regardless of the runner's pre-existing log state. let signal_raw = subscription.first_channel_signal_raw(); unsafe { let _ = ResetEvent(HANDLE(signal_raw as *mut std::ffi::c_void)); } - // Signal is cleared: wait must time out. - let (mut subscription, wait_result) = tokio::task::spawn_blocking(move || { - let r = subscription.wait_for_events_blocking(500); - (subscription, r) - }) - .await - .unwrap(); + // Signal is cleared: an immediate (0ms) poll must report Timeout. + // A 0ms wait reads only the current signal state with no grace + // window, so unrelated Windows system events arriving between the + // `ResetEvent` above and the poll cannot re-signal the handle and + // cause a spurious failure. 
+ let wait_result = subscription.wait_for_events_blocking(0); assert!( matches!(wait_result, WaitResult::Timeout), @@ -1417,12 +1451,15 @@ mod tests { /// `EvtNext` and fires `SetEvent` on the subscription's signal /// handle — simulating the OS signaling a new event arrival during /// the drain window. After `pull_events` returns, the signal must - /// still be set (the subsequent `wait_for_events_blocking` must - /// return `EventsAvailable`, not `Timeout`). Under the old + /// still be set — observed via a 0ms `wait_for_events_blocking` + /// so the check measures only the reset/preserve behavior of + /// `pull_events` and is not contaminated by unrelated Windows + /// system events arriving during a nonzero wait. Under the old /// post-drain `ResetEvent` order, the hook's `SetEvent` would be - /// clobbered by the reset and the wait would time out — which is - /// exactly what #25194 reports. + /// clobbered by the reset and the immediate poll would return + /// `Timeout` — which is exactly what #25194 reports. #[tokio::test] + #[serial] async fn test_pull_events_preserves_setevent_during_drain() { use std::sync::Arc as StdArc; @@ -1492,16 +1529,15 @@ mod tests { at least once even on an empty channel" ); - // With the fix, the SetEvent fired by the hook during the - // drain is preserved; the next wait returns EventsAvailable - // immediately. Under the old post-drain ResetEvent order, it - // would time out — that is the #25194 freeze. - let (_subscription, result) = tokio::task::spawn_blocking(move || { - let r = subscription.wait_for_events_blocking(500); - (subscription, r) - }) - .await - .unwrap(); + // Observe the signal state IMMEDIATELY with a 0ms wait. We want + // to know whether pull_events's reset clobbered the hook's + // SetEvent — NOT whether new real events arrive during some + // wait window. 
A nonzero timeout against the live Application + // channel lets arbitrary Windows system events re-signal us + // and false-pass against the pre-fix code. 0ms = WaitForMultiple- + // Objects returns the current state with no grace period, so + // only the reset/preserve behavior of pull_events is measured. + let result = subscription.wait_for_events_blocking(0); match result { WaitResult::EventsAvailable => {} From 8e92d7a516081551d3c782aed328dd7295753d6a Mon Sep 17 00:00:00 2001 From: tot19 <31141271+tot19@users.noreply.github.com> Date: Tue, 21 Apr 2026 14:32:06 +0900 Subject: [PATCH 4/7] fix(windows_event_log): implement Sync for subscription types to satisfy Send bound on source future EventLogSubscription and ChannelSubscription had unsafe impl Send but no Sync impl. Since &T: Send requires T: Sync, process_event_batch holding &EventLogSubscription across an .await made the entire source future !Send, causing a compile error (ICE + future-not-Send) in release builds. All mutation on these types requires &mut self; &self methods are read-only or delegate to already-Sync types (RateLimiter). The underlying Windows kernel handles are safe for concurrent access. Co-Authored-By: Claude Sonnet 4.6 --- src/sources/windows_event_log/subscription.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/sources/windows_event_log/subscription.rs b/src/sources/windows_event_log/subscription.rs index 5c9522740c059..a01ce447bbfd8 100644 --- a/src/sources/windows_event_log/subscription.rs +++ b/src/sources/windows_event_log/subscription.rs @@ -93,6 +93,7 @@ struct ChannelSubscription { // SAFETY: Same rationale as EventLogSubscription - Windows kernel handles are thread-safe. unsafe impl Send for ChannelSubscription {} +unsafe impl Sync for ChannelSubscription {} /// Result of waiting for events across all channels. 
pub enum WaitResult { @@ -143,8 +144,10 @@ pub struct EventLogSubscription { // SAFETY: Windows HANDLE and EVT_HANDLE are kernel objects safe to use across // threads. In windows 0.58, HANDLE wraps *mut c_void which is !Send/!Sync, -// but the underlying kernel handles are thread-safe. +// but the underlying kernel handles are thread-safe. All mutation requires +// &mut self; &self methods are read-only or delegate to Sync types (RateLimiter). unsafe impl Send for EventLogSubscription {} +unsafe impl Sync for EventLogSubscription {} impl EventLogSubscription { /// Create a new pull-model subscription for all configured channels. From 42a5ea4ead0b8b3240cc466c7d7e316950a55428 Mon Sep 17 00:00:00 2001 From: tot19 <31141271+tot19@users.noreply.github.com> Date: Thu, 30 Apr 2026 12:25:48 +0900 Subject: [PATCH 5/7] chore(changelog): simplify windows_event_log fix fragment to user-focused one-liner Co-Authored-By: Claude Sonnet 4.6 --- changelog.d/25194_windows_event_log_lost_wakeup.fix.md | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/changelog.d/25194_windows_event_log_lost_wakeup.fix.md b/changelog.d/25194_windows_event_log_lost_wakeup.fix.md index 032612be1268e..cb6f67987d67c 100644 --- a/changelog.d/25194_windows_event_log_lost_wakeup.fix.md +++ b/changelog.d/25194_windows_event_log_lost_wakeup.fix.md @@ -1,7 +1,3 @@ -The `windows_event_log` source no longer freezes after periods of inactivity. Two complementary fixes address the root cause and add a recovery path: - -1. **Pre-drain signal reset**: the subscription's wait handle is now reset *before* draining events via `EvtNext`, not after. A signal that fires between the last `EvtNext` and the old post-drain `ResetEvent` was silently lost, leaving the source frozen until the next OS event arrived. Resetting first preserves any notification raised mid-drain. - -2. **Speculative pull on timeout**: on each wait timeout, `pull_events` is called speculatively. 
`EvtNext` returns `ERROR_NO_MORE_ITEMS` immediately on an empty channel (near-zero cost), so this is safe every cycle. If events are recovered, a warning is logged. This self-heals within one timeout period regardless of why the wakeup signal was lost — covering both the drain-race path and any other lost-wakeup scenario. +The `windows_event_log` source no longer freezes after periods of inactivity. authors: tot19 From 4ada9cc626a3a4d962951fbc073de9b9fc433cc0 Mon Sep 17 00:00:00 2001 From: tot19 <31141271+tot19@users.noreply.github.com> Date: Fri, 1 May 2026 16:13:13 +0900 Subject: [PATCH 6/7] fix(windows_event_log): prioritize shutdown signal --- src/sources/windows_event_log/subscription.rs | 40 +++++++++++++++---- 1 file changed, 32 insertions(+), 8 deletions(-) diff --git a/src/sources/windows_event_log/subscription.rs b/src/sources/windows_event_log/subscription.rs index a01ce447bbfd8..ba66d02668850 100644 --- a/src/sources/windows_event_log/subscription.rs +++ b/src/sources/windows_event_log/subscription.rs @@ -431,21 +431,20 @@ impl EventLogSubscription { /// Wait for events to become available on any channel, or for shutdown. /// /// Uses `WaitForMultipleObjects` via `spawn_blocking` to avoid blocking the - /// Tokio runtime. The wait array includes all channel signal events plus the - /// shutdown event. + /// Tokio runtime. The wait array puts shutdown first so a stop request wins + /// over any channel that is already signaled. pub fn wait_for_events_blocking(&self, timeout_ms: u32) -> WaitResult { - // Build wait handle array: [channel0_signal, channel1_signal, ..., shutdown_event] - let mut handles: Vec = self.channels.iter().map(|c| c.signal_event).collect(); + // Build wait handle array: [shutdown_event, channel0_signal, channel1_signal, ...] 
+ let mut handles = Vec::with_capacity(self.channels.len() + 1); handles.push(self.shutdown_event); + handles.extend(self.channels.iter().map(|c| c.signal_event)); let result = unsafe { WaitForMultipleObjects(&handles, false, timeout_ms) }; - let shutdown_index = (self.channels.len()) as u32; - match result { r if r == WAIT_TIMEOUT => WaitResult::Timeout, - r if r.0 < WAIT_OBJECT_0.0 + shutdown_index => WaitResult::EventsAvailable, - r if r.0 == WAIT_OBJECT_0.0 + shutdown_index => WaitResult::Shutdown, + r if r == WAIT_OBJECT_0 => WaitResult::Shutdown, + r if r.0 <= WAIT_OBJECT_0.0 + self.channels.len() as u32 => WaitResult::EventsAvailable, _ => { // WAIT_FAILED or unexpected - treat as timeout to avoid tight loop warn!( @@ -1211,6 +1210,31 @@ mod tests { drop(subscription); } + /// Test that shutdown wins when both shutdown and channel handles are signaled. + #[tokio::test] + async fn test_shutdown_signal_takes_priority_over_channel_signal() { + let mut config = WindowsEventLogConfig::default(); + config.channels = vec!["Application".to_string()]; + config.event_timeout_ms = 500; + + let (checkpointer, _temp_dir) = create_test_checkpointer().await; + + let subscription = EventLogSubscription::new(&config, checkpointer, false) + .await + .expect("Subscription creation should succeed"); + + unsafe { + let handle = HANDLE(subscription.shutdown_event_raw()); + let _ = SetEvent(handle); + } + + let result = subscription.wait_for_events_blocking(0); + assert!( + matches!(result, WaitResult::Shutdown), + "shutdown should take priority over already-signaled channels" + ); + } + /// Test pull_events with read_existing_events=true #[tokio::test] async fn test_pull_events_returns_events() { From a8e8c576ed84a11647285a4092b5341701d73e7e Mon Sep 17 00:00:00 2001 From: tot19 <31141271+tot19@users.noreply.github.com> Date: Fri, 1 May 2026 17:21:19 +0900 Subject: [PATCH 7/7] fix(windows_event_log): lighten speculative timeout pulls --- src/sources/windows_event_log/mod.rs | 7 
++-- src/sources/windows_event_log/subscription.rs | 32 ++++++++++++++++--- 2 files changed, 32 insertions(+), 7 deletions(-) diff --git a/src/sources/windows_event_log/mod.rs b/src/sources/windows_event_log/mod.rs index 39d8c8b73a24d..28dfc1db4cb79 100644 --- a/src/sources/windows_event_log/mod.rs +++ b/src/sources/windows_event_log/mod.rs @@ -488,11 +488,12 @@ impl WindowsEventLogSource { // regardless of root cause. If the OS signal was lost through any // mechanism (not just the pre-drain race fixed in #25194), this // ensures the source recovers within one timeout period. - // EvtNext returns ERROR_NO_MORE_ITEMS on an empty channel, which - // is near-zero cost, so it is safe to attempt every cycle. + // Use the speculative pull variant so idle timeout cycles don't + // refresh per-channel record-count gauges via EvtOpenLog / + // EvtGetLogInfo on every configured channel. let (returned_sub, speculative_result) = with_subscription_blocking(subscription, move |mut sub| { - let result = sub.pull_events(batch_size); + let result = sub.pull_events_speculative(batch_size); (sub, result) }) .await?; diff --git a/src/sources/windows_event_log/subscription.rs b/src/sources/windows_event_log/subscription.rs index ba66d02668850..4571561a2d349 100644 --- a/src/sources/windows_event_log/subscription.rs +++ b/src/sources/windows_event_log/subscription.rs @@ -474,6 +474,28 @@ impl EventLogSubscription { pub fn pull_events( &mut self, max_events: usize, + ) -> Result, WindowsEventLogError> { + self.pull_events_inner(max_events, true) + } + + /// Pull events for timeout-based speculative recovery. + /// + /// This keeps the same event-drain behavior as `pull_events`, but avoids + /// refreshing per-channel record-count gauges for channels that were empty. + /// Timeout pulls can run repeatedly while the host is idle, so skipping + /// those metadata queries prevents steady `EvtOpenLog`/`EvtGetLogInfo` + /// churn without changing event recovery behavior. 
+ pub fn pull_events_speculative( + &mut self, + max_events: usize, + ) -> Result, WindowsEventLogError> { + self.pull_events_inner(max_events, false) + } + + fn pull_events_inner( + &mut self, + max_events: usize, + update_records_for_empty_channels: bool, ) -> Result, WindowsEventLogError> { let mut all_events = Vec::with_capacity(max_events.min(1000)); let num_channels = self.channels.len().max(1); @@ -757,10 +779,12 @@ impl EventLogSubscription { if channel_drained && !bookmark_failed { // Update channel record count gauge for lag detection. - super::render::update_channel_records( - &channel_sub.channel, - &channel_sub.channel_records_gauge, - ); + if update_records_for_empty_channels || channel_count > 0 { + super::render::update_channel_records( + &channel_sub.channel, + &channel_sub.channel_records_gauge, + ); + } } else { // Drain exited early (budget exhausted or bookmark_failed // mid-batch). Re-arm the signal so the next pull_events