runtime-rs: implement the runtime part

Fixes: #3785
Signed-off-by: Tim Zhang <tim@hyper.sh>
Signed-off-by: Zhongtao Hu <zhongtaohu.tim@linux.alibaba.com>
Signed-off-by: Quanwei Zhou <quanweiZhou@linux.alibaba.com>
Tim Zhang 2022-03-26 17:33:41 +08:00 committed by Fupan Li
parent 10343b1f3d
commit 4be7185aa4
13 changed files with 1961 additions and 6 deletions


@@ -77,6 +77,12 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "awaitgroup"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc17ab023b4091c10ff099f9deebaeeb59b5189df07e554c4fef042b70745d68"
[[package]]
name = "backtrace"
version = "0.3.64"
@@ -647,7 +653,7 @@ dependencies = [
"slog",
"slog-scope",
"thiserror",
"toml",
"toml 0.5.8",
]
[[package]]
@@ -1697,6 +1703,15 @@ dependencies = [
"vsock",
]
[[package]]
name = "toml"
version = "0.4.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "758664fc71a3a69038656bee8b6be6477d2a6c315a6b81f7081f591bffa4111f"
dependencies = [
"serde",
]
[[package]]
name = "toml"
version = "0.5.8"
@@ -1848,11 +1863,31 @@ checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
name = "virt_container"
version = "0.1.0"
dependencies = [
"agent",
"anyhow",
"async-trait",
"awaitgroup",
"common",
"containerd-shim-protos",
"futures 0.3.21",
"hypervisor",
"kata-sys-util",
"kata-types",
"lazy_static",
"libc",
"logging",
"nix 0.16.1",
"oci",
"protobuf",
"resource",
"serde",
"serde_derive",
"serde_json",
"slog",
"slog-scope",
"tokio",
"toml 0.4.10",
"url",
]
[[package]]


@@ -7,7 +7,27 @@ edition = "2018"
[dependencies]
anyhow = "^1.0"
async-trait = "0.1.48"
awaitgroup = "0.6.0"
containerd-shim-protos = { version = "0.2.0", features = ["async"]}
futures = "0.3.19"
lazy_static = "1.4.0"
libc = ">=0.2.39"
nix = "0.16.0"
protobuf = "2.23.0"
serde = { version = "1.0.100", features = ["derive"] }
serde_derive = "1.0.27"
serde_json = "1.0.39"
slog = "2.5.2"
slog-scope = "4.4.0"
tokio = { version = "1.8.0" }
toml = "0.4.2"
url = "2.1.1"
agent = { path = "../../agent" }
common = { path = "../common" }
hypervisor = { path = "../../hypervisor" }
kata-sys-util = { path = "../../../../libs/kata-sys-util" }
kata-types = { path = "../../../../libs/kata-types" }
logging = { path = "../../../../libs/logging"}
oci = { path = "../../../../libs/oci" }
resource = { path = "../../resource" }


@@ -0,0 +1,403 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
use std::sync::Arc;
use agent::Agent;
use anyhow::{anyhow, Context, Result};
use common::{
error::Error,
types::{
ContainerConfig, ContainerID, ContainerProcess, ProcessStateInfo, ProcessStatus,
ProcessType,
},
};
use oci::{LinuxResources, Process as OCIProcess};
use resource::ResourceManager;
use tokio::sync::RwLock;
use super::{
process::{Process, ProcessWatcher},
ContainerInner,
};
use crate::container_manager::logger_with_process;
pub struct Exec {
pub(crate) process: Process,
pub(crate) oci_process: OCIProcess,
}
pub struct Container {
pid: u32,
pub container_id: ContainerID,
config: ContainerConfig,
inner: Arc<RwLock<ContainerInner>>,
agent: Arc<dyn Agent>,
resource_manager: Arc<ResourceManager>,
logger: slog::Logger,
}
impl Container {
pub fn new(
pid: u32,
config: ContainerConfig,
agent: Arc<dyn Agent>,
resource_manager: Arc<ResourceManager>,
) -> Result<Self> {
let container_id = ContainerID::new(&config.container_id).context("new container id")?;
let logger = sl!().new(o!("container_id" => config.container_id.clone()));
let process = ContainerProcess::new(&config.container_id, "")?;
let init_process = Process::new(
&process,
pid,
&config.bundle,
config.stdin.clone(),
config.stdout.clone(),
config.stderr.clone(),
config.terminal,
);
Ok(Self {
pid,
container_id,
config,
inner: Arc::new(RwLock::new(ContainerInner::new(
agent.clone(),
init_process,
logger.clone(),
))),
agent,
resource_manager,
logger,
})
}
pub async fn create(&self, mut spec: oci::Spec) -> Result<()> {
// process oci spec
let mut inner = self.inner.write().await;
let config = &self.config;
amend_spec(&mut spec).context("amend spec")?;
// handle rootfs
let rootfs = self
.resource_manager
.handler_rootfs(&config.container_id, &config.bundle, &config.rootfs_mounts)
.await
.context("handler rootfs")?;
// update rootfs
match spec.root.as_mut() {
Some(spec) => {
spec.path = rootfs
.get_guest_rootfs_path()
.await
.context("get guest rootfs path")?
}
None => return Err(anyhow!("spec missing root field")),
};
inner.rootfs.push(rootfs);
// handle volumes
let volumes = self
.resource_manager
.handler_volumes(&config.container_id, &spec.mounts)
.await
.context("handler volumes")?;
let mut oci_mounts = vec![];
let mut storages = vec![];
for v in volumes {
let mut volume_mounts = v.get_volume_mount().context("get volume mount")?;
if !volume_mounts.is_empty() {
oci_mounts.append(&mut volume_mounts);
}
let mut s = v.get_storage().context("get storage")?;
if !s.is_empty() {
storages.append(&mut s);
}
inner.volumes.push(v);
}
spec.mounts = oci_mounts;
// TODO: handle devices
// update cgroups
self.resource_manager
.update_cgroups(
&config.container_id,
spec.linux
.as_ref()
.map(|linux| linux.resources.as_ref())
.flatten(),
)
.await?;
// create container
let r = agent::CreateContainerRequest {
process_id: agent::ContainerProcessID::new(&config.container_id, ""),
string_user: None,
devices: vec![],
storages,
oci: Some(spec),
guest_hooks: None,
sandbox_pidns: false,
rootfs_mounts: vec![],
};
self.agent
.create_container(r)
.await
.context("agent create container")?;
self.resource_manager.dump().await;
Ok(())
}
pub async fn start(&self, process: &ContainerProcess) -> Result<()> {
let mut inner = self.inner.write().await;
match process.process_type {
ProcessType::Container => {
if let Err(err) = inner.start_container(&process.container_id).await {
let _ = inner.stop_process(process, true).await;
return Err(err);
}
let container_io = inner.new_container_io(process).await?;
inner
.init_process
.start_io_and_wait(self.agent.clone(), container_io)
.await?;
}
ProcessType::Exec => {
if let Err(e) = inner.start_exec_process(process).await {
let _ = inner.stop_process(process, true).await;
return Err(e).context("enter process");
}
let container_io = inner.new_container_io(process).await.context("io stream")?;
{
let exec = inner
.exec_processes
.get(&process.exec_id)
.ok_or_else(|| Error::ProcessNotFound(process.clone()))?;
if exec.process.height != 0 && exec.process.width != 0 {
inner
.win_resize_process(process, exec.process.height, exec.process.width)
.await
.context("win resize")?;
}
}
// start io and wait
{
let exec = inner
.exec_processes
.get_mut(&process.exec_id)
.ok_or_else(|| Error::ProcessNotFound(process.clone()))?;
exec.process
.start_io_and_wait(self.agent.clone(), container_io)
.await
.context("start io and wait")?;
}
}
}
Ok(())
}
pub async fn delete_exec_process(&self, container_process: &ContainerProcess) -> Result<()> {
let mut inner = self.inner.write().await;
inner
.delete_exec_process(&container_process.exec_id)
.await
.context("delete process")
}
pub async fn state_process(
&self,
container_process: &ContainerProcess,
) -> Result<ProcessStateInfo> {
let inner = self.inner.read().await;
match container_process.process_type {
ProcessType::Container => inner.init_process.state().await,
ProcessType::Exec => {
let exec = inner
.exec_processes
.get(&container_process.exec_id)
.ok_or_else(|| Error::ProcessNotFound(container_process.clone()))?;
exec.process.state().await
}
}
}
pub async fn wait_process(
&self,
container_process: &ContainerProcess,
) -> Result<ProcessWatcher> {
let logger = logger_with_process(container_process);
info!(logger, "start wait process");
let inner = self.inner.read().await;
inner
.fetch_exit_watcher(container_process)
.context("fetch exit watcher")
}
pub async fn kill_process(
&self,
container_process: &ContainerProcess,
signal: u32,
all: bool,
) -> Result<()> {
let inner = self.inner.read().await;
inner.signal_process(container_process, signal, all).await
}
pub async fn exec_process(
&self,
container_process: &ContainerProcess,
stdin: Option<String>,
stdout: Option<String>,
stderr: Option<String>,
terminal: bool,
oci_process: OCIProcess,
) -> Result<()> {
let process = Process::new(
container_process,
self.pid,
&self.config.bundle,
stdin,
stdout,
stderr,
terminal,
);
let exec = Exec {
process,
oci_process,
};
let mut inner = self.inner.write().await;
inner.add_exec_process(&container_process.exec_id, exec);
Ok(())
}
pub async fn close_io(&self, container_process: &ContainerProcess) -> Result<()> {
let mut inner = self.inner.write().await;
inner.close_io(container_process).await
}
pub async fn stop_process(&self, container_process: &ContainerProcess) -> Result<()> {
let mut inner = self.inner.write().await;
inner
.stop_process(container_process, true)
.await
.context("stop process")
}
pub async fn pause(&self) -> Result<()> {
let inner = self.inner.read().await;
if inner.init_process.status == ProcessStatus::Paused {
warn!(self.logger, "container is paused no need to pause");
return Ok(());
}
self.agent
.pause_container(self.container_id.clone().into())
.await
.context("agent pause container")?;
Ok(())
}
pub async fn resume(&self) -> Result<()> {
let inner = self.inner.read().await;
if inner.init_process.status == ProcessStatus::Running {
warn!(self.logger, "container is running no need to resume");
return Ok(());
}
self.agent
.resume_container(self.container_id.clone().into())
.await
.context("agent pause container")?;
Ok(())
}
pub async fn resize_pty(
&self,
process: &ContainerProcess,
width: u32,
height: u32,
) -> Result<()> {
let logger = logger_with_process(process);
let inner = self.inner.read().await;
if inner.init_process.status != ProcessStatus::Running {
warn!(logger, "container is running no need to resume");
return Ok(());
}
self.agent
.tty_win_resize(agent::TtyWinResizeRequest {
process_id: process.clone().into(),
row: height,
column: width,
})
.await
.context("resize pty")?;
Ok(())
}
pub async fn stats(&self) -> Result<Option<agent::StatsContainerResponse>> {
let stats_resp = self
.agent
.stats_container(self.container_id.clone().into())
.await
.context("agent stats container")?;
Ok(Some(stats_resp))
}
pub async fn update(&self, resources: &LinuxResources) -> Result<()> {
self.resource_manager
.update_cgroups(&self.config.container_id, Some(resources))
.await?;
let req = agent::UpdateContainerRequest {
container_id: self.container_id.container_id.clone(),
resources: resources.clone(),
mounts: Vec::new(),
};
self.agent
.update_container(req)
.await
.context("agent update container")?;
Ok(())
}
}
fn amend_spec(spec: &mut oci::Spec) -> Result<()> {
// hooks should be handled on the host side
spec.hooks = None;
if let Some(linux) = spec.linux.as_mut() {
linux.seccomp = None;
if let Some(resource) = linux.resources.as_mut() {
resource.devices = Vec::new();
resource.pids = None;
resource.block_io = None;
resource.hugepage_limits = Vec::new();
resource.network = None;
}
let mut ns: Vec<oci::LinuxNamespace> = Vec::new();
for n in linux.namespaces.iter() {
match n.r#type.as_str() {
oci::PIDNAMESPACE | oci::NETWORKNAMESPACE => continue,
_ => ns.push(n.clone()),
}
}
linux.namespaces = ns;
}
Ok(())
}
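
A note on amend_spec: it rebuilds linux.namespaces, keeping everything except the pid and network namespaces, which are managed on the host. Below is a minimal, self-contained sketch of the same filtering idiom; the Ns type is a hypothetical stand-in for oci::LinuxNamespace, not part of this commit.

#[derive(Clone, Debug, PartialEq)]
struct Ns { r#type: String }

fn drop_host_managed(namespaces: &mut Vec<Ns>) {
    // pid and network namespaces are created on the host, so the guest
    // spec must not request them
    namespaces.retain(|n| !matches!(n.r#type.as_str(), "pid" | "network"));
}

fn main() {
    let mut ns = vec![
        Ns { r#type: "pid".into() },
        Ns { r#type: "mount".into() },
        Ns { r#type: "network".into() },
    ];
    drop_host_managed(&mut ns);
    assert_eq!(ns, vec![Ns { r#type: "mount".into() }]);
}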


@@ -0,0 +1,261 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
use std::{collections::HashMap, sync::Arc};
use agent::Agent;
use anyhow::{anyhow, Context, Result};
use common::{
error::Error,
types::{ContainerID, ContainerProcess, ProcessExitStatus, ProcessStatus, ProcessType},
};
use nix::sys::signal::Signal;
use resource::{rootfs::Rootfs, volume::Volume};
use tokio::sync::RwLock;
use crate::container_manager::logger_with_process;
use super::{
io::ContainerIo,
process::{Process, ProcessWatcher},
Exec,
};
pub struct ContainerInner {
agent: Arc<dyn Agent>,
logger: slog::Logger,
pub(crate) init_process: Process,
pub(crate) exec_processes: HashMap<String, Exec>,
pub(crate) rootfs: Vec<Arc<dyn Rootfs>>,
pub(crate) volumes: Vec<Arc<dyn Volume>>,
}
impl ContainerInner {
pub(crate) fn new(agent: Arc<dyn Agent>, init_process: Process, logger: slog::Logger) -> Self {
Self {
agent,
logger,
init_process,
exec_processes: HashMap::new(),
rootfs: vec![],
volumes: vec![],
}
}
fn container_id(&self) -> &str {
self.init_process.process.container_id()
}
pub(crate) fn check_state(&self, states: Vec<ProcessStatus>) -> Result<()> {
let state = self.init_process.status;
if states.contains(&state) {
return Ok(());
}
Err(anyhow!(
"failed to check state {:?} for {:?}",
state,
states
))
}
pub(crate) fn set_state(&mut self, state: ProcessStatus) {
self.init_process.status = state;
}
pub(crate) async fn start_exec_process(&mut self, process: &ContainerProcess) -> Result<()> {
let exec = self
.exec_processes
.get_mut(&process.exec_id)
.ok_or_else(|| Error::ProcessNotFound(process.clone()))?;
self.agent
.exec_process(agent::ExecProcessRequest {
process_id: process.clone().into(),
string_user: None,
process: Some(exec.oci_process.clone()),
})
.await
.map(|_| {
exec.process.status = ProcessStatus::Running;
})
}
pub(crate) async fn win_resize_process(
&self,
process: &ContainerProcess,
height: u32,
width: u32,
) -> Result<()> {
self.check_state(vec![ProcessStatus::Created, ProcessStatus::Running])
.context("check state")?;
self.agent
.tty_win_resize(agent::TtyWinResizeRequest {
process_id: process.clone().into(),
row: height,
column: width,
})
.await?;
Ok(())
}
pub fn fetch_exit_watcher(&self, process: &ContainerProcess) -> Result<ProcessWatcher> {
match process.process_type {
ProcessType::Container => self.init_process.fetch_exit_watcher(),
ProcessType::Exec => {
let exec = self
.exec_processes
.get(&process.exec_id)
.ok_or_else(|| Error::ProcessNotFound(process.clone()))?;
exec.process.fetch_exit_watcher()
}
}
}
pub(crate) async fn start_container(&mut self, cid: &ContainerID) -> Result<()> {
self.check_state(vec![ProcessStatus::Created, ProcessStatus::Stopped])
.context("check state")?;
self.agent
.start_container(agent::ContainerID {
container_id: cid.container_id.clone(),
})
.await
.context("start container")?;
self.set_state(ProcessStatus::Running);
Ok(())
}
async fn get_exit_status(&self) -> Arc<RwLock<ProcessExitStatus>> {
self.init_process.exit_status.clone()
}
pub(crate) fn add_exec_process(&mut self, id: &str, exec: Exec) -> Option<Exec> {
self.exec_processes.insert(id.to_string(), exec)
}
pub(crate) async fn delete_exec_process(&mut self, eid: &str) -> Result<()> {
match self.exec_processes.remove(eid) {
Some(_) => {
debug!(self.logger, "delete exec process eid {}", eid);
Ok(())
}
None => Err(anyhow!(
"failed to find cid {} eid {}",
self.container_id(),
eid
)),
}
}
async fn cleanup_container(&mut self, cid: &str, force: bool) -> Result<()> {
// wait until the container process has terminated
// and the status write lock has been released.
info!(self.logger, "wait on container terminated");
let exit_status = self.get_exit_status().await;
let _locked_exit_status = exit_status.read().await;
info!(self.logger, "container terminated");
let timeout: u32 = 10;
self.agent
.remove_container(agent::RemoveContainerRequest::new(cid, timeout))
.await
.or_else(|e| {
if force {
warn!(
self.logger,
"stop container: agent remove container failed: {}", e
);
Ok(agent::Empty::new())
} else {
Err(e)
}
})?;
// close the exit channel to wake up the wait service and
// notify watchers who are waiting for the process exit
self.init_process.stop();
Ok(())
}
pub(crate) async fn stop_process(
&mut self,
process: &ContainerProcess,
force: bool,
) -> Result<()> {
let logger = logger_with_process(process);
info!(logger, "begin to stop process");
// do not stop again when already stopped; doing so may cause resources to be cleaned up more than once
self.check_state(vec![ProcessStatus::Running])
.context("check state")?;
// when using force mode to stop the container, stopping always succeeds.
// send a kill signal to the container and ignore any error from
// sending the signal, since the process may already have been
// killed and exited.
self.signal_process(process, Signal::SIGKILL as u32, false)
.await
.map_err(|e| {
warn!(logger, "failed to signal kill. {:?}", e);
})
.ok();
match process.process_type {
ProcessType::Container => self
.cleanup_container(&process.container_id.container_id, force)
.await
.context("stop container")?,
ProcessType::Exec => {
let exec = self
.exec_processes
.get_mut(&process.exec_id)
.ok_or_else(|| anyhow!("failed to find exec"))?;
exec.process.stop();
}
}
Ok(())
}
pub(crate) async fn signal_process(
&self,
process: &ContainerProcess,
signal: u32,
all: bool,
) -> Result<()> {
let mut process_id: agent::ContainerProcessID = process.clone().into();
if all {
// force signal init process
process_id.exec_id.clear();
};
self.agent
.signal_process(agent::SignalProcessRequest { process_id, signal })
.await?;
Ok(())
}
pub async fn new_container_io(&self, process: &ContainerProcess) -> Result<ContainerIo> {
Ok(ContainerIo::new(self.agent.clone(), process.clone()))
}
pub async fn close_io(&mut self, process: &ContainerProcess) -> Result<()> {
match process.process_type {
ProcessType::Container => self.init_process.close_io().await,
ProcessType::Exec => {
let exec = self
.exec_processes
.get_mut(&process.exec_id)
.ok_or_else(|| Error::ProcessNotFound(process.clone()))?;
exec.process.close_io().await;
}
};
Ok(())
}
}
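
cleanup_container tolerates agent failures only in force mode, via Result::or_else. A minimal, runnable sketch of that idiom; remove() here is a hypothetical stand-in for agent.remove_container, not the real API.

use anyhow::{bail, Result};

fn remove(fail: bool) -> Result<()> {
    if fail { bail!("agent unreachable") } else { Ok(()) }
}

fn cleanup(force: bool) -> Result<()> {
    // in force mode the failure is swallowed; otherwise it propagates
    remove(true).or_else(|e| if force { Ok(()) } else { Err(e) })
}

fn main() {
    assert!(cleanup(true).is_ok());   // force mode ignores the failure
    assert!(cleanup(false).is_err()); // non-force propagates it
}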


@@ -0,0 +1,171 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
use std::{
future::Future,
io,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use agent::Agent;
use anyhow::Result;
use common::types::ContainerProcess;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
struct ContainerIoInfo {
pub agent: Arc<dyn Agent>,
pub process: ContainerProcess,
}
pub struct ContainerIo {
pub stdin: Box<dyn AsyncWrite + Send + Unpin>,
pub stdout: Box<dyn AsyncRead + Send + Unpin>,
pub stderr: Box<dyn AsyncRead + Send + Unpin>,
}
impl ContainerIo {
pub fn new(agent: Arc<dyn Agent>, process: ContainerProcess) -> Self {
let info = Arc::new(ContainerIoInfo { agent, process });
Self {
stdin: Box::new(ContainerIoWrite::new(info.clone())),
stdout: Box::new(ContainerIoRead::new(info.clone(), true)),
stderr: Box::new(ContainerIoRead::new(info, false)),
}
}
}
struct ContainerIoWrite<'inner> {
pub info: Arc<ContainerIoInfo>,
write_future:
Option<Pin<Box<dyn Future<Output = Result<agent::WriteStreamResponse>> + Send + 'inner>>>,
}
impl<'inner> ContainerIoWrite<'inner> {
pub fn new(info: Arc<ContainerIoInfo>) -> Self {
Self {
info,
write_future: Default::default(),
}
}
fn poll_write_inner(
&'inner mut self,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
let mut write_future = self.write_future.take();
if write_future.is_none() {
let req = agent::WriteStreamRequest {
process_id: self.info.process.clone().into(),
data: buf.to_vec(),
};
write_future = Some(Box::pin(self.info.agent.write_stdin(req)));
}
let mut write_future = write_future.unwrap();
match write_future.as_mut().poll(cx) {
Poll::Ready(v) => match v {
Ok(resp) => Poll::Ready(Ok(resp.length as usize)),
Err(err) => Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::Other, err))),
},
Poll::Pending => {
self.write_future = Some(write_future);
Poll::Pending
}
}
}
}
impl<'inner> AsyncWrite for ContainerIoWrite<'inner> {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
let me = unsafe {
std::mem::transmute::<&mut ContainerIoWrite<'_>, &mut ContainerIoWrite<'inner>>(
&mut *self,
)
};
me.poll_write_inner(cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
}
type ResultBuffer = Result<agent::ReadStreamResponse>;
struct ContainerIoRead<'inner> {
pub info: Arc<ContainerIoInfo>,
is_stdout: bool,
read_future: Option<Pin<Box<dyn Future<Output = ResultBuffer> + Send + 'inner>>>,
}
impl<'inner> ContainerIoRead<'inner> {
pub fn new(info: Arc<ContainerIoInfo>, is_stdout: bool) -> Self {
Self {
info,
is_stdout,
read_future: Default::default(),
}
}
fn poll_read_inner(
&'inner mut self,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
let mut read_future = self.read_future.take();
if read_future.is_none() {
let req = agent::ReadStreamRequest {
process_id: self.info.process.clone().into(),
len: buf.remaining() as u32,
};
read_future = if self.is_stdout {
Some(Box::pin(self.info.agent.read_stdout(req)))
} else {
Some(Box::pin(self.info.agent.read_stderr(req)))
};
}
let mut read_future = read_future.unwrap();
match read_future.as_mut().poll(cx) {
Poll::Ready(v) => match v {
Ok(resp) => {
buf.put_slice(&resp.data);
Poll::Ready(Ok(()))
}
Err(err) => Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::Other, err))),
},
Poll::Pending => {
self.read_future = Some(read_future);
Poll::Pending
}
}
}
}
impl<'inner> AsyncRead for ContainerIoRead<'inner> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
let me = unsafe {
std::mem::transmute::<&mut ContainerIoRead<'_>, &mut ContainerIoRead<'inner>>(
&mut *self,
)
};
me.poll_read_inner(cx, buf)
}
}
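
Both ContainerIoWrite and ContainerIoRead bridge the poll-based AsyncWrite/AsyncRead traits to an async agent RPC by lazily creating the call's future, polling it, and stashing it back on Pending so the next poll resumes the same in-flight request (the commit transmutes the future's lifetime to make this compile; the sketch below sidesteps that by using 'static). A self-contained illustration of the stash-and-resume pattern, under those assumptions:

use std::{future::Future, pin::Pin, task::{Context, Poll}};

struct Stashed {
    fut: Option<Pin<Box<dyn Future<Output = usize> + Send>>>,
}

impl Stashed {
    fn poll_step(&mut self, cx: &mut Context<'_>) -> Poll<usize> {
        // create the future on first poll, otherwise resume the stashed one
        let mut fut = self
            .fut
            .take()
            .unwrap_or_else(|| Box::pin(async { 42 })); // stand-in for an agent RPC
        match fut.as_mut().poll(cx) {
            Poll::Ready(n) => Poll::Ready(n),
            Poll::Pending => {
                self.fut = Some(fut); // keep the in-flight call for the next poll
                Poll::Pending
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let mut s = Stashed { fut: None };
    assert_eq!(std::future::poll_fn(|cx| s.poll_step(cx)).await, 42);
}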


@@ -0,0 +1,10 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
mod container_io;
pub use container_io::ContainerIo;
mod shim_io;
pub use shim_io::ShimIo;


@@ -0,0 +1,135 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
use std::{
io,
os::unix::{io::FromRawFd, net::UnixStream as StdUnixStream},
pin::Pin,
task::Context as TaskContext,
task::Poll,
};
use anyhow::{anyhow, Context, Result};
use nix::{
fcntl::{self, OFlag},
sys::stat::Mode,
};
use tokio::{
fs::File,
io::{AsyncRead, AsyncWrite},
net::UnixStream as AsyncUnixStream,
};
use url::Url;
fn open_fifo(path: &str) -> Result<AsyncUnixStream> {
let fd = fcntl::open(path, OFlag::O_RDWR, Mode::from_bits(0).unwrap())?;
let std_stream = unsafe { StdUnixStream::from_raw_fd(fd) };
std_stream
.set_nonblocking(true)
.context("set nonblocking")?;
AsyncUnixStream::from_std(std_stream).map_err(|e| anyhow!(e))
}
pub struct ShimIo {
pub stdin: Option<Box<dyn AsyncRead + Send + Unpin>>,
pub stdout: Option<Box<dyn AsyncWrite + Send + Unpin>>,
pub stderr: Option<Box<dyn AsyncWrite + Send + Unpin>>,
}
impl ShimIo {
pub async fn new(
stdin: &Option<String>,
stdout: &Option<String>,
stderr: &Option<String>,
) -> Result<Self> {
let stdin_fd: Option<Box<dyn AsyncRead + Send + Unpin>> = if let Some(stdin) = stdin {
match File::open(&stdin).await {
Ok(file) => Some(Box::new(file)),
Err(err) => {
error!(sl!(), "failed to open {} error {:?}", &stdin, err);
None
}
}
} else {
None
};
let get_url = |url: &Option<String>| -> Option<Url> {
match url {
None => None,
Some(out) => match Url::parse(out.as_str()) {
Err(url::ParseError::RelativeUrlWithoutBase) => {
let out = "fifo://".to_owned() + out.as_str();
let u = Url::parse(out.as_str()).unwrap();
Some(u)
}
Err(err) => {
warn!(sl!(), "unable to parse stdout uri: {}", err);
None
}
Ok(u) => Some(u),
},
}
};
let stdout_url = get_url(stdout);
let get_fd = |url: &Option<Url>| -> Option<Box<dyn AsyncWrite + Send + Unpin>> {
if let Some(url) = url {
if url.scheme() == "fifo" {
let path = url.path();
match open_fifo(path) {
Ok(s) => {
return Some(Box::new(ShimIoWrite::Stream(s)));
}
Err(err) => {
error!(sl!(), "failed to open file {} error {:?}", url.path(), err);
}
}
}
}
None
};
let stderr_url = get_url(stderr);
Ok(Self {
stdin: stdin_fd,
stdout: get_fd(&stdout_url),
stderr: get_fd(&stderr_url),
})
}
}
#[derive(Debug)]
enum ShimIoWrite {
Stream(AsyncUnixStream),
// TODO: support other type
}
impl AsyncWrite for ShimIoWrite {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut TaskContext<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
match *self {
ShimIoWrite::Stream(ref mut s) => Pin::new(s).poll_write(cx, buf),
}
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<io::Result<()>> {
match *self {
ShimIoWrite::Stream(ref mut s) => Pin::new(s).poll_flush(cx),
}
}
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<io::Result<()>> {
match *self {
ShimIoWrite::Stream(ref mut s) => Pin::new(s).poll_shutdown(cx),
}
}
}
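
The get_url closure in ShimIo::new handles stdio being given as a bare FIFO path: Url::parse on a schemeless path fails with RelativeUrlWithoutBase, so the code retries with a fifo:// prefix. A runnable sketch of that fallback (url crate as pinned in Cargo.toml; the path below is made up for illustration):

use url::Url;

fn parse_stdio_uri(s: &str) -> Option<Url> {
    match Url::parse(s) {
        Ok(u) => Some(u),
        // a bare path like "/run/io/stdout" has no scheme: retry as a fifo
        Err(url::ParseError::RelativeUrlWithoutBase) => {
            Url::parse(&format!("fifo://{}", s)).ok()
        }
        Err(_) => None,
    }
}

fn main() {
    let u = parse_stdio_uri("/run/io/stdout").unwrap();
    assert_eq!(u.scheme(), "fifo");
    assert_eq!(u.path(), "/run/io/stdout");
}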


@@ -0,0 +1,274 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
use anyhow::{anyhow, Context, Result};
use std::{collections::HashMap, sync::Arc};
use agent::Agent;
use async_trait::async_trait;
use common::{
error::Error,
types::{
ContainerConfig, ContainerID, ContainerProcess, ExecProcessRequest, KillRequest,
ProcessExitStatus, ProcessStateInfo, ProcessType, ResizePTYRequest, ShutdownRequest,
StatsInfo, UpdateRequest, PID,
},
ContainerManager,
};
use oci::Process as OCIProcess;
use resource::ResourceManager;
use tokio::sync::RwLock;
use super::{logger_with_process, Container};
unsafe impl Send for VirtContainerManager {}
unsafe impl Sync for VirtContainerManager {}
pub struct VirtContainerManager {
sid: String,
pid: u32,
containers: Arc<RwLock<HashMap<String, Container>>>,
resource_manager: Arc<ResourceManager>,
agent: Arc<dyn Agent>,
}
impl VirtContainerManager {
pub fn new(
sid: &str,
pid: u32,
agent: Arc<dyn Agent>,
resource_manager: Arc<ResourceManager>,
) -> Self {
Self {
sid: sid.to_string(),
pid,
containers: Default::default(),
resource_manager,
agent,
}
}
}
#[async_trait]
impl ContainerManager for VirtContainerManager {
async fn create_container(&self, config: ContainerConfig, spec: oci::Spec) -> Result<PID> {
let container = Container::new(
self.pid,
config,
self.agent.clone(),
self.resource_manager.clone(),
)
.context("new container")?;
let mut containers = self.containers.write().await;
container.create(spec).await.context("create")?;
containers.insert(container.container_id.to_string(), container);
Ok(PID { pid: self.pid })
}
async fn close_process_io(&self, process: &ContainerProcess) -> Result<()> {
let containers = self.containers.read().await;
let container_id = &process.container_id.to_string();
let c = containers
.get(container_id)
.ok_or_else(|| Error::ContainerNotFound(container_id.clone()))?;
c.close_io(process).await.context("close io")?;
Ok(())
}
async fn delete_process(&self, process: &ContainerProcess) -> Result<ProcessStateInfo> {
let container_id = &process.container_id.container_id;
match process.process_type {
ProcessType::Container => {
let mut containers = self.containers.write().await;
let c = containers
.remove(container_id)
.ok_or_else(|| Error::ContainerNotFound(container_id.to_string()))?;
c.state_process(process).await.context("state process")
}
ProcessType::Exec => {
let containers = self.containers.read().await;
let c = containers
.get(container_id)
.ok_or_else(|| Error::ContainerNotFound(container_id.to_string()))?;
c.delete_exec_process(process)
.await
.context("delete process")?;
c.state_process(process).await.context("state process")
}
}
}
async fn exec_process(&self, req: ExecProcessRequest) -> Result<()> {
if req.spec_type_url.is_empty() {
return Err(anyhow!("invalid type url"));
}
let oci_process: OCIProcess =
serde_json::from_slice(&req.spec_value).context("serde from slice")?;
let containers = self.containers.read().await;
let container_id = &req.process.container_id.container_id;
let c = containers
.get(container_id)
.ok_or_else(|| Error::ContainerNotFound(container_id.clone()))?;
c.exec_process(
&req.process,
req.stdin,
req.stdout,
req.stderr,
req.terminal,
oci_process,
)
.await
.context("exec")?;
Ok(())
}
async fn kill_process(&self, req: &KillRequest) -> Result<()> {
let containers = self.containers.read().await;
let container_id = &req.process.container_id.container_id;
let c = containers
.get(container_id)
.ok_or_else(|| Error::ContainerNotFound(container_id.clone()))?;
c.kill_process(&req.process, req.signal, req.all)
.await
.map_err(|err| {
warn!(
sl!(),
"failed to signal process {:?} {:?}", &req.process, err
);
err
})
.ok();
Ok(())
}
async fn wait_process(&self, process: &ContainerProcess) -> Result<ProcessExitStatus> {
let logger = logger_with_process(process);
let containers = self.containers.read().await;
let container_id = &process.container_id.container_id;
let c = containers
.get(container_id)
.ok_or_else(|| Error::ContainerNotFound(container_id.clone()))?;
let (watcher, status) = c.wait_process(process).await.context("wait")?;
drop(containers);
match watcher {
Some(mut watcher) => {
info!(logger, "begin wait exit");
while watcher.changed().await.is_ok() {}
info!(logger, "end wait exited");
}
None => {
warn!(logger, "failed to find watcher for wait process");
}
}
let status = status.read().await;
info!(logger, "wait process exit status {:?}", status);
// stop process
let containers = self.containers.read().await;
let container_id = &process.container_id.container_id;
let c = containers
.get(container_id)
.ok_or_else(|| Error::ContainerNotFound(container_id.clone()))?;
c.stop_process(process).await.context("stop process")?;
Ok(status.clone())
}
async fn start_process(&self, process: &ContainerProcess) -> Result<PID> {
let containers = self.containers.read().await;
let container_id = &process.container_id.container_id;
let c = containers
.get(container_id)
.ok_or_else(|| Error::ContainerNotFound(container_id.clone()))?;
c.start(process).await.context("start")?;
Ok(PID { pid: self.pid })
}
async fn state_process(&self, process: &ContainerProcess) -> Result<ProcessStateInfo> {
let containers = self.containers.read().await;
let container_id = &process.container_id.container_id;
let c = containers
.get(container_id)
.ok_or_else(|| Error::ContainerNotFound(container_id.clone()))?;
let state = c.state_process(process).await.context("state process")?;
Ok(state)
}
async fn pause_container(&self, id: &ContainerID) -> Result<()> {
let containers = self.containers.read().await;
let c = containers
.get(&id.container_id)
.ok_or_else(|| Error::ContainerNotFound(id.container_id.clone()))?;
c.pause().await.context("pause")?;
Ok(())
}
async fn resume_container(&self, id: &ContainerID) -> Result<()> {
let containers = self.containers.read().await;
let c = containers
.get(&id.container_id)
.ok_or_else(|| Error::ContainerNotFound(id.container_id.clone()))?;
c.resume().await.context("resume")?;
Ok(())
}
async fn resize_process_pty(&self, req: &ResizePTYRequest) -> Result<()> {
let containers = self.containers.read().await;
let c = containers
.get(&req.process.container_id.container_id)
.ok_or_else(|| {
Error::ContainerNotFound(req.process.container_id.container_id.clone())
})?;
c.resize_pty(&req.process, req.width, req.height)
.await
.context("resize pty")?;
Ok(())
}
async fn stats_container(&self, id: &ContainerID) -> Result<StatsInfo> {
let containers = self.containers.read().await;
let c = containers
.get(&id.container_id)
.ok_or_else(|| Error::ContainerNotFound(id.container_id.clone()))?;
let stats = c.stats().await.context("stats")?;
Ok(StatsInfo::from(stats))
}
async fn update_container(&self, req: UpdateRequest) -> Result<()> {
let resource = serde_json::from_slice::<oci::LinuxResources>(&req.value)
.context("deserialize LinuxResource")?;
let containers = self.containers.read().await;
let container_id = &req.container_id;
let c = containers
.get(container_id)
.ok_or_else(|| Error::ContainerNotFound(container_id.to_string()))?;
c.update(&resource).await.context("update container")
}
async fn pid(&self) -> Result<PID> {
Ok(PID { pid: self.pid })
}
async fn connect_container(&self, _id: &ContainerID) -> Result<PID> {
Ok(PID { pid: self.pid })
}
async fn need_shutdown_sandbox(&self, req: &ShutdownRequest) -> bool {
req.is_now || self.containers.read().await.is_empty() || self.sid == req.container_id
}
async fn is_sandbox_container(&self, process: &ContainerProcess) -> bool {
process.process_type == ProcessType::Container
&& process.container_id.container_id == self.sid
}
}
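
wait_process blocks on the exit watcher rather than on the agent directly: waiters loop on watch::Receiver::changed(), which keeps returning Ok until run_io_wait drops the sender. A minimal sketch of that notification idiom:

use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = watch::channel(false);
    let waiter = tokio::spawn(async move {
        // changed() yields Err once the sender is dropped, ending the wait
        while rx.changed().await.is_ok() {}
        println!("process exited");
    });
    // stand-in for run_io_wait recording the exit code, then dropping the sender
    drop(tx);
    waiter.await.unwrap();
}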


@@ -0,0 +1,20 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
mod container;
use container::{Container, Exec};
mod container_inner;
mod io;
use container_inner::ContainerInner;
mod manager;
pub use manager::VirtContainerManager;
mod process;
use common::types::ContainerProcess;
fn logger_with_process(container_process: &ContainerProcess) -> slog::Logger {
sl!().new(o!("container_id" => container_process.container_id.container_id.clone(), "exec_id" => container_process.exec_id.clone()))
}


@@ -0,0 +1,211 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
use std::sync::Arc;
use agent::Agent;
use anyhow::{Context, Result};
use awaitgroup::{WaitGroup, Worker as WaitGroupWorker};
use common::types::{ContainerProcess, ProcessExitStatus, ProcessStateInfo, ProcessStatus, PID};
use tokio::{
io::{AsyncRead, AsyncWrite},
sync::{watch, RwLock},
};
use super::{
io::{ContainerIo, ShimIo},
logger_with_process,
};
pub type ProcessWatcher = (
Option<watch::Receiver<bool>>,
Arc<RwLock<ProcessExitStatus>>,
);
#[derive(Debug)]
pub struct Process {
pub process: ContainerProcess,
pub pid: u32,
logger: slog::Logger,
pub bundle: String,
pub stdin: Option<String>,
pub stdout: Option<String>,
pub stderr: Option<String>,
pub terminal: bool,
pub height: u32,
pub width: u32,
pub status: ProcessStatus,
pub exit_status: Arc<RwLock<ProcessExitStatus>>,
pub exit_watcher_rx: Option<watch::Receiver<bool>>,
pub exit_watcher_tx: Option<watch::Sender<bool>>,
// used to synchronize the stdin io copy task (tokio) with the close_io call.
// close_io should wait until the stdin io copy has finished to
// prevent stdin data from being lost.
pub wg_stdin: WaitGroup,
}
impl Process {
pub fn new(
process: &ContainerProcess,
pid: u32,
bundle: &str,
stdin: Option<String>,
stdout: Option<String>,
stderr: Option<String>,
terminal: bool,
) -> Process {
let (sender, receiver) = watch::channel(false);
Process {
process: process.clone(),
pid,
logger: logger_with_process(process),
bundle: bundle.to_string(),
stdin,
stdout,
stderr,
terminal,
height: 0,
width: 0,
status: ProcessStatus::Created,
exit_status: Arc::new(RwLock::new(ProcessExitStatus::new())),
exit_watcher_rx: Some(receiver),
exit_watcher_tx: Some(sender),
wg_stdin: WaitGroup::new(),
}
}
pub async fn start_io_and_wait(
&mut self,
agent: Arc<dyn Agent>,
container_io: ContainerIo,
) -> Result<()> {
info!(self.logger, "start io and wait");
// new shim io
let shim_io = ShimIo::new(&self.stdin, &self.stdout, &self.stderr)
.await
.context("new shim io")?;
// start io copy for stdin
let wgw_stdin = self.wg_stdin.worker();
if let Some(stdin) = shim_io.stdin {
self.run_io_copy("stdin", wgw_stdin, stdin, container_io.stdin)
.await?;
}
// prepare for wait group for stdout, stderr
let wg = WaitGroup::new();
let wgw = wg.worker();
// start io copy for stdout
if let Some(stdout) = shim_io.stdout {
self.run_io_copy("stdout", wgw.clone(), container_io.stdout, stdout)
.await?;
}
// start io copy for stderr
if !self.terminal {
if let Some(stderr) = shim_io.stderr {
self.run_io_copy("stderr", wgw, container_io.stderr, stderr)
.await?;
}
}
self.run_io_wait(agent, wg).await.context("run io thread")?;
Ok(())
}
async fn run_io_copy<'a>(
&'a self,
io_name: &'a str,
wgw: WaitGroupWorker,
mut reader: Box<dyn AsyncRead + Send + Unpin>,
mut writer: Box<dyn AsyncWrite + Send + Unpin>,
) -> Result<()> {
let io_name = io_name.to_string();
let logger = self.logger.new(o!("io name" => io_name));
let _ = tokio::spawn(async move {
match tokio::io::copy(&mut reader, &mut writer).await {
Err(e) => warn!(logger, "io: failed to copy stream {}", e),
Ok(length) => info!(logger, "io: finished copying stream, length {}", length),
};
wgw.done();
});
Ok(())
}
async fn run_io_wait(&mut self, agent: Arc<dyn Agent>, mut wg: WaitGroup) -> Result<()> {
let logger = self.logger.clone();
info!(logger, "start run io wait");
let process = self.process.clone();
let status = self.exit_status.clone();
let exit_notifier = self.exit_watcher_tx.take();
let _ = tokio::spawn(async move {
// wait until all of the container's io streams have terminated
info!(logger, "begin wait group io");
wg.wait().await;
info!(logger, "end wait group for io");
let req = agent::WaitProcessRequest {
process_id: process.clone().into(),
};
info!(logger, "begin wait process");
let resp = match agent.wait_process(req).await {
Ok(ret) => ret,
Err(e) => {
error!(logger, "failed to wait process {:?}", e);
return;
}
};
info!(logger, "end wait process exit code {}", resp.status);
let mut locked_status = status.write().await;
locked_status.update_exit_code(resp.status);
drop(exit_notifier);
info!(logger, "end io wait thread");
});
Ok(())
}
pub fn fetch_exit_watcher(&self) -> Result<ProcessWatcher> {
Ok((self.exit_watcher_rx.clone(), self.exit_status.clone()))
}
pub async fn state(&self) -> Result<ProcessStateInfo> {
let exit_status = self.exit_status.read().await;
Ok(ProcessStateInfo {
container_id: self.process.container_id.container_id.clone(),
exec_id: self.process.exec_id.clone(),
pid: PID { pid: self.pid },
bundle: self.bundle.clone(),
stdin: self.stdin.clone(),
stdout: self.stdout.clone(),
stderr: self.stderr.clone(),
terminal: self.terminal,
status: self.status,
exit_status: exit_status.exit_code,
exited_at: exit_status.exit_time,
})
}
pub fn stop(&mut self) {
self.status = ProcessStatus::Stopped;
}
pub async fn close_io(&mut self) {
self.wg_stdin.wait().await;
}
}
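
start_io_and_wait and close_io coordinate through awaitgroup: every io copy task holds a Worker and signals done(), while wait() resumes only after all workers have finished. A self-contained sketch of that pattern (awaitgroup 0.6 and tokio, as pinned in Cargo.toml):

use awaitgroup::WaitGroup;

#[tokio::main]
async fn main() {
    let mut wg = WaitGroup::new();
    for name in ["stdin", "stdout", "stderr"] {
        let worker = wg.worker();
        tokio::spawn(async move {
            // stand-in for tokio::io::copy on one stream
            println!("{} copy finished", name);
            worker.done();
        });
    }
    wg.wait().await; // resumes once every worker has called done()
}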


@@ -0,0 +1,123 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
use std::sync::Arc;
use agent::Agent;
use anyhow::Context;
use tokio::sync::{mpsc, Mutex};
/// monitor check interval 30s
const HEALTH_CHECK_TIMER_INTERVAL: u64 = 30;
/// version check threshold 5min
const VERSION_CHECK_THRESHOLD: u64 = 5 * 60 / HEALTH_CHECK_TIMER_INTERVAL;
/// health check stop channel buffer size
const HEALTH_CHECK_STOP_CHANNEL_BUFFER_SIZE: usize = 1;
pub struct HealthCheck {
pub keep_alive: bool,
keep_vm: bool,
stop_tx: mpsc::Sender<()>,
stop_rx: Arc<Mutex<mpsc::Receiver<()>>>,
}
impl HealthCheck {
pub fn new(keep_alive: bool, keep_vm: bool) -> HealthCheck {
let (tx, rx) = mpsc::channel(HEALTH_CHECK_STOP_CHANNEL_BUFFER_SIZE);
HealthCheck {
keep_alive,
keep_vm,
stop_tx: tx,
stop_rx: Arc::new(Mutex::new(rx)),
}
}
pub fn start(&self, id: &str, agent: Arc<dyn Agent>) {
if !self.keep_alive {
return;
}
let id = id.to_string();
info!(sl!(), "start runtime keep alive");
let stop_rx = self.stop_rx.clone();
let keep_vm = self.keep_vm;
let _ = tokio::spawn(async move {
let mut version_check_threshold_count = 0;
loop {
tokio::time::sleep(std::time::Duration::from_secs(HEALTH_CHECK_TIMER_INTERVAL))
.await;
let mut stop_rx = stop_rx.lock().await;
match stop_rx.try_recv() {
Ok(_) => {
info!(sl!(), "revive stop {} monitor signal", id);
break;
}
Err(mpsc::error::TryRecvError::Empty) => {
// check agent
match agent
.check(agent::CheckRequest::new(""))
.await
.context("check health")
{
Ok(_) => {
debug!(sl!(), "check {} agent health successfully", id);
version_check_threshold_count += 1;
if version_check_threshold_count >= VERSION_CHECK_THRESHOLD {
// need to check version
version_check_threshold_count = 0;
if let Ok(v) = agent
.version(agent::CheckRequest::new(""))
.await
.context("check version")
{
info!(sl!(), "agent {}", v.agent_version)
}
}
continue;
}
Err(e) => {
error!(sl!(), "failed to do {} agent health check: {}", id, e);
if let Err(mpsc::error::TryRecvError::Empty) = stop_rx.try_recv() {
error!(sl!(), "failed to receive stop monitor signal");
if !keep_vm {
::std::process::exit(1);
}
} else {
info!(sl!(), "wait to exit exit {}", id);
break;
}
}
}
}
Err(mpsc::error::TryRecvError::Disconnected) => {
warn!(sl!(), "{} monitor channel has broken", id);
break;
}
}
}
});
}
pub async fn stop(&self) {
if !self.keep_alive {
return;
}
info!(sl!(), "stop runtime keep alive");
self.stop_tx
.send(())
.await
.map_err(|e| {
warn!(sl!(), "failed send monitor channel. {:?}", e);
})
.ok();
}
}
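
The monitor loop never blocks on the stop channel: it sleeps for one tick, then calls try_recv(), so HealthCheck::stop() takes effect on the next tick. A runnable sketch of that polling idiom, with the 30s interval shortened for illustration:

use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let (stop_tx, mut stop_rx) = mpsc::channel::<()>(1);
    let monitor = tokio::spawn(async move {
        loop {
            sleep(Duration::from_millis(10)).await; // stand-in for the 30s tick
            match stop_rx.try_recv() {
                Ok(_) => break, // stop() was called
                Err(mpsc::error::TryRecvError::Empty) => {
                    // stand-in for agent.check()
                }
                Err(mpsc::error::TryRecvError::Disconnected) => break,
            }
        }
    });
    stop_tx.send(()).await.unwrap();
    monitor.await.unwrap();
}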


@@ -3,12 +3,25 @@
//
// SPDX-License-Identifier: Apache-2.0
//
#[macro_use]
extern crate slog;
logging::logger_with_subsystem!(sl, "virt-container");
mod container_manager;
pub mod health_check;
pub mod sandbox;
use std::sync::Arc;
use anyhow::Result;
use agent::kata::KataAgent;
use anyhow::{Context, Result};
use async_trait::async_trait;
use common::{message::Message, RuntimeHandler, RuntimeInstance};
use kata_types::config::{hypervisor::register_hypervisor_plugin, DragonballConfig};
use hypervisor::Hypervisor;
use kata_types::config::{hypervisor::register_hypervisor_plugin, DragonballConfig, TomlConfig};
use resource::ResourceManager;
use tokio::sync::mpsc::Sender;
unsafe impl Send for VirtContainer {}
@@ -34,10 +47,55 @@ impl RuntimeHandler for VirtContainer {
async fn new_instance(
&self,
_sid: &str,
_msg_sender: Sender<Message>,
sid: &str,
msg_sender: Sender<Message>,
) -> Result<RuntimeInstance> {
todo!()
let (toml_config, _) = TomlConfig::load_from_file("").context("load config")?;
// TODO: new sandbox and container manager
// TODO: get from hypervisor
let hypervisor = new_hypervisor(&toml_config)
.await
.context("new hypervisor")?;
// get uds from hypervisor and get config from toml_config
let agent = Arc::new(KataAgent::new(kata_types::config::Agent {
debug: true,
enable_tracing: false,
server_port: 1024,
log_port: 1025,
dial_timeout_ms: 10,
reconnect_timeout_ms: 3_000,
request_timeout_ms: 30_000,
health_check_request_timeout_ms: 90_000,
kernel_modules: Default::default(),
container_pipe_size: 0,
debug_console_enabled: false,
}));
let resource_manager = Arc::new(ResourceManager::new(
sid,
agent.clone(),
hypervisor.clone(),
&toml_config,
)?);
let pid = std::process::id();
let sandbox = sandbox::VirtSandbox::new(
sid,
msg_sender,
agent.clone(),
hypervisor,
resource_manager.clone(),
)
.await
.context("new virt sandbox")?;
let container_manager =
container_manager::VirtContainerManager::new(sid, pid, agent, resource_manager);
Ok(RuntimeInstance {
sandbox: Arc::new(sandbox),
container_manager: Arc::new(container_manager),
})
}
fn cleanup(&self, _id: &str) -> Result<()> {
@@ -45,3 +103,8 @@ impl RuntimeHandler for VirtContainer {
Ok(())
}
}
async fn new_hypervisor(_toml_config: &TomlConfig) -> Result<Arc<dyn Hypervisor>> {
// TODO: implement a real hypervisor
todo!()
}


@@ -0,0 +1,229 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
use std::sync::Arc;
use agent::{self, Agent};
use anyhow::{Context, Result};
use async_trait::async_trait;
use common::{
message::{Action, Message},
Sandbox,
};
use containerd_shim_protos::events::task::TaskOOM;
use hypervisor::Hypervisor;
use kata_types::config::TomlConfig;
use resource::{ResourceConfig, ResourceManager};
use tokio::sync::{mpsc::Sender, Mutex, RwLock};
use crate::health_check::HealthCheck;
#[derive(Clone, Copy, PartialEq, Debug)]
pub enum SandboxState {
Init,
Running,
Stopped,
}
struct SandboxInner {
state: SandboxState,
}
impl SandboxInner {
pub fn new() -> Self {
Self {
state: SandboxState::Init,
}
}
}
unsafe impl Send for VirtSandbox {}
unsafe impl Sync for VirtSandbox {}
#[derive(Clone)]
pub struct VirtSandbox {
sid: String,
msg_sender: Arc<Mutex<Sender<Message>>>,
inner: Arc<RwLock<SandboxInner>>,
resource_manager: Arc<ResourceManager>,
agent: Arc<dyn Agent>,
hypervisor: Arc<dyn Hypervisor>,
monitor: Arc<HealthCheck>,
}
impl VirtSandbox {
pub async fn new(
sid: &str,
msg_sender: Sender<Message>,
agent: Arc<dyn Agent>,
hypervisor: Arc<dyn Hypervisor>,
resource_manager: Arc<ResourceManager>,
) -> Result<Self> {
Ok(Self {
sid: sid.to_string(),
msg_sender: Arc::new(Mutex::new(msg_sender)),
inner: Arc::new(RwLock::new(SandboxInner::new())),
agent,
hypervisor,
resource_manager,
monitor: Arc::new(HealthCheck::new(true, true)),
})
}
async fn prepare_for_start_sandbox(
&self,
netns: Option<String>,
_config: &TomlConfig,
) -> Result<Vec<ResourceConfig>> {
let mut resource_configs = vec![];
if let Some(_netns_path) = netns {
// TODO: support network
}
let hypervisor_config = self.hypervisor.hypervisor_config().await;
let virtio_fs_config = ResourceConfig::ShareFs(hypervisor_config.shared_fs);
resource_configs.push(virtio_fs_config);
Ok(resource_configs)
}
}
#[async_trait]
impl Sandbox for VirtSandbox {
async fn start(&self, netns: Option<String>, config: &TomlConfig) -> Result<()> {
let id = &self.sid;
// if the sandbox is already running, return;
// otherwise try to start the sandbox
let mut inner = self.inner.write().await;
if inner.state == SandboxState::Running {
warn!(sl!(), "sandbox is running, no need to start");
return Ok(());
}
self.hypervisor
.prepare_vm(id, netns.clone())
.await
.context("prepare vm")?;
// generate devices and set them up before starting the vm;
// this must be done after hypervisor.prepare_vm
let resources = self.prepare_for_start_sandbox(netns, config).await?;
self.resource_manager
.prepare_before_start_vm(resources)
.await
.context("set up device before start vm")?;
// start vm
self.hypervisor.start_vm(10_000).await.context("start vm")?;
info!(sl!(), "start vm");
// connect agent
// set agent socket
let address = self
.hypervisor
.get_agent_socket()
.await
.context("get agent socket")?;
self.agent.start(&address).await.context("connect")?;
self.resource_manager
.setup_after_start_vm()
.await
.context("setup device after start vm")?;
// create sandbox in vm
let req = agent::CreateSandboxRequest {
hostname: "".to_string(),
dns: vec![],
storages: self
.resource_manager
.get_storage_for_sandbox()
.await
.context("get storages for sandbox")?,
sandbox_pidns: false,
sandbox_id: id.to_string(),
guest_hook_path: "".to_string(),
kernel_modules: vec![],
};
self.agent
.create_sandbox(req)
.await
.context("create sandbox")?;
inner.state = SandboxState::Running;
let agent = self.agent.clone();
let sender = self.msg_sender.clone();
info!(sl!(), "oom watcher start");
let _ = tokio::spawn(async move {
loop {
match agent
.get_oom_event(agent::Empty::new())
.await
.context("get oom event")
{
Ok(resp) => {
let cid = &resp.container_id;
warn!(sl!(), "send oom event for container {}", &cid);
let event = TaskOOM {
container_id: cid.to_string(),
..Default::default()
};
let msg = Message::new(Action::Event(Arc::new(event)));
let lock_sender = sender.lock().await;
if let Err(err) = lock_sender.send(msg).await.context("send event") {
error!(
sl!(),
"failed to send oom event for {} error {:?}", cid, err
);
}
}
Err(err) => {
warn!(sl!(), "failed to get oom event error {:?}", err);
break;
}
}
}
});
self.monitor.start(id, self.agent.clone());
Ok(())
}
async fn stop(&self) -> Result<()> {
info!(sl!(), "begin stop sandbox");
// TODO: stop sandbox
Ok(())
}
async fn shutdown(&self) -> Result<()> {
info!(sl!(), "shutdown");
self.resource_manager
.delete_cgroups()
.await
.context("delete cgroups")?;
info!(sl!(), "stop monitor");
self.monitor.stop().await;
info!(sl!(), "stop agent");
self.agent.stop().await;
// stop server
info!(sl!(), "send shutdown message");
let msg = Message::new(Action::Shutdown);
let sender = self.msg_sender.clone();
let sender = sender.lock().await;
sender.send(msg).await.context("send shutdown msg")?;
Ok(())
}
async fn cleanup(&self, _id: &str) -> Result<()> {
// TODO: cleanup
Ok(())
}
}
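
The oom watcher spawned in start() is a simple pump: it blocks on the agent's oom-event call and forwards each event to the shim's message channel, breaking once the agent call fails. A sketch of that loop with plain mpsc channels standing in for the agent stream and the Message sender (the real types are agent::Empty, TaskOOM, and Message; these stand-ins are for illustration only):

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (agent_tx, mut agent_rx) = mpsc::channel::<String>(8); // stand-in agent events
    let (msg_tx, mut msg_rx) = mpsc::channel::<String>(8);     // stand-in msg_sender
    tokio::spawn(async move {
        // the loop ends when the agent side closes, mirroring the break on error
        while let Some(container_id) = agent_rx.recv().await {
            let _ = msg_tx.send(format!("oom event for {}", container_id)).await;
        }
    });
    agent_tx.send("c1".to_string()).await.unwrap();
    drop(agent_tx);
    assert_eq!(msg_rx.recv().await.unwrap(), "oom event for c1");
}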