mirror of
https://github.com/kata-containers/kata-containers.git
synced 2026-01-30 05:52:15 +00:00
dbs: hybrid stream support keep the connection when local closed
Support the hybrid fd passthrough mode with passing pipe fd, which can specify this connection kept even when the pipe peer closed, and this connection can be reget wich re-opening the pipe. Signed-off-by: Fupan Li <fupan.lfp@antgroup.com>
This commit is contained in:
@@ -103,7 +103,7 @@ use super::{ConnState, Error, PendingRx, PendingRxSet, Result};
|
||||
/// guest-side AF_VSOCK socket and a host-side `Read + Write + AsRawFd` stream.
|
||||
pub struct VsockConnection {
|
||||
/// The current connection state.
|
||||
state: ConnState,
|
||||
pub(crate) state: ConnState,
|
||||
/// The local CID. Most of the time this will be the constant `2` (the vsock
|
||||
/// host CID).
|
||||
pub(crate) local_cid: u64,
|
||||
@@ -115,6 +115,8 @@ pub struct VsockConnection {
|
||||
pub(crate) peer_port: u32,
|
||||
/// The (connected) host-side stream.
|
||||
pub(crate) stream: Box<dyn VsockStream>,
|
||||
/// keep the connection when local peer closed.
|
||||
keep: bool,
|
||||
/// The TX buffer for this connection.
|
||||
tx_buf: TxBuf,
|
||||
/// Total number of bytes that have been successfully written to
|
||||
@@ -297,6 +299,8 @@ impl VsockChannel for VsockConnection {
|
||||
// to forward some data to the host stream. Also works for a
|
||||
// connection that has begun shutting down, but the peer still has
|
||||
// some data to send.
|
||||
// It also work for a hybrid connection's peer closed case, which need
|
||||
// to active the connection's fd to generate the epollout event.
|
||||
ConnState::Established | ConnState::PeerClosed(_, false)
|
||||
if pkt.op() == uapi::VSOCK_OP_RW =>
|
||||
{
|
||||
@@ -318,7 +322,19 @@ impl VsockChannel for VsockConnection {
|
||||
"vsock: error writing to local stream (lp={}, pp={}): {:?}",
|
||||
self.local_port, self.peer_port, err
|
||||
);
|
||||
self.kill();
|
||||
match err {
|
||||
Error::TxBufFull => {
|
||||
// The hybrid pipe peer closed and the tx buf had been full,
|
||||
// and if want to keep the connection, thus we need drop the
|
||||
// data send from guest, otherwise, close the connection.
|
||||
if !self.keep() {
|
||||
self.kill();
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
self.kill();
|
||||
}
|
||||
};
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
@@ -462,15 +478,26 @@ impl VsockEpollListener for VsockConnection {
|
||||
.tx_buf
|
||||
.flush_to(&mut self.stream)
|
||||
.unwrap_or_else(|err| {
|
||||
warn!(
|
||||
"vsock: error flushing TX buf for (lp={}, pp={}): {:?}",
|
||||
self.local_port, self.peer_port, err
|
||||
);
|
||||
if !self.keep() {
|
||||
warn!(
|
||||
"vsock: error flushing TX buf for (lp={}, pp={}): {:?}",
|
||||
self.local_port, self.peer_port, err
|
||||
);
|
||||
}
|
||||
|
||||
match err {
|
||||
Error::TxBufFlush(inner) if inner.kind() == ErrorKind::WouldBlock => {
|
||||
// This should never happen (EWOULDBLOCK after
|
||||
// EPOLLOUT), but it does, so let's absorb it.
|
||||
}
|
||||
Error::TxBufFlush(inner) if (inner.kind() == ErrorKind::BrokenPipe) => {
|
||||
// The hybrid connection's pipe peer was clsosed, and we want to keep the
|
||||
// connection thus users can reopen the peer pipe to get the connection,
|
||||
// otherwise, close the connection.
|
||||
if !self.keep() {
|
||||
self.kill();
|
||||
}
|
||||
}
|
||||
_ => self.kill(),
|
||||
};
|
||||
0
|
||||
@@ -499,6 +526,7 @@ impl VsockConnection {
|
||||
local_port: u32,
|
||||
peer_port: u32,
|
||||
peer_buf_alloc: u32,
|
||||
keep: bool,
|
||||
) -> Self {
|
||||
Self {
|
||||
local_cid,
|
||||
@@ -506,6 +534,7 @@ impl VsockConnection {
|
||||
local_port,
|
||||
peer_port,
|
||||
stream,
|
||||
keep,
|
||||
state: ConnState::PeerInit,
|
||||
tx_buf: TxBuf::default(),
|
||||
fwd_cnt: Wrapping(0),
|
||||
@@ -525,6 +554,7 @@ impl VsockConnection {
|
||||
peer_cid: u64,
|
||||
local_port: u32,
|
||||
peer_port: u32,
|
||||
keep: bool,
|
||||
) -> Self {
|
||||
Self {
|
||||
local_cid,
|
||||
@@ -532,6 +562,7 @@ impl VsockConnection {
|
||||
local_port,
|
||||
peer_port,
|
||||
stream,
|
||||
keep,
|
||||
state: ConnState::LocalInit,
|
||||
tx_buf: TxBuf::default(),
|
||||
fwd_cnt: Wrapping(0),
|
||||
@@ -579,6 +610,11 @@ impl VsockConnection {
|
||||
self.state
|
||||
}
|
||||
|
||||
/// Return the keep value.
|
||||
pub fn keep(&self) -> bool {
|
||||
self.keep
|
||||
}
|
||||
|
||||
/// Send some raw, untracked, data straight to the underlying connected
|
||||
/// stream. Returns: number of bytes written, or the error describing the
|
||||
/// write failure.
|
||||
@@ -608,16 +644,23 @@ impl VsockConnection {
|
||||
// stream.
|
||||
let written = match self.stream.write(buf) {
|
||||
Ok(cnt) => cnt,
|
||||
Err(e) => {
|
||||
Err(e) if e.kind() == ErrorKind::WouldBlock => {
|
||||
// Absorb any would-block errors, since we can always try again
|
||||
// later.
|
||||
if e.kind() == ErrorKind::WouldBlock {
|
||||
0
|
||||
} else {
|
||||
// We don't know how to handle any other write error, so
|
||||
// we'll send it up the call chain.
|
||||
0
|
||||
}
|
||||
Err(e) if e.kind() == ErrorKind::BrokenPipe => {
|
||||
// The backed pipe peer had been closed, and we didn't want to close
|
||||
// this connection since the peer would like to re attach on it.
|
||||
if !self.keep() {
|
||||
return Err(Error::StreamWrite(e));
|
||||
}
|
||||
0
|
||||
}
|
||||
Err(e) => {
|
||||
// We don't know how to handle any other write error, so
|
||||
// we'll send it up the call chain.
|
||||
return Err(Error::StreamWrite(e));
|
||||
}
|
||||
};
|
||||
// Move the "forwarded bytes" counter ahead by how much we were able to
|
||||
@@ -843,6 +886,7 @@ pub(crate) mod tests {
|
||||
LOCAL_PORT,
|
||||
PEER_PORT,
|
||||
PEER_BUF_ALLOC,
|
||||
false,
|
||||
),
|
||||
ConnState::LocalInit => VsockConnection::new_local_init(
|
||||
Box::new(stream),
|
||||
@@ -850,6 +894,7 @@ pub(crate) mod tests {
|
||||
PEER_CID,
|
||||
LOCAL_PORT,
|
||||
PEER_PORT,
|
||||
false,
|
||||
),
|
||||
ConnState::Established => {
|
||||
let mut conn = VsockConnection::new_peer_init(
|
||||
@@ -859,6 +904,7 @@ pub(crate) mod tests {
|
||||
LOCAL_PORT,
|
||||
PEER_PORT,
|
||||
PEER_BUF_ALLOC,
|
||||
false,
|
||||
);
|
||||
assert!(conn.has_pending_rx());
|
||||
conn.recv_pkt(&mut pkt).unwrap();
|
||||
|
||||
@@ -291,6 +291,34 @@ impl VsockChannel for VsockMuxer {
|
||||
// Alright, everything looks in order - forward this packet to its
|
||||
// owning connection.
|
||||
let mut res: VsockResult<()> = Ok(());
|
||||
|
||||
// For the hybrid connection, if it want to keep the connection
|
||||
// when the pipe peer closed, here it needs to update the epoll
|
||||
// listner to catch the events.
|
||||
let mut listener = None;
|
||||
let conn = self.conn_map.get_mut(&conn_key).unwrap();
|
||||
let pre_state = conn.state();
|
||||
let nfd: RawFd = conn.as_raw_fd();
|
||||
|
||||
if pre_state == ConnState::LocalClosed && conn.keep() {
|
||||
conn.state = ConnState::Established;
|
||||
listener = Some(EpollListener::Connection {
|
||||
key: conn_key,
|
||||
evset: conn.get_polled_evset(),
|
||||
backend: conn.stream.backend_type(),
|
||||
});
|
||||
}
|
||||
|
||||
if let Some(nlistener) = listener {
|
||||
self.add_listener(nfd, nlistener).unwrap_or_else(|err| {
|
||||
self.kill_connection(conn_key);
|
||||
warn!(
|
||||
"vsock: error updating epoll listener for (lp={}, pp={}): {:?}",
|
||||
conn_key.local_port, conn_key.peer_port, err
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
self.apply_conn_mutation(conn_key, |conn| {
|
||||
res = conn.send_pkt(pkt);
|
||||
});
|
||||
@@ -396,6 +424,23 @@ impl VsockMuxer {
|
||||
// listening for it.
|
||||
Some(EpollListener::Connection { key, evset: _, .. }) => {
|
||||
let key_copy = *key;
|
||||
|
||||
// If the hybrid connection's local peer closed, then the epoll handler wouldn't
|
||||
// get the epollout event even when it's reopened again, thus it should be notified
|
||||
// when the guest send any data to try to active the epoll handler to generate the
|
||||
// epollout event for this connection.
|
||||
|
||||
let mut need_rm = false;
|
||||
if let Some(conn) = self.conn_map.get_mut(&key_copy) {
|
||||
if event_set.contains(epoll::Events::EPOLLERR) && conn.keep() {
|
||||
conn.state = ConnState::LocalClosed;
|
||||
need_rm = true;
|
||||
}
|
||||
}
|
||||
if need_rm {
|
||||
self.remove_listener(fd);
|
||||
}
|
||||
|
||||
// The handling of this event will most probably mutate the
|
||||
// state of the receiving connection. We'll need to check for new
|
||||
// pending RX, event set mutation, and all that, so we're
|
||||
@@ -459,6 +504,7 @@ impl VsockMuxer {
|
||||
self.cid,
|
||||
local_port,
|
||||
peer_port,
|
||||
false,
|
||||
),
|
||||
)
|
||||
}
|
||||
@@ -476,8 +522,10 @@ impl VsockMuxer {
|
||||
Some(EpollListener::PassFdStream(_)) => {
|
||||
if let Some(EpollListener::PassFdStream(mut stream)) = self.remove_listener(fd) {
|
||||
Self::passfd_read_port_and_fd(&mut stream)
|
||||
.map(|(nfd, peer_port)| (nfd, self.allocate_local_port(), peer_port))
|
||||
.and_then(|(nfd, local_port, peer_port)| {
|
||||
.map(|(nfd, peer_port, keep)| {
|
||||
(nfd, self.allocate_local_port(), peer_port, keep)
|
||||
})
|
||||
.and_then(|(nfd, local_port, peer_port, keep)| {
|
||||
// Here we should make sure the nfd the sole owner to convert it
|
||||
// into an UnixStream object, otherwise, it could cause memory unsafety.
|
||||
let nstream = unsafe { File::from_raw_fd(nfd) };
|
||||
@@ -502,6 +550,7 @@ impl VsockMuxer {
|
||||
self.cid,
|
||||
local_port,
|
||||
peer_port,
|
||||
keep,
|
||||
),
|
||||
)
|
||||
})
|
||||
@@ -587,7 +636,7 @@ impl VsockMuxer {
|
||||
.map_err(|_| Error::InvalidPortRequest)
|
||||
}
|
||||
|
||||
fn passfd_read_port_and_fd(stream: &mut Box<dyn VsockStream>) -> Result<(RawFd, u32)> {
|
||||
fn passfd_read_port_and_fd(stream: &mut Box<dyn VsockStream>) -> Result<(RawFd, u32, bool)> {
|
||||
let mut buf = [0u8; 32];
|
||||
let mut fds = [0, 1];
|
||||
let (data_len, fd_len) = stream
|
||||
@@ -607,7 +656,9 @@ impl VsockMuxer {
|
||||
.ok_or(Error::InvalidPortRequest)
|
||||
.and_then(|word| word.parse::<u32>().map_err(|_| Error::InvalidPortRequest))?;
|
||||
|
||||
Ok((fds[0], port))
|
||||
let keep = port_iter.next().is_some_and(|kp| kp == "keep");
|
||||
|
||||
Ok((fds[0], port, keep))
|
||||
}
|
||||
|
||||
/// Add a new connection to the active connection pool.
|
||||
@@ -775,6 +826,7 @@ impl VsockMuxer {
|
||||
pkt.dst_port(),
|
||||
pkt.src_port(),
|
||||
pkt.buf_alloc(),
|
||||
false,
|
||||
),
|
||||
)
|
||||
})
|
||||
@@ -876,7 +928,7 @@ impl VsockMuxer {
|
||||
);
|
||||
});
|
||||
}
|
||||
} else {
|
||||
} else if conn.state() != ConnState::LocalClosed {
|
||||
// The connection had previously asked to be removed from the
|
||||
// listener map (by returning an empty event set via
|
||||
// `get_polled_fd()`), but now wants back in.
|
||||
|
||||
Reference in New Issue
Block a user