From 4a2d43704340e378d61d3f140f2aa1a387d052b8 Mon Sep 17 00:00:00 2001 From: Tim Zhang Date: Thu, 1 Apr 2021 21:54:33 +0800 Subject: [PATCH 1/2] agent: don't do anything in Pipestream::shutdown The only right way to shutdown pipe is drop it Otherwise PipeStream will conflict with its twins Because they both have the same fd, and both registered. Fixes: #1614 Signed-off-by: Tim Zhang --- src/agent/rustjail/src/pipestream.rs | 9 ++++----- src/agent/rustjail/src/sync_with_async.rs | 18 +++++------------- 2 files changed, 9 insertions(+), 18 deletions(-) diff --git a/src/agent/rustjail/src/pipestream.rs b/src/agent/rustjail/src/pipestream.rs index 4215190ace..1312a4b6a4 100644 --- a/src/agent/rustjail/src/pipestream.rs +++ b/src/agent/rustjail/src/pipestream.rs @@ -77,10 +77,6 @@ impl PipeStream { Ok(Self(AsyncFd::new(StreamFd(fd))?)) } - pub fn shutdown(&mut self) -> io::Result<()> { - self.0.get_mut().close() - } - pub fn from_fd(fd: RawFd) -> Self { unsafe { Self::from_raw_fd(fd) } } @@ -164,7 +160,10 @@ impl AsyncWrite for PipeStream { } fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - self.get_mut().shutdown()?; + // Do nothing in shutdown is very important + // The only right way to shutdown pipe is drop it + // Otherwise PipeStream will conflict with its twins + // Because they both have same fd, and both registered. Poll::Ready(Ok(())) } } diff --git a/src/agent/rustjail/src/sync_with_async.rs b/src/agent/rustjail/src/sync_with_async.rs index 7f44ab2819..4626801158 100644 --- a/src/agent/rustjail/src/sync_with_async.rs +++ b/src/agent/rustjail/src/sync_with_async.rs @@ -117,28 +117,20 @@ pub async fn write_async(pipe_w: &mut PipeStream, msg_type: i32, data_str: &str) } match msg_type { - SYNC_FAILED => match write_count(pipe_w, data_str.as_bytes(), data_str.len()).await { - Ok(_) => pipe_w.shutdown()?, - Err(e) => { - pipe_w.shutdown()?; + SYNC_FAILED => { + if let Err(e) = write_count(pipe_w, data_str.as_bytes(), data_str.len()).await { return Err(anyhow!(e).context("error in send message to process")); } - }, + } SYNC_DATA => { let length: i32 = data_str.len() as i32; write_count(pipe_w, &length.to_be_bytes(), MSG_SIZE) .await - .or_else(|e| { - pipe_w.shutdown()?; - Err(anyhow!(e).context("error in send message to process")) - })?; + .map_err(|e| anyhow!(e).context("error in send message to process"))?; write_count(pipe_w, data_str.as_bytes(), data_str.len()) .await - .or_else(|e| { - pipe_w.shutdown()?; - Err(anyhow!(e).context("error in send message to process")) - })?; + .map_err(|e| anyhow!(e).context("error in send message to process"))?; } _ => (), From ee6a590db17807355d08325ec1ad9c06c726a97e Mon Sep 17 00:00:00 2001 From: Tim Zhang Date: Fri, 2 Apr 2021 18:21:44 +0800 Subject: [PATCH 2/2] agent: add test test_pipestream_shutdown Make sure PipeStream::shutdown() do not close the inner fd. Signed-off-by: Tim Zhang --- src/agent/rustjail/src/pipestream.rs | 34 ++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/src/agent/rustjail/src/pipestream.rs b/src/agent/rustjail/src/pipestream.rs index 1312a4b6a4..ab73291801 100644 --- a/src/agent/rustjail/src/pipestream.rs +++ b/src/agent/rustjail/src/pipestream.rs @@ -167,3 +167,37 @@ impl AsyncWrite for PipeStream { Poll::Ready(Ok(())) } } + +#[cfg(test)] +mod tests { + use super::*; + use nix::fcntl::OFlag; + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + + #[tokio::test] + // Shutdown should never close the inner fd. + async fn test_pipestream_shutdown() { + let (_, wfd1) = unistd::pipe2(OFlag::O_CLOEXEC).unwrap(); + let mut writer1 = PipeStream::new(wfd1).unwrap(); + + // if close fd in shutdown, the fd will be reused + // and the test will failed + let _ = writer1.shutdown().await.unwrap(); + + // let _ = unistd::close(wfd1); + + let (rfd2, wfd2) = unistd::pipe2(OFlag::O_CLOEXEC).unwrap(); // reuse fd number, rfd2 == wfd1 + + let mut reader2 = PipeStream::new(rfd2).unwrap(); + let mut writer2 = PipeStream::new(wfd2).unwrap(); + + // deregister writer1, then reader2 which has the same fd will be deregistered from epoll + drop(writer1); + + let _ = writer2.write(b"1").await; + + let mut content = vec![0u8; 1]; + // Will Block here if shutdown close the fd. + let _ = reader2.read(&mut content).await; + } +}