agent: use biased select to avoid data loss

This patch uses a biased select to avoid stdin data loss in case of
CloseStdinRequest.

Fixes: #6714
Signed-off-by: Zixuan Tan <tanzixuan.me@gmail.com>
This commit is contained in:
Zixuan Tan 2023-11-06 19:25:39 +08:00
parent 7874ef5fd2
commit 3eb4bed957

View File

@ -65,7 +65,7 @@ use crate::sync::{read_sync, write_count, write_sync, SYNC_DATA, SYNC_FAILED, SY
use crate::sync_with_async::{read_async, write_async}; use crate::sync_with_async::{read_async, write_async};
use async_trait::async_trait; use async_trait::async_trait;
use rlimit::{setrlimit, Resource, Rlim}; use rlimit::{setrlimit, Resource, Rlim};
use tokio::io::AsyncBufReadExt; use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt};
use tokio::sync::Mutex; use tokio::sync::Mutex;
use kata_sys_util::hooks::HookStates; use kata_sys_util::hooks::HookStates;
@ -1003,14 +1003,30 @@ impl BaseContainer for LinuxContainer {
let logger = logger.clone(); let logger = logger.clone();
let term_closer = term_closer.clone(); let term_closer = term_closer.clone();
tokio::spawn(async move { tokio::spawn(async move {
let mut buf = [0u8; 8192];
loop {
tokio::select! {
// Make sure stdin_stream is drained before exiting
biased;
res = stdin_stream.read(&mut buf) => {
match res {
Err(_) | Ok(0) => {
debug!(logger, "copy from stdin to term_master end: {:?}", res);
break;
}
Ok(n) => {
if let Err(_) = term_master.write_all(&buf[..n]).await {
break;
}
}
}
}
// As the stdin fifo is opened in RW mode in the shim, which will never // As the stdin fifo is opened in RW mode in the shim, which will never
// read EOF, we close the stdin fifo here when explicit requested. // read EOF, we close the stdin fifo here when explicit requested.
tokio::select! {
res = tokio::io::copy(&mut stdin_stream, &mut term_master) => {
debug!(logger, "copy from stdin to term_master end: {:?}", res);
}
_ = close_stdin_rx.changed() => { _ = close_stdin_rx.changed() => {
debug!(logger, "copy ends as requested"); debug!(logger, "copy ends as requested");
break
}
} }
} }
wgw_input.done(); wgw_input.done();
@ -1049,14 +1065,30 @@ impl BaseContainer for LinuxContainer {
let wgw_input = proc_io.wg_input.worker(); let wgw_input = proc_io.wg_input.worker();
let logger = logger.clone(); let logger = logger.clone();
tokio::spawn(async move { tokio::spawn(async move {
// As the stdin fifo is opened in RW mode in the shim, which will never let mut buf = [0u8; 8192];
// read EOF, we close the stdin stream when containerd explicit requested. loop {
tokio::select! { tokio::select! {
res = tokio::io::copy(&mut stdin_stream, &mut parent_stdin) => { // Make sure stdin_stream is drained before exiting
debug!(logger, "copy from stdin to parent_stdin end: {:?}", res); biased;
res = stdin_stream.read(&mut buf) => {
match res {
Err(_) | Ok(0) => {
debug!(logger, "copy from stdin to term_master end: {:?}", res);
break;
} }
Ok(n) => {
if let Err(_) = parent_stdin.write_all(&buf[..n]).await {
break;
}
}
}
}
// As the stdin fifo is opened in RW mode in the shim, which will never
// read EOF, we close the stdin fifo here when explicit requested.
_ = close_stdin_rx.changed() => { _ = close_stdin_rx.changed() => {
debug!(logger, "copy ends as requested"); debug!(logger, "copy ends as requested");
break
}
} }
} }
wgw_input.done(); wgw_input.done();