diff --git a/.github/workflows/basic-ci-amd64.yaml b/.github/workflows/basic-ci-amd64.yaml index e26467c9a8..935756fb1e 100644 --- a/.github/workflows/basic-ci-amd64.yaml +++ b/.github/workflows/basic-ci-amd64.yaml @@ -168,6 +168,37 @@ jobs: - name: Run runk tests timeout-minutes: 10 run: bash tests/integration/runk/gha-run.sh run + run-stdio: + runs-on: garm-ubuntu-2204-smaller + env: + CONTAINERD_VERSION: lts + steps: + - uses: actions/checkout@v4 + with: + ref: ${{ inputs.commit-hash }} + fetch-depth: 0 + + - name: Rebase atop of the latest target branch + run: | + ./tests/git-helper.sh "rebase-atop-of-the-latest-target-branch" + env: + TARGET_BRANCH: ${{ inputs.target-branch }} + + - name: Install dependencies + run: bash tests/integration/stdio/gha-run.sh install-dependencies + + - name: get-kata-tarball + uses: actions/download-artifact@v4 + with: + name: kata-static-tarball-amd64${{ inputs.tarball-suffix }} + path: kata-artifacts + + - name: Install kata + run: bash tests/integration/stdio/gha-run.sh install-kata kata-artifacts + + - name: Run stdio tests + timeout-minutes: 10 + run: bash tests/integration/stdio/gha-run.sh run-tracing: strategy: diff --git a/src/agent/rustjail/src/process.rs b/src/agent/rustjail/src/process.rs index 853123cbcf..4b24a7fee6 100644 --- a/src/agent/rustjail/src/process.rs +++ b/src/agent/rustjail/src/process.rs @@ -200,15 +200,8 @@ impl Process { } pub async fn close_stdin(&mut self) { - // stdin will be closed automatically in passfd-io senario - if self.proc_io.is_some() { - return; - } - close_process_stream!(self, term_master, TermMaster); close_process_stream!(self, parent_stdin, ParentStdin); - - self.notify_term_close(); } pub fn cleanup_process_stream(&mut self) { diff --git a/src/agent/src/rpc.rs b/src/agent/src/rpc.rs index 5c60f943a5..f7244dc89f 100644 --- a/src/agent/src/rpc.rs +++ b/src/agent/src/rpc.rs @@ -598,25 +598,32 @@ impl AgentService { let cid = req.container_id; let eid = req.exec_id; - let writer = { - 
let mut sandbox = self.sandbox.lock().await; - let p = sandbox.find_container_process(cid.as_str(), eid.as_str())?; - - // use ptmx io - if p.term_master.is_some() { - p.get_writer(StreamType::TermMaster) - } else { - // use piped io - p.get_writer(StreamType::ParentStdin) - } - }; - - let writer = writer.ok_or_else(|| anyhow!(ERR_CANNOT_GET_WRITER))?; - writer.lock().await.write_all(req.data.as_slice()).await?; - let mut resp = WriteStreamResponse::new(); resp.set_len(req.data.len() as u32); + // EOF of stdin + if req.data.is_empty() { + let mut sandbox = self.sandbox.lock().await; + let p = sandbox.find_container_process(cid.as_str(), eid.as_str())?; + p.close_stdin().await; + } else { + let writer = { + let mut sandbox = self.sandbox.lock().await; + let p = sandbox.find_container_process(cid.as_str(), eid.as_str())?; + + // use ptmx io + if p.term_master.is_some() { + p.get_writer(StreamType::TermMaster) + } else { + // use piped io + p.get_writer(StreamType::ParentStdin) + } + }; + + let writer = writer.ok_or_else(|| anyhow!(ERR_CANNOT_GET_WRITER))?; + writer.lock().await.write_all(req.data.as_slice()).await?; + } + Ok(resp) } @@ -659,6 +666,7 @@ impl AgentService { biased; v = read_stream(&reader, req.len as usize) => { let vector = v?; + let mut resp = ReadStreamResponse::new(); resp.set_data(vector); @@ -859,6 +867,9 @@ impl agent_ttrpc::AgentService for AgentService { ctx: &TtrpcContext, req: protocols::agent::CloseStdinRequest, ) -> ttrpc::Result { + // With runtime-rs, the stdin is closed when rpc `write_stdin` receives EOF (empty data), + // so this rpc is no longer called by runtime-rs.
+ trace_rpc_call!(ctx, "close_stdin", req); is_allowed(&req).await?; diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/container_io.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/container_io.rs index c211e8bca4..2c243dece7 100644 --- a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/container_io.rs +++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/container_io.rs @@ -44,6 +44,8 @@ struct ContainerIoWrite<'inner> { pub info: Arc, write_future: Option> + Send + 'inner>>>, + shutdown_future: + Option> + Send + 'inner>>>, } impl<'inner> ContainerIoWrite<'inner> { @@ -51,6 +53,7 @@ impl<'inner> ContainerIoWrite<'inner> { Self { info, write_future: Default::default(), + shutdown_future: Default::default(), } } @@ -80,6 +83,30 @@ impl<'inner> ContainerIoWrite<'inner> { } } } + + // Call rpc agent.write_stdin() with empty data to tell agent to close stdin of the process + fn poll_shutdown_inner(&'inner mut self, cx: &mut Context<'_>) -> Poll> { + let mut shutdown_future = self.shutdown_future.take(); + if shutdown_future.is_none() { + let req = agent::WriteStreamRequest { + process_id: self.info.process.clone().into(), + data: Vec::with_capacity(0), + }; + shutdown_future = Some(Box::pin(self.info.agent.write_stdin(req))); + } + + let mut shutdown_future = shutdown_future.unwrap(); + match shutdown_future.as_mut().poll(cx) { + Poll::Ready(v) => match v { + Ok(_) => Poll::Ready(Ok(())), + Err(err) => Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::Other, err))), + }, + Poll::Pending => { + self.shutdown_future = Some(shutdown_future); + Poll::Pending + } + } + } } impl<'inner> AsyncWrite for ContainerIoWrite<'inner> { @@ -100,8 +127,13 @@ impl<'inner> AsyncWrite for ContainerIoWrite<'inner> { Poll::Ready(Ok(())) } - fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) + fn poll_shutdown(mut self: Pin<&mut Self>, 
cx: &mut Context<'_>) -> Poll> { + let me = unsafe { + std::mem::transmute::<&mut ContainerIoWrite<'_>, &mut ContainerIoWrite<'inner>>( + &mut *self, + ) + }; + me.poll_shutdown_inner(cx) } } diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/process.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/process.rs index 34e9a9a60b..17b2fde464 100644 --- a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/process.rs +++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/process.rs @@ -11,7 +11,7 @@ use agent::Agent; use anyhow::{Context, Result}; use awaitgroup::{WaitGroup, Worker as WaitGroupWorker}; use common::types::{ContainerProcess, ProcessExitStatus, ProcessStateInfo, ProcessStatus, PID}; -use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; use tokio::sync::{watch, RwLock}; use super::container::Container; @@ -23,6 +23,13 @@ pub type ProcessWatcher = ( Arc>, ); +#[derive(Debug, PartialEq)] +enum StdIoType { + Stdin, + Stdout, + Stderr, +} + #[derive(Debug)] pub struct Process { pub process: ContainerProcess, @@ -62,10 +69,6 @@ pub struct Process { pub exit_status: Arc>, pub exit_watcher_rx: Option>, pub exit_watcher_tx: Option>, - // used to sync between stdin io copy thread(tokio) and the close it call. - // close io call should wait until the stdin io copy finished to - // prevent stdin data lost. 
- pub wg_stdin: WaitGroup, // io streams using vsock fd passthrough feature pub passfd_io: Option, @@ -119,7 +122,6 @@ impl Process { exit_status: Arc::new(RwLock::new(ProcessExitStatus::new())), exit_watcher_rx: Some(receiver), exit_watcher_tx: Some(sender), - wg_stdin: WaitGroup::new(), passfd_io: None, } } @@ -246,9 +248,8 @@ impl Process { self.post_fifos_open()?; // start io copy for stdin - let wgw_stdin = self.wg_stdin.worker(); if let Some(stdin) = shim_io.stdin { - self.run_io_copy("stdin", wgw_stdin, stdin, container_io.stdin) + self.run_io_copy(StdIoType::Stdin, None, stdin, container_io.stdin) .await?; } @@ -258,14 +259,19 @@ impl Process { // start io copy for stdout if let Some(stdout) = shim_io.stdout { - self.run_io_copy("stdout", wgw.clone(), container_io.stdout, stdout) - .await?; + self.run_io_copy( + StdIoType::Stdout, + Some(wgw.clone()), + container_io.stdout, + stdout, + ) + .await?; } // start io copy for stderr if !self.terminal { if let Some(stderr) = shim_io.stderr { - self.run_io_copy("stderr", wgw, container_io.stderr, stderr) + self.run_io_copy(StdIoType::Stderr, Some(wgw), container_io.stderr, stderr) .await?; } } @@ -276,27 +282,51 @@ impl Process { Ok(()) } - async fn run_io_copy<'a>( - &'a self, - io_name: &'a str, - wgw: WaitGroupWorker, + async fn run_io_copy( + &self, + io_type: StdIoType, + wgw: Option, mut reader: Box, mut writer: Box, ) -> Result<()> { - info!(self.logger, "run io copy for {}", io_name); - let io_name = io_name.to_string(); - let logger = self.logger.new(o!("io_name" => io_name)); + let io_name = format!("{:?}", io_type); + + info!(self.logger, "run_io_copy[{}] starts", io_name); + let logger = self.logger.new(o!("io_name" => io_name.clone())); + tokio::spawn(async move { match tokio::io::copy(&mut reader, &mut writer).await { Err(e) => { - warn!(logger, "run_io_copy: failed to copy stream: {}", e); + warn!( + logger, + "run_io_copy[{}]: failed to copy stream: {}", io_name, e + ); } Ok(length) => { - 
info!(logger, "run_io_copy: stop to copy stream length {}", length) + info!( + logger, + "run_io_copy[{}]: stop to copy stream length {}", io_name, length + ); + // Send EOF to agent by calling rpc write_stdin with 0 length data + if io_type == StdIoType::Stdin { + writer + .shutdown() + .await + .map_err(|e| { + error!( + logger, + "run_io_copy[{}]: failed to shutdown: {:?}", io_name, e + ); + e + }) + .ok(); + } } }; - wgw.done(); + if let Some(w) = wgw { + w.done() + } }); Ok(()) @@ -400,24 +430,13 @@ impl Process { } /// Close the stdin of the process in container. - pub async fn close_io(&mut self, agent: Arc) { + pub async fn close_io(&mut self, _agent: Arc) { // Close the stdin writer keeper so that // the end signal could be received in the read side self.stdin_w.take(); - // In passfd io mode, the stdin close and sync logic is handled - // in the agent side. - if self.passfd_io.is_none() { - self.wg_stdin.wait().await; - } - - let req = agent::CloseStdinRequest { - process_id: self.process.clone().into(), - }; - - if let Err(e) = agent.close_stdin(req).await { - warn!(self.logger, "failed close process io: {:?}", e); - } + // The stdin will be closed when the agent receives EOF (empty data) in rpc `write_stdin`, + // so we will not call agent.close_stdin anymore.
} pub async fn get_status(&self) -> ProcessStatus { diff --git a/tests/integration/stdio/gha-run.sh b/tests/integration/stdio/gha-run.sh new file mode 100755 index 0000000000..5db12bb24c --- /dev/null +++ b/tests/integration/stdio/gha-run.sh @@ -0,0 +1,63 @@ +#!/bin/bash +# +# Copyright (c) 2023 Intel Corporation +# +# SPDX-License-Identifier: Apache-2.0 +# + +set -o errexit +set -o nounset +set -o pipefail + +kata_tarball_dir="${2:-kata-artifacts}" +runk_dir="$(dirname "$(readlink -f "$0")")" +source "${runk_dir}/../../common.bash" +source "${runk_dir}/../../gha-run-k8s-common.sh" + +function install_dependencies() { + info "Installing the dependencies needed for running the stdio tests" + + # Dependency list of projects that we can rely on the system packages + # - jq + declare -a system_deps=( + jq + ) + + sudo apt-get update + sudo apt-get -y install "${system_deps[@]}" + + ensure_yq + + # Dependency list of projects that we can install them + # directly from their releases on GitHub: + # - containerd + # - cri-container-cni release tarball already includes CNI plugins + declare -a github_deps + github_deps[0]="cri_containerd:$(get_from_kata_deps "externals.containerd.${CONTAINERD_VERSION}")" + + for github_dep in "${github_deps[@]}"; do + IFS=":" read -r -a dep <<< "${github_dep}" + install_${dep[0]} "${dep[1]}" + done + + # Requires bats to run the tests + install_bats +} + +function run() { + info "Running stdio tests" + + bats "${runk_dir}/stdio-tests.bats" +} + +function main() { + action="${1:-}" + case "${action}" in + install-dependencies) install_dependencies ;; + install-kata) install_kata ;; + run) run ;; + *) >&2 die "Invalid argument" ;; + esac +} + +main "$@" diff --git a/tests/integration/stdio/stdio-tests.sh b/tests/integration/stdio/stdio-tests.sh new file mode 100755 index 0000000000..5d5b7c6a12 --- /dev/null +++ b/tests/integration/stdio/stdio-tests.sh @@ -0,0 +1,145 @@ +#!/bin/bash +# +# Copyright (c) 2024 Kata Contributors +# +# 
SPDX-License-Identifier: Apache-2.0 +# +# This test will validate stdio with containerd + +source "../../common.bash" +source "../../metrics/lib/common.bash" + +export TEST_IMAGE="docker.io/library/busybox:latest" +export CONTAINER_ID="hello" +export LOG_FILE="/tmp/stdio-tests-log-file" +export TEST_RUNTIME="io.containerd.run.kata.v2" +export LARGE_FILE_SIZE=1000000000 + +echo "pull container image" +check_images ${TEST_IMAGE} + +teardown() { + echo "delete the container" + if sudo ctr t list -q | grep -q "${CONTAINER_ID}"; then + stop_container + fi + + if sudo ctr c list -q | grep -q "${CONTAINER_ID}"; then + sudo ctr c rm "${CONTAINER_ID}" + fi +} + +stop_container() { + local cmd + sudo ctr t kill --signal SIGKILL --all ${CONTAINER_ID} + # poll for a while until the task receives signal and exit + cmd='[ "STOPPED" == "$(sudo ctr t ls | grep ${CONTAINER_ID} | awk "{print \$3}")" ]' + waitForProcess 10 1 "${cmd}" + + echo "check the container is stopped" + # there is only title line of ps command + [ "1" == "$(sudo ctr t ps ${CONTAINER_ID} | wc -l)" ] +} + +assert_eq() { + local actual="$1" + local expected="$2" + + if [ "$expected" != "$actual" ]; then + echo "Assertion failed: Expected '$expected', but got '$actual'" + exit -1 + fi +} + +echo "1. Start a container (using terminal)" +unbuffer sudo ctr run --runtime $TEST_RUNTIME --rm -t ${TEST_IMAGE} ${CONTAINER_ID} whoami> $LOG_FILE +output=$(cat ${LOG_FILE}| tr -d '[:space:]') +assert_eq $output "root" + +/usr/bin/expect <<-EOF +set timeout 5 +spawn sudo ctr run --runtime $TEST_RUNTIME --rm -t ${TEST_IMAGE} ${CONTAINER_ID} sh + +expect "#" +send "id\r" + +expect { + "uid=0(root) gid=0(root) groups=0(root),10(wheel)" { send_user "Ok\n" } + timeout { send_user "Failed\n"; exit 1 } +} + +send "exit\r" +EOF +teardown + +echo "2. 
Start a container (not using terminal)" +output=$(sudo ctr run --runtime $TEST_RUNTIME --rm ${TEST_IMAGE} ${CONTAINER_ID} whoami) +assert_eq $output root + +/usr/bin/expect <<-EOF +set timeout 5 +spawn sudo ctr run --runtime $TEST_RUNTIME --rm ${TEST_IMAGE} ${CONTAINER_ID} sh + +send "whoami\r" + +expect { + "root" { send_user "Ok\n" } + timeout { send_user "Failed\n"; exit 1 } +} + +send "exit\r" + +EOF + +teardown + +echo "3. Start a detached container (using terminal)" +sudo ctr run --runtime $TEST_RUNTIME -d -t ${TEST_IMAGE} ${CONTAINER_ID} +read CID IMAGE RUNTIME <<< $(sudo ctr c ls | grep ${CONTAINER_ID}) + +assert_eq $CID $CONTAINER_ID +assert_eq $IMAGE $TEST_IMAGE +assert_eq $RUNTIME "io.containerd.run.kata.v2" + +teardown + +echo "4. Execute command (using terminal) in an existing container" +sudo ctr run --runtime $TEST_RUNTIME -d ${TEST_IMAGE} ${CONTAINER_ID} + +unbuffer sudo ctr t exec -t --exec-id foobar ${CONTAINER_ID} whoami>$LOG_FILE +output=$(cat ${LOG_FILE}|head -n 1|tr -d '[:space:]') +echo $output +assert_eq $output "root" + +teardown + +echo "5. Execute command (not using terminal) in an existing container" +sudo ctr run --runtime $TEST_RUNTIME -d ${TEST_IMAGE} ${CONTAINER_ID} +output=$(sudo ctr t exec --exec-id foobar ${CONTAINER_ID} whoami) +assert_eq $output "root" + +teardown + +echo "6. 
Execute command (not using terminal, pipe stdin) in an existing container" +sudo ctr run --runtime $TEST_RUNTIME -d ${TEST_IMAGE} ${CONTAINER_ID} +# Word count +read F1 F2 F3 <<< $(printf "aaa\nbbb\nccc\n" | sudo ctr t exec --exec-id foobar ${CONTAINER_ID} wc) +assert_eq $F1 3 +assert_eq $F2 3 +assert_eq $F3 12 + +# Large file count +head -c $LARGE_FILE_SIZE /dev/random > /tmp/input +output=$(cat /tmp/input | wc -c|tr -d '[:space:]') +assert_eq $output $LARGE_FILE_SIZE + +output=$(cat /tmp/input | sudo ctr t exec --exec-id foobar ${CONTAINER_ID} wc -c) +assert_eq $output $LARGE_FILE_SIZE + +output=$(cat /tmp/input | sudo ctr t exec --exec-id foobar ${CONTAINER_ID} cat | wc -c) +assert_eq $output $LARGE_FILE_SIZE +# Large file copy +cat /tmp/input | sudo ctr t exec --exec-id foobar ${CONTAINER_ID} cat > /tmp/output +diff -q /tmp/input /tmp/output + +teardown