runtime-rs: runtime part implement
Fixes: #3785

Signed-off-by: Tim Zhang <tim@hyper.sh>
Signed-off-by: Zhongtao Hu <zhongtaohu.tim@linux.alibaba.com>
Signed-off-by: Quanwei Zhou <quanweiZhou@linux.alibaba.com>
parent 10343b1f3d
commit 4be7185aa4
37 src/runtime-rs/Cargo.lock (generated)
@@ -77,6 +77,12 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"

[[package]]
name = "awaitgroup"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc17ab023b4091c10ff099f9deebaeeb59b5189df07e554c4fef042b70745d68"

[[package]]
name = "backtrace"
version = "0.3.64"
@@ -647,7 +653,7 @@ dependencies = [
 "slog",
 "slog-scope",
 "thiserror",
 "toml",
 "toml 0.5.8",
]

[[package]]
@@ -1697,6 +1703,15 @@ dependencies = [
 "vsock",
]

[[package]]
name = "toml"
version = "0.4.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "758664fc71a3a69038656bee8b6be6477d2a6c315a6b81f7081f591bffa4111f"
dependencies = [
 "serde",
]

[[package]]
name = "toml"
version = "0.5.8"
@@ -1848,11 +1863,31 @@ checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
name = "virt_container"
version = "0.1.0"
dependencies = [
 "agent",
 "anyhow",
 "async-trait",
 "awaitgroup",
 "common",
 "containerd-shim-protos",
 "futures 0.3.21",
 "hypervisor",
 "kata-sys-util",
 "kata-types",
 "lazy_static",
 "libc",
 "logging",
 "nix 0.16.1",
 "oci",
 "protobuf",
 "resource",
 "serde",
 "serde_derive",
 "serde_json",
 "slog",
 "slog-scope",
 "tokio",
 "toml 0.4.10",
 "url",
]

[[package]]
@@ -7,7 +7,27 @@ edition = "2018"
[dependencies]
anyhow = "^1.0"
async-trait = "0.1.48"
awaitgroup = "0.6.0"
containerd-shim-protos = { version = "0.2.0", features = ["async"] }
futures = "0.3.19"
lazy_static = "1.4.0"
libc = ">=0.2.39"
nix = "0.16.0"
protobuf = "2.23.0"
serde = { version = "1.0.100", features = ["derive"] }
serde_derive = "1.0.27"
serde_json = "1.0.39"
slog = "2.5.2"
slog-scope = "4.4.0"
tokio = { version = "1.8.0" }
toml = "0.4.2"
url = "2.1.1"

agent = { path = "../../agent" }
common = { path = "../common" }
hypervisor = { path = "../../hypervisor" }
kata-sys-util = { path = "../../../../libs/kata-sys-util" }
kata-types = { path = "../../../../libs/kata-types" }
logging = { path = "../../../../libs/logging" }
oci = { path = "../../../../libs/oci" }
resource = { path = "../../resource" }
@@ -0,0 +1,403 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//

use std::sync::Arc;

use agent::Agent;
use anyhow::{anyhow, Context, Result};
use common::{
    error::Error,
    types::{
        ContainerConfig, ContainerID, ContainerProcess, ProcessStateInfo, ProcessStatus,
        ProcessType,
    },
};
use oci::{LinuxResources, Process as OCIProcess};
use resource::ResourceManager;
use tokio::sync::RwLock;

use super::{
    process::{Process, ProcessWatcher},
    ContainerInner,
};
use crate::container_manager::logger_with_process;

pub struct Exec {
    pub(crate) process: Process,
    pub(crate) oci_process: OCIProcess,
}

pub struct Container {
    pid: u32,
    pub container_id: ContainerID,
    config: ContainerConfig,
    inner: Arc<RwLock<ContainerInner>>,
    agent: Arc<dyn Agent>,
    resource_manager: Arc<ResourceManager>,
    logger: slog::Logger,
}

impl Container {
    pub fn new(
        pid: u32,
        config: ContainerConfig,
        agent: Arc<dyn Agent>,
        resource_manager: Arc<ResourceManager>,
    ) -> Result<Self> {
        let container_id = ContainerID::new(&config.container_id).context("new container id")?;
        let logger = sl!().new(o!("container_id" => config.container_id.clone()));
        let process = ContainerProcess::new(&config.container_id, "")?;
        let init_process = Process::new(
            &process,
            pid,
            &config.bundle,
            config.stdin.clone(),
            config.stdout.clone(),
            config.stderr.clone(),
            config.terminal,
        );

        Ok(Self {
            pid,
            container_id,
            config,
            inner: Arc::new(RwLock::new(ContainerInner::new(
                agent.clone(),
                init_process,
                logger.clone(),
            ))),
            agent,
            resource_manager,
            logger,
        })
    }

    pub async fn create(&self, mut spec: oci::Spec) -> Result<()> {
        // process the oci spec
        let mut inner = self.inner.write().await;
        let config = &self.config;
        amend_spec(&mut spec).context("amend spec")?;

        // handle rootfs
        let rootfs = self
            .resource_manager
            .handler_rootfs(&config.container_id, &config.bundle, &config.rootfs_mounts)
            .await
            .context("handler rootfs")?;

        // update rootfs
        match spec.root.as_mut() {
            Some(spec) => {
                spec.path = rootfs
                    .get_guest_rootfs_path()
                    .await
                    .context("get guest rootfs path")?
            }
            None => return Err(anyhow!("spec missing root field")),
        };
        inner.rootfs.push(rootfs);

        // handle volumes
        let volumes = self
            .resource_manager
            .handler_volumes(&config.container_id, &spec.mounts)
            .await
            .context("handler volumes")?;
        let mut oci_mounts = vec![];
        let mut storages = vec![];
        for v in volumes {
            let mut volume_mounts = v.get_volume_mount().context("get volume mount")?;
            if !volume_mounts.is_empty() {
                oci_mounts.append(&mut volume_mounts);
            }

            let mut s = v.get_storage().context("get storage")?;
            if !s.is_empty() {
                storages.append(&mut s);
            }
            inner.volumes.push(v);
        }
        spec.mounts = oci_mounts;

        // TODO: handle devices

        // update cgroups
        self.resource_manager
            .update_cgroups(
                &config.container_id,
                spec.linux
                    .as_ref()
                    .map(|linux| linux.resources.as_ref())
                    .flatten(),
            )
            .await?;

        // create the container
        let r = agent::CreateContainerRequest {
            process_id: agent::ContainerProcessID::new(&config.container_id, ""),
            string_user: None,
            devices: vec![],
            storages,
            oci: Some(spec),
            guest_hooks: None,
            sandbox_pidns: false,
            rootfs_mounts: vec![],
        };

        self.agent
            .create_container(r)
            .await
            .context("agent create container")?;
        self.resource_manager.dump().await;
        Ok(())
    }

    pub async fn start(&self, process: &ContainerProcess) -> Result<()> {
        let mut inner = self.inner.write().await;
        match process.process_type {
            ProcessType::Container => {
                if let Err(err) = inner.start_container(&process.container_id).await {
                    let _ = inner.stop_process(process, true).await;
                    return Err(err);
                }

                let container_io = inner.new_container_io(process).await?;
                inner
                    .init_process
                    .start_io_and_wait(self.agent.clone(), container_io)
                    .await?;
            }
            ProcessType::Exec => {
                if let Err(e) = inner.start_exec_process(process).await {
                    let _ = inner.stop_process(process, true).await;
                    return Err(e).context("enter process");
                }

                let container_io = inner.new_container_io(process).await.context("io stream")?;

                {
                    let exec = inner
                        .exec_processes
                        .get(&process.exec_id)
                        .ok_or_else(|| Error::ProcessNotFound(process.clone()))?;
                    if exec.process.height != 0 && exec.process.width != 0 {
                        inner
                            .win_resize_process(process, exec.process.height, exec.process.width)
                            .await
                            .context("win resize")?;
                    }
                }

                // start io and wait
                {
                    let exec = inner
                        .exec_processes
                        .get_mut(&process.exec_id)
                        .ok_or_else(|| Error::ProcessNotFound(process.clone()))?;

                    exec.process
                        .start_io_and_wait(self.agent.clone(), container_io)
                        .await
                        .context("start io and wait")?;
                }
            }
        }

        Ok(())
    }

    pub async fn delete_exec_process(&self, container_process: &ContainerProcess) -> Result<()> {
        let mut inner = self.inner.write().await;
        inner
            .delete_exec_process(&container_process.exec_id)
            .await
            .context("delete process")
    }

    pub async fn state_process(
        &self,
        container_process: &ContainerProcess,
    ) -> Result<ProcessStateInfo> {
        let inner = self.inner.read().await;
        match container_process.process_type {
            ProcessType::Container => inner.init_process.state().await,
            ProcessType::Exec => {
                let exec = inner
                    .exec_processes
                    .get(&container_process.exec_id)
                    .ok_or_else(|| Error::ProcessNotFound(container_process.clone()))?;
                exec.process.state().await
            }
        }
    }

    pub async fn wait_process(
        &self,
        container_process: &ContainerProcess,
    ) -> Result<ProcessWatcher> {
        let logger = logger_with_process(container_process);
        info!(logger, "start wait process");

        let inner = self.inner.read().await;
        inner
            .fetch_exit_watcher(container_process)
            .context("fetch exit watcher")
    }

    pub async fn kill_process(
        &self,
        container_process: &ContainerProcess,
        signal: u32,
        all: bool,
    ) -> Result<()> {
        let inner = self.inner.read().await;
        inner.signal_process(container_process, signal, all).await
    }

    pub async fn exec_process(
        &self,
        container_process: &ContainerProcess,
        stdin: Option<String>,
        stdout: Option<String>,
        stderr: Option<String>,
        terminal: bool,
        oci_process: OCIProcess,
    ) -> Result<()> {
        let process = Process::new(
            container_process,
            self.pid,
            &self.config.bundle,
            stdin,
            stdout,
            stderr,
            terminal,
        );
        let exec = Exec {
            process,
            oci_process,
        };
        let mut inner = self.inner.write().await;
        inner.add_exec_process(&container_process.exec_id, exec);
        Ok(())
    }

    pub async fn close_io(&self, container_process: &ContainerProcess) -> Result<()> {
        let mut inner = self.inner.write().await;
        inner.close_io(container_process).await
    }

    pub async fn stop_process(&self, container_process: &ContainerProcess) -> Result<()> {
        let mut inner = self.inner.write().await;
        inner
            .stop_process(container_process, true)
            .await
            .context("stop process")
    }

    pub async fn pause(&self) -> Result<()> {
        let inner = self.inner.read().await;
        if inner.init_process.status == ProcessStatus::Paused {
            warn!(self.logger, "container is paused, no need to pause");
            return Ok(());
        }
        self.agent
            .pause_container(self.container_id.clone().into())
            .await
            .context("agent pause container")?;
        Ok(())
    }

    pub async fn resume(&self) -> Result<()> {
        let inner = self.inner.read().await;
        if inner.init_process.status == ProcessStatus::Running {
            warn!(self.logger, "container is running, no need to resume");
            return Ok(());
        }
        self.agent
            .resume_container(self.container_id.clone().into())
            .await
            .context("agent resume container")?;
        Ok(())
    }

    pub async fn resize_pty(
        &self,
        process: &ContainerProcess,
        width: u32,
        height: u32,
    ) -> Result<()> {
        let logger = logger_with_process(process);
        let inner = self.inner.read().await;
        if inner.init_process.status != ProcessStatus::Running {
            warn!(logger, "container is not running, no need to resize pty");
            return Ok(());
        }
        self.agent
            .tty_win_resize(agent::TtyWinResizeRequest {
                process_id: process.clone().into(),
                row: height,
                column: width,
            })
            .await
            .context("resize pty")?;
        Ok(())
    }

    pub async fn stats(&self) -> Result<Option<agent::StatsContainerResponse>> {
        let stats_resp = self
            .agent
            .stats_container(self.container_id.clone().into())
            .await
            .context("agent stats container")?;
        Ok(Some(stats_resp))
    }

    pub async fn update(&self, resources: &LinuxResources) -> Result<()> {
        self.resource_manager
            .update_cgroups(&self.config.container_id, Some(resources))
            .await?;

        let req = agent::UpdateContainerRequest {
            container_id: self.container_id.container_id.clone(),
            resources: resources.clone(),
            mounts: Vec::new(),
        };
        self.agent
            .update_container(req)
            .await
            .context("agent update container")?;
        Ok(())
    }
}

fn amend_spec(spec: &mut oci::Spec) -> Result<()> {
    // hooks should be run on the host, not in the guest
    spec.hooks = None;

    if let Some(linux) = spec.linux.as_mut() {
        linux.seccomp = None;

        if let Some(resource) = linux.resources.as_mut() {
            resource.devices = Vec::new();
            resource.pids = None;
            resource.block_io = None;
            resource.hugepage_limits = Vec::new();
            resource.network = None;
        }

        let mut ns: Vec<oci::LinuxNamespace> = Vec::new();
        for n in linux.namespaces.iter() {
            match n.r#type.as_str() {
                oci::PIDNAMESPACE | oci::NETWORKNAMESPACE => continue,
                _ => ns.push(n.clone()),
            }
        }

        linux.namespaces = ns;
    }

    Ok(())
}
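For illustration, here is a minimal test-style sketch of what amend_spec strips before the spec is handed to the guest agent. It assumes the oci crate's Spec and Linux types implement Default, which this diff does not show.

#[cfg(test)]
mod tests {
    use super::amend_spec;

    #[test]
    fn amend_spec_strips_host_only_fields() {
        // Hypothetical construction; assumes Default impls on the oci types.
        let mut spec = oci::Spec {
            linux: Some(oci::Linux::default()),
            ..Default::default()
        };

        amend_spec(&mut spec).unwrap();

        // Hooks are run by the shim on the host, so they are cleared here.
        assert!(spec.hooks.is_none());
        // Seccomp (and the pid/network namespaces) are handled on the guest side.
        assert!(spec.linux.as_ref().unwrap().seccomp.is_none());
    }
}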
@@ -0,0 +1,261 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//

use std::{collections::HashMap, sync::Arc};

use agent::Agent;
use anyhow::{anyhow, Context, Result};
use common::{
    error::Error,
    types::{ContainerID, ContainerProcess, ProcessExitStatus, ProcessStatus, ProcessType},
};
use nix::sys::signal::Signal;
use resource::{rootfs::Rootfs, volume::Volume};
use tokio::sync::RwLock;

use crate::container_manager::logger_with_process;

use super::{
    io::ContainerIo,
    process::{Process, ProcessWatcher},
    Exec,
};

pub struct ContainerInner {
    agent: Arc<dyn Agent>,
    logger: slog::Logger,
    pub(crate) init_process: Process,
    pub(crate) exec_processes: HashMap<String, Exec>,
    pub(crate) rootfs: Vec<Arc<dyn Rootfs>>,
    pub(crate) volumes: Vec<Arc<dyn Volume>>,
}

impl ContainerInner {
    pub(crate) fn new(agent: Arc<dyn Agent>, init_process: Process, logger: slog::Logger) -> Self {
        Self {
            agent,
            logger,
            init_process,
            exec_processes: HashMap::new(),
            rootfs: vec![],
            volumes: vec![],
        }
    }

    fn container_id(&self) -> &str {
        self.init_process.process.container_id()
    }

    pub(crate) fn check_state(&self, states: Vec<ProcessStatus>) -> Result<()> {
        let state = self.init_process.status;
        if states.contains(&state) {
            return Ok(());
        }

        Err(anyhow!(
            "failed to check state: current state {:?} is not in expected states {:?}",
            state,
            states
        ))
    }

    pub(crate) fn set_state(&mut self, state: ProcessStatus) {
        self.init_process.status = state;
    }

    pub(crate) async fn start_exec_process(&mut self, process: &ContainerProcess) -> Result<()> {
        let exec = self
            .exec_processes
            .get_mut(&process.exec_id)
            .ok_or_else(|| Error::ProcessNotFound(process.clone()))?;

        self.agent
            .exec_process(agent::ExecProcessRequest {
                process_id: process.clone().into(),
                string_user: None,
                process: Some(exec.oci_process.clone()),
            })
            .await
            .map(|_| {
                exec.process.status = ProcessStatus::Running;
            })
    }

    pub(crate) async fn win_resize_process(
        &self,
        process: &ContainerProcess,
        height: u32,
        width: u32,
    ) -> Result<()> {
        self.check_state(vec![ProcessStatus::Created, ProcessStatus::Running])
            .context("check state")?;

        self.agent
            .tty_win_resize(agent::TtyWinResizeRequest {
                process_id: process.clone().into(),
                row: height,
                column: width,
            })
            .await?;
        Ok(())
    }

    pub fn fetch_exit_watcher(&self, process: &ContainerProcess) -> Result<ProcessWatcher> {
        match process.process_type {
            ProcessType::Container => self.init_process.fetch_exit_watcher(),
            ProcessType::Exec => {
                let exec = self
                    .exec_processes
                    .get(&process.exec_id)
                    .ok_or_else(|| Error::ProcessNotFound(process.clone()))?;
                exec.process.fetch_exit_watcher()
            }
        }
    }

    pub(crate) async fn start_container(&mut self, cid: &ContainerID) -> Result<()> {
        self.check_state(vec![ProcessStatus::Created, ProcessStatus::Stopped])
            .context("check state")?;

        self.agent
            .start_container(agent::ContainerID {
                container_id: cid.container_id.clone(),
            })
            .await
            .context("start container")?;

        self.set_state(ProcessStatus::Running);

        Ok(())
    }

    async fn get_exit_status(&self) -> Arc<RwLock<ProcessExitStatus>> {
        self.init_process.exit_status.clone()
    }

    pub(crate) fn add_exec_process(&mut self, id: &str, exec: Exec) -> Option<Exec> {
        self.exec_processes.insert(id.to_string(), exec)
    }

    pub(crate) async fn delete_exec_process(&mut self, eid: &str) -> Result<()> {
        match self.exec_processes.remove(eid) {
            Some(_) => {
                debug!(self.logger, "delete process eid {}", eid);
                Ok(())
            }
            None => Err(anyhow!(
                "failed to find cid {} eid {}",
                self.container_id(),
                eid
            )),
        }
    }

    async fn cleanup_container(&mut self, cid: &str, force: bool) -> Result<()> {
        // wait until the container process has terminated and the
        // status write lock has been released
        info!(self.logger, "wait on container terminated");
        let exit_status = self.get_exit_status().await;
        let _locked_exit_status = exit_status.read().await;
        info!(self.logger, "container terminated");
        let timeout: u32 = 10;
        self.agent
            .remove_container(agent::RemoveContainerRequest::new(cid, timeout))
            .await
            .or_else(|e| {
                if force {
                    warn!(
                        self.logger,
                        "stop container: agent remove container failed: {}", e
                    );
                    Ok(agent::Empty::new())
                } else {
                    Err(e)
                }
            })?;

        // close the exit channel to wake up the wait service and
        // notify watchers that are waiting for the process exit
        self.init_process.stop();
        Ok(())
    }

    pub(crate) async fn stop_process(
        &mut self,
        process: &ContainerProcess,
        force: bool,
    ) -> Result<()> {
        let logger = logger_with_process(process);
        info!(logger, "begin to stop process");
        // do not stop again when already stopped, as that could
        // clean up the resources more than once
        self.check_state(vec![ProcessStatus::Running])
            .context("check state")?;

        // when force mode is used to stop the container, the stop always
        // succeeds: send the kill signal to the container and ignore any
        // error, since the process may already have been killed and exited
        self.signal_process(process, Signal::SIGKILL as u32, false)
            .await
            .map_err(|e| {
                warn!(logger, "failed to signal kill. {:?}", e);
            })
            .ok();

        match process.process_type {
            ProcessType::Container => self
                .cleanup_container(&process.container_id.container_id, force)
                .await
                .context("stop container")?,
            ProcessType::Exec => {
                let exec = self
                    .exec_processes
                    .get_mut(&process.exec_id)
                    .ok_or_else(|| anyhow!("failed to find exec"))?;
                exec.process.stop();
            }
        }

        Ok(())
    }

    pub(crate) async fn signal_process(
        &self,
        process: &ContainerProcess,
        signal: u32,
        all: bool,
    ) -> Result<()> {
        let mut process_id: agent::ContainerProcessID = process.clone().into();
        if all {
            // signal the init process by clearing the exec id
            process_id.exec_id.clear();
        };

        self.agent
            .signal_process(agent::SignalProcessRequest { process_id, signal })
            .await?;
        Ok(())
    }

    pub async fn new_container_io(&self, process: &ContainerProcess) -> Result<ContainerIo> {
        Ok(ContainerIo::new(self.agent.clone(), process.clone()))
    }

    pub async fn close_io(&mut self, process: &ContainerProcess) -> Result<()> {
        match process.process_type {
            ProcessType::Container => self.init_process.close_io().await,
            ProcessType::Exec => {
                let exec = self
                    .exec_processes
                    .get_mut(&process.exec_id)
                    .ok_or_else(|| Error::ProcessNotFound(process.clone()))?;
                exec.process.close_io().await;
            }
        };

        Ok(())
    }
}
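The init process status doubles as the container's state machine, and check_state guards every transition (start_container above, for example, requires Created or Stopped before moving to Running). Below is a sketch of the guard pattern with a hypothetical pause transition, using only the methods defined above.

// Sketch of a guarded state transition (hypothetical caller, same crate).
async fn pause_sketch(inner: &mut ContainerInner) -> anyhow::Result<()> {
    use anyhow::Context;

    // only a Running container may be paused
    inner
        .check_state(vec![ProcessStatus::Running])
        .context("check state")?;

    // ... the agent pause RPC would go here ...

    inner.set_state(ProcessStatus::Paused);
    Ok(())
}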
@@ -0,0 +1,171 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//

use std::{
    future::Future,
    io,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
};

use agent::Agent;
use anyhow::Result;
use common::types::ContainerProcess;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};

struct ContainerIoInfo {
    pub agent: Arc<dyn Agent>,
    pub process: ContainerProcess,
}

pub struct ContainerIo {
    pub stdin: Box<dyn AsyncWrite + Send + Unpin>,
    pub stdout: Box<dyn AsyncRead + Send + Unpin>,
    pub stderr: Box<dyn AsyncRead + Send + Unpin>,
}

impl ContainerIo {
    pub fn new(agent: Arc<dyn Agent>, process: ContainerProcess) -> Self {
        let info = Arc::new(ContainerIoInfo { agent, process });

        Self {
            stdin: Box::new(ContainerIoWrite::new(info.clone())),
            stdout: Box::new(ContainerIoRead::new(info.clone(), true)),
            stderr: Box::new(ContainerIoRead::new(info, false)),
        }
    }
}

struct ContainerIoWrite<'inner> {
    pub info: Arc<ContainerIoInfo>,
    write_future:
        Option<Pin<Box<dyn Future<Output = Result<agent::WriteStreamResponse>> + Send + 'inner>>>,
}

impl<'inner> ContainerIoWrite<'inner> {
    pub fn new(info: Arc<ContainerIoInfo>) -> Self {
        Self {
            info,
            write_future: Default::default(),
        }
    }

    fn poll_write_inner(
        &'inner mut self,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        let mut write_future = self.write_future.take();
        if write_future.is_none() {
            let req = agent::WriteStreamRequest {
                process_id: self.info.process.clone().into(),
                data: buf.to_vec(),
            };
            write_future = Some(Box::pin(self.info.agent.write_stdin(req)));
        }

        let mut write_future = write_future.unwrap();
        match write_future.as_mut().poll(cx) {
            Poll::Ready(v) => match v {
                Ok(resp) => Poll::Ready(Ok(resp.length as usize)),
                Err(err) => Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::Other, err))),
            },
            Poll::Pending => {
                self.write_future = Some(write_future);
                Poll::Pending
            }
        }
    }
}

impl<'inner> AsyncWrite for ContainerIoWrite<'inner> {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        let me = unsafe {
            std::mem::transmute::<&mut ContainerIoWrite<'_>, &mut ContainerIoWrite<'inner>>(
                &mut *self,
            )
        };
        me.poll_write_inner(cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }

    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }
}

type ResultBuffer = Result<agent::ReadStreamResponse>;

struct ContainerIoRead<'inner> {
    pub info: Arc<ContainerIoInfo>,
    is_stdout: bool,
    read_future: Option<Pin<Box<dyn Future<Output = ResultBuffer> + Send + 'inner>>>,
}

impl<'inner> ContainerIoRead<'inner> {
    pub fn new(info: Arc<ContainerIoInfo>, is_stdout: bool) -> Self {
        Self {
            info,
            is_stdout,
            read_future: Default::default(),
        }
    }

    fn poll_read_inner(
        &'inner mut self,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        let mut read_future = self.read_future.take();
        if read_future.is_none() {
            let req = agent::ReadStreamRequest {
                process_id: self.info.process.clone().into(),
                len: buf.remaining() as u32,
            };
            read_future = if self.is_stdout {
                Some(Box::pin(self.info.agent.read_stdout(req)))
            } else {
                Some(Box::pin(self.info.agent.read_stderr(req)))
            };
        }

        let mut read_future = read_future.unwrap();
        match read_future.as_mut().poll(cx) {
            Poll::Ready(v) => match v {
                Ok(resp) => {
                    buf.put_slice(&resp.data);
                    Poll::Ready(Ok(()))
                }
                Err(err) => Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::Other, err))),
            },
            Poll::Pending => {
                self.read_future = Some(read_future);
                Poll::Pending
            }
        }
    }
}

impl<'inner> AsyncRead for ContainerIoRead<'inner> {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        let me = unsafe {
            std::mem::transmute::<&mut ContainerIoRead<'_>, &mut ContainerIoRead<'inner>>(
                &mut *self,
            )
        };
        me.poll_read_inner(cx, buf)
    }
}
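ContainerIo adapts the agent's request/response stream RPCs into AsyncRead/AsyncWrite by stashing the in-flight boxed future and re-polling it on each call, so callers can treat guest stdio as ordinary tokio streams; this is what lets run_io_copy in process.rs below use a plain tokio::io::copy. A minimal consumer sketch (the agent handle and process are assumed to exist already):

// Sketch: pump a container's stdout into any AsyncWrite sink.
async fn pump_stdout(
    agent: std::sync::Arc<dyn agent::Agent>,
    process: common::types::ContainerProcess,
    mut sink: impl tokio::io::AsyncWrite + Unpin,
) -> anyhow::Result<u64> {
    let mut io = ContainerIo::new(agent, process);
    // each poll of io.stdout drives one read_stdout RPC to the agent
    let copied = tokio::io::copy(&mut io.stdout, &mut sink).await?;
    Ok(copied)
}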
@@ -0,0 +1,10 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//

mod container_io;
pub use container_io::ContainerIo;
mod shim_io;
pub use shim_io::ShimIo;
@@ -0,0 +1,135 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//

use std::{
    io,
    os::unix::{io::FromRawFd, net::UnixStream as StdUnixStream},
    pin::Pin,
    task::Context as TaskContext,
    task::Poll,
};

use anyhow::{anyhow, Context, Result};
use nix::{
    fcntl::{self, OFlag},
    sys::stat::Mode,
};
use tokio::{
    fs::File,
    io::{AsyncRead, AsyncWrite},
    net::UnixStream as AsyncUnixStream,
};
use url::Url;

fn open_fifo(path: &str) -> Result<AsyncUnixStream> {
    let fd = fcntl::open(path, OFlag::O_RDWR, Mode::from_bits(0).unwrap())?;

    let std_stream = unsafe { StdUnixStream::from_raw_fd(fd) };
    std_stream
        .set_nonblocking(true)
        .context("set nonblocking")?;

    AsyncUnixStream::from_std(std_stream).map_err(|e| anyhow!(e))
}

pub struct ShimIo {
    pub stdin: Option<Box<dyn AsyncRead + Send + Unpin>>,
    pub stdout: Option<Box<dyn AsyncWrite + Send + Unpin>>,
    pub stderr: Option<Box<dyn AsyncWrite + Send + Unpin>>,
}

impl ShimIo {
    pub async fn new(
        stdin: &Option<String>,
        stdout: &Option<String>,
        stderr: &Option<String>,
    ) -> Result<Self> {
        let stdin_fd: Option<Box<dyn AsyncRead + Send + Unpin>> = if let Some(stdin) = stdin {
            match File::open(&stdin).await {
                Ok(file) => Some(Box::new(file)),
                Err(err) => {
                    error!(sl!(), "failed to open {} error {:?}", &stdin, err);
                    None
                }
            }
        } else {
            None
        };

        let get_url = |url: &Option<String>| -> Option<Url> {
            match url {
                None => None,
                Some(out) => match Url::parse(out.as_str()) {
                    Err(url::ParseError::RelativeUrlWithoutBase) => {
                        let out = "fifo://".to_owned() + out.as_str();
                        let u = Url::parse(out.as_str()).unwrap();
                        Some(u)
                    }
                    Err(err) => {
                        warn!(sl!(), "unable to parse stdout uri: {}", err);
                        None
                    }
                    Ok(u) => Some(u),
                },
            }
        };

        let stdout_url = get_url(stdout);
        let get_fd = |url: &Option<Url>| -> Option<Box<dyn AsyncWrite + Send + Unpin>> {
            if let Some(url) = url {
                if url.scheme() == "fifo" {
                    let path = url.path();
                    match open_fifo(path) {
                        Ok(s) => {
                            return Some(Box::new(ShimIoWrite::Stream(s)));
                        }
                        Err(err) => {
                            error!(sl!(), "failed to open file {} error {:?}", url.path(), err);
                        }
                    }
                }
            }
            None
        };

        let stderr_url = get_url(stderr);
        Ok(Self {
            stdin: stdin_fd,
            stdout: get_fd(&stdout_url),
            stderr: get_fd(&stderr_url),
        })
    }
}

#[derive(Debug)]
enum ShimIoWrite {
    Stream(AsyncUnixStream),
    // TODO: support other types
}

impl AsyncWrite for ShimIoWrite {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut TaskContext<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        match *self {
            ShimIoWrite::Stream(ref mut s) => Pin::new(s).poll_write(cx, buf),
        }
    }

    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<io::Result<()>> {
        match *self {
            ShimIoWrite::Stream(ref mut s) => Pin::new(s).poll_flush(cx),
        }
    }

    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<io::Result<()>> {
        match *self {
            ShimIoWrite::Stream(ref mut s) => Pin::new(s).poll_shutdown(cx),
        }
    }
}
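The get_url closure above relies on a specific behavior of the url crate: a bare filesystem path fails to parse with RelativeUrlWithoutBase, so the code retries with a fifo:// prefix and later recovers the path via Url::path() when opening the fifo. A small illustration (the fifo path is made up):

use url::Url;

fn main() {
    // a bare path is not an absolute URL
    assert!(matches!(
        Url::parse("/run/vc/stdout-fifo"),
        Err(url::ParseError::RelativeUrlWithoutBase)
    ));

    // with the fifo scheme prepended it parses, and the
    // original path is recoverable for open_fifo()
    let u = Url::parse("fifo:///run/vc/stdout-fifo").unwrap();
    assert_eq!(u.scheme(), "fifo");
    assert_eq!(u.path(), "/run/vc/stdout-fifo");
}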
@@ -0,0 +1,274 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//

use anyhow::{anyhow, Context, Result};

use std::{collections::HashMap, sync::Arc};

use agent::Agent;
use async_trait::async_trait;
use common::{
    error::Error,
    types::{
        ContainerConfig, ContainerID, ContainerProcess, ExecProcessRequest, KillRequest,
        ProcessExitStatus, ProcessStateInfo, ProcessType, ResizePTYRequest, ShutdownRequest,
        StatsInfo, UpdateRequest, PID,
    },
    ContainerManager,
};
use oci::Process as OCIProcess;
use resource::ResourceManager;
use tokio::sync::RwLock;

use super::{logger_with_process, Container};

unsafe impl Send for VirtContainerManager {}
unsafe impl Sync for VirtContainerManager {}

pub struct VirtContainerManager {
    sid: String,
    pid: u32,
    containers: Arc<RwLock<HashMap<String, Container>>>,
    resource_manager: Arc<ResourceManager>,
    agent: Arc<dyn Agent>,
}

impl VirtContainerManager {
    pub fn new(
        sid: &str,
        pid: u32,
        agent: Arc<dyn Agent>,
        resource_manager: Arc<ResourceManager>,
    ) -> Self {
        Self {
            sid: sid.to_string(),
            pid,
            containers: Default::default(),
            resource_manager,
            agent,
        }
    }
}

#[async_trait]
impl ContainerManager for VirtContainerManager {
    async fn create_container(&self, config: ContainerConfig, spec: oci::Spec) -> Result<PID> {
        let container = Container::new(
            self.pid,
            config,
            self.agent.clone(),
            self.resource_manager.clone(),
        )
        .context("new container")?;

        let mut containers = self.containers.write().await;
        container.create(spec).await.context("create")?;
        containers.insert(container.container_id.to_string(), container);

        Ok(PID { pid: self.pid })
    }

    async fn close_process_io(&self, process: &ContainerProcess) -> Result<()> {
        let containers = self.containers.read().await;
        let container_id = &process.container_id.to_string();
        let c = containers
            .get(container_id)
            .ok_or_else(|| Error::ContainerNotFound(container_id.clone()))?;

        c.close_io(process).await.context("close io")?;
        Ok(())
    }

    async fn delete_process(&self, process: &ContainerProcess) -> Result<ProcessStateInfo> {
        let container_id = &process.container_id.container_id;
        match process.process_type {
            ProcessType::Container => {
                let mut containers = self.containers.write().await;
                let c = containers
                    .remove(container_id)
                    .ok_or_else(|| Error::ContainerNotFound(container_id.to_string()))?;
                c.state_process(process).await.context("state process")
            }
            ProcessType::Exec => {
                let containers = self.containers.read().await;
                let c = containers
                    .get(container_id)
                    .ok_or_else(|| Error::ContainerNotFound(container_id.to_string()))?;
                c.delete_exec_process(process)
                    .await
                    .context("delete process")?;
                c.state_process(process).await.context("state process")
            }
        }
    }

    async fn exec_process(&self, req: ExecProcessRequest) -> Result<()> {
        if req.spec_type_url.is_empty() {
            return Err(anyhow!("invalid type url"));
        }
        let oci_process: OCIProcess =
            serde_json::from_slice(&req.spec_value).context("serde from slice")?;

        let containers = self.containers.read().await;
        let container_id = &req.process.container_id.container_id;
        let c = containers
            .get(container_id)
            .ok_or_else(|| Error::ContainerNotFound(container_id.clone()))?;
        c.exec_process(
            &req.process,
            req.stdin,
            req.stdout,
            req.stderr,
            req.terminal,
            oci_process,
        )
        .await
        .context("exec")?;
        Ok(())
    }

    async fn kill_process(&self, req: &KillRequest) -> Result<()> {
        let containers = self.containers.read().await;
        let container_id = &req.process.container_id.container_id;
        let c = containers
            .get(container_id)
            .ok_or_else(|| Error::ContainerNotFound(container_id.clone()))?;
        c.kill_process(&req.process, req.signal, req.all)
            .await
            .map_err(|err| {
                warn!(
                    sl!(),
                    "failed to signal process {:?} {:?}", &req.process, err
                );
                err
            })
            .ok();
        Ok(())
    }

    async fn wait_process(&self, process: &ContainerProcess) -> Result<ProcessExitStatus> {
        let logger = logger_with_process(process);

        let containers = self.containers.read().await;
        let container_id = &process.container_id.container_id;
        let c = containers
            .get(container_id)
            .ok_or_else(|| Error::ContainerNotFound(container_id.clone()))?;
        let (watcher, status) = c.wait_process(process).await.context("wait")?;
        drop(containers);

        match watcher {
            Some(mut watcher) => {
                info!(logger, "begin wait exit");
                while watcher.changed().await.is_ok() {}
                info!(logger, "end wait exited");
            }
            None => {
                warn!(logger, "failed to find watcher for wait process");
            }
        }

        let status = status.read().await;

        info!(logger, "wait process exit status {:?}", status);

        // stop process
        let containers = self.containers.read().await;
        let container_id = &process.container_id.container_id;
        let c = containers
            .get(container_id)
            .ok_or_else(|| Error::ContainerNotFound(container_id.clone()))?;
        c.stop_process(process).await.context("stop container")?;
        Ok(status.clone())
    }

    async fn start_process(&self, process: &ContainerProcess) -> Result<PID> {
        let containers = self.containers.read().await;
        let container_id = &process.container_id.container_id;
        let c = containers
            .get(container_id)
            .ok_or_else(|| Error::ContainerNotFound(container_id.clone()))?;
        c.start(process).await.context("start")?;
        Ok(PID { pid: self.pid })
    }

    async fn state_process(&self, process: &ContainerProcess) -> Result<ProcessStateInfo> {
        let containers = self.containers.read().await;
        let container_id = &process.container_id.container_id;
        let c = containers
            .get(container_id)
            .ok_or_else(|| Error::ContainerNotFound(container_id.clone()))?;
        let state = c.state_process(process).await.context("state process")?;
        Ok(state)
    }

    async fn pause_container(&self, id: &ContainerID) -> Result<()> {
        let containers = self.containers.read().await;
        let c = containers
            .get(&id.container_id)
            .ok_or_else(|| Error::ContainerNotFound(id.container_id.clone()))?;
        c.pause().await.context("pause")?;
        Ok(())
    }

    async fn resume_container(&self, id: &ContainerID) -> Result<()> {
        let containers = self.containers.read().await;
        let c = containers
            .get(&id.container_id)
            .ok_or_else(|| Error::ContainerNotFound(id.container_id.clone()))?;
        c.resume().await.context("resume")?;
        Ok(())
    }

    async fn resize_process_pty(&self, req: &ResizePTYRequest) -> Result<()> {
        let containers = self.containers.read().await;
        let c = containers
            .get(&req.process.container_id.container_id)
            .ok_or_else(|| {
                Error::ContainerNotFound(req.process.container_id.container_id.clone())
            })?;
        c.resize_pty(&req.process, req.width, req.height)
            .await
            .context("resize pty")?;
        Ok(())
    }

    async fn stats_container(&self, id: &ContainerID) -> Result<StatsInfo> {
        let containers = self.containers.read().await;
        let c = containers
            .get(&id.container_id)
            .ok_or_else(|| Error::ContainerNotFound(id.container_id.clone()))?;
        let stats = c.stats().await.context("stats")?;
        Ok(StatsInfo::from(stats))
    }

    async fn update_container(&self, req: UpdateRequest) -> Result<()> {
        let resource = serde_json::from_slice::<oci::LinuxResources>(&req.value)
            .context("deserialize LinuxResources")?;
        let containers = self.containers.read().await;
        let container_id = &req.container_id;
        let c = containers
            .get(container_id)
            .ok_or_else(|| Error::ContainerNotFound(container_id.to_string()))?;
        c.update(&resource).await.context("update")
    }

    async fn pid(&self) -> Result<PID> {
        Ok(PID { pid: self.pid })
    }

    async fn connect_container(&self, _id: &ContainerID) -> Result<PID> {
        Ok(PID { pid: self.pid })
    }

    async fn need_shutdown_sandbox(&self, req: &ShutdownRequest) -> bool {
        req.is_now || self.containers.read().await.is_empty() || self.sid == req.container_id
    }

    async fn is_sandbox_container(&self, process: &ContainerProcess) -> bool {
        process.process_type == ProcessType::Container
            && process.container_id.container_id == self.sid
    }
}
@@ -0,0 +1,20 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//

mod container;
use container::{Container, Exec};
mod container_inner;
mod io;
use container_inner::ContainerInner;
mod manager;
pub use manager::VirtContainerManager;
mod process;

use common::types::ContainerProcess;

fn logger_with_process(container_process: &ContainerProcess) -> slog::Logger {
    sl!().new(o!("container_id" => container_process.container_id.container_id.clone(), "exec_id" => container_process.exec_id.clone()))
}
@@ -0,0 +1,211 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//

use std::sync::Arc;

use agent::Agent;
use anyhow::{Context, Result};
use awaitgroup::{WaitGroup, Worker as WaitGroupWorker};
use common::types::{ContainerProcess, ProcessExitStatus, ProcessStateInfo, ProcessStatus, PID};
use tokio::{
    io::{AsyncRead, AsyncWrite},
    sync::{watch, RwLock},
};

use super::{
    io::{ContainerIo, ShimIo},
    logger_with_process,
};

pub type ProcessWatcher = (
    Option<watch::Receiver<bool>>,
    Arc<RwLock<ProcessExitStatus>>,
);

#[derive(Debug)]
pub struct Process {
    pub process: ContainerProcess,
    pub pid: u32,
    logger: slog::Logger,
    pub bundle: String,

    pub stdin: Option<String>,
    pub stdout: Option<String>,
    pub stderr: Option<String>,
    pub terminal: bool,

    pub height: u32,
    pub width: u32,
    pub status: ProcessStatus,

    pub exit_status: Arc<RwLock<ProcessExitStatus>>,
    pub exit_watcher_rx: Option<watch::Receiver<bool>>,
    pub exit_watcher_tx: Option<watch::Sender<bool>>,
    // used to sync between the stdin io copy task (tokio) and the close io
    // call: close io should wait until the stdin io copy has finished, to
    // prevent stdin data from being lost.
    pub wg_stdin: WaitGroup,
}

impl Process {
    pub fn new(
        process: &ContainerProcess,
        pid: u32,
        bundle: &str,
        stdin: Option<String>,
        stdout: Option<String>,
        stderr: Option<String>,
        terminal: bool,
    ) -> Process {
        let (sender, receiver) = watch::channel(false);

        Process {
            process: process.clone(),
            pid,
            logger: logger_with_process(process),
            bundle: bundle.to_string(),
            stdin,
            stdout,
            stderr,
            terminal,
            height: 0,
            width: 0,
            status: ProcessStatus::Created,
            exit_status: Arc::new(RwLock::new(ProcessExitStatus::new())),
            exit_watcher_rx: Some(receiver),
            exit_watcher_tx: Some(sender),
            wg_stdin: WaitGroup::new(),
        }
    }

    pub async fn start_io_and_wait(
        &mut self,
        agent: Arc<dyn Agent>,
        container_io: ContainerIo,
    ) -> Result<()> {
        info!(self.logger, "start io and wait");

        // new shim io
        let shim_io = ShimIo::new(&self.stdin, &self.stdout, &self.stderr)
            .await
            .context("new shim io")?;

        // start io copy for stdin
        let wgw_stdin = self.wg_stdin.worker();
        if let Some(stdin) = shim_io.stdin {
            self.run_io_copy("stdin", wgw_stdin, stdin, container_io.stdin)
                .await?;
        }

        // prepare the wait group for stdout and stderr
        let wg = WaitGroup::new();
        let wgw = wg.worker();

        // start io copy for stdout
        if let Some(stdout) = shim_io.stdout {
            self.run_io_copy("stdout", wgw.clone(), container_io.stdout, stdout)
                .await?;
        }

        // start io copy for stderr
        if !self.terminal {
            if let Some(stderr) = shim_io.stderr {
                self.run_io_copy("stderr", wgw, container_io.stderr, stderr)
                    .await?;
            }
        }

        self.run_io_wait(agent, wg).await.context("run io thread")?;
        Ok(())
    }

    async fn run_io_copy<'a>(
        &'a self,
        io_name: &'a str,
        wgw: WaitGroupWorker,
        mut reader: Box<dyn AsyncRead + Send + Unpin>,
        mut writer: Box<dyn AsyncWrite + Send + Unpin>,
    ) -> Result<()> {
        let io_name = io_name.to_string();
        let logger = self.logger.new(o!("io name" => io_name));
        let _ = tokio::spawn(async move {
            match tokio::io::copy(&mut reader, &mut writer).await {
                Err(e) => warn!(logger, "io: failed to copy stream {}", e),
                Ok(length) => warn!(logger, "io: stop copying stream, length {}", length),
            };

            wgw.done();
        });

        Ok(())
    }

    async fn run_io_wait(&mut self, agent: Arc<dyn Agent>, mut wg: WaitGroup) -> Result<()> {
        let logger = self.logger.clone();
        info!(logger, "start run io wait");
        let process = self.process.clone();
        let status = self.exit_status.clone();
        let exit_notifier = self.exit_watcher_tx.take();

        let _ = tokio::spawn(async move {
            // wait until all of the container's io streams have terminated
            info!(logger, "begin wait group io");
            wg.wait().await;
            info!(logger, "end wait group for io");

            let req = agent::WaitProcessRequest {
                process_id: process.clone().into(),
            };

            info!(logger, "begin wait process");
            let resp = match agent.wait_process(req).await {
                Ok(ret) => ret,
                Err(e) => {
                    error!(logger, "failed to wait process {:?}", e);
                    return;
                }
            };

            info!(logger, "end wait process exit code {}", resp.status);

            let mut locked_status = status.write().await;
            locked_status.update_exit_code(resp.status);

            drop(exit_notifier);
            info!(logger, "end io wait thread");
        });
        Ok(())
    }

    pub fn fetch_exit_watcher(&self) -> Result<ProcessWatcher> {
        Ok((self.exit_watcher_rx.clone(), self.exit_status.clone()))
    }

    pub async fn state(&self) -> Result<ProcessStateInfo> {
        let exit_status = self.exit_status.read().await;
        Ok(ProcessStateInfo {
            container_id: self.process.container_id.container_id.clone(),
            exec_id: self.process.exec_id.clone(),
            pid: PID { pid: self.pid },
            bundle: self.bundle.clone(),
            stdin: self.stdin.clone(),
            stdout: self.stdout.clone(),
            stderr: self.stderr.clone(),
            terminal: self.terminal,
            status: self.status,
            exit_status: exit_status.exit_code,
            exited_at: exit_status.exit_time,
        })
    }

    pub fn stop(&mut self) {
        self.status = ProcessStatus::Stopped;
    }

    pub async fn close_io(&mut self) {
        self.wg_stdin.wait().await;
    }
}
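Exit notification hinges on tokio watch-channel semantics: run_io_wait moves the sender (exit_watcher_tx) into its task and drops it once the exit code is recorded, so every receiver's changed() starts returning Err, which is what ends the `while watcher.changed().await.is_ok() {}` loop in VirtContainerManager::wait_process. A standalone sketch of the drop-as-broadcast pattern:

use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = watch::channel(false);

    // stand-in for the io-wait task: record the exit, then drop the sender
    tokio::spawn(async move {
        // ... agent.wait_process() and the exit-status update would happen here ...
        drop(tx); // closing the channel wakes all watchers
    });

    // stand-in for wait_process: loop until the channel closes
    while rx.changed().await.is_ok() {}
    println!("process exited");
}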
@@ -0,0 +1,123 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//

use std::sync::Arc;

use agent::Agent;
use anyhow::Context;
use tokio::sync::{mpsc, Mutex};

/// monitor check interval: 30s
const HEALTH_CHECK_TIMER_INTERVAL: u64 = 30;

/// version check threshold: 5min
const VERSION_CHECK_THRESHOLD: u64 = 5 * 60 / HEALTH_CHECK_TIMER_INTERVAL;

/// health check stop channel buffer size
const HEALTH_CHECK_STOP_CHANNEL_BUFFER_SIZE: usize = 1;

pub struct HealthCheck {
    pub keep_alive: bool,
    keep_vm: bool,
    stop_tx: mpsc::Sender<()>,
    stop_rx: Arc<Mutex<mpsc::Receiver<()>>>,
}

impl HealthCheck {
    pub fn new(keep_alive: bool, keep_vm: bool) -> HealthCheck {
        let (tx, rx) = mpsc::channel(HEALTH_CHECK_STOP_CHANNEL_BUFFER_SIZE);
        HealthCheck {
            keep_alive,
            keep_vm,
            stop_tx: tx,
            stop_rx: Arc::new(Mutex::new(rx)),
        }
    }

    pub fn start(&self, id: &str, agent: Arc<dyn Agent>) {
        if !self.keep_alive {
            return;
        }
        let id = id.to_string();

        info!(sl!(), "start runtime keep alive");

        let stop_rx = self.stop_rx.clone();
        let keep_vm = self.keep_vm;
        let _ = tokio::spawn(async move {
            let mut version_check_threshold_count = 0;

            loop {
                tokio::time::sleep(std::time::Duration::from_secs(HEALTH_CHECK_TIMER_INTERVAL))
                    .await;
                let mut stop_rx = stop_rx.lock().await;
                match stop_rx.try_recv() {
                    Ok(_) => {
                        info!(sl!(), "receive stop {} monitor signal", id);
                        break;
                    }

                    Err(mpsc::error::TryRecvError::Empty) => {
                        // check the agent
                        match agent
                            .check(agent::CheckRequest::new(""))
                            .await
                            .context("check health")
                        {
                            Ok(_) => {
                                debug!(sl!(), "check {} agent health successfully", id);
                                version_check_threshold_count += 1;
                                if version_check_threshold_count >= VERSION_CHECK_THRESHOLD {
                                    // need to check the version
                                    version_check_threshold_count = 0;
                                    if let Ok(v) = agent
                                        .version(agent::CheckRequest::new(""))
                                        .await
                                        .context("check version")
                                    {
                                        info!(sl!(), "agent {}", v.agent_version)
                                    }
                                }
                                continue;
                            }
                            Err(e) => {
                                error!(sl!(), "failed to do {} agent health check: {}", id, e);
                                if let Err(mpsc::error::TryRecvError::Empty) = stop_rx.try_recv() {
                                    error!(sl!(), "failed to receive stop monitor signal");
                                    if !keep_vm {
                                        ::std::process::exit(1);
                                    }
                                } else {
                                    info!(sl!(), "wait to exit {}", id);
                                    break;
                                }
                            }
                        }
                    }

                    Err(mpsc::error::TryRecvError::Disconnected) => {
                        warn!(sl!(), "{} monitor channel has broken", id);
                        break;
                    }
                }
            }
        });
    }

    pub async fn stop(&self) {
        if !self.keep_alive {
            return;
        }
        info!(sl!(), "stop runtime keep alive");
        self.stop_tx
            .send(())
            .await
            .map_err(|e| {
                warn!(sl!(), "failed to send on monitor channel. {:?}", e);
            })
            .ok();
    }
}
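A usage sketch of the monitor lifecycle; in this commit the real caller is VirtSandbox, which starts the monitor once the sandbox is running and stops it on shutdown. The agent handle here is assumed to be obtained elsewhere.

// Sketch only: `agent` is an Arc<dyn agent::Agent> obtained elsewhere.
async fn monitor_sandbox(sid: &str, agent: std::sync::Arc<dyn agent::Agent>) {
    // keep_alive = true enables the 30s check loop;
    // keep_vm = true keeps the VM alive on failed checks instead of exiting
    let monitor = HealthCheck::new(true, true);
    monitor.start(sid, agent);

    // ... sandbox runs; checks happen every HEALTH_CHECK_TIMER_INTERVAL ...

    // request the loop to stop; it exits on its next tick
    monitor.stop().await;
}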
@@ -3,12 +3,25 @@
//
// SPDX-License-Identifier: Apache-2.0
//

#[macro_use]
extern crate slog;

logging::logger_with_subsystem!(sl, "virt-container");

mod container_manager;
pub mod health_check;
pub mod sandbox;

use std::sync::Arc;

use anyhow::Result;
use agent::kata::KataAgent;
use anyhow::{Context, Result};
use async_trait::async_trait;
use common::{message::Message, RuntimeHandler, RuntimeInstance};
use kata_types::config::{hypervisor::register_hypervisor_plugin, DragonballConfig};
use hypervisor::Hypervisor;
use kata_types::config::{hypervisor::register_hypervisor_plugin, DragonballConfig, TomlConfig};
use resource::ResourceManager;
use tokio::sync::mpsc::Sender;

unsafe impl Send for VirtContainer {}
@@ -34,10 +47,55 @@ impl RuntimeHandler for VirtContainer {

    async fn new_instance(
        &self,
        _sid: &str,
        _msg_sender: Sender<Message>,
        sid: &str,
        msg_sender: Sender<Message>,
    ) -> Result<RuntimeInstance> {
        todo!()
        let (toml_config, _) = TomlConfig::load_from_file("").context("load config")?;

        // TODO: new sandbox and container manager
        // TODO: get from hypervisor
        let hypervisor = new_hypervisor(&toml_config)
            .await
            .context("new hypervisor")?;

        // get the uds from the hypervisor and the config from toml_config
        let agent = Arc::new(KataAgent::new(kata_types::config::Agent {
            debug: true,
            enable_tracing: false,
            server_port: 1024,
            log_port: 1025,
            dial_timeout_ms: 10,
            reconnect_timeout_ms: 3_000,
            request_timeout_ms: 30_000,
            health_check_request_timeout_ms: 90_000,
            kernel_modules: Default::default(),
            container_pipe_size: 0,
            debug_console_enabled: false,
        }));

        let resource_manager = Arc::new(ResourceManager::new(
            sid,
            agent.clone(),
            hypervisor.clone(),
            &toml_config,
        )?);
        let pid = std::process::id();

        let sandbox = sandbox::VirtSandbox::new(
            sid,
            msg_sender,
            agent.clone(),
            hypervisor,
            resource_manager.clone(),
        )
        .await
        .context("new virt sandbox")?;
        let container_manager =
            container_manager::VirtContainerManager::new(sid, pid, agent, resource_manager);
        Ok(RuntimeInstance {
            sandbox: Arc::new(sandbox),
            container_manager: Arc::new(container_manager),
        })
    }

    fn cleanup(&self, _id: &str) -> Result<()> {
@@ -45,3 +103,8 @@ impl RuntimeHandler for VirtContainer {
        Ok(())
    }
}

async fn new_hypervisor(_toml_config: &TomlConfig) -> Result<Arc<dyn Hypervisor>> {
    // TODO: implement a real hypervisor
    todo!()
}
229 src/runtime-rs/crates/runtimes/virt_container/src/sandbox.rs (new file)
@@ -0,0 +1,229 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//

use std::sync::Arc;

use agent::{self, Agent};
use anyhow::{Context, Result};
use async_trait::async_trait;
use common::{
    message::{Action, Message},
    Sandbox,
};
use containerd_shim_protos::events::task::TaskOOM;
use hypervisor::Hypervisor;
use kata_types::config::TomlConfig;
use resource::{ResourceConfig, ResourceManager};
use tokio::sync::{mpsc::Sender, Mutex, RwLock};

use crate::health_check::HealthCheck;

#[derive(Clone, Copy, PartialEq, Debug)]
pub enum SandboxState {
    Init,
    Running,
    Stopped,
}

struct SandboxInner {
    state: SandboxState,
}

impl SandboxInner {
    pub fn new() -> Self {
        Self {
            state: SandboxState::Init,
        }
    }
}

unsafe impl Send for VirtSandbox {}
unsafe impl Sync for VirtSandbox {}

#[derive(Clone)]
pub struct VirtSandbox {
    sid: String,
    msg_sender: Arc<Mutex<Sender<Message>>>,
    inner: Arc<RwLock<SandboxInner>>,
    resource_manager: Arc<ResourceManager>,
    agent: Arc<dyn Agent>,
    hypervisor: Arc<dyn Hypervisor>,
    monitor: Arc<HealthCheck>,
}

impl VirtSandbox {
    pub async fn new(
        sid: &str,
        msg_sender: Sender<Message>,
        agent: Arc<dyn Agent>,
        hypervisor: Arc<dyn Hypervisor>,
        resource_manager: Arc<ResourceManager>,
    ) -> Result<Self> {
        Ok(Self {
            sid: sid.to_string(),
            msg_sender: Arc::new(Mutex::new(msg_sender)),
            inner: Arc::new(RwLock::new(SandboxInner::new())),
            agent,
            hypervisor,
            resource_manager,
            monitor: Arc::new(HealthCheck::new(true, true)),
        })
    }

    async fn prepare_for_start_sandbox(
        &self,
        netns: Option<String>,
        _config: &TomlConfig,
    ) -> Result<Vec<ResourceConfig>> {
        let mut resource_configs = vec![];

        if let Some(_netns_path) = netns {
            // TODO: support network
        }

        let hypervisor_config = self.hypervisor.hypervisor_config().await;
        let virtio_fs_config = ResourceConfig::ShareFs(hypervisor_config.shared_fs);
        resource_configs.push(virtio_fs_config);

        Ok(resource_configs)
    }
}

#[async_trait]
impl Sandbox for VirtSandbox {
    async fn start(&self, netns: Option<String>, config: &TomlConfig) -> Result<()> {
        let id = &self.sid;

        // if the sandbox is already running, return;
        // otherwise try to start it
        let mut inner = self.inner.write().await;
        if inner.state == SandboxState::Running {
            warn!(sl!(), "sandbox is running, no need to start");
            return Ok(());
        }

        self.hypervisor
            .prepare_vm(id, netns.clone())
            .await
            .context("prepare vm")?;

        // generate and set up the devices before starting the vm;
        // this must happen after hypervisor.prepare_vm
        let resources = self.prepare_for_start_sandbox(netns, config).await?;
        self.resource_manager
            .prepare_before_start_vm(resources)
            .await
            .context("set up device before start vm")?;

        // start the vm
        self.hypervisor.start_vm(10_000).await.context("start vm")?;
        info!(sl!(), "start vm");

        // connect to the agent via its socket
        let address = self
            .hypervisor
            .get_agent_socket()
            .await
            .context("get agent socket")?;
        self.agent.start(&address).await.context("connect")?;

        self.resource_manager
            .setup_after_start_vm()
            .await
            .context("setup device after start vm")?;

        // create the sandbox inside the vm
        let req = agent::CreateSandboxRequest {
            hostname: "".to_string(),
            dns: vec![],
            storages: self
                .resource_manager
                .get_storage_for_sandbox()
                .await
                .context("get storages for sandbox")?,
            sandbox_pidns: false,
            sandbox_id: id.to_string(),
            guest_hook_path: "".to_string(),
            kernel_modules: vec![],
        };

        self.agent
            .create_sandbox(req)
            .await
            .context("create sandbox")?;

        inner.state = SandboxState::Running;
        let agent = self.agent.clone();
        let sender = self.msg_sender.clone();
        info!(sl!(), "oom watcher start");
        let _ = tokio::spawn(async move {
            loop {
                match agent
                    .get_oom_event(agent::Empty::new())
                    .await
                    .context("get oom event")
                {
                    Ok(resp) => {
                        let cid = &resp.container_id;
                        warn!(sl!(), "send oom event for container {}", &cid);
                        let event = TaskOOM {
                            container_id: cid.to_string(),
                            ..Default::default()
                        };
                        let msg = Message::new(Action::Event(Arc::new(event)));
                        let lock_sender = sender.lock().await;
                        if let Err(err) = lock_sender.send(msg).await.context("send event") {
                            error!(
                                sl!(),
                                "failed to send oom event for {} error {:?}", cid, err
                            );
                        }
                    }
                    Err(err) => {
                        warn!(sl!(), "failed to get oom event error {:?}", err);
                        break;
                    }
                }
            }
        });
        self.monitor.start(id, self.agent.clone());
        Ok(())
    }

    async fn stop(&self) -> Result<()> {
        info!(sl!(), "begin stop sandbox");
        // TODO: stop sandbox
        Ok(())
    }

    async fn shutdown(&self) -> Result<()> {
        info!(sl!(), "shutdown");

        self.resource_manager
            .delete_cgroups()
            .await
            .context("delete cgroups")?;

        info!(sl!(), "stop monitor");
        self.monitor.stop().await;

        info!(sl!(), "stop agent");
        self.agent.stop().await;

        // stop the server
        info!(sl!(), "send shutdown message");
        let msg = Message::new(Action::Shutdown);
        let sender = self.msg_sender.clone();
        let sender = sender.lock().await;
        sender.send(msg).await.context("send shutdown msg")?;
        Ok(())
    }

    async fn cleanup(&self, _id: &str) -> Result<()> {
        // TODO: cleanup
        Ok(())
    }
}