dragonball: Fix flaky test_epoll_manager by improving synchronization

This commit aims to address issues of "Infinite loop in epoll_manager
tests" and improve stablity.

Root causes as below:
1. Using `handle_events(-1)` caused the worker thread to block forever
   if an event was missed or if the internal `kick()` signal was not
   accounted for correctly.
2. Relying on event counts was unreliable because internal signals could
   fluctuate the total count, causing the it to enter an infinite loop.
3. Using `EventSet::OUT` on an EventFd is often continuously ready,
   leading to non-deterministic trigger behavior.

Signed-off-by: Alex Lyn <alex.lyn@antgroup.com>
This commit is contained in:
Alex Lyn
2026-03-04 11:02:46 +08:00
parent 2fff33cfa4
commit c8a39ad28d

View File

@@ -99,76 +99,61 @@ impl Default for EpollManager {
#[cfg(test)]
mod tests {
use super::*;
use std::os::unix::io::AsRawFd;
use std::os::fd::AsRawFd;
use std::sync::mpsc::channel;
use std::time::Duration;
use vmm_sys_util::{epoll::EventSet, eventfd::EventFd};
struct DummySubscriber {
pub event: EventFd,
pub event: Arc<EventFd>,
pub notify: std::sync::mpsc::Sender<()>,
}
impl DummySubscriber {
fn new() -> Self {
Self {
event: EventFd::new(0).unwrap(),
}
fn new(event: Arc<EventFd>, notify: std::sync::mpsc::Sender<()>) -> Self {
Self { event, notify }
}
}
impl MutEventSubscriber for DummySubscriber {
fn process(&mut self, events: Events, _ops: &mut EventOps) {
let source = events.fd();
let event_set = events.event_set();
assert_ne!(source, self.event.as_raw_fd());
match event_set {
EventSet::IN => {
unreachable!()
}
EventSet::OUT => {
self.event.read().unwrap();
}
_ => {
unreachable!()
}
}
fn init(&mut self, ops: &mut EventOps) {
ops.add(Events::new(self.event.as_ref(), EventSet::IN))
.unwrap();
}
fn init(&mut self, _ops: &mut EventOps) {}
fn process(&mut self, events: Events, _ops: &mut EventOps) {
if events.fd() == self.event.as_raw_fd() && events.event_set().contains(EventSet::IN) {
let _ = self.event.read();
let _ = self.notify.send(());
}
}
}
#[test]
fn test_epoll_manager() {
let mut epoll_manager = EpollManager::default();
let epoll_manager_clone = epoll_manager.clone();
let thread = std::thread::spawn(move || loop {
let count = epoll_manager_clone.handle_events(-1).unwrap();
if count == 0 {
continue;
let epoll_manager = EpollManager::default();
let (stop_tx, stop_rx) = channel::<()>();
let worker_mgr = epoll_manager.clone();
let worker = std::thread::spawn(move || {
while stop_rx.try_recv().is_err() {
let _ = worker_mgr.handle_events(50);
}
assert_eq!(count, 1);
break;
});
let handler = DummySubscriber::new();
let event = handler.event.try_clone().unwrap();
let (notify_tx, notify_rx) = channel::<()>();
let event = Arc::new(EventFd::new(0).unwrap());
let handler = DummySubscriber::new(event.clone(), notify_tx);
let id = epoll_manager.add_subscriber(Box::new(handler));
thread.join().unwrap();
epoll_manager
.add_event(id, Events::new(&event, EventSet::OUT))
.unwrap();
event.write(1).unwrap();
let epoll_manager_clone = epoll_manager.clone();
let thread = std::thread::spawn(move || loop {
let count = epoll_manager_clone.handle_events(-1).unwrap();
if count == 0 {
continue;
}
assert_eq!(count, 2);
break;
});
notify_rx
.recv_timeout(Duration::from_secs(2))
.expect("timeout waiting for subscriber to be processed");
thread.join().unwrap();
epoll_manager.remove_subscriber(id).unwrap();
epoll_manager.clone().remove_subscriber(id).unwrap();
let _ = stop_tx.send(());
worker.join().unwrap();
}
}