Merge pull request #303 from lifupan/pause_resume

container pause/resume support
This commit is contained in:
Yang Bo
2020-06-11 20:28:41 +08:00
committed by GitHub
2 changed files with 180 additions and 23 deletions

View File

@@ -7,6 +7,7 @@ use lazy_static;
use oci::{Hook, Linux, LinuxNamespace, LinuxResources, POSIXRlimit, Spec};
use serde_json;
use std::ffi::{CStr, CString};
use std::fmt;
use std::fs;
use std::os::unix::io::RawFd;
use std::path::{Path, PathBuf};
@@ -66,20 +67,55 @@ const CWFD_FD: &str = "CWFD_FD";
const CLOG_FD: &str = "CLOG_FD";
const FIFO_FD: &str = "FIFO_FD";
type Status = Option<String>;
#[derive(PartialEq, Clone, Copy)]
pub enum Status {
CREATED,
RUNNING,
STOPPED,
PAUSED,
}
#[derive(Debug)]
pub struct ContainerStatus {
pre_status: Status,
cur_status: Status,
}
impl ContainerStatus {
fn new() -> Self {
ContainerStatus {
pre_status: Status::CREATED,
cur_status: Status::CREATED,
}
}
fn status(&self) -> Status {
self.cur_status
}
fn pre_status(&self) -> Status {
self.pre_status
}
fn transition(&mut self, to: Status) {
self.pre_status = self.status();
self.cur_status = to;
}
}
pub type Config = CreateOpts;
type NamespaceType = String;
/*
impl Status {
fn to_string(&self) -> String {
match *self {
Some(ref v) => v.to_string(),
None => "Unknown Status".to_string(),
impl fmt::Debug for Status {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Status::CREATED => write!(f, "{:?}", "created"),
Status::RUNNING => write!(f, "{:?}", "running"),
Status::STOPPED => write!(f, "{:?}", "stopped"),
Status::PAUSED => write!(f, "{:?}", "paused"),
}
}
}
*/
lazy_static! {
static ref NAMESPACES: HashMap<&'static str, CloneFlags> = {
@@ -184,7 +220,7 @@ pub struct BaseState {
pub trait BaseContainer {
fn id(&self) -> String;
fn status(&self) -> Result<Status>;
fn status(&self) -> Status;
fn state(&self) -> Result<State>;
fn oci_state(&self) -> Result<OCIState>;
fn config(&self) -> Result<&Config>;
@@ -216,7 +252,7 @@ pub struct LinuxContainer
pub uid_map_path: String,
pub gid_map_path: String,
pub processes: HashMap<pid_t, Process>,
pub status: Status,
pub status: ContainerStatus,
pub created: SystemTime,
pub logger: Logger,
}
@@ -245,12 +281,58 @@ pub struct SyncPC {
pub trait Container: BaseContainer {
// fn checkpoint(&self, opts: &CriuOpts) -> Result<()>;
// fn restore(&self, p: &Process, opts: &CriuOpts) -> Result<()>;
fn pause(&self) -> Result<()>;
fn resume(&self) -> Result<()>;
fn pause(&mut self) -> Result<()>;
fn resume(&mut self) -> Result<()>;
// fn notify_oom(&self) -> Result<(Sender, Receiver)>;
// fn notify_memory_pressure(&self, lvl: PressureLevel) -> Result<(Sender, Receiver)>;
}
impl Container for LinuxContainer {
fn pause(&mut self) -> Result<()> {
let status = self.status();
if status != Status::RUNNING && status != Status::CREATED {
return Err(ErrorKind::ErrorCode(format!(
"failed to pause container: current status is: {:?}",
status
))
.into());
}
if self.cgroup_manager.is_some() {
self.cgroup_manager
.as_ref()
.unwrap()
.freeze(fscgroup::FROZEN)?;
self.status.transition(Status::PAUSED);
return Ok(());
}
Err(ErrorKind::ErrorCode(String::from("failed to get container's cgroup manager")).into())
}
fn resume(&mut self) -> Result<()> {
let status = self.status();
if status != Status::PAUSED {
return Err(ErrorKind::ErrorCode(format!(
"container status is: {:?}, not paused",
status
))
.into());
}
if self.cgroup_manager.is_some() {
self.cgroup_manager
.as_ref()
.unwrap()
.freeze(fscgroup::THAWED)?;
self.status.transition(Status::RUNNING);
return Ok(());
}
Err(ErrorKind::ErrorCode(String::from("failed to get container's cgroup manager")).into())
}
}
pub fn init_child() {
let cwfd = std::env::var(CWFD_FD).unwrap().parse::<i32>().unwrap();
let cfd_log = std::env::var(CLOG_FD).unwrap().parse::<i32>().unwrap();
@@ -571,8 +653,8 @@ impl BaseContainer for LinuxContainer {
self.id.clone()
}
fn status(&self) -> Result<Status> {
Ok(self.status.clone())
fn status(&self) -> Status {
self.status.status()
}
fn state(&self) -> Result<State> {
@@ -581,8 +663,8 @@ impl BaseContainer for LinuxContainer {
fn oci_state(&self) -> Result<OCIState> {
let oci = self.config.spec.as_ref().unwrap();
let status = self.status().unwrap().unwrap();
let pid = if status != "stopped".to_string() {
let status = self.status();
let pid = if status != Status::STOPPED {
self.init_process_pid
} else {
0
@@ -594,7 +676,7 @@ impl BaseContainer for LinuxContainer {
Ok(OCIState {
version: oci.version.clone(),
id: self.id(),
status,
status: format!("{:?}", status),
pid,
bundle,
annotations: oci.annotations.clone(),
@@ -830,7 +912,6 @@ impl BaseContainer for LinuxContainer {
info!(logger, "entered namespaces!");
self.status = Some("created".to_string());
self.created = SystemTime::now();
// create the pipes for notify process exited
@@ -862,7 +943,7 @@ impl BaseContainer for LinuxContainer {
if init {
self.exec()?;
self.status = Some("running".to_string());
self.status.transition(Status::RUNNING);
}
Ok(())
@@ -884,7 +965,7 @@ impl BaseContainer for LinuxContainer {
}
}
self.status = Some("stopped".to_string());
self.status.transition(Status::STOPPED);
fs::remove_dir_all(&self.root)?;
Ok(())
}
@@ -912,7 +993,7 @@ impl BaseContainer for LinuxContainer {
.unwrap()
.as_secs();
self.status = Some("running".to_string());
self.status.transition(Status::RUNNING);
unistd::close(fd)?;
Ok(())
@@ -1252,7 +1333,7 @@ impl LinuxContainer {
id: id.clone(),
root,
cgroup_manager: Some(cgroup_manager),
status: Some("stopped".to_string()),
status: ContainerStatus::new(),
uid_map_path: String::from(""),
gid_map_path: "".to_string(),
config,
@@ -1560,3 +1641,26 @@ fn execute_hook(logger: &Logger, h: &Hook, st: &OCIState) -> Result<()> {
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_status_transtition() {
let mut status = ContainerStatus::new();
let status_table: [Status; 4] = [
Status::CREATED,
Status::RUNNING,
Status::PAUSED,
Status::STOPPED,
];
for s in status_table.iter() {
let pre_status = status.status();
status.transition(*s);
assert_eq!(pre_status, status.pre_status());
}
}
}

View File

@@ -18,7 +18,7 @@ use protocols::health::{
};
use protocols::types::Interface;
use rustjail;
use rustjail::container::{BaseContainer, LinuxContainer};
use rustjail::container::{BaseContainer, Container, LinuxContainer};
use rustjail::errors::*;
use rustjail::process::Process;
use rustjail::specconv::CreateOpts;
@@ -754,6 +754,59 @@ impl protocols::agent_ttrpc::AgentService for agentService {
Ok(resp) => Ok(resp),
}
}
fn pause_container(
&self,
ctx: &ttrpc::TtrpcContext,
req: protocols::agent::PauseContainerRequest,
) -> ttrpc::Result<protocols::empty::Empty> {
let cid = req.get_container_id();
let s = Arc::clone(&self.sandbox);
let mut sandbox = s.lock().unwrap();
if let Some(ctr) = sandbox.get_container(cid) {
match ctr.pause() {
Err(e) => {
return Err(ttrpc::Error::RpcStatus(ttrpc::get_status(
ttrpc::Code::INTERNAL,
e.to_string(),
)))
}
Ok(_) => return Ok(Empty::new()),
}
};
Err(ttrpc::Error::RpcStatus(ttrpc::get_status(
ttrpc::Code::INVALID_ARGUMENT,
"invalid argument".to_string(),
)))
}
fn resume_container(
&self,
ctx: &ttrpc::TtrpcContext,
req: protocols::agent::ResumeContainerRequest,
) -> ttrpc::Result<protocols::empty::Empty> {
let cid = req.get_container_id();
let s = Arc::clone(&self.sandbox);
let mut sandbox = s.lock().unwrap();
if let Some(ctr) = sandbox.get_container(cid) {
match ctr.resume() {
Err(e) => {
return Err(ttrpc::Error::RpcStatus(ttrpc::get_status(
ttrpc::Code::INTERNAL,
e.to_string(),
)))
}
Ok(_) => return Ok(Empty::new()),
}
};
Err(ttrpc::Error::RpcStatus(ttrpc::get_status(
ttrpc::Code::INVALID_ARGUMENT,
"invalid argument: ".to_string(),
)))
}
fn write_stdin(
&self,
_ctx: &ttrpc::TtrpcContext,