mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-04-30 20:54:26 +00:00
runtime-rs: fix stdin hang in azure
Fix stdin hang in azure. Fixes: #4740 Signed-off-by: Quanwei Zhou <quanweiZhou@linux.alibaba.com>
This commit is contained in:
parent
57c556a801
commit
fa0b11fc52
@ -18,7 +18,7 @@ use nix::{
|
|||||||
sys::stat::Mode,
|
sys::stat::Mode,
|
||||||
};
|
};
|
||||||
use tokio::{
|
use tokio::{
|
||||||
fs::File,
|
fs::OpenOptions,
|
||||||
io::{AsyncRead, AsyncWrite},
|
io::{AsyncRead, AsyncWrite},
|
||||||
net::UnixStream as AsyncUnixStream,
|
net::UnixStream as AsyncUnixStream,
|
||||||
};
|
};
|
||||||
@ -47,8 +47,20 @@ impl ShimIo {
|
|||||||
stdout: &Option<String>,
|
stdout: &Option<String>,
|
||||||
stderr: &Option<String>,
|
stderr: &Option<String>,
|
||||||
) -> Result<Self> {
|
) -> Result<Self> {
|
||||||
|
info!(
|
||||||
|
sl!(),
|
||||||
|
"new shim io stdin {:?} stdout {:?} stderr {:?}", stdin, stdout, stderr
|
||||||
|
);
|
||||||
|
|
||||||
let stdin_fd: Option<Box<dyn AsyncRead + Send + Unpin>> = if let Some(stdin) = stdin {
|
let stdin_fd: Option<Box<dyn AsyncRead + Send + Unpin>> = if let Some(stdin) = stdin {
|
||||||
match File::open(&stdin).await {
|
info!(sl!(), "open stdin {:?}", &stdin);
|
||||||
|
match OpenOptions::new()
|
||||||
|
.read(true)
|
||||||
|
.write(false)
|
||||||
|
.custom_flags(libc::O_NONBLOCK)
|
||||||
|
.open(&stdin)
|
||||||
|
.await
|
||||||
|
{
|
||||||
Ok(file) => Some(Box::new(file)),
|
Ok(file) => Some(Box::new(file)),
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
error!(sl!(), "failed to open {} error {:?}", &stdin, err);
|
error!(sl!(), "failed to open {} error {:?}", &stdin, err);
|
||||||
@ -60,6 +72,8 @@ impl ShimIo {
|
|||||||
};
|
};
|
||||||
|
|
||||||
let get_url = |url: &Option<String>| -> Option<Url> {
|
let get_url = |url: &Option<String>| -> Option<Url> {
|
||||||
|
info!(sl!(), "get url for {:?}", url);
|
||||||
|
|
||||||
match url {
|
match url {
|
||||||
None => None,
|
None => None,
|
||||||
Some(out) => match Url::parse(out.as_str()) {
|
Some(out) => match Url::parse(out.as_str()) {
|
||||||
@ -79,6 +93,7 @@ impl ShimIo {
|
|||||||
|
|
||||||
let stdout_url = get_url(stdout);
|
let stdout_url = get_url(stdout);
|
||||||
let get_fd = |url: &Option<Url>| -> Option<Box<dyn AsyncWrite + Send + Unpin>> {
|
let get_fd = |url: &Option<Url>| -> Option<Box<dyn AsyncWrite + Send + Unpin>> {
|
||||||
|
info!(sl!(), "get fd for {:?}", &url);
|
||||||
if let Some(url) = url {
|
if let Some(url) = url {
|
||||||
if url.scheme() == "fifo" {
|
if url.scheme() == "fifo" {
|
||||||
let path = url.path();
|
let path = url.path();
|
||||||
|
@ -129,13 +129,24 @@ impl Process {
|
|||||||
mut reader: Box<dyn AsyncRead + Send + Unpin>,
|
mut reader: Box<dyn AsyncRead + Send + Unpin>,
|
||||||
mut writer: Box<dyn AsyncWrite + Send + Unpin>,
|
mut writer: Box<dyn AsyncWrite + Send + Unpin>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
|
info!(self.logger, "run io copy for {}", io_name);
|
||||||
let io_name = io_name.to_string();
|
let io_name = io_name.to_string();
|
||||||
let logger = self.logger.new(o!("io name" => io_name));
|
let logger = self.logger.new(o!("io name" => io_name));
|
||||||
let _ = tokio::spawn(async move {
|
let _ = tokio::spawn(async move {
|
||||||
|
loop {
|
||||||
match tokio::io::copy(&mut reader, &mut writer).await {
|
match tokio::io::copy(&mut reader, &mut writer).await {
|
||||||
Err(e) => warn!(logger, "io: failed to copy stream {}", e),
|
Err(e) => {
|
||||||
|
if let Some(error_code) = e.raw_os_error() {
|
||||||
|
if error_code == libc::EAGAIN {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
warn!(logger, "io: failed to copy stream {}", e);
|
||||||
|
}
|
||||||
Ok(length) => warn!(logger, "io: stop to copy stream length {}", length),
|
Ok(length) => warn!(logger, "io: stop to copy stream length {}", length),
|
||||||
};
|
};
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
wgw.done();
|
wgw.done();
|
||||||
});
|
});
|
||||||
|
Loading…
Reference in New Issue
Block a user