Merge pull request #9534 from Tim-Zhang/fix-stdin-stuck

Fix ctr exec stuck problem
This commit is contained in:
Fupan Li
2024-07-15 13:19:19 +08:00
committed by GitHub
7 changed files with 354 additions and 60 deletions

View File

@@ -168,6 +168,37 @@ jobs:
- name: Run runk tests
timeout-minutes: 10
run: bash tests/integration/runk/gha-run.sh run
# Runs the stdio integration tests (tests/integration/stdio) against the
# kata static tarball produced earlier in the pipeline.
run-stdio:
  runs-on: garm-ubuntu-2204-smaller
  env:
    CONTAINERD_VERSION: lts
  steps:
    - uses: actions/checkout@v4
      with:
        ref: ${{ inputs.commit-hash }}
        fetch-depth: 0
    - name: Rebase atop of the latest target branch
      run: |
        ./tests/git-helper.sh "rebase-atop-of-the-latest-target-branch"
      env:
        TARGET_BRANCH: ${{ inputs.target-branch }}
    - name: Install dependencies
      run: bash tests/integration/stdio/gha-run.sh install-dependencies
    - name: get-kata-tarball
      uses: actions/download-artifact@v4
      with:
        name: kata-static-tarball-amd64${{ inputs.tarball-suffix }}
        path: kata-artifacts
    - name: Install kata
      run: bash tests/integration/stdio/gha-run.sh install-kata kata-artifacts
    - name: Run stdio tests
      timeout-minutes: 10
      # gha-run.sh dispatches on its first argument and rejects an empty one
      # with "Invalid argument", so the `run` action must be passed explicitly.
      run: bash tests/integration/stdio/gha-run.sh run
run-tracing:
strategy:

View File

@@ -200,15 +200,8 @@ impl Process {
}
pub async fn close_stdin(&mut self) {
// stdin will be closed automatically in the passfd-io scenario
if self.proc_io.is_some() {
return;
}
close_process_stream!(self, term_master, TermMaster);
close_process_stream!(self, parent_stdin, ParentStdin);
self.notify_term_close();
}
pub fn cleanup_process_stream(&mut self) {

View File

@@ -598,25 +598,32 @@ impl AgentService {
let cid = req.container_id;
let eid = req.exec_id;
let writer = {
let mut sandbox = self.sandbox.lock().await;
let p = sandbox.find_container_process(cid.as_str(), eid.as_str())?;
// use ptmx io
if p.term_master.is_some() {
p.get_writer(StreamType::TermMaster)
} else {
// use piped io
p.get_writer(StreamType::ParentStdin)
}
};
let writer = writer.ok_or_else(|| anyhow!(ERR_CANNOT_GET_WRITER))?;
writer.lock().await.write_all(req.data.as_slice()).await?;
let mut resp = WriteStreamResponse::new();
resp.set_len(req.data.len() as u32);
// EOF of stdin
if req.data.is_empty() {
let mut sandbox = self.sandbox.lock().await;
let p = sandbox.find_container_process(cid.as_str(), eid.as_str())?;
p.close_stdin().await;
} else {
let writer = {
let mut sandbox = self.sandbox.lock().await;
let p = sandbox.find_container_process(cid.as_str(), eid.as_str())?;
// use ptmx io
if p.term_master.is_some() {
p.get_writer(StreamType::TermMaster)
} else {
// use piped io
p.get_writer(StreamType::ParentStdin)
}
};
let writer = writer.ok_or_else(|| anyhow!(ERR_CANNOT_GET_WRITER))?;
writer.lock().await.write_all(req.data.as_slice()).await?;
}
Ok(resp)
}
@@ -659,6 +666,7 @@ impl AgentService {
biased;
v = read_stream(&reader, req.len as usize) => {
let vector = v?;
let mut resp = ReadStreamResponse::new();
resp.set_data(vector);
@@ -859,6 +867,9 @@ impl agent_ttrpc::AgentService for AgentService {
ctx: &TtrpcContext,
req: protocols::agent::CloseStdinRequest,
) -> ttrpc::Result<Empty> {
// runtime-rs signals EOF through the `write_stdin` rpc, which closes stdin,
// so runtime-rs no longer calls this rpc.
trace_rpc_call!(ctx, "close_stdin", req);
is_allowed(&req).await?;

View File

@@ -44,6 +44,8 @@ struct ContainerIoWrite<'inner> {
pub info: Arc<ContainerIoInfo>,
write_future:
Option<Pin<Box<dyn Future<Output = Result<agent::WriteStreamResponse>> + Send + 'inner>>>,
shutdown_future:
Option<Pin<Box<dyn Future<Output = Result<agent::WriteStreamResponse>> + Send + 'inner>>>,
}
impl<'inner> ContainerIoWrite<'inner> {
@@ -51,6 +53,7 @@ impl<'inner> ContainerIoWrite<'inner> {
Self {
info,
write_future: Default::default(),
shutdown_future: Default::default(),
}
}
@@ -80,6 +83,30 @@ impl<'inner> ContainerIoWrite<'inner> {
}
}
}
// Tell the agent to close the process's stdin by calling rpc
// agent.write_stdin() with an empty payload.
//
// While the rpc is still pending its future is parked in
// `self.shutdown_future`, so the next poll resumes that same request instead
// of issuing a new one.
fn poll_shutdown_inner(&'inner mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
    // Resume the parked request if there is one, otherwise start the rpc.
    let mut pending = match self.shutdown_future.take() {
        Some(parked) => parked,
        None => {
            let eof_req = agent::WriteStreamRequest {
                process_id: self.info.process.clone().into(),
                data: Vec::new(),
            };
            Box::pin(self.info.agent.write_stdin(eof_req))
        }
    };

    match pending.as_mut().poll(cx) {
        Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
        Poll::Ready(Err(err)) => Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, err))),
        Poll::Pending => {
            // Not finished yet: keep the future for the next poll.
            self.shutdown_future = Some(pending);
            Poll::Pending
        }
    }
}
}
impl<'inner> AsyncWrite for ContainerIoWrite<'inner> {
@@ -100,8 +127,13 @@ impl<'inner> AsyncWrite for ContainerIoWrite<'inner> {
Poll::Ready(Ok(()))
}
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
// Shut the writer down by delegating to `poll_shutdown_inner`, which sends
// the end-of-stdin request to the agent.
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
// `poll_shutdown_inner` takes `&'inner mut self`; the transmute recasts the
// reference obtained from the pin so that it satisfies that receiver type.
// NOTE(review): no SAFETY justification is given here. Soundness presumably
// relies on the future parked in `shutdown_future` never outliving `self`
// and on `self` not being moved while it is parked — confirm.
let me = unsafe {
std::mem::transmute::<&mut ContainerIoWrite<'_>, &mut ContainerIoWrite<'inner>>(
&mut *self,
)
};
me.poll_shutdown_inner(cx)
}
}

View File

@@ -11,7 +11,7 @@ use agent::Agent;
use anyhow::{Context, Result};
use awaitgroup::{WaitGroup, Worker as WaitGroupWorker};
use common::types::{ContainerProcess, ProcessExitStatus, ProcessStateInfo, ProcessStatus, PID};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio::sync::{watch, RwLock};
use super::container::Container;
@@ -23,6 +23,13 @@ pub type ProcessWatcher = (
Arc<RwLock<ProcessExitStatus>>,
);
/// Identifies which standard stream an io copy task is serving.
///
/// The `Debug` representation (`Stdin`, `Stdout`, `Stderr`) is used as the
/// stream name in log messages.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StdIoType {
    /// Data flowing from the shim into the process.
    Stdin,
    /// Standard output of the process.
    Stdout,
    /// Standard error of the process.
    Stderr,
}
#[derive(Debug)]
pub struct Process {
pub process: ContainerProcess,
@@ -62,10 +69,6 @@ pub struct Process {
pub exit_status: Arc<RwLock<ProcessExitStatus>>,
pub exit_watcher_rx: Option<watch::Receiver<bool>>,
pub exit_watcher_tx: Option<watch::Sender<bool>>,
// used to sync between stdin io copy thread(tokio) and the close it call.
// close io call should wait until the stdin io copy finished to
// prevent stdin data lost.
pub wg_stdin: WaitGroup,
// io streams using vsock fd passthrough feature
pub passfd_io: Option<PassfdIo>,
@@ -119,7 +122,6 @@ impl Process {
exit_status: Arc::new(RwLock::new(ProcessExitStatus::new())),
exit_watcher_rx: Some(receiver),
exit_watcher_tx: Some(sender),
wg_stdin: WaitGroup::new(),
passfd_io: None,
}
}
@@ -246,9 +248,8 @@ impl Process {
self.post_fifos_open()?;
// start io copy for stdin
let wgw_stdin = self.wg_stdin.worker();
if let Some(stdin) = shim_io.stdin {
self.run_io_copy("stdin", wgw_stdin, stdin, container_io.stdin)
self.run_io_copy(StdIoType::Stdin, None, stdin, container_io.stdin)
.await?;
}
@@ -258,14 +259,19 @@ impl Process {
// start io copy for stdout
if let Some(stdout) = shim_io.stdout {
self.run_io_copy("stdout", wgw.clone(), container_io.stdout, stdout)
.await?;
self.run_io_copy(
StdIoType::Stdout,
Some(wgw.clone()),
container_io.stdout,
stdout,
)
.await?;
}
// start io copy for stderr
if !self.terminal {
if let Some(stderr) = shim_io.stderr {
self.run_io_copy("stderr", wgw, container_io.stderr, stderr)
self.run_io_copy(StdIoType::Stderr, Some(wgw), container_io.stderr, stderr)
.await?;
}
}
@@ -276,27 +282,51 @@ impl Process {
Ok(())
}
async fn run_io_copy<'a>(
&'a self,
io_name: &'a str,
wgw: WaitGroupWorker,
async fn run_io_copy(
&self,
io_type: StdIoType,
wgw: Option<WaitGroupWorker>,
mut reader: Box<dyn AsyncRead + Send + Unpin>,
mut writer: Box<dyn AsyncWrite + Send + Unpin>,
) -> Result<()> {
info!(self.logger, "run io copy for {}", io_name);
let io_name = io_name.to_string();
let logger = self.logger.new(o!("io_name" => io_name));
let io_name = format!("{:?}", io_type);
info!(self.logger, "run_io_copy[{}] starts", io_name);
let logger = self.logger.new(o!("io_name" => io_name.clone()));
tokio::spawn(async move {
match tokio::io::copy(&mut reader, &mut writer).await {
Err(e) => {
warn!(logger, "run_io_copy: failed to copy stream: {}", e);
warn!(
logger,
"run_io_copy[{}]: failed to copy stream: {}", io_name, e
);
}
Ok(length) => {
info!(logger, "run_io_copy: stop to copy stream length {}", length)
info!(
logger,
"run_io_copy[{}]: stop to copy stream length {}", io_name, length
);
// Send EOF to agent by calling rpc write_stdin with 0 length data
if io_type == StdIoType::Stdin {
writer
.shutdown()
.await
.map_err(|e| {
error!(
logger,
"run_io_copy[{}]: failed to shutdown: {:?}", io_name, e
);
e
})
.ok();
}
}
};
wgw.done();
if let Some(w) = wgw {
w.done()
}
});
Ok(())
@@ -400,24 +430,13 @@ impl Process {
}
/// Close the stdin of the process in container.
pub async fn close_io(&mut self, agent: Arc<dyn Agent>) {
pub async fn close_io(&mut self, _agent: Arc<dyn Agent>) {
// Close the stdin writer keeper so that
// the end signal could be received in the read side
self.stdin_w.take();
// In passfd io mode, the stdin close and sync logic is handled
// in the agent side.
if self.passfd_io.is_none() {
self.wg_stdin.wait().await;
}
let req = agent::CloseStdinRequest {
process_id: self.process.clone().into(),
};
if let Err(e) = agent.close_stdin(req).await {
warn!(self.logger, "failed close process io: {:?}", e);
}
// The agent closes stdin when it receives EOF (zero-length data) through the
// `write_stdin` rpc, so we no longer call agent.close_stdin here.
}
pub async fn get_status(&self) -> ProcessStatus {

View File

@@ -0,0 +1,63 @@
#!/bin/bash
#
# Copyright (c) 2023 Intel Corporation
#
# SPDX-License-Identifier: Apache-2.0
#
# Abort on command failure, on use of unset variables and on failures inside
# pipelines.
set -o errexit
set -o nounset
set -o pipefail
# Directory holding the kata tarball: second command line argument, defaulting
# to "kata-artifacts".
kata_tarball_dir="${2:-kata-artifacts}"
# Absolute directory of this script.
# NOTE(review): the `runk_dir` name looks carried over from the runk tests — confirm.
runk_dir="$(dirname "$(readlink -f "$0")")"
# Shared helper libraries located two directories above this script.
source "${runk_dir}/../../common.bash"
source "${runk_dir}/../../gha-run-k8s-common.sh"
# Install everything the stdio tests need: system packages, yq, the
# containerd release selected by ${CONTAINERD_VERSION}, and bats.
function install_dependencies() {
    info "Installing the dependencies needed for running the stdio tests"

    # Dependency list of projects that we can rely on the system packages
    # - jq
    declare -a system_deps=(
        jq
    )

    sudo apt-get update
    sudo apt-get -y install "${system_deps[@]}"

    ensure_yq

    # Dependency list of projects that we can install them
    # directly from their releases on GitHub:
    # - containerd
    #   - cri-container-cni release tarball already includes CNI plugins
    # Each entry has the form "<installer suffix>:<version>".
    declare -a github_deps
    # Plain assignment (not combined with `declare`) so that a failing
    # get_from_kata_deps still trips errexit.
    github_deps[0]="cri_containerd:$(get_from_kata_deps "externals.containerd.${CONTAINERD_VERSION}")"

    # Keep the loop variables local instead of leaking them as globals.
    local github_dep
    local -a dep
    for github_dep in "${github_deps[@]}"; do
        IFS=":" read -r -a dep <<< "${github_dep}"
        # Calls install_<suffix> <version>, e.g. install_cri_containerd.
        "install_${dep[0]}" "${dep[1]}"
    done

    # Requires bats to run the tests
    install_bats
}
# Run the stdio test suite with bats.
function run() {
info "Running stdio tests"
# The suite is looked up next to this script as `stdio-tests.bats`.
bats "${runk_dir}/stdio-tests.bats"
}
# Entry point: dispatch on the first command line argument.
#   install-dependencies  install the tools needed by the tests
#   install-kata          install kata
#   run                   run the stdio tests
# Any other value (including no argument at all) aborts with "Invalid argument".
function main() {
    # Local, so the dispatcher does not leave a global `action` behind.
    local action="${1:-}"
    case "${action}" in
        install-dependencies) install_dependencies ;;
        install-kata) install_kata ;;
        run) run ;;
        *) >&2 die "Invalid argument" ;;
    esac
}
main "$@"

View File

@@ -0,0 +1,145 @@
#!/bin/bash
#
# Copyright (c) 2024 Kata Contributors
#
# SPDX-License-Identifier: Apache-2.0
#
# This test will validate stdio with containerd
# Helper libraries; both paths are relative to the current working directory.
source "../../common.bash"
source "../../metrics/lib/common.bash"
# Image, container id, log file and containerd runtime used by every case below.
export TEST_IMAGE="docker.io/library/busybox:latest"
export CONTAINER_ID="hello"
export LOG_FILE="/tmp/stdio-tests-log-file"
export TEST_RUNTIME="io.containerd.run.kata.v2"
# Size in bytes of the random file piped through stdin in the large file checks.
export LARGE_FILE_SIZE=1000000000
echo "pull container image"
check_images ${TEST_IMAGE}
# Stop the test task if it is still listed, then remove the test container
# if it still exists.
teardown() {
    echo "delete the container"
    # -x/-F: compare whole lines literally, so a task or container whose id
    # merely contains ${CONTAINER_ID} is not mistaken for the test container.
    if sudo ctr t list -q | grep -qxF "${CONTAINER_ID}"; then
        stop_container
    fi

    if sudo ctr c list -q | grep -qxF "${CONTAINER_ID}"; then
        sudo ctr c rm "${CONTAINER_ID}"
    fi
}
# Kill all processes of the test task with SIGKILL and wait for the task to
# be reported as STOPPED. The function's return status is that of the final
# process-count check.
stop_container() {
local cmd
sudo ctr t kill --signal SIGKILL --all ${CONTAINER_ID}
# poll for a while until the task receives signal and exit
# `cmd` is single-quoted so nothing is expanded here; the string is handed to
# waitForProcess as-is (presumably evaluated on each poll — confirm).
cmd='[ "STOPPED" == "$(sudo ctr t ls | grep ${CONTAINER_ID} | awk "{print \$3}")" ]'
waitForProcess 10 1 "${cmd}"
echo "check the container is stopped"
# there is only title line of ps command
[ "1" == "$(sudo ctr t ps ${CONTAINER_ID} | wc -l)" ]
}
# Compare two strings and abort the whole script when they differ.
#   $1 - actual value
#   $2 - expected value
# Prints a diagnostic and exits with status 1 on mismatch; returns 0 otherwise.
assert_eq() {
    local actual="$1"
    local expected="$2"

    if [ "$expected" != "$actual" ]; then
        echo "Assertion failed: Expected '$expected', but got '$actual'"
        # Exit statuses must be in the range 0-255; `exit -1` is not portable.
        exit 1
    fi
}
# Case 1: run a container with a terminal, first with a one-shot command and
# then interactively through expect.
echo "1. Start a container (using terminal)"
unbuffer sudo ctr run --runtime $TEST_RUNTIME --rm -t ${TEST_IMAGE} ${CONTAINER_ID} whoami> $LOG_FILE
output=$(cat ${LOG_FILE}| tr -d '[:space:]')
# Quoted so that an empty or multi-word output cannot shift the arguments.
assert_eq "$output" "root"

# The expect script exits 1 on timeout; this script does not enable errexit,
# so the status has to be checked explicitly for the failure to count.
/usr/bin/expect <<-EOF || { teardown; exit 1; }
set timeout 5
spawn sudo ctr run --runtime $TEST_RUNTIME --rm -t ${TEST_IMAGE} ${CONTAINER_ID} sh
expect "#"
send "id\r"
expect {
"uid=0(root) gid=0(root) groups=0(root),10(wheel)" { send_user "Ok\n" }
timeout { send_user "Failed\n"; exit 1 }
}
send "exit\r"
EOF
teardown

# Case 2: same as case 1 but without a terminal.
echo "2. Start a container (not using terminal)"
output=$(sudo ctr run --runtime $TEST_RUNTIME --rm ${TEST_IMAGE} ${CONTAINER_ID} whoami)
assert_eq "$output" root

/usr/bin/expect <<-EOF || { teardown; exit 1; }
set timeout 5
spawn sudo ctr run --runtime $TEST_RUNTIME --rm ${TEST_IMAGE} ${CONTAINER_ID} sh
send "whoami\r"
expect {
"root" { send_user "Ok\n" }
timeout { send_user "Failed\n"; exit 1 }
}
send "exit\r"
EOF
teardown
# Case 3: a detached container with a terminal must be listed with the
# expected id, image and runtime.
echo "3. Start a detached container (using terminal)"
sudo ctr run --runtime $TEST_RUNTIME -d -t ${TEST_IMAGE} ${CONTAINER_ID}
read -r CID IMAGE RUNTIME <<< $(sudo ctr c ls | grep ${CONTAINER_ID})
# Arguments are quoted so that an empty field cannot shift actual/expected.
assert_eq "$CID" "$CONTAINER_ID"
assert_eq "$IMAGE" "$TEST_IMAGE"
# Compare against the runtime the container was started with.
assert_eq "$RUNTIME" "$TEST_RUNTIME"
teardown

# Case 4: exec with a terminal in a running container.
echo "4. Execute command (using terminal) in an existing container"
sudo ctr run --runtime $TEST_RUNTIME -d ${TEST_IMAGE} ${CONTAINER_ID}
unbuffer sudo ctr t exec -t --exec-id foobar ${CONTAINER_ID} whoami>$LOG_FILE
output=$(cat ${LOG_FILE}|head -n 1|tr -d '[:space:]')
echo "$output"
assert_eq "$output" "root"
teardown

# Case 5: exec without a terminal in a running container.
echo "5. Execute command (not using terminal) in an existing container"
sudo ctr run --runtime $TEST_RUNTIME -d ${TEST_IMAGE} ${CONTAINER_ID}
output=$(sudo ctr t exec --exec-id foobar ${CONTAINER_ID} whoami)
assert_eq "$output" "root"
teardown
# Case 6: exec without a terminal while feeding the process through a pipe.
# The exec'd command only terminates once it sees EOF on stdin.
echo "6. Execute command (not using terminal, pipe stdin) in an existing container"
sudo ctr run --runtime $TEST_RUNTIME -d ${TEST_IMAGE} ${CONTAINER_ID}
# Word count
read -r F1 F2 F3 <<< $(printf "aaa\nbbb\nccc\n" | sudo ctr t exec --exec-id foobar ${CONTAINER_ID} wc)
assert_eq "$F1" 3
assert_eq "$F2" 3
assert_eq "$F3" 12

# Large file count
# /dev/urandom never blocks, unlike /dev/random on older kernels.
head -c $LARGE_FILE_SIZE /dev/urandom > /tmp/input
output=$(cat /tmp/input | wc -c|tr -d '[:space:]')
assert_eq "$output" "$LARGE_FILE_SIZE"

# Whitespace is stripped from the counts so the quoted comparison is exact.
output=$(cat /tmp/input | sudo ctr t exec --exec-id foobar ${CONTAINER_ID} wc -c|tr -d '[:space:]')
assert_eq "$output" "$LARGE_FILE_SIZE"

output=$(cat /tmp/input | sudo ctr t exec --exec-id foobar ${CONTAINER_ID} cat | wc -c|tr -d '[:space:]')
assert_eq "$output" "$LARGE_FILE_SIZE"

# Large file copy
cat /tmp/input | sudo ctr t exec --exec-id foobar ${CONTAINER_ID} cat > /tmp/output
# This script does not enable errexit, so the diff status must be checked
# explicitly for a corrupted copy to fail the test.
if ! diff -q /tmp/input /tmp/output; then
    echo "Large file copy failed: /tmp/input and /tmp/output differ"
    rm -f /tmp/input /tmp/output
    teardown
    exit 1
fi

# Do not leave the large temporary files behind.
rm -f /tmp/input /tmp/output
teardown