Merge pull request #1613 from Tim-Zhang/pipestream-shutdown-do-nothing

Don't do anything in Pipestream::shutdown
This commit is contained in:
Bin Liu
2021-04-06 14:03:00 +08:00
committed by GitHub
2 changed files with 43 additions and 18 deletions

View File

@@ -77,10 +77,6 @@ impl PipeStream {
Ok(Self(AsyncFd::new(StreamFd(fd))?))
}
pub fn shutdown(&mut self) -> io::Result<()> {
self.0.get_mut().close()
}
pub fn from_fd(fd: RawFd) -> Self {
unsafe { Self::from_raw_fd(fd) }
}
@@ -164,7 +160,44 @@ impl AsyncWrite for PipeStream {
}
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.get_mut().shutdown()?;
// Do nothing in shutdown is very important
// The only right way to shutdown pipe is drop it
// Otherwise PipeStream will conflict with its twins
// Because they both have same fd, and both registered.
Poll::Ready(Ok(()))
}
}
#[cfg(test)]
mod tests {
use super::*;
use nix::fcntl::OFlag;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[tokio::test]
// Shutdown should never close the inner fd.
async fn test_pipestream_shutdown() {
let (_, wfd1) = unistd::pipe2(OFlag::O_CLOEXEC).unwrap();
let mut writer1 = PipeStream::new(wfd1).unwrap();
// if close fd in shutdown, the fd will be reused
// and the test will failed
let _ = writer1.shutdown().await.unwrap();
// let _ = unistd::close(wfd1);
let (rfd2, wfd2) = unistd::pipe2(OFlag::O_CLOEXEC).unwrap(); // reuse fd number, rfd2 == wfd1
let mut reader2 = PipeStream::new(rfd2).unwrap();
let mut writer2 = PipeStream::new(wfd2).unwrap();
// deregister writer1, then reader2 which has the same fd will be deregistered from epoll
drop(writer1);
let _ = writer2.write(b"1").await;
let mut content = vec![0u8; 1];
// Will Block here if shutdown close the fd.
let _ = reader2.read(&mut content).await;
}
}

View File

@@ -117,28 +117,20 @@ pub async fn write_async(pipe_w: &mut PipeStream, msg_type: i32, data_str: &str)
}
match msg_type {
SYNC_FAILED => match write_count(pipe_w, data_str.as_bytes(), data_str.len()).await {
Ok(_) => pipe_w.shutdown()?,
Err(e) => {
pipe_w.shutdown()?;
SYNC_FAILED => {
if let Err(e) = write_count(pipe_w, data_str.as_bytes(), data_str.len()).await {
return Err(anyhow!(e).context("error in send message to process"));
}
},
}
SYNC_DATA => {
let length: i32 = data_str.len() as i32;
write_count(pipe_w, &length.to_be_bytes(), MSG_SIZE)
.await
.or_else(|e| {
pipe_w.shutdown()?;
Err(anyhow!(e).context("error in send message to process"))
})?;
.map_err(|e| anyhow!(e).context("error in send message to process"))?;
write_count(pipe_w, data_str.as_bytes(), data_str.len())
.await
.or_else(|e| {
pipe_w.shutdown()?;
Err(anyhow!(e).context("error in send message to process"))
})?;
.map_err(|e| anyhow!(e).context("error in send message to process"))?;
}
_ => (),