runtime-rs: fix high cpu

Fixed the issue when using nonblocking, the `tokio::io::copy()` needing
to handle EAGAIN, resulting in high CPU usage.

Fixes: #5740
Signed-off-by: Quanwei Zhou <quanweiZhou@linux.alibaba.com>
This commit is contained in:
Quanwei Zhou 2022-12-04 00:45:14 +08:00 committed by quanwei.zqw
parent 8246de821f
commit 0019d653d6
2 changed files with 31 additions and 18 deletions

View File

@ -6,7 +6,11 @@
use std::{
io,
os::unix::{io::FromRawFd, net::UnixStream as StdUnixStream},
os::unix::{
io::{FromRawFd, RawFd},
net::UnixStream as StdUnixStream,
prelude::AsRawFd,
},
pin::Pin,
task::Context as TaskContext,
task::Poll,
@ -52,8 +56,21 @@ impl ShimIo {
"new shim io stdin {:?} stdout {:?} stderr {:?}", stdin, stdout, stderr
);
let set_flag_with_blocking = |fd: RawFd| {
let flag = unsafe { libc::fcntl(fd, libc::F_GETFL) };
let ret = unsafe { libc::fcntl(fd, libc::F_SETFL, flag & !libc::O_NONBLOCK) };
if ret < 0 {
error!(sl!(), "failed to set fcntl for fd {} error {}", fd, ret);
}
};
let stdin_fd: Option<Box<dyn AsyncRead + Send + Unpin>> = if let Some(stdin) = stdin {
info!(sl!(), "open stdin {:?}", &stdin);
// Since the stdin peer point (which is hold by containerd) could not be openned
// immediately, which would block here's open with block mode, and we wouldn't want to
// block here, thus here opened with nonblock and then reset it to block mode for
// tokio async io.
match OpenOptions::new()
.read(true)
.write(false)
@ -61,7 +78,11 @@ impl ShimIo {
.open(&stdin)
.await
{
Ok(file) => Some(Box::new(file)),
Ok(file) => {
// Set it to blocking to avoid infinitely handling EAGAIN when the reader is empty
set_flag_with_blocking(file.as_raw_fd());
Some(Box::new(file))
}
Err(err) => {
error!(sl!(), "failed to open {} error {:?}", &stdin, err);
None

View File

@ -133,22 +133,14 @@ impl Process {
let io_name = io_name.to_string();
let logger = self.logger.new(o!("io_name" => io_name));
let _ = tokio::spawn(async move {
loop {
match tokio::io::copy(&mut reader, &mut writer).await {
Err(e) => {
if let Some(error_code) = e.raw_os_error() {
if error_code == libc::EAGAIN {
continue;
}
}
warn!(logger, "run_io_copy: failed to copy stream: {}", e);
}
Ok(length) => {
warn!(logger, "run_io_copy: stop to copy stream length {}", length)
}
};
break;
}
match tokio::io::copy(&mut reader, &mut writer).await {
Err(e) => {
warn!(logger, "run_io_copy: failed to copy stream: {}", e);
}
Ok(length) => {
info!(logger, "run_io_copy: stop to copy stream length {}", length)
}
};
wgw.done();
});