mirror of
				https://github.com/kata-containers/kata-containers.git
				synced 2025-11-04 11:50:15 +00:00 
			
		
		
		
	agent: simplify error handling
We extend the `Result` and `Option` types with associated types that allows converting a `Result<T, E>` and `Option<T>` into `ttrpc::Result<T>`. This allows the elimination of many `match` statements in favor of calling the map function plus the `?` operator. This transformation simplifies the code. Fixes: #7624 Signed-off-by: Wedson Almeida Filho <walmeida@microsoft.com>
This commit is contained in:
		@@ -9,6 +9,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt, ReadHalf};
 | 
			
		||||
use tokio::sync::Mutex;
 | 
			
		||||
 | 
			
		||||
use std::ffi::{CString, OsStr};
 | 
			
		||||
use std::fmt::Debug;
 | 
			
		||||
use std::io;
 | 
			
		||||
use std::os::unix::ffi::OsStrExt;
 | 
			
		||||
use std::path::Path;
 | 
			
		||||
@@ -121,7 +122,7 @@ fn sl() -> slog::Logger {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Convenience function to wrap an error and response to ttrpc client
 | 
			
		||||
fn ttrpc_error(code: ttrpc::Code, err: impl std::fmt::Debug) -> ttrpc::Error {
 | 
			
		||||
fn ttrpc_error(code: ttrpc::Code, err: impl Debug) -> ttrpc::Error {
 | 
			
		||||
    get_rpc_status(code, format!("{:?}", err))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -165,6 +166,36 @@ async fn is_allowed(req: &(impl MessageDyn + serde::Serialize)) -> ttrpc::Result
 | 
			
		||||
    res
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
fn same<E>(e: E) -> E {
 | 
			
		||||
    e
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
trait ResultToTtrpcResult<T, E: Debug>: Sized {
 | 
			
		||||
    fn map_ttrpc_err<R: Debug>(self, msg_builder: impl FnOnce(E) -> R) -> ttrpc::Result<T>;
 | 
			
		||||
    fn map_ttrpc_err_do(self, doer: impl FnOnce(&E)) -> ttrpc::Result<T> {
 | 
			
		||||
        self.map_ttrpc_err(|e| {
 | 
			
		||||
            doer(&e);
 | 
			
		||||
            e
 | 
			
		||||
        })
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<T, E: Debug> ResultToTtrpcResult<T, E> for Result<T, E> {
 | 
			
		||||
    fn map_ttrpc_err<R: Debug>(self, msg_builder: impl FnOnce(E) -> R) -> ttrpc::Result<T> {
 | 
			
		||||
        self.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, msg_builder(e)))
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
trait OptionToTtrpcResult<T>: Sized {
 | 
			
		||||
    fn map_ttrpc_err(self, code: ttrpc::Code, msg: &str) -> ttrpc::Result<T>;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<T> OptionToTtrpcResult<T> for Option<T> {
 | 
			
		||||
    fn map_ttrpc_err(self, code: ttrpc::Code, msg: &str) -> ttrpc::Result<T> {
 | 
			
		||||
        self.ok_or_else(|| ttrpc_error(code, msg))
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Clone, Debug)]
 | 
			
		||||
pub struct AgentService {
 | 
			
		||||
    sandbox: Arc<Mutex<Sandbox>>,
 | 
			
		||||
@@ -642,10 +673,8 @@ impl agent_ttrpc::AgentService for AgentService {
 | 
			
		||||
    ) -> ttrpc::Result<Empty> {
 | 
			
		||||
        trace_rpc_call!(ctx, "create_container", req);
 | 
			
		||||
        is_allowed(&req).await?;
 | 
			
		||||
        match self.do_create_container(req).await {
 | 
			
		||||
            Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)),
 | 
			
		||||
            Ok(_) => Ok(Empty::new()),
 | 
			
		||||
        }
 | 
			
		||||
        self.do_create_container(req).await.map_ttrpc_err(same)?;
 | 
			
		||||
        Ok(Empty::new())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async fn start_container(
 | 
			
		||||
@@ -655,10 +684,8 @@ impl agent_ttrpc::AgentService for AgentService {
 | 
			
		||||
    ) -> ttrpc::Result<Empty> {
 | 
			
		||||
        trace_rpc_call!(ctx, "start_container", req);
 | 
			
		||||
        is_allowed(&req).await?;
 | 
			
		||||
        match self.do_start_container(req).await {
 | 
			
		||||
            Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)),
 | 
			
		||||
            Ok(_) => Ok(Empty::new()),
 | 
			
		||||
        }
 | 
			
		||||
        self.do_start_container(req).await.map_ttrpc_err(same)?;
 | 
			
		||||
        Ok(Empty::new())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async fn remove_container(
 | 
			
		||||
@@ -668,10 +695,8 @@ impl agent_ttrpc::AgentService for AgentService {
 | 
			
		||||
    ) -> ttrpc::Result<Empty> {
 | 
			
		||||
        trace_rpc_call!(ctx, "remove_container", req);
 | 
			
		||||
        is_allowed(&req).await?;
 | 
			
		||||
        match self.do_remove_container(req).await {
 | 
			
		||||
            Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)),
 | 
			
		||||
            Ok(_) => Ok(Empty::new()),
 | 
			
		||||
        }
 | 
			
		||||
        self.do_remove_container(req).await.map_ttrpc_err(same)?;
 | 
			
		||||
        Ok(Empty::new())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async fn exec_process(
 | 
			
		||||
@@ -681,10 +706,8 @@ impl agent_ttrpc::AgentService for AgentService {
 | 
			
		||||
    ) -> ttrpc::Result<Empty> {
 | 
			
		||||
        trace_rpc_call!(ctx, "exec_process", req);
 | 
			
		||||
        is_allowed(&req).await?;
 | 
			
		||||
        match self.do_exec_process(req).await {
 | 
			
		||||
            Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)),
 | 
			
		||||
            Ok(_) => Ok(Empty::new()),
 | 
			
		||||
        }
 | 
			
		||||
        self.do_exec_process(req).await.map_ttrpc_err(same)?;
 | 
			
		||||
        Ok(Empty::new())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async fn signal_process(
 | 
			
		||||
@@ -694,10 +717,8 @@ impl agent_ttrpc::AgentService for AgentService {
 | 
			
		||||
    ) -> ttrpc::Result<Empty> {
 | 
			
		||||
        trace_rpc_call!(ctx, "signal_process", req);
 | 
			
		||||
        is_allowed(&req).await?;
 | 
			
		||||
        match self.do_signal_process(req).await {
 | 
			
		||||
            Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)),
 | 
			
		||||
            Ok(_) => Ok(Empty::new()),
 | 
			
		||||
        }
 | 
			
		||||
        self.do_signal_process(req).await.map_ttrpc_err(same)?;
 | 
			
		||||
        Ok(Empty::new())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async fn wait_process(
 | 
			
		||||
@@ -707,9 +728,7 @@ impl agent_ttrpc::AgentService for AgentService {
 | 
			
		||||
    ) -> ttrpc::Result<WaitProcessResponse> {
 | 
			
		||||
        trace_rpc_call!(ctx, "wait_process", req);
 | 
			
		||||
        is_allowed(&req).await?;
 | 
			
		||||
        self.do_wait_process(req)
 | 
			
		||||
            .await
 | 
			
		||||
            .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))
 | 
			
		||||
        self.do_wait_process(req).await.map_ttrpc_err(same)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async fn update_container(
 | 
			
		||||
@@ -721,18 +740,12 @@ impl agent_ttrpc::AgentService for AgentService {
 | 
			
		||||
        is_allowed(&req).await?;
 | 
			
		||||
 | 
			
		||||
        let mut sandbox = self.sandbox.lock().await;
 | 
			
		||||
        let ctr = sandbox.get_container(&req.container_id).ok_or_else(|| {
 | 
			
		||||
            ttrpc_error(
 | 
			
		||||
                ttrpc::Code::INVALID_ARGUMENT,
 | 
			
		||||
                "invalid container id".to_string(),
 | 
			
		||||
            )
 | 
			
		||||
        })?;
 | 
			
		||||
 | 
			
		||||
        let ctr = sandbox
 | 
			
		||||
            .get_container(&req.container_id)
 | 
			
		||||
            .map_ttrpc_err(ttrpc::Code::INVALID_ARGUMENT, "invalid container id")?;
 | 
			
		||||
        if let Some(res) = req.resources.as_ref() {
 | 
			
		||||
            let oci_res = rustjail::resources_grpc_to_oci(res);
 | 
			
		||||
            if let Err(e) = ctr.set(oci_res) {
 | 
			
		||||
                return Err(ttrpc_error(ttrpc::Code::INTERNAL, e));
 | 
			
		||||
            }
 | 
			
		||||
            ctr.set(oci_res).map_ttrpc_err(same)?;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        Ok(Empty::new())
 | 
			
		||||
@@ -747,15 +760,10 @@ impl agent_ttrpc::AgentService for AgentService {
 | 
			
		||||
        is_allowed(&req).await?;
 | 
			
		||||
 | 
			
		||||
        let mut sandbox = self.sandbox.lock().await;
 | 
			
		||||
        let ctr = sandbox.get_container(&req.container_id).ok_or_else(|| {
 | 
			
		||||
            ttrpc_error(
 | 
			
		||||
                ttrpc::Code::INVALID_ARGUMENT,
 | 
			
		||||
                "invalid container id".to_string(),
 | 
			
		||||
            )
 | 
			
		||||
        })?;
 | 
			
		||||
 | 
			
		||||
        ctr.stats()
 | 
			
		||||
            .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))
 | 
			
		||||
        let ctr = sandbox
 | 
			
		||||
            .get_container(&req.container_id)
 | 
			
		||||
            .map_ttrpc_err(ttrpc::Code::INVALID_ARGUMENT, "invalid container id")?;
 | 
			
		||||
        ctr.stats().map_ttrpc_err(same)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async fn pause_container(
 | 
			
		||||
@@ -767,16 +775,10 @@ impl agent_ttrpc::AgentService for AgentService {
 | 
			
		||||
        is_allowed(&req).await?;
 | 
			
		||||
 | 
			
		||||
        let mut sandbox = self.sandbox.lock().await;
 | 
			
		||||
        let ctr = sandbox.get_container(req.container_id()).ok_or_else(|| {
 | 
			
		||||
            ttrpc_error(
 | 
			
		||||
                ttrpc::Code::INVALID_ARGUMENT,
 | 
			
		||||
                "invalid container id".to_string(),
 | 
			
		||||
            )
 | 
			
		||||
        })?;
 | 
			
		||||
 | 
			
		||||
        ctr.pause()
 | 
			
		||||
            .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
 | 
			
		||||
 | 
			
		||||
        let ctr = sandbox
 | 
			
		||||
            .get_container(&req.container_id)
 | 
			
		||||
            .map_ttrpc_err(ttrpc::Code::INVALID_ARGUMENT, "invalid container id")?;
 | 
			
		||||
        ctr.pause().map_ttrpc_err(same)?;
 | 
			
		||||
        Ok(Empty::new())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@@ -789,16 +791,10 @@ impl agent_ttrpc::AgentService for AgentService {
 | 
			
		||||
        is_allowed(&req).await?;
 | 
			
		||||
 | 
			
		||||
        let mut sandbox = self.sandbox.lock().await;
 | 
			
		||||
        let ctr = sandbox.get_container(req.container_id()).ok_or_else(|| {
 | 
			
		||||
            ttrpc_error(
 | 
			
		||||
                ttrpc::Code::INVALID_ARGUMENT,
 | 
			
		||||
                "invalid container id".to_string(),
 | 
			
		||||
            )
 | 
			
		||||
        })?;
 | 
			
		||||
 | 
			
		||||
        ctr.resume()
 | 
			
		||||
            .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
 | 
			
		||||
 | 
			
		||||
        let ctr = sandbox
 | 
			
		||||
            .get_container(&req.container_id)
 | 
			
		||||
            .map_ttrpc_err(ttrpc::Code::INVALID_ARGUMENT, "invalid container id")?;
 | 
			
		||||
        ctr.resume().map_ttrpc_err(same)?;
 | 
			
		||||
        Ok(Empty::new())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@@ -809,8 +805,7 @@ impl agent_ttrpc::AgentService for AgentService {
 | 
			
		||||
    ) -> ttrpc::Result<Empty> {
 | 
			
		||||
        trace_rpc_call!(ctx, "remove_stale_virtiofs_share_mounts", req);
 | 
			
		||||
        is_allowed(&req).await?;
 | 
			
		||||
        let mount_infos = parse_mount_table("/proc/self/mountinfo")
 | 
			
		||||
            .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
 | 
			
		||||
        let mount_infos = parse_mount_table("/proc/self/mountinfo").map_ttrpc_err(same)?;
 | 
			
		||||
        for m in &mount_infos {
 | 
			
		||||
            if m.mount_point.starts_with(KATA_GUEST_SHARE_DIR) {
 | 
			
		||||
                // stat the mount point, virtiofs daemon will remove the stale cache and release the fds if the mount point doesn't exist any more.
 | 
			
		||||
@@ -831,9 +826,7 @@ impl agent_ttrpc::AgentService for AgentService {
 | 
			
		||||
        req: protocols::agent::WriteStreamRequest,
 | 
			
		||||
    ) -> ttrpc::Result<WriteStreamResponse> {
 | 
			
		||||
        is_allowed(&req).await?;
 | 
			
		||||
        self.do_write_stream(req)
 | 
			
		||||
            .await
 | 
			
		||||
            .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))
 | 
			
		||||
        self.do_write_stream(req).await.map_ttrpc_err(same)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async fn read_stdout(
 | 
			
		||||
@@ -842,9 +835,7 @@ impl agent_ttrpc::AgentService for AgentService {
 | 
			
		||||
        req: protocols::agent::ReadStreamRequest,
 | 
			
		||||
    ) -> ttrpc::Result<ReadStreamResponse> {
 | 
			
		||||
        is_allowed(&req).await?;
 | 
			
		||||
        self.do_read_stream(req, true)
 | 
			
		||||
            .await
 | 
			
		||||
            .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))
 | 
			
		||||
        self.do_read_stream(req, true).await.map_ttrpc_err(same)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async fn read_stderr(
 | 
			
		||||
@@ -853,9 +844,7 @@ impl agent_ttrpc::AgentService for AgentService {
 | 
			
		||||
        req: protocols::agent::ReadStreamRequest,
 | 
			
		||||
    ) -> ttrpc::Result<ReadStreamResponse> {
 | 
			
		||||
        is_allowed(&req).await?;
 | 
			
		||||
        self.do_read_stream(req, false)
 | 
			
		||||
            .await
 | 
			
		||||
            .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))
 | 
			
		||||
        self.do_read_stream(req, false).await.map_ttrpc_err(same)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async fn close_stdin(
 | 
			
		||||
@@ -902,8 +891,9 @@ impl agent_ttrpc::AgentService for AgentService {
 | 
			
		||||
                )
 | 
			
		||||
            })?;
 | 
			
		||||
 | 
			
		||||
        if let Some(fd) = p.term_master {
 | 
			
		||||
            unsafe {
 | 
			
		||||
        let fd = p
 | 
			
		||||
            .term_master
 | 
			
		||||
            .map_ttrpc_err(ttrpc::Code::UNAVAILABLE, "no tty")?;
 | 
			
		||||
        let win = winsize {
 | 
			
		||||
            ws_row: req.row as c_ushort,
 | 
			
		||||
            ws_col: req.column as c_ushort,
 | 
			
		||||
@@ -911,14 +901,10 @@ impl agent_ttrpc::AgentService for AgentService {
 | 
			
		||||
            ws_ypixel: 0,
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
                let err = libc::ioctl(fd, TIOCSWINSZ, &win);
 | 
			
		||||
                Errno::result(err).map(drop).map_err(|e| {
 | 
			
		||||
                    ttrpc_error(ttrpc::Code::INTERNAL, format!("ioctl error: {:?}", e))
 | 
			
		||||
                })?;
 | 
			
		||||
            }
 | 
			
		||||
        } else {
 | 
			
		||||
            return Err(ttrpc_error(ttrpc::Code::UNAVAILABLE, "no tty".to_string()));
 | 
			
		||||
        }
 | 
			
		||||
        let err = unsafe { libc::ioctl(fd, TIOCSWINSZ, &win) };
 | 
			
		||||
        Errno::result(err)
 | 
			
		||||
            .map(drop)
 | 
			
		||||
            .map_ttrpc_err(|e| format!("ioctl error: {:?}", e))?;
 | 
			
		||||
 | 
			
		||||
        Ok(Empty::new())
 | 
			
		||||
    }
 | 
			
		||||
@@ -931,12 +917,10 @@ impl agent_ttrpc::AgentService for AgentService {
 | 
			
		||||
        trace_rpc_call!(ctx, "update_interface", req);
 | 
			
		||||
        is_allowed(&req).await?;
 | 
			
		||||
 | 
			
		||||
        let interface = req.interface.into_option().ok_or_else(|| {
 | 
			
		||||
            ttrpc_error(
 | 
			
		||||
        let interface = req.interface.into_option().map_ttrpc_err(
 | 
			
		||||
            ttrpc::Code::INVALID_ARGUMENT,
 | 
			
		||||
                "empty update interface request".to_string(),
 | 
			
		||||
            )
 | 
			
		||||
        })?;
 | 
			
		||||
            "empty update interface request",
 | 
			
		||||
        )?;
 | 
			
		||||
 | 
			
		||||
        self.sandbox
 | 
			
		||||
            .lock()
 | 
			
		||||
@@ -944,9 +928,7 @@ impl agent_ttrpc::AgentService for AgentService {
 | 
			
		||||
            .rtnl
 | 
			
		||||
            .update_interface(&interface)
 | 
			
		||||
            .await
 | 
			
		||||
            .map_err(|e| {
 | 
			
		||||
                ttrpc_error(ttrpc::Code::INTERNAL, format!("update interface: {:?}", e))
 | 
			
		||||
            })?;
 | 
			
		||||
            .map_ttrpc_err(|e| format!("update interface: {:?}", e))?;
 | 
			
		||||
 | 
			
		||||
        Ok(interface)
 | 
			
		||||
    }
 | 
			
		||||
@@ -959,28 +941,25 @@ impl agent_ttrpc::AgentService for AgentService {
 | 
			
		||||
        trace_rpc_call!(ctx, "update_routes", req);
 | 
			
		||||
        is_allowed(&req).await?;
 | 
			
		||||
 | 
			
		||||
        let new_routes = req.routes.into_option().map(|r| r.Routes).ok_or_else(|| {
 | 
			
		||||
            ttrpc_error(
 | 
			
		||||
                ttrpc::Code::INVALID_ARGUMENT,
 | 
			
		||||
                "empty update routes request".to_string(),
 | 
			
		||||
            )
 | 
			
		||||
        })?;
 | 
			
		||||
        let new_routes = req
 | 
			
		||||
            .routes
 | 
			
		||||
            .into_option()
 | 
			
		||||
            .map(|r| r.Routes)
 | 
			
		||||
            .map_ttrpc_err(ttrpc::Code::INVALID_ARGUMENT, "empty update routes request")?;
 | 
			
		||||
 | 
			
		||||
        let mut sandbox = self.sandbox.lock().await;
 | 
			
		||||
 | 
			
		||||
        sandbox.rtnl.update_routes(new_routes).await.map_err(|e| {
 | 
			
		||||
            ttrpc_error(
 | 
			
		||||
                ttrpc::Code::INTERNAL,
 | 
			
		||||
                format!("Failed to update routes: {:?}", e),
 | 
			
		||||
            )
 | 
			
		||||
        })?;
 | 
			
		||||
        sandbox
 | 
			
		||||
            .rtnl
 | 
			
		||||
            .update_routes(new_routes)
 | 
			
		||||
            .await
 | 
			
		||||
            .map_ttrpc_err(|e| format!("Failed to update routes: {:?}", e))?;
 | 
			
		||||
 | 
			
		||||
        let list = sandbox.rtnl.list_routes().await.map_err(|e| {
 | 
			
		||||
            ttrpc_error(
 | 
			
		||||
                ttrpc::Code::INTERNAL,
 | 
			
		||||
                format!("Failed to list routes after update: {:?}", e),
 | 
			
		||||
            )
 | 
			
		||||
        })?;
 | 
			
		||||
        let list = sandbox
 | 
			
		||||
            .rtnl
 | 
			
		||||
            .list_routes()
 | 
			
		||||
            .await
 | 
			
		||||
            .map_ttrpc_err(|e| format!("Failed to list routes after update: {:?}", e))?;
 | 
			
		||||
 | 
			
		||||
        Ok(protocols::agent::Routes {
 | 
			
		||||
            Routes: list,
 | 
			
		||||
@@ -996,13 +975,10 @@ impl agent_ttrpc::AgentService for AgentService {
 | 
			
		||||
        trace_rpc_call!(ctx, "update_mounts", req);
 | 
			
		||||
        is_allowed(&req).await?;
 | 
			
		||||
 | 
			
		||||
        match update_ephemeral_mounts(sl(), &req.storages, &self.sandbox).await {
 | 
			
		||||
            Ok(_) => Ok(Empty::new()),
 | 
			
		||||
            Err(e) => Err(ttrpc_error(
 | 
			
		||||
                ttrpc::Code::INTERNAL,
 | 
			
		||||
                format!("Failed to update mounts: {:?}", e),
 | 
			
		||||
            )),
 | 
			
		||||
        }
 | 
			
		||||
        update_ephemeral_mounts(sl(), &req.storages, &self.sandbox)
 | 
			
		||||
            .await
 | 
			
		||||
            .map_ttrpc_err(|e| format!("Failed to update mounts: {:?}", e))?;
 | 
			
		||||
        Ok(Empty::new())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async fn get_ip_tables(
 | 
			
		||||
@@ -1032,16 +1008,13 @@ impl agent_ttrpc::AgentService for AgentService {
 | 
			
		||||
        }
 | 
			
		||||
        .to_string();
 | 
			
		||||
 | 
			
		||||
        match Command::new(cmd.clone()).output() {
 | 
			
		||||
            Ok(output) => Ok(GetIPTablesResponse {
 | 
			
		||||
        let output = Command::new(cmd.clone())
 | 
			
		||||
            .output()
 | 
			
		||||
            .map_ttrpc_err_do(|e| warn!(sl(), "failed to run {}: {:?}", cmd, e.kind()))?;
 | 
			
		||||
        Ok(GetIPTablesResponse {
 | 
			
		||||
            data: output.stdout,
 | 
			
		||||
            ..Default::default()
 | 
			
		||||
            }),
 | 
			
		||||
            Err(e) => {
 | 
			
		||||
                warn!(sl(), "failed to run {}: {:?}", cmd, e.kind());
 | 
			
		||||
                return Err(ttrpc_error(ttrpc::Code::INTERNAL, e));
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        })
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async fn set_ip_tables(
 | 
			
		||||
@@ -1071,20 +1044,14 @@ impl agent_ttrpc::AgentService for AgentService {
 | 
			
		||||
        }
 | 
			
		||||
        .to_string();
 | 
			
		||||
 | 
			
		||||
        let mut child = match Command::new(cmd.clone())
 | 
			
		||||
        let mut child = Command::new(cmd.clone())
 | 
			
		||||
            .arg("--wait")
 | 
			
		||||
            .arg(IPTABLES_RESTORE_WAIT_SEC.to_string())
 | 
			
		||||
            .stdin(Stdio::piped())
 | 
			
		||||
            .stdout(Stdio::piped())
 | 
			
		||||
            .stderr(Stdio::piped())
 | 
			
		||||
            .spawn()
 | 
			
		||||
        {
 | 
			
		||||
            Ok(child) => child,
 | 
			
		||||
            Err(e) => {
 | 
			
		||||
                warn!(sl(), "failure to spawn {}: {:?}", cmd, e.kind());
 | 
			
		||||
                return Err(ttrpc_error(ttrpc::Code::INTERNAL, e));
 | 
			
		||||
            }
 | 
			
		||||
        };
 | 
			
		||||
            .map_ttrpc_err_do(|e| warn!(sl(), "failure to spawn {}: {:?}", cmd, e.kind()))?;
 | 
			
		||||
 | 
			
		||||
        let mut stdin = match child.stdin.take() {
 | 
			
		||||
            Some(si) => si,
 | 
			
		||||
@@ -1092,7 +1059,7 @@ impl agent_ttrpc::AgentService for AgentService {
 | 
			
		||||
                println!("failed to get stdin from child");
 | 
			
		||||
                return Err(ttrpc_error(
 | 
			
		||||
                    ttrpc::Code::INTERNAL,
 | 
			
		||||
                    "failed to take stdin from child".to_string(),
 | 
			
		||||
                    "failed to take stdin from child",
 | 
			
		||||
                ));
 | 
			
		||||
            }
 | 
			
		||||
        };
 | 
			
		||||
@@ -1111,35 +1078,22 @@ impl agent_ttrpc::AgentService for AgentService {
 | 
			
		||||
            };
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
        if tokio::time::timeout(Duration::from_secs(IPTABLES_RESTORE_WAIT_SEC), rx)
 | 
			
		||||
        let _ = tokio::time::timeout(Duration::from_secs(IPTABLES_RESTORE_WAIT_SEC), rx)
 | 
			
		||||
            .await
 | 
			
		||||
            .is_err()
 | 
			
		||||
        {
 | 
			
		||||
            return Err(ttrpc_error(
 | 
			
		||||
                ttrpc::Code::INTERNAL,
 | 
			
		||||
                "timeout waiting for stdin writer to complete".to_string(),
 | 
			
		||||
            ));
 | 
			
		||||
        }
 | 
			
		||||
            .map_ttrpc_err(|_| "timeout waiting for stdin writer to complete")?;
 | 
			
		||||
 | 
			
		||||
        if handle.await.is_err() {
 | 
			
		||||
            return Err(ttrpc_error(
 | 
			
		||||
                ttrpc::Code::INTERNAL,
 | 
			
		||||
                "stdin writer thread failure".to_string(),
 | 
			
		||||
            ));
 | 
			
		||||
        }
 | 
			
		||||
        handle
 | 
			
		||||
            .await
 | 
			
		||||
            .map_ttrpc_err(|_| "stdin writer thread failure")?;
 | 
			
		||||
 | 
			
		||||
        let output = match child.wait_with_output() {
 | 
			
		||||
            Ok(o) => o,
 | 
			
		||||
            Err(e) => {
 | 
			
		||||
        let output = child.wait_with_output().map_ttrpc_err_do(|e| {
 | 
			
		||||
            warn!(
 | 
			
		||||
                sl(),
 | 
			
		||||
                "failure waiting for spawned {} to complete: {:?}",
 | 
			
		||||
                cmd,
 | 
			
		||||
                e.kind()
 | 
			
		||||
                );
 | 
			
		||||
                return Err(ttrpc_error(ttrpc::Code::INTERNAL, e));
 | 
			
		||||
            }
 | 
			
		||||
        };
 | 
			
		||||
            )
 | 
			
		||||
        })?;
 | 
			
		||||
 | 
			
		||||
        if !output.status.success() {
 | 
			
		||||
            warn!(sl(), "{} failed: {:?}", cmd, output.stderr);
 | 
			
		||||
@@ -1174,12 +1128,7 @@ impl agent_ttrpc::AgentService for AgentService {
 | 
			
		||||
            .rtnl
 | 
			
		||||
            .list_interfaces()
 | 
			
		||||
            .await
 | 
			
		||||
            .map_err(|e| {
 | 
			
		||||
                ttrpc_error(
 | 
			
		||||
                    ttrpc::Code::INTERNAL,
 | 
			
		||||
                    format!("Failed to list interfaces: {:?}", e),
 | 
			
		||||
                )
 | 
			
		||||
            })?;
 | 
			
		||||
            .map_ttrpc_err(|e| format!("Failed to list interfaces: {:?}", e))?;
 | 
			
		||||
 | 
			
		||||
        Ok(protocols::agent::Interfaces {
 | 
			
		||||
            Interfaces: list,
 | 
			
		||||
@@ -1202,7 +1151,7 @@ impl agent_ttrpc::AgentService for AgentService {
 | 
			
		||||
            .rtnl
 | 
			
		||||
            .list_routes()
 | 
			
		||||
            .await
 | 
			
		||||
            .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, format!("list routes: {:?}", e)))?;
 | 
			
		||||
            .map_ttrpc_err(|e| format!("list routes: {:?}", e))?;
 | 
			
		||||
 | 
			
		||||
        Ok(protocols::agent::Routes {
 | 
			
		||||
            Routes: list,
 | 
			
		||||
@@ -1241,30 +1190,24 @@ impl agent_ttrpc::AgentService for AgentService {
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            for m in req.kernel_modules.iter() {
 | 
			
		||||
                load_kernel_module(m).map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
 | 
			
		||||
                load_kernel_module(m).map_ttrpc_err(same)?;
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            s.setup_shared_namespaces()
 | 
			
		||||
            s.setup_shared_namespaces().await.map_ttrpc_err(same)?;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        let m = add_storages(sl(), &req.storages, &self.sandbox, None)
 | 
			
		||||
            .await
 | 
			
		||||
                .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        match add_storages(sl(), &req.storages, &self.sandbox, None).await {
 | 
			
		||||
            Ok(m) => {
 | 
			
		||||
            .map_ttrpc_err(same)?;
 | 
			
		||||
        self.sandbox.lock().await.mounts = m;
 | 
			
		||||
            }
 | 
			
		||||
            Err(e) => return Err(ttrpc_error(ttrpc::Code::INTERNAL, e)),
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        match setup_guest_dns(sl(), &req.dns) {
 | 
			
		||||
            Ok(_) => {
 | 
			
		||||
        setup_guest_dns(sl(), &req.dns).map_ttrpc_err(same)?;
 | 
			
		||||
        {
 | 
			
		||||
            let mut s = self.sandbox.lock().await;
 | 
			
		||||
            for dns in req.dns {
 | 
			
		||||
                s.network.set_dns(dns);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
            Err(e) => return Err(ttrpc_error(ttrpc::Code::INTERNAL, e)),
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        Ok(Empty::new())
 | 
			
		||||
    }
 | 
			
		||||
@@ -1279,10 +1222,7 @@ impl agent_ttrpc::AgentService for AgentService {
 | 
			
		||||
 | 
			
		||||
        let mut sandbox = self.sandbox.lock().await;
 | 
			
		||||
        // destroy all containers, clean up, notify agent to exit etc.
 | 
			
		||||
        sandbox
 | 
			
		||||
            .destroy()
 | 
			
		||||
            .await
 | 
			
		||||
            .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
 | 
			
		||||
        sandbox.destroy().await.map_ttrpc_err(same)?;
 | 
			
		||||
        // Close get_oom_event connection,
 | 
			
		||||
        // otherwise it will block the shutdown of ttrpc.
 | 
			
		||||
        drop(sandbox.event_tx.take());
 | 
			
		||||
@@ -1290,14 +1230,12 @@ impl agent_ttrpc::AgentService for AgentService {
 | 
			
		||||
        sandbox
 | 
			
		||||
            .sender
 | 
			
		||||
            .take()
 | 
			
		||||
            .ok_or_else(|| {
 | 
			
		||||
                ttrpc_error(
 | 
			
		||||
            .map_ttrpc_err(
 | 
			
		||||
                ttrpc::Code::INTERNAL,
 | 
			
		||||
                    "failed to get sandbox sender channel".to_string(),
 | 
			
		||||
                )
 | 
			
		||||
            })?
 | 
			
		||||
                "failed to get sandbox sender channel",
 | 
			
		||||
            )?
 | 
			
		||||
            .send(1)
 | 
			
		||||
            .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
 | 
			
		||||
            .map_ttrpc_err(same)?;
 | 
			
		||||
 | 
			
		||||
        Ok(Empty::new())
 | 
			
		||||
    }
 | 
			
		||||
@@ -1314,12 +1252,10 @@ impl agent_ttrpc::AgentService for AgentService {
 | 
			
		||||
            .neighbors
 | 
			
		||||
            .into_option()
 | 
			
		||||
            .map(|n| n.ARPNeighbors)
 | 
			
		||||
            .ok_or_else(|| {
 | 
			
		||||
                ttrpc_error(
 | 
			
		||||
            .map_ttrpc_err(
 | 
			
		||||
                ttrpc::Code::INVALID_ARGUMENT,
 | 
			
		||||
                    "empty add arp neighbours request".to_string(),
 | 
			
		||||
                )
 | 
			
		||||
            })?;
 | 
			
		||||
                "empty add arp neighbours request",
 | 
			
		||||
            )?;
 | 
			
		||||
 | 
			
		||||
        self.sandbox
 | 
			
		||||
            .lock()
 | 
			
		||||
@@ -1327,12 +1263,7 @@ impl agent_ttrpc::AgentService for AgentService {
 | 
			
		||||
            .rtnl
 | 
			
		||||
            .add_arp_neighbors(neighs)
 | 
			
		||||
            .await
 | 
			
		||||
            .map_err(|e| {
 | 
			
		||||
                ttrpc_error(
 | 
			
		||||
                    ttrpc::Code::INTERNAL,
 | 
			
		||||
                    format!("Failed to add ARP neighbours: {:?}", e),
 | 
			
		||||
                )
 | 
			
		||||
            })?;
 | 
			
		||||
            .map_ttrpc_err(|e| format!("Failed to add ARP neighbours: {:?}", e))?;
 | 
			
		||||
 | 
			
		||||
        Ok(Empty::new())
 | 
			
		||||
    }
 | 
			
		||||
@@ -1346,9 +1277,7 @@ impl agent_ttrpc::AgentService for AgentService {
 | 
			
		||||
        is_allowed(&req).await?;
 | 
			
		||||
        let sandbox = self.sandbox.lock().await;
 | 
			
		||||
 | 
			
		||||
        sandbox
 | 
			
		||||
            .online_cpu_memory(&req)
 | 
			
		||||
            .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
 | 
			
		||||
        sandbox.online_cpu_memory(&req).map_ttrpc_err(same)?;
 | 
			
		||||
 | 
			
		||||
        Ok(Empty::new())
 | 
			
		||||
    }
 | 
			
		||||
@@ -1361,8 +1290,7 @@ impl agent_ttrpc::AgentService for AgentService {
 | 
			
		||||
        trace_rpc_call!(ctx, "reseed_random_dev", req);
 | 
			
		||||
        is_allowed(&req).await?;
 | 
			
		||||
 | 
			
		||||
        random::reseed_rng(req.data.as_slice())
 | 
			
		||||
            .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
 | 
			
		||||
        random::reseed_rng(req.data.as_slice()).map_ttrpc_err(same)?;
 | 
			
		||||
 | 
			
		||||
        Ok(Empty::new())
 | 
			
		||||
    }
 | 
			
		||||
@@ -1378,21 +1306,16 @@ impl agent_ttrpc::AgentService for AgentService {
 | 
			
		||||
        info!(sl(), "get guest details!");
 | 
			
		||||
        let mut resp = GuestDetailsResponse::new();
 | 
			
		||||
        // to get memory block size
 | 
			
		||||
        match get_memory_info(
 | 
			
		||||
        let (u, v) = get_memory_info(
 | 
			
		||||
            req.mem_block_size,
 | 
			
		||||
            req.mem_hotplug_probe,
 | 
			
		||||
            SYSFS_MEMORY_BLOCK_SIZE_PATH,
 | 
			
		||||
            SYSFS_MEMORY_HOTPLUG_PROBE_PATH,
 | 
			
		||||
        ) {
 | 
			
		||||
            Ok((u, v)) => {
 | 
			
		||||
        )
 | 
			
		||||
        .map_ttrpc_err_do(|_| info!(sl(), "fail to get memory info!"))?;
 | 
			
		||||
 | 
			
		||||
        resp.mem_block_size_bytes = u;
 | 
			
		||||
        resp.support_mem_hotplug_probe = v;
 | 
			
		||||
            }
 | 
			
		||||
            Err(e) => {
 | 
			
		||||
                info!(sl(), "fail to get memory info!");
 | 
			
		||||
                return Err(ttrpc_error(ttrpc::Code::INTERNAL, e));
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // to get agent details
 | 
			
		||||
        let detail = get_agent_details();
 | 
			
		||||
@@ -1409,8 +1332,7 @@ impl agent_ttrpc::AgentService for AgentService {
 | 
			
		||||
        trace_rpc_call!(ctx, "mem_hotplug_by_probe", req);
 | 
			
		||||
        is_allowed(&req).await?;
 | 
			
		||||
 | 
			
		||||
        do_mem_hotplug_by_probe(&req.memHotplugProbeAddr)
 | 
			
		||||
            .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
 | 
			
		||||
        do_mem_hotplug_by_probe(&req.memHotplugProbeAddr).map_ttrpc_err(same)?;
 | 
			
		||||
 | 
			
		||||
        Ok(Empty::new())
 | 
			
		||||
    }
 | 
			
		||||
@@ -1423,8 +1345,7 @@ impl agent_ttrpc::AgentService for AgentService {
 | 
			
		||||
        trace_rpc_call!(ctx, "set_guest_date_time", req);
 | 
			
		||||
        is_allowed(&req).await?;
 | 
			
		||||
 | 
			
		||||
        do_set_guest_date_time(req.Sec, req.Usec)
 | 
			
		||||
            .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
 | 
			
		||||
        do_set_guest_date_time(req.Sec, req.Usec).map_ttrpc_err(same)?;
 | 
			
		||||
 | 
			
		||||
        Ok(Empty::new())
 | 
			
		||||
    }
 | 
			
		||||
@@ -1437,7 +1358,7 @@ impl agent_ttrpc::AgentService for AgentService {
 | 
			
		||||
        trace_rpc_call!(ctx, "copy_file", req);
 | 
			
		||||
        is_allowed(&req).await?;
 | 
			
		||||
 | 
			
		||||
        do_copy_file(&req).map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
 | 
			
		||||
        do_copy_file(&req).map_ttrpc_err(same)?;
 | 
			
		||||
 | 
			
		||||
        Ok(Empty::new())
 | 
			
		||||
    }
 | 
			
		||||
@@ -1450,15 +1371,11 @@ impl agent_ttrpc::AgentService for AgentService {
 | 
			
		||||
        trace_rpc_call!(ctx, "get_metrics", req);
 | 
			
		||||
        is_allowed(&req).await?;
 | 
			
		||||
 | 
			
		||||
        match get_metrics(&req) {
 | 
			
		||||
            Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)),
 | 
			
		||||
            Ok(s) => {
 | 
			
		||||
        let s = get_metrics(&req).map_ttrpc_err(same)?;
 | 
			
		||||
        let mut metrics = Metrics::new();
 | 
			
		||||
        metrics.set_metrics(s);
 | 
			
		||||
        Ok(metrics)
 | 
			
		||||
    }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async fn get_oom_event(
 | 
			
		||||
        &self,
 | 
			
		||||
@@ -1471,16 +1388,16 @@ impl agent_ttrpc::AgentService for AgentService {
 | 
			
		||||
        let mut event_rx = event_rx.lock().await;
 | 
			
		||||
        drop(s);
 | 
			
		||||
 | 
			
		||||
        if let Some(container_id) = event_rx.recv().await {
 | 
			
		||||
        let container_id = event_rx
 | 
			
		||||
            .recv()
 | 
			
		||||
            .await
 | 
			
		||||
            .map_ttrpc_err(ttrpc::Code::INTERNAL, "")?;
 | 
			
		||||
 | 
			
		||||
        info!(sl(), "get_oom_event return {}", &container_id);
 | 
			
		||||
 | 
			
		||||
        let mut resp = OOMEvent::new();
 | 
			
		||||
        resp.container_id = container_id;
 | 
			
		||||
 | 
			
		||||
            return Ok(resp);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        Err(ttrpc_error(ttrpc::Code::INTERNAL, ""))
 | 
			
		||||
        Ok(resp)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async fn get_volume_stats(
 | 
			
		||||
@@ -1495,28 +1412,23 @@ impl agent_ttrpc::AgentService for AgentService {
 | 
			
		||||
        let mut resp = VolumeStatsResponse::new();
 | 
			
		||||
        let mut condition = VolumeCondition::new();
 | 
			
		||||
 | 
			
		||||
        match File::open(&req.volume_guest_path) {
 | 
			
		||||
            Ok(_) => {
 | 
			
		||||
        File::open(&req.volume_guest_path)
 | 
			
		||||
            .map_ttrpc_err_do(|_| info!(sl(), "failed to open the volume"))?;
 | 
			
		||||
 | 
			
		||||
        condition.abnormal = false;
 | 
			
		||||
        condition.message = String::from("OK");
 | 
			
		||||
            }
 | 
			
		||||
            Err(e) => {
 | 
			
		||||
                info!(sl(), "failed to open the volume");
 | 
			
		||||
                return Err(ttrpc_error(ttrpc::Code::INTERNAL, e));
 | 
			
		||||
            }
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        let mut usage_vec = Vec::new();
 | 
			
		||||
 | 
			
		||||
        // to get volume capacity stats
 | 
			
		||||
        get_volume_capacity_stats(&req.volume_guest_path)
 | 
			
		||||
            .map(|u| usage_vec.push(u))
 | 
			
		||||
            .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
 | 
			
		||||
            .map_ttrpc_err(same)?;
 | 
			
		||||
 | 
			
		||||
        // to get volume inode stats
 | 
			
		||||
        get_volume_inode_stats(&req.volume_guest_path)
 | 
			
		||||
            .map(|u| usage_vec.push(u))
 | 
			
		||||
            .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
 | 
			
		||||
            .map_ttrpc_err(same)?;
 | 
			
		||||
 | 
			
		||||
        resp.usage = usage_vec;
 | 
			
		||||
        resp.volume_condition = MessageField::some(condition);
 | 
			
		||||
@@ -1531,9 +1443,7 @@ impl agent_ttrpc::AgentService for AgentService {
 | 
			
		||||
        trace_rpc_call!(ctx, "add_swap", req);
 | 
			
		||||
        is_allowed(&req).await?;
 | 
			
		||||
 | 
			
		||||
        do_add_swap(&self.sandbox, &req)
 | 
			
		||||
            .await
 | 
			
		||||
            .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
 | 
			
		||||
        do_add_swap(&self.sandbox, &req).await.map_ttrpc_err(same)?;
 | 
			
		||||
 | 
			
		||||
        Ok(Empty::new())
 | 
			
		||||
    }
 | 
			
		||||
@@ -1552,7 +1462,7 @@ impl agent_ttrpc::AgentService for AgentService {
 | 
			
		||||
            .await
 | 
			
		||||
            .set_policy(&req.policy)
 | 
			
		||||
            .await
 | 
			
		||||
            .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
 | 
			
		||||
            .map_ttrpc_err(same)?;
 | 
			
		||||
 | 
			
		||||
        Ok(Empty::new())
 | 
			
		||||
    }
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user