diff --git a/src/agent/src/mount.rs b/src/agent/src/mount.rs index 84227b1821..62426685af 100644 --- a/src/agent/src/mount.rs +++ b/src/agent/src/mount.rs @@ -4,8 +4,8 @@ // use std::collections::HashMap; -use std::fs; -use std::fs::{File, OpenOptions}; +use std::fmt::Debug; +use std::fs::{self, File, OpenOptions}; use std::io::{BufRead, BufReader, Write}; use std::iter; use std::os::unix::fs::{MetadataExt, PermissionsExt}; @@ -13,12 +13,13 @@ use std::path::Path; use std::str::FromStr; use std::sync::Arc; -use tokio::sync::Mutex; - +use anyhow::{anyhow, Context, Result}; use nix::mount::MsFlags; use nix::unistd::{Gid, Uid}; - use regex::Regex; +use slog::Logger; +use tokio::sync::Mutex; +use tracing::instrument; use crate::device::{ get_scsi_device_name, get_virtio_blk_pci_device_name, get_virtio_mmio_device_name, @@ -34,17 +35,13 @@ use crate::protocols::types::FSGroupChangePolicy; use crate::Sandbox; #[cfg(target_arch = "s390x")] use crate::{ccw, device::get_virtio_blk_ccw_device_name}; -use anyhow::{anyhow, Context, Result}; -use slog::Logger; - -use tracing::instrument; pub const TYPE_ROOTFS: &str = "rootfs"; -const SYS_FS_HUGEPAGES_PREFIX: &str = "/sys/kernel/mm/hugepages"; pub const MOUNT_GUEST_TAG: &str = "kataShared"; // Allocating an FSGroup that owns the pod's volumes const FS_GID: &str = "fsgid"; +const SYS_FS_HUGEPAGES_PREFIX: &str = "/sys/kernel/mm/hugepages"; const RW_MASK: u32 = 0o660; const RO_MASK: u32 = 0o440; @@ -218,9 +215,9 @@ pub fn baremount( ) .map_err(|e| { anyhow!( - "failed to mount {:?} to {:?}, with error: {}", - source, - destination, + "failed to mount {} to {}, with error: {}", + source.display(), + destination.display(), e ) }) @@ -230,7 +227,7 @@ pub fn baremount( async fn ephemeral_storage_handler( logger: &Logger, storage: &Storage, - sandbox: &Arc>, + _sandbox: &Arc>, ) -> Result { // hugetlbfs if storage.fstype == FS_TYPE_HUGETLB { @@ -238,21 +235,19 @@ async fn ephemeral_storage_handler( } // normal ephemeral storage - fs::create_dir_all(Path::new(&storage.mount_point))?; + fs::create_dir_all(&storage.mount_point)?; - // By now we only support one option field: "fsGroup" which - // isn't an valid mount option, thus we should remove it when - // do mount. if !storage.options.is_empty() { - // ephemeral_storage didn't support mount options except fsGroup. + // By now we only support one option field: "fsGroup" which + // isn't an valid mount option, thus we should remove it when + // do mount. let mut new_storage = storage.clone(); new_storage.options = Default::default(); common_storage_handler(logger, &new_storage)?; - let opts_vec: Vec = storage.options.to_vec(); - - let opts = parse_options(opts_vec); + let opts = parse_options(&storage.options); + // ephemeral_storage didn't support mount options except fsGroup. 
if let Some(fsgid) = opts.get(FS_GID) { let gid = fsgid.parse::()?; @@ -278,56 +273,57 @@ async fn ephemeral_storage_handler( pub async fn update_ephemeral_mounts( logger: Logger, storages: Vec, - sandbox: &Arc>, + _sandbox: &Arc>, ) -> Result<()> { for (_, storage) in storages.iter().enumerate() { - let handler_name = storage.driver.clone(); + let handler_name = &storage.driver; let logger = logger.new(o!( - "msg" => "updating tmpfs storage", + "msg" => "updating tmpfs storage", "subsystem" => "storage", "storage-type" => handler_name.to_owned())); match handler_name.as_str() { DRIVER_EPHEMERAL_TYPE => { - fs::create_dir_all(Path::new(&storage.mount_point))?; + fs::create_dir_all(&storage.mount_point)?; if storage.options.is_empty() { continue; } else { // assume that fsGid has already been set - let mut opts = Vec::<&str>::new(); + let mut opts = Vec::new(); for (_, opt) in storage.options.iter().enumerate() { - if opt.starts_with(FS_GID) { + let fields: Vec<&str> = opt.split('=').collect(); + if fields.len() == 2 && fields[0] == FS_GID { continue; } - opts.push(opt) + opts.push(opt.as_str()) } + let (flags, options) = parse_mount_flags_and_options(&opts); + let mount_path = Path::new(&storage.mount_point); let src_path = Path::new(&storage.source); - let (flags, options) = parse_mount_flags_and_options(opts); - info!(logger, "mounting storage"; - "mount-source" => src_path.display(), - "mount-destination" => mount_path.display(), - "mount-fstype" => storage.fstype.as_str(), - "mount-options" => options.as_str(), + "mount-source" => src_path.display(), + "mount-destination" => mount_path.display(), + "mount-fstype" => storage.fstype.as_str(), + "mount-options" => options.as_str(), ); - return baremount( + baremount( src_path, mount_path, storage.fstype.as_str(), flags, options.as_str(), &logger, - ); + )?; } } _ => { return Err(anyhow!( "Unsupported storage type for syncing mounts {}. Only ephemeral storage update is supported", - storage.driver.to_owned() + storage.driver )); } }; @@ -374,16 +370,14 @@ async fn overlayfs_storage_handler( async fn local_storage_handler( _logger: &Logger, storage: &Storage, - sandbox: &Arc>, + _sandbox: &Arc>, ) -> Result { fs::create_dir_all(&storage.mount_point).context(format!( "failed to create dir all {:?}", &storage.mount_point ))?; - let opts_vec: Vec = storage.options.to_vec(); - - let opts = parse_options(opts_vec); + let opts = parse_options(&storage.options); let mut need_set_fsgid = false; if let Some(fsgid) = opts.get(FS_GID) { @@ -563,7 +557,6 @@ async fn virtio_blk_storage_handler( if storage.source.starts_with("/dev") { let metadata = fs::metadata(&storage.source) .context(format!("get metadata on file {:?}", &storage.source))?; - let mode = metadata.permissions().mode(); if mode & libc::S_IFBLK == 0 { return Err(anyhow!("Invalid device {}", &storage.source)); @@ -620,12 +613,9 @@ async fn virtio_scsi_storage_handler( #[instrument] fn common_storage_handler(logger: &Logger, storage: &Storage) -> Result { - // Mount the storage device. - let mount_point = storage.mount_point.to_string(); - mount_storage(logger, storage)?; set_ownership(logger, storage)?; - Ok(mount_point) + Ok(storage.mount_point.clone()) } // nvdimm_storage_handler handles the storage for NVDIMM driver. 
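The ephemeral and local storage handlers above both pull the synthetic fsgid=<gid> entry out of the storage options (it is not a valid mount option, so it is stripped before mounting) and apply it as group ownership on the volume. A standalone sketch of that flow, assuming the nix and anyhow crates the agent already uses; apply_fs_group and the exact setgid handling are illustrative, not lifted verbatim from the patch.

use std::collections::HashMap;
use std::fs;
use std::os::unix::fs::PermissionsExt;
use std::path::Path;

use anyhow::Result;
use nix::unistd::{chown, Gid};

const FS_GID: &str = "fsgid";

// Mirror of parse_options(): keep only well-formed "key=value" entries.
fn parse_options(option_list: &[String]) -> HashMap<String, String> {
    let mut options = HashMap::new();
    for opt in option_list.iter() {
        let fields: Vec<&str> = opt.split('=').collect();
        if fields.len() == 2 {
            options.insert(fields[0].to_string(), fields[1].to_string());
        }
    }
    options
}

fn apply_fs_group(mount_point: &str, storage_options: &[String]) -> Result<()> {
    let opts = parse_options(storage_options);
    if let Some(fsgid) = opts.get(FS_GID) {
        let gid = fsgid.parse::<u32>()?;
        // Hand ownership of the volume root to the fsGroup...
        chown(mount_point, None, Some(Gid::from_raw(gid)))?;
        // ...and set the setgid bit so files created inside inherit that group.
        let meta = fs::metadata(mount_point)?;
        let mut permissions = meta.permissions();
        let mode = permissions.mode() | 0o2000;
        permissions.set_mode(mode);
        fs::set_permissions(Path::new(mount_point), permissions)?;
    }
    Ok(())
}

In the handlers above, this would run right after the tmpfs/bind mount (or, for the local driver, the directory creation) succeeds, and only when the storage actually carried an fsGroup option.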
@@ -666,9 +656,8 @@ async fn bind_watcher_storage_handler( fn mount_storage(logger: &Logger, storage: &Storage) -> Result<()> { let logger = logger.new(o!("subsystem" => "mount")); - // Check share before attempting to mount to see if the destination is already a mount point. - // If so, skip doing the mount. This facilitates mounting the sharedfs automatically - // in the guest before the agent service starts. + // There's a special mechanism to create mountpoint from a `sharedfs` instance before + // starting the kata-agent. Check for such cases. if storage.source == MOUNT_GUEST_TAG && is_mounted(&storage.mount_point)? { warn!( logger, @@ -680,27 +669,23 @@ fn mount_storage(logger: &Logger, storage: &Storage) -> Result<()> { let mount_path = Path::new(&storage.mount_point); let src_path = Path::new(&storage.source); if storage.fstype == "bind" && !src_path.is_dir() { - ensure_destination_file_exists(mount_path) + ensure_destination_file_exists(mount_path).context("Could not create mountpoint file")?; } else { - fs::create_dir_all(mount_path).map_err(anyhow::Error::from) + fs::create_dir_all(mount_path) + .map_err(anyhow::Error::from) + .context("Could not create mountpoint")?; } - .context("Could not create mountpoint")?; - - let options_vec = storage.options.to_vec(); - let options_vec = options_vec.iter().map(String::as_str).collect(); - let (flags, options) = parse_mount_flags_and_options(options_vec); - - let source = Path::new(&storage.source); + let (flags, options) = parse_mount_flags_and_options(&storage.options); info!(logger, "mounting storage"; - "mount-source" => source.display(), - "mount-destination" => mount_path.display(), - "mount-fstype" => storage.fstype.as_str(), - "mount-options" => options.as_str(), + "mount-source" => src_path.display(), + "mount-destination" => mount_path.display(), + "mount-fstype" => storage.fstype.as_str(), + "mount-options" => options.as_str(), ); baremount( - source, + src_path, mount_path, storage.fstype.as_str(), flags, @@ -717,14 +702,9 @@ pub fn set_ownership(logger: &Logger, storage: &Storage) -> Result<()> { if storage.fs_group.is_none() { return Ok(()); } + let fs_group = storage.fs_group(); - - let mut read_only = false; - let opts_vec: Vec = storage.options.to_vec(); - if opts_vec.contains(&String::from("ro")) { - read_only = true; - } - + let read_only = storage.options.contains(&String::from("ro")); let mount_path = Path::new(&storage.mount_point); let metadata = mount_path.metadata().map_err(|err| { error!(logger, "failed to obtain metadata for mount path"; @@ -809,24 +789,25 @@ pub fn is_mounted(mount_point: &str) -> Result { let found = fs::metadata(mount_point).is_ok() // Looks through /proc/mounts and check if the mount exists && fs::read_to_string("/proc/mounts")? - .lines() - .any(|line| { - // The 2nd column reveals the mount point. - line.split_whitespace() - .nth(1) - .map(|target| mount_point.eq(target)) - .unwrap_or(false) - }); + .lines() + .any(|line| { + // The 2nd column reveals the mount point. 
+ line.split_whitespace() + .nth(1) + .map(|target| mount_point.eq(target)) + .unwrap_or(false) + }); Ok(found) } #[instrument] -fn parse_mount_flags_and_options(options_vec: Vec<&str>) -> (MsFlags, String) { +fn parse_mount_flags_and_options + Debug>(options_vec: &[S]) -> (MsFlags, String) { let mut flags = MsFlags::empty(); let mut options: String = "".to_string(); for opt in options_vec { + let opt = opt.as_ref(); if !opt.is_empty() { match FLAGS.get(opt) { Some(x) => { @@ -861,17 +842,16 @@ fn parse_mount_flags_and_options(options_vec: Vec<&str>) -> (MsFlags, String) { #[instrument] pub async fn add_storages( logger: Logger, - storages: Vec, + storages: &[Storage], sandbox: &Arc>, cid: Option, ) -> Result> { let mut mount_list = Vec::new(); for storage in storages { - let handler_name = storage.driver.clone(); - let logger = logger.new(o!( - "subsystem" => "storage", - "storage-type" => handler_name.to_owned())); + let handler_name = &storage.driver; + let logger = + logger.new(o!( "subsystem" => "storage", "storage-type" => handler_name.to_string())); { let mut sb = sandbox.lock().await; @@ -882,22 +862,20 @@ pub async fn add_storages( } let res = match handler_name.as_str() { - DRIVER_BLK_TYPE => virtio_blk_storage_handler(&logger, &storage, sandbox).await, - DRIVER_BLK_CCW_TYPE => virtio_blk_ccw_storage_handler(&logger, &storage, sandbox).await, - DRIVER_9P_TYPE => virtio9p_storage_handler(&logger, &storage, sandbox).await, - DRIVER_VIRTIOFS_TYPE => virtiofs_storage_handler(&logger, &storage, sandbox).await, - DRIVER_EPHEMERAL_TYPE => ephemeral_storage_handler(&logger, &storage, sandbox).await, + DRIVER_BLK_TYPE => virtio_blk_storage_handler(&logger, storage, sandbox).await, + DRIVER_BLK_CCW_TYPE => virtio_blk_ccw_storage_handler(&logger, storage, sandbox).await, + DRIVER_9P_TYPE => virtio9p_storage_handler(&logger, storage, sandbox).await, + DRIVER_VIRTIOFS_TYPE => virtiofs_storage_handler(&logger, storage, sandbox).await, + DRIVER_EPHEMERAL_TYPE => ephemeral_storage_handler(&logger, storage, sandbox).await, DRIVER_OVERLAYFS_TYPE => { - overlayfs_storage_handler(&logger, &storage, cid.as_deref(), sandbox).await + overlayfs_storage_handler(&logger, storage, cid.as_deref(), sandbox).await } - DRIVER_MMIO_BLK_TYPE => { - virtiommio_blk_storage_handler(&logger, &storage, sandbox).await - } - DRIVER_LOCAL_TYPE => local_storage_handler(&logger, &storage, sandbox).await, - DRIVER_SCSI_TYPE => virtio_scsi_storage_handler(&logger, &storage, sandbox).await, - DRIVER_NVDIMM_TYPE => nvdimm_storage_handler(&logger, &storage, sandbox).await, + DRIVER_MMIO_BLK_TYPE => virtiommio_blk_storage_handler(&logger, storage, sandbox).await, + DRIVER_LOCAL_TYPE => local_storage_handler(&logger, storage, sandbox).await, + DRIVER_SCSI_TYPE => virtio_scsi_storage_handler(&logger, storage, sandbox).await, + DRIVER_NVDIMM_TYPE => nvdimm_storage_handler(&logger, storage, sandbox).await, DRIVER_WATCHABLE_BIND_TYPE => { - bind_watcher_storage_handler(&logger, &storage, sandbox, cid.clone()).await?; + bind_watcher_storage_handler(&logger, storage, sandbox, cid.clone()).await?; // Don't register watch mounts, they're handled separately by the watcher. 
Ok(String::new()) } @@ -916,9 +894,9 @@ pub async fn add_storages( "add_storages failed, storage: {:?}, error: {:?} ", storage, e ); let mut sb = sandbox.lock().await; - sb.unset_sandbox_storage(&storage.mount_point) - .map_err(|e| warn!(logger, "fail to unset sandbox storage {:?}", e)) - .ok(); + if let Err(e) = sb.unset_sandbox_storage(&storage.mount_point) { + warn!(logger, "fail to unset sandbox storage {:?}", e); + } return Err(e); } Ok(m) => m, @@ -934,11 +912,9 @@ pub async fn add_storages( #[instrument] fn mount_to_rootfs(logger: &Logger, m: &InitMount) -> Result<()> { - let options_vec: Vec<&str> = m.options.clone(); + let (flags, options) = parse_mount_flags_and_options(&m.options); - let (flags, options) = parse_mount_flags_and_options(options_vec); - - fs::create_dir_all(Path::new(m.dest)).context("could not create directory")?; + fs::create_dir_all(m.dest).context("could not create directory")?; let source = Path::new(m.src); let dest = Path::new(m.dest); @@ -1143,17 +1119,14 @@ fn ensure_destination_file_exists(path: &Path) -> Result<()> { } #[instrument] -fn parse_options(option_list: Vec) -> HashMap { +fn parse_options(option_list: &[String]) -> HashMap { let mut options = HashMap::new(); for opt in option_list.iter() { let fields: Vec<&str> = opt.split('=').collect(); - if fields.len() != 2 { - continue; + if fields.len() == 2 { + options.insert(fields[0].to_string(), fields[1].to_string()); } - - options.insert(fields[0].to_string(), fields[1].to_string()); } - options } @@ -2070,7 +2043,7 @@ mod tests { for (i, d) in tests.iter().enumerate() { let msg = format!("test[{}]: {:?}", i, d); - let result = parse_mount_flags_and_options(d.options_vec.clone()); + let result = parse_mount_flags_and_options(&d.options_vec); let msg = format!("{}: result: {:?}", msg, result); diff --git a/src/agent/src/rpc.rs b/src/agent/src/rpc.rs index 7b92f245e8..a8e0beb1e1 100644 --- a/src/agent/src/rpc.rs +++ b/src/agent/src/rpc.rs @@ -169,7 +169,7 @@ impl AgentService { // updates the devices listed in the OCI spec, so that they actually // match real devices inside the VM. This step is necessary since we // cannot predict everything from the caller. - add_devices(&req.devices.to_vec(), &mut oci, &self.sandbox).await?; + add_devices(&req.devices, &mut oci, &self.sandbox).await?; // Both rootfs and volumes (invoked with --volume for instance) will // be processed the same way. The idea is to always mount any provided @@ -180,7 +180,7 @@ impl AgentService { // list) to bind mount all of them inside the container. 
let m = add_storages( sl(), - req.storages.to_vec(), + &req.storages, &self.sandbox, Some(req.container_id.clone()), ) @@ -258,15 +258,13 @@ impl AgentService { #[instrument] async fn do_start_container(&self, req: protocols::agent::StartContainerRequest) -> Result<()> { - let cid = req.container_id; - let mut s = self.sandbox.lock().await; let sid = s.id.clone(); + let cid = req.container_id; let ctr = s .get_container(&cid) .ok_or_else(|| anyhow!("Invalid container id"))?; - ctr.exec().await?; if sid == cid { @@ -274,13 +272,9 @@ impl AgentService { } // start oom event loop - - let cg_path = ctr.cgroup_manager.as_ref().get_cgroup_path("memory"); - - if let Ok(cg_path) = cg_path { + if let Ok(cg_path) = ctr.cgroup_manager.as_ref().get_cgroup_path("memory") { let rx = notifier::notify_oom(cid.as_str(), cg_path.to_string()).await?; - - s.run_oom_event_monitor(rx, cid.clone()).await; + s.run_oom_event_monitor(rx, cid).await; } Ok(()) @@ -291,64 +285,51 @@ impl AgentService { &self, req: protocols::agent::RemoveContainerRequest, ) -> Result<()> { - let cid = req.container_id.clone(); + let cid = req.container_id; if req.timeout == 0 { let mut sandbox = self.sandbox.lock().await; - sandbox.bind_watcher.remove_container(&cid).await; - sandbox .get_container(&cid) .ok_or_else(|| anyhow!("Invalid container id"))? .destroy() .await?; - remove_container_resources(&mut sandbox, &cid)?; - return Ok(()); } // timeout != 0 let s = self.sandbox.clone(); let cid2 = cid.clone(); - let (tx, rx) = tokio::sync::oneshot::channel::(); - let handle = tokio::spawn(async move { let mut sandbox = s.lock().await; - if let Some(ctr) = sandbox.get_container(&cid2) { - ctr.destroy().await.unwrap(); - sandbox.bind_watcher.remove_container(&cid2).await; - tx.send(1).unwrap(); - }; + sandbox.bind_watcher.remove_container(&cid2).await; + match sandbox.get_container(&cid2) { + Some(ctr) => ctr.destroy().await, + None => Err(anyhow!("Invalid container id")), + } }); - if tokio::time::timeout(Duration::from_secs(req.timeout.into()), rx) - .await - .is_err() - { - return Err(anyhow!(nix::Error::ETIME)); + let to = Duration::from_secs(req.timeout.into()); + match tokio::time::timeout(to, handle).await { + Ok(res) => { + res??; + let mut sandbox = self.sandbox.lock().await; + remove_container_resources(&mut sandbox, &cid) + } + Err(_e) => Err(anyhow!(nix::Error::ETIME)), } - - if handle.await.is_err() { - return Err(anyhow!(nix::Error::UnknownErrno)); - } - - let mut sandbox = self.sandbox.lock().await; - remove_container_resources(&mut sandbox, &cid)?; - - Ok(()) } #[instrument] async fn do_exec_process(&self, req: protocols::agent::ExecProcessRequest) -> Result<()> { - let cid = req.container_id.clone(); - let exec_id = req.exec_id.clone(); + let cid = req.container_id; + let exec_id = req.exec_id; info!(sl(), "do_exec_process cid: {} eid: {}", cid, exec_id); let mut sandbox = self.sandbox.lock().await; - let mut process = req .process .into_option() @@ -365,21 +346,19 @@ impl AgentService { .get_container(&cid) .ok_or_else(|| anyhow!("Invalid container id"))?; - ctr.run(p).await?; - - Ok(()) + ctr.run(p).await } #[instrument] async fn do_signal_process(&self, req: protocols::agent::SignalProcessRequest) -> Result<()> { - let cid = req.container_id.clone(); - let eid = req.exec_id.clone(); + let cid = req.container_id; + let eid = req.exec_id; info!( sl(), "signal process"; - "container-id" => cid.clone(), - "exec-id" => eid.clone(), + "container-id" => &cid, + "exec-id" => &eid, "signal" => req.signal, ); @@ -400,8 +379,8 
@@ impl AgentService { info!( sl(), "signal encounter ESRCH, continue"; - "container-id" => cid.clone(), - "exec-id" => eid.clone(), + "container-id" => &cid, + "exec-id" => &eid, "pid" => p.pid, "signal" => sig, ); @@ -416,16 +395,16 @@ impl AgentService { info!( sl(), "signal all the remaining processes"; - "container-id" => cid.clone(), - "exec-id" => eid.clone(), + "container-id" => &cid, + "exec-id" => &eid, ); if let Err(err) = self.freeze_cgroup(&cid, FreezerState::Frozen).await { warn!( sl(), "freeze cgroup failed"; - "container-id" => cid.clone(), - "exec-id" => eid.clone(), + "container-id" => &cid, + "exec-id" => &eid, "error" => format!("{:?}", err), ); } @@ -437,8 +416,8 @@ impl AgentService { warn!( sl(), "signal failed"; - "container-id" => cid.clone(), - "exec-id" => eid.clone(), + "container-id" => &cid, + "exec-id" => &eid, "pid" => pid, "error" => format!("{:?}", err), ); @@ -448,12 +427,13 @@ impl AgentService { warn!( sl(), "unfreeze cgroup failed"; - "container-id" => cid.clone(), - "exec-id" => eid.clone(), + "container-id" => &cid, + "exec-id" => &eid, "error" => format!("{:?}", err), ); } } + Ok(()) } @@ -462,8 +442,7 @@ impl AgentService { let ctr = sandbox .get_container(cid) .ok_or_else(|| anyhow!("Invalid container id {}", cid))?; - ctr.cgroup_manager.as_ref().freeze(state)?; - Ok(()) + ctr.cgroup_manager.as_ref().freeze(state) } async fn get_pids(&self, cid: &str) -> Result> { @@ -471,8 +450,7 @@ impl AgentService { let ctr = sandbox .get_container(cid) .ok_or_else(|| anyhow!("Invalid container id {}", cid))?; - let pids = ctr.cgroup_manager.as_ref().get_pids()?; - Ok(pids) + ctr.cgroup_manager.as_ref().get_pids() } #[instrument] @@ -480,20 +458,19 @@ impl AgentService { &self, req: protocols::agent::WaitProcessRequest, ) -> Result { - let cid = req.container_id.clone(); + let cid = req.container_id; let eid = req.exec_id; let mut resp = WaitProcessResponse::new(); - let pid: pid_t; - - let (exit_send, mut exit_recv) = tokio::sync::mpsc::channel(100); info!( sl(), "wait process"; - "container-id" => cid.clone(), - "exec-id" => eid.clone() + "container-id" => &cid, + "exec-id" => &eid ); + let pid: pid_t; + let (exit_send, mut exit_recv) = tokio::sync::mpsc::channel(100); let exit_rx = { let mut sandbox = self.sandbox.lock().await; let p = sandbox.find_container_process(cid.as_str(), eid.as_str())?; @@ -548,8 +525,8 @@ impl AgentService { &self, req: protocols::agent::WriteStreamRequest, ) -> Result { - let cid = req.container_id.clone(); - let eid = req.exec_id.clone(); + let cid = req.container_id; + let eid = req.exec_id; let writer = { let mut sandbox = self.sandbox.lock().await; @@ -601,10 +578,6 @@ impl AgentService { } }; - if reader.is_none() { - return Err(anyhow!("Unable to determine stream reader, is None")); - } - let reader = reader.ok_or_else(|| anyhow!("cannot get stream reader"))?; tokio::select! 
{ @@ -663,7 +636,6 @@ impl agent_ttrpc::AgentService for AgentService { ) -> ttrpc::Result { trace_rpc_call!(ctx, "remove_container", req); is_allowed(&req)?; - match self.do_remove_container(req).await { Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)), Ok(_) => Ok(Empty::new()), @@ -715,32 +687,23 @@ impl agent_ttrpc::AgentService for AgentService { ) -> ttrpc::Result { trace_rpc_call!(ctx, "update_container", req); is_allowed(&req)?; - let cid = req.container_id.clone(); - let res = req.resources; let mut sandbox = self.sandbox.lock().await; - - let ctr = sandbox.get_container(&cid).ok_or_else(|| { + let ctr = sandbox.get_container(&req.container_id).ok_or_else(|| { ttrpc_error( ttrpc::Code::INVALID_ARGUMENT, "invalid container id".to_string(), ) })?; - let resp = Empty::new(); - - if let Some(res) = res.as_ref() { + if let Some(res) = req.resources.as_ref() { let oci_res = rustjail::resources_grpc_to_oci(res); - match ctr.set(oci_res) { - Err(e) => { - return Err(ttrpc_error(ttrpc::Code::INTERNAL, e)); - } - - Ok(_) => return Ok(resp), + if let Err(e) = ctr.set(oci_res) { + return Err(ttrpc_error(ttrpc::Code::INTERNAL, e)); } } - Ok(resp) + Ok(Empty::new()) } async fn stats_container( @@ -750,10 +713,9 @@ impl agent_ttrpc::AgentService for AgentService { ) -> ttrpc::Result { trace_rpc_call!(ctx, "stats_container", req); is_allowed(&req)?; - let cid = req.container_id; - let mut sandbox = self.sandbox.lock().await; - let ctr = sandbox.get_container(&cid).ok_or_else(|| { + let mut sandbox = self.sandbox.lock().await; + let ctr = sandbox.get_container(&req.container_id).ok_or_else(|| { ttrpc_error( ttrpc::Code::INVALID_ARGUMENT, "invalid container id".to_string(), @@ -771,10 +733,9 @@ impl agent_ttrpc::AgentService for AgentService { ) -> ttrpc::Result { trace_rpc_call!(ctx, "pause_container", req); is_allowed(&req)?; - let cid = req.container_id(); - let mut sandbox = self.sandbox.lock().await; - let ctr = sandbox.get_container(cid).ok_or_else(|| { + let mut sandbox = self.sandbox.lock().await; + let ctr = sandbox.get_container(req.container_id()).ok_or_else(|| { ttrpc_error( ttrpc::Code::INVALID_ARGUMENT, "invalid container id".to_string(), @@ -794,10 +755,9 @@ impl agent_ttrpc::AgentService for AgentService { ) -> ttrpc::Result { trace_rpc_call!(ctx, "resume_container", req); is_allowed(&req)?; - let cid = req.container_id(); - let mut sandbox = self.sandbox.lock().await; - let ctr = sandbox.get_container(cid).ok_or_else(|| { + let mut sandbox = self.sandbox.lock().await; + let ctr = sandbox.get_container(req.container_id()).ok_or_else(|| { ttrpc_error( ttrpc::Code::INVALID_ARGUMENT, "invalid container id".to_string(), @@ -874,7 +834,7 @@ impl agent_ttrpc::AgentService for AgentService { trace_rpc_call!(ctx, "close_stdin", req); is_allowed(&req)?; - let cid = req.container_id.clone(); + let cid = req.container_id; let eid = req.exec_id; let mut sandbox = self.sandbox.lock().await; @@ -900,11 +860,9 @@ impl agent_ttrpc::AgentService for AgentService { trace_rpc_call!(ctx, "tty_win_resize", req); is_allowed(&req)?; - let cid = req.container_id.clone(); - let eid = req.exec_id.clone(); let mut sandbox = self.sandbox.lock().await; let p = sandbox - .find_container_process(cid.as_str(), eid.as_str()) + .find_container_process(req.container_id(), req.exec_id()) .map_err(|e| { ttrpc_error( ttrpc::Code::UNAVAILABLE, @@ -1259,7 +1217,7 @@ impl agent_ttrpc::AgentService for AgentService { .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; } - match add_storages(sl(), 
req.storages.to_vec(), &self.sandbox, None).await { + match add_storages(sl(), &req.storages, &self.sandbox, None).await { Ok(m) => { self.sandbox.lock().await.mounts = m; } @@ -1290,15 +1248,14 @@ impl agent_ttrpc::AgentService for AgentService { is_allowed(&req)?; let mut sandbox = self.sandbox.lock().await; - // destroy all containers, clean up, notify agent to exit - // etc. + // destroy all containers, clean up, notify agent to exit etc. sandbox .destroy() .await .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?; // Close get_oom_event connection, // otherwise it will block the shutdown of ttrpc. - sandbox.event_tx.take(); + drop(sandbox.event_tx.take()); sandbox .sender @@ -1355,9 +1312,9 @@ impl agent_ttrpc::AgentService for AgentService { ctx: &TtrpcContext, req: protocols::agent::OnlineCPUMemRequest, ) -> ttrpc::Result { + trace_rpc_call!(ctx, "online_cpu_mem", req); is_allowed(&req)?; let sandbox = self.sandbox.lock().await; - trace_rpc_call!(ctx, "online_cpu_mem", req); sandbox .online_cpu_memory(&req) @@ -1506,7 +1463,6 @@ impl agent_ttrpc::AgentService for AgentService { info!(sl(), "get volume stats!"); let mut resp = VolumeStatsResponse::new(); - let mut condition = VolumeCondition::new(); match File::open(&req.volume_guest_path) { @@ -1698,15 +1654,10 @@ pub fn start(s: Arc>, server_address: &str, init_mode: bool) -> R sandbox: s, init_mode, }) as Box; - - let agent_worker = Arc::new(agent_service); + let aservice = agent_ttrpc::create_agent_service(Arc::new(agent_service)); let health_service = Box::new(HealthService {}) as Box; - let health_worker = Arc::new(health_service); - - let aservice = agent_ttrpc::create_agent_service(agent_worker); - - let hservice = health_ttrpc::create_health(health_worker); + let hservice = health_ttrpc::create_health(Arc::new(health_service)); let server = TtrpcServer::new() .bind(server_address)? 
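Earlier in this file, do_remove_container is reworked so the bounded path no longer signals completion through a oneshot channel: the teardown runs in a spawned task and tokio::time::timeout is applied directly to its JoinHandle, with an elapsed timer mapped to ETIME. A reduced sketch of that pattern, with destroy() as a stand-in for the real container teardown:

use std::time::Duration;

use anyhow::{anyhow, Result};

async fn destroy() -> Result<()> {
    // Stand-in for ctr.destroy().await plus the bind-watcher cleanup.
    Ok(())
}

async fn remove_with_timeout(timeout_secs: u64) -> Result<()> {
    let handle = tokio::spawn(async move { destroy().await });

    let to = Duration::from_secs(timeout_secs);
    match tokio::time::timeout(to, handle).await {
        // The outer ? surfaces a JoinError (panic or cancellation),
        // the inner ? propagates the teardown error itself.
        Ok(res) => {
            res??;
            Ok(())
        }
        Err(_elapsed) => Err(anyhow!(nix::Error::ETIME)),
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    remove_with_timeout(5).await
}

One subtle difference from the channel-based version: a teardown error or panic inside the task is now propagated to the caller instead of being unwrapped inside the spawned future.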
@@ -1750,6 +1701,7 @@ fn update_container_namespaces( continue; } } + // update pid namespace let mut pid_ns = LinuxNamespace { r#type: NSTYPEPID.to_string(), diff --git a/src/agent/src/sandbox.rs b/src/agent/src/sandbox.rs index 1202bd793f..473c7cee7f 100644 --- a/src/agent/src/sandbox.rs +++ b/src/agent/src/sandbox.rs @@ -3,14 +3,14 @@ // SPDX-License-Identifier: Apache-2.0 // -use crate::linux_abi::*; -use crate::mount::{get_mount_fs_type, remove_mounts, TYPE_ROOTFS}; -use crate::namespace::Namespace; -use crate::netlink::Handle; -use crate::network::Network; -use crate::pci; -use crate::uevent::{Uevent, UeventMatcher}; -use crate::watcher::BindWatcher; +use std::collections::HashMap; +use std::fs; +use std::os::unix::fs::PermissionsExt; +use std::path::Path; +use std::str::FromStr; +use std::sync::Arc; +use std::{thread, time}; + use anyhow::{anyhow, Context, Result}; use kata_types::cpu::CpuSet; use libc::pid_t; @@ -22,18 +22,20 @@ use rustjail::container::BaseContainer; use rustjail::container::LinuxContainer; use rustjail::process::Process; use slog::Logger; -use std::collections::HashMap; -use std::fs; -use std::os::unix::fs::PermissionsExt; -use std::path::Path; -use std::str::FromStr; -use std::sync::Arc; -use std::{thread, time}; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::oneshot; use tokio::sync::Mutex; use tracing::instrument; +use crate::linux_abi::*; +use crate::mount::{get_mount_fs_type, remove_mounts, TYPE_ROOTFS}; +use crate::namespace::Namespace; +use crate::netlink::Handle; +use crate::network::Network; +use crate::pci; +use crate::uevent::{Uevent, UeventMatcher}; +use crate::watcher::BindWatcher; + pub const ERR_INVALID_CONTAINER_ID: &str = "Invalid container id"; type UeventWatcher = (Box, oneshot::Sender); @@ -103,9 +105,6 @@ impl Sandbox { // This method also returns a boolean to let // callers know if the storage already existed or not. // It will return true if storage is new. - // - // It's assumed that caller is calling this method after - // acquiring a lock on sandbox. #[instrument] pub fn set_sandbox_storage(&mut self, path: &str) -> bool { match self.storages.get_mut(path) { @@ -126,16 +125,13 @@ impl Sandbox { // storage reference from the sandbox and return 'true' to // let the caller know that they can clean up the storage // related directories by calling remove_sandbox_storage - // - // It's assumed that caller is calling this method after - // acquiring a lock on sandbox. #[instrument] pub fn unset_sandbox_storage(&mut self, path: &str) -> Result { match self.storages.get_mut(path) { None => Err(anyhow!("Sandbox storage with path {} not found", path)), Some(count) => { *count -= 1; - if *count < 1 { + if *count == 0 { self.storages.remove(path); return Ok(true); } @@ -146,11 +142,8 @@ impl Sandbox { // remove_sandbox_storage removes the sandbox storage if no // containers are using that storage. - // - // It's assumed that caller is calling this method after - // acquiring a lock on sandbox. #[instrument] - pub fn remove_sandbox_storage(&self, path: &str) -> Result<()> { + pub fn remove_sandbox_storage(&mut self, path: &str) -> Result<()> { let mounts = vec![path.to_string()]; remove_mounts(&mounts)?; // "remove_dir" will fail if the mount point is backed by a read-only filesystem. @@ -165,9 +158,6 @@ impl Sandbox { // unset_and_remove_sandbox_storage unsets the storage from sandbox // and if there are no containers using this storage it will // remove it from the sandbox. 
- // - // It's assumed that caller is calling this method after - // acquiring a lock on sandbox. #[instrument] pub fn unset_and_remove_sandbox_storage(&mut self, path: &str) -> Result<()> { if self.unset_sandbox_storage(path)? { @@ -184,22 +174,18 @@ impl Sandbox { .get_ipc() .setup() .await - .context("Failed to setup persistent IPC namespace")?; + .context("setup persistent IPC namespace")?; // // Set up shared UTS namespace self.shared_utsns = Namespace::new(&self.logger) .get_uts(self.hostname.as_str()) .setup() .await - .context("Failed to setup persistent UTS namespace")?; + .context("setup persistent UTS namespace")?; Ok(true) } - pub fn add_container(&mut self, c: LinuxContainer) { - self.containers.insert(c.id.clone(), c); - } - #[instrument] pub fn update_shared_pidns(&mut self, c: &LinuxContainer) -> Result<()> { // Populate the shared pid path only if this is an infra container and @@ -224,14 +210,18 @@ impl Sandbox { Ok(()) } + pub fn add_container(&mut self, c: LinuxContainer) { + self.containers.insert(c.id.clone(), c); + } + pub fn get_container(&mut self, id: &str) -> Option<&mut LinuxContainer> { self.containers.get_mut(id) } pub fn find_process(&mut self, pid: pid_t) -> Option<&mut Process> { for (_, c) in self.containers.iter_mut() { - if c.processes.get(&pid).is_some() { - return c.processes.get_mut(&pid); + if let Some(p) = c.processes.get_mut(&pid) { + return Some(p); } } @@ -280,25 +270,17 @@ impl Sandbox { let guest_cpuset = rustjail_cgroups::fs::get_guest_cpuset()?; for (_, ctr) in self.containers.iter() { - let cpu = ctr - .config - .spec - .as_ref() - .unwrap() - .linux - .as_ref() - .unwrap() - .resources - .as_ref() - .unwrap() - .cpu - .as_ref(); - let container_cpust = if let Some(c) = cpu { &c.cpus } else { "" }; - - info!(self.logger, "updating {}", ctr.id.as_str()); - ctr.cgroup_manager - .as_ref() - .update_cpuset_path(guest_cpuset.as_str(), container_cpust)?; + if let Some(spec) = ctr.config.spec.as_ref() { + if let Some(linux) = spec.linux.as_ref() { + if let Some(resources) = linux.resources.as_ref() { + if let Some(cpus) = resources.cpu.as_ref() { + info!(self.logger, "updating {}", ctr.id.as_str()); + ctr.cgroup_manager + .update_cpuset_path(guest_cpuset.as_str(), &cpus.cpus)?; + } + } + } + } } Ok(()) @@ -360,31 +342,28 @@ impl Sandbox { #[instrument] pub async fn run_oom_event_monitor(&self, mut rx: Receiver, container_id: String) { let logger = self.logger.clone(); - - if self.event_tx.is_none() { - error!( - logger, - "sandbox.event_tx not found in run_oom_event_monitor" - ); - return; - } - - let tx = self.event_tx.as_ref().unwrap().clone(); + let tx = match self.event_tx.as_ref() { + Some(v) => v.clone(), + None => { + error!( + logger, + "sandbox.event_tx not found in run_oom_event_monitor" + ); + return; + } + }; tokio::spawn(async move { loop { let event = rx.recv().await; - // None means the container has exited, - // and sender in OOM notifier is dropped. + // None means the container has exited, and sender in OOM notifier is dropped. if event.is_none() { return; } info!(logger, "got an OOM event {:?}", event); - - let _ = tx - .send(container_id.clone()) - .await - .map_err(|e| error!(logger, "failed to send message: {:?}", e)); + if let Err(e) = tx.send(container_id.clone()).await { + error!(logger, "failed to send message: {:?}", e); + } } }); } @@ -397,39 +376,36 @@ fn online_resources(logger: &Logger, path: &str, pattern: &str, num: i32) -> Res for e in fs::read_dir(path)? 
{ let entry = e?; - let tmpname = entry.file_name(); - let name = tmpname.to_str().unwrap(); - let p = entry.path(); - - if re.is_match(name) { - let file = format!("{}/{}", p.to_str().unwrap(), SYSFS_ONLINE_FILE); - info!(logger, "{}", file.as_str()); - - let c = fs::read_to_string(file.as_str()); - if c.is_err() { - continue; - } - let c = c.unwrap(); - - if c.trim().contains('0') { - let r = fs::write(file.as_str(), "1"); - if r.is_err() { + // Skip direntry which doesn't match the pattern. + match entry.file_name().to_str() { + None => continue, + Some(v) => { + if !re.is_match(v) { continue; } - count += 1; + } + }; - if num > 0 && count == num { + let p = entry.path().join(SYSFS_ONLINE_FILE); + if let Ok(c) = fs::read_to_string(&p) { + // Try to online the object in offline state. + if c.trim().contains('0') && fs::write(&p, "1").is_ok() && num > 0 { + count += 1; + if count == num { break; } } } } - if num > 0 { - return Ok(count); - } + Ok(count) +} - Ok(0) +#[instrument] +fn online_memory(logger: &Logger) -> Result<()> { + online_resources(logger, SYSFS_MEMORY_ONLINE_PATH, r"memory[0-9]+", -1) + .context("online memory resource")?; + Ok(()) } // max wait for all CPUs to online will use 50 * 100 = 5 seconds. @@ -473,13 +449,6 @@ fn online_cpus(logger: &Logger, num: i32) -> Result { )) } -#[instrument] -fn online_memory(logger: &Logger) -> Result<()> { - online_resources(logger, SYSFS_MEMORY_ONLINE_PATH, r"memory[0-9]+", -1) - .context("online memory resource")?; - Ok(()) -} - fn onlined_cpus() -> Result { let content = fs::read_to_string(SYSFS_CPU_ONLINE_PATH).context("read sysfs cpu online file")?; @@ -555,7 +524,7 @@ mod tests { skip_if_not_root!(); let logger = slog::Logger::root(slog::Discard, o!()); - let s = Sandbox::new(&logger).unwrap(); + let mut s = Sandbox::new(&logger).unwrap(); let tmpdir = Builder::new().tempdir().unwrap(); let tmpdir_path = tmpdir.path().to_str().unwrap();
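In baremount (mount.rs), the mount failure message now formats source and destination with Path::display() rather than {:?}. The difference is purely cosmetic, but easy to see in isolation:

use std::path::Path;

fn main() {
    let source = Path::new("/run/kata-containers/shared/containers");

    // Debug formatting quotes (and escapes) the path:
    //   failed to mount "/run/kata-containers/shared/containers"
    let debug_msg = format!("failed to mount {:?}", source);

    // Display, via Path::display(), prints it as a plain path:
    //   failed to mount /run/kata-containers/shared/containers
    let display_msg = format!("failed to mount {}", source.display());

    assert!(debug_msg.contains('"'));
    assert!(!display_msg.contains('"'));
    println!("{}\n{}", debug_msg, display_msg);
}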
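parse_mount_flags_and_options is now generic over S: AsRef<str> + Debug, so the protobuf-provided Vec<String> storage options, the Vec<&str> of the InitMount table and the test fixtures can all be passed as slices without intermediate to_vec()/collect() copies. A cut-down sketch of that shape; the small match stands in for the FLAGS lookup table the real function uses:

use std::fmt::Debug;

use nix::mount::MsFlags;

fn parse_mount_flags_and_options<S: AsRef<str> + Debug>(options_vec: &[S]) -> (MsFlags, String) {
    let mut flags = MsFlags::empty();
    let mut options = String::new();

    for opt in options_vec {
        let opt = opt.as_ref();
        if opt.is_empty() {
            continue;
        }
        match opt {
            "ro" => flags |= MsFlags::MS_RDONLY,
            "nodev" => flags |= MsFlags::MS_NODEV,
            "nosuid" => flags |= MsFlags::MS_NOSUID,
            "noexec" => flags |= MsFlags::MS_NOEXEC,
            // Anything that is not a recognised flag is passed through as
            // filesystem-specific data.
            _ => {
                if !options.is_empty() {
                    options.push(',');
                }
                options.push_str(opt);
            }
        }
    }

    (flags, options)
}

fn main() {
    let from_request: Vec<String> = vec!["ro".into(), "mode=755".into()];
    let (flags, data) = parse_mount_flags_and_options(&from_request);
    assert!(flags.contains(MsFlags::MS_RDONLY));
    assert_eq!(data, "mode=755");

    // The same function accepts borrowed string slices, as the tests do.
    let (_flags, _data) = parse_mount_flags_and_options(&["nodev", "nosuid"]);
}

The Debug bound is presumably only there so #[instrument] can record the argument; it costs nothing for &str or String.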
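set_sandbox_storage/unset_sandbox_storage in sandbox.rs keep a per-path use count so a storage shared by several containers is only torn down by its last user; the patch also changes the release check from *count < 1 to the equivalent but clearer *count == 0. The counting logic in isolation, with StorageRefs standing in for the Sandbox's storages map:

use std::collections::HashMap;

use anyhow::{anyhow, Result};

#[derive(Default)]
struct StorageRefs {
    storages: HashMap<String, u32>,
}

impl StorageRefs {
    // Returns true if this is the first user of the path (i.e. the storage is new).
    fn set(&mut self, path: &str) -> bool {
        match self.storages.get_mut(path) {
            Some(count) => {
                *count += 1;
                false
            }
            None => {
                self.storages.insert(path.to_string(), 1);
                true
            }
        }
    }

    // Returns true once the last user is gone and the storage may be unmounted and removed.
    fn unset(&mut self, path: &str) -> Result<bool> {
        match self.storages.get_mut(path) {
            None => Err(anyhow!("Sandbox storage with path {} not found", path)),
            Some(count) => {
                *count -= 1;
                if *count == 0 {
                    self.storages.remove(path);
                    return Ok(true);
                }
                Ok(false)
            }
        }
    }
}

fn main() -> Result<()> {
    let mut refs = StorageRefs::default();
    assert!(refs.set("/run/kata-containers/shared"));     // first user
    assert!(!refs.set("/run/kata-containers/shared"));    // second user
    assert!(!refs.unset("/run/kata-containers/shared")?); // still referenced
    assert!(refs.unset("/run/kata-containers/shared")?);  // last user, safe to remove
    Ok(())
}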
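online_resources in sandbox.rs now skips non-matching directory entries up front and folds the read and write of each sysfs "online" file into a single conditional, returning how many objects it onlined. A self-contained sketch of that loop; the hard-coded sysfs paths below are the usual Linux locations and stand in for the SYSFS_* constants from linux_abi, and num <= 0 (the memory case) means "online everything that matches" without counting:

use std::fs;

use anyhow::Result;
use regex::Regex;

fn online_resources(path: &str, pattern: &str, num: i32) -> Result<i32> {
    let re = Regex::new(pattern)?;
    let mut count = 0;

    for e in fs::read_dir(path)? {
        let entry = e?;
        // Skip directory entries that don't match the pattern.
        match entry.file_name().to_str() {
            None => continue,
            Some(name) if !re.is_match(name) => continue,
            Some(_) => {}
        }

        let online = entry.path().join("online");
        if let Ok(c) = fs::read_to_string(&online) {
            // Only objects currently reported as offline ("0") are flipped on.
            if c.trim().contains('0') && fs::write(&online, "1").is_ok() && num > 0 {
                count += 1;
                if count == num {
                    break;
                }
            }
        }
    }

    Ok(count)
}

fn main() -> Result<()> {
    // Online all hot-added memory blocks, as online_memory() requests with num = -1.
    let _ = online_resources("/sys/devices/system/memory", r"memory[0-9]+", -1);
    // Online up to two vCPUs, as online_cpus() would after a CPU hotplug.
    let onlined = online_resources("/sys/devices/system/cpu", r"cpu[0-9]+", 2)?;
    println!("onlined {} cpus", onlined);
    Ok(())
}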