runtime-rs: fix high cpu

Fixed the issue when using nonblocking, the `tokio::io::copy()` needing
to handle EAGAIN, resulting in high CPU usage.

Fixes: #5740
Signed-off-by: Quanwei Zhou <quanweiZhou@linux.alibaba.com>
This commit is contained in:
Quanwei Zhou
2022-12-04 00:45:14 +08:00
committed by quanwei.zqw
parent 8246de821f
commit 0019d653d6
2 changed files with 31 additions and 18 deletions

View File

@@ -6,7 +6,11 @@
use std::{ use std::{
io, io,
os::unix::{io::FromRawFd, net::UnixStream as StdUnixStream}, os::unix::{
io::{FromRawFd, RawFd},
net::UnixStream as StdUnixStream,
prelude::AsRawFd,
},
pin::Pin, pin::Pin,
task::Context as TaskContext, task::Context as TaskContext,
task::Poll, task::Poll,
@@ -52,8 +56,21 @@ impl ShimIo {
"new shim io stdin {:?} stdout {:?} stderr {:?}", stdin, stdout, stderr "new shim io stdin {:?} stdout {:?} stderr {:?}", stdin, stdout, stderr
); );
let set_flag_with_blocking = |fd: RawFd| {
let flag = unsafe { libc::fcntl(fd, libc::F_GETFL) };
let ret = unsafe { libc::fcntl(fd, libc::F_SETFL, flag & !libc::O_NONBLOCK) };
if ret < 0 {
error!(sl!(), "failed to set fcntl for fd {} error {}", fd, ret);
}
};
let stdin_fd: Option<Box<dyn AsyncRead + Send + Unpin>> = if let Some(stdin) = stdin { let stdin_fd: Option<Box<dyn AsyncRead + Send + Unpin>> = if let Some(stdin) = stdin {
info!(sl!(), "open stdin {:?}", &stdin); info!(sl!(), "open stdin {:?}", &stdin);
// Since the stdin peer point (which is hold by containerd) could not be openned
// immediately, which would block here's open with block mode, and we wouldn't want to
// block here, thus here opened with nonblock and then reset it to block mode for
// tokio async io.
match OpenOptions::new() match OpenOptions::new()
.read(true) .read(true)
.write(false) .write(false)
@@ -61,7 +78,11 @@ impl ShimIo {
.open(&stdin) .open(&stdin)
.await .await
{ {
Ok(file) => Some(Box::new(file)), Ok(file) => {
// Set it to blocking to avoid infinitely handling EAGAIN when the reader is empty
set_flag_with_blocking(file.as_raw_fd());
Some(Box::new(file))
}
Err(err) => { Err(err) => {
error!(sl!(), "failed to open {} error {:?}", &stdin, err); error!(sl!(), "failed to open {} error {:?}", &stdin, err);
None None

View File

@@ -133,22 +133,14 @@ impl Process {
let io_name = io_name.to_string(); let io_name = io_name.to_string();
let logger = self.logger.new(o!("io_name" => io_name)); let logger = self.logger.new(o!("io_name" => io_name));
let _ = tokio::spawn(async move { let _ = tokio::spawn(async move {
loop {
match tokio::io::copy(&mut reader, &mut writer).await { match tokio::io::copy(&mut reader, &mut writer).await {
Err(e) => { Err(e) => {
if let Some(error_code) = e.raw_os_error() {
if error_code == libc::EAGAIN {
continue;
}
}
warn!(logger, "run_io_copy: failed to copy stream: {}", e); warn!(logger, "run_io_copy: failed to copy stream: {}", e);
} }
Ok(length) => { Ok(length) => {
warn!(logger, "run_io_copy: stop to copy stream length {}", length) info!(logger, "run_io_copy: stop to copy stream length {}", length)
} }
}; };
break;
}
wgw.done(); wgw.done();
}); });