diff --git a/src/dragonball/dbs_utils/src/epoll_manager.rs b/src/dragonball/dbs_utils/src/epoll_manager.rs index b27c523afc..4d34aa7d61 100644 --- a/src/dragonball/dbs_utils/src/epoll_manager.rs +++ b/src/dragonball/dbs_utils/src/epoll_manager.rs @@ -99,76 +99,61 @@ impl Default for EpollManager { #[cfg(test)] mod tests { use super::*; - use std::os::unix::io::AsRawFd; + use std::os::fd::AsRawFd; + use std::sync::mpsc::channel; + use std::time::Duration; use vmm_sys_util::{epoll::EventSet, eventfd::EventFd}; struct DummySubscriber { - pub event: EventFd, + pub event: Arc, + pub notify: std::sync::mpsc::Sender<()>, } impl DummySubscriber { - fn new() -> Self { - Self { - event: EventFd::new(0).unwrap(), - } + fn new(event: Arc, notify: std::sync::mpsc::Sender<()>) -> Self { + Self { event, notify } } } impl MutEventSubscriber for DummySubscriber { - fn process(&mut self, events: Events, _ops: &mut EventOps) { - let source = events.fd(); - let event_set = events.event_set(); - assert_ne!(source, self.event.as_raw_fd()); - match event_set { - EventSet::IN => { - unreachable!() - } - EventSet::OUT => { - self.event.read().unwrap(); - } - _ => { - unreachable!() - } - } + fn init(&mut self, ops: &mut EventOps) { + ops.add(Events::new(self.event.as_ref(), EventSet::IN)) + .unwrap(); } - fn init(&mut self, _ops: &mut EventOps) {} + fn process(&mut self, events: Events, _ops: &mut EventOps) { + if events.fd() == self.event.as_raw_fd() && events.event_set().contains(EventSet::IN) { + let _ = self.event.read(); + let _ = self.notify.send(()); + } + } } #[test] fn test_epoll_manager() { - let mut epoll_manager = EpollManager::default(); - let epoll_manager_clone = epoll_manager.clone(); - let thread = std::thread::spawn(move || loop { - let count = epoll_manager_clone.handle_events(-1).unwrap(); - if count == 0 { - continue; + let epoll_manager = EpollManager::default(); + let (stop_tx, stop_rx) = channel::<()>(); + let worker_mgr = epoll_manager.clone(); + let worker = std::thread::spawn(move || { + while stop_rx.try_recv().is_err() { + let _ = worker_mgr.handle_events(50); } - assert_eq!(count, 1); - break; }); - let handler = DummySubscriber::new(); - let event = handler.event.try_clone().unwrap(); + + let (notify_tx, notify_rx) = channel::<()>(); + + let event = Arc::new(EventFd::new(0).unwrap()); + let handler = DummySubscriber::new(event.clone(), notify_tx); let id = epoll_manager.add_subscriber(Box::new(handler)); - thread.join().unwrap(); - - epoll_manager - .add_event(id, Events::new(&event, EventSet::OUT)) - .unwrap(); event.write(1).unwrap(); - let epoll_manager_clone = epoll_manager.clone(); - let thread = std::thread::spawn(move || loop { - let count = epoll_manager_clone.handle_events(-1).unwrap(); - if count == 0 { - continue; - } - assert_eq!(count, 2); - break; - }); + notify_rx + .recv_timeout(Duration::from_secs(2)) + .expect("timeout waiting for subscriber to be processed"); - thread.join().unwrap(); - epoll_manager.remove_subscriber(id).unwrap(); + epoll_manager.clone().remove_subscriber(id).unwrap(); + let _ = stop_tx.send(()); + worker.join().unwrap(); } }