From 973b5ad1f497c6e5bfd771ccfd02953245cead8b Mon Sep 17 00:00:00 2001 From: Zixuan Tan Date: Fri, 28 Jul 2023 00:26:52 +0800 Subject: [PATCH 01/16] runtime-rs: make Container::new async Fixes: #6714 Signed-off-by: Zixuan Tan --- .../runtimes/virt_container/src/container_manager/container.rs | 2 +- .../runtimes/virt_container/src/container_manager/manager.rs | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container.rs index 48ba7dd41..cb7548fea 100644 --- a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container.rs +++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container.rs @@ -46,7 +46,7 @@ pub struct Container { } impl Container { - pub fn new( + pub async fn new( pid: u32, config: ContainerConfig, spec: oci::Spec, diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/manager.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/manager.rs index f6a6553e9..c532ccaca 100644 --- a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/manager.rs +++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/manager.rs @@ -78,6 +78,7 @@ impl ContainerManager for VirtContainerManager { self.agent.clone(), self.resource_manager.clone(), ) + .await .context("new container")?; // CreateContainer Hooks: From eb6bb6fe0d9eb80adaf23308747cd9f3f49c1a95 Mon Sep 17 00:00:00 2001 From: Zixuan Tan Date: Fri, 28 Jul 2023 00:46:56 +0800 Subject: [PATCH 02/16] config: add two options to control vsock passthrough io feature Two toml options, `use_passfd_io` and `passfd_listener_port` are introduced to enable and configure dragonball's vsock fd passthrough io feature. This commit is a preparation for vsock fd passthrough io feature. Fixes: #6714 Signed-off-by: Zixuan Tan --- src/agent/src/config.rs | 12 ++++++++++++ src/libs/kata-types/src/config/agent.rs | 10 ++++++++++ src/libs/kata-types/src/config/default.rs | 1 + src/libs/kata-types/src/config/mod.rs | 2 ++ src/libs/kata-types/src/config/runtime.rs | 12 ++++++++++++ .../config/configuration-dragonball.toml.in | 7 +++++++ .../crates/hypervisor/src/dragonball/inner.rs | 19 ++++++++++++++++++- .../crates/hypervisor/src/dragonball/mod.rs | 5 +++++ .../hypervisor/src/hypervisor_persist.rs | 1 + .../src/container_manager/container.rs | 2 ++ .../crates/runtimes/virt_container/src/lib.rs | 5 +++++ 11 files changed, 75 insertions(+), 1 deletion(-) diff --git a/src/agent/src/config.rs b/src/agent/src/config.rs index 75c0a245c..abb8be024 100644 --- a/src/agent/src/config.rs +++ b/src/agent/src/config.rs @@ -18,6 +18,7 @@ const DEV_MODE_FLAG: &str = "agent.devmode"; const TRACE_MODE_OPTION: &str = "agent.trace"; const LOG_LEVEL_OPTION: &str = "agent.log"; const SERVER_ADDR_OPTION: &str = "agent.server_addr"; +const PASSFD_LISTENER_PORT: &str = "agent.passfd_listener_port"; const HOTPLUG_TIMOUT_OPTION: &str = "agent.hotplug_timeout"; const DEBUG_CONSOLE_VPORT_OPTION: &str = "agent.debug_console_vport"; const LOG_VPORT_OPTION: &str = "agent.log_vport"; @@ -61,6 +62,7 @@ pub struct AgentConfig { pub log_vport: i32, pub container_pipe_size: i32, pub server_addr: String, + pub passfd_listener_port: i32, pub unified_cgroup_hierarchy: bool, pub tracing: bool, pub supports_seccomp: bool, @@ -76,6 +78,7 @@ pub struct AgentConfigBuilder { pub log_vport: Option, pub container_pipe_size: Option, pub server_addr: Option, + pub passfd_listener_port: Option, pub unified_cgroup_hierarchy: Option, pub tracing: Option, } @@ -135,6 +138,7 @@ impl Default for AgentConfig { log_vport: 0, container_pipe_size: DEFAULT_CONTAINER_PIPE_SIZE, server_addr: format!("{}:{}", VSOCK_ADDR, DEFAULT_AGENT_VSOCK_PORT), + passfd_listener_port: 0, unified_cgroup_hierarchy: false, tracing: false, supports_seccomp: rpc::have_seccomp(), @@ -164,6 +168,7 @@ impl FromStr for AgentConfig { config_override!(agent_config_builder, agent_config, log_vport); config_override!(agent_config_builder, agent_config, container_pipe_size); config_override!(agent_config_builder, agent_config, server_addr); + config_override!(agent_config_builder, agent_config, passfd_listener_port); config_override!(agent_config_builder, agent_config, unified_cgroup_hierarchy); config_override!(agent_config_builder, agent_config, tracing); @@ -245,6 +250,13 @@ impl AgentConfig { get_vsock_port, |port| port > 0 ); + parse_cmdline_param!( + param, + PASSFD_LISTENER_PORT, + config.passfd_listener_port, + get_vsock_port, + |port| port > 0 + ); parse_cmdline_param!( param, diff --git a/src/libs/kata-types/src/config/agent.rs b/src/libs/kata-types/src/config/agent.rs index 1ac7515ca..7f1fae555 100644 --- a/src/libs/kata-types/src/config/agent.rs +++ b/src/libs/kata-types/src/config/agent.rs @@ -11,6 +11,7 @@ pub use vendor::AgentVendor; use super::default::{ DEFAULT_AGENT_DIAL_TIMEOUT_MS, DEFAULT_AGENT_LOG_PORT, DEFAULT_AGENT_VSOCK_PORT, + DEFAULT_PASSFD_LISTENER_PORT, }; use crate::eother; @@ -60,6 +61,10 @@ pub struct Agent { #[serde(default = "default_log_port")] pub log_port: u32, + /// Agent process io port + #[serde(default = "default_passfd_listener_port")] + pub passfd_listener_port: u32, + /// Agent connection dialing timeout value in millisecond #[serde(default = "default_dial_timeout")] pub dial_timeout_ms: u32, @@ -104,6 +109,7 @@ impl std::default::Default for Agent { debug_console_enabled: false, server_port: DEFAULT_AGENT_VSOCK_PORT, log_port: DEFAULT_AGENT_LOG_PORT, + passfd_listener_port: DEFAULT_PASSFD_LISTENER_PORT, dial_timeout_ms: DEFAULT_AGENT_DIAL_TIMEOUT_MS, reconnect_timeout_ms: 3_000, request_timeout_ms: 30_000, @@ -126,6 +132,10 @@ fn default_log_port() -> u32 { DEFAULT_AGENT_LOG_PORT } +fn default_passfd_listener_port() -> u32 { + DEFAULT_PASSFD_LISTENER_PORT +} + fn default_dial_timeout() -> u32 { // ms 10 diff --git a/src/libs/kata-types/src/config/default.rs b/src/libs/kata-types/src/config/default.rs index 0dfd5d09e..d269ea1ab 100644 --- a/src/libs/kata-types/src/config/default.rs +++ b/src/libs/kata-types/src/config/default.rs @@ -25,6 +25,7 @@ pub const DEFAULT_AGENT_NAME: &str = "kata-agent"; pub const DEFAULT_AGENT_VSOCK_PORT: u32 = 1024; pub const DEFAULT_AGENT_LOG_PORT: u32 = 1025; pub const DEFAULT_AGENT_DBG_CONSOLE_PORT: u32 = 1026; +pub const DEFAULT_PASSFD_LISTENER_PORT: u32 = 1027; pub const DEFAULT_AGENT_TYPE_NAME: &str = AGENT_NAME_KATA; pub const DEFAULT_AGENT_DIAL_TIMEOUT_MS: u32 = 10; diff --git a/src/libs/kata-types/src/config/mod.rs b/src/libs/kata-types/src/config/mod.rs index dcfbadf3f..beb93e697 100644 --- a/src/libs/kata-types/src/config/mod.rs +++ b/src/libs/kata-types/src/config/mod.rs @@ -54,6 +54,8 @@ pub const DEBUG_CONSOLE_VPORT_OPTION: &str = "agent.debug_console_vport"; pub const LOG_VPORT_OPTION: &str = "agent.log_vport"; /// Option of setting the container's pipe size pub const CONTAINER_PIPE_SIZE_OPTION: &str = "agent.container_pipe_size"; +/// Option of setting the fd passthrough io listener port +pub const PASSFD_LISTENER_PORT: &str = "agent.passfd_listener_port"; /// Trait to manipulate global Kata configuration information. pub trait ConfigPlugin: Send + Sync { diff --git a/src/libs/kata-types/src/config/runtime.rs b/src/libs/kata-types/src/config/runtime.rs index 99820dd5f..0954f5039 100644 --- a/src/libs/kata-types/src/config/runtime.rs +++ b/src/libs/kata-types/src/config/runtime.rs @@ -167,6 +167,18 @@ pub struct Runtime { /// shared_mount declarations #[serde(default)] pub shared_mounts: Vec, + + /// If enabled, the runtime will attempt to use fd passthrough feature for process io. + #[serde(default)] + pub use_passfd_io: bool, + + /// If fd passthrough io is enabled, the runtime will attempt to use the specified port instead of the default port. + #[serde(default = "default_passfd_listener_port")] + pub passfd_listener_port: u32, +} + +fn default_passfd_listener_port() -> u32 { + default::DEFAULT_PASSFD_LISTENER_PORT } impl ConfigOps for Runtime { diff --git a/src/runtime-rs/config/configuration-dragonball.toml.in b/src/runtime-rs/config/configuration-dragonball.toml.in index 7f08a7db9..5a0a6db20 100644 --- a/src/runtime-rs/config/configuration-dragonball.toml.in +++ b/src/runtime-rs/config/configuration-dragonball.toml.in @@ -372,3 +372,10 @@ sandbox_bind_mounts=@DEFBINDMOUNTS@ # to the hypervisor. # (default: /run/kata-containers/dans) dan_conf = "@DEFDANCONF@" + +# If enabled, the runtime will attempt to use fd passthrough feature for process io. +# Note: this feature is only supported by the Dragonball hypervisor. +use_passfd_io = true + +# If fd passthrough io is enabled, the runtime will attempt to use the specified port instead of the default port. +# passfd_listener_port = 1027 diff --git a/src/runtime-rs/crates/hypervisor/src/dragonball/inner.rs b/src/runtime-rs/crates/hypervisor/src/dragonball/inner.rs index 28f04296f..bfe260178 100644 --- a/src/runtime-rs/crates/hypervisor/src/dragonball/inner.rs +++ b/src/runtime-rs/crates/hypervisor/src/dragonball/inner.rs @@ -22,7 +22,7 @@ use kata_types::{ capabilities::{Capabilities, CapabilityBits}, config::{ hypervisor::{HugePageType, Hypervisor as HypervisorConfig}, - KATA_PATH, + KATA_PATH, PASSFD_LISTENER_PORT, }, }; use nix::mount::MsFlags; @@ -80,6 +80,10 @@ pub struct DragonballInner { /// the balloon size pub(crate) balloon_size: u32, + + /// guest-side fd passthrough io listener port, used to initialize + /// connections for io + pub(crate) passfd_listener_port: Option, } impl DragonballInner { @@ -108,6 +112,7 @@ impl DragonballInner { guest_memory_block_size_mb: 0, mem_hotplug_size_mb: 0, balloon_size: 0, + passfd_listener_port: None, } } @@ -128,6 +133,12 @@ impl DragonballInner { kernel_params.append(&mut KernelParams::from_string( &self.config.boot_info.kernel_params, )); + if let Some(passfd_listener_port) = self.passfd_listener_port { + kernel_params.append(&mut KernelParams::from_string(&format!( + "{}={}", + PASSFD_LISTENER_PORT, passfd_listener_port + ))); + } info!(sl!(), "prepared kernel_params={:?}", kernel_params); // set boot source @@ -458,6 +469,10 @@ impl DragonballInner { pub(crate) fn guest_memory_block_size_mb(&self) -> u32 { self.guest_memory_block_size_mb } + + pub fn set_passfd_listener_port(&mut self, port: u32) { + self.passfd_listener_port = Some(port); + } } #[async_trait] @@ -477,6 +492,7 @@ impl Persist for DragonballInner { config: self.hypervisor_config(), run_dir: self.run_dir.clone(), cached_block_devices: self.cached_block_devices.clone(), + passfd_listener_port: self.passfd_listener_port, ..Default::default() }) } @@ -502,6 +518,7 @@ impl Persist for DragonballInner { guest_memory_block_size_mb: 0, mem_hotplug_size_mb: 0, balloon_size: 0, + passfd_listener_port: hypervisor_state.passfd_listener_port, }) } } diff --git a/src/runtime-rs/crates/hypervisor/src/dragonball/mod.rs b/src/runtime-rs/crates/hypervisor/src/dragonball/mod.rs index bec411b88..980f0e0eb 100644 --- a/src/runtime-rs/crates/hypervisor/src/dragonball/mod.rs +++ b/src/runtime-rs/crates/hypervisor/src/dragonball/mod.rs @@ -55,6 +55,11 @@ impl Dragonball { let mut inner = self.inner.write().await; inner.set_hypervisor_config(config) } + + pub async fn set_passfd_listener_port(&mut self, port: u32) { + let mut inner = self.inner.write().await; + inner.set_passfd_listener_port(port) + } } #[async_trait] diff --git a/src/runtime-rs/crates/hypervisor/src/hypervisor_persist.rs b/src/runtime-rs/crates/hypervisor/src/hypervisor_persist.rs index ea870f342..1ae231d83 100644 --- a/src/runtime-rs/crates/hypervisor/src/hypervisor_persist.rs +++ b/src/runtime-rs/crates/hypervisor/src/hypervisor_persist.rs @@ -33,4 +33,5 @@ pub struct HypervisorState { /// cached block device pub cached_block_devices: HashSet, pub virtiofs_daemon_pid: i32, + pub passfd_listener_port: Option, } diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container.rs index cb7548fea..27563ea5d 100644 --- a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container.rs +++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container.rs @@ -43,6 +43,7 @@ pub struct Container { agent: Arc, resource_manager: Arc, logger: slog::Logger, + passfd_listener_addr: Option<(String, u32)>, } impl Container { @@ -84,6 +85,7 @@ impl Container { agent, resource_manager, logger, + None, }) } diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/lib.rs b/src/runtime-rs/crates/runtimes/virt_container/src/lib.rs index 60a3a6eb0..f6dbdc4ea 100644 --- a/src/runtime-rs/crates/runtimes/virt_container/src/lib.rs +++ b/src/runtime-rs/crates/runtimes/virt_container/src/lib.rs @@ -140,6 +140,11 @@ async fn new_hypervisor(toml_config: &TomlConfig) -> Result> hypervisor .set_hypervisor_config(hypervisor_config.clone()) .await; + if toml_config.runtime.use_passfd_io { + hypervisor + .set_passfd_listener_port(toml_config.runtime.passfd_listener_port) + .await; + } Ok(Arc::new(hypervisor)) } HYPERVISOR_QEMU => { From 442df71fe592a42725124b3a8d062b16d4ce623b Mon Sep 17 00:00:00 2001 From: Zixuan Tan Date: Fri, 28 Jul 2023 16:52:38 +0800 Subject: [PATCH 03/16] agent,runtime-rs: refactor process io using vsock fd passthrough feature Currently in the kata container, every io read/write operation requires an RPC request from the runtime to the agent. This process involves data copying into/from an RPC request/response, which are high overhead. To solve this issue, this commit utilize the vsock fd passthrough, a newly introduced feature in the Dragonball hypervisor. This feature allows other host programs to pass a file descriptor to the Dragonball process, directly as the backend of an ordinary hybrid vsock connection. The runtime-rs now utilizes this feature for container process io. It open the stdin/stdout/stderr fifo from containerd, and pass them to Dragonball, then don't bother with process io any more, eliminating the need for an RPC for each io read/write operation. In passfd io mode, the agent uses the vsock connections as the child process's stdin/stdout/stderr, eliminating the need for a pipe to bump data (in non-tty mode). Fixes: #6714 Signed-off-by: Zixuan Tan --- src/agent/Cargo.lock | 8 ++ src/agent/rustjail/Cargo.toml | 2 + src/agent/rustjail/src/container.rs | 53 ++++++-- src/agent/rustjail/src/process.rs | 81 ++++++++++-- src/agent/src/main.rs | 7 ++ src/agent/src/passfd_io.rs | 65 ++++++++++ src/agent/src/rpc.rs | 22 +++- src/agent/src/sandbox.rs | 13 +- src/agent/src/signal.rs | 5 + src/libs/protocols/protos/agent.proto | 16 +++ src/runtime-rs/Cargo.lock | 2 + src/runtime-rs/crates/agent/src/kata/trans.rs | 6 + src/runtime-rs/crates/agent/src/types.rs | 6 + .../crates/hypervisor/src/ch/mod.rs | 4 + .../src/dragonball/inner_hypervisor.rs | 11 +- .../crates/hypervisor/src/dragonball/mod.rs | 5 + src/runtime-rs/crates/hypervisor/src/lib.rs | 1 + .../crates/hypervisor/src/qemu/mod.rs | 4 + .../crates/runtimes/virt_container/Cargo.toml | 1 + .../src/container_manager/container.rs | 87 +++++++++++-- .../src/container_manager/container_inner.rs | 18 +++ .../src/container_manager/io/mod.rs | 2 + .../src/container_manager/io/passfd_io.rs | 118 ++++++++++++++++++ .../src/container_manager/manager.rs | 1 + .../src/container_manager/process.rs | 101 ++++++++++++++- 25 files changed, 603 insertions(+), 36 deletions(-) create mode 100644 src/agent/src/passfd_io.rs create mode 100644 src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/passfd_io.rs diff --git a/src/agent/Cargo.lock b/src/agent/Cargo.lock index 8b60b429c..1f5da937b 100644 --- a/src/agent/Cargo.lock +++ b/src/agent/Cargo.lock @@ -220,6 +220,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "awaitgroup" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc17ab023b4091c10ff099f9deebaeeb59b5189df07e554c4fef042b70745d68" + [[package]] name = "base64" version = "0.13.0" @@ -2490,6 +2496,7 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", + "awaitgroup", "bit-vec", "capctl", "caps", @@ -2519,6 +2526,7 @@ dependencies = [ "tempfile", "test-utils", "tokio", + "tokio-vsock", "xattr", "zbus", ] diff --git a/src/agent/rustjail/Cargo.toml b/src/agent/rustjail/Cargo.toml index 9c3cabb72..e8499832b 100644 --- a/src/agent/rustjail/Cargo.toml +++ b/src/agent/rustjail/Cargo.toml @@ -6,6 +6,7 @@ edition = "2018" license = "Apache-2.0" [dependencies] +awaitgroup = "0.6.0" serde = "1.0.91" serde_json = "1.0.39" serde_derive = "1.0.91" @@ -30,6 +31,7 @@ rlimit = "0.5.3" cfg-if = "0.1.0" tokio = { version = "1.28.1", features = ["sync", "io-util", "process", "time", "macros", "rt"] } +tokio-vsock = "0.3.1" futures = "0.3.17" async-trait = "0.1.31" inotify = "0.9.2" diff --git a/src/agent/rustjail/src/container.rs b/src/agent/rustjail/src/container.rs index 897c5a3c8..1a29d90c8 100644 --- a/src/agent/rustjail/src/container.rs +++ b/src/agent/rustjail/src/container.rs @@ -14,6 +14,7 @@ use std::fs; use std::os::unix::io::RawFd; use std::path::{Path, PathBuf}; use std::time::SystemTime; +use tokio::fs::File; use cgroups::freezer::FreezerState; @@ -989,13 +990,47 @@ impl BaseContainer for LinuxContainer { child_stdin = unsafe { std::process::Stdio::from_raw_fd(pseudo.slave) }; child_stdout = unsafe { std::process::Stdio::from_raw_fd(pseudo.slave) }; child_stderr = unsafe { std::process::Stdio::from_raw_fd(pseudo.slave) }; + + if let Some(proc_io) = &mut p.proc_io { + if let Some(mut stdin_stream) = proc_io.stdin.take() { + let mut term_master = unsafe { File::from_raw_fd(pseudo.master) }; + let logger = logger.clone(); + tokio::spawn(async move { + let res = tokio::io::copy(&mut stdin_stream, &mut term_master).await; + debug!(logger, "copy from stdin to term_master end: {:?}", res); + std::mem::forget(term_master); // Avoid auto closing of term_master + }); + } + + if let Some(mut stdout_stream) = proc_io.stdout.take() { + let wgw_output = proc_io.wg_output.worker(); + let mut term_master = unsafe { File::from_raw_fd(pseudo.master) }; + let logger = logger.clone(); + tokio::spawn(async move { + let res = tokio::io::copy(&mut term_master, &mut stdout_stream).await; + debug!(logger, "copy from term_master to stdout end: {:?}", res); + wgw_output.done(); + std::mem::forget(term_master); // Avoid auto closing of term_master + }); + } + } } else { - let stdin = p.stdin.unwrap(); - let stdout = p.stdout.unwrap(); - let stderr = p.stderr.unwrap(); - child_stdin = unsafe { std::process::Stdio::from_raw_fd(stdin) }; - child_stdout = unsafe { std::process::Stdio::from_raw_fd(stdout) }; - child_stderr = unsafe { std::process::Stdio::from_raw_fd(stderr) }; + // Allow null io in passfd io mode when vsock streams are not provided + child_stdin = if let Some(stdin) = p.stdin { + unsafe { std::process::Stdio::from_raw_fd(stdin) } + } else { + std::process::Stdio::null() + }; + child_stdout = if let Some(stdout) = p.stdout { + unsafe { std::process::Stdio::from_raw_fd(stdout) } + } else { + std::process::Stdio::null() + }; + child_stderr = if let Some(stderr) = p.stderr { + unsafe { std::process::Stdio::from_raw_fd(stderr) } + } else { + std::process::Stdio::null() + }; } let pidns = get_pid_namespace(&self.logger, linux)?; @@ -1904,7 +1939,7 @@ mod tests { let _ = new_linux_container_and_then(|mut c: LinuxContainer| { c.processes.insert( 1, - Process::new(&sl(), &oci::Process::default(), "123", true, 1).unwrap(), + Process::new(&sl(), &oci::Process::default(), "123", true, 1, None).unwrap(), ); let p = c.get_process("123"); assert!(p.is_ok(), "Expecting Ok, Got {:?}", p); @@ -1931,7 +1966,7 @@ mod tests { let (c, _dir) = new_linux_container(); let ret = c .unwrap() - .start(Process::new(&sl(), &oci::Process::default(), "123", true, 1).unwrap()) + .start(Process::new(&sl(), &oci::Process::default(), "123", true, 1, None).unwrap()) .await; assert!(ret.is_err(), "Expecting Err, Got {:?}", ret); } @@ -1941,7 +1976,7 @@ mod tests { let (c, _dir) = new_linux_container(); let ret = c .unwrap() - .run(Process::new(&sl(), &oci::Process::default(), "123", true, 1).unwrap()) + .run(Process::new(&sl(), &oci::Process::default(), "123", true, 1, None).unwrap()) .await; assert!(ret.is_err(), "Expecting Err, Got {:?}", ret); } diff --git a/src/agent/rustjail/src/process.rs b/src/agent/rustjail/src/process.rs index cdecae130..a568ed854 100644 --- a/src/agent/rustjail/src/process.rs +++ b/src/agent/rustjail/src/process.rs @@ -7,6 +7,7 @@ use libc::pid_t; use std::fs::File; use std::os::unix::io::{AsRawFd, RawFd}; use tokio::sync::mpsc::Sender; +use tokio_vsock::VsockStream; use nix::errno::Errno; use nix::fcntl::{fcntl, FcntlArg, OFlag}; @@ -18,6 +19,7 @@ use oci::Process as OCIProcess; use slog::Logger; use crate::pipestream::PipeStream; +use awaitgroup::WaitGroup; use std::collections::HashMap; use std::sync::Arc; use tokio::io::{split, ReadHalf, WriteHalf}; @@ -47,6 +49,31 @@ pub enum StreamType { type Reader = Arc>>; type Writer = Arc>>; +#[derive(Debug)] +pub struct ProcessIo { + pub stdin: Option, + pub stdout: Option, + pub stderr: Option, + // used to wait for all process outputs to be copied to the vsock streams + // only used when tty is used. + pub wg_output: WaitGroup, +} + +impl ProcessIo { + pub fn new( + stdin: Option, + stdout: Option, + stderr: Option, + ) -> Self { + ProcessIo { + stdin, + stdout, + stderr, + wg_output: WaitGroup::new(), + } + } +} + #[derive(Debug)] pub struct Process { pub exec_id: String, @@ -74,6 +101,8 @@ pub struct Process { readers: HashMap, writers: HashMap, + + pub proc_io: Option, } pub trait ProcessOperations { @@ -105,6 +134,7 @@ impl Process { id: &str, init: bool, pipe_size: i32, + proc_io: Option, ) -> Result { let logger = logger.new(o!("subsystem" => "process")); let (exit_tx, exit_rx) = tokio::sync::watch::channel(false); @@ -131,6 +161,7 @@ impl Process { term_exit_notifier: Arc::new(Notify::new()), readers: HashMap::new(), writers: HashMap::new(), + proc_io, }; info!(logger, "before create console socket!"); @@ -143,17 +174,40 @@ impl Process { } else { info!(logger, "created console socket!"); - let (stdin, pstdin) = unistd::pipe2(OFlag::O_CLOEXEC)?; - p.parent_stdin = Some(pstdin); - p.stdin = Some(stdin); + if let Some(proc_io) = p.proc_io.as_mut() { + // In passfd io mode + if let Some(stdin) = proc_io.stdin.take() { + p.stdin = Some(stdin.as_raw_fd()); + std::mem::forget(stdin); + } + // p.stdin can be None if the connection for stdin is not provided. + } else { + let (stdin, pstdin) = unistd::pipe2(OFlag::O_CLOEXEC)?; + p.parent_stdin = Some(pstdin); + p.stdin = Some(stdin); + } - let (pstdout, stdout) = create_extended_pipe(OFlag::O_CLOEXEC, pipe_size)?; - p.parent_stdout = Some(pstdout); - p.stdout = Some(stdout); + if let Some(proc_io) = p.proc_io.as_mut() { + if let Some(stdout) = proc_io.stdout.take() { + p.stdout = Some(stdout.as_raw_fd()); + std::mem::forget(stdout); + } + } else { + let (pstdout, stdout) = create_extended_pipe(OFlag::O_CLOEXEC, pipe_size)?; + p.parent_stdout = Some(pstdout); + p.stdout = Some(stdout); + } - let (pstderr, stderr) = create_extended_pipe(OFlag::O_CLOEXEC, pipe_size)?; - p.parent_stderr = Some(pstderr); - p.stderr = Some(stderr); + if let Some(proc_io) = p.proc_io.as_mut() { + if let Some(stderr) = proc_io.stderr.take() { + p.stderr = Some(stderr.as_raw_fd()); + std::mem::forget(stderr); + } + } else { + let (pstderr, stderr) = create_extended_pipe(OFlag::O_CLOEXEC, pipe_size)?; + p.parent_stderr = Some(pstderr); + p.stderr = Some(stderr); + } } } Ok(p) @@ -164,6 +218,7 @@ impl Process { notify.notify_waiters(); } + /// won't be use in passfd io mode. pub fn close_stdin(&mut self) { close_process_stream!(self, term_master, TermMaster); close_process_stream!(self, parent_stdin, ParentStdin); @@ -172,6 +227,13 @@ impl Process { } pub fn cleanup_process_stream(&mut self) { + if let Some(_) = self.proc_io.take() { + // taken + + return; + } + + // legacy io mode close_process_stream!(self, parent_stdin, ParentStdin); close_process_stream!(self, parent_stdout, ParentStdout); close_process_stream!(self, parent_stderr, ParentStderr); @@ -277,6 +339,7 @@ mod tests { id, init, 32, + None, ); let mut process = process.unwrap(); diff --git a/src/agent/src/main.rs b/src/agent/src/main.rs index cb3abbe1e..1b7324e54 100644 --- a/src/agent/src/main.rs +++ b/src/agent/src/main.rs @@ -44,6 +44,7 @@ mod mount; mod namespace; mod netlink; mod network; +mod passfd_io; mod pci; pub mod random; mod sandbox; @@ -235,6 +236,12 @@ async fn real_main() -> std::result::Result<(), Box> { // XXX: Note that *ALL* spans needs to start after this point!! let span_guard = root_span.enter(); + // Start the fd passthrough io listener + let passfd_listener_port = config.passfd_listener_port as u32; + if passfd_listener_port != 0 { + passfd_io::start_listen(passfd_listener_port).await?; + } + // Start the sandbox and wait for its ttRPC server to end start_sandbox(&logger, config, init_mode, &mut tasks, shutdown_rx.clone()).await?; diff --git a/src/agent/src/passfd_io.rs b/src/agent/src/passfd_io.rs new file mode 100644 index 000000000..314f66abf --- /dev/null +++ b/src/agent/src/passfd_io.rs @@ -0,0 +1,65 @@ +use anyhow::Result; +use lazy_static::lazy_static; +use rustjail::process::ProcessIo; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::sync::Mutex; +use tokio_vsock::SockAddr::Vsock; +use tokio_vsock::{VsockListener, VsockStream}; + +lazy_static! { + static ref HVSOCK_STREAMS: Arc>> = + Arc::new(Mutex::new(HashMap::new())); +} + +// Convenience function to obtain the scope logger. +fn sl() -> slog::Logger { + slog_scope::logger().new(o!("subsystem" => "passfd_io")) +} + +pub(crate) async fn start_listen(port: u32) -> Result<()> { + info!(sl(), "start listening on port {}", port); + let mut listener = VsockListener::bind(libc::VMADDR_CID_ANY, port)?; + tokio::spawn(async move { + loop { + if let Ok((stream, peer_addr)) = listener.accept().await { + if let Vsock(addr) = peer_addr { + let port = addr.port(); + info!(sl(), "accept connection from peer port {}", port); + HVSOCK_STREAMS.lock().await.insert(port, stream); + } + } + } + }); + Ok(()) +} + +async fn take_stream(port: u32) -> Option { + let mut mapping = HVSOCK_STREAMS.lock().await; + mapping.remove(&port) +} + +macro_rules! take_io_stream { + ($port: ident) => { + if $port == 0 { + None + } else { + take_stream($port).await + } + }; +} + +pub(crate) async fn take_io_streams( + stdin_port: u32, + stdout_port: u32, + stderr_port: u32, +) -> ProcessIo { + let stdin = take_io_stream!(stdin_port); + let stdout = take_io_stream!(stdout_port); + let stderr = take_io_stream!(stderr_port); + debug!( + sl(), + "take passfd io streams {} {} {}", stdin_port, stdout_port, stderr_port + ); + ProcessIo::new(stdin, stdout, stderr) +} diff --git a/src/agent/src/rpc.rs b/src/agent/src/rpc.rs index 2ce66ecbc..b0e6d3f70 100644 --- a/src/agent/src/rpc.rs +++ b/src/agent/src/rpc.rs @@ -58,6 +58,7 @@ use crate::metrics::get_metrics; use crate::mount::baremount; use crate::namespace::{NSTYPEIPC, NSTYPEPID, NSTYPEUTS}; use crate::network::setup_guest_dns; +use crate::passfd_io; use crate::pci; use crate::random; use crate::sandbox::Sandbox; @@ -260,7 +261,15 @@ impl AgentService { let pipe_size = AGENT_CONFIG.container_pipe_size; let p = if let Some(p) = oci.process { - Process::new(&sl(), &p, cid.as_str(), true, pipe_size)? + let proc_io = if AGENT_CONFIG.passfd_listener_port != 0 { + Some( + passfd_io::take_io_streams(req.stdin_port, req.stdout_port, req.stderr_port) + .await, + ) + } else { + None + }; + Process::new(&sl(), &p, cid.as_str(), true, pipe_size, proc_io)? } else { info!(sl(), "no process configurations!"); return Err(anyhow!(nix::Error::EINVAL)); @@ -369,7 +378,15 @@ impl AgentService { let pipe_size = AGENT_CONFIG.container_pipe_size; let ocip = rustjail::process_grpc_to_oci(&process); - let p = Process::new(&sl(), &ocip, exec_id.as_str(), false, pipe_size)?; + + // signal_port != 0 indicates passfd io mode + let proc_io = if AGENT_CONFIG.passfd_listener_port != 0 { + Some(passfd_io::take_io_streams(req.stdin_port, req.stdout_port, req.stderr_port).await) + } else { + None + }; + + let p = Process::new(&sl(), &ocip, exec_id.as_str(), false, pipe_size, proc_io)?; let ctr = sandbox .get_container(&cid) @@ -2252,6 +2269,7 @@ mod tests { &exec_process_id.to_string(), false, 1, + None, ) .unwrap(); diff --git a/src/agent/src/sandbox.rs b/src/agent/src/sandbox.rs index 30f168f2c..e0a90c814 100644 --- a/src/agent/src/sandbox.rs +++ b/src/agent/src/sandbox.rs @@ -1006,12 +1006,20 @@ mod tests { // add init process linux_container.processes.insert( 1, - Process::new(&logger, &oci::Process::default(), "1", true, 1).unwrap(), + Process::new(&logger, &oci::Process::default(), "1", true, 1, None).unwrap(), ); // add exec process linux_container.processes.insert( 123, - Process::new(&logger, &oci::Process::default(), "exec-123", false, 1).unwrap(), + Process::new( + &logger, + &oci::Process::default(), + "exec-123", + false, + 1, + None, + ) + .unwrap(), ); s.add_container(linux_container); @@ -1058,6 +1066,7 @@ mod tests { "this_is_a_test_process", true, 1, + None, ) .unwrap(); // processes interally only have pids when manually set diff --git a/src/agent/src/signal.rs b/src/agent/src/signal.rs index 401ded953..eafec4bb0 100644 --- a/src/agent/src/signal.rs +++ b/src/agent/src/signal.rs @@ -69,6 +69,11 @@ async fn handle_sigchild(logger: Logger, sandbox: Arc>) -> Result } }; + // In passfd io mode, when using tty, we need to wait for the copy task end. + if let Some(proc_io) = &mut p.proc_io { + proc_io.wg_output.wait().await; + } + p.exit_code = ret; let _ = p.exit_tx.take(); diff --git a/src/libs/protocols/protos/agent.proto b/src/libs/protocols/protos/agent.proto index bee72f066..ff44a46f8 100644 --- a/src/libs/protocols/protos/agent.proto +++ b/src/libs/protocols/protos/agent.proto @@ -93,6 +93,14 @@ message CreateContainerRequest { // This field is used to declare a set of shared mount points // that support cross-container sharing of mount objects. repeated SharedMount shared_mounts = 8; + + // These fields are the host-side vport numbers of passfd streams + // pre-created by runtime-rs, and used as identifiers for the agent + // to select the right streams for init process's stdin/stdout/stderr. + // Disable the feature by setting the associated port to 0. + uint32 stdin_port = 9; + uint32 stdout_port = 10; + uint32 stderr_port = 11; } message StartContainerRequest { @@ -115,6 +123,14 @@ message ExecProcessRequest { string exec_id = 2; StringUser string_user = 3; Process process = 4; + + // These fields are the host-side vport numbers of passfd streams + // pre-created by runtime-rs, and used as identifiers for the agent + // to select the right streams for process's stdin/stdout/stderr. + // Disable the feature by setting the associated port to 0. + uint32 stdin_port = 5; + uint32 stdout_port = 6; + uint32 stderr_port = 7; } message SignalProcessRequest { diff --git a/src/runtime-rs/Cargo.lock b/src/runtime-rs/Cargo.lock index 67178e586..89d850781 100644 --- a/src/runtime-rs/Cargo.lock +++ b/src/runtime-rs/Cargo.lock @@ -3448,6 +3448,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "604b71b8fc267e13bb3023a2c901126c8f349393666a6d98ac1ae5729b701798" dependencies = [ "libc", + "tokio", ] [[package]] @@ -4452,6 +4453,7 @@ dependencies = [ "persist", "protobuf 3.2.0", "resource", + "sendfd", "serde", "serde_derive", "serde_json", diff --git a/src/runtime-rs/crates/agent/src/kata/trans.rs b/src/runtime-rs/crates/agent/src/kata/trans.rs index d7b00aba2..24fdda492 100644 --- a/src/runtime-rs/crates/agent/src/kata/trans.rs +++ b/src/runtime-rs/crates/agent/src/kata/trans.rs @@ -274,6 +274,9 @@ impl From for agent::CreateContainerRequest { OCI: from_option(from.oci), sandbox_pidns: from.sandbox_pidns, shared_mounts: trans_vec(from.shared_mounts), + stdin_port: from.stdin_port.unwrap_or_default(), + stdout_port: from.stdout_port.unwrap_or_default(), + stderr_port: from.stderr_port.unwrap_or_default(), ..Default::default() } } @@ -415,6 +418,9 @@ impl From for agent::ExecProcessRequest { exec_id: from.process_id.exec_id(), string_user: from_option(from.string_user), process: from_option(from.process), + stdin_port: from.stdin_port.unwrap_or_default(), + stdout_port: from.stdout_port.unwrap_or_default(), + stderr_port: from.stderr_port.unwrap_or_default(), ..Default::default() } } diff --git a/src/runtime-rs/crates/agent/src/types.rs b/src/runtime-rs/crates/agent/src/types.rs index 7cb3cfd44..bd7b9ff10 100644 --- a/src/runtime-rs/crates/agent/src/types.rs +++ b/src/runtime-rs/crates/agent/src/types.rs @@ -128,6 +128,9 @@ pub struct CreateContainerRequest { pub sandbox_pidns: bool, pub rootfs_mounts: Vec, pub shared_mounts: Vec, + pub stdin_port: Option, + pub stdout_port: Option, + pub stderr_port: Option, } #[derive(PartialEq, Clone, Default)] @@ -252,6 +255,9 @@ pub struct ExecProcessRequest { pub process_id: ContainerProcessID, pub string_user: Option, pub process: Option, + pub stdin_port: Option, + pub stdout_port: Option, + pub stderr_port: Option, } #[derive(PartialEq, Clone, Default, Debug)] diff --git a/src/runtime-rs/crates/hypervisor/src/ch/mod.rs b/src/runtime-rs/crates/hypervisor/src/ch/mod.rs index 9fe70e9d0..c6bcad8b6 100644 --- a/src/runtime-rs/crates/hypervisor/src/ch/mod.rs +++ b/src/runtime-rs/crates/hypervisor/src/ch/mod.rs @@ -182,6 +182,10 @@ impl Hypervisor for CloudHypervisor { let inner = self.inner.read().await; inner.resize_memory(new_mem_mb) } + + async fn get_passfd_listener_addr(&self) -> Result<(String, u32)> { + Err(anyhow::anyhow!("Not yet supported")) + } } #[async_trait] diff --git a/src/runtime-rs/crates/hypervisor/src/dragonball/inner_hypervisor.rs b/src/runtime-rs/crates/hypervisor/src/dragonball/inner_hypervisor.rs index 5de9ceb00..14b0d0219 100644 --- a/src/runtime-rs/crates/hypervisor/src/dragonball/inner_hypervisor.rs +++ b/src/runtime-rs/crates/hypervisor/src/dragonball/inner_hypervisor.rs @@ -9,7 +9,7 @@ use std::{ iter::FromIterator, }; -use anyhow::{Context, Ok, Result}; +use anyhow::{anyhow, Context, Ok, Result}; use kata_types::capabilities::Capabilities; use super::inner::DragonballInner; @@ -76,6 +76,15 @@ impl DragonballInner { )) } + /// Get the address of agent vsock server used to init connections for io + pub(crate) async fn get_passfd_listener_addr(&self) -> Result<(String, u32)> { + if let Some(passfd_port) = self.passfd_listener_port { + Ok((get_hvsock_path(&self.id), passfd_port)) + } else { + Err(anyhow!("passfd io listener port not set")) + } + } + pub(crate) async fn get_hypervisor_metrics(&self) -> Result { info!(sl!(), "get hypervisor metrics"); self.vmm_instance.get_hypervisor_metrics() diff --git a/src/runtime-rs/crates/hypervisor/src/dragonball/mod.rs b/src/runtime-rs/crates/hypervisor/src/dragonball/mod.rs index 980f0e0eb..b8b1c6318 100644 --- a/src/runtime-rs/crates/hypervisor/src/dragonball/mod.rs +++ b/src/runtime-rs/crates/hypervisor/src/dragonball/mod.rs @@ -203,6 +203,11 @@ impl Hypervisor for Dragonball { let mut inner = self.inner.write().await; inner.resize_memory(new_mem_mb) } + + async fn get_passfd_listener_addr(&self) -> Result<(String, u32)> { + let inner = self.inner.read().await; + inner.get_passfd_listener_addr().await + } } #[async_trait] diff --git a/src/runtime-rs/crates/hypervisor/src/lib.rs b/src/runtime-rs/crates/hypervisor/src/lib.rs index 24987447d..c8d460167 100644 --- a/src/runtime-rs/crates/hypervisor/src/lib.rs +++ b/src/runtime-rs/crates/hypervisor/src/lib.rs @@ -115,4 +115,5 @@ pub trait Hypervisor: std::fmt::Debug + Send + Sync { async fn set_capabilities(&self, flag: CapabilityBits); async fn set_guest_memory_block_size(&self, size: u32); async fn guest_memory_block_size(&self) -> u32; + async fn get_passfd_listener_addr(&self) -> Result<(String, u32)>; } diff --git a/src/runtime-rs/crates/hypervisor/src/qemu/mod.rs b/src/runtime-rs/crates/hypervisor/src/qemu/mod.rs index 16921fcee..c32e784d7 100644 --- a/src/runtime-rs/crates/hypervisor/src/qemu/mod.rs +++ b/src/runtime-rs/crates/hypervisor/src/qemu/mod.rs @@ -179,6 +179,10 @@ impl Hypervisor for Qemu { let inner = self.inner.read().await; inner.resize_memory(new_mem_mb) } + + async fn get_passfd_listener_addr(&self) -> Result<(String, u32)> { + Err(anyhow::anyhow!("Not yet supported")) + } } #[async_trait] diff --git a/src/runtime-rs/crates/runtimes/virt_container/Cargo.toml b/src/runtime-rs/crates/runtimes/virt_container/Cargo.toml index 4346d8560..9b83a1724 100644 --- a/src/runtime-rs/crates/runtimes/virt_container/Cargo.toml +++ b/src/runtime-rs/crates/runtimes/virt_container/Cargo.toml @@ -15,6 +15,7 @@ lazy_static = "1.4.0" libc = ">=0.2.39" nix = "0.24.2" protobuf = "3.2.0" +sendfd = { version = "0.4.3", features = ["tokio"] } serde = { version = "1.0.100", features = ["derive"] } serde_derive = "1.0.27" serde_json = "1.0.82" diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container.rs index 27563ea5d..5b570fb27 100644 --- a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container.rs +++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container.rs @@ -43,7 +43,7 @@ pub struct Container { agent: Arc, resource_manager: Arc, logger: slog::Logger, - passfd_listener_addr: Option<(String, u32)>, + pub(crate) passfd_listener_addr: Option<(String, u32)>, } impl Container { @@ -53,6 +53,7 @@ impl Container { spec: oci::Spec, agent: Arc, resource_manager: Arc, + passfd_listener_addr: Option<(String, u32)>, ) -> Result { let container_id = ContainerID::new(&config.container_id).context("new container id")?; let logger = sl!().new(o!("container_id" => config.container_id.clone())); @@ -85,7 +86,7 @@ impl Container { agent, resource_manager, logger, - None, + passfd_listener_addr, }) } @@ -186,6 +187,17 @@ impl Container { } } + // In passfd io mode, we create vsock connections for io in advance + // and pass port info to agent in `CreateContainerRequest`. + // These vsock connections will be used as stdin/stdout/stderr of the container process. + // See agent/src/passfd_io.rs for more details. + if let Some((hvsock_uds_path, passfd_port)) = &self.passfd_listener_addr { + inner + .init_process + .passfd_io_init(hvsock_uds_path, *passfd_port) + .await?; + } + // create container let r = agent::CreateContainerRequest { process_id: agent::ContainerProcessID::new(&config.container_id, ""), @@ -194,6 +206,24 @@ impl Container { sandbox_pidns, devices: devices_agent, shared_mounts, + stdin_port: inner + .init_process + .passfd_io + .as_ref() + .map(|io| io.stdin_port) + .flatten(), + stdout_port: inner + .init_process + .passfd_io + .as_ref() + .map(|io| io.stdout_port) + .flatten(), + stderr_port: inner + .init_process + .passfd_io + .as_ref() + .map(|io| io.stderr_port) + .flatten(), ..Default::default() }; @@ -219,21 +249,40 @@ impl Container { return Err(err); } - let container_io = inner.new_container_io(process).await?; - inner - .init_process - .start_io_and_wait(containers, self.agent.clone(), container_io) - .await?; + if self.passfd_listener_addr.is_some() { + inner + .init_process + .passfd_io_wait(containers, self.agent.clone()) + .await?; + } else { + let container_io = inner.new_container_io(process).await?; + inner + .init_process + .start_io_and_wait(containers, self.agent.clone(), container_io) + .await?; + } } ProcessType::Exec => { + // In passfd io mode, we create vsock connections for io in advance + // and pass port info to agent in `ExecProcessRequest`. + // These vsock connections will be used as stdin/stdout/stderr of the exec process. + // See agent/src/passfd_io.rs for more details. + if let Some((hvsock_uds_path, passfd_port)) = &self.passfd_listener_addr { + let exec = inner + .exec_processes + .get_mut(&process.exec_id) + .ok_or_else(|| Error::ProcessNotFound(process.clone()))?; + exec.process + .passfd_io_init(hvsock_uds_path, *passfd_port) + .await?; + } + if let Err(e) = inner.start_exec_process(process).await { let device_manager = self.resource_manager.get_device_manager().await; let _ = inner.stop_process(process, true, &device_manager).await; return Err(e).context("enter process"); } - let container_io = inner.new_container_io(process).await.context("io stream")?; - { let exec = inner .exec_processes @@ -247,13 +296,29 @@ impl Container { } } - // start io and wait - { + if self.passfd_listener_addr.is_some() { + // In passfd io mode, we don't bother with the IO. + // We send `WaitProcessRequest` immediately to the agent + // and wait for the response in a separate thread. + // The agent will only respond after IO is done. let exec = inner .exec_processes .get_mut(&process.exec_id) .ok_or_else(|| Error::ProcessNotFound(process.clone()))?; + exec.process + .passfd_io_wait(containers, self.agent.clone()) + .await?; + } else { + // In legacy io mode, we handle IO by polling the agent. + // When IO is done, we send `WaitProcessRequest` to agent + // to get the exit status. + let container_io = + inner.new_container_io(process).await.context("io stream")?; + let exec = inner + .exec_processes + .get_mut(&process.exec_id) + .ok_or_else(|| Error::ProcessNotFound(process.clone()))?; exec.process .start_io_and_wait(containers, self.agent.clone(), container_io) .await diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container_inner.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container_inner.rs index 8c5e766d8..1433f6447 100644 --- a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container_inner.rs +++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container_inner.rs @@ -87,6 +87,24 @@ impl ContainerInner { process_id: process.clone().into(), string_user: None, process: Some(exec.oci_process.clone()), + stdin_port: exec + .process + .passfd_io + .as_ref() + .map(|io| io.stdin_port) + .flatten(), + stdout_port: exec + .process + .passfd_io + .as_ref() + .map(|io| io.stdout_port) + .flatten(), + stderr_port: exec + .process + .passfd_io + .as_ref() + .map(|io| io.stderr_port) + .flatten(), }) .await .context("exec process")?; diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/mod.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/mod.rs index 3c6ca719b..a72eecac2 100644 --- a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/mod.rs +++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/mod.rs @@ -6,5 +6,7 @@ mod container_io; pub use container_io::ContainerIo; +mod passfd_io; mod shim_io; +pub use passfd_io::PassfdIo; pub use shim_io::ShimIo; diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/passfd_io.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/passfd_io.rs new file mode 100644 index 000000000..aa08aefca --- /dev/null +++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/passfd_io.rs @@ -0,0 +1,118 @@ +use anyhow::{anyhow, Context, Result}; +use sendfd::SendWithFd; +use std::{ + fs::OpenOptions, + os::fd::{AsRawFd, OwnedFd}, +}; +use tokio::{ + io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, + net::UnixStream, +}; + +// Note: the fd will be closed after passing +async fn passfd_connect(uds: &str, port: u32, fd: OwnedFd) -> Result { + info!(sl!(), "passfd uds {:?} port {}", &uds, port); + let mut stream = UnixStream::connect(&uds).await.context("connect")?; + stream.write_all(b"passfd\n").await.context("write all")?; + + let buf = format!("{}", port); + stream + .send_with_fd(buf.as_bytes(), &[fd.as_raw_fd()]) + .context("send port and fd")?; + + let mut reads = BufReader::new(&mut stream); + let mut response = String::new(); + reads.read_line(&mut response).await.context("read line")?; + + // parse response like "OK port" + let mut iter = response.split_whitespace(); + if iter.next() != Some("OK") { + return Err(anyhow!( + "handshake error: malformed response code: {:?}", + response + )); + } + let hostport = iter + .next() + .ok_or_else(|| anyhow!("handshake error: malformed response code: {:?}", response))? + .parse::() + .context("handshake error: malformed response code")?; + Ok(hostport) +} + +#[derive(Debug, Default)] +pub struct PassfdIo { + stdin: Option, + stdout: Option, + stderr: Option, + + pub stdin_port: Option, + pub stdout_port: Option, + pub stderr_port: Option, +} + +impl PassfdIo { + pub async fn new( + stdin: Option, + stdout: Option, + stderr: Option, + ) -> Self { + Self { + stdin, + stdout, + stderr, + ..Default::default() + } + } + + pub async fn open_and_passfd( + &mut self, + uds_path: &str, + passfd_port: u32, + terminal: bool, + ) -> Result<()> { + if let Some(stdin) = &self.stdin { + let fin = OpenOptions::new() + .read(true) + .open(&stdin) + .context("open stdin")?; + + let hostport = passfd_connect(uds_path, passfd_port, fin.into()) + .await + .context("passfd")?; + + self.stdin_port = Some(hostport); + } + + if let Some(stdout) = &self.stdout { + let fout = OpenOptions::new() + .write(true) + .open(&stdout) + .context("open stdout")?; + + let hostport = passfd_connect(uds_path, passfd_port, fout.into()) + .await + .context("passfd")?; + + self.stdout_port = Some(hostport); + } + + if !terminal { + // stderr is not used in terminal mode + if let Some(stderr) = &self.stderr { + let ferr = OpenOptions::new() + .write(true) + .open(&stderr) + .context("open stderr")?; + + let hostport = passfd_connect(uds_path, passfd_port, ferr.into()) + .await + .context("passfd")?; + + self.stderr_port = Some(hostport); + } + } + + Ok(()) + } +} diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/manager.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/manager.rs index c532ccaca..5fc31eec9 100644 --- a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/manager.rs +++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/manager.rs @@ -77,6 +77,7 @@ impl ContainerManager for VirtContainerManager { spec.clone(), self.agent.clone(), self.resource_manager.clone(), + self.hypervisor.get_passfd_listener_addr().await.ok(), ) .await .context("new container")?; diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/process.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/process.rs index cd73134dd..93ca0bd99 100644 --- a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/process.rs +++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/process.rs @@ -15,7 +15,7 @@ use tokio::io::{AsyncRead, AsyncWrite}; use tokio::sync::{watch, RwLock}; use super::container::Container; -use super::io::{ContainerIo, ShimIo}; +use super::io::{ContainerIo, PassfdIo, ShimIo}; use super::logger_with_process; pub type ProcessWatcher = ( @@ -46,6 +46,9 @@ pub struct Process { // close io call should wait until the stdin io copy finished to // prevent stdin data lost. pub wg_stdin: WaitGroup, + + // io streams using vsock fd passthrough feature + pub passfd_io: Option, } impl Process { @@ -76,9 +79,95 @@ impl Process { exit_watcher_rx: Some(receiver), exit_watcher_tx: Some(sender), wg_stdin: WaitGroup::new(), + passfd_io: None, } } + /// Init the `passfd_io` struct and vsock connections for io to the agent. + pub async fn passfd_io_init(&mut self, hvsock_uds_path: &str, passfd_port: u32) -> Result<()> { + info!(self.logger, "passfd io init"); + + let mut passfd_io = + PassfdIo::new(self.stdin.clone(), self.stdout.clone(), self.stderr.clone()).await; + + passfd_io + .open_and_passfd(hvsock_uds_path, passfd_port, self.terminal) + .await + .context("passfd connect")?; + + self.passfd_io = Some(passfd_io); + + Ok(()) + } + + /// (After process started) Send a WaitProcessRequest to agent in the + /// seperate thread. + /// This function is only used in passfd io mode. + pub async fn passfd_io_wait( + &mut self, + containers: Arc>>, + agent: Arc, + ) -> Result<()> { + let logger = self.logger.clone(); + info!(logger, "start passfd io wait"); + let process = self.process.clone(); + let exit_status = self.exit_status.clone(); + let exit_notifier = self.exit_watcher_tx.take(); + let status = self.status.clone(); + + tokio::spawn(async move { + let req = agent::WaitProcessRequest { + process_id: process.clone().into(), + }; + + info!(logger, "begin passfd io wait process"); + let resp = match agent.wait_process(req).await { + Ok(ret) => ret, + Err(e) => { + error!(logger, "failed to passfd io wait process {:?}", e); + return; + } + }; + + info!( + logger, + "end passfd io wait process exit code {}", resp.status + ); + + let containers = containers.read().await; + let container_id = &process.container_id.container_id; + let c = match containers.get(container_id) { + Some(c) => c, + None => { + error!( + logger, + "Failed to stop process, since container {} not found", container_id + ); + return; + } + }; + + if let Err(err) = c.stop_process(&process).await { + error!( + logger, + "Failed to stop process, process = {:?}, err = {:?}", process, err + ); + } + + let mut exit_status = exit_status.write().await; + exit_status.update_exit_code(resp.status); + drop(exit_status); + + let mut status = status.write().await; + *status = ProcessStatus::Stopped; + drop(status); + + drop(exit_notifier); + info!(logger, "end passfd io wait thread"); + }); + Ok(()) + } + pub async fn start_io_and_wait( &mut self, containers: Arc>>, @@ -246,7 +335,15 @@ impl Process { *status = ProcessStatus::Stopped; } + /// Close the stdin of the process in container. pub async fn close_io(&mut self, agent: Arc) { + if self.passfd_io.is_some() { + // In passfd io mode, if containerd closes stdin stream, the + // agent can get the close event from the vsock connection. + // so we just return here. + return; + } + self.wg_stdin.wait().await; let req = agent::CloseStdinRequest { @@ -254,7 +351,7 @@ impl Process { }; if let Err(e) = agent.close_stdin(req).await { - warn!(self.logger, "failed clsoe process io: {:?}", e); + warn!(self.logger, "failed close process io: {:?}", e); } } From b8632b4034327592235f0325028133702649cc94 Mon Sep 17 00:00:00 2001 From: Zixuan Tan Date: Wed, 4 Oct 2023 16:39:14 +0800 Subject: [PATCH 04/16] dragonball: vsock: properly handle EPOLLHUP/EPOLLERR events When one end of the connection close, the epoll event will be triggered forever. We should close the connection and kill the connection. Fixes: #6714 Signed-off-by: Zixuan Tan --- .../src/vsock/csm/connection.rs | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/src/dragonball/src/dbs_virtio_devices/src/vsock/csm/connection.rs b/src/dragonball/src/dbs_virtio_devices/src/vsock/csm/connection.rs index e2ca7e333..c3415cf6c 100644 --- a/src/dragonball/src/dbs_virtio_devices/src/vsock/csm/connection.rs +++ b/src/dragonball/src/dbs_virtio_devices/src/vsock/csm/connection.rs @@ -487,6 +487,31 @@ impl VsockEpollListener for VsockConnection { self.pending_rx.insert(PendingRx::CreditUpdate); } } + + if evset.contains(epoll::Events::EPOLLHUP) + && !evset.contains(epoll::Events::EPOLLIN) + && !evset.contains(epoll::Events::EPOLLOUT) + { + // The host stream has been hung up. We'll kill this connection. + warn!( + "vsock: connection received EPOLLHUP event: lp={}, pp={}", + self.local_port, self.peer_port + ); + self.kill(); + } + + if evset.contains(epoll::Events::EPOLLERR) + && !evset.contains(epoll::Events::EPOLLIN) + && !evset.contains(epoll::Events::EPOLLOUT) + { + // The host stream has encountered an error. We'll kill this + // connection. + warn!( + "vsock: connection received EPOLLERR event: lp={}, pp={}", + self.local_port, self.peer_port + ); + self.kill(); + } } } From f1b33fd2e0e7642a2d5862187e9cf4648da90d4a Mon Sep 17 00:00:00 2001 From: Zixuan Tan Date: Sun, 8 Oct 2023 01:19:39 +0800 Subject: [PATCH 05/16] agent: clean up term master fd when container exits When container exits, the agent should clean up the term master fd, otherwise the fd will be leaked. Fixes: kata-containers#6714 Signed-off-by: Zixuan Tan --- src/agent/rustjail/src/container.rs | 7 +++++++ src/agent/src/rpc.rs | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/src/agent/rustjail/src/container.rs b/src/agent/rustjail/src/container.rs index 1a29d90c8..02de8a77b 100644 --- a/src/agent/rustjail/src/container.rs +++ b/src/agent/rustjail/src/container.rs @@ -992,13 +992,18 @@ impl BaseContainer for LinuxContainer { child_stderr = unsafe { std::process::Stdio::from_raw_fd(pseudo.slave) }; if let Some(proc_io) = &mut p.proc_io { + // A reference count used to clean up the term master fd. + let term_closer = Arc::from(unsafe { File::from_raw_fd(pseudo.master) }); + if let Some(mut stdin_stream) = proc_io.stdin.take() { let mut term_master = unsafe { File::from_raw_fd(pseudo.master) }; let logger = logger.clone(); + let term_closer = term_closer.clone(); tokio::spawn(async move { let res = tokio::io::copy(&mut stdin_stream, &mut term_master).await; debug!(logger, "copy from stdin to term_master end: {:?}", res); std::mem::forget(term_master); // Avoid auto closing of term_master + drop(term_closer); }); } @@ -1006,11 +1011,13 @@ impl BaseContainer for LinuxContainer { let wgw_output = proc_io.wg_output.worker(); let mut term_master = unsafe { File::from_raw_fd(pseudo.master) }; let logger = logger.clone(); + let term_closer = term_closer.clone(); tokio::spawn(async move { let res = tokio::io::copy(&mut term_master, &mut stdout_stream).await; debug!(logger, "copy from term_master to stdout end: {:?}", res); wgw_output.done(); std::mem::forget(term_master); // Avoid auto closing of term_master + drop(term_closer); }); } } diff --git a/src/agent/src/rpc.rs b/src/agent/src/rpc.rs index b0e6d3f70..3ed56e761 100644 --- a/src/agent/src/rpc.rs +++ b/src/agent/src/rpc.rs @@ -379,7 +379,7 @@ impl AgentService { let pipe_size = AGENT_CONFIG.container_pipe_size; let ocip = rustjail::process_grpc_to_oci(&process); - // signal_port != 0 indicates passfd io mode + // passfd_listener_port != 0 indicates passfd io mode let proc_io = if AGENT_CONFIG.passfd_listener_port != 0 { Some(passfd_io::take_io_streams(req.stdin_port, req.stdout_port, req.stderr_port).await) } else { From 657b17a86f1d53ff59684d5504a677b5e4395eb7 Mon Sep 17 00:00:00 2001 From: Zixuan Tan Date: Thu, 26 Oct 2023 18:35:09 +0800 Subject: [PATCH 06/16] runtime-rs: open stdin fifo with RDWR|NONBLOCK when pass vsock streams In linux, when a FIFO is opened and there are no writers, the reader will continuously receive the HUP event. This can be problematic when creating containers in detached mode, as the stdin FIFO writer is closed after the container is created, resulting in this situation. In passfd io mode, open stdin fifo with O_RDWR|O_NONBLOCK to avoid the HUP event. Fixes: #6714 Signed-off-by: Zixuan Tan --- .../virt_container/src/container_manager/io/passfd_io.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/passfd_io.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/passfd_io.rs index aa08aefca..4af2e0084 100644 --- a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/passfd_io.rs +++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/passfd_io.rs @@ -3,6 +3,7 @@ use sendfd::SendWithFd; use std::{ fs::OpenOptions, os::fd::{AsRawFd, OwnedFd}, + os::unix::fs::OpenOptionsExt, }; use tokio::{ io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, @@ -71,9 +72,17 @@ impl PassfdIo { passfd_port: u32, terminal: bool, ) -> Result<()> { + // In linux, when a FIFO is opened and there are no writers, the reader + // will continuously receive the HUP event. This can be problematic + // when creating containers in detached mode, as the stdin FIFO writer + // is closed after the container is created, resulting in this situation. + // + // See: https://stackoverflow.com/questions/15055065/o-rdwr-on-named-pipes-with-poll if let Some(stdin) = &self.stdin { let fin = OpenOptions::new() .read(true) + .write(true) + .custom_flags(libc::O_NONBLOCK) .open(&stdin) .context("open stdin")?; From 5536743361b6a3d789826cad1000d84007bb41b0 Mon Sep 17 00:00:00 2001 From: Zixuan Tan Date: Mon, 30 Oct 2023 01:21:32 +0800 Subject: [PATCH 07/16] agent,runtime-rs: fix container io detach and attach Partially fix some issues related to container io detach and attach. Fixes: #6714 Signed-off-by: Zixuan Tan --- src/agent/rustjail/src/container.rs | 61 +++++++++++++------ src/agent/rustjail/src/process.rs | 51 ++++++++-------- src/agent/src/rpc.rs | 2 +- .../src/vsock/csm/connection.rs | 25 -------- .../src/container_manager/process.rs | 11 ++-- 5 files changed, 75 insertions(+), 75 deletions(-) diff --git a/src/agent/rustjail/src/container.rs b/src/agent/rustjail/src/container.rs index 02de8a77b..21929e4dd 100644 --- a/src/agent/rustjail/src/container.rs +++ b/src/agent/rustjail/src/container.rs @@ -995,13 +995,25 @@ impl BaseContainer for LinuxContainer { // A reference count used to clean up the term master fd. let term_closer = Arc::from(unsafe { File::from_raw_fd(pseudo.master) }); + // Copy from stdin to term_master if let Some(mut stdin_stream) = proc_io.stdin.take() { let mut term_master = unsafe { File::from_raw_fd(pseudo.master) }; + let mut close_stdin_rx = proc_io.close_stdin_rx.clone(); + let wgw_input = proc_io.wg_input.worker(); let logger = logger.clone(); let term_closer = term_closer.clone(); tokio::spawn(async move { - let res = tokio::io::copy(&mut stdin_stream, &mut term_master).await; - debug!(logger, "copy from stdin to term_master end: {:?}", res); + // As the stdin fifo is opened in RW mode in the shim, which will never + // read EOF, we close the stdin fifo here when explicit requested. + tokio::select! { + res = tokio::io::copy(&mut stdin_stream, &mut term_master) => { + debug!(logger, "copy from stdin to term_master end: {:?}", res); + } + _ = close_stdin_rx.changed() => { + debug!(logger, "copy ends as requested"); + } + } + wgw_input.done(); std::mem::forget(term_master); // Avoid auto closing of term_master drop(term_closer); }); @@ -1022,22 +1034,35 @@ impl BaseContainer for LinuxContainer { } } } else { - // Allow null io in passfd io mode when vsock streams are not provided - child_stdin = if let Some(stdin) = p.stdin { - unsafe { std::process::Stdio::from_raw_fd(stdin) } - } else { - std::process::Stdio::null() - }; - child_stdout = if let Some(stdout) = p.stdout { - unsafe { std::process::Stdio::from_raw_fd(stdout) } - } else { - std::process::Stdio::null() - }; - child_stderr = if let Some(stderr) = p.stderr { - unsafe { std::process::Stdio::from_raw_fd(stderr) } - } else { - std::process::Stdio::null() - }; + let stdin = p.stdin.unwrap(); + let stdout = p.stdout.unwrap(); + let stderr = p.stderr.unwrap(); + child_stdin = unsafe { std::process::Stdio::from_raw_fd(stdin) }; + child_stdout = unsafe { std::process::Stdio::from_raw_fd(stdout) }; + child_stderr = unsafe { std::process::Stdio::from_raw_fd(stderr) }; + + if let Some(proc_io) = &mut p.proc_io { + // Copy from stdin to parent_stdin + if let Some(mut stdin_stream) = proc_io.stdin.take() { + let mut parent_stdin = unsafe { File::from_raw_fd(p.parent_stdin.unwrap()) }; + let mut close_stdin_rx = proc_io.close_stdin_rx.clone(); + let wgw_input = proc_io.wg_input.worker(); + let logger = logger.clone(); + tokio::spawn(async move { + // As the stdin fifo is opened in RW mode in the shim, which will never + // read EOF, we close the stdin stream when containerd explicit requested. + tokio::select! { + res = tokio::io::copy(&mut stdin_stream, &mut parent_stdin) => { + debug!(logger, "copy from stdin to parent_stdin end: {:?}", res); + } + _ = close_stdin_rx.changed() => { + debug!(logger, "copy ends as requested"); + } + } + wgw_input.done(); + }); + } + } } let pidns = get_pid_namespace(&self.logger, linux)?; diff --git a/src/agent/rustjail/src/process.rs b/src/agent/rustjail/src/process.rs index a568ed854..2ce9f3721 100644 --- a/src/agent/rustjail/src/process.rs +++ b/src/agent/rustjail/src/process.rs @@ -54,6 +54,11 @@ pub struct ProcessIo { pub stdin: Option, pub stdout: Option, pub stderr: Option, + // used to close stdin stream + pub close_stdin_tx: tokio::sync::watch::Sender, + pub close_stdin_rx: tokio::sync::watch::Receiver, + // wait for stdin copy task to finish + pub wg_input: WaitGroup, // used to wait for all process outputs to be copied to the vsock streams // only used when tty is used. pub wg_output: WaitGroup, @@ -65,10 +70,15 @@ impl ProcessIo { stdout: Option, stderr: Option, ) -> Self { + let (close_stdin_tx, close_stdin_rx) = tokio::sync::watch::channel(false); + ProcessIo { stdin, stdout, stderr, + close_stdin_tx, + close_stdin_rx, + wg_input: WaitGroup::new(), wg_output: WaitGroup::new(), } } @@ -174,35 +184,22 @@ impl Process { } else { info!(logger, "created console socket!"); - if let Some(proc_io) = p.proc_io.as_mut() { - // In passfd io mode - if let Some(stdin) = proc_io.stdin.take() { - p.stdin = Some(stdin.as_raw_fd()); - std::mem::forget(stdin); - } - // p.stdin can be None if the connection for stdin is not provided. - } else { - let (stdin, pstdin) = unistd::pipe2(OFlag::O_CLOEXEC)?; - p.parent_stdin = Some(pstdin); - p.stdin = Some(stdin); - } + let (stdin, pstdin) = unistd::pipe2(OFlag::O_CLOEXEC)?; + p.parent_stdin = Some(pstdin); + p.stdin = Some(stdin); - if let Some(proc_io) = p.proc_io.as_mut() { - if let Some(stdout) = proc_io.stdout.take() { - p.stdout = Some(stdout.as_raw_fd()); - std::mem::forget(stdout); - } + if let Some(stdout) = p.proc_io.as_mut().map(|io| io.stdout.take()).flatten() { + p.stdout = Some(stdout.as_raw_fd()); + std::mem::forget(stdout); } else { let (pstdout, stdout) = create_extended_pipe(OFlag::O_CLOEXEC, pipe_size)?; p.parent_stdout = Some(pstdout); p.stdout = Some(stdout); } - if let Some(proc_io) = p.proc_io.as_mut() { - if let Some(stderr) = proc_io.stderr.take() { - p.stderr = Some(stderr.as_raw_fd()); - std::mem::forget(stderr); - } + if let Some(stderr) = p.proc_io.as_mut().map(|io| io.stderr.take()).flatten() { + p.stderr = Some(stderr.as_raw_fd()); + std::mem::forget(stderr); } else { let (pstderr, stderr) = create_extended_pipe(OFlag::O_CLOEXEC, pipe_size)?; p.parent_stderr = Some(pstderr); @@ -218,8 +215,14 @@ impl Process { notify.notify_waiters(); } - /// won't be use in passfd io mode. - pub fn close_stdin(&mut self) { + pub async fn close_stdin(&mut self) { + if let Some(proc_io) = &mut self.proc_io { + // notify io copy task to close stdin stream + let _ = proc_io.close_stdin_tx.send(true); + // wait for io copy task to finish + proc_io.wg_input.wait().await; + } + close_process_stream!(self, term_master, TermMaster); close_process_stream!(self, parent_stdin, ParentStdin); diff --git a/src/agent/src/rpc.rs b/src/agent/src/rpc.rs index 3ed56e761..0068eb929 100644 --- a/src/agent/src/rpc.rs +++ b/src/agent/src/rpc.rs @@ -851,7 +851,7 @@ impl agent_ttrpc::AgentService for AgentService { ) })?; - p.close_stdin(); + p.close_stdin().await; Ok(Empty::new()) } diff --git a/src/dragonball/src/dbs_virtio_devices/src/vsock/csm/connection.rs b/src/dragonball/src/dbs_virtio_devices/src/vsock/csm/connection.rs index c3415cf6c..e2ca7e333 100644 --- a/src/dragonball/src/dbs_virtio_devices/src/vsock/csm/connection.rs +++ b/src/dragonball/src/dbs_virtio_devices/src/vsock/csm/connection.rs @@ -487,31 +487,6 @@ impl VsockEpollListener for VsockConnection { self.pending_rx.insert(PendingRx::CreditUpdate); } } - - if evset.contains(epoll::Events::EPOLLHUP) - && !evset.contains(epoll::Events::EPOLLIN) - && !evset.contains(epoll::Events::EPOLLOUT) - { - // The host stream has been hung up. We'll kill this connection. - warn!( - "vsock: connection received EPOLLHUP event: lp={}, pp={}", - self.local_port, self.peer_port - ); - self.kill(); - } - - if evset.contains(epoll::Events::EPOLLERR) - && !evset.contains(epoll::Events::EPOLLIN) - && !evset.contains(epoll::Events::EPOLLOUT) - { - // The host stream has encountered an error. We'll kill this - // connection. - warn!( - "vsock: connection received EPOLLERR event: lp={}, pp={}", - self.local_port, self.peer_port - ); - self.kill(); - } } } diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/process.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/process.rs index 93ca0bd99..6cc755770 100644 --- a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/process.rs +++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/process.rs @@ -337,15 +337,12 @@ impl Process { /// Close the stdin of the process in container. pub async fn close_io(&mut self, agent: Arc) { - if self.passfd_io.is_some() { - // In passfd io mode, if containerd closes stdin stream, the - // agent can get the close event from the vsock connection. - // so we just return here. - return; + // In passfd io mode, the stdin close and sync logic is handled + // in the agent side. + if self.passfd_io.is_none() { + self.wg_stdin.wait().await; } - self.wg_stdin.wait().await; - let req = agent::CloseStdinRequest { process_id: self.process.clone().into(), }; From 4a762fcfdd60c19794569a324fae7f8057c37247 Mon Sep 17 00:00:00 2001 From: Fupan Li Date: Thu, 2 Nov 2023 15:09:07 +0800 Subject: [PATCH 08/16] dbs: hybrid stream support keep the connection when local closed Support the hybrid fd passthrough mode with passing pipe fd, which can specify this connection kept even when the pipe peer closed, and this connection can be reget wich re-opening the pipe. Signed-off-by: Fupan Li --- .../src/vsock/csm/connection.rs | 70 +++++++++++++++---- .../src/vsock/muxer/muxer_impl.rs | 62 ++++++++++++++-- 2 files changed, 115 insertions(+), 17 deletions(-) diff --git a/src/dragonball/src/dbs_virtio_devices/src/vsock/csm/connection.rs b/src/dragonball/src/dbs_virtio_devices/src/vsock/csm/connection.rs index e2ca7e333..0e27a36a4 100644 --- a/src/dragonball/src/dbs_virtio_devices/src/vsock/csm/connection.rs +++ b/src/dragonball/src/dbs_virtio_devices/src/vsock/csm/connection.rs @@ -103,7 +103,7 @@ use super::{ConnState, Error, PendingRx, PendingRxSet, Result}; /// guest-side AF_VSOCK socket and a host-side `Read + Write + AsRawFd` stream. pub struct VsockConnection { /// The current connection state. - state: ConnState, + pub(crate) state: ConnState, /// The local CID. Most of the time this will be the constant `2` (the vsock /// host CID). pub(crate) local_cid: u64, @@ -115,6 +115,8 @@ pub struct VsockConnection { pub(crate) peer_port: u32, /// The (connected) host-side stream. pub(crate) stream: Box, + /// keep the connection when local peer closed. + keep: bool, /// The TX buffer for this connection. tx_buf: TxBuf, /// Total number of bytes that have been successfully written to @@ -297,6 +299,8 @@ impl VsockChannel for VsockConnection { // to forward some data to the host stream. Also works for a // connection that has begun shutting down, but the peer still has // some data to send. + // It also work for a hybrid connection's peer closed case, which need + // to active the connection's fd to generate the epollout event. ConnState::Established | ConnState::PeerClosed(_, false) if pkt.op() == uapi::VSOCK_OP_RW => { @@ -318,7 +322,19 @@ impl VsockChannel for VsockConnection { "vsock: error writing to local stream (lp={}, pp={}): {:?}", self.local_port, self.peer_port, err ); - self.kill(); + match err { + Error::TxBufFull => { + // The hybrid pipe peer closed and the tx buf had been full, + // and if want to keep the connection, thus we need drop the + // data send from guest, otherwise, close the connection. + if !self.keep() { + self.kill(); + } + } + _ => { + self.kill(); + } + }; return Ok(()); } @@ -462,15 +478,26 @@ impl VsockEpollListener for VsockConnection { .tx_buf .flush_to(&mut self.stream) .unwrap_or_else(|err| { - warn!( - "vsock: error flushing TX buf for (lp={}, pp={}): {:?}", - self.local_port, self.peer_port, err - ); + if !self.keep() { + warn!( + "vsock: error flushing TX buf for (lp={}, pp={}): {:?}", + self.local_port, self.peer_port, err + ); + } + match err { Error::TxBufFlush(inner) if inner.kind() == ErrorKind::WouldBlock => { // This should never happen (EWOULDBLOCK after // EPOLLOUT), but it does, so let's absorb it. } + Error::TxBufFlush(inner) if (inner.kind() == ErrorKind::BrokenPipe) => { + // The hybrid connection's pipe peer was clsosed, and we want to keep the + // connection thus users can reopen the peer pipe to get the connection, + // otherwise, close the connection. + if !self.keep() { + self.kill(); + } + } _ => self.kill(), }; 0 @@ -499,6 +526,7 @@ impl VsockConnection { local_port: u32, peer_port: u32, peer_buf_alloc: u32, + keep: bool, ) -> Self { Self { local_cid, @@ -506,6 +534,7 @@ impl VsockConnection { local_port, peer_port, stream, + keep, state: ConnState::PeerInit, tx_buf: TxBuf::default(), fwd_cnt: Wrapping(0), @@ -525,6 +554,7 @@ impl VsockConnection { peer_cid: u64, local_port: u32, peer_port: u32, + keep: bool, ) -> Self { Self { local_cid, @@ -532,6 +562,7 @@ impl VsockConnection { local_port, peer_port, stream, + keep, state: ConnState::LocalInit, tx_buf: TxBuf::default(), fwd_cnt: Wrapping(0), @@ -579,6 +610,11 @@ impl VsockConnection { self.state } + /// Return the keep value. + pub fn keep(&self) -> bool { + self.keep + } + /// Send some raw, untracked, data straight to the underlying connected /// stream. Returns: number of bytes written, or the error describing the /// write failure. @@ -608,16 +644,23 @@ impl VsockConnection { // stream. let written = match self.stream.write(buf) { Ok(cnt) => cnt, - Err(e) => { + Err(e) if e.kind() == ErrorKind::WouldBlock => { // Absorb any would-block errors, since we can always try again // later. - if e.kind() == ErrorKind::WouldBlock { - 0 - } else { - // We don't know how to handle any other write error, so - // we'll send it up the call chain. + 0 + } + Err(e) if e.kind() == ErrorKind::BrokenPipe => { + // The backed pipe peer had been closed, and we didn't want to close + // this connection since the peer would like to re attach on it. + if !self.keep() { return Err(Error::StreamWrite(e)); } + 0 + } + Err(e) => { + // We don't know how to handle any other write error, so + // we'll send it up the call chain. + return Err(Error::StreamWrite(e)); } }; // Move the "forwarded bytes" counter ahead by how much we were able to @@ -843,6 +886,7 @@ pub(crate) mod tests { LOCAL_PORT, PEER_PORT, PEER_BUF_ALLOC, + false, ), ConnState::LocalInit => VsockConnection::new_local_init( Box::new(stream), @@ -850,6 +894,7 @@ pub(crate) mod tests { PEER_CID, LOCAL_PORT, PEER_PORT, + false, ), ConnState::Established => { let mut conn = VsockConnection::new_peer_init( @@ -859,6 +904,7 @@ pub(crate) mod tests { LOCAL_PORT, PEER_PORT, PEER_BUF_ALLOC, + false, ); assert!(conn.has_pending_rx()); conn.recv_pkt(&mut pkt).unwrap(); diff --git a/src/dragonball/src/dbs_virtio_devices/src/vsock/muxer/muxer_impl.rs b/src/dragonball/src/dbs_virtio_devices/src/vsock/muxer/muxer_impl.rs index bfcc5fa1c..09af066cf 100644 --- a/src/dragonball/src/dbs_virtio_devices/src/vsock/muxer/muxer_impl.rs +++ b/src/dragonball/src/dbs_virtio_devices/src/vsock/muxer/muxer_impl.rs @@ -291,6 +291,34 @@ impl VsockChannel for VsockMuxer { // Alright, everything looks in order - forward this packet to its // owning connection. let mut res: VsockResult<()> = Ok(()); + + // For the hybrid connection, if it want to keep the connection + // when the pipe peer closed, here it needs to update the epoll + // listner to catch the events. + let mut listener = None; + let conn = self.conn_map.get_mut(&conn_key).unwrap(); + let pre_state = conn.state(); + let nfd: RawFd = conn.as_raw_fd(); + + if pre_state == ConnState::LocalClosed && conn.keep() { + conn.state = ConnState::Established; + listener = Some(EpollListener::Connection { + key: conn_key, + evset: conn.get_polled_evset(), + backend: conn.stream.backend_type(), + }); + } + + if let Some(nlistener) = listener { + self.add_listener(nfd, nlistener).unwrap_or_else(|err| { + self.kill_connection(conn_key); + warn!( + "vsock: error updating epoll listener for (lp={}, pp={}): {:?}", + conn_key.local_port, conn_key.peer_port, err + ); + }); + } + self.apply_conn_mutation(conn_key, |conn| { res = conn.send_pkt(pkt); }); @@ -396,6 +424,23 @@ impl VsockMuxer { // listening for it. Some(EpollListener::Connection { key, evset: _, .. }) => { let key_copy = *key; + + // If the hybrid connection's local peer closed, then the epoll handler wouldn't + // get the epollout event even when it's reopened again, thus it should be notified + // when the guest send any data to try to active the epoll handler to generate the + // epollout event for this connection. + + let mut need_rm = false; + if let Some(conn) = self.conn_map.get_mut(&key_copy) { + if event_set.contains(epoll::Events::EPOLLERR) && conn.keep() { + conn.state = ConnState::LocalClosed; + need_rm = true; + } + } + if need_rm { + self.remove_listener(fd); + } + // The handling of this event will most probably mutate the // state of the receiving connection. We'll need to check for new // pending RX, event set mutation, and all that, so we're @@ -459,6 +504,7 @@ impl VsockMuxer { self.cid, local_port, peer_port, + false, ), ) } @@ -476,8 +522,10 @@ impl VsockMuxer { Some(EpollListener::PassFdStream(_)) => { if let Some(EpollListener::PassFdStream(mut stream)) = self.remove_listener(fd) { Self::passfd_read_port_and_fd(&mut stream) - .map(|(nfd, peer_port)| (nfd, self.allocate_local_port(), peer_port)) - .and_then(|(nfd, local_port, peer_port)| { + .map(|(nfd, peer_port, keep)| { + (nfd, self.allocate_local_port(), peer_port, keep) + }) + .and_then(|(nfd, local_port, peer_port, keep)| { // Here we should make sure the nfd the sole owner to convert it // into an UnixStream object, otherwise, it could cause memory unsafety. let nstream = unsafe { File::from_raw_fd(nfd) }; @@ -502,6 +550,7 @@ impl VsockMuxer { self.cid, local_port, peer_port, + keep, ), ) }) @@ -587,7 +636,7 @@ impl VsockMuxer { .map_err(|_| Error::InvalidPortRequest) } - fn passfd_read_port_and_fd(stream: &mut Box) -> Result<(RawFd, u32)> { + fn passfd_read_port_and_fd(stream: &mut Box) -> Result<(RawFd, u32, bool)> { let mut buf = [0u8; 32]; let mut fds = [0, 1]; let (data_len, fd_len) = stream @@ -607,7 +656,9 @@ impl VsockMuxer { .ok_or(Error::InvalidPortRequest) .and_then(|word| word.parse::().map_err(|_| Error::InvalidPortRequest))?; - Ok((fds[0], port)) + let keep = port_iter.next().is_some_and(|kp| kp == "keep"); + + Ok((fds[0], port, keep)) } /// Add a new connection to the active connection pool. @@ -775,6 +826,7 @@ impl VsockMuxer { pkt.dst_port(), pkt.src_port(), pkt.buf_alloc(), + false, ), ) }) @@ -876,7 +928,7 @@ impl VsockMuxer { ); }); } - } else { + } else if conn.state() != ConnState::LocalClosed { // The connection had previously asked to be removed from the // listener map (by returning an empty event set via // `get_polled_fd()`), but now wants back in. From cfb262d02f635b3ec4a28463ae22b544dcb8f032 Mon Sep 17 00:00:00 2001 From: Fupan Li Date: Thu, 2 Nov 2023 17:03:17 +0800 Subject: [PATCH 09/16] container: keep the io connection when pass fd to hybrid vsock We want the io connection keep connected when the containerd closed the io pipe, thus it can be attached on the io stream. Signed-off-by: Fupan Li --- .../virt_container/src/container_manager/io/passfd_io.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/passfd_io.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/passfd_io.rs index 4af2e0084..2224f85f6 100644 --- a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/passfd_io.rs +++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/passfd_io.rs @@ -16,7 +16,9 @@ async fn passfd_connect(uds: &str, port: u32, fd: OwnedFd) -> Result { let mut stream = UnixStream::connect(&uds).await.context("connect")?; stream.write_all(b"passfd\n").await.context("write all")?; - let buf = format!("{}", port); + // We want the io connection keep connected when the containerd closed the io pipe, + // thus it can be attached on the io stream. + let buf = format!("{} keep", port); stream .send_with_fd(buf.as_bytes(), &[fd.as_raw_fd()]) .context("send port and fd")?; From 7874ef5fd2c0665f70ed0339632d77166b54067f Mon Sep 17 00:00:00 2001 From: Zixuan Tan Date: Sat, 4 Nov 2023 17:22:39 +0800 Subject: [PATCH 10/16] agent: set stdout/err vsock stream as blocking before passing to child In passfd io mode, when not using a terminal, the stdout/stderr vsock streams are directly used as the stdout/stderr of the child process. These streams are non-blocking by default. The stdout/stderr of the process should be blocking, otherwise the process may encounter EAGAIN error when writing to stdout/stderr. Fixes: #6714 Signed-off-by: Zixuan Tan --- src/agent/rustjail/src/process.rs | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/src/agent/rustjail/src/process.rs b/src/agent/rustjail/src/process.rs index 2ce9f3721..7b99820c5 100644 --- a/src/agent/rustjail/src/process.rs +++ b/src/agent/rustjail/src/process.rs @@ -5,7 +5,7 @@ use libc::pid_t; use std::fs::File; -use std::os::unix::io::{AsRawFd, RawFd}; +use std::os::unix::io::{AsRawFd, RawFd, IntoRawFd}; use tokio::sync::mpsc::Sender; use tokio_vsock::VsockStream; @@ -137,6 +137,13 @@ impl ProcessOperations for Process { } } +fn set_blocking(fd: RawFd) -> Result<()> { + let flags = fcntl(fd, FcntlArg::F_GETFL)?; + let new_flags = !OFlag::O_NONBLOCK & OFlag::from_bits_truncate(flags); + fcntl(fd, FcntlArg::F_SETFL(new_flags))?; + Ok(()) +} + impl Process { pub fn new( logger: &Logger, @@ -189,8 +196,11 @@ impl Process { p.stdin = Some(stdin); if let Some(stdout) = p.proc_io.as_mut().map(|io| io.stdout.take()).flatten() { - p.stdout = Some(stdout.as_raw_fd()); - std::mem::forget(stdout); + let fd = stdout.into_raw_fd(); + // The stdout/stderr of the process should be blocking, otherwise + // the process may encounter EAGAIN error when writing to stdout/stderr. + set_blocking(fd)?; + p.stdout = Some(fd); } else { let (pstdout, stdout) = create_extended_pipe(OFlag::O_CLOEXEC, pipe_size)?; p.parent_stdout = Some(pstdout); @@ -198,8 +208,9 @@ impl Process { } if let Some(stderr) = p.proc_io.as_mut().map(|io| io.stderr.take()).flatten() { - p.stderr = Some(stderr.as_raw_fd()); - std::mem::forget(stderr); + let fd = stderr.into_raw_fd(); + set_blocking(fd)?; + p.stderr = Some(fd); } else { let (pstderr, stderr) = create_extended_pipe(OFlag::O_CLOEXEC, pipe_size)?; p.parent_stderr = Some(pstderr); From 3eb4bed957c546428c2ad07612a5d951ff7c8b5f Mon Sep 17 00:00:00 2001 From: Zixuan Tan Date: Mon, 6 Nov 2023 19:25:39 +0800 Subject: [PATCH 11/16] agent: use biased select to avoid data loss This patch uses a biased select to avoid stdin data loss in case of CloseStdinRequest. Fixes: #6714 Signed-off-by: Zixuan Tan --- src/agent/rustjail/src/container.rs | 66 +++++++++++++++++++++-------- 1 file changed, 49 insertions(+), 17 deletions(-) diff --git a/src/agent/rustjail/src/container.rs b/src/agent/rustjail/src/container.rs index 21929e4dd..465667e3f 100644 --- a/src/agent/rustjail/src/container.rs +++ b/src/agent/rustjail/src/container.rs @@ -65,7 +65,7 @@ use crate::sync::{read_sync, write_count, write_sync, SYNC_DATA, SYNC_FAILED, SY use crate::sync_with_async::{read_async, write_async}; use async_trait::async_trait; use rlimit::{setrlimit, Resource, Rlim}; -use tokio::io::AsyncBufReadExt; +use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt}; use tokio::sync::Mutex; use kata_sys_util::hooks::HookStates; @@ -1003,14 +1003,30 @@ impl BaseContainer for LinuxContainer { let logger = logger.clone(); let term_closer = term_closer.clone(); tokio::spawn(async move { - // As the stdin fifo is opened in RW mode in the shim, which will never - // read EOF, we close the stdin fifo here when explicit requested. - tokio::select! { - res = tokio::io::copy(&mut stdin_stream, &mut term_master) => { - debug!(logger, "copy from stdin to term_master end: {:?}", res); - } - _ = close_stdin_rx.changed() => { - debug!(logger, "copy ends as requested"); + let mut buf = [0u8; 8192]; + loop { + tokio::select! { + // Make sure stdin_stream is drained before exiting + biased; + res = stdin_stream.read(&mut buf) => { + match res { + Err(_) | Ok(0) => { + debug!(logger, "copy from stdin to term_master end: {:?}", res); + break; + } + Ok(n) => { + if let Err(_) = term_master.write_all(&buf[..n]).await { + break; + } + } + } + } + // As the stdin fifo is opened in RW mode in the shim, which will never + // read EOF, we close the stdin fifo here when explicit requested. + _ = close_stdin_rx.changed() => { + debug!(logger, "copy ends as requested"); + break + } } } wgw_input.done(); @@ -1049,14 +1065,30 @@ impl BaseContainer for LinuxContainer { let wgw_input = proc_io.wg_input.worker(); let logger = logger.clone(); tokio::spawn(async move { - // As the stdin fifo is opened in RW mode in the shim, which will never - // read EOF, we close the stdin stream when containerd explicit requested. - tokio::select! { - res = tokio::io::copy(&mut stdin_stream, &mut parent_stdin) => { - debug!(logger, "copy from stdin to parent_stdin end: {:?}", res); - } - _ = close_stdin_rx.changed() => { - debug!(logger, "copy ends as requested"); + let mut buf = [0u8; 8192]; + loop { + tokio::select! { + // Make sure stdin_stream is drained before exiting + biased; + res = stdin_stream.read(&mut buf) => { + match res { + Err(_) | Ok(0) => { + debug!(logger, "copy from stdin to term_master end: {:?}", res); + break; + } + Ok(n) => { + if let Err(_) = parent_stdin.write_all(&buf[..n]).await { + break; + } + } + } + } + // As the stdin fifo is opened in RW mode in the shim, which will never + // read EOF, we close the stdin fifo here when explicit requested. + _ = close_stdin_rx.changed() => { + debug!(logger, "copy ends as requested"); + break + } } } wgw_input.done(); From 89be42a17745a4b64472b287b47227a63c63df69 Mon Sep 17 00:00:00 2001 From: Zixuan Tan Date: Tue, 14 Nov 2023 18:05:17 +0800 Subject: [PATCH 12/16] runtime-rs: open stdout and stderr fifos NONBLOCK This patch adds O_NONBLOCK flag when open stdout and stderr FIFOs to avoid blocking. Fixes: #6714 Signed-off-by: Zixuan Tan --- .../virt_container/src/container_manager/io/passfd_io.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/passfd_io.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/passfd_io.rs index 2224f85f6..29d2fe592 100644 --- a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/passfd_io.rs +++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/passfd_io.rs @@ -98,6 +98,7 @@ impl PassfdIo { if let Some(stdout) = &self.stdout { let fout = OpenOptions::new() .write(true) + .custom_flags(libc::O_NONBLOCK) .open(&stdout) .context("open stdout")?; @@ -113,6 +114,7 @@ impl PassfdIo { if let Some(stderr) = &self.stderr { let ferr = OpenOptions::new() .write(true) + .custom_flags(libc::O_NONBLOCK) .open(&stderr) .context("open stderr")?; From f6710610d17974e12a897428cc8b3ad3c63c30da Mon Sep 17 00:00:00 2001 From: Zixuan Tan Date: Tue, 14 Nov 2023 19:07:02 +0800 Subject: [PATCH 13/16] agent,runtime-rs,runk: fix fmt and clippy warnings Fix rustfmt and clippy warnings detected by CI. Fixes: #6714 Signed-off-by: Zixuan Tan --- src/agent/rustjail/Cargo.toml | 2 +- src/agent/rustjail/src/container.rs | 17 +++++++++++------ src/agent/rustjail/src/process.rs | 10 +++++----- src/agent/src/passfd_io.rs | 10 ++++------ .../src/container_manager/container.rs | 9 +++------ .../src/container_manager/container_inner.rs | 13 +++---------- .../src/container_manager/io/passfd_io.rs | 6 +++--- src/tools/runk/libcontainer/src/container.rs | 1 + 8 files changed, 31 insertions(+), 37 deletions(-) diff --git a/src/agent/rustjail/Cargo.toml b/src/agent/rustjail/Cargo.toml index e8499832b..d63ae0ff2 100644 --- a/src/agent/rustjail/Cargo.toml +++ b/src/agent/rustjail/Cargo.toml @@ -30,7 +30,7 @@ cgroups = { package = "cgroups-rs", version = "0.3.3" } rlimit = "0.5.3" cfg-if = "0.1.0" -tokio = { version = "1.28.1", features = ["sync", "io-util", "process", "time", "macros", "rt"] } +tokio = { version = "1.28.1", features = ["sync", "io-util", "process", "time", "macros", "rt", "fs"] } tokio-vsock = "0.3.1" futures = "0.3.17" async-trait = "0.1.31" diff --git a/src/agent/rustjail/src/container.rs b/src/agent/rustjail/src/container.rs index 465667e3f..4795ffc14 100644 --- a/src/agent/rustjail/src/container.rs +++ b/src/agent/rustjail/src/container.rs @@ -1015,7 +1015,7 @@ impl BaseContainer for LinuxContainer { break; } Ok(n) => { - if let Err(_) = term_master.write_all(&buf[..n]).await { + if term_master.write_all(&buf[..n]).await.is_err() { break; } } @@ -1035,11 +1035,12 @@ impl BaseContainer for LinuxContainer { }); } + // Copy from term_master to stdout if let Some(mut stdout_stream) = proc_io.stdout.take() { let wgw_output = proc_io.wg_output.worker(); let mut term_master = unsafe { File::from_raw_fd(pseudo.master) }; let logger = logger.clone(); - let term_closer = term_closer.clone(); + let term_closer = term_closer; tokio::spawn(async move { let res = tokio::io::copy(&mut term_master, &mut stdout_stream).await; debug!(logger, "copy from term_master to stdout end: {:?}", res); @@ -1050,6 +1051,7 @@ impl BaseContainer for LinuxContainer { } } } else { + // not using a terminal let stdin = p.stdin.unwrap(); let stdout = p.stdout.unwrap(); let stderr = p.stderr.unwrap(); @@ -1058,8 +1060,11 @@ impl BaseContainer for LinuxContainer { child_stderr = unsafe { std::process::Stdio::from_raw_fd(stderr) }; if let Some(proc_io) = &mut p.proc_io { - // Copy from stdin to parent_stdin + // Here we copy from vsock stdin stream to parent_stdin manually. + // This is because we need to close the stdin fifo when the stdin stream + // is drained. if let Some(mut stdin_stream) = proc_io.stdin.take() { + info!(logger, "copy from stdin to parent_stdin"); let mut parent_stdin = unsafe { File::from_raw_fd(p.parent_stdin.unwrap()) }; let mut close_stdin_rx = proc_io.close_stdin_rx.clone(); let wgw_input = proc_io.wg_input.worker(); @@ -1073,11 +1078,11 @@ impl BaseContainer for LinuxContainer { res = stdin_stream.read(&mut buf) => { match res { Err(_) | Ok(0) => { - debug!(logger, "copy from stdin to term_master end: {:?}", res); + info!(logger, "copy from stdin to term_master end: {:?}", res); break; } Ok(n) => { - if let Err(_) = parent_stdin.write_all(&buf[..n]).await { + if parent_stdin.write_all(&buf[..n]).await.is_err() { break; } } @@ -1086,7 +1091,7 @@ impl BaseContainer for LinuxContainer { // As the stdin fifo is opened in RW mode in the shim, which will never // read EOF, we close the stdin fifo here when explicit requested. _ = close_stdin_rx.changed() => { - debug!(logger, "copy ends as requested"); + info!(logger, "copy ends as requested"); break } } diff --git a/src/agent/rustjail/src/process.rs b/src/agent/rustjail/src/process.rs index 7b99820c5..e865b7c9d 100644 --- a/src/agent/rustjail/src/process.rs +++ b/src/agent/rustjail/src/process.rs @@ -5,7 +5,7 @@ use libc::pid_t; use std::fs::File; -use std::os::unix::io::{AsRawFd, RawFd, IntoRawFd}; +use std::os::unix::io::{AsRawFd, IntoRawFd, RawFd}; use tokio::sync::mpsc::Sender; use tokio_vsock::VsockStream; @@ -195,7 +195,7 @@ impl Process { p.parent_stdin = Some(pstdin); p.stdin = Some(stdin); - if let Some(stdout) = p.proc_io.as_mut().map(|io| io.stdout.take()).flatten() { + if let Some(stdout) = p.proc_io.as_mut().and_then(|io| io.stdout.take()) { let fd = stdout.into_raw_fd(); // The stdout/stderr of the process should be blocking, otherwise // the process may encounter EAGAIN error when writing to stdout/stderr. @@ -207,7 +207,7 @@ impl Process { p.stdout = Some(stdout); } - if let Some(stderr) = p.proc_io.as_mut().map(|io| io.stderr.take()).flatten() { + if let Some(stderr) = p.proc_io.as_mut().and_then(|io| io.stderr.take()) { let fd = stderr.into_raw_fd(); set_blocking(fd)?; p.stderr = Some(fd); @@ -241,8 +241,8 @@ impl Process { } pub fn cleanup_process_stream(&mut self) { - if let Some(_) = self.proc_io.take() { - // taken + if let Some(proc_io) = self.proc_io.take() { + drop(proc_io); return; } diff --git a/src/agent/src/passfd_io.rs b/src/agent/src/passfd_io.rs index 314f66abf..3175572e9 100644 --- a/src/agent/src/passfd_io.rs +++ b/src/agent/src/passfd_io.rs @@ -22,12 +22,10 @@ pub(crate) async fn start_listen(port: u32) -> Result<()> { let mut listener = VsockListener::bind(libc::VMADDR_CID_ANY, port)?; tokio::spawn(async move { loop { - if let Ok((stream, peer_addr)) = listener.accept().await { - if let Vsock(addr) = peer_addr { - let port = addr.port(); - info!(sl(), "accept connection from peer port {}", port); - HVSOCK_STREAMS.lock().await.insert(port, stream); - } + if let Ok((stream, Vsock(addr))) = listener.accept().await { + let port = addr.port(); + info!(sl(), "accept connection from peer port {}", port); + HVSOCK_STREAMS.lock().await.insert(port, stream); } } }); diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container.rs index 5b570fb27..0cfc88409 100644 --- a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container.rs +++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container.rs @@ -210,20 +210,17 @@ impl Container { .init_process .passfd_io .as_ref() - .map(|io| io.stdin_port) - .flatten(), + .and_then(|io| io.stdin_port), stdout_port: inner .init_process .passfd_io .as_ref() - .map(|io| io.stdout_port) - .flatten(), + .and_then(|io| io.stdout_port), stderr_port: inner .init_process .passfd_io .as_ref() - .map(|io| io.stderr_port) - .flatten(), + .and_then(|io| io.stderr_port), ..Default::default() }; diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container_inner.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container_inner.rs index 1433f6447..1b6fcb2aa 100644 --- a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container_inner.rs +++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container_inner.rs @@ -87,24 +87,17 @@ impl ContainerInner { process_id: process.clone().into(), string_user: None, process: Some(exec.oci_process.clone()), - stdin_port: exec - .process - .passfd_io - .as_ref() - .map(|io| io.stdin_port) - .flatten(), + stdin_port: exec.process.passfd_io.as_ref().and_then(|io| io.stdin_port), stdout_port: exec .process .passfd_io .as_ref() - .map(|io| io.stdout_port) - .flatten(), + .and_then(|io| io.stdout_port), stderr_port: exec .process .passfd_io .as_ref() - .map(|io| io.stderr_port) - .flatten(), + .and_then(|io| io.stderr_port), }) .await .context("exec process")?; diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/passfd_io.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/passfd_io.rs index 29d2fe592..6f360300a 100644 --- a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/passfd_io.rs +++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/passfd_io.rs @@ -85,7 +85,7 @@ impl PassfdIo { .read(true) .write(true) .custom_flags(libc::O_NONBLOCK) - .open(&stdin) + .open(stdin) .context("open stdin")?; let hostport = passfd_connect(uds_path, passfd_port, fin.into()) @@ -99,7 +99,7 @@ impl PassfdIo { let fout = OpenOptions::new() .write(true) .custom_flags(libc::O_NONBLOCK) - .open(&stdout) + .open(stdout) .context("open stdout")?; let hostport = passfd_connect(uds_path, passfd_port, fout.into()) @@ -115,7 +115,7 @@ impl PassfdIo { let ferr = OpenOptions::new() .write(true) .custom_flags(libc::O_NONBLOCK) - .open(&stderr) + .open(stderr) .context("open stderr")?; let hostport = passfd_connect(uds_path, passfd_port, ferr.into()) diff --git a/src/tools/runk/libcontainer/src/container.rs b/src/tools/runk/libcontainer/src/container.rs index c7c3e068b..5a2bb0a6c 100644 --- a/src/tools/runk/libcontainer/src/container.rs +++ b/src/tools/runk/libcontainer/src/container.rs @@ -294,6 +294,7 @@ impl ContainerLauncher { &self.id, self.init, 0, + None, )?) } else { Err(anyhow!("no process configuration")) From 1206de2c2337f5169a8f7c862e34f493d351e608 Mon Sep 17 00:00:00 2001 From: Zixuan Tan Date: Wed, 17 Jan 2024 23:00:44 +0800 Subject: [PATCH 14/16] agent: Use pipes as stdout/stderr of container process Linux forbids opening an existing socket through /proc//fd/, making some images relying on the special file /dev/stdout(stderr), /proc/self/fd/1(2) fail to boot in passfd io mode, where the stdout/stderr of a container process is a vsock socket. For back compatibility, a pipe is introduced between the process and the socket, and its read end is set as stdout/stderr of the container process instead of the socket. The agent will do the forwarding between the pipe and the socket. Fixes: #6714 Signed-off-by: Zixuan Tan --- src/agent/Cargo.lock | 2 +- src/agent/rustjail/src/container.rs | 38 +++++++++++++++++++++++++--- src/agent/rustjail/src/process.rs | 39 ++++++++--------------------- src/agent/src/signal.rs | 2 +- 4 files changed, 48 insertions(+), 33 deletions(-) diff --git a/src/agent/Cargo.lock b/src/agent/Cargo.lock index 1f5da937b..ffe7de403 100644 --- a/src/agent/Cargo.lock +++ b/src/agent/Cargo.lock @@ -2526,7 +2526,7 @@ dependencies = [ "tempfile", "test-utils", "tokio", - "tokio-vsock", + "tokio-vsock 0.3.1", "xattr", "zbus", ] diff --git a/src/agent/rustjail/src/container.rs b/src/agent/rustjail/src/container.rs index 4795ffc14..6a5434e66 100644 --- a/src/agent/rustjail/src/container.rs +++ b/src/agent/rustjail/src/container.rs @@ -1064,7 +1064,7 @@ impl BaseContainer for LinuxContainer { // This is because we need to close the stdin fifo when the stdin stream // is drained. if let Some(mut stdin_stream) = proc_io.stdin.take() { - info!(logger, "copy from stdin to parent_stdin"); + debug!(logger, "copy from stdin to parent_stdin"); let mut parent_stdin = unsafe { File::from_raw_fd(p.parent_stdin.unwrap()) }; let mut close_stdin_rx = proc_io.close_stdin_rx.clone(); let wgw_input = proc_io.wg_input.worker(); @@ -1078,7 +1078,7 @@ impl BaseContainer for LinuxContainer { res = stdin_stream.read(&mut buf) => { match res { Err(_) | Ok(0) => { - info!(logger, "copy from stdin to term_master end: {:?}", res); + debug!(logger, "copy from stdin to term_master end: {:?}", res); break; } Ok(n) => { @@ -1091,7 +1091,7 @@ impl BaseContainer for LinuxContainer { // As the stdin fifo is opened in RW mode in the shim, which will never // read EOF, we close the stdin fifo here when explicit requested. _ = close_stdin_rx.changed() => { - info!(logger, "copy ends as requested"); + debug!(logger, "copy ends as requested"); break } } @@ -1099,6 +1099,38 @@ impl BaseContainer for LinuxContainer { wgw_input.done(); }); } + + // copy from parent_stdout to stdout stream + if let Some(mut stdout_stream) = proc_io.stdout.take() { + debug!(logger, "copy from parent_stdout to stdout stream"); + let wgw_output = proc_io.wg_output.worker(); + let mut parent_stdout = unsafe { File::from_raw_fd(p.parent_stdout.unwrap()) }; + let logger = logger.clone(); + tokio::spawn(async move { + let res = tokio::io::copy(&mut parent_stdout, &mut stdout_stream).await; + debug!( + logger, + "copy from parent_stdout to stdout stream end: {:?}", res + ); + wgw_output.done(); + }); + } + + // copy from parent_stderr to stderr stream + if let Some(mut stderr_stream) = proc_io.stderr.take() { + debug!(logger, "copy from parent_stderr to stderr stream"); + let wgw_output = proc_io.wg_output.worker(); + let mut parent_stderr = unsafe { File::from_raw_fd(p.parent_stderr.unwrap()) }; + let logger = logger.clone(); + tokio::spawn(async move { + let res = tokio::io::copy(&mut parent_stderr, &mut stderr_stream).await; + debug!( + logger, + "copy from parent_stderr to stderr stream end: {:?}", res + ); + wgw_output.done(); + }); + } } } diff --git a/src/agent/rustjail/src/process.rs b/src/agent/rustjail/src/process.rs index e865b7c9d..efc99afd3 100644 --- a/src/agent/rustjail/src/process.rs +++ b/src/agent/rustjail/src/process.rs @@ -5,7 +5,7 @@ use libc::pid_t; use std::fs::File; -use std::os::unix::io::{AsRawFd, IntoRawFd, RawFd}; +use std::os::unix::io::{AsRawFd, RawFd}; use tokio::sync::mpsc::Sender; use tokio_vsock::VsockStream; @@ -137,13 +137,6 @@ impl ProcessOperations for Process { } } -fn set_blocking(fd: RawFd) -> Result<()> { - let flags = fcntl(fd, FcntlArg::F_GETFL)?; - let new_flags = !OFlag::O_NONBLOCK & OFlag::from_bits_truncate(flags); - fcntl(fd, FcntlArg::F_SETFL(new_flags))?; - Ok(()) -} - impl Process { pub fn new( logger: &Logger, @@ -195,27 +188,17 @@ impl Process { p.parent_stdin = Some(pstdin); p.stdin = Some(stdin); - if let Some(stdout) = p.proc_io.as_mut().and_then(|io| io.stdout.take()) { - let fd = stdout.into_raw_fd(); - // The stdout/stderr of the process should be blocking, otherwise - // the process may encounter EAGAIN error when writing to stdout/stderr. - set_blocking(fd)?; - p.stdout = Some(fd); - } else { - let (pstdout, stdout) = create_extended_pipe(OFlag::O_CLOEXEC, pipe_size)?; - p.parent_stdout = Some(pstdout); - p.stdout = Some(stdout); - } + // These pipes are necessary as the stdout/stderr of the child process + // cannot be a socket. Otherwise, some images relying on the /dev/stdout(stderr) + // and /proc/self/fd/1(2) will fail to boot as opening an existing socket + // is forbidden by the Linux kernel. + let (pstdout, stdout) = create_extended_pipe(OFlag::O_CLOEXEC, pipe_size)?; + p.parent_stdout = Some(pstdout); + p.stdout = Some(stdout); - if let Some(stderr) = p.proc_io.as_mut().and_then(|io| io.stderr.take()) { - let fd = stderr.into_raw_fd(); - set_blocking(fd)?; - p.stderr = Some(fd); - } else { - let (pstderr, stderr) = create_extended_pipe(OFlag::O_CLOEXEC, pipe_size)?; - p.parent_stderr = Some(pstderr); - p.stderr = Some(stderr); - } + let (pstderr, stderr) = create_extended_pipe(OFlag::O_CLOEXEC, pipe_size)?; + p.parent_stderr = Some(pstderr); + p.stderr = Some(stderr); } } Ok(p) diff --git a/src/agent/src/signal.rs b/src/agent/src/signal.rs index eafec4bb0..62a219371 100644 --- a/src/agent/src/signal.rs +++ b/src/agent/src/signal.rs @@ -69,7 +69,7 @@ async fn handle_sigchild(logger: Logger, sandbox: Arc>) -> Result } }; - // In passfd io mode, when using tty, we need to wait for the copy task end. + // In passfd io mode, we need to wait for the copy task end. if let Some(proc_io) = &mut p.proc_io { proc_io.wg_output.wait().await; } From 6e4d4c329a2b4ecde84ea9a73a5beaec33d2d7d3 Mon Sep 17 00:00:00 2001 From: Zixuan Tan Date: Thu, 18 Jan 2024 09:38:11 +0800 Subject: [PATCH 15/16] agent,runtime-rs: Add license header to passfd_io.rs Fixes: #6714 Signed-off-by: Zixuan Tan --- src/agent/src/passfd_io.rs | 4 ++++ .../virt_container/src/container_manager/io/passfd_io.rs | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/src/agent/src/passfd_io.rs b/src/agent/src/passfd_io.rs index 3175572e9..92d58b7b2 100644 --- a/src/agent/src/passfd_io.rs +++ b/src/agent/src/passfd_io.rs @@ -1,3 +1,7 @@ +// Copyright 2024 Kata Contributors +// +// SPDX-License-Identifier: Apache-2.0 +// use anyhow::Result; use lazy_static::lazy_static; use rustjail::process::ProcessIo; diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/passfd_io.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/passfd_io.rs index 6f360300a..169c3b68d 100644 --- a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/passfd_io.rs +++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/passfd_io.rs @@ -1,3 +1,7 @@ +// Copyright 2024 Kata Contributors +// +// SPDX-License-Identifier: Apache-2.0 +// use anyhow::{anyhow, Context, Result}; use sendfd::SendWithFd; use std::{ From 222de4f6847e3424060827ab2e97d121d7326801 Mon Sep 17 00:00:00 2001 From: Zixuan Tan Date: Wed, 31 Jan 2024 21:00:58 +0800 Subject: [PATCH 16/16] agent: Fix a race condition in passfd_io.rs There is a race condition in agent HVSOCK_STREAMS hashmap, where a stream may be taken before it is inserted into the hashmap. This patch add simple retry logic to the stream consumer to alleviate this issue. Fixes: #6714 Signed-off-by: Zixuan Tan --- src/agent/src/passfd_io.rs | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/src/agent/src/passfd_io.rs b/src/agent/src/passfd_io.rs index 92d58b7b2..ba1d7f74b 100644 --- a/src/agent/src/passfd_io.rs +++ b/src/agent/src/passfd_io.rs @@ -27,9 +27,11 @@ pub(crate) async fn start_listen(port: u32) -> Result<()> { tokio::spawn(async move { loop { if let Ok((stream, Vsock(addr))) = listener.accept().await { + // We should insert the stream into the mapping as soon + // to minimize the risk of encountering race conditions. let port = addr.port(); - info!(sl(), "accept connection from peer port {}", port); HVSOCK_STREAMS.lock().await.insert(port, stream); + info!(sl(), "accept connection from peer port {}", port); } } }); @@ -37,8 +39,21 @@ pub(crate) async fn start_listen(port: u32) -> Result<()> { } async fn take_stream(port: u32) -> Option { - let mut mapping = HVSOCK_STREAMS.lock().await; - mapping.remove(&port) + // There may be a race condition where the stream is accepted but + // not yet inserted into the mapping. We will retry several times. + // If it still fails, we just give up. + let mut count = 0; + while count < 3 { + let stream = HVSOCK_STREAMS.lock().await.remove(&port); + if stream.is_some() { + return stream; + } + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + count += 1; + } + + warn!(sl(), "failed to take stream for port {}", port); + None } macro_rules! take_io_stream {