diff --git a/src/dragonball/src/dbs_virtio_devices/src/vsock/csm/connection.rs b/src/dragonball/src/dbs_virtio_devices/src/vsock/csm/connection.rs
index e2ca7e3339..0e27a36a49 100644
--- a/src/dragonball/src/dbs_virtio_devices/src/vsock/csm/connection.rs
+++ b/src/dragonball/src/dbs_virtio_devices/src/vsock/csm/connection.rs
@@ -103,7 +103,7 @@ use super::{ConnState, Error, PendingRx, PendingRxSet, Result};
 /// guest-side AF_VSOCK socket and a host-side `Read + Write + AsRawFd` stream.
 pub struct VsockConnection {
     /// The current connection state.
-    state: ConnState,
+    pub(crate) state: ConnState,
     /// The local CID. Most of the time this will be the constant `2` (the vsock
     /// host CID).
     pub(crate) local_cid: u64,
@@ -115,6 +115,8 @@ pub struct VsockConnection {
     pub(crate) peer_port: u32,
     /// The (connected) host-side stream.
     pub(crate) stream: Box<dyn VsockStream>,
+    /// Whether to keep the connection alive when the local peer is closed.
+    keep: bool,
     /// The TX buffer for this connection.
     tx_buf: TxBuf,
     /// Total number of bytes that have been successfully written to
@@ -297,6 +299,8 @@ impl VsockChannel for VsockConnection {
             // to forward some data to the host stream. Also works for a
             // connection that has begun shutting down, but the peer still has
             // some data to send.
+            // It also works for a hybrid connection whose peer has closed: the
+            // connection's fd must be woken up to generate an EPOLLOUT event.
             ConnState::Established | ConnState::PeerClosed(_, false)
                 if pkt.op() == uapi::VSOCK_OP_RW =>
             {
@@ -318,7 +322,19 @@ impl VsockChannel for VsockConnection {
                     "vsock: error writing to local stream (lp={}, pp={}): {:?}",
                     self.local_port, self.peer_port, err
                 );
-                self.kill();
+                match err {
+                    Error::TxBufFull => {
+                        // The hybrid pipe's peer has closed and the TX buffer is full.
+                        // If we want to keep the connection, we have to drop the data
+                        // sent by the guest; otherwise, close the connection.
+                        if !self.keep() {
+                            self.kill();
+                        }
+                    }
+                    _ => {
+                        self.kill();
+                    }
+                };
                 return Ok(());
             }
 
@@ -462,15 +478,26 @@ impl VsockEpollListener for VsockConnection {
                 .tx_buf
                 .flush_to(&mut self.stream)
                 .unwrap_or_else(|err| {
-                    warn!(
-                        "vsock: error flushing TX buf for (lp={}, pp={}): {:?}",
-                        self.local_port, self.peer_port, err
-                    );
+                    if !self.keep() {
+                        warn!(
+                            "vsock: error flushing TX buf for (lp={}, pp={}): {:?}",
+                            self.local_port, self.peer_port, err
+                        );
+                    }
+
                     match err {
                         Error::TxBufFlush(inner) if inner.kind() == ErrorKind::WouldBlock => {
                             // This should never happen (EWOULDBLOCK after
                             // EPOLLOUT), but it does, so let's absorb it.
                         }
+                        Error::TxBufFlush(inner) if inner.kind() == ErrorKind::BrokenPipe => {
+                            // The hybrid connection's pipe peer was closed. If we
+                            // want to keep the connection (so users can reopen the
+                            // peer pipe and resume it), absorb the error; otherwise, close it.
+                            if !self.keep() {
+                                self.kill();
+                            }
+                        }
                         _ => self.kill(),
                     };
                     0
@@ -499,6 +526,7 @@ impl VsockConnection {
         local_port: u32,
         peer_port: u32,
         peer_buf_alloc: u32,
+        keep: bool,
     ) -> Self {
         Self {
             local_cid,
@@ -506,6 +534,7 @@ impl VsockConnection {
             local_port,
             peer_port,
             stream,
+            keep,
             state: ConnState::PeerInit,
             tx_buf: TxBuf::default(),
             fwd_cnt: Wrapping(0),
@@ -525,6 +554,7 @@ impl VsockConnection {
         peer_cid: u64,
         local_port: u32,
         peer_port: u32,
+        keep: bool,
     ) -> Self {
         Self {
             local_cid,
@@ -532,6 +562,7 @@ impl VsockConnection {
             local_port,
             peer_port,
             stream,
+            keep,
             state: ConnState::LocalInit,
             tx_buf: TxBuf::default(),
             fwd_cnt: Wrapping(0),
@@ -579,6 +610,11 @@ impl VsockConnection {
         self.state
     }
 
+    /// Return whether the connection should be kept when the local peer closes.
+    pub fn keep(&self) -> bool {
+        self.keep
+    }
+
     /// Send some raw, untracked, data straight to the underlying connected
     /// stream. Returns: number of bytes written, or the error describing the
     /// write failure.
@@ -608,16 +644,23 @@ impl VsockConnection {
         // stream.
         let written = match self.stream.write(buf) {
             Ok(cnt) => cnt,
-            Err(e) => {
+            Err(e) if e.kind() == ErrorKind::WouldBlock => {
                 // Absorb any would-block errors, since we can always try again
                 // later.
-                if e.kind() == ErrorKind::WouldBlock {
-                    0
-                } else {
-                    // We don't know how to handle any other write error, so
-                    // we'll send it up the call chain.
+                0
+            }
+            Err(e) if e.kind() == ErrorKind::BrokenPipe => {
+                // The backing pipe's peer has been closed, but we don't want to
+                // close this connection, since the peer may re-attach to it.
+                if !self.keep() {
                     return Err(Error::StreamWrite(e));
                 }
+                0
+            }
+            Err(e) => {
+                // We don't know how to handle any other write error, so
+                // we'll send it up the call chain.
+                return Err(Error::StreamWrite(e));
             }
         };
         // Move the "forwarded bytes" counter ahead by how much we were able to
@@ -843,6 +886,7 @@ pub(crate) mod tests {
                 LOCAL_PORT,
                 PEER_PORT,
                 PEER_BUF_ALLOC,
+                false,
             ),
             ConnState::LocalInit => VsockConnection::new_local_init(
                 Box::new(stream),
@@ -850,6 +894,7 @@ pub(crate) mod tests {
                 PEER_CID,
                 LOCAL_PORT,
                 PEER_PORT,
+                false,
             ),
             ConnState::Established => {
                 let mut conn = VsockConnection::new_peer_init(
@@ -859,6 +904,7 @@ pub(crate) mod tests {
                     LOCAL_PORT,
                     PEER_PORT,
                     PEER_BUF_ALLOC,
+                    false,
                 );
                 assert!(conn.has_pending_rx());
                 conn.recv_pkt(&mut pkt).unwrap();
diff --git a/src/dragonball/src/dbs_virtio_devices/src/vsock/muxer/muxer_impl.rs b/src/dragonball/src/dbs_virtio_devices/src/vsock/muxer/muxer_impl.rs
index bfcc5fa1c8..09af066cf1 100644
--- a/src/dragonball/src/dbs_virtio_devices/src/vsock/muxer/muxer_impl.rs
+++ b/src/dragonball/src/dbs_virtio_devices/src/vsock/muxer/muxer_impl.rs
@@ -291,6 +291,34 @@ impl VsockChannel for VsockMuxer {
         // Alright, everything looks in order - forward this packet to its
         // owning connection.
         let mut res: VsockResult<()> = Ok(());
+
+        // For a hybrid connection that wants to stay alive after its pipe
+        // peer has closed, the epoll listener must be updated here so that
+        // it can catch the events again.
+        let mut listener = None;
+        let conn = self.conn_map.get_mut(&conn_key).unwrap();
+        let pre_state = conn.state();
+        let nfd: RawFd = conn.as_raw_fd();
+
+        if pre_state == ConnState::LocalClosed && conn.keep() {
+            conn.state = ConnState::Established;
+            listener = Some(EpollListener::Connection {
+                key: conn_key,
+                evset: conn.get_polled_evset(),
+                backend: conn.stream.backend_type(),
+            });
+        }
+
+        if let Some(nlistener) = listener {
+            self.add_listener(nfd, nlistener).unwrap_or_else(|err| {
+                self.kill_connection(conn_key);
+                warn!(
+                    "vsock: error updating epoll listener for (lp={}, pp={}): {:?}",
+                    conn_key.local_port, conn_key.peer_port, err
+                );
+            });
+        }
+
         self.apply_conn_mutation(conn_key, |conn| {
             res = conn.send_pkt(pkt);
         });
@@ -396,6 +424,23 @@ impl VsockMuxer {
             // listening for it.
             Some(EpollListener::Connection { key, evset: _, .. }) => {
                 let key_copy = *key;
+
+                // If the hybrid connection's local peer has closed, the epoll
+                // handler won't get an EPOLLOUT event even once the peer is
+                // reopened, so the connection must be poked whenever the guest
+                // sends data, to make the epoll handler emit EPOLLOUT for it.
+
+                let mut need_rm = false;
+                if let Some(conn) = self.conn_map.get_mut(&key_copy) {
+                    if event_set.contains(epoll::Events::EPOLLERR) && conn.keep() {
+                        conn.state = ConnState::LocalClosed;
+                        need_rm = true;
+                    }
+                }
+                if need_rm {
+                    self.remove_listener(fd);
+                }
+
                 // The handling of this event will most probably mutate the
                 // state of the receiving connection. We'll need to check for new
                 // pending RX, event set mutation, and all that, so we're
@@ -459,6 +504,7 @@ impl VsockMuxer {
                         self.cid,
                         local_port,
                         peer_port,
+                        false,
                     ),
                 )
             }
@@ -476,8 +522,10 @@ impl VsockMuxer {
             Some(EpollListener::PassFdStream(_)) => {
                 if let Some(EpollListener::PassFdStream(mut stream)) = self.remove_listener(fd) {
                     Self::passfd_read_port_and_fd(&mut stream)
-                        .map(|(nfd, peer_port)| (nfd, self.allocate_local_port(), peer_port))
-                        .and_then(|(nfd, local_port, peer_port)| {
+                        .map(|(nfd, peer_port, keep)| {
+                            (nfd, self.allocate_local_port(), peer_port, keep)
+                        })
+                        .and_then(|(nfd, local_port, peer_port, keep)| {
                             // Here we should make sure the nfd the sole owner to convert it
                             // into an UnixStream object, otherwise, it could cause memory unsafety.
                             let nstream = unsafe { File::from_raw_fd(nfd) };
@@ -502,6 +550,7 @@ impl VsockMuxer {
                                     self.cid,
                                     local_port,
                                     peer_port,
+                                    keep,
                                 ),
                             )
                         })
@@ -587,7 +636,7 @@ impl VsockMuxer {
             .map_err(|_| Error::InvalidPortRequest)
     }
 
-    fn passfd_read_port_and_fd(stream: &mut Box<dyn VsockStream>) -> Result<(RawFd, u32)> {
+    fn passfd_read_port_and_fd(stream: &mut Box<dyn VsockStream>) -> Result<(RawFd, u32, bool)> {
         let mut buf = [0u8; 32];
         let mut fds = [0, 1];
         let (data_len, fd_len) = stream
@@ -607,7 +656,9 @@ impl VsockMuxer {
             .ok_or(Error::InvalidPortRequest)
             .and_then(|word| word.parse::<u32>().map_err(|_| Error::InvalidPortRequest))?;
 
-        Ok((fds[0], port))
+        let keep = port_iter.next().is_some_and(|kp| kp == "keep");
+
+        Ok((fds[0], port, keep))
     }
 
     /// Add a new connection to the active connection pool.
@@ -775,6 +826,7 @@ impl VsockMuxer {
                         pkt.dst_port(),
                         pkt.src_port(),
                         pkt.buf_alloc(),
+                        false,
                     ),
                 )
             })
@@ -876,7 +928,7 @@ impl VsockMuxer {
                     );
                 });
             }
-        } else {
+        } else if conn.state() != ConnState::LocalClosed {
            // The connection had previously asked to be removed from the
            // listener map (by returning an empty event set via
            // `get_polled_fd()`), but now wants back in.
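
Note on the request format introduced by this patch: the passfd request written to the hybrid unix socket now carries an optional second word, "keep", after the guest port number, and `passfd_read_port_and_fd` returns that flag alongside the received fd. Below is a minimal, dependency-free sketch of just the parsing step, under the assumption that the request bytes are whitespace-separated words; the helper name `parse_passfd_request` is hypothetical and only mirrors the patched logic for illustration (the real function also receives one passed fd along with these bytes).

```rust
// Hypothetical stand-alone sketch of the "<port> [keep]" request format that
// passfd_read_port_and_fd accepts after this patch. Not part of the patch
// itself; it only illustrates the parsing of the `keep` flag.
fn parse_passfd_request(buf: &[u8]) -> Option<(u32, bool)> {
    let msg = std::str::from_utf8(buf).ok()?;
    let mut words = msg.split_whitespace();
    // First word: the guest port to connect to.
    let port = words.next()?.parse::<u32>().ok()?;
    // Optional second word: "keep" asks the muxer to keep the connection
    // alive when the host-side pipe peer closes.
    let keep = words.next().is_some_and(|w| w == "keep");
    Some((port, keep))
}

fn main() {
    assert_eq!(parse_passfd_request(b"1024"), Some((1024, false)));
    assert_eq!(parse_passfd_request(b"1024 keep"), Some((1024, true)));
    assert_eq!(parse_passfd_request(b"not-a-port"), None);
}
```

Treating anything other than a literal "keep" as `false` means existing clients, which send only the port, keep working unchanged.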