diff --git a/src/agent/Cargo.lock b/src/agent/Cargo.lock index 8b60b429c..ffe7de403 100644 --- a/src/agent/Cargo.lock +++ b/src/agent/Cargo.lock @@ -220,6 +220,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "awaitgroup" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc17ab023b4091c10ff099f9deebaeeb59b5189df07e554c4fef042b70745d68" + [[package]] name = "base64" version = "0.13.0" @@ -2490,6 +2496,7 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", + "awaitgroup", "bit-vec", "capctl", "caps", @@ -2519,6 +2526,7 @@ dependencies = [ "tempfile", "test-utils", "tokio", + "tokio-vsock 0.3.1", "xattr", "zbus", ] diff --git a/src/agent/rustjail/Cargo.toml b/src/agent/rustjail/Cargo.toml index 9c3cabb72..d63ae0ff2 100644 --- a/src/agent/rustjail/Cargo.toml +++ b/src/agent/rustjail/Cargo.toml @@ -6,6 +6,7 @@ edition = "2018" license = "Apache-2.0" [dependencies] +awaitgroup = "0.6.0" serde = "1.0.91" serde_json = "1.0.39" serde_derive = "1.0.91" @@ -29,7 +30,8 @@ cgroups = { package = "cgroups-rs", version = "0.3.3" } rlimit = "0.5.3" cfg-if = "0.1.0" -tokio = { version = "1.28.1", features = ["sync", "io-util", "process", "time", "macros", "rt"] } +tokio = { version = "1.28.1", features = ["sync", "io-util", "process", "time", "macros", "rt", "fs"] } +tokio-vsock = "0.3.1" futures = "0.3.17" async-trait = "0.1.31" inotify = "0.9.2" diff --git a/src/agent/rustjail/src/container.rs b/src/agent/rustjail/src/container.rs index 897c5a3c8..6a5434e66 100644 --- a/src/agent/rustjail/src/container.rs +++ b/src/agent/rustjail/src/container.rs @@ -14,6 +14,7 @@ use std::fs; use std::os::unix::io::RawFd; use std::path::{Path, PathBuf}; use std::time::SystemTime; +use tokio::fs::File; use cgroups::freezer::FreezerState; @@ -64,7 +65,7 @@ use crate::sync::{read_sync, write_count, 
write_sync, SYNC_DATA, SYNC_FAILED, SY use crate::sync_with_async::{read_async, write_async}; use async_trait::async_trait; use rlimit::{setrlimit, Resource, Rlim}; -use tokio::io::AsyncBufReadExt; +use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt}; use tokio::sync::Mutex; use kata_sys_util::hooks::HookStates; @@ -989,13 +990,148 @@ impl BaseContainer for LinuxContainer { child_stdin = unsafe { std::process::Stdio::from_raw_fd(pseudo.slave) }; child_stdout = unsafe { std::process::Stdio::from_raw_fd(pseudo.slave) }; child_stderr = unsafe { std::process::Stdio::from_raw_fd(pseudo.slave) }; + + if let Some(proc_io) = &mut p.proc_io { + // A reference count used to clean up the term master fd. + let term_closer = Arc::from(unsafe { File::from_raw_fd(pseudo.master) }); + + // Copy from stdin to term_master + if let Some(mut stdin_stream) = proc_io.stdin.take() { + let mut term_master = unsafe { File::from_raw_fd(pseudo.master) }; + let mut close_stdin_rx = proc_io.close_stdin_rx.clone(); + let wgw_input = proc_io.wg_input.worker(); + let logger = logger.clone(); + let term_closer = term_closer.clone(); + tokio::spawn(async move { + let mut buf = [0u8; 8192]; + loop { + tokio::select! { + // Make sure stdin_stream is drained before exiting + biased; + res = stdin_stream.read(&mut buf) => { + match res { + Err(_) | Ok(0) => { + debug!(logger, "copy from stdin to term_master end: {:?}", res); + break; + } + Ok(n) => { + if term_master.write_all(&buf[..n]).await.is_err() { + break; + } + } + } + } + // As the stdin fifo is opened in RW mode in the shim, which will never + // read EOF, we close the stdin fifo here when explicit requested. 
+ _ = close_stdin_rx.changed() => { + debug!(logger, "copy ends as requested"); + break + } + } + } + wgw_input.done(); + std::mem::forget(term_master); // Avoid auto closing of term_master + drop(term_closer); + }); + } + + // Copy from term_master to stdout + if let Some(mut stdout_stream) = proc_io.stdout.take() { + let wgw_output = proc_io.wg_output.worker(); + let mut term_master = unsafe { File::from_raw_fd(pseudo.master) }; + let logger = logger.clone(); + let term_closer = term_closer; + tokio::spawn(async move { + let res = tokio::io::copy(&mut term_master, &mut stdout_stream).await; + debug!(logger, "copy from term_master to stdout end: {:?}", res); + wgw_output.done(); + std::mem::forget(term_master); // Avoid auto closing of term_master + drop(term_closer); + }); + } + } } else { + // not using a terminal let stdin = p.stdin.unwrap(); let stdout = p.stdout.unwrap(); let stderr = p.stderr.unwrap(); child_stdin = unsafe { std::process::Stdio::from_raw_fd(stdin) }; child_stdout = unsafe { std::process::Stdio::from_raw_fd(stdout) }; child_stderr = unsafe { std::process::Stdio::from_raw_fd(stderr) }; + + if let Some(proc_io) = &mut p.proc_io { + // Here we copy from vsock stdin stream to parent_stdin manually. + // This is because we need to close the stdin fifo when the stdin stream + // is drained. + if let Some(mut stdin_stream) = proc_io.stdin.take() { + debug!(logger, "copy from stdin to parent_stdin"); + let mut parent_stdin = unsafe { File::from_raw_fd(p.parent_stdin.unwrap()) }; + let mut close_stdin_rx = proc_io.close_stdin_rx.clone(); + let wgw_input = proc_io.wg_input.worker(); + let logger = logger.clone(); + tokio::spawn(async move { + let mut buf = [0u8; 8192]; + loop { + tokio::select! 
{ + // Make sure stdin_stream is drained before exiting + biased; + res = stdin_stream.read(&mut buf) => { + match res { + Err(_) | Ok(0) => { + debug!(logger, "copy from stdin to term_master end: {:?}", res); + break; + } + Ok(n) => { + if parent_stdin.write_all(&buf[..n]).await.is_err() { + break; + } + } + } + } + // As the stdin fifo is opened in RW mode in the shim, which will never + // read EOF, we close the stdin fifo here when explicit requested. + _ = close_stdin_rx.changed() => { + debug!(logger, "copy ends as requested"); + break + } + } + } + wgw_input.done(); + }); + } + + // copy from parent_stdout to stdout stream + if let Some(mut stdout_stream) = proc_io.stdout.take() { + debug!(logger, "copy from parent_stdout to stdout stream"); + let wgw_output = proc_io.wg_output.worker(); + let mut parent_stdout = unsafe { File::from_raw_fd(p.parent_stdout.unwrap()) }; + let logger = logger.clone(); + tokio::spawn(async move { + let res = tokio::io::copy(&mut parent_stdout, &mut stdout_stream).await; + debug!( + logger, + "copy from parent_stdout to stdout stream end: {:?}", res + ); + wgw_output.done(); + }); + } + + // copy from parent_stderr to stderr stream + if let Some(mut stderr_stream) = proc_io.stderr.take() { + debug!(logger, "copy from parent_stderr to stderr stream"); + let wgw_output = proc_io.wg_output.worker(); + let mut parent_stderr = unsafe { File::from_raw_fd(p.parent_stderr.unwrap()) }; + let logger = logger.clone(); + tokio::spawn(async move { + let res = tokio::io::copy(&mut parent_stderr, &mut stderr_stream).await; + debug!( + logger, + "copy from parent_stderr to stderr stream end: {:?}", res + ); + wgw_output.done(); + }); + } + } } let pidns = get_pid_namespace(&self.logger, linux)?; @@ -1904,7 +2040,7 @@ mod tests { let _ = new_linux_container_and_then(|mut c: LinuxContainer| { c.processes.insert( 1, - Process::new(&sl(), &oci::Process::default(), "123", true, 1).unwrap(), + Process::new(&sl(), &oci::Process::default(), "123", true, 
1, None).unwrap(), ); let p = c.get_process("123"); assert!(p.is_ok(), "Expecting Ok, Got {:?}", p); @@ -1931,7 +2067,7 @@ mod tests { let (c, _dir) = new_linux_container(); let ret = c .unwrap() - .start(Process::new(&sl(), &oci::Process::default(), "123", true, 1).unwrap()) + .start(Process::new(&sl(), &oci::Process::default(), "123", true, 1, None).unwrap()) .await; assert!(ret.is_err(), "Expecting Err, Got {:?}", ret); } @@ -1941,7 +2077,7 @@ mod tests { let (c, _dir) = new_linux_container(); let ret = c .unwrap() - .run(Process::new(&sl(), &oci::Process::default(), "123", true, 1).unwrap()) + .run(Process::new(&sl(), &oci::Process::default(), "123", true, 1, None).unwrap()) .await; assert!(ret.is_err(), "Expecting Err, Got {:?}", ret); } diff --git a/src/agent/rustjail/src/process.rs b/src/agent/rustjail/src/process.rs index cdecae130..efc99afd3 100644 --- a/src/agent/rustjail/src/process.rs +++ b/src/agent/rustjail/src/process.rs @@ -7,6 +7,7 @@ use libc::pid_t; use std::fs::File; use std::os::unix::io::{AsRawFd, RawFd}; use tokio::sync::mpsc::Sender; +use tokio_vsock::VsockStream; use nix::errno::Errno; use nix::fcntl::{fcntl, FcntlArg, OFlag}; @@ -18,6 +19,7 @@ use oci::Process as OCIProcess; use slog::Logger; use crate::pipestream::PipeStream; +use awaitgroup::WaitGroup; use std::collections::HashMap; use std::sync::Arc; use tokio::io::{split, ReadHalf, WriteHalf}; @@ -47,6 +49,41 @@ pub enum StreamType { type Reader = Arc>>; type Writer = Arc>>; +#[derive(Debug)] +pub struct ProcessIo { + pub stdin: Option, + pub stdout: Option, + pub stderr: Option, + // used to close stdin stream + pub close_stdin_tx: tokio::sync::watch::Sender, + pub close_stdin_rx: tokio::sync::watch::Receiver, + // wait for stdin copy task to finish + pub wg_input: WaitGroup, + // used to wait for all process outputs to be copied to the vsock streams + // only used when tty is used. 
+ pub wg_output: WaitGroup, +} + +impl ProcessIo { + pub fn new( + stdin: Option, + stdout: Option, + stderr: Option, + ) -> Self { + let (close_stdin_tx, close_stdin_rx) = tokio::sync::watch::channel(false); + + ProcessIo { + stdin, + stdout, + stderr, + close_stdin_tx, + close_stdin_rx, + wg_input: WaitGroup::new(), + wg_output: WaitGroup::new(), + } + } +} + #[derive(Debug)] pub struct Process { pub exec_id: String, @@ -74,6 +111,8 @@ pub struct Process { readers: HashMap, writers: HashMap, + + pub proc_io: Option, } pub trait ProcessOperations { @@ -105,6 +144,7 @@ impl Process { id: &str, init: bool, pipe_size: i32, + proc_io: Option, ) -> Result { let logger = logger.new(o!("subsystem" => "process")); let (exit_tx, exit_rx) = tokio::sync::watch::channel(false); @@ -131,6 +171,7 @@ impl Process { term_exit_notifier: Arc::new(Notify::new()), readers: HashMap::new(), writers: HashMap::new(), + proc_io, }; info!(logger, "before create console socket!"); @@ -147,6 +188,10 @@ impl Process { p.parent_stdin = Some(pstdin); p.stdin = Some(stdin); + // These pipes are necessary as the stdout/stderr of the child process + // cannot be a socket. Otherwise, some images relying on the /dev/stdout(stderr) + // and /proc/self/fd/1(2) will fail to boot as opening an existing socket + // is forbidden by the Linux kernel. 
let (pstdout, stdout) = create_extended_pipe(OFlag::O_CLOEXEC, pipe_size)?; p.parent_stdout = Some(pstdout); p.stdout = Some(stdout); @@ -164,7 +209,14 @@ impl Process { notify.notify_waiters(); } - pub fn close_stdin(&mut self) { + pub async fn close_stdin(&mut self) { + if let Some(proc_io) = &mut self.proc_io { + // notify io copy task to close stdin stream + let _ = proc_io.close_stdin_tx.send(true); + // wait for io copy task to finish + proc_io.wg_input.wait().await; + } + close_process_stream!(self, term_master, TermMaster); close_process_stream!(self, parent_stdin, ParentStdin); @@ -172,6 +224,13 @@ impl Process { } pub fn cleanup_process_stream(&mut self) { + if let Some(proc_io) = self.proc_io.take() { + drop(proc_io); + + return; + } + + // legacy io mode close_process_stream!(self, parent_stdin, ParentStdin); close_process_stream!(self, parent_stdout, ParentStdout); close_process_stream!(self, parent_stderr, ParentStderr); @@ -277,6 +336,7 @@ mod tests { id, init, 32, + None, ); let mut process = process.unwrap(); diff --git a/src/agent/src/config.rs b/src/agent/src/config.rs index 75c0a245c..abb8be024 100644 --- a/src/agent/src/config.rs +++ b/src/agent/src/config.rs @@ -18,6 +18,7 @@ const DEV_MODE_FLAG: &str = "agent.devmode"; const TRACE_MODE_OPTION: &str = "agent.trace"; const LOG_LEVEL_OPTION: &str = "agent.log"; const SERVER_ADDR_OPTION: &str = "agent.server_addr"; +const PASSFD_LISTENER_PORT: &str = "agent.passfd_listener_port"; const HOTPLUG_TIMOUT_OPTION: &str = "agent.hotplug_timeout"; const DEBUG_CONSOLE_VPORT_OPTION: &str = "agent.debug_console_vport"; const LOG_VPORT_OPTION: &str = "agent.log_vport"; @@ -61,6 +62,7 @@ pub struct AgentConfig { pub log_vport: i32, pub container_pipe_size: i32, pub server_addr: String, + pub passfd_listener_port: i32, pub unified_cgroup_hierarchy: bool, pub tracing: bool, pub supports_seccomp: bool, @@ -76,6 +78,7 @@ pub struct AgentConfigBuilder { pub log_vport: Option, pub container_pipe_size: Option, pub 
server_addr: Option, + pub passfd_listener_port: Option, pub unified_cgroup_hierarchy: Option, pub tracing: Option, } @@ -135,6 +138,7 @@ impl Default for AgentConfig { log_vport: 0, container_pipe_size: DEFAULT_CONTAINER_PIPE_SIZE, server_addr: format!("{}:{}", VSOCK_ADDR, DEFAULT_AGENT_VSOCK_PORT), + passfd_listener_port: 0, unified_cgroup_hierarchy: false, tracing: false, supports_seccomp: rpc::have_seccomp(), @@ -164,6 +168,7 @@ impl FromStr for AgentConfig { config_override!(agent_config_builder, agent_config, log_vport); config_override!(agent_config_builder, agent_config, container_pipe_size); config_override!(agent_config_builder, agent_config, server_addr); + config_override!(agent_config_builder, agent_config, passfd_listener_port); config_override!(agent_config_builder, agent_config, unified_cgroup_hierarchy); config_override!(agent_config_builder, agent_config, tracing); @@ -245,6 +250,13 @@ impl AgentConfig { get_vsock_port, |port| port > 0 ); + parse_cmdline_param!( + param, + PASSFD_LISTENER_PORT, + config.passfd_listener_port, + get_vsock_port, + |port| port > 0 + ); parse_cmdline_param!( param, diff --git a/src/agent/src/main.rs b/src/agent/src/main.rs index cb3abbe1e..1b7324e54 100644 --- a/src/agent/src/main.rs +++ b/src/agent/src/main.rs @@ -44,6 +44,7 @@ mod mount; mod namespace; mod netlink; mod network; +mod passfd_io; mod pci; pub mod random; mod sandbox; @@ -235,6 +236,12 @@ async fn real_main() -> std::result::Result<(), Box> { // XXX: Note that *ALL* spans needs to start after this point!! 
let span_guard = root_span.enter(); + // Start the fd passthrough io listener + let passfd_listener_port = config.passfd_listener_port as u32; + if passfd_listener_port != 0 { + passfd_io::start_listen(passfd_listener_port).await?; + } + // Start the sandbox and wait for its ttRPC server to end start_sandbox(&logger, config, init_mode, &mut tasks, shutdown_rx.clone()).await?; diff --git a/src/agent/src/passfd_io.rs b/src/agent/src/passfd_io.rs new file mode 100644 index 000000000..ba1d7f74b --- /dev/null +++ b/src/agent/src/passfd_io.rs @@ -0,0 +1,82 @@ +// Copyright 2024 Kata Contributors +// +// SPDX-License-Identifier: Apache-2.0 +// +use anyhow::Result; +use lazy_static::lazy_static; +use rustjail::process::ProcessIo; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::sync::Mutex; +use tokio_vsock::SockAddr::Vsock; +use tokio_vsock::{VsockListener, VsockStream}; + +lazy_static! { + static ref HVSOCK_STREAMS: Arc>> = + Arc::new(Mutex::new(HashMap::new())); +} + +// Convenience function to obtain the scope logger. +fn sl() -> slog::Logger { + slog_scope::logger().new(o!("subsystem" => "passfd_io")) +} + +pub(crate) async fn start_listen(port: u32) -> Result<()> { + info!(sl(), "start listening on port {}", port); + let mut listener = VsockListener::bind(libc::VMADDR_CID_ANY, port)?; + tokio::spawn(async move { + loop { + if let Ok((stream, Vsock(addr))) = listener.accept().await { + // We should insert the stream into the mapping as soon + // to minimize the risk of encountering race conditions. + let port = addr.port(); + HVSOCK_STREAMS.lock().await.insert(port, stream); + info!(sl(), "accept connection from peer port {}", port); + } + } + }); + Ok(()) +} + +async fn take_stream(port: u32) -> Option { + // There may be a race condition where the stream is accepted but + // not yet inserted into the mapping. We will retry several times. + // If it still fails, we just give up. 
+ let mut count = 0; + while count < 3 { + let stream = HVSOCK_STREAMS.lock().await.remove(&port); + if stream.is_some() { + return stream; + } + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + count += 1; + } + + warn!(sl(), "failed to take stream for port {}", port); + None +} + +macro_rules! take_io_stream { + ($port: ident) => { + if $port == 0 { + None + } else { + take_stream($port).await + } + }; +} + +pub(crate) async fn take_io_streams( + stdin_port: u32, + stdout_port: u32, + stderr_port: u32, +) -> ProcessIo { + let stdin = take_io_stream!(stdin_port); + let stdout = take_io_stream!(stdout_port); + let stderr = take_io_stream!(stderr_port); + debug!( + sl(), + "take passfd io streams {} {} {}", stdin_port, stdout_port, stderr_port + ); + ProcessIo::new(stdin, stdout, stderr) +} diff --git a/src/agent/src/rpc.rs b/src/agent/src/rpc.rs index 2ce66ecbc..0068eb929 100644 --- a/src/agent/src/rpc.rs +++ b/src/agent/src/rpc.rs @@ -58,6 +58,7 @@ use crate::metrics::get_metrics; use crate::mount::baremount; use crate::namespace::{NSTYPEIPC, NSTYPEPID, NSTYPEUTS}; use crate::network::setup_guest_dns; +use crate::passfd_io; use crate::pci; use crate::random; use crate::sandbox::Sandbox; @@ -260,7 +261,15 @@ impl AgentService { let pipe_size = AGENT_CONFIG.container_pipe_size; let p = if let Some(p) = oci.process { - Process::new(&sl(), &p, cid.as_str(), true, pipe_size)? + let proc_io = if AGENT_CONFIG.passfd_listener_port != 0 { + Some( + passfd_io::take_io_streams(req.stdin_port, req.stdout_port, req.stderr_port) + .await, + ) + } else { + None + }; + Process::new(&sl(), &p, cid.as_str(), true, pipe_size, proc_io)? 
} else { info!(sl(), "no process configurations!"); return Err(anyhow!(nix::Error::EINVAL)); @@ -369,7 +378,15 @@ impl AgentService { let pipe_size = AGENT_CONFIG.container_pipe_size; let ocip = rustjail::process_grpc_to_oci(&process); - let p = Process::new(&sl(), &ocip, exec_id.as_str(), false, pipe_size)?; + + // passfd_listener_port != 0 indicates passfd io mode + let proc_io = if AGENT_CONFIG.passfd_listener_port != 0 { + Some(passfd_io::take_io_streams(req.stdin_port, req.stdout_port, req.stderr_port).await) + } else { + None + }; + + let p = Process::new(&sl(), &ocip, exec_id.as_str(), false, pipe_size, proc_io)?; let ctr = sandbox .get_container(&cid) @@ -834,7 +851,7 @@ impl agent_ttrpc::AgentService for AgentService { ) })?; - p.close_stdin(); + p.close_stdin().await; Ok(Empty::new()) } @@ -2252,6 +2269,7 @@ mod tests { &exec_process_id.to_string(), false, 1, + None, ) .unwrap(); diff --git a/src/agent/src/sandbox.rs b/src/agent/src/sandbox.rs index 30f168f2c..e0a90c814 100644 --- a/src/agent/src/sandbox.rs +++ b/src/agent/src/sandbox.rs @@ -1006,12 +1006,20 @@ mod tests { // add init process linux_container.processes.insert( 1, - Process::new(&logger, &oci::Process::default(), "1", true, 1).unwrap(), + Process::new(&logger, &oci::Process::default(), "1", true, 1, None).unwrap(), ); // add exec process linux_container.processes.insert( 123, - Process::new(&logger, &oci::Process::default(), "exec-123", false, 1).unwrap(), + Process::new( + &logger, + &oci::Process::default(), + "exec-123", + false, + 1, + None, + ) + .unwrap(), ); s.add_container(linux_container); @@ -1058,6 +1066,7 @@ mod tests { "this_is_a_test_process", true, 1, + None, ) .unwrap(); // processes interally only have pids when manually set diff --git a/src/agent/src/signal.rs b/src/agent/src/signal.rs index 401ded953..62a219371 100644 --- a/src/agent/src/signal.rs +++ b/src/agent/src/signal.rs @@ -69,6 +69,11 @@ async fn handle_sigchild(logger: Logger, sandbox: Arc>) -> Result } }; + // 
In passfd io mode, we need to wait for the copy task end. + if let Some(proc_io) = &mut p.proc_io { + proc_io.wg_output.wait().await; + } + p.exit_code = ret; let _ = p.exit_tx.take(); diff --git a/src/dragonball/src/dbs_virtio_devices/src/vsock/csm/connection.rs b/src/dragonball/src/dbs_virtio_devices/src/vsock/csm/connection.rs index e2ca7e333..0e27a36a4 100644 --- a/src/dragonball/src/dbs_virtio_devices/src/vsock/csm/connection.rs +++ b/src/dragonball/src/dbs_virtio_devices/src/vsock/csm/connection.rs @@ -103,7 +103,7 @@ use super::{ConnState, Error, PendingRx, PendingRxSet, Result}; /// guest-side AF_VSOCK socket and a host-side `Read + Write + AsRawFd` stream. pub struct VsockConnection { /// The current connection state. - state: ConnState, + pub(crate) state: ConnState, /// The local CID. Most of the time this will be the constant `2` (the vsock /// host CID). pub(crate) local_cid: u64, @@ -115,6 +115,8 @@ pub struct VsockConnection { pub(crate) peer_port: u32, /// The (connected) host-side stream. pub(crate) stream: Box, + /// keep the connection when local peer closed. + keep: bool, /// The TX buffer for this connection. tx_buf: TxBuf, /// Total number of bytes that have been successfully written to @@ -297,6 +299,8 @@ impl VsockChannel for VsockConnection { // to forward some data to the host stream. Also works for a // connection that has begun shutting down, but the peer still has // some data to send. + // It also work for a hybrid connection's peer closed case, which need + // to active the connection's fd to generate the epollout event. 
ConnState::Established | ConnState::PeerClosed(_, false) if pkt.op() == uapi::VSOCK_OP_RW => { @@ -318,7 +322,19 @@ impl VsockChannel for VsockConnection { "vsock: error writing to local stream (lp={}, pp={}): {:?}", self.local_port, self.peer_port, err ); - self.kill(); + match err { + Error::TxBufFull => { + // The hybrid pipe peer closed and the tx buf had been full, + // and if want to keep the connection, thus we need drop the + // data send from guest, otherwise, close the connection. + if !self.keep() { + self.kill(); + } + } + _ => { + self.kill(); + } + }; return Ok(()); } @@ -462,15 +478,26 @@ impl VsockEpollListener for VsockConnection { .tx_buf .flush_to(&mut self.stream) .unwrap_or_else(|err| { - warn!( - "vsock: error flushing TX buf for (lp={}, pp={}): {:?}", - self.local_port, self.peer_port, err - ); + if !self.keep() { + warn!( + "vsock: error flushing TX buf for (lp={}, pp={}): {:?}", + self.local_port, self.peer_port, err + ); + } + match err { Error::TxBufFlush(inner) if inner.kind() == ErrorKind::WouldBlock => { // This should never happen (EWOULDBLOCK after // EPOLLOUT), but it does, so let's absorb it. } + Error::TxBufFlush(inner) if (inner.kind() == ErrorKind::BrokenPipe) => { + // The hybrid connection's pipe peer was clsosed, and we want to keep the + // connection thus users can reopen the peer pipe to get the connection, + // otherwise, close the connection. 
+ if !self.keep() { + self.kill(); + } + } _ => self.kill(), }; 0 @@ -499,6 +526,7 @@ impl VsockConnection { local_port: u32, peer_port: u32, peer_buf_alloc: u32, + keep: bool, ) -> Self { Self { local_cid, @@ -506,6 +534,7 @@ impl VsockConnection { local_port, peer_port, stream, + keep, state: ConnState::PeerInit, tx_buf: TxBuf::default(), fwd_cnt: Wrapping(0), @@ -525,6 +554,7 @@ impl VsockConnection { peer_cid: u64, local_port: u32, peer_port: u32, + keep: bool, ) -> Self { Self { local_cid, @@ -532,6 +562,7 @@ impl VsockConnection { local_port, peer_port, stream, + keep, state: ConnState::LocalInit, tx_buf: TxBuf::default(), fwd_cnt: Wrapping(0), @@ -579,6 +610,11 @@ impl VsockConnection { self.state } + /// Return the keep value. + pub fn keep(&self) -> bool { + self.keep + } + /// Send some raw, untracked, data straight to the underlying connected /// stream. Returns: number of bytes written, or the error describing the /// write failure. @@ -608,16 +644,23 @@ impl VsockConnection { // stream. let written = match self.stream.write(buf) { Ok(cnt) => cnt, - Err(e) => { + Err(e) if e.kind() == ErrorKind::WouldBlock => { // Absorb any would-block errors, since we can always try again // later. - if e.kind() == ErrorKind::WouldBlock { - 0 - } else { - // We don't know how to handle any other write error, so - // we'll send it up the call chain. + 0 + } + Err(e) if e.kind() == ErrorKind::BrokenPipe => { + // The backed pipe peer had been closed, and we didn't want to close + // this connection since the peer would like to re attach on it. + if !self.keep() { return Err(Error::StreamWrite(e)); } + 0 + } + Err(e) => { + // We don't know how to handle any other write error, so + // we'll send it up the call chain. 
+ return Err(Error::StreamWrite(e)); } }; // Move the "forwarded bytes" counter ahead by how much we were able to @@ -843,6 +886,7 @@ pub(crate) mod tests { LOCAL_PORT, PEER_PORT, PEER_BUF_ALLOC, + false, ), ConnState::LocalInit => VsockConnection::new_local_init( Box::new(stream), @@ -850,6 +894,7 @@ pub(crate) mod tests { PEER_CID, LOCAL_PORT, PEER_PORT, + false, ), ConnState::Established => { let mut conn = VsockConnection::new_peer_init( @@ -859,6 +904,7 @@ pub(crate) mod tests { LOCAL_PORT, PEER_PORT, PEER_BUF_ALLOC, + false, ); assert!(conn.has_pending_rx()); conn.recv_pkt(&mut pkt).unwrap(); diff --git a/src/dragonball/src/dbs_virtio_devices/src/vsock/muxer/muxer_impl.rs b/src/dragonball/src/dbs_virtio_devices/src/vsock/muxer/muxer_impl.rs index bfcc5fa1c..09af066cf 100644 --- a/src/dragonball/src/dbs_virtio_devices/src/vsock/muxer/muxer_impl.rs +++ b/src/dragonball/src/dbs_virtio_devices/src/vsock/muxer/muxer_impl.rs @@ -291,6 +291,34 @@ impl VsockChannel for VsockMuxer { // Alright, everything looks in order - forward this packet to its // owning connection. let mut res: VsockResult<()> = Ok(()); + + // For the hybrid connection, if it want to keep the connection + // when the pipe peer closed, here it needs to update the epoll + // listner to catch the events. 
+ let mut listener = None; + let conn = self.conn_map.get_mut(&conn_key).unwrap(); + let pre_state = conn.state(); + let nfd: RawFd = conn.as_raw_fd(); + + if pre_state == ConnState::LocalClosed && conn.keep() { + conn.state = ConnState::Established; + listener = Some(EpollListener::Connection { + key: conn_key, + evset: conn.get_polled_evset(), + backend: conn.stream.backend_type(), + }); + } + + if let Some(nlistener) = listener { + self.add_listener(nfd, nlistener).unwrap_or_else(|err| { + self.kill_connection(conn_key); + warn!( + "vsock: error updating epoll listener for (lp={}, pp={}): {:?}", + conn_key.local_port, conn_key.peer_port, err + ); + }); + } + self.apply_conn_mutation(conn_key, |conn| { res = conn.send_pkt(pkt); }); @@ -396,6 +424,23 @@ impl VsockMuxer { // listening for it. Some(EpollListener::Connection { key, evset: _, .. }) => { let key_copy = *key; + + // If the hybrid connection's local peer closed, then the epoll handler wouldn't + // get the epollout event even when it's reopened again, thus it should be notified + // when the guest send any data to try to active the epoll handler to generate the + // epollout event for this connection. + + let mut need_rm = false; + if let Some(conn) = self.conn_map.get_mut(&key_copy) { + if event_set.contains(epoll::Events::EPOLLERR) && conn.keep() { + conn.state = ConnState::LocalClosed; + need_rm = true; + } + } + if need_rm { + self.remove_listener(fd); + } + // The handling of this event will most probably mutate the // state of the receiving connection. 
We'll need to check for new // pending RX, event set mutation, and all that, so we're @@ -459,6 +504,7 @@ impl VsockMuxer { self.cid, local_port, peer_port, + false, ), ) } @@ -476,8 +522,10 @@ impl VsockMuxer { Some(EpollListener::PassFdStream(_)) => { if let Some(EpollListener::PassFdStream(mut stream)) = self.remove_listener(fd) { Self::passfd_read_port_and_fd(&mut stream) - .map(|(nfd, peer_port)| (nfd, self.allocate_local_port(), peer_port)) - .and_then(|(nfd, local_port, peer_port)| { + .map(|(nfd, peer_port, keep)| { + (nfd, self.allocate_local_port(), peer_port, keep) + }) + .and_then(|(nfd, local_port, peer_port, keep)| { // Here we should make sure the nfd the sole owner to convert it // into an UnixStream object, otherwise, it could cause memory unsafety. let nstream = unsafe { File::from_raw_fd(nfd) }; @@ -502,6 +550,7 @@ impl VsockMuxer { self.cid, local_port, peer_port, + keep, ), ) }) @@ -587,7 +636,7 @@ impl VsockMuxer { .map_err(|_| Error::InvalidPortRequest) } - fn passfd_read_port_and_fd(stream: &mut Box) -> Result<(RawFd, u32)> { + fn passfd_read_port_and_fd(stream: &mut Box) -> Result<(RawFd, u32, bool)> { let mut buf = [0u8; 32]; let mut fds = [0, 1]; let (data_len, fd_len) = stream @@ -607,7 +656,9 @@ impl VsockMuxer { .ok_or(Error::InvalidPortRequest) .and_then(|word| word.parse::().map_err(|_| Error::InvalidPortRequest))?; - Ok((fds[0], port)) + let keep = port_iter.next().is_some_and(|kp| kp == "keep"); + + Ok((fds[0], port, keep)) } /// Add a new connection to the active connection pool. @@ -775,6 +826,7 @@ impl VsockMuxer { pkt.dst_port(), pkt.src_port(), pkt.buf_alloc(), + false, ), ) }) @@ -876,7 +928,7 @@ impl VsockMuxer { ); }); } - } else { + } else if conn.state() != ConnState::LocalClosed { // The connection had previously asked to be removed from the // listener map (by returning an empty event set via // `get_polled_fd()`), but now wants back in. 
diff --git a/src/libs/kata-types/src/config/agent.rs b/src/libs/kata-types/src/config/agent.rs index 1ac7515ca..7f1fae555 100644 --- a/src/libs/kata-types/src/config/agent.rs +++ b/src/libs/kata-types/src/config/agent.rs @@ -11,6 +11,7 @@ pub use vendor::AgentVendor; use super::default::{ DEFAULT_AGENT_DIAL_TIMEOUT_MS, DEFAULT_AGENT_LOG_PORT, DEFAULT_AGENT_VSOCK_PORT, + DEFAULT_PASSFD_LISTENER_PORT, }; use crate::eother; @@ -60,6 +61,10 @@ pub struct Agent { #[serde(default = "default_log_port")] pub log_port: u32, + /// Agent process io port + #[serde(default = "default_passfd_listener_port")] + pub passfd_listener_port: u32, + /// Agent connection dialing timeout value in millisecond #[serde(default = "default_dial_timeout")] pub dial_timeout_ms: u32, @@ -104,6 +109,7 @@ impl std::default::Default for Agent { debug_console_enabled: false, server_port: DEFAULT_AGENT_VSOCK_PORT, log_port: DEFAULT_AGENT_LOG_PORT, + passfd_listener_port: DEFAULT_PASSFD_LISTENER_PORT, dial_timeout_ms: DEFAULT_AGENT_DIAL_TIMEOUT_MS, reconnect_timeout_ms: 3_000, request_timeout_ms: 30_000, @@ -126,6 +132,10 @@ fn default_log_port() -> u32 { DEFAULT_AGENT_LOG_PORT } +fn default_passfd_listener_port() -> u32 { + DEFAULT_PASSFD_LISTENER_PORT +} + fn default_dial_timeout() -> u32 { // ms 10 diff --git a/src/libs/kata-types/src/config/default.rs b/src/libs/kata-types/src/config/default.rs index 0dfd5d09e..d269ea1ab 100644 --- a/src/libs/kata-types/src/config/default.rs +++ b/src/libs/kata-types/src/config/default.rs @@ -25,6 +25,7 @@ pub const DEFAULT_AGENT_NAME: &str = "kata-agent"; pub const DEFAULT_AGENT_VSOCK_PORT: u32 = 1024; pub const DEFAULT_AGENT_LOG_PORT: u32 = 1025; pub const DEFAULT_AGENT_DBG_CONSOLE_PORT: u32 = 1026; +pub const DEFAULT_PASSFD_LISTENER_PORT: u32 = 1027; pub const DEFAULT_AGENT_TYPE_NAME: &str = AGENT_NAME_KATA; pub const DEFAULT_AGENT_DIAL_TIMEOUT_MS: u32 = 10; diff --git a/src/libs/kata-types/src/config/mod.rs b/src/libs/kata-types/src/config/mod.rs index 
dcfbadf3f..beb93e697 100644 --- a/src/libs/kata-types/src/config/mod.rs +++ b/src/libs/kata-types/src/config/mod.rs @@ -54,6 +54,8 @@ pub const DEBUG_CONSOLE_VPORT_OPTION: &str = "agent.debug_console_vport"; pub const LOG_VPORT_OPTION: &str = "agent.log_vport"; /// Option of setting the container's pipe size pub const CONTAINER_PIPE_SIZE_OPTION: &str = "agent.container_pipe_size"; +/// Option of setting the fd passthrough io listener port +pub const PASSFD_LISTENER_PORT: &str = "agent.passfd_listener_port"; /// Trait to manipulate global Kata configuration information. pub trait ConfigPlugin: Send + Sync { diff --git a/src/libs/kata-types/src/config/runtime.rs b/src/libs/kata-types/src/config/runtime.rs index 99820dd5f..0954f5039 100644 --- a/src/libs/kata-types/src/config/runtime.rs +++ b/src/libs/kata-types/src/config/runtime.rs @@ -167,6 +167,18 @@ pub struct Runtime { /// shared_mount declarations #[serde(default)] pub shared_mounts: Vec, + + /// If enabled, the runtime will attempt to use fd passthrough feature for process io. + #[serde(default)] + pub use_passfd_io: bool, + + /// If fd passthrough io is enabled, the runtime will attempt to use the specified port instead of the default port. + #[serde(default = "default_passfd_listener_port")] + pub passfd_listener_port: u32, +} + +fn default_passfd_listener_port() -> u32 { + default::DEFAULT_PASSFD_LISTENER_PORT } impl ConfigOps for Runtime { diff --git a/src/libs/protocols/protos/agent.proto b/src/libs/protocols/protos/agent.proto index bee72f066..ff44a46f8 100644 --- a/src/libs/protocols/protos/agent.proto +++ b/src/libs/protocols/protos/agent.proto @@ -93,6 +93,14 @@ message CreateContainerRequest { // This field is used to declare a set of shared mount points // that support cross-container sharing of mount objects. 
repeated SharedMount shared_mounts = 8; + + // These fields are the host-side vport numbers of passfd streams + // pre-created by runtime-rs, and used as identifiers for the agent + // to select the right streams for init process's stdin/stdout/stderr. + // Disable the feature by setting the associated port to 0. + uint32 stdin_port = 9; + uint32 stdout_port = 10; + uint32 stderr_port = 11; } message StartContainerRequest { @@ -115,6 +123,14 @@ message ExecProcessRequest { string exec_id = 2; StringUser string_user = 3; Process process = 4; + + // These fields are the host-side vport numbers of passfd streams + // pre-created by runtime-rs, and used as identifiers for the agent + // to select the right streams for process's stdin/stdout/stderr. + // Disable the feature by setting the associated port to 0. + uint32 stdin_port = 5; + uint32 stdout_port = 6; + uint32 stderr_port = 7; } message SignalProcessRequest { diff --git a/src/runtime-rs/Cargo.lock b/src/runtime-rs/Cargo.lock index 67178e586..89d850781 100644 --- a/src/runtime-rs/Cargo.lock +++ b/src/runtime-rs/Cargo.lock @@ -3448,6 +3448,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "604b71b8fc267e13bb3023a2c901126c8f349393666a6d98ac1ae5729b701798" dependencies = [ "libc", + "tokio", ] [[package]] @@ -4452,6 +4453,7 @@ dependencies = [ "persist", "protobuf 3.2.0", "resource", + "sendfd", "serde", "serde_derive", "serde_json", diff --git a/src/runtime-rs/config/configuration-dragonball.toml.in b/src/runtime-rs/config/configuration-dragonball.toml.in index 7f08a7db9..5a0a6db20 100644 --- a/src/runtime-rs/config/configuration-dragonball.toml.in +++ b/src/runtime-rs/config/configuration-dragonball.toml.in @@ -372,3 +372,10 @@ sandbox_bind_mounts=@DEFBINDMOUNTS@ # to the hypervisor. # (default: /run/kata-containers/dans) dan_conf = "@DEFDANCONF@" + +# If enabled, the runtime will attempt to use fd passthrough feature for process io. 
+# Note: this feature is only supported by the Dragonball hypervisor. +use_passfd_io = true + +# If fd passthrough io is enabled, the runtime will attempt to use the specified port instead of the default port. +# passfd_listener_port = 1027 diff --git a/src/runtime-rs/crates/agent/src/kata/trans.rs b/src/runtime-rs/crates/agent/src/kata/trans.rs index d7b00aba2..24fdda492 100644 --- a/src/runtime-rs/crates/agent/src/kata/trans.rs +++ b/src/runtime-rs/crates/agent/src/kata/trans.rs @@ -274,6 +274,9 @@ impl From for agent::CreateContainerRequest { OCI: from_option(from.oci), sandbox_pidns: from.sandbox_pidns, shared_mounts: trans_vec(from.shared_mounts), + stdin_port: from.stdin_port.unwrap_or_default(), + stdout_port: from.stdout_port.unwrap_or_default(), + stderr_port: from.stderr_port.unwrap_or_default(), ..Default::default() } } @@ -415,6 +418,9 @@ impl From for agent::ExecProcessRequest { exec_id: from.process_id.exec_id(), string_user: from_option(from.string_user), process: from_option(from.process), + stdin_port: from.stdin_port.unwrap_or_default(), + stdout_port: from.stdout_port.unwrap_or_default(), + stderr_port: from.stderr_port.unwrap_or_default(), ..Default::default() } } diff --git a/src/runtime-rs/crates/agent/src/types.rs b/src/runtime-rs/crates/agent/src/types.rs index 7cb3cfd44..bd7b9ff10 100644 --- a/src/runtime-rs/crates/agent/src/types.rs +++ b/src/runtime-rs/crates/agent/src/types.rs @@ -128,6 +128,9 @@ pub struct CreateContainerRequest { pub sandbox_pidns: bool, pub rootfs_mounts: Vec, pub shared_mounts: Vec, + pub stdin_port: Option, + pub stdout_port: Option, + pub stderr_port: Option, } #[derive(PartialEq, Clone, Default)] @@ -252,6 +255,9 @@ pub struct ExecProcessRequest { pub process_id: ContainerProcessID, pub string_user: Option, pub process: Option, + pub stdin_port: Option, + pub stdout_port: Option, + pub stderr_port: Option, } #[derive(PartialEq, Clone, Default, Debug)] diff --git a/src/runtime-rs/crates/hypervisor/src/ch/mod.rs 
b/src/runtime-rs/crates/hypervisor/src/ch/mod.rs index 9fe70e9d0..c6bcad8b6 100644 --- a/src/runtime-rs/crates/hypervisor/src/ch/mod.rs +++ b/src/runtime-rs/crates/hypervisor/src/ch/mod.rs @@ -182,6 +182,10 @@ impl Hypervisor for CloudHypervisor { let inner = self.inner.read().await; inner.resize_memory(new_mem_mb) } + + async fn get_passfd_listener_addr(&self) -> Result<(String, u32)> { + Err(anyhow::anyhow!("Not yet supported")) + } } #[async_trait] diff --git a/src/runtime-rs/crates/hypervisor/src/dragonball/inner.rs b/src/runtime-rs/crates/hypervisor/src/dragonball/inner.rs index 28f04296f..bfe260178 100644 --- a/src/runtime-rs/crates/hypervisor/src/dragonball/inner.rs +++ b/src/runtime-rs/crates/hypervisor/src/dragonball/inner.rs @@ -22,7 +22,7 @@ use kata_types::{ capabilities::{Capabilities, CapabilityBits}, config::{ hypervisor::{HugePageType, Hypervisor as HypervisorConfig}, - KATA_PATH, + KATA_PATH, PASSFD_LISTENER_PORT, }, }; use nix::mount::MsFlags; @@ -80,6 +80,10 @@ pub struct DragonballInner { /// the balloon size pub(crate) balloon_size: u32, + + /// guest-side fd passthrough io listener port, used to initialize + /// connections for io + pub(crate) passfd_listener_port: Option, } impl DragonballInner { @@ -108,6 +112,7 @@ impl DragonballInner { guest_memory_block_size_mb: 0, mem_hotplug_size_mb: 0, balloon_size: 0, + passfd_listener_port: None, } } @@ -128,6 +133,12 @@ impl DragonballInner { kernel_params.append(&mut KernelParams::from_string( &self.config.boot_info.kernel_params, )); + if let Some(passfd_listener_port) = self.passfd_listener_port { + kernel_params.append(&mut KernelParams::from_string(&format!( + "{}={}", + PASSFD_LISTENER_PORT, passfd_listener_port + ))); + } info!(sl!(), "prepared kernel_params={:?}", kernel_params); // set boot source @@ -458,6 +469,10 @@ impl DragonballInner { pub(crate) fn guest_memory_block_size_mb(&self) -> u32 { self.guest_memory_block_size_mb } + + pub fn set_passfd_listener_port(&mut self, port: u32) { + 
self.passfd_listener_port = Some(port); + } } #[async_trait] @@ -477,6 +492,7 @@ impl Persist for DragonballInner { config: self.hypervisor_config(), run_dir: self.run_dir.clone(), cached_block_devices: self.cached_block_devices.clone(), + passfd_listener_port: self.passfd_listener_port, ..Default::default() }) } @@ -502,6 +518,7 @@ impl Persist for DragonballInner { guest_memory_block_size_mb: 0, mem_hotplug_size_mb: 0, balloon_size: 0, + passfd_listener_port: hypervisor_state.passfd_listener_port, }) } } diff --git a/src/runtime-rs/crates/hypervisor/src/dragonball/inner_hypervisor.rs b/src/runtime-rs/crates/hypervisor/src/dragonball/inner_hypervisor.rs index 5de9ceb00..14b0d0219 100644 --- a/src/runtime-rs/crates/hypervisor/src/dragonball/inner_hypervisor.rs +++ b/src/runtime-rs/crates/hypervisor/src/dragonball/inner_hypervisor.rs @@ -9,7 +9,7 @@ use std::{ iter::FromIterator, }; -use anyhow::{Context, Ok, Result}; +use anyhow::{anyhow, Context, Ok, Result}; use kata_types::capabilities::Capabilities; use super::inner::DragonballInner; @@ -76,6 +76,15 @@ impl DragonballInner { )) } + /// Get the address of agent vsock server used to init connections for io + pub(crate) async fn get_passfd_listener_addr(&self) -> Result<(String, u32)> { + if let Some(passfd_port) = self.passfd_listener_port { + Ok((get_hvsock_path(&self.id), passfd_port)) + } else { + Err(anyhow!("passfd io listener port not set")) + } + } + pub(crate) async fn get_hypervisor_metrics(&self) -> Result { info!(sl!(), "get hypervisor metrics"); self.vmm_instance.get_hypervisor_metrics() diff --git a/src/runtime-rs/crates/hypervisor/src/dragonball/mod.rs b/src/runtime-rs/crates/hypervisor/src/dragonball/mod.rs index bec411b88..b8b1c6318 100644 --- a/src/runtime-rs/crates/hypervisor/src/dragonball/mod.rs +++ b/src/runtime-rs/crates/hypervisor/src/dragonball/mod.rs @@ -55,6 +55,11 @@ impl Dragonball { let mut inner = self.inner.write().await; inner.set_hypervisor_config(config) } + + pub async fn 
set_passfd_listener_port(&mut self, port: u32) { + let mut inner = self.inner.write().await; + inner.set_passfd_listener_port(port) + } } #[async_trait] @@ -198,6 +203,11 @@ impl Hypervisor for Dragonball { let mut inner = self.inner.write().await; inner.resize_memory(new_mem_mb) } + + async fn get_passfd_listener_addr(&self) -> Result<(String, u32)> { + let inner = self.inner.read().await; + inner.get_passfd_listener_addr().await + } } #[async_trait] diff --git a/src/runtime-rs/crates/hypervisor/src/hypervisor_persist.rs b/src/runtime-rs/crates/hypervisor/src/hypervisor_persist.rs index ea870f342..1ae231d83 100644 --- a/src/runtime-rs/crates/hypervisor/src/hypervisor_persist.rs +++ b/src/runtime-rs/crates/hypervisor/src/hypervisor_persist.rs @@ -33,4 +33,5 @@ pub struct HypervisorState { /// cached block device pub cached_block_devices: HashSet, pub virtiofs_daemon_pid: i32, + pub passfd_listener_port: Option, } diff --git a/src/runtime-rs/crates/hypervisor/src/lib.rs b/src/runtime-rs/crates/hypervisor/src/lib.rs index 24987447d..c8d460167 100644 --- a/src/runtime-rs/crates/hypervisor/src/lib.rs +++ b/src/runtime-rs/crates/hypervisor/src/lib.rs @@ -115,4 +115,5 @@ pub trait Hypervisor: std::fmt::Debug + Send + Sync { async fn set_capabilities(&self, flag: CapabilityBits); async fn set_guest_memory_block_size(&self, size: u32); async fn guest_memory_block_size(&self) -> u32; + async fn get_passfd_listener_addr(&self) -> Result<(String, u32)>; } diff --git a/src/runtime-rs/crates/hypervisor/src/qemu/mod.rs b/src/runtime-rs/crates/hypervisor/src/qemu/mod.rs index f73d7e352..57afe0b21 100644 --- a/src/runtime-rs/crates/hypervisor/src/qemu/mod.rs +++ b/src/runtime-rs/crates/hypervisor/src/qemu/mod.rs @@ -179,6 +179,10 @@ impl Hypervisor for Qemu { let inner = self.inner.read().await; inner.resize_memory(new_mem_mb) } + + async fn get_passfd_listener_addr(&self) -> Result<(String, u32)> { + Err(anyhow::anyhow!("Not yet supported")) + } } #[async_trait] diff --git 
a/src/runtime-rs/crates/runtimes/virt_container/Cargo.toml b/src/runtime-rs/crates/runtimes/virt_container/Cargo.toml index 4346d8560..9b83a1724 100644 --- a/src/runtime-rs/crates/runtimes/virt_container/Cargo.toml +++ b/src/runtime-rs/crates/runtimes/virt_container/Cargo.toml @@ -15,6 +15,7 @@ lazy_static = "1.4.0" libc = ">=0.2.39" nix = "0.24.2" protobuf = "3.2.0" +sendfd = { version = "0.4.3", features = ["tokio"] } serde = { version = "1.0.100", features = ["derive"] } serde_derive = "1.0.27" serde_json = "1.0.82" diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container.rs index 48ba7dd41..0cfc88409 100644 --- a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container.rs +++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container.rs @@ -43,15 +43,17 @@ pub struct Container { agent: Arc, resource_manager: Arc, logger: slog::Logger, + pub(crate) passfd_listener_addr: Option<(String, u32)>, } impl Container { - pub fn new( + pub async fn new( pid: u32, config: ContainerConfig, spec: oci::Spec, agent: Arc, resource_manager: Arc, + passfd_listener_addr: Option<(String, u32)>, ) -> Result { let container_id = ContainerID::new(&config.container_id).context("new container id")?; let logger = sl!().new(o!("container_id" => config.container_id.clone())); @@ -84,6 +86,7 @@ impl Container { agent, resource_manager, logger, + passfd_listener_addr, }) } @@ -184,6 +187,17 @@ impl Container { } } + // In passfd io mode, we create vsock connections for io in advance + // and pass port info to agent in `CreateContainerRequest`. + // These vsock connections will be used as stdin/stdout/stderr of the container process. + // See agent/src/passfd_io.rs for more details. 
+ if let Some((hvsock_uds_path, passfd_port)) = &self.passfd_listener_addr { + inner + .init_process + .passfd_io_init(hvsock_uds_path, *passfd_port) + .await?; + } + // create container let r = agent::CreateContainerRequest { process_id: agent::ContainerProcessID::new(&config.container_id, ""), @@ -192,6 +206,21 @@ impl Container { sandbox_pidns, devices: devices_agent, shared_mounts, + stdin_port: inner + .init_process + .passfd_io + .as_ref() + .and_then(|io| io.stdin_port), + stdout_port: inner + .init_process + .passfd_io + .as_ref() + .and_then(|io| io.stdout_port), + stderr_port: inner + .init_process + .passfd_io + .as_ref() + .and_then(|io| io.stderr_port), ..Default::default() }; @@ -217,21 +246,40 @@ impl Container { return Err(err); } - let container_io = inner.new_container_io(process).await?; - inner - .init_process - .start_io_and_wait(containers, self.agent.clone(), container_io) - .await?; + if self.passfd_listener_addr.is_some() { + inner + .init_process + .passfd_io_wait(containers, self.agent.clone()) + .await?; + } else { + let container_io = inner.new_container_io(process).await?; + inner + .init_process + .start_io_and_wait(containers, self.agent.clone(), container_io) + .await?; + } } ProcessType::Exec => { + // In passfd io mode, we create vsock connections for io in advance + // and pass port info to agent in `ExecProcessRequest`. + // These vsock connections will be used as stdin/stdout/stderr of the exec process. + // See agent/src/passfd_io.rs for more details. 
+ if let Some((hvsock_uds_path, passfd_port)) = &self.passfd_listener_addr { + let exec = inner + .exec_processes + .get_mut(&process.exec_id) + .ok_or_else(|| Error::ProcessNotFound(process.clone()))?; + exec.process + .passfd_io_init(hvsock_uds_path, *passfd_port) + .await?; + } + if let Err(e) = inner.start_exec_process(process).await { let device_manager = self.resource_manager.get_device_manager().await; let _ = inner.stop_process(process, true, &device_manager).await; return Err(e).context("enter process"); } - let container_io = inner.new_container_io(process).await.context("io stream")?; - { let exec = inner .exec_processes @@ -245,13 +293,29 @@ impl Container { } } - // start io and wait - { + if self.passfd_listener_addr.is_some() { + // In passfd io mode, we don't bother with the IO. + // We send `WaitProcessRequest` immediately to the agent + // and wait for the response in a separate thread. + // The agent will only respond after IO is done. let exec = inner .exec_processes .get_mut(&process.exec_id) .ok_or_else(|| Error::ProcessNotFound(process.clone()))?; + exec.process + .passfd_io_wait(containers, self.agent.clone()) + .await?; + } else { + // In legacy io mode, we handle IO by polling the agent. + // When IO is done, we send `WaitProcessRequest` to agent + // to get the exit status. 
+ let container_io = + inner.new_container_io(process).await.context("io stream")?; + let exec = inner + .exec_processes + .get_mut(&process.exec_id) + .ok_or_else(|| Error::ProcessNotFound(process.clone()))?; exec.process .start_io_and_wait(containers, self.agent.clone(), container_io) .await diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container_inner.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container_inner.rs index 8c5e766d8..1b6fcb2aa 100644 --- a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container_inner.rs +++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container_inner.rs @@ -87,6 +87,17 @@ impl ContainerInner { process_id: process.clone().into(), string_user: None, process: Some(exec.oci_process.clone()), + stdin_port: exec.process.passfd_io.as_ref().and_then(|io| io.stdin_port), + stdout_port: exec + .process + .passfd_io + .as_ref() + .and_then(|io| io.stdout_port), + stderr_port: exec + .process + .passfd_io + .as_ref() + .and_then(|io| io.stderr_port), }) .await .context("exec process")?; diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/mod.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/mod.rs index 3c6ca719b..a72eecac2 100644 --- a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/mod.rs +++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/mod.rs @@ -6,5 +6,7 @@ mod container_io; pub use container_io::ContainerIo; +mod passfd_io; mod shim_io; +pub use passfd_io::PassfdIo; pub use shim_io::ShimIo; diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/passfd_io.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/passfd_io.rs new file mode 100644 index 000000000..169c3b68d --- /dev/null +++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/passfd_io.rs @@ 
-0,0 +1,135 @@ +// Copyright 2024 Kata Contributors +// +// SPDX-License-Identifier: Apache-2.0 +// +use anyhow::{anyhow, Context, Result}; +use sendfd::SendWithFd; +use std::{ + fs::OpenOptions, + os::fd::{AsRawFd, OwnedFd}, + os::unix::fs::OpenOptionsExt, +}; +use tokio::{ + io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, + net::UnixStream, +}; + +// Note: the fd will be closed after passing +async fn passfd_connect(uds: &str, port: u32, fd: OwnedFd) -> Result { + info!(sl!(), "passfd uds {:?} port {}", &uds, port); + let mut stream = UnixStream::connect(&uds).await.context("connect")?; + stream.write_all(b"passfd\n").await.context("write all")?; + + // We want the io connection to stay connected after containerd has closed the io pipe, + // so that the io stream can still be attached to. + let buf = format!("{} keep", port); + stream + .send_with_fd(buf.as_bytes(), &[fd.as_raw_fd()]) + .context("send port and fd")?; + + let mut reads = BufReader::new(&mut stream); + let mut response = String::new(); + reads.read_line(&mut response).await.context("read line")?; + + // parse response like "OK port" + let mut iter = response.split_whitespace(); + if iter.next() != Some("OK") { + return Err(anyhow!( + "handshake error: malformed response code: {:?}", + response + )); + } + let hostport = iter + .next() + .ok_or_else(|| anyhow!("handshake error: malformed response code: {:?}", response))? 
+ .parse::() + .context("handshake error: malformed response code")?; + Ok(hostport) +} + +#[derive(Debug, Default)] +pub struct PassfdIo { + stdin: Option, + stdout: Option, + stderr: Option, + + pub stdin_port: Option, + pub stdout_port: Option, + pub stderr_port: Option, +} + +impl PassfdIo { + pub async fn new( + stdin: Option, + stdout: Option, + stderr: Option, + ) -> Self { + Self { + stdin, + stdout, + stderr, + ..Default::default() + } + } + + pub async fn open_and_passfd( + &mut self, + uds_path: &str, + passfd_port: u32, + terminal: bool, + ) -> Result<()> { + // In linux, when a FIFO is opened and there are no writers, the reader + // will continuously receive the HUP event. This can be problematic + // when creating containers in detached mode, as the stdin FIFO writer + // is closed after the container is created, resulting in this situation. + // + // See: https://stackoverflow.com/questions/15055065/o-rdwr-on-named-pipes-with-poll + if let Some(stdin) = &self.stdin { + let fin = OpenOptions::new() + .read(true) + .write(true) + .custom_flags(libc::O_NONBLOCK) + .open(stdin) + .context("open stdin")?; + + let hostport = passfd_connect(uds_path, passfd_port, fin.into()) + .await + .context("passfd")?; + + self.stdin_port = Some(hostport); + } + + if let Some(stdout) = &self.stdout { + let fout = OpenOptions::new() + .write(true) + .custom_flags(libc::O_NONBLOCK) + .open(stdout) + .context("open stdout")?; + + let hostport = passfd_connect(uds_path, passfd_port, fout.into()) + .await + .context("passfd")?; + + self.stdout_port = Some(hostport); + } + + if !terminal { + // stderr is not used in terminal mode + if let Some(stderr) = &self.stderr { + let ferr = OpenOptions::new() + .write(true) + .custom_flags(libc::O_NONBLOCK) + .open(stderr) + .context("open stderr")?; + + let hostport = passfd_connect(uds_path, passfd_port, ferr.into()) + .await + .context("passfd")?; + + self.stderr_port = Some(hostport); + } + } + + Ok(()) + } +} diff --git 
a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/manager.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/manager.rs index f6a6553e9..5fc31eec9 100644 --- a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/manager.rs +++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/manager.rs @@ -77,7 +77,9 @@ impl ContainerManager for VirtContainerManager { spec.clone(), self.agent.clone(), self.resource_manager.clone(), + self.hypervisor.get_passfd_listener_addr().await.ok(), ) + .await .context("new container")?; // CreateContainer Hooks: diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/process.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/process.rs index cd73134dd..6cc755770 100644 --- a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/process.rs +++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/process.rs @@ -15,7 +15,7 @@ use tokio::io::{AsyncRead, AsyncWrite}; use tokio::sync::{watch, RwLock}; use super::container::Container; -use super::io::{ContainerIo, ShimIo}; +use super::io::{ContainerIo, PassfdIo, ShimIo}; use super::logger_with_process; pub type ProcessWatcher = ( @@ -46,6 +46,9 @@ pub struct Process { // close io call should wait until the stdin io copy finished to // prevent stdin data lost. pub wg_stdin: WaitGroup, + + // io streams using vsock fd passthrough feature + pub passfd_io: Option, } impl Process { @@ -76,9 +79,95 @@ impl Process { exit_watcher_rx: Some(receiver), exit_watcher_tx: Some(sender), wg_stdin: WaitGroup::new(), + passfd_io: None, } } + /// Init the `passfd_io` struct and vsock connections for io to the agent. 
+ pub async fn passfd_io_init(&mut self, hvsock_uds_path: &str, passfd_port: u32) -> Result<()> { + info!(self.logger, "passfd io init"); + + let mut passfd_io = + PassfdIo::new(self.stdin.clone(), self.stdout.clone(), self.stderr.clone()).await; + + passfd_io + .open_and_passfd(hvsock_uds_path, passfd_port, self.terminal) + .await + .context("passfd connect")?; + + self.passfd_io = Some(passfd_io); + + Ok(()) + } + + /// (After process started) Send a WaitProcessRequest to agent in a + /// separate thread. + /// This function is only used in passfd io mode. + pub async fn passfd_io_wait( + &mut self, + containers: Arc>>, + agent: Arc, + ) -> Result<()> { + let logger = self.logger.clone(); + info!(logger, "start passfd io wait"); + let process = self.process.clone(); + let exit_status = self.exit_status.clone(); + let exit_notifier = self.exit_watcher_tx.take(); + let status = self.status.clone(); + + tokio::spawn(async move { + let req = agent::WaitProcessRequest { + process_id: process.clone().into(), + }; + + info!(logger, "begin passfd io wait process"); + let resp = match agent.wait_process(req).await { + Ok(ret) => ret, + Err(e) => { + error!(logger, "failed to passfd io wait process {:?}", e); + return; + } + }; + + info!( + logger, + "end passfd io wait process exit code {}", resp.status + ); + + let containers = containers.read().await; + let container_id = &process.container_id.container_id; + let c = match containers.get(container_id) { + Some(c) => c, + None => { + error!( + logger, + "Failed to stop process, since container {} not found", container_id + ); + return; + } + }; + + if let Err(err) = c.stop_process(&process).await { + error!( + logger, + "Failed to stop process, process = {:?}, err = {:?}", process, err + ); + } + + let mut exit_status = exit_status.write().await; + exit_status.update_exit_code(resp.status); + drop(exit_status); + + let mut status = status.write().await; + *status = ProcessStatus::Stopped; + drop(status); + + 
drop(exit_notifier); + info!(logger, "end passfd io wait thread"); + }); + Ok(()) + } + pub async fn start_io_and_wait( &mut self, containers: Arc>>, @@ -246,15 +335,20 @@ impl Process { *status = ProcessStatus::Stopped; } + /// Close the stdin of the process in container. pub async fn close_io(&mut self, agent: Arc) { - self.wg_stdin.wait().await; + // In passfd io mode, the stdin close and sync logic is handled + // in the agent side. + if self.passfd_io.is_none() { + self.wg_stdin.wait().await; + } let req = agent::CloseStdinRequest { process_id: self.process.clone().into(), }; if let Err(e) = agent.close_stdin(req).await { - warn!(self.logger, "failed clsoe process io: {:?}", e); + warn!(self.logger, "failed close process io: {:?}", e); } } diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/lib.rs b/src/runtime-rs/crates/runtimes/virt_container/src/lib.rs index 60a3a6eb0..f6dbdc4ea 100644 --- a/src/runtime-rs/crates/runtimes/virt_container/src/lib.rs +++ b/src/runtime-rs/crates/runtimes/virt_container/src/lib.rs @@ -140,6 +140,11 @@ async fn new_hypervisor(toml_config: &TomlConfig) -> Result> hypervisor .set_hypervisor_config(hypervisor_config.clone()) .await; + if toml_config.runtime.use_passfd_io { + hypervisor + .set_passfd_listener_port(toml_config.runtime.passfd_listener_port) + .await; + } Ok(Arc::new(hypervisor)) } HYPERVISOR_QEMU => { diff --git a/src/tools/runk/libcontainer/src/container.rs b/src/tools/runk/libcontainer/src/container.rs index c7c3e068b..5a2bb0a6c 100644 --- a/src/tools/runk/libcontainer/src/container.rs +++ b/src/tools/runk/libcontainer/src/container.rs @@ -294,6 +294,7 @@ impl ContainerLauncher { &self.id, self.init, 0, + None, )?) } else { Err(anyhow!("no process configuration"))