Merge pull request #7483 from frezcirno/passfd_io_feature

runtime-rs: improving io performance using dragonball's vsock fd passthrough
Xuewei Niu 2024-02-01 14:53:53 +08:00 committed by GitHub
commit 2332552c8f
37 changed files with 903 additions and 43 deletions

src/agent/Cargo.lock generated
View File

@ -220,6 +220,12 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "awaitgroup"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc17ab023b4091c10ff099f9deebaeeb59b5189df07e554c4fef042b70745d68"
[[package]]
name = "base64"
version = "0.13.0"
@ -2490,6 +2496,7 @@ version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"awaitgroup",
"bit-vec",
"capctl",
"caps",
@ -2519,6 +2526,7 @@ dependencies = [
"tempfile",
"test-utils",
"tokio",
"tokio-vsock 0.3.1",
"xattr",
"zbus",
]

View File

@ -6,6 +6,7 @@ edition = "2018"
license = "Apache-2.0"
[dependencies]
awaitgroup = "0.6.0"
serde = "1.0.91"
serde_json = "1.0.39"
serde_derive = "1.0.91"
@ -29,7 +30,8 @@ cgroups = { package = "cgroups-rs", version = "0.3.3" }
rlimit = "0.5.3"
cfg-if = "0.1.0"
tokio = { version = "1.28.1", features = ["sync", "io-util", "process", "time", "macros", "rt"] }
tokio = { version = "1.28.1", features = ["sync", "io-util", "process", "time", "macros", "rt", "fs"] }
tokio-vsock = "0.3.1"
futures = "0.3.17"
async-trait = "0.1.31"
inotify = "0.9.2"

View File

@ -14,6 +14,7 @@ use std::fs;
use std::os::unix::io::RawFd;
use std::path::{Path, PathBuf};
use std::time::SystemTime;
use tokio::fs::File;
use cgroups::freezer::FreezerState;
@ -64,7 +65,7 @@ use crate::sync::{read_sync, write_count, write_sync, SYNC_DATA, SYNC_FAILED, SY
use crate::sync_with_async::{read_async, write_async};
use async_trait::async_trait;
use rlimit::{setrlimit, Resource, Rlim};
use tokio::io::AsyncBufReadExt;
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt};
use tokio::sync::Mutex;
use kata_sys_util::hooks::HookStates;
@ -989,13 +990,148 @@ impl BaseContainer for LinuxContainer {
child_stdin = unsafe { std::process::Stdio::from_raw_fd(pseudo.slave) };
child_stdout = unsafe { std::process::Stdio::from_raw_fd(pseudo.slave) };
child_stderr = unsafe { std::process::Stdio::from_raw_fd(pseudo.slave) };
if let Some(proc_io) = &mut p.proc_io {
// A reference count used to clean up the term master fd.
let term_closer = Arc::from(unsafe { File::from_raw_fd(pseudo.master) });
// Copy from stdin to term_master
if let Some(mut stdin_stream) = proc_io.stdin.take() {
let mut term_master = unsafe { File::from_raw_fd(pseudo.master) };
let mut close_stdin_rx = proc_io.close_stdin_rx.clone();
let wgw_input = proc_io.wg_input.worker();
let logger = logger.clone();
let term_closer = term_closer.clone();
tokio::spawn(async move {
let mut buf = [0u8; 8192];
loop {
tokio::select! {
// Make sure stdin_stream is drained before exiting
biased;
res = stdin_stream.read(&mut buf) => {
match res {
Err(_) | Ok(0) => {
debug!(logger, "copy from stdin to term_master end: {:?}", res);
break;
}
Ok(n) => {
if term_master.write_all(&buf[..n]).await.is_err() {
break;
}
}
}
}
// As the stdin fifo is opened in RW mode in the shim, it will never
// read EOF, so we close the stdin fifo here when explicitly requested.
_ = close_stdin_rx.changed() => {
debug!(logger, "copy ends as requested");
break
}
}
}
wgw_input.done();
std::mem::forget(term_master); // Avoid auto closing of term_master
drop(term_closer);
});
}
// Copy from term_master to stdout
if let Some(mut stdout_stream) = proc_io.stdout.take() {
let wgw_output = proc_io.wg_output.worker();
let mut term_master = unsafe { File::from_raw_fd(pseudo.master) };
let logger = logger.clone();
let term_closer = term_closer;
tokio::spawn(async move {
let res = tokio::io::copy(&mut term_master, &mut stdout_stream).await;
debug!(logger, "copy from term_master to stdout end: {:?}", res);
wgw_output.done();
std::mem::forget(term_master); // Avoid auto closing of term_master
drop(term_closer);
});
}
}
} else {
// not using a terminal
let stdin = p.stdin.unwrap();
let stdout = p.stdout.unwrap();
let stderr = p.stderr.unwrap();
child_stdin = unsafe { std::process::Stdio::from_raw_fd(stdin) };
child_stdout = unsafe { std::process::Stdio::from_raw_fd(stdout) };
child_stderr = unsafe { std::process::Stdio::from_raw_fd(stderr) };
if let Some(proc_io) = &mut p.proc_io {
// Here we copy from vsock stdin stream to parent_stdin manually.
// This is because we need to close the stdin fifo when the stdin stream
// is drained.
if let Some(mut stdin_stream) = proc_io.stdin.take() {
debug!(logger, "copy from stdin to parent_stdin");
let mut parent_stdin = unsafe { File::from_raw_fd(p.parent_stdin.unwrap()) };
let mut close_stdin_rx = proc_io.close_stdin_rx.clone();
let wgw_input = proc_io.wg_input.worker();
let logger = logger.clone();
tokio::spawn(async move {
let mut buf = [0u8; 8192];
loop {
tokio::select! {
// Make sure stdin_stream is drained before exiting
biased;
res = stdin_stream.read(&mut buf) => {
match res {
Err(_) | Ok(0) => {
debug!(logger, "copy from stdin to parent_stdin end: {:?}", res);
break;
}
Ok(n) => {
if parent_stdin.write_all(&buf[..n]).await.is_err() {
break;
}
}
}
}
// As the stdin fifo is opened in RW mode in the shim, it will never
// read EOF, so we close the stdin fifo here when explicitly requested.
_ = close_stdin_rx.changed() => {
debug!(logger, "copy ends as requested");
break
}
}
}
wgw_input.done();
});
}
// copy from parent_stdout to stdout stream
if let Some(mut stdout_stream) = proc_io.stdout.take() {
debug!(logger, "copy from parent_stdout to stdout stream");
let wgw_output = proc_io.wg_output.worker();
let mut parent_stdout = unsafe { File::from_raw_fd(p.parent_stdout.unwrap()) };
let logger = logger.clone();
tokio::spawn(async move {
let res = tokio::io::copy(&mut parent_stdout, &mut stdout_stream).await;
debug!(
logger,
"copy from parent_stdout to stdout stream end: {:?}", res
);
wgw_output.done();
});
}
// copy from parent_stderr to stderr stream
if let Some(mut stderr_stream) = proc_io.stderr.take() {
debug!(logger, "copy from parent_stderr to stderr stream");
let wgw_output = proc_io.wg_output.worker();
let mut parent_stderr = unsafe { File::from_raw_fd(p.parent_stderr.unwrap()) };
let logger = logger.clone();
tokio::spawn(async move {
let res = tokio::io::copy(&mut parent_stderr, &mut stderr_stream).await;
debug!(
logger,
"copy from parent_stderr to stderr stream end: {:?}", res
);
wgw_output.done();
});
}
}
}
let pidns = get_pid_namespace(&self.logger, linux)?;
@ -1904,7 +2040,7 @@ mod tests {
let _ = new_linux_container_and_then(|mut c: LinuxContainer| {
c.processes.insert(
1,
Process::new(&sl(), &oci::Process::default(), "123", true, 1).unwrap(),
Process::new(&sl(), &oci::Process::default(), "123", true, 1, None).unwrap(),
);
let p = c.get_process("123");
assert!(p.is_ok(), "Expecting Ok, Got {:?}", p);
@ -1931,7 +2067,7 @@ mod tests {
let (c, _dir) = new_linux_container();
let ret = c
.unwrap()
.start(Process::new(&sl(), &oci::Process::default(), "123", true, 1).unwrap())
.start(Process::new(&sl(), &oci::Process::default(), "123", true, 1, None).unwrap())
.await;
assert!(ret.is_err(), "Expecting Err, Got {:?}", ret);
}
@ -1941,7 +2077,7 @@ mod tests {
let (c, _dir) = new_linux_container();
let ret = c
.unwrap()
.run(Process::new(&sl(), &oci::Process::default(), "123", true, 1).unwrap())
.run(Process::new(&sl(), &oci::Process::default(), "123", true, 1, None).unwrap())
.await;
assert!(ret.is_err(), "Expecting Err, Got {:?}", ret);
}

View File

@ -7,6 +7,7 @@ use libc::pid_t;
use std::fs::File;
use std::os::unix::io::{AsRawFd, RawFd};
use tokio::sync::mpsc::Sender;
use tokio_vsock::VsockStream;
use nix::errno::Errno;
use nix::fcntl::{fcntl, FcntlArg, OFlag};
@ -18,6 +19,7 @@ use oci::Process as OCIProcess;
use slog::Logger;
use crate::pipestream::PipeStream;
use awaitgroup::WaitGroup;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::io::{split, ReadHalf, WriteHalf};
@ -47,6 +49,41 @@ pub enum StreamType {
type Reader = Arc<Mutex<ReadHalf<PipeStream>>>;
type Writer = Arc<Mutex<WriteHalf<PipeStream>>>;
#[derive(Debug)]
pub struct ProcessIo {
pub stdin: Option<VsockStream>,
pub stdout: Option<VsockStream>,
pub stderr: Option<VsockStream>,
// used to close stdin stream
pub close_stdin_tx: tokio::sync::watch::Sender<bool>,
pub close_stdin_rx: tokio::sync::watch::Receiver<bool>,
// wait for stdin copy task to finish
pub wg_input: WaitGroup,
// used to wait for all process outputs to be copied to the vsock streams
// only used when tty is used.
pub wg_output: WaitGroup,
}
impl ProcessIo {
pub fn new(
stdin: Option<VsockStream>,
stdout: Option<VsockStream>,
stderr: Option<VsockStream>,
) -> Self {
let (close_stdin_tx, close_stdin_rx) = tokio::sync::watch::channel(false);
ProcessIo {
stdin,
stdout,
stderr,
close_stdin_tx,
close_stdin_rx,
wg_input: WaitGroup::new(),
wg_output: WaitGroup::new(),
}
}
}
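
A minimal sketch of how these fields interact, assuming the awaitgroup 0.6 and tokio::sync::watch APIs used above (illustrative only, not part of this commit):

    // The stdin copy task holds a worker and watches close_stdin_rx.
    let mut pio = ProcessIo::new(None, None, None);
    let worker = pio.wg_input.worker();
    let mut rx = pio.close_stdin_rx.clone();
    tokio::spawn(async move {
        let _ = rx.changed().await; // close_stdin was requested
        worker.done();              // mark the copy task as finished
    });
    let _ = pio.close_stdin_tx.send(true); // ask the copy task to stop
    pio.wg_input.wait().await;             // returns once done() is called
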
#[derive(Debug)]
pub struct Process {
pub exec_id: String,
@ -74,6 +111,8 @@ pub struct Process {
readers: HashMap<StreamType, Reader>,
writers: HashMap<StreamType, Writer>,
pub proc_io: Option<ProcessIo>,
}
pub trait ProcessOperations {
@ -105,6 +144,7 @@ impl Process {
id: &str,
init: bool,
pipe_size: i32,
proc_io: Option<ProcessIo>,
) -> Result<Self> {
let logger = logger.new(o!("subsystem" => "process"));
let (exit_tx, exit_rx) = tokio::sync::watch::channel(false);
@ -131,6 +171,7 @@ impl Process {
term_exit_notifier: Arc::new(Notify::new()),
readers: HashMap::new(),
writers: HashMap::new(),
proc_io,
};
info!(logger, "before create console socket!");
@ -147,6 +188,10 @@ impl Process {
p.parent_stdin = Some(pstdin);
p.stdin = Some(stdin);
// These pipes are necessary because the stdout/stderr of the child process
// cannot be a socket: some images rely on /dev/stdout (/dev/stderr) and
// /proc/self/fd/1 (2), and would fail to boot since opening an existing
// socket is forbidden by the Linux kernel.
let (pstdout, stdout) = create_extended_pipe(OFlag::O_CLOEXEC, pipe_size)?;
p.parent_stdout = Some(pstdout);
p.stdout = Some(stdout);
@ -164,7 +209,14 @@ impl Process {
notify.notify_waiters();
}
pub fn close_stdin(&mut self) {
pub async fn close_stdin(&mut self) {
if let Some(proc_io) = &mut self.proc_io {
// notify io copy task to close stdin stream
let _ = proc_io.close_stdin_tx.send(true);
// wait for io copy task to finish
proc_io.wg_input.wait().await;
}
close_process_stream!(self, term_master, TermMaster);
close_process_stream!(self, parent_stdin, ParentStdin);
@ -172,6 +224,13 @@ impl Process {
}
pub fn cleanup_process_stream(&mut self) {
if let Some(proc_io) = self.proc_io.take() {
drop(proc_io);
return;
}
// legacy io mode
close_process_stream!(self, parent_stdin, ParentStdin);
close_process_stream!(self, parent_stdout, ParentStdout);
close_process_stream!(self, parent_stderr, ParentStderr);
@ -277,6 +336,7 @@ mod tests {
id,
init,
32,
None,
);
let mut process = process.unwrap();

View File

@ -18,6 +18,7 @@ const DEV_MODE_FLAG: &str = "agent.devmode";
const TRACE_MODE_OPTION: &str = "agent.trace";
const LOG_LEVEL_OPTION: &str = "agent.log";
const SERVER_ADDR_OPTION: &str = "agent.server_addr";
const PASSFD_LISTENER_PORT: &str = "agent.passfd_listener_port";
const HOTPLUG_TIMOUT_OPTION: &str = "agent.hotplug_timeout";
const DEBUG_CONSOLE_VPORT_OPTION: &str = "agent.debug_console_vport";
const LOG_VPORT_OPTION: &str = "agent.log_vport";
@ -61,6 +62,7 @@ pub struct AgentConfig {
pub log_vport: i32,
pub container_pipe_size: i32,
pub server_addr: String,
pub passfd_listener_port: i32,
pub unified_cgroup_hierarchy: bool,
pub tracing: bool,
pub supports_seccomp: bool,
@ -76,6 +78,7 @@ pub struct AgentConfigBuilder {
pub log_vport: Option<i32>,
pub container_pipe_size: Option<i32>,
pub server_addr: Option<String>,
pub passfd_listener_port: Option<i32>,
pub unified_cgroup_hierarchy: Option<bool>,
pub tracing: Option<bool>,
}
@ -135,6 +138,7 @@ impl Default for AgentConfig {
log_vport: 0,
container_pipe_size: DEFAULT_CONTAINER_PIPE_SIZE,
server_addr: format!("{}:{}", VSOCK_ADDR, DEFAULT_AGENT_VSOCK_PORT),
passfd_listener_port: 0,
unified_cgroup_hierarchy: false,
tracing: false,
supports_seccomp: rpc::have_seccomp(),
@ -164,6 +168,7 @@ impl FromStr for AgentConfig {
config_override!(agent_config_builder, agent_config, log_vport);
config_override!(agent_config_builder, agent_config, container_pipe_size);
config_override!(agent_config_builder, agent_config, server_addr);
config_override!(agent_config_builder, agent_config, passfd_listener_port);
config_override!(agent_config_builder, agent_config, unified_cgroup_hierarchy);
config_override!(agent_config_builder, agent_config, tracing);
@ -245,6 +250,13 @@ impl AgentConfig {
get_vsock_port,
|port| port > 0
);
parse_cmdline_param!(
param,
PASSFD_LISTENER_PORT,
config.passfd_listener_port,
get_vsock_port,
|port| port > 0
);
parse_cmdline_param!(
param,

View File

@ -44,6 +44,7 @@ mod mount;
mod namespace;
mod netlink;
mod network;
mod passfd_io;
mod pci;
pub mod random;
mod sandbox;
@ -235,6 +236,12 @@ async fn real_main() -> std::result::Result<(), Box<dyn std::error::Error>> {
// XXX: Note that *ALL* spans needs to start after this point!!
let span_guard = root_span.enter();
// Start the fd passthrough io listener
let passfd_listener_port = config.passfd_listener_port as u32;
if passfd_listener_port != 0 {
passfd_io::start_listen(passfd_listener_port).await?;
}
// Start the sandbox and wait for its ttRPC server to end
start_sandbox(&logger, config, init_mode, &mut tasks, shutdown_rx.clone()).await?;
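
For reference, the listener above is only started when the guest kernel command line carries the new agent option; using the default port defined on the runtime side, the parameter would look like this (a sketch; the exact cmdline is assembled by the hypervisor):

    agent.passfd_listener_port=1027
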

View File

@ -0,0 +1,82 @@
// Copyright 2024 Kata Contributors
//
// SPDX-License-Identifier: Apache-2.0
//
use anyhow::Result;
use lazy_static::lazy_static;
use rustjail::process::ProcessIo;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio_vsock::SockAddr::Vsock;
use tokio_vsock::{VsockListener, VsockStream};
lazy_static! {
static ref HVSOCK_STREAMS: Arc<Mutex<HashMap<u32, VsockStream>>> =
Arc::new(Mutex::new(HashMap::new()));
}
// Convenience function to obtain the scope logger.
fn sl() -> slog::Logger {
slog_scope::logger().new(o!("subsystem" => "passfd_io"))
}
pub(crate) async fn start_listen(port: u32) -> Result<()> {
info!(sl(), "start listening on port {}", port);
let mut listener = VsockListener::bind(libc::VMADDR_CID_ANY, port)?;
tokio::spawn(async move {
loop {
if let Ok((stream, Vsock(addr))) = listener.accept().await {
// We should insert the stream into the mapping as soon as possible
// to minimize the risk of encountering race conditions.
let port = addr.port();
HVSOCK_STREAMS.lock().await.insert(port, stream);
info!(sl(), "accept connection from peer port {}", port);
}
}
});
Ok(())
}
async fn take_stream(port: u32) -> Option<VsockStream> {
// There may be a race condition where the stream is accepted but
// not yet inserted into the mapping. We will retry several times.
// If it still fails, we just give up.
let mut count = 0;
while count < 3 {
let stream = HVSOCK_STREAMS.lock().await.remove(&port);
if stream.is_some() {
return stream;
}
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
count += 1;
}
warn!(sl(), "failed to take stream for port {}", port);
None
}
macro_rules! take_io_stream {
($port: ident) => {
if $port == 0 {
None
} else {
take_stream($port).await
}
};
}
pub(crate) async fn take_io_streams(
stdin_port: u32,
stdout_port: u32,
stderr_port: u32,
) -> ProcessIo {
let stdin = take_io_stream!(stdin_port);
let stdout = take_io_stream!(stdout_port);
let stderr = take_io_stream!(stderr_port);
debug!(
sl(),
"take passfd io streams {} {} {}", stdin_port, stdout_port, stderr_port
);
ProcessIo::new(stdin, stdout, stderr)
}
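
A hedged sketch of the intended agent-side ordering (the retry loop in take_stream covers the window where a connection has been accepted but not yet inserted into the mapping):

    // Illustrative sequence, not part of this commit.
    start_listen(1027).await?;  // once, at agent startup
    // ... host runtime connects; each stream is keyed by its peer port ...
    // ... CreateContainerRequest arrives carrying those port numbers ...
    let io = take_io_streams(stdin_port, stdout_port, stderr_port).await;
    // io.stdin / io.stdout / io.stderr are VsockStreams, or None for port 0
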

View File

@ -58,6 +58,7 @@ use crate::metrics::get_metrics;
use crate::mount::baremount;
use crate::namespace::{NSTYPEIPC, NSTYPEPID, NSTYPEUTS};
use crate::network::setup_guest_dns;
use crate::passfd_io;
use crate::pci;
use crate::random;
use crate::sandbox::Sandbox;
@ -260,7 +261,15 @@ impl AgentService {
let pipe_size = AGENT_CONFIG.container_pipe_size;
let p = if let Some(p) = oci.process {
Process::new(&sl(), &p, cid.as_str(), true, pipe_size)?
let proc_io = if AGENT_CONFIG.passfd_listener_port != 0 {
Some(
passfd_io::take_io_streams(req.stdin_port, req.stdout_port, req.stderr_port)
.await,
)
} else {
None
};
Process::new(&sl(), &p, cid.as_str(), true, pipe_size, proc_io)?
} else {
info!(sl(), "no process configurations!");
return Err(anyhow!(nix::Error::EINVAL));
@ -369,7 +378,15 @@ impl AgentService {
let pipe_size = AGENT_CONFIG.container_pipe_size;
let ocip = rustjail::process_grpc_to_oci(&process);
let p = Process::new(&sl(), &ocip, exec_id.as_str(), false, pipe_size)?;
// passfd_listener_port != 0 indicates passfd io mode
let proc_io = if AGENT_CONFIG.passfd_listener_port != 0 {
Some(passfd_io::take_io_streams(req.stdin_port, req.stdout_port, req.stderr_port).await)
} else {
None
};
let p = Process::new(&sl(), &ocip, exec_id.as_str(), false, pipe_size, proc_io)?;
let ctr = sandbox
.get_container(&cid)
@ -834,7 +851,7 @@ impl agent_ttrpc::AgentService for AgentService {
)
})?;
p.close_stdin();
p.close_stdin().await;
Ok(Empty::new())
}
@ -2252,6 +2269,7 @@ mod tests {
&exec_process_id.to_string(),
false,
1,
None,
)
.unwrap();

View File

@ -1006,12 +1006,20 @@ mod tests {
// add init process
linux_container.processes.insert(
1,
Process::new(&logger, &oci::Process::default(), "1", true, 1).unwrap(),
Process::new(&logger, &oci::Process::default(), "1", true, 1, None).unwrap(),
);
// add exec process
linux_container.processes.insert(
123,
Process::new(&logger, &oci::Process::default(), "exec-123", false, 1).unwrap(),
Process::new(
&logger,
&oci::Process::default(),
"exec-123",
false,
1,
None,
)
.unwrap(),
);
s.add_container(linux_container);
@ -1058,6 +1066,7 @@ mod tests {
"this_is_a_test_process",
true,
1,
None,
)
.unwrap();
// processes internally only have pids when manually set

View File

@ -69,6 +69,11 @@ async fn handle_sigchild(logger: Logger, sandbox: Arc<Mutex<Sandbox>>) -> Result
}
};
// In passfd io mode, we need to wait for the copy task to end.
if let Some(proc_io) = &mut p.proc_io {
proc_io.wg_output.wait().await;
}
p.exit_code = ret;
let _ = p.exit_tx.take();

View File

@ -103,7 +103,7 @@ use super::{ConnState, Error, PendingRx, PendingRxSet, Result};
/// guest-side AF_VSOCK socket and a host-side `Read + Write + AsRawFd` stream.
pub struct VsockConnection {
/// The current connection state.
state: ConnState,
pub(crate) state: ConnState,
/// The local CID. Most of the time this will be the constant `2` (the vsock
/// host CID).
pub(crate) local_cid: u64,
@ -115,6 +115,8 @@ pub struct VsockConnection {
pub(crate) peer_port: u32,
/// The (connected) host-side stream.
pub(crate) stream: Box<dyn VsockStream>,
/// Whether to keep the connection when the local peer has closed.
keep: bool,
/// The TX buffer for this connection.
tx_buf: TxBuf,
/// Total number of bytes that have been successfully written to
@ -297,6 +299,8 @@ impl VsockChannel for VsockConnection {
// to forward some data to the host stream. Also works for a
// connection that has begun shutting down, but the peer still has
// some data to send.
// It also works for the case where a hybrid connection's peer has closed,
// which needs to reactivate the connection's fd to generate the epollout event.
ConnState::Established | ConnState::PeerClosed(_, false)
if pkt.op() == uapi::VSOCK_OP_RW =>
{
@ -318,7 +322,19 @@ impl VsockChannel for VsockConnection {
"vsock: error writing to local stream (lp={}, pp={}): {:?}",
self.local_port, self.peer_port, err
);
self.kill();
match err {
Error::TxBufFull => {
// The hybrid pipe's peer has closed and the tx buf is full.
// If we want to keep the connection, we need to drop the data
// sent from the guest; otherwise, we close the connection.
if !self.keep() {
self.kill();
}
}
_ => {
self.kill();
}
};
return Ok(());
}
@ -462,15 +478,26 @@ impl VsockEpollListener for VsockConnection {
.tx_buf
.flush_to(&mut self.stream)
.unwrap_or_else(|err| {
warn!(
"vsock: error flushing TX buf for (lp={}, pp={}): {:?}",
self.local_port, self.peer_port, err
);
if !self.keep() {
warn!(
"vsock: error flushing TX buf for (lp={}, pp={}): {:?}",
self.local_port, self.peer_port, err
);
}
match err {
Error::TxBufFlush(inner) if inner.kind() == ErrorKind::WouldBlock => {
// This should never happen (EWOULDBLOCK after
// EPOLLOUT), but it does, so let's absorb it.
}
Error::TxBufFlush(inner) if (inner.kind() == ErrorKind::BrokenPipe) => {
// The hybrid connection's pipe peer was closed, and we want to keep the
// connection so that users can reopen the peer pipe to resume it;
// otherwise, close the connection.
if !self.keep() {
self.kill();
}
}
_ => self.kill(),
};
0
@ -499,6 +526,7 @@ impl VsockConnection {
local_port: u32,
peer_port: u32,
peer_buf_alloc: u32,
keep: bool,
) -> Self {
Self {
local_cid,
@ -506,6 +534,7 @@ impl VsockConnection {
local_port,
peer_port,
stream,
keep,
state: ConnState::PeerInit,
tx_buf: TxBuf::default(),
fwd_cnt: Wrapping(0),
@ -525,6 +554,7 @@ impl VsockConnection {
peer_cid: u64,
local_port: u32,
peer_port: u32,
keep: bool,
) -> Self {
Self {
local_cid,
@ -532,6 +562,7 @@ impl VsockConnection {
local_port,
peer_port,
stream,
keep,
state: ConnState::LocalInit,
tx_buf: TxBuf::default(),
fwd_cnt: Wrapping(0),
@ -579,6 +610,11 @@ impl VsockConnection {
self.state
}
/// Return the keep value.
pub fn keep(&self) -> bool {
self.keep
}
/// Send some raw, untracked, data straight to the underlying connected
/// stream. Returns: number of bytes written, or the error describing the
/// write failure.
@ -608,16 +644,23 @@ impl VsockConnection {
// stream.
let written = match self.stream.write(buf) {
Ok(cnt) => cnt,
Err(e) => {
Err(e) if e.kind() == ErrorKind::WouldBlock => {
// Absorb any would-block errors, since we can always try again
// later.
if e.kind() == ErrorKind::WouldBlock {
0
} else {
// We don't know how to handle any other write error, so
// we'll send it up the call chain.
0
}
Err(e) if e.kind() == ErrorKind::BrokenPipe => {
// The backing pipe's peer has been closed, and we don't want to close
// this connection since the peer may re-attach to it.
if !self.keep() {
return Err(Error::StreamWrite(e));
}
0
}
Err(e) => {
// We don't know how to handle any other write error, so
// we'll send it up the call chain.
return Err(Error::StreamWrite(e));
}
};
// Move the "forwarded bytes" counter ahead by how much we were able to
@ -843,6 +886,7 @@ pub(crate) mod tests {
LOCAL_PORT,
PEER_PORT,
PEER_BUF_ALLOC,
false,
),
ConnState::LocalInit => VsockConnection::new_local_init(
Box::new(stream),
@ -850,6 +894,7 @@ pub(crate) mod tests {
PEER_CID,
LOCAL_PORT,
PEER_PORT,
false,
),
ConnState::Established => {
let mut conn = VsockConnection::new_peer_init(
@ -859,6 +904,7 @@ pub(crate) mod tests {
LOCAL_PORT,
PEER_PORT,
PEER_BUF_ALLOC,
false,
);
assert!(conn.has_pending_rx());
conn.recv_pkt(&mut pkt).unwrap();

View File

@ -291,6 +291,34 @@ impl VsockChannel for VsockMuxer {
// Alright, everything looks in order - forward this packet to its
// owning connection.
let mut res: VsockResult<()> = Ok(());
// For a hybrid connection that wants to stay connected after its pipe
// peer has closed, the epoll listener needs to be updated here to catch
// the events.
let mut listener = None;
let conn = self.conn_map.get_mut(&conn_key).unwrap();
let pre_state = conn.state();
let nfd: RawFd = conn.as_raw_fd();
if pre_state == ConnState::LocalClosed && conn.keep() {
conn.state = ConnState::Established;
listener = Some(EpollListener::Connection {
key: conn_key,
evset: conn.get_polled_evset(),
backend: conn.stream.backend_type(),
});
}
if let Some(nlistener) = listener {
self.add_listener(nfd, nlistener).unwrap_or_else(|err| {
self.kill_connection(conn_key);
warn!(
"vsock: error updating epoll listener for (lp={}, pp={}): {:?}",
conn_key.local_port, conn_key.peer_port, err
);
});
}
self.apply_conn_mutation(conn_key, |conn| {
res = conn.send_pkt(pkt);
});
@ -396,6 +424,23 @@ impl VsockMuxer {
// listening for it.
Some(EpollListener::Connection { key, evset: _, .. }) => {
let key_copy = *key;
// If the hybrid connection's local peer has closed, the epoll handler would
// not get the epollout event even when the peer is reopened, so it must be
// notified when the guest sends any data, in order to reactivate the epoll
// handler and generate the epollout event for this connection.
let mut need_rm = false;
if let Some(conn) = self.conn_map.get_mut(&key_copy) {
if event_set.contains(epoll::Events::EPOLLERR) && conn.keep() {
conn.state = ConnState::LocalClosed;
need_rm = true;
}
}
if need_rm {
self.remove_listener(fd);
}
// The handling of this event will most probably mutate the
// state of the receiving connection. We'll need to check for new
// pending RX, event set mutation, and all that, so we're
@ -459,6 +504,7 @@ impl VsockMuxer {
self.cid,
local_port,
peer_port,
false,
),
)
}
@ -476,8 +522,10 @@ impl VsockMuxer {
Some(EpollListener::PassFdStream(_)) => {
if let Some(EpollListener::PassFdStream(mut stream)) = self.remove_listener(fd) {
Self::passfd_read_port_and_fd(&mut stream)
.map(|(nfd, peer_port)| (nfd, self.allocate_local_port(), peer_port))
.and_then(|(nfd, local_port, peer_port)| {
.map(|(nfd, peer_port, keep)| {
(nfd, self.allocate_local_port(), peer_port, keep)
})
.and_then(|(nfd, local_port, peer_port, keep)| {
// Here we must make sure we are the sole owner of nfd before converting
// it into a UnixStream object; otherwise, it could cause memory unsafety.
let nstream = unsafe { File::from_raw_fd(nfd) };
@ -502,6 +550,7 @@ impl VsockMuxer {
self.cid,
local_port,
peer_port,
keep,
),
)
})
@ -587,7 +636,7 @@ impl VsockMuxer {
.map_err(|_| Error::InvalidPortRequest)
}
fn passfd_read_port_and_fd(stream: &mut Box<dyn VsockStream>) -> Result<(RawFd, u32)> {
fn passfd_read_port_and_fd(stream: &mut Box<dyn VsockStream>) -> Result<(RawFd, u32, bool)> {
let mut buf = [0u8; 32];
let mut fds = [0, 1];
let (data_len, fd_len) = stream
@ -607,7 +656,9 @@ impl VsockMuxer {
.ok_or(Error::InvalidPortRequest)
.and_then(|word| word.parse::<u32>().map_err(|_| Error::InvalidPortRequest))?;
Ok((fds[0], port))
let keep = port_iter.next().is_some_and(|kp| kp == "keep");
Ok((fds[0], port, keep))
}
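
So a payload of "1027 keep" parses to (fd, 1027, true), while a bare "1027" parses to (fd, 1027, false); runtime-rs always sends the "keep" variant (see passfd_connect later in this commit).
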
/// Add a new connection to the active connection pool.
@ -775,6 +826,7 @@ impl VsockMuxer {
pkt.dst_port(),
pkt.src_port(),
pkt.buf_alloc(),
false,
),
)
})
@ -876,7 +928,7 @@ impl VsockMuxer {
);
});
}
} else {
} else if conn.state() != ConnState::LocalClosed {
// The connection had previously asked to be removed from the
// listener map (by returning an empty event set via
// `get_polled_fd()`), but now wants back in.

View File

@ -11,6 +11,7 @@ pub use vendor::AgentVendor;
use super::default::{
DEFAULT_AGENT_DIAL_TIMEOUT_MS, DEFAULT_AGENT_LOG_PORT, DEFAULT_AGENT_VSOCK_PORT,
DEFAULT_PASSFD_LISTENER_PORT,
};
use crate::eother;
@ -60,6 +61,10 @@ pub struct Agent {
#[serde(default = "default_log_port")]
pub log_port: u32,
/// Agent process io port
#[serde(default = "default_passfd_listener_port")]
pub passfd_listener_port: u32,
/// Agent connection dialing timeout value in millisecond
#[serde(default = "default_dial_timeout")]
pub dial_timeout_ms: u32,
@ -104,6 +109,7 @@ impl std::default::Default for Agent {
debug_console_enabled: false,
server_port: DEFAULT_AGENT_VSOCK_PORT,
log_port: DEFAULT_AGENT_LOG_PORT,
passfd_listener_port: DEFAULT_PASSFD_LISTENER_PORT,
dial_timeout_ms: DEFAULT_AGENT_DIAL_TIMEOUT_MS,
reconnect_timeout_ms: 3_000,
request_timeout_ms: 30_000,
@ -126,6 +132,10 @@ fn default_log_port() -> u32 {
DEFAULT_AGENT_LOG_PORT
}
fn default_passfd_listener_port() -> u32 {
DEFAULT_PASSFD_LISTENER_PORT
}
fn default_dial_timeout() -> u32 {
// ms
10

View File

@ -25,6 +25,7 @@ pub const DEFAULT_AGENT_NAME: &str = "kata-agent";
pub const DEFAULT_AGENT_VSOCK_PORT: u32 = 1024;
pub const DEFAULT_AGENT_LOG_PORT: u32 = 1025;
pub const DEFAULT_AGENT_DBG_CONSOLE_PORT: u32 = 1026;
pub const DEFAULT_PASSFD_LISTENER_PORT: u32 = 1027;
pub const DEFAULT_AGENT_TYPE_NAME: &str = AGENT_NAME_KATA;
pub const DEFAULT_AGENT_DIAL_TIMEOUT_MS: u32 = 10;

View File

@ -54,6 +54,8 @@ pub const DEBUG_CONSOLE_VPORT_OPTION: &str = "agent.debug_console_vport";
pub const LOG_VPORT_OPTION: &str = "agent.log_vport";
/// Option of setting the container's pipe size
pub const CONTAINER_PIPE_SIZE_OPTION: &str = "agent.container_pipe_size";
/// Option of setting the fd passthrough io listener port
pub const PASSFD_LISTENER_PORT: &str = "agent.passfd_listener_port";
/// Trait to manipulate global Kata configuration information.
pub trait ConfigPlugin: Send + Sync {

View File

@ -167,6 +167,18 @@ pub struct Runtime {
/// shared_mount declarations
#[serde(default)]
pub shared_mounts: Vec<SharedMount>,
/// If enabled, the runtime will attempt to use the fd passthrough feature for process io.
#[serde(default)]
pub use_passfd_io: bool,
/// If fd passthrough io is enabled, the runtime will attempt to use the specified port instead of the default port.
#[serde(default = "default_passfd_listener_port")]
pub passfd_listener_port: u32,
}
fn default_passfd_listener_port() -> u32 {
default::DEFAULT_PASSFD_LISTENER_PORT
}
impl ConfigOps for Runtime {

View File

@ -93,6 +93,14 @@ message CreateContainerRequest {
// This field is used to declare a set of shared mount points
// that support cross-container sharing of mount objects.
repeated SharedMount shared_mounts = 8;
// These fields are the host-side vport numbers of passfd streams
// pre-created by runtime-rs, and used as identifiers for the agent
// to select the right streams for the init process's stdin/stdout/stderr.
// Disable the feature by setting the associated port to 0.
uint32 stdin_port = 9;
uint32 stdout_port = 10;
uint32 stderr_port = 11;
}
message StartContainerRequest {
@ -115,6 +123,14 @@ message ExecProcessRequest {
string exec_id = 2;
StringUser string_user = 3;
Process process = 4;
// These fields are the host-side vport numbers of passfd streams
// pre-created by runtime-rs, and used as identifiers for the agent
// to select the right streams for the process's stdin/stdout/stderr.
// Disable the feature by setting the associated port to 0.
uint32 stdin_port = 5;
uint32 stdout_port = 6;
uint32 stderr_port = 7;
}
message SignalProcessRequest {

View File

@ -3448,6 +3448,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "604b71b8fc267e13bb3023a2c901126c8f349393666a6d98ac1ae5729b701798"
dependencies = [
"libc",
"tokio",
]
[[package]]
@ -4452,6 +4453,7 @@ dependencies = [
"persist",
"protobuf 3.2.0",
"resource",
"sendfd",
"serde",
"serde_derive",
"serde_json",

View File

@ -372,3 +372,10 @@ sandbox_bind_mounts=@DEFBINDMOUNTS@
# to the hypervisor.
# (default: /run/kata-containers/dans)
dan_conf = "@DEFDANCONF@"
# If enabled, the runtime will attempt to use the fd passthrough feature for process io.
# Note: this feature is only supported by the Dragonball hypervisor.
use_passfd_io = true
# If fd passthrough io is enabled, the runtime will attempt to use the specified port instead of the default port.
# passfd_listener_port = 1027

View File

@ -274,6 +274,9 @@ impl From<CreateContainerRequest> for agent::CreateContainerRequest {
OCI: from_option(from.oci),
sandbox_pidns: from.sandbox_pidns,
shared_mounts: trans_vec(from.shared_mounts),
stdin_port: from.stdin_port.unwrap_or_default(),
stdout_port: from.stdout_port.unwrap_or_default(),
stderr_port: from.stderr_port.unwrap_or_default(),
..Default::default()
}
}
@ -415,6 +418,9 @@ impl From<ExecProcessRequest> for agent::ExecProcessRequest {
exec_id: from.process_id.exec_id(),
string_user: from_option(from.string_user),
process: from_option(from.process),
stdin_port: from.stdin_port.unwrap_or_default(),
stdout_port: from.stdout_port.unwrap_or_default(),
stderr_port: from.stderr_port.unwrap_or_default(),
..Default::default()
}
}

View File

@ -128,6 +128,9 @@ pub struct CreateContainerRequest {
pub sandbox_pidns: bool,
pub rootfs_mounts: Vec<oci::Mount>,
pub shared_mounts: Vec<SharedMount>,
pub stdin_port: Option<u32>,
pub stdout_port: Option<u32>,
pub stderr_port: Option<u32>,
}
#[derive(PartialEq, Clone, Default)]
@ -252,6 +255,9 @@ pub struct ExecProcessRequest {
pub process_id: ContainerProcessID,
pub string_user: Option<StringUser>,
pub process: Option<oci::Process>,
pub stdin_port: Option<u32>,
pub stdout_port: Option<u32>,
pub stderr_port: Option<u32>,
}
#[derive(PartialEq, Clone, Default, Debug)]

View File

@ -182,6 +182,10 @@ impl Hypervisor for CloudHypervisor {
let inner = self.inner.read().await;
inner.resize_memory(new_mem_mb)
}
async fn get_passfd_listener_addr(&self) -> Result<(String, u32)> {
Err(anyhow::anyhow!("Not yet supported"))
}
}
#[async_trait]

View File

@ -22,7 +22,7 @@ use kata_types::{
capabilities::{Capabilities, CapabilityBits},
config::{
hypervisor::{HugePageType, Hypervisor as HypervisorConfig},
KATA_PATH,
KATA_PATH, PASSFD_LISTENER_PORT,
},
};
use nix::mount::MsFlags;
@ -80,6 +80,10 @@ pub struct DragonballInner {
/// the balloon size
pub(crate) balloon_size: u32,
/// guest-side fd passthrough io listener port, used to initialize
/// connections for io
pub(crate) passfd_listener_port: Option<u32>,
}
impl DragonballInner {
@ -108,6 +112,7 @@ impl DragonballInner {
guest_memory_block_size_mb: 0,
mem_hotplug_size_mb: 0,
balloon_size: 0,
passfd_listener_port: None,
}
}
@ -128,6 +133,12 @@ impl DragonballInner {
kernel_params.append(&mut KernelParams::from_string(
&self.config.boot_info.kernel_params,
));
if let Some(passfd_listener_port) = self.passfd_listener_port {
kernel_params.append(&mut KernelParams::from_string(&format!(
"{}={}",
PASSFD_LISTENER_PORT, passfd_listener_port
)));
}
info!(sl!(), "prepared kernel_params={:?}", kernel_params);
// set boot source
@ -458,6 +469,10 @@ impl DragonballInner {
pub(crate) fn guest_memory_block_size_mb(&self) -> u32 {
self.guest_memory_block_size_mb
}
pub fn set_passfd_listener_port(&mut self, port: u32) {
self.passfd_listener_port = Some(port);
}
}
#[async_trait]
@ -477,6 +492,7 @@ impl Persist for DragonballInner {
config: self.hypervisor_config(),
run_dir: self.run_dir.clone(),
cached_block_devices: self.cached_block_devices.clone(),
passfd_listener_port: self.passfd_listener_port,
..Default::default()
})
}
@ -502,6 +518,7 @@ impl Persist for DragonballInner {
guest_memory_block_size_mb: 0,
mem_hotplug_size_mb: 0,
balloon_size: 0,
passfd_listener_port: hypervisor_state.passfd_listener_port,
})
}
}

View File

@ -9,7 +9,7 @@ use std::{
iter::FromIterator,
};
use anyhow::{Context, Ok, Result};
use anyhow::{anyhow, Context, Ok, Result};
use kata_types::capabilities::Capabilities;
use super::inner::DragonballInner;
@ -76,6 +76,15 @@ impl DragonballInner {
))
}
/// Get the address of the agent's vsock server used to init connections for io
pub(crate) async fn get_passfd_listener_addr(&self) -> Result<(String, u32)> {
if let Some(passfd_port) = self.passfd_listener_port {
Ok((get_hvsock_path(&self.id), passfd_port))
} else {
Err(anyhow!("passfd io listener port not set"))
}
}
pub(crate) async fn get_hypervisor_metrics(&self) -> Result<String> {
info!(sl!(), "get hypervisor metrics");
self.vmm_instance.get_hypervisor_metrics()

View File

@ -55,6 +55,11 @@ impl Dragonball {
let mut inner = self.inner.write().await;
inner.set_hypervisor_config(config)
}
pub async fn set_passfd_listener_port(&mut self, port: u32) {
let mut inner = self.inner.write().await;
inner.set_passfd_listener_port(port)
}
}
#[async_trait]
@ -198,6 +203,11 @@ impl Hypervisor for Dragonball {
let mut inner = self.inner.write().await;
inner.resize_memory(new_mem_mb)
}
async fn get_passfd_listener_addr(&self) -> Result<(String, u32)> {
let inner = self.inner.read().await;
inner.get_passfd_listener_addr().await
}
}
#[async_trait]

View File

@ -33,4 +33,5 @@ pub struct HypervisorState {
/// cached block device
pub cached_block_devices: HashSet<String>,
pub virtiofs_daemon_pid: i32,
pub passfd_listener_port: Option<u32>,
}

View File

@ -115,4 +115,5 @@ pub trait Hypervisor: std::fmt::Debug + Send + Sync {
async fn set_capabilities(&self, flag: CapabilityBits);
async fn set_guest_memory_block_size(&self, size: u32);
async fn guest_memory_block_size(&self) -> u32;
async fn get_passfd_listener_addr(&self) -> Result<(String, u32)>;
}

View File

@ -179,6 +179,10 @@ impl Hypervisor for Qemu {
let inner = self.inner.read().await;
inner.resize_memory(new_mem_mb)
}
async fn get_passfd_listener_addr(&self) -> Result<(String, u32)> {
Err(anyhow::anyhow!("Not yet supported"))
}
}
#[async_trait]

View File

@ -15,6 +15,7 @@ lazy_static = "1.4.0"
libc = ">=0.2.39"
nix = "0.24.2"
protobuf = "3.2.0"
sendfd = { version = "0.4.3", features = ["tokio"] }
serde = { version = "1.0.100", features = ["derive"] }
serde_derive = "1.0.27"
serde_json = "1.0.82"

View File

@ -43,15 +43,17 @@ pub struct Container {
agent: Arc<dyn Agent>,
resource_manager: Arc<ResourceManager>,
logger: slog::Logger,
pub(crate) passfd_listener_addr: Option<(String, u32)>,
}
impl Container {
pub fn new(
pub async fn new(
pid: u32,
config: ContainerConfig,
spec: oci::Spec,
agent: Arc<dyn Agent>,
resource_manager: Arc<ResourceManager>,
passfd_listener_addr: Option<(String, u32)>,
) -> Result<Self> {
let container_id = ContainerID::new(&config.container_id).context("new container id")?;
let logger = sl!().new(o!("container_id" => config.container_id.clone()));
@ -84,6 +86,7 @@ impl Container {
agent,
resource_manager,
logger,
passfd_listener_addr,
})
}
@ -184,6 +187,17 @@ impl Container {
}
}
// In passfd io mode, we create vsock connections for io in advance
// and pass port info to agent in `CreateContainerRequest`.
// These vsock connections will be used as stdin/stdout/stderr of the container process.
// See agent/src/passfd_io.rs for more details.
if let Some((hvsock_uds_path, passfd_port)) = &self.passfd_listener_addr {
inner
.init_process
.passfd_io_init(hvsock_uds_path, *passfd_port)
.await?;
}
// create container
let r = agent::CreateContainerRequest {
process_id: agent::ContainerProcessID::new(&config.container_id, ""),
@ -192,6 +206,21 @@ impl Container {
sandbox_pidns,
devices: devices_agent,
shared_mounts,
stdin_port: inner
.init_process
.passfd_io
.as_ref()
.and_then(|io| io.stdin_port),
stdout_port: inner
.init_process
.passfd_io
.as_ref()
.and_then(|io| io.stdout_port),
stderr_port: inner
.init_process
.passfd_io
.as_ref()
.and_then(|io| io.stderr_port),
..Default::default()
};
@ -217,21 +246,40 @@ impl Container {
return Err(err);
}
let container_io = inner.new_container_io(process).await?;
inner
.init_process
.start_io_and_wait(containers, self.agent.clone(), container_io)
.await?;
if self.passfd_listener_addr.is_some() {
inner
.init_process
.passfd_io_wait(containers, self.agent.clone())
.await?;
} else {
let container_io = inner.new_container_io(process).await?;
inner
.init_process
.start_io_and_wait(containers, self.agent.clone(), container_io)
.await?;
}
}
ProcessType::Exec => {
// In passfd io mode, we create vsock connections for io in advance
// and pass port info to agent in `ExecProcessRequest`.
// These vsock connections will be used as stdin/stdout/stderr of the exec process.
// See agent/src/passfd_io.rs for more details.
if let Some((hvsock_uds_path, passfd_port)) = &self.passfd_listener_addr {
let exec = inner
.exec_processes
.get_mut(&process.exec_id)
.ok_or_else(|| Error::ProcessNotFound(process.clone()))?;
exec.process
.passfd_io_init(hvsock_uds_path, *passfd_port)
.await?;
}
if let Err(e) = inner.start_exec_process(process).await {
let device_manager = self.resource_manager.get_device_manager().await;
let _ = inner.stop_process(process, true, &device_manager).await;
return Err(e).context("enter process");
}
let container_io = inner.new_container_io(process).await.context("io stream")?;
{
let exec = inner
.exec_processes
@ -245,13 +293,29 @@ impl Container {
}
}
// start io and wait
{
if self.passfd_listener_addr.is_some() {
// In passfd io mode, we don't bother with the IO.
// We send `WaitProcessRequest` immediately to the agent
// and wait for the response in a separate thread.
// The agent will only respond after IO is done.
let exec = inner
.exec_processes
.get_mut(&process.exec_id)
.ok_or_else(|| Error::ProcessNotFound(process.clone()))?;
exec.process
.passfd_io_wait(containers, self.agent.clone())
.await?;
} else {
// In legacy io mode, we handle IO by polling the agent.
// When IO is done, we send `WaitProcessRequest` to the agent
// to get the exit status.
let container_io =
inner.new_container_io(process).await.context("io stream")?;
let exec = inner
.exec_processes
.get_mut(&process.exec_id)
.ok_or_else(|| Error::ProcessNotFound(process.clone()))?;
exec.process
.start_io_and_wait(containers, self.agent.clone(), container_io)
.await

View File

@ -87,6 +87,17 @@ impl ContainerInner {
process_id: process.clone().into(),
string_user: None,
process: Some(exec.oci_process.clone()),
stdin_port: exec.process.passfd_io.as_ref().and_then(|io| io.stdin_port),
stdout_port: exec
.process
.passfd_io
.as_ref()
.and_then(|io| io.stdout_port),
stderr_port: exec
.process
.passfd_io
.as_ref()
.and_then(|io| io.stderr_port),
})
.await
.context("exec process")?;

View File

@ -6,5 +6,7 @@
mod container_io;
pub use container_io::ContainerIo;
mod passfd_io;
mod shim_io;
pub use passfd_io::PassfdIo;
pub use shim_io::ShimIo;

View File

@ -0,0 +1,135 @@
// Copyright 2024 Kata Contributors
//
// SPDX-License-Identifier: Apache-2.0
//
use anyhow::{anyhow, Context, Result};
use sendfd::SendWithFd;
use std::{
fs::OpenOptions,
os::fd::{AsRawFd, OwnedFd},
os::unix::fs::OpenOptionsExt,
};
use tokio::{
io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
net::UnixStream,
};
// Note: the fd will be closed after passing
async fn passfd_connect(uds: &str, port: u32, fd: OwnedFd) -> Result<u32> {
info!(sl!(), "passfd uds {:?} port {}", &uds, port);
let mut stream = UnixStream::connect(&uds).await.context("connect")?;
stream.write_all(b"passfd\n").await.context("write all")?;
// We want the io connection to stay connected after containerd closes the
// io pipe, so that a new pipe can later be attached to the io stream.
let buf = format!("{} keep", port);
stream
.send_with_fd(buf.as_bytes(), &[fd.as_raw_fd()])
.context("send port and fd")?;
let mut reads = BufReader::new(&mut stream);
let mut response = String::new();
reads.read_line(&mut response).await.context("read line")?;
// parse response like "OK port"
let mut iter = response.split_whitespace();
if iter.next() != Some("OK") {
return Err(anyhow!(
"handshake error: malformed response code: {:?}",
response
));
}
let hostport = iter
.next()
.ok_or_else(|| anyhow!("handshake error: malformed response code: {:?}", response))?
.parse::<u32>()
.context("handshake error: malformed response code")?;
Ok(hostport)
}
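
Taken together with Dragonball's passfd_read_port_and_fd earlier in this commit, the handshake on the hybrid vsock UDS looks roughly like this (port values illustrative; the newline-terminated reply is inferred from the read_line call):

    runtime    -> dragonball:                 "passfd\n"
    runtime    -> dragonball (SCM_RIGHTS fd): "<guest_port> keep"
    dragonball -> runtime:                    "OK <host_port>\n"
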
#[derive(Debug, Default)]
pub struct PassfdIo {
stdin: Option<String>,
stdout: Option<String>,
stderr: Option<String>,
pub stdin_port: Option<u32>,
pub stdout_port: Option<u32>,
pub stderr_port: Option<u32>,
}
impl PassfdIo {
pub async fn new(
stdin: Option<String>,
stdout: Option<String>,
stderr: Option<String>,
) -> Self {
Self {
stdin,
stdout,
stderr,
..Default::default()
}
}
pub async fn open_and_passfd(
&mut self,
uds_path: &str,
passfd_port: u32,
terminal: bool,
) -> Result<()> {
// In Linux, when a FIFO is opened and there are no writers, the reader
// will continuously receive the HUP event. This can be problematic
// when creating containers in detached mode, as the stdin FIFO writer
// is closed after the container is created, resulting in this situation.
//
// See: https://stackoverflow.com/questions/15055065/o-rdwr-on-named-pipes-with-poll
if let Some(stdin) = &self.stdin {
let fin = OpenOptions::new()
.read(true)
.write(true)
.custom_flags(libc::O_NONBLOCK)
.open(stdin)
.context("open stdin")?;
let hostport = passfd_connect(uds_path, passfd_port, fin.into())
.await
.context("passfd")?;
self.stdin_port = Some(hostport);
}
if let Some(stdout) = &self.stdout {
let fout = OpenOptions::new()
.write(true)
.custom_flags(libc::O_NONBLOCK)
.open(stdout)
.context("open stdout")?;
let hostport = passfd_connect(uds_path, passfd_port, fout.into())
.await
.context("passfd")?;
self.stdout_port = Some(hostport);
}
if !terminal {
// stderr is not used in terminal mode
if let Some(stderr) = &self.stderr {
let ferr = OpenOptions::new()
.write(true)
.custom_flags(libc::O_NONBLOCK)
.open(stderr)
.context("open stderr")?;
let hostport = passfd_connect(uds_path, passfd_port, ferr.into())
.await
.context("passfd")?;
self.stderr_port = Some(hostport);
}
}
Ok(())
}
}

View File

@ -77,7 +77,9 @@ impl ContainerManager for VirtContainerManager {
spec.clone(),
self.agent.clone(),
self.resource_manager.clone(),
self.hypervisor.get_passfd_listener_addr().await.ok(),
)
.await
.context("new container")?;
// CreateContainer Hooks:

View File

@ -15,7 +15,7 @@ use tokio::io::{AsyncRead, AsyncWrite};
use tokio::sync::{watch, RwLock};
use super::container::Container;
use super::io::{ContainerIo, ShimIo};
use super::io::{ContainerIo, PassfdIo, ShimIo};
use super::logger_with_process;
pub type ProcessWatcher = (
@ -46,6 +46,9 @@ pub struct Process {
// close io call should wait until the stdin io copy finished to
// prevent stdin data lost.
pub wg_stdin: WaitGroup,
// io streams using vsock fd passthrough feature
pub passfd_io: Option<PassfdIo>,
}
impl Process {
@ -76,9 +79,95 @@ impl Process {
exit_watcher_rx: Some(receiver),
exit_watcher_tx: Some(sender),
wg_stdin: WaitGroup::new(),
passfd_io: None,
}
}
/// Init the `passfd_io` struct and vsock connections for io to the agent.
pub async fn passfd_io_init(&mut self, hvsock_uds_path: &str, passfd_port: u32) -> Result<()> {
info!(self.logger, "passfd io init");
let mut passfd_io =
PassfdIo::new(self.stdin.clone(), self.stdout.clone(), self.stderr.clone()).await;
passfd_io
.open_and_passfd(hvsock_uds_path, passfd_port, self.terminal)
.await
.context("passfd connect")?;
self.passfd_io = Some(passfd_io);
Ok(())
}
/// (After the process has started) Send a WaitProcessRequest to the agent
/// in a separate task.
/// This function is only used in passfd io mode.
pub async fn passfd_io_wait(
&mut self,
containers: Arc<RwLock<HashMap<String, Container>>>,
agent: Arc<dyn Agent>,
) -> Result<()> {
let logger = self.logger.clone();
info!(logger, "start passfd io wait");
let process = self.process.clone();
let exit_status = self.exit_status.clone();
let exit_notifier = self.exit_watcher_tx.take();
let status = self.status.clone();
tokio::spawn(async move {
let req = agent::WaitProcessRequest {
process_id: process.clone().into(),
};
info!(logger, "begin passfd io wait process");
let resp = match agent.wait_process(req).await {
Ok(ret) => ret,
Err(e) => {
error!(logger, "failed to passfd io wait process {:?}", e);
return;
}
};
info!(
logger,
"end passfd io wait process exit code {}", resp.status
);
let containers = containers.read().await;
let container_id = &process.container_id.container_id;
let c = match containers.get(container_id) {
Some(c) => c,
None => {
error!(
logger,
"Failed to stop process, since container {} not found", container_id
);
return;
}
};
if let Err(err) = c.stop_process(&process).await {
error!(
logger,
"Failed to stop process, process = {:?}, err = {:?}", process, err
);
}
let mut exit_status = exit_status.write().await;
exit_status.update_exit_code(resp.status);
drop(exit_status);
let mut status = status.write().await;
*status = ProcessStatus::Stopped;
drop(status);
drop(exit_notifier);
info!(logger, "end passfd io wait thread");
});
Ok(())
}
pub async fn start_io_and_wait(
&mut self,
containers: Arc<RwLock<HashMap<String, Container>>>,
@ -246,15 +335,20 @@ impl Process {
*status = ProcessStatus::Stopped;
}
/// Close the stdin of the process in container.
pub async fn close_io(&mut self, agent: Arc<dyn Agent>) {
self.wg_stdin.wait().await;
// In passfd io mode, the stdin close and sync logic is handled
// on the agent side.
if self.passfd_io.is_none() {
self.wg_stdin.wait().await;
}
let req = agent::CloseStdinRequest {
process_id: self.process.clone().into(),
};
if let Err(e) = agent.close_stdin(req).await {
warn!(self.logger, "failed clsoe process io: {:?}", e);
warn!(self.logger, "failed close process io: {:?}", e);
}
}

View File

@ -140,6 +140,11 @@ async fn new_hypervisor(toml_config: &TomlConfig) -> Result<Arc<dyn Hypervisor>>
hypervisor
.set_hypervisor_config(hypervisor_config.clone())
.await;
if toml_config.runtime.use_passfd_io {
hypervisor
.set_passfd_listener_port(toml_config.runtime.passfd_listener_port)
.await;
}
Ok(Arc::new(hypervisor))
}
HYPERVISOR_QEMU => {

View File

@ -294,6 +294,7 @@ impl ContainerLauncher {
&self.id,
self.init,
0,
None,
)?)
} else {
Err(anyhow!("no process configuration"))