From 4be7185aa48f793428701b1b73f201e60b45d598 Mon Sep 17 00:00:00 2001 From: Tim Zhang Date: Sat, 26 Mar 2022 17:33:41 +0800 Subject: [PATCH] runtime-rs: runtime part implement Fixes: #3785 Signed-off-by: Tim Zhang Signed-off-by: Zhongtao Hu Signed-off-by: Quanwei Zhou --- src/runtime-rs/Cargo.lock | 37 +- .../crates/runtimes/virt_container/Cargo.toml | 20 + .../src/container_manager/container.rs | 403 ++++++++++++++++++ .../src/container_manager/container_inner.rs | 261 ++++++++++++ .../src/container_manager/io/container_io.rs | 171 ++++++++ .../src/container_manager/io/mod.rs | 10 + .../src/container_manager/io/shim_io.rs | 135 ++++++ .../src/container_manager/manager.rs | 274 ++++++++++++ .../src/container_manager/mod.rs | 20 + .../src/container_manager/process.rs | 211 +++++++++ .../virt_container/src/health_check.rs | 123 ++++++ .../crates/runtimes/virt_container/src/lib.rs | 73 +++- .../runtimes/virt_container/src/sandbox.rs | 229 ++++++++++ 13 files changed, 1961 insertions(+), 6 deletions(-) create mode 100644 src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container.rs create mode 100644 src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container_inner.rs create mode 100644 src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/container_io.rs create mode 100644 src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/mod.rs create mode 100644 src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/shim_io.rs create mode 100644 src/runtime-rs/crates/runtimes/virt_container/src/container_manager/manager.rs create mode 100644 src/runtime-rs/crates/runtimes/virt_container/src/container_manager/mod.rs create mode 100644 src/runtime-rs/crates/runtimes/virt_container/src/container_manager/process.rs create mode 100644 src/runtime-rs/crates/runtimes/virt_container/src/health_check.rs create mode 100644 src/runtime-rs/crates/runtimes/virt_container/src/sandbox.rs diff --git a/src/runtime-rs/Cargo.lock b/src/runtime-rs/Cargo.lock index 2533ab1ebb..09abdaca59 100644 --- a/src/runtime-rs/Cargo.lock +++ b/src/runtime-rs/Cargo.lock @@ -77,6 +77,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "awaitgroup" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc17ab023b4091c10ff099f9deebaeeb59b5189df07e554c4fef042b70745d68" + [[package]] name = "backtrace" version = "0.3.64" @@ -647,7 +653,7 @@ dependencies = [ "slog", "slog-scope", "thiserror", - "toml", + "toml 0.5.8", ] [[package]] @@ -1697,6 +1703,15 @@ dependencies = [ "vsock", ] +[[package]] +name = "toml" +version = "0.4.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "758664fc71a3a69038656bee8b6be6477d2a6c315a6b81f7081f591bffa4111f" +dependencies = [ + "serde", +] + [[package]] name = "toml" version = "0.5.8" @@ -1848,11 +1863,31 @@ checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" name = "virt_container" version = "0.1.0" dependencies = [ + "agent", "anyhow", "async-trait", + "awaitgroup", "common", + "containerd-shim-protos", + "futures 0.3.21", + "hypervisor", + "kata-sys-util", "kata-types", + "lazy_static", + "libc", + "logging", + "nix 0.16.1", + "oci", + "protobuf", + "resource", + "serde", + "serde_derive", + "serde_json", + "slog", + "slog-scope", "tokio", + "toml 0.4.10", + "url", ] [[package]] diff --git 
a/src/runtime-rs/crates/runtimes/virt_container/Cargo.toml b/src/runtime-rs/crates/runtimes/virt_container/Cargo.toml
index 8e63c01c1a..e34e2fd5b7 100644
--- a/src/runtime-rs/crates/runtimes/virt_container/Cargo.toml
+++ b/src/runtime-rs/crates/runtimes/virt_container/Cargo.toml
@@ -7,7 +7,27 @@ edition = "2018"
 [dependencies]
 anyhow = "^1.0"
 async-trait = "0.1.48"
+awaitgroup = "0.6.0"
+containerd-shim-protos = { version = "0.2.0", features = ["async"]}
+futures = "0.3.19"
+lazy_static = "1.4.0"
+libc = ">=0.2.39"
+nix = "0.16.0"
+protobuf = "2.23.0"
+serde = { version = "1.0.100", features = ["derive"] }
+serde_derive = "1.0.27"
+serde_json = "1.0.39"
+slog = "2.5.2"
+slog-scope = "4.4.0"
 tokio = { version = "1.8.0" }
+toml = "0.4.2"
+url = "2.1.1"
+agent = { path = "../../agent" }
 common = { path = "../common" }
+hypervisor = { path = "../../hypervisor" }
+kata-sys-util = { path = "../../../../libs/kata-sys-util" }
 kata-types = { path = "../../../../libs/kata-types" }
+logging = { path = "../../../../libs/logging"}
+oci = { path = "../../../../libs/oci" }
+resource = { path = "../../resource" }
diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container.rs
new file mode 100644
index 0000000000..cb1c7b2b1c
--- /dev/null
+++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container.rs
@@ -0,0 +1,403 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//

use std::sync::Arc;

use agent::Agent;
use anyhow::{anyhow, Context, Result};
use common::{
    error::Error,
    types::{
        ContainerConfig, ContainerID, ContainerProcess, ProcessStateInfo, ProcessStatus,
        ProcessType,
    },
};
use oci::{LinuxResources, Process as OCIProcess};
use resource::ResourceManager;
use tokio::sync::RwLock;

use super::{
    process::{Process, ProcessWatcher},
    ContainerInner,
};
use crate::container_manager::logger_with_process;

pub struct Exec {
    pub(crate) process: Process,
    pub(crate) oci_process: OCIProcess,
}

pub struct Container {
    pid: u32,
    pub container_id: ContainerID,
    config: ContainerConfig,
    inner: Arc<RwLock<ContainerInner>>,
    agent: Arc<dyn Agent>,
    resource_manager: Arc<ResourceManager>,
    logger: slog::Logger,
}

impl Container {
    pub fn new(
        pid: u32,
        config: ContainerConfig,
        agent: Arc<dyn Agent>,
        resource_manager: Arc<ResourceManager>,
    ) -> Result<Self> {
        let container_id = ContainerID::new(&config.container_id).context("new container id")?;
        let logger = sl!().new(o!("container_id" => config.container_id.clone()));
        let process = ContainerProcess::new(&config.container_id, "")?;
        let init_process = Process::new(
            &process,
            pid,
            &config.bundle,
            config.stdin.clone(),
            config.stdout.clone(),
            config.stderr.clone(),
            config.terminal,
        );

        Ok(Self {
            pid,
            container_id,
            config,
            inner: Arc::new(RwLock::new(ContainerInner::new(
                agent.clone(),
                init_process,
                logger.clone(),
            ))),
            agent,
            resource_manager,
            logger,
        })
    }
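    // create() drives the agent-side container creation: it amends the OCI
    // spec, sets up the rootfs and volumes through the resource manager,
    // updates cgroups, and finally issues a CreateContainerRequest to the
    // guest agent.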
    pub async fn create(&self, mut spec: oci::Spec) -> Result<()> {
        // process the oci spec
        let mut inner = self.inner.write().await;
        let config = &self.config;
        amend_spec(&mut spec).context("amend spec")?;

        // handle the rootfs
        let rootfs = self
            .resource_manager
            .handler_rootfs(&config.container_id, &config.bundle, &config.rootfs_mounts)
            .await
            .context("handler rootfs")?;

        // update the rootfs path in the spec
        match spec.root.as_mut() {
            Some(spec) => {
                spec.path = rootfs
                    .get_guest_rootfs_path()
                    .await
                    .context("get guest rootfs path")?
            }
            None => return Err(anyhow!("spec is missing the root field")),
        };
        inner.rootfs.push(rootfs);

        // handle volumes
        let volumes = self
            .resource_manager
            .handler_volumes(&config.container_id, &spec.mounts)
            .await
            .context("handler volumes")?;
        let mut oci_mounts = vec![];
        let mut storages = vec![];
        for v in volumes {
            let mut volume_mounts = v.get_volume_mount().context("get volume mount")?;
            if !volume_mounts.is_empty() {
                oci_mounts.append(&mut volume_mounts);
            }

            let mut s = v.get_storage().context("get storage")?;
            if !s.is_empty() {
                storages.append(&mut s);
            }
            inner.volumes.push(v);
        }
        spec.mounts = oci_mounts;

        // TODO: handle devices

        // update cgroups
        self.resource_manager
            .update_cgroups(
                &config.container_id,
                spec.linux
                    .as_ref()
                    .and_then(|linux| linux.resources.as_ref()),
            )
            .await?;

        // create the container in the guest
        let r = agent::CreateContainerRequest {
            process_id: agent::ContainerProcessID::new(&config.container_id, ""),
            string_user: None,
            devices: vec![],
            storages,
            oci: Some(spec),
            guest_hooks: None,
            sandbox_pidns: false,
            rootfs_mounts: vec![],
        };

        self.agent
            .create_container(r)
            .await
            .context("agent create container")?;
        self.resource_manager.dump().await;
        Ok(())
    }

    pub async fn start(&self, process: &ContainerProcess) -> Result<()> {
        let mut inner = self.inner.write().await;
        match process.process_type {
            ProcessType::Container => {
                if let Err(err) = inner.start_container(&process.container_id).await {
                    let _ = inner.stop_process(process, true).await;
                    return Err(err);
                }

                let container_io = inner.new_container_io(process).await?;
                inner
                    .init_process
                    .start_io_and_wait(self.agent.clone(), container_io)
                    .await?;
            }
            ProcessType::Exec => {
                if let Err(e) = inner.start_exec_process(process).await {
                    let _ = inner.stop_process(process, true).await;
                    return Err(e).context("start exec process");
                }

                let container_io = inner.new_container_io(process).await.context("io stream")?;

                {
                    let exec = inner
                        .exec_processes
                        .get(&process.exec_id)
                        .ok_or_else(|| Error::ProcessNotFound(process.clone()))?;
                    if exec.process.height != 0 && exec.process.width != 0 {
                        inner
                            .win_resize_process(process, exec.process.height, exec.process.width)
                            .await
                            .context("win resize")?;
                    }
                }

                // start io copy and wait for the process to exit
                {
                    let exec = inner
                        .exec_processes
                        .get_mut(&process.exec_id)
                        .ok_or_else(|| Error::ProcessNotFound(process.clone()))?;

                    exec.process
                        .start_io_and_wait(self.agent.clone(), container_io)
                        .await
                        .context("start io and wait")?;
                }
            }
        }

        Ok(())
    }

    pub async fn delete_exec_process(&self, container_process: &ContainerProcess) -> Result<()> {
        let mut inner = self.inner.write().await;
        inner
            .delete_exec_process(&container_process.exec_id)
            .await
            .context("delete exec process")
    }

    pub async fn state_process(
        &self,
        container_process: &ContainerProcess,
    ) -> Result<ProcessStateInfo> {
        let inner = self.inner.read().await;
        match container_process.process_type {
            ProcessType::Container => inner.init_process.state().await,
            ProcessType::Exec => {
                let exec = inner
                    .exec_processes
                    .get(&container_process.exec_id)
                    .ok_or_else(|| Error::ProcessNotFound(container_process.clone()))?;
                exec.process.state().await
            }
        }
    }
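    // wait_process() does not block on the process itself: it returns a
    // ProcessWatcher (an exit notification receiver plus the shared exit
    // status) so the caller can await the exit without holding the
    // container lock.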
    pub async fn wait_process(
        &self,
        container_process: &ContainerProcess,
    ) -> Result<ProcessWatcher> {
        let logger = logger_with_process(container_process);
        info!(logger, "start wait process");

        let inner = self.inner.read().await;
        inner
            .fetch_exit_watcher(container_process)
            .context("fetch exit watcher")
    }

    pub async fn kill_process(
        &self,
        container_process: &ContainerProcess,
        signal: u32,
        all: bool,
    ) -> Result<()> {
        let inner = self.inner.read().await;
        inner.signal_process(container_process, signal, all).await
    }

    pub async fn exec_process(
        &self,
        container_process: &ContainerProcess,
        stdin: Option<String>,
        stdout: Option<String>,
        stderr: Option<String>,
        terminal: bool,
        oci_process: OCIProcess,
    ) -> Result<()> {
        let process = Process::new(
            container_process,
            self.pid,
            &self.config.bundle,
            stdin,
            stdout,
            stderr,
            terminal,
        );
        let exec = Exec {
            process,
            oci_process,
        };
        let mut inner = self.inner.write().await;
        inner.add_exec_process(&container_process.exec_id, exec);
        Ok(())
    }

    pub async fn close_io(&self, container_process: &ContainerProcess) -> Result<()> {
        let mut inner = self.inner.write().await;
        inner.close_io(container_process).await
    }

    pub async fn stop_process(&self, container_process: &ContainerProcess) -> Result<()> {
        let mut inner = self.inner.write().await;
        inner
            .stop_process(container_process, true)
            .await
            .context("stop process")
    }

    pub async fn pause(&self) -> Result<()> {
        let inner = self.inner.read().await;
        if inner.init_process.status == ProcessStatus::Paused {
            warn!(self.logger, "container is already paused, no need to pause");
            return Ok(());
        }
        self.agent
            .pause_container(self.container_id.clone().into())
            .await
            .context("agent pause container")?;
        Ok(())
    }

    pub async fn resume(&self) -> Result<()> {
        let inner = self.inner.read().await;
        if inner.init_process.status == ProcessStatus::Running {
            warn!(self.logger, "container is already running, no need to resume");
            return Ok(());
        }
        self.agent
            .resume_container(self.container_id.clone().into())
            .await
            .context("agent resume container")?;
        Ok(())
    }

    pub async fn resize_pty(
        &self,
        process: &ContainerProcess,
        width: u32,
        height: u32,
    ) -> Result<()> {
        let logger = logger_with_process(process);
        let inner = self.inner.read().await;
        if inner.init_process.status != ProcessStatus::Running {
            warn!(logger, "container is not running, no need to resize pty");
            return Ok(());
        }
        self.agent
            .tty_win_resize(agent::TtyWinResizeRequest {
                process_id: process.clone().into(),
                row: height,
                column: width,
            })
            .await
            .context("resize pty")?;
        Ok(())
    }

    pub async fn stats(&self) -> Result<Option<agent::StatsContainerResponse>> {
        let stats_resp = self
            .agent
            .stats_container(self.container_id.clone().into())
            .await
            .context("agent stats container")?;
        Ok(Some(stats_resp))
    }

    pub async fn update(&self, resources: &LinuxResources) -> Result<()> {
        self.resource_manager
            .update_cgroups(&self.config.container_id, Some(resources))
            .await?;

        let req = agent::UpdateContainerRequest {
            container_id: self.container_id.container_id.clone(),
            resources: resources.clone(),
            mounts: Vec::new(),
        };
        self.agent
            .update_container(req)
            .await
            .context("agent update container")?;
        Ok(())
    }
}
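// amend_spec() strips the parts of the OCI spec that do not apply inside a
// VM-based sandbox: host-side hooks, seccomp, resource limits that the host
// already enforces, and the pid/network namespaces, which are owned by the
// guest.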
fn amend_spec(spec: &mut oci::Spec) -> Result<()> {
    // hooks should be done on the host
    spec.hooks = None;

    if let Some(linux) = spec.linux.as_mut() {
        linux.seccomp = None;

        if let Some(resource) = linux.resources.as_mut() {
            resource.devices = Vec::new();
            resource.pids = None;
            resource.block_io = None;
            resource.hugepage_limits = Vec::new();
            resource.network = None;
        }

        let mut ns: Vec<oci::LinuxNamespace> = Vec::new();
        for n in linux.namespaces.iter() {
            match n.r#type.as_str() {
                oci::PIDNAMESPACE | oci::NETWORKNAMESPACE => continue,
                _ => ns.push(n.clone()),
            }
        }

        linux.namespaces = ns;
    }

    Ok(())
}
diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container_inner.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container_inner.rs
new file mode 100644
index 0000000000..2920b23ff8
--- /dev/null
+++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/container_inner.rs
@@ -0,0 +1,261 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//

use std::{collections::HashMap, sync::Arc};

use agent::Agent;
use anyhow::{anyhow, Context, Result};
use common::{
    error::Error,
    types::{ContainerID, ContainerProcess, ProcessExitStatus, ProcessStatus, ProcessType},
};
use nix::sys::signal::Signal;
use resource::{rootfs::Rootfs, volume::Volume};
use tokio::sync::RwLock;

use crate::container_manager::logger_with_process;

use super::{
    io::ContainerIo,
    process::{Process, ProcessWatcher},
    Exec,
};

pub struct ContainerInner {
    agent: Arc<dyn Agent>,
    logger: slog::Logger,
    pub(crate) init_process: Process,
    pub(crate) exec_processes: HashMap<String, Exec>,
    pub(crate) rootfs: Vec<Arc<dyn Rootfs>>,
    pub(crate) volumes: Vec<Arc<dyn Volume>>,
}

impl ContainerInner {
    pub(crate) fn new(agent: Arc<dyn Agent>, init_process: Process, logger: slog::Logger) -> Self {
        Self {
            agent,
            logger,
            init_process,
            exec_processes: HashMap::new(),
            rootfs: vec![],
            volumes: vec![],
        }
    }

    fn container_id(&self) -> &str {
        self.init_process.process.container_id()
    }

    pub(crate) fn check_state(&self, states: Vec<ProcessStatus>) -> Result<()> {
        let state = self.init_process.status;
        if states.contains(&state) {
            return Ok(());
        }

        Err(anyhow!(
            "failed to check state: {:?} is not one of the expected states {:?}",
            state,
            states
        ))
    }

    pub(crate) fn set_state(&mut self, state: ProcessStatus) {
        self.init_process.status = state;
    }

    pub(crate) async fn start_exec_process(&mut self, process: &ContainerProcess) -> Result<()> {
        let exec = self
            .exec_processes
            .get_mut(&process.exec_id)
            .ok_or_else(|| Error::ProcessNotFound(process.clone()))?;

        self.agent
            .exec_process(agent::ExecProcessRequest {
                process_id: process.clone().into(),
                string_user: None,
                process: Some(exec.oci_process.clone()),
            })
            .await
            .map(|_| {
                exec.process.status = ProcessStatus::Running;
            })
    }

    pub(crate) async fn win_resize_process(
        &self,
        process: &ContainerProcess,
        height: u32,
        width: u32,
    ) -> Result<()> {
        self.check_state(vec![ProcessStatus::Created, ProcessStatus::Running])
            .context("check state")?;

        self.agent
            .tty_win_resize(agent::TtyWinResizeRequest {
                process_id: process.clone().into(),
                row: height,
                column: width,
            })
            .await?;
        Ok(())
    }

    pub fn fetch_exit_watcher(&self, process: &ContainerProcess) -> Result<ProcessWatcher> {
        match process.process_type {
            ProcessType::Container => self.init_process.fetch_exit_watcher(),
            ProcessType::Exec => {
                let exec = self
                    .exec_processes
                    .get(&process.exec_id)
                    .ok_or_else(|| Error::ProcessNotFound(process.clone()))?;
                exec.process.fetch_exit_watcher()
            }
        }
    }
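    // start_container() is only valid from the Created or Stopped state; on
    // success the init process is marked Running.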
    pub(crate) async fn start_container(&mut self, cid: &ContainerID) -> Result<()> {
        self.check_state(vec![ProcessStatus::Created, ProcessStatus::Stopped])
            .context("check state")?;

        self.agent
            .start_container(agent::ContainerID {
                container_id: cid.container_id.clone(),
            })
            .await
            .context("start container")?;

        self.set_state(ProcessStatus::Running);

        Ok(())
    }

    async fn get_exit_status(&self) -> Arc<RwLock<ProcessExitStatus>> {
        self.init_process.exit_status.clone()
    }

    pub(crate) fn add_exec_process(&mut self, id: &str, exec: Exec) -> Option<Exec> {
        self.exec_processes.insert(id.to_string(), exec)
    }

    pub(crate) async fn delete_exec_process(&mut self, eid: &str) -> Result<()> {
        match self.exec_processes.remove(eid) {
            Some(_) => {
                debug!(self.logger, "delete exec process eid {}", eid);
                Ok(())
            }
            None => Err(anyhow!(
                "failed to find cid {} eid {}",
                self.container_id(),
                eid
            )),
        }
    }

    async fn cleanup_container(&mut self, cid: &str, force: bool) -> Result<()> {
        // wait until the container process has terminated and the status
        // write lock has been released.
        info!(self.logger, "wait on container terminated");
        let exit_status = self.get_exit_status().await;
        let _locked_exit_status = exit_status.read().await;
        info!(self.logger, "container terminated");
        let timeout: u32 = 10;
        self.agent
            .remove_container(agent::RemoveContainerRequest::new(cid, timeout))
            .await
            .or_else(|e| {
                if force {
                    warn!(
                        self.logger,
                        "stop container: agent remove container failed: {}", e
                    );
                    Ok(agent::Empty::new())
                } else {
                    Err(e)
                }
            })?;

        // close the exit channel to wake up the wait service and notify
        // watchers who are waiting for the process to exit
        self.init_process.stop();
        Ok(())
    }
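    // stop_process() tears down either the init process or a single exec
    // process; in force mode the agent-side removal error is logged and
    // swallowed so that cleanup always completes.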
{:?}", e); + }) + .ok(); + + match process.process_type { + ProcessType::Container => self + .cleanup_container(&process.container_id.container_id, force) + .await + .context("stop container")?, + ProcessType::Exec => { + let exec = self + .exec_processes + .get_mut(&process.exec_id) + .ok_or_else(|| anyhow!("failed to find exec"))?; + exec.process.stop(); + } + } + + Ok(()) + } + + pub(crate) async fn signal_process( + &self, + process: &ContainerProcess, + signal: u32, + all: bool, + ) -> Result<()> { + let mut process_id: agent::ContainerProcessID = process.clone().into(); + if all { + // force signal init process + process_id.exec_id.clear(); + }; + + self.agent + .signal_process(agent::SignalProcessRequest { process_id, signal }) + .await?; + Ok(()) + } + + pub async fn new_container_io(&self, process: &ContainerProcess) -> Result { + Ok(ContainerIo::new(self.agent.clone(), process.clone())) + } + + pub async fn close_io(&mut self, process: &ContainerProcess) -> Result<()> { + match process.process_type { + ProcessType::Container => self.init_process.close_io().await, + ProcessType::Exec => { + let exec = self + .exec_processes + .get_mut(&process.exec_id) + .ok_or_else(|| Error::ProcessNotFound(process.clone()))?; + exec.process.close_io().await; + } + }; + + Ok(()) + } +} diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/container_io.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/container_io.rs new file mode 100644 index 0000000000..c211e8bca4 --- /dev/null +++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/container_io.rs @@ -0,0 +1,171 @@ +// Copyright (c) 2019-2022 Alibaba Cloud +// Copyright (c) 2019-2022 Ant Group +// +// SPDX-License-Identifier: Apache-2.0 +// + +use std::{ + future::Future, + io, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; + +use agent::Agent; +use anyhow::Result; +use common::types::ContainerProcess; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; + +struct ContainerIoInfo { + pub agent: Arc, + pub process: ContainerProcess, +} + +pub struct ContainerIo { + pub stdin: Box, + pub stdout: Box, + pub stderr: Box, +} + +impl ContainerIo { + pub fn new(agent: Arc, process: ContainerProcess) -> Self { + let info = Arc::new(ContainerIoInfo { agent, process }); + + Self { + stdin: Box::new(ContainerIoWrite::new(info.clone())), + stdout: Box::new(ContainerIoRead::new(info.clone(), true)), + stderr: Box::new(ContainerIoRead::new(info, false)), + } + } +} + +struct ContainerIoWrite<'inner> { + pub info: Arc, + write_future: + Option> + Send + 'inner>>>, +} + +impl<'inner> ContainerIoWrite<'inner> { + pub fn new(info: Arc) -> Self { + Self { + info, + write_future: Default::default(), + } + } + + fn poll_write_inner( + &'inner mut self, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let mut write_future = self.write_future.take(); + if write_future.is_none() { + let req = agent::WriteStreamRequest { + process_id: self.info.process.clone().into(), + data: buf.to_vec(), + }; + write_future = Some(Box::pin(self.info.agent.write_stdin(req))); + } + + let mut write_future = write_future.unwrap(); + match write_future.as_mut().poll(cx) { + Poll::Ready(v) => match v { + Ok(resp) => Poll::Ready(Ok(resp.length as usize)), + Err(err) => Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::Other, err))), + }, + Poll::Pending => { + self.write_future = Some(write_future); + Poll::Pending + } + } + } +} + +impl<'inner> AsyncWrite for ContainerIoWrite<'inner> { + fn 
impl<'inner> AsyncWrite for ContainerIoWrite<'inner> {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        let me = unsafe {
            std::mem::transmute::<&mut ContainerIoWrite<'_>, &mut ContainerIoWrite<'inner>>(
                &mut *self,
            )
        };
        me.poll_write_inner(cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }

    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }
}

type ResultBuffer = Result<agent::ReadStreamResponse>;

struct ContainerIoRead<'inner> {
    pub info: Arc<ContainerIoInfo>,
    is_stdout: bool,
    read_future: Option<Pin<Box<dyn Future<Output = ResultBuffer> + Send + 'inner>>>,
}

impl<'inner> ContainerIoRead<'inner> {
    pub fn new(info: Arc<ContainerIoInfo>, is_stdout: bool) -> Self {
        Self {
            info,
            is_stdout,
            read_future: Default::default(),
        }
    }

    fn poll_read_inner(
        &'inner mut self,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        let mut read_future = self.read_future.take();
        if read_future.is_none() {
            let req = agent::ReadStreamRequest {
                process_id: self.info.process.clone().into(),
                len: buf.remaining() as u32,
            };
            read_future = if self.is_stdout {
                Some(Box::pin(self.info.agent.read_stdout(req)))
            } else {
                Some(Box::pin(self.info.agent.read_stderr(req)))
            };
        }

        let mut read_future = read_future.unwrap();
        match read_future.as_mut().poll(cx) {
            Poll::Ready(v) => match v {
                Ok(resp) => {
                    buf.put_slice(&resp.data);
                    Poll::Ready(Ok(()))
                }
                Err(err) => Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::Other, err))),
            },
            Poll::Pending => {
                self.read_future = Some(read_future);
                Poll::Pending
            }
        }
    }
}

impl<'inner> AsyncRead for ContainerIoRead<'inner> {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        let me = unsafe {
            std::mem::transmute::<&mut ContainerIoRead<'_>, &mut ContainerIoRead<'inner>>(
                &mut *self,
            )
        };
        me.poll_read_inner(cx, buf)
    }
}
diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/mod.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/mod.rs
new file mode 100644
index 0000000000..3c6ca719bc
--- /dev/null
+++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/mod.rs
@@ -0,0 +1,10 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//

mod container_io;
pub use container_io::ContainerIo;
mod shim_io;
pub use shim_io::ShimIo;
diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/shim_io.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/shim_io.rs
new file mode 100644
index 0000000000..03a4c387b4
--- /dev/null
+++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/io/shim_io.rs
@@ -0,0 +1,135 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//

use std::{
    io,
    os::unix::{io::FromRawFd, net::UnixStream as StdUnixStream},
    pin::Pin,
    task::Context as TaskContext,
    task::Poll,
};

use anyhow::{anyhow, Context, Result};
use nix::{
    fcntl::{self, OFlag},
    sys::stat::Mode,
};
use tokio::{
    fs::File,
    io::{AsyncRead, AsyncWrite},
    net::UnixStream as AsyncUnixStream,
};
use url::Url;

fn open_fifo(path: &str) -> Result<AsyncUnixStream> {
    let fd = fcntl::open(path, OFlag::O_RDWR, Mode::from_bits(0).unwrap())?;

    let std_stream = unsafe { StdUnixStream::from_raw_fd(fd) };
    std_stream
        .set_nonblocking(true)
        .context("set nonblocking")?;

    AsyncUnixStream::from_std(std_stream).map_err(|e| anyhow!(e))
}
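// ShimIo holds the host-side ends of the container's standard streams:
// stdin is read from a plain file, while stdout/stderr are parsed as URLs
// and currently only the fifo:// scheme is wired up; a bare path is treated
// as a fifo.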
.context("set nonblocking")?; + + AsyncUnixStream::from_std(std_stream).map_err(|e| anyhow!(e)) +} + +pub struct ShimIo { + pub stdin: Option>, + pub stdout: Option>, + pub stderr: Option>, +} + +impl ShimIo { + pub async fn new( + stdin: &Option, + stdout: &Option, + stderr: &Option, + ) -> Result { + let stdin_fd: Option> = if let Some(stdin) = stdin { + match File::open(&stdin).await { + Ok(file) => Some(Box::new(file)), + Err(err) => { + error!(sl!(), "failed to open {} error {:?}", &stdin, err); + None + } + } + } else { + None + }; + + let get_url = |url: &Option| -> Option { + match url { + None => None, + Some(out) => match Url::parse(out.as_str()) { + Err(url::ParseError::RelativeUrlWithoutBase) => { + let out = "fifo://".to_owned() + out.as_str(); + let u = Url::parse(out.as_str()).unwrap(); + Some(u) + } + Err(err) => { + warn!(sl!(), "unable to parse stdout uri: {}", err); + None + } + Ok(u) => Some(u), + }, + } + }; + + let stdout_url = get_url(stdout); + let get_fd = |url: &Option| -> Option> { + if let Some(url) = url { + if url.scheme() == "fifo" { + let path = url.path(); + match open_fifo(path) { + Ok(s) => { + return Some(Box::new(ShimIoWrite::Stream(s))); + } + Err(err) => { + error!(sl!(), "failed to open file {} error {:?}", url.path(), err); + } + } + } + } + None + }; + + let stderr_url = get_url(stderr); + Ok(Self { + stdin: stdin_fd, + stdout: get_fd(&stdout_url), + stderr: get_fd(&stderr_url), + }) + } +} + +#[derive(Debug)] +enum ShimIoWrite { + Stream(AsyncUnixStream), + // TODO: support other type +} + +impl AsyncWrite for ShimIoWrite { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut TaskContext<'_>, + buf: &[u8], + ) -> Poll> { + match *self { + ShimIoWrite::Stream(ref mut s) => Pin::new(s).poll_write(cx, buf), + } + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll> { + match *self { + ShimIoWrite::Stream(ref mut s) => Pin::new(s).poll_flush(cx), + } + } + + fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll> { + match *self { + ShimIoWrite::Stream(ref mut s) => Pin::new(s).poll_shutdown(cx), + } + } +} diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/manager.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/manager.rs new file mode 100644 index 0000000000..155cf0a9c4 --- /dev/null +++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/manager.rs @@ -0,0 +1,274 @@ +// Copyright (c) 2019-2022 Alibaba Cloud +// Copyright (c) 2019-2022 Ant Group +// +// SPDX-License-Identifier: Apache-2.0 +// + +use anyhow::{anyhow, Context, Result}; + +use std::{collections::HashMap, sync::Arc}; + +use agent::Agent; +use async_trait::async_trait; +use common::{ + error::Error, + types::{ + ContainerConfig, ContainerID, ContainerProcess, ExecProcessRequest, KillRequest, + ProcessExitStatus, ProcessStateInfo, ProcessType, ResizePTYRequest, ShutdownRequest, + StatsInfo, UpdateRequest, PID, + }, + ContainerManager, +}; +use oci::Process as OCIProcess; +use resource::ResourceManager; +use tokio::sync::RwLock; + +use super::{logger_with_process, Container}; + +unsafe impl Send for VirtContainerManager {} +unsafe impl Sync for VirtContainerManager {} +pub struct VirtContainerManager { + sid: String, + pid: u32, + containers: Arc>>, + resource_manager: Arc, + agent: Arc, +} + +impl VirtContainerManager { + pub fn new( + sid: &str, + pid: u32, + agent: Arc, + resource_manager: Arc, + ) -> Self { + Self { + sid: sid.to_string(), + pid, + containers: 
impl VirtContainerManager {
    pub fn new(
        sid: &str,
        pid: u32,
        agent: Arc<dyn Agent>,
        resource_manager: Arc<ResourceManager>,
    ) -> Self {
        Self {
            sid: sid.to_string(),
            pid,
            containers: Default::default(),
            resource_manager,
            agent,
        }
    }
}

#[async_trait]
impl ContainerManager for VirtContainerManager {
    async fn create_container(&self, config: ContainerConfig, spec: oci::Spec) -> Result<PID> {
        let container = Container::new(
            self.pid,
            config,
            self.agent.clone(),
            self.resource_manager.clone(),
        )
        .context("new container")?;

        let mut containers = self.containers.write().await;
        container.create(spec).await.context("create")?;
        containers.insert(container.container_id.to_string(), container);

        Ok(PID { pid: self.pid })
    }

    async fn close_process_io(&self, process: &ContainerProcess) -> Result<()> {
        let containers = self.containers.read().await;
        let container_id = &process.container_id.to_string();
        let c = containers
            .get(container_id)
            .ok_or_else(|| Error::ContainerNotFound(container_id.clone()))?;

        c.close_io(process).await.context("close io")?;
        Ok(())
    }

    async fn delete_process(&self, process: &ContainerProcess) -> Result<ProcessStateInfo> {
        let container_id = &process.container_id.container_id;
        match process.process_type {
            ProcessType::Container => {
                let mut containers = self.containers.write().await;
                let c = containers
                    .remove(container_id)
                    .ok_or_else(|| Error::ContainerNotFound(container_id.to_string()))?;
                c.state_process(process).await.context("state process")
            }
            ProcessType::Exec => {
                let containers = self.containers.read().await;
                let c = containers
                    .get(container_id)
                    .ok_or_else(|| Error::ContainerNotFound(container_id.to_string()))?;
                c.delete_exec_process(process)
                    .await
                    .context("delete process")?;
                c.state_process(process).await.context("state process")
            }
        }
    }

    async fn exec_process(&self, req: ExecProcessRequest) -> Result<()> {
        if req.spec_type_url.is_empty() {
            return Err(anyhow!("invalid type url"));
        }
        let oci_process: OCIProcess =
            serde_json::from_slice(&req.spec_value).context("serde from slice")?;

        let containers = self.containers.read().await;
        let container_id = &req.process.container_id.container_id;
        let c = containers
            .get(container_id)
            .ok_or_else(|| Error::ContainerNotFound(container_id.clone()))?;
        c.exec_process(
            &req.process,
            req.stdin,
            req.stdout,
            req.stderr,
            req.terminal,
            oci_process,
        )
        .await
        .context("exec")?;
        Ok(())
    }

    async fn kill_process(&self, req: &KillRequest) -> Result<()> {
        let containers = self.containers.read().await;
        let container_id = &req.process.container_id.container_id;
        let c = containers
            .get(container_id)
            .ok_or_else(|| Error::ContainerNotFound(container_id.clone()))?;
        c.kill_process(&req.process, req.signal, req.all)
            .await
            .map_err(|err| {
                warn!(
                    sl!(),
                    "failed to signal process {:?} {:?}", &req.process, err
                );
                err
            })
            .ok();
        Ok(())
    }
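    // wait_process() drops the containers lock before awaiting the exit
    // watcher, so a long-running wait does not block other manager calls;
    // the lock is re-taken afterwards to stop the process.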
status {:?}", status); + + // stop process + let containers = self.containers.read().await; + let container_id = &process.container_id.container_id; + let c = containers + .get(container_id) + .ok_or_else(|| Error::ContainerNotFound(container_id.clone()))?; + c.stop_process(process).await.context("stop container")?; + Ok(status.clone()) + } + + async fn start_process(&self, process: &ContainerProcess) -> Result { + let containers = self.containers.read().await; + let container_id = &process.container_id.container_id; + let c = containers + .get(container_id) + .ok_or_else(|| Error::ContainerNotFound(container_id.clone()))?; + c.start(process).await.context("start")?; + Ok(PID { pid: self.pid }) + } + + async fn state_process(&self, process: &ContainerProcess) -> Result { + let containers = self.containers.read().await; + let container_id = &process.container_id.container_id; + let c = containers + .get(container_id) + .ok_or_else(|| Error::ContainerNotFound(container_id.clone()))?; + let state = c.state_process(process).await.context("state process")?; + Ok(state) + } + + async fn pause_container(&self, id: &ContainerID) -> Result<()> { + let containers = self.containers.read().await; + let c = containers + .get(&id.container_id) + .ok_or_else(|| Error::ContainerNotFound(id.container_id.clone()))?; + c.pause().await.context("pause")?; + Ok(()) + } + + async fn resume_container(&self, id: &ContainerID) -> Result<()> { + let containers = self.containers.read().await; + let c = containers + .get(&id.container_id) + .ok_or_else(|| Error::ContainerNotFound(id.container_id.clone()))?; + c.resume().await.context("resume")?; + Ok(()) + } + + async fn resize_process_pty(&self, req: &ResizePTYRequest) -> Result<()> { + let containers = self.containers.read().await; + let c = containers + .get(&req.process.container_id.container_id) + .ok_or_else(|| { + Error::ContainerNotFound(req.process.container_id.container_id.clone()) + })?; + c.resize_pty(&req.process, req.width, req.height) + .await + .context("resize pty")?; + Ok(()) + } + + async fn stats_container(&self, id: &ContainerID) -> Result { + let containers = self.containers.read().await; + let c = containers + .get(&id.container_id) + .ok_or_else(|| Error::ContainerNotFound(id.container_id.clone()))?; + let stats = c.stats().await.context("stats")?; + Ok(StatsInfo::from(stats)) + } + + async fn update_container(&self, req: UpdateRequest) -> Result<()> { + let resource = serde_json::from_slice::(&req.value) + .context("deserialize LinuxResource")?; + let containers = self.containers.read().await; + let container_id = &req.container_id; + let c = containers + .get(container_id) + .ok_or_else(|| Error::ContainerNotFound(container_id.to_string()))?; + c.update(&resource).await.context("stats") + } + + async fn pid(&self) -> Result { + Ok(PID { pid: self.pid }) + } + + async fn connect_container(&self, _id: &ContainerID) -> Result { + Ok(PID { pid: self.pid }) + } + + async fn need_shutdown_sandbox(&self, req: &ShutdownRequest) -> bool { + req.is_now || self.containers.read().await.is_empty() || self.sid == req.container_id + } + + async fn is_sandbox_container(&self, process: &ContainerProcess) -> bool { + process.process_type == ProcessType::Container + && process.container_id.container_id == self.sid + } +} diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/mod.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/mod.rs new file mode 100644 index 0000000000..3c615517fd --- /dev/null +++ 
+++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/mod.rs
@@ -0,0 +1,20 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//

mod container;
use container::{Container, Exec};
mod container_inner;
mod io;
use container_inner::ContainerInner;
mod manager;
pub use manager::VirtContainerManager;
mod process;

use common::types::ContainerProcess;

fn logger_with_process(container_process: &ContainerProcess) -> slog::Logger {
    sl!().new(o!("container_id" => container_process.container_id.container_id.clone(), "exec_id" => container_process.exec_id.clone()))
}
diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/process.rs b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/process.rs
new file mode 100644
index 0000000000..28b9b023c7
--- /dev/null
+++ b/src/runtime-rs/crates/runtimes/virt_container/src/container_manager/process.rs
@@ -0,0 +1,211 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//

use std::sync::Arc;

use agent::Agent;
use anyhow::{Context, Result};
use awaitgroup::{WaitGroup, Worker as WaitGroupWorker};
use common::types::{ContainerProcess, ProcessExitStatus, ProcessStateInfo, ProcessStatus, PID};
use tokio::{
    io::{AsyncRead, AsyncWrite},
    sync::{watch, RwLock},
};

use super::{
    io::{ContainerIo, ShimIo},
    logger_with_process,
};

pub type ProcessWatcher = (
    Option<watch::Receiver<bool>>,
    Arc<RwLock<ProcessExitStatus>>,
);

#[derive(Debug)]
pub struct Process {
    pub process: ContainerProcess,
    pub pid: u32,
    logger: slog::Logger,
    pub bundle: String,

    pub stdin: Option<String>,
    pub stdout: Option<String>,
    pub stderr: Option<String>,
    pub terminal: bool,

    pub height: u32,
    pub width: u32,
    pub status: ProcessStatus,

    pub exit_status: Arc<RwLock<ProcessExitStatus>>,
    pub exit_watcher_rx: Option<watch::Receiver<bool>>,
    pub exit_watcher_tx: Option<watch::Sender<bool>>,
    // used to synchronize the stdin io copy task (tokio) with the close io
    // call: close io should wait until the stdin copy has finished to
    // prevent stdin data from being lost.
    pub wg_stdin: WaitGroup,
}
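// A Process owns both ends of the io plumbing: ShimIo (the host-side fifos
// handed over by containerd) and ContainerIo (the agent-backed guest
// streams). start_io_and_wait() wires the two together with tokio copy
// tasks and then waits for the agent to report the exit code.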
impl Process {
    pub fn new(
        process: &ContainerProcess,
        pid: u32,
        bundle: &str,
        stdin: Option<String>,
        stdout: Option<String>,
        stderr: Option<String>,
        terminal: bool,
    ) -> Process {
        let (sender, receiver) = watch::channel(false);

        Process {
            process: process.clone(),
            pid,
            logger: logger_with_process(process),
            bundle: bundle.to_string(),
            stdin,
            stdout,
            stderr,
            terminal,
            height: 0,
            width: 0,
            status: ProcessStatus::Created,
            exit_status: Arc::new(RwLock::new(ProcessExitStatus::new())),
            exit_watcher_rx: Some(receiver),
            exit_watcher_tx: Some(sender),
            wg_stdin: WaitGroup::new(),
        }
    }

    pub async fn start_io_and_wait(
        &mut self,
        agent: Arc<dyn Agent>,
        container_io: ContainerIo,
    ) -> Result<()> {
        info!(self.logger, "start io and wait");

        // new shim io
        let shim_io = ShimIo::new(&self.stdin, &self.stdout, &self.stderr)
            .await
            .context("new shim io")?;

        // start io copy for stdin
        let wgw_stdin = self.wg_stdin.worker();
        if let Some(stdin) = shim_io.stdin {
            self.run_io_copy("stdin", wgw_stdin, stdin, container_io.stdin)
                .await?;
        }

        // prepare a wait group for stdout and stderr
        let wg = WaitGroup::new();
        let wgw = wg.worker();

        // start io copy for stdout
        if let Some(stdout) = shim_io.stdout {
            self.run_io_copy("stdout", wgw.clone(), container_io.stdout, stdout)
                .await?;
        }

        // start io copy for stderr
        if !self.terminal {
            if let Some(stderr) = shim_io.stderr {
                self.run_io_copy("stderr", wgw, container_io.stderr, stderr)
                    .await?;
            }
        }

        self.run_io_wait(agent, wg).await.context("run io thread")?;
        Ok(())
    }

    async fn run_io_copy<'a>(
        &'a self,
        io_name: &'a str,
        wgw: WaitGroupWorker,
        mut reader: Box<dyn AsyncRead + Send + Unpin>,
        mut writer: Box<dyn AsyncWrite + Send + Unpin>,
    ) -> Result<()> {
        let io_name = io_name.to_string();
        let logger = self.logger.new(o!("io name" => io_name));
        let _ = tokio::spawn(async move {
            match tokio::io::copy(&mut reader, &mut writer).await {
                Err(e) => warn!(logger, "io: failed to copy stream: {}", e),
                Ok(length) => info!(logger, "io: stop copying stream, length {}", length),
            };

            wgw.done();
        });

        Ok(())
    }

    async fn run_io_wait(&mut self, agent: Arc<dyn Agent>, mut wg: WaitGroup) -> Result<()> {
        let logger = self.logger.clone();
        info!(logger, "start run io wait");
        let process = self.process.clone();
        let status = self.exit_status.clone();
        let exit_notifier = self.exit_watcher_tx.take();

        let _ = tokio::spawn(async move {
            // wait until all of the container's io streams have terminated
            info!(logger, "begin wait group for io");
            wg.wait().await;
            info!(logger, "end wait group for io");

            let req = agent::WaitProcessRequest {
                process_id: process.clone().into(),
            };

            info!(logger, "begin wait process");
            let resp = match agent.wait_process(req).await {
                Ok(ret) => ret,
                Err(e) => {
                    error!(logger, "failed to wait process {:?}", e);
                    return;
                }
            };

            info!(logger, "end wait process exit code {}", resp.status);

            let mut locked_status = status.write().await;
            locked_status.update_exit_code(resp.status);

            // dropping the sender closes the watch channel and wakes up
            // all exit watchers
            drop(exit_notifier);
            info!(logger, "end io wait thread");
        });
        Ok(())
    }

    pub fn fetch_exit_watcher(&self) -> Result<ProcessWatcher> {
        Ok((self.exit_watcher_rx.clone(), self.exit_status.clone()))
    }
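    // state() snapshots the process for the shim's State API, combining the
    // static configuration with the current exit status.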
    pub async fn state(&self) -> Result<ProcessStateInfo> {
        let exit_status = self.exit_status.read().await;
        Ok(ProcessStateInfo {
            container_id: self.process.container_id.container_id.clone(),
            exec_id: self.process.exec_id.clone(),
            pid: PID { pid: self.pid },
            bundle: self.bundle.clone(),
            stdin: self.stdin.clone(),
            stdout: self.stdout.clone(),
            stderr: self.stderr.clone(),
            terminal: self.terminal,
            status: self.status,
            exit_status: exit_status.exit_code,
            exited_at: exit_status.exit_time,
        })
    }

    pub fn stop(&mut self) {
        self.status = ProcessStatus::Stopped;
    }

    pub async fn close_io(&mut self) {
        // wait until the stdin io copy task has finished to prevent data loss
        self.wg_stdin.wait().await;
    }
}
diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/health_check.rs b/src/runtime-rs/crates/runtimes/virt_container/src/health_check.rs
new file mode 100644
index 0000000000..3a7703ac3b
--- /dev/null
+++ b/src/runtime-rs/crates/runtimes/virt_container/src/health_check.rs
@@ -0,0 +1,123 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//

use std::sync::Arc;

use agent::Agent;
use anyhow::Context;
use tokio::sync::{mpsc, Mutex};

/// monitor check interval: 30s
const HEALTH_CHECK_TIMER_INTERVAL: u64 = 30;

/// version check threshold: 5min
const VERSION_CHECK_THRESHOLD: u64 = 5 * 60 / HEALTH_CHECK_TIMER_INTERVAL;

/// health check stop channel buffer size
const HEALTH_CHECK_STOP_CHANNEL_BUFFER_SIZE: usize = 1;

pub struct HealthCheck {
    pub keep_alive: bool,
    keep_vm: bool,
    stop_tx: mpsc::Sender<()>,
    stop_rx: Arc<Mutex<mpsc::Receiver<()>>>,
}

impl HealthCheck {
    pub fn new(keep_alive: bool, keep_vm: bool) -> HealthCheck {
        let (tx, rx) = mpsc::channel(HEALTH_CHECK_STOP_CHANNEL_BUFFER_SIZE);
        HealthCheck {
            keep_alive,
            keep_vm,
            stop_tx: tx,
            stop_rx: Arc::new(Mutex::new(rx)),
        }
    }

    pub fn start(&self, id: &str, agent: Arc<dyn Agent>) {
        if !self.keep_alive {
            return;
        }
        let id = id.to_string();

        info!(sl!(), "start runtime keep alive");

        let stop_rx = self.stop_rx.clone();
        let keep_vm = self.keep_vm;
        let _ = tokio::spawn(async move {
            let mut version_check_threshold_count = 0;

            loop {
                tokio::time::sleep(std::time::Duration::from_secs(HEALTH_CHECK_TIMER_INTERVAL))
                    .await;
                let mut stop_rx = stop_rx.lock().await;
                match stop_rx.try_recv() {
                    Ok(_) => {
                        info!(sl!(), "receive stop monitor signal for {}", id);
                        break;
                    }

                    Err(mpsc::error::TryRecvError::Empty) => {
                        // check the agent
                        match agent
                            .check(agent::CheckRequest::new(""))
                            .await
                            .context("check health")
                        {
                            Ok(_) => {
                                debug!(sl!(), "check {} agent health successfully", id);
                                version_check_threshold_count += 1;
                                if version_check_threshold_count >= VERSION_CHECK_THRESHOLD {
                                    // time to check the agent version
                                    version_check_threshold_count = 0;
                                    if let Ok(v) = agent
                                        .version(agent::CheckRequest::new(""))
                                        .await
                                        .context("check version")
                                    {
                                        info!(sl!(), "agent {}", v.agent_version)
                                    }
                                }
                                continue;
                            }
                            Err(e) => {
                                error!(sl!(), "failed to do {} agent health check: {}", id, e);
                                if let Err(mpsc::error::TryRecvError::Empty) = stop_rx.try_recv() {
                                    error!(sl!(), "failed to receive stop monitor signal");
                                    if !keep_vm {
                                        ::std::process::exit(1);
                                    }
                                } else {
                                    info!(sl!(), "wait to exit {}", id);
                                    break;
                                }
                            }
                        }
                    }

                    Err(mpsc::error::TryRecvError::Disconnected) => {
                        warn!(sl!(), "{} monitor channel is broken", id);
                        break;
                    }
                }
            }
        });
    }
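    // stop() is a no-op unless keep_alive is set; the send below wakes the
    // monitor loop, which exits on its next timer tick.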
{:?}", e); + }) + .ok(); + } +} diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/lib.rs b/src/runtime-rs/crates/runtimes/virt_container/src/lib.rs index 8419a93828..1710a83701 100644 --- a/src/runtime-rs/crates/runtimes/virt_container/src/lib.rs +++ b/src/runtime-rs/crates/runtimes/virt_container/src/lib.rs @@ -3,12 +3,25 @@ // // SPDX-License-Identifier: Apache-2.0 // + +#[macro_use] +extern crate slog; + +logging::logger_with_subsystem!(sl, "virt-container"); + +mod container_manager; +pub mod health_check; +pub mod sandbox; + use std::sync::Arc; -use anyhow::Result; +use agent::kata::KataAgent; +use anyhow::{Context, Result}; use async_trait::async_trait; use common::{message::Message, RuntimeHandler, RuntimeInstance}; -use kata_types::config::{hypervisor::register_hypervisor_plugin, DragonballConfig}; +use hypervisor::Hypervisor; +use kata_types::config::{hypervisor::register_hypervisor_plugin, DragonballConfig, TomlConfig}; +use resource::ResourceManager; use tokio::sync::mpsc::Sender; unsafe impl Send for VirtContainer {} @@ -34,10 +47,55 @@ impl RuntimeHandler for VirtContainer { async fn new_instance( &self, - _sid: &str, - _msg_sender: Sender, + sid: &str, + msg_sender: Sender, ) -> Result { - todo!() + let (toml_config, _) = TomlConfig::load_from_file("").context("load config")?; + + // TODO: new sandbox and container manager + // TODO: get from hypervisor + let hypervisor = new_hypervisor(&toml_config) + .await + .context("new hypervisor")?; + + // get uds from hypervisor and get config from toml_config + let agent = Arc::new(KataAgent::new(kata_types::config::Agent { + debug: true, + enable_tracing: false, + server_port: 1024, + log_port: 1025, + dial_timeout_ms: 10, + reconnect_timeout_ms: 3_000, + request_timeout_ms: 30_000, + health_check_request_timeout_ms: 90_000, + kernel_modules: Default::default(), + container_pipe_size: 0, + debug_console_enabled: false, + })); + + let resource_manager = Arc::new(ResourceManager::new( + sid, + agent.clone(), + hypervisor.clone(), + &toml_config, + )?); + let pid = std::process::id(); + + let sandbox = sandbox::VirtSandbox::new( + sid, + msg_sender, + agent.clone(), + hypervisor, + resource_manager.clone(), + ) + .await + .context("new virt sandbox")?; + let container_manager = + container_manager::VirtContainerManager::new(sid, pid, agent, resource_manager); + Ok(RuntimeInstance { + sandbox: Arc::new(sandbox), + container_manager: Arc::new(container_manager), + }) } fn cleanup(&self, _id: &str) -> Result<()> { @@ -45,3 +103,8 @@ impl RuntimeHandler for VirtContainer { Ok(()) } } + +async fn new_hypervisor(_toml_config: &TomlConfig) -> Result> { + // TODO: implement ready hypervisor + todo!() +} diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/sandbox.rs b/src/runtime-rs/crates/runtimes/virt_container/src/sandbox.rs new file mode 100644 index 0000000000..b98492af47 --- /dev/null +++ b/src/runtime-rs/crates/runtimes/virt_container/src/sandbox.rs @@ -0,0 +1,229 @@ +// Copyright (c) 2019-2022 Alibaba Cloud +// Copyright (c) 2019-2022 Ant Group +// +// SPDX-License-Identifier: Apache-2.0 +// + +use std::sync::Arc; + +use agent::{self, Agent}; +use anyhow::{Context, Result}; +use async_trait::async_trait; +use common::{ + message::{Action, Message}, + Sandbox, +}; +use containerd_shim_protos::events::task::TaskOOM; +use hypervisor::Hypervisor; +use kata_types::config::TomlConfig; +use resource::{ResourceConfig, ResourceManager}; +use tokio::sync::{mpsc::Sender, Mutex, RwLock}; + +use 
#[derive(Clone, Copy, PartialEq, Debug)]
pub enum SandboxState {
    Init,
    Running,
    Stopped,
}

struct SandboxInner {
    state: SandboxState,
}

impl SandboxInner {
    pub fn new() -> Self {
        Self {
            state: SandboxState::Init,
        }
    }
}

unsafe impl Send for VirtSandbox {}
unsafe impl Sync for VirtSandbox {}

#[derive(Clone)]
pub struct VirtSandbox {
    sid: String,
    msg_sender: Arc<Mutex<Sender<Message>>>,
    inner: Arc<RwLock<SandboxInner>>,
    resource_manager: Arc<ResourceManager>,
    agent: Arc<dyn Agent>,
    hypervisor: Arc<dyn Hypervisor>,
    monitor: Arc<HealthCheck>,
}

impl VirtSandbox {
    pub async fn new(
        sid: &str,
        msg_sender: Sender<Message>,
        agent: Arc<dyn Agent>,
        hypervisor: Arc<dyn Hypervisor>,
        resource_manager: Arc<ResourceManager>,
    ) -> Result<Self> {
        Ok(Self {
            sid: sid.to_string(),
            msg_sender: Arc::new(Mutex::new(msg_sender)),
            inner: Arc::new(RwLock::new(SandboxInner::new())),
            agent,
            hypervisor,
            resource_manager,
            monitor: Arc::new(HealthCheck::new(true, true)),
        })
    }

    async fn prepare_for_start_sandbox(
        &self,
        netns: Option<String>,
        _config: &TomlConfig,
    ) -> Result<Vec<ResourceConfig>> {
        let mut resource_configs = vec![];

        if let Some(_netns_path) = netns {
            // TODO: support network
        }

        let hypervisor_config = self.hypervisor.hypervisor_config().await;
        let virtio_fs_config = ResourceConfig::ShareFs(hypervisor_config.shared_fs);
        resource_configs.push(virtio_fs_config);

        Ok(resource_configs)
    }
}

#[async_trait]
impl Sandbox for VirtSandbox {
    async fn start(&self, netns: Option<String>, config: &TomlConfig) -> Result<()> {
        let id = &self.sid;

        // if the sandbox is already running, return; otherwise try to
        // start it
        let mut inner = self.inner.write().await;
        if inner.state == SandboxState::Running {
            warn!(sl!(), "sandbox is running, no need to start");
            return Ok(());
        }

        self.hypervisor
            .prepare_vm(id, netns.clone())
            .await
            .context("prepare vm")?;

        // generate devices and set them up before starting the VM;
        // this must happen after hypervisor.prepare_vm
        let resources = self.prepare_for_start_sandbox(netns, config).await?;
        self.resource_manager
            .prepare_before_start_vm(resources)
            .await
            .context("set up device before start vm")?;

        // start the VM
        self.hypervisor.start_vm(10_000).await.context("start vm")?;
        info!(sl!(), "start vm");

        // connect to the agent via the socket obtained from the hypervisor
        let address = self
            .hypervisor
            .get_agent_socket()
            .await
            .context("get agent socket")?;
        self.agent.start(&address).await.context("connect")?;

        self.resource_manager
            .setup_after_start_vm()
            .await
            .context("setup device after start vm")?;

        // create the sandbox inside the VM
        let req = agent::CreateSandboxRequest {
            hostname: "".to_string(),
            dns: vec![],
            storages: self
                .resource_manager
                .get_storage_for_sandbox()
                .await
                .context("get storages for sandbox")?,
            sandbox_pidns: false,
            sandbox_id: id.to_string(),
            guest_hook_path: "".to_string(),
            kernel_modules: vec![],
        };

        self.agent
            .create_sandbox(req)
            .await
            .context("create sandbox")?;

        inner.state = SandboxState::Running;
        let agent = self.agent.clone();
        let sender = self.msg_sender.clone();
        info!(sl!(), "oom watcher start");
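        // the oom watcher forwards OOM events from the guest agent to the
        // shim's message loop as TaskOOM events; it exits once
        // get_oom_event fails, e.g. when the agent connection goes away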
Err(err) = lock_sender.send(msg).await.context("send event") { + error!( + sl!(), + "failed to send oom event for {} error {:?}", cid, err + ); + } + } + Err(err) => { + warn!(sl!(), "failed to get oom event error {:?}", err); + break; + } + } + } + }); + self.monitor.start(id, self.agent.clone()); + Ok(()) + } + + async fn stop(&self) -> Result<()> { + info!(sl!(), "begin stop sandbox"); + // TODO: stop sandbox + Ok(()) + } + + async fn shutdown(&self) -> Result<()> { + info!(sl!(), "shutdown"); + + self.resource_manager + .delete_cgroups() + .await + .context("delete cgroups")?; + + info!(sl!(), "stop monitor"); + self.monitor.stop().await; + + info!(sl!(), "stop agent"); + self.agent.stop().await; + + // stop server + info!(sl!(), "send shutdown message"); + let msg = Message::new(Action::Shutdown); + let sender = self.msg_sender.clone(); + let sender = sender.lock().await; + sender.send(msg).await.context("send shutdown msg")?; + Ok(()) + } + + async fn cleanup(&self, _id: &str) -> Result<()> { + // TODO: cleanup + Ok(()) + } +}