runtime-rs: fix stdin hang in azure

Fix stdin hang in azure.

Fixes: #4740
Signed-off-by: Quanwei Zhou <quanweiZhou@linux.alibaba.com>
This commit is contained in:
Quanwei Zhou 2022-07-28 10:30:54 +08:00 committed by quanwei.zqw
parent 57c556a801
commit fa0b11fc52
2 changed files with 32 additions and 6 deletions

View File

@ -18,7 +18,7 @@ use nix::{
sys::stat::Mode,
};
use tokio::{
fs::File,
fs::OpenOptions,
io::{AsyncRead, AsyncWrite},
net::UnixStream as AsyncUnixStream,
};
@ -47,8 +47,20 @@ impl ShimIo {
stdout: &Option<String>,
stderr: &Option<String>,
) -> Result<Self> {
info!(
sl!(),
"new shim io stdin {:?} stdout {:?} stderr {:?}", stdin, stdout, stderr
);
let stdin_fd: Option<Box<dyn AsyncRead + Send + Unpin>> = if let Some(stdin) = stdin {
match File::open(&stdin).await {
info!(sl!(), "open stdin {:?}", &stdin);
match OpenOptions::new()
.read(true)
.write(false)
.custom_flags(libc::O_NONBLOCK)
.open(&stdin)
.await
{
Ok(file) => Some(Box::new(file)),
Err(err) => {
error!(sl!(), "failed to open {} error {:?}", &stdin, err);
@ -60,6 +72,8 @@ impl ShimIo {
};
let get_url = |url: &Option<String>| -> Option<Url> {
info!(sl!(), "get url for {:?}", url);
match url {
None => None,
Some(out) => match Url::parse(out.as_str()) {
@ -79,6 +93,7 @@ impl ShimIo {
let stdout_url = get_url(stdout);
let get_fd = |url: &Option<Url>| -> Option<Box<dyn AsyncWrite + Send + Unpin>> {
info!(sl!(), "get fd for {:?}", &url);
if let Some(url) = url {
if url.scheme() == "fifo" {
let path = url.path();

View File

@ -129,13 +129,24 @@ impl Process {
mut reader: Box<dyn AsyncRead + Send + Unpin>,
mut writer: Box<dyn AsyncWrite + Send + Unpin>,
) -> Result<()> {
info!(self.logger, "run io copy for {}", io_name);
let io_name = io_name.to_string();
let logger = self.logger.new(o!("io name" => io_name));
let _ = tokio::spawn(async move {
match tokio::io::copy(&mut reader, &mut writer).await {
Err(e) => warn!(logger, "io: failed to copy stream {}", e),
Ok(length) => warn!(logger, "io: stop to copy stream length {}", length),
};
loop {
match tokio::io::copy(&mut reader, &mut writer).await {
Err(e) => {
if let Some(error_code) = e.raw_os_error() {
if error_code == libc::EAGAIN {
continue;
}
}
warn!(logger, "io: failed to copy stream {}", e);
}
Ok(length) => warn!(logger, "io: stop to copy stream length {}", length),
};
break;
}
wgw.done();
});