mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-04-29 04:04:45 +00:00
agent: simplify error handling
We extend the `Result` and `Option` types with associated types that allows converting a `Result<T, E>` and `Option<T>` into `ttrpc::Result<T>`. This allows the elimination of many `match` statements in favor of calling the map function plus the `?` operator. This transformation simplifies the code. Fixes: #7624 Signed-off-by: Wedson Almeida Filho <walmeida@microsoft.com>
This commit is contained in:
parent
e107d1d94e
commit
76dac8f22c
@ -9,6 +9,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt, ReadHalf};
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use std::ffi::{CString, OsStr};
|
||||
use std::fmt::Debug;
|
||||
use std::io;
|
||||
use std::os::unix::ffi::OsStrExt;
|
||||
use std::path::Path;
|
||||
@ -121,7 +122,7 @@ fn sl() -> slog::Logger {
|
||||
}
|
||||
|
||||
// Convenience function to wrap an error and response to ttrpc client
|
||||
fn ttrpc_error(code: ttrpc::Code, err: impl std::fmt::Debug) -> ttrpc::Error {
|
||||
fn ttrpc_error(code: ttrpc::Code, err: impl Debug) -> ttrpc::Error {
|
||||
get_rpc_status(code, format!("{:?}", err))
|
||||
}
|
||||
|
||||
@ -165,6 +166,36 @@ async fn is_allowed(req: &(impl MessageDyn + serde::Serialize)) -> ttrpc::Result
|
||||
res
|
||||
}
|
||||
|
||||
fn same<E>(e: E) -> E {
|
||||
e
|
||||
}
|
||||
|
||||
trait ResultToTtrpcResult<T, E: Debug>: Sized {
|
||||
fn map_ttrpc_err<R: Debug>(self, msg_builder: impl FnOnce(E) -> R) -> ttrpc::Result<T>;
|
||||
fn map_ttrpc_err_do(self, doer: impl FnOnce(&E)) -> ttrpc::Result<T> {
|
||||
self.map_ttrpc_err(|e| {
|
||||
doer(&e);
|
||||
e
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, E: Debug> ResultToTtrpcResult<T, E> for Result<T, E> {
|
||||
fn map_ttrpc_err<R: Debug>(self, msg_builder: impl FnOnce(E) -> R) -> ttrpc::Result<T> {
|
||||
self.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, msg_builder(e)))
|
||||
}
|
||||
}
|
||||
|
||||
trait OptionToTtrpcResult<T>: Sized {
|
||||
fn map_ttrpc_err(self, code: ttrpc::Code, msg: &str) -> ttrpc::Result<T>;
|
||||
}
|
||||
|
||||
impl<T> OptionToTtrpcResult<T> for Option<T> {
|
||||
fn map_ttrpc_err(self, code: ttrpc::Code, msg: &str) -> ttrpc::Result<T> {
|
||||
self.ok_or_else(|| ttrpc_error(code, msg))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct AgentService {
|
||||
sandbox: Arc<Mutex<Sandbox>>,
|
||||
@ -642,10 +673,8 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
) -> ttrpc::Result<Empty> {
|
||||
trace_rpc_call!(ctx, "create_container", req);
|
||||
is_allowed(&req).await?;
|
||||
match self.do_create_container(req).await {
|
||||
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)),
|
||||
Ok(_) => Ok(Empty::new()),
|
||||
}
|
||||
self.do_create_container(req).await.map_ttrpc_err(same)?;
|
||||
Ok(Empty::new())
|
||||
}
|
||||
|
||||
async fn start_container(
|
||||
@ -655,10 +684,8 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
) -> ttrpc::Result<Empty> {
|
||||
trace_rpc_call!(ctx, "start_container", req);
|
||||
is_allowed(&req).await?;
|
||||
match self.do_start_container(req).await {
|
||||
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)),
|
||||
Ok(_) => Ok(Empty::new()),
|
||||
}
|
||||
self.do_start_container(req).await.map_ttrpc_err(same)?;
|
||||
Ok(Empty::new())
|
||||
}
|
||||
|
||||
async fn remove_container(
|
||||
@ -668,10 +695,8 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
) -> ttrpc::Result<Empty> {
|
||||
trace_rpc_call!(ctx, "remove_container", req);
|
||||
is_allowed(&req).await?;
|
||||
match self.do_remove_container(req).await {
|
||||
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)),
|
||||
Ok(_) => Ok(Empty::new()),
|
||||
}
|
||||
self.do_remove_container(req).await.map_ttrpc_err(same)?;
|
||||
Ok(Empty::new())
|
||||
}
|
||||
|
||||
async fn exec_process(
|
||||
@ -681,10 +706,8 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
) -> ttrpc::Result<Empty> {
|
||||
trace_rpc_call!(ctx, "exec_process", req);
|
||||
is_allowed(&req).await?;
|
||||
match self.do_exec_process(req).await {
|
||||
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)),
|
||||
Ok(_) => Ok(Empty::new()),
|
||||
}
|
||||
self.do_exec_process(req).await.map_ttrpc_err(same)?;
|
||||
Ok(Empty::new())
|
||||
}
|
||||
|
||||
async fn signal_process(
|
||||
@ -694,10 +717,8 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
) -> ttrpc::Result<Empty> {
|
||||
trace_rpc_call!(ctx, "signal_process", req);
|
||||
is_allowed(&req).await?;
|
||||
match self.do_signal_process(req).await {
|
||||
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)),
|
||||
Ok(_) => Ok(Empty::new()),
|
||||
}
|
||||
self.do_signal_process(req).await.map_ttrpc_err(same)?;
|
||||
Ok(Empty::new())
|
||||
}
|
||||
|
||||
async fn wait_process(
|
||||
@ -707,9 +728,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
) -> ttrpc::Result<WaitProcessResponse> {
|
||||
trace_rpc_call!(ctx, "wait_process", req);
|
||||
is_allowed(&req).await?;
|
||||
self.do_wait_process(req)
|
||||
.await
|
||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))
|
||||
self.do_wait_process(req).await.map_ttrpc_err(same)
|
||||
}
|
||||
|
||||
async fn update_container(
|
||||
@ -721,18 +740,12 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
is_allowed(&req).await?;
|
||||
|
||||
let mut sandbox = self.sandbox.lock().await;
|
||||
let ctr = sandbox.get_container(&req.container_id).ok_or_else(|| {
|
||||
ttrpc_error(
|
||||
ttrpc::Code::INVALID_ARGUMENT,
|
||||
"invalid container id".to_string(),
|
||||
)
|
||||
})?;
|
||||
|
||||
let ctr = sandbox
|
||||
.get_container(&req.container_id)
|
||||
.map_ttrpc_err(ttrpc::Code::INVALID_ARGUMENT, "invalid container id")?;
|
||||
if let Some(res) = req.resources.as_ref() {
|
||||
let oci_res = rustjail::resources_grpc_to_oci(res);
|
||||
if let Err(e) = ctr.set(oci_res) {
|
||||
return Err(ttrpc_error(ttrpc::Code::INTERNAL, e));
|
||||
}
|
||||
ctr.set(oci_res).map_ttrpc_err(same)?;
|
||||
}
|
||||
|
||||
Ok(Empty::new())
|
||||
@ -747,15 +760,10 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
is_allowed(&req).await?;
|
||||
|
||||
let mut sandbox = self.sandbox.lock().await;
|
||||
let ctr = sandbox.get_container(&req.container_id).ok_or_else(|| {
|
||||
ttrpc_error(
|
||||
ttrpc::Code::INVALID_ARGUMENT,
|
||||
"invalid container id".to_string(),
|
||||
)
|
||||
})?;
|
||||
|
||||
ctr.stats()
|
||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))
|
||||
let ctr = sandbox
|
||||
.get_container(&req.container_id)
|
||||
.map_ttrpc_err(ttrpc::Code::INVALID_ARGUMENT, "invalid container id")?;
|
||||
ctr.stats().map_ttrpc_err(same)
|
||||
}
|
||||
|
||||
async fn pause_container(
|
||||
@ -767,16 +775,10 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
is_allowed(&req).await?;
|
||||
|
||||
let mut sandbox = self.sandbox.lock().await;
|
||||
let ctr = sandbox.get_container(req.container_id()).ok_or_else(|| {
|
||||
ttrpc_error(
|
||||
ttrpc::Code::INVALID_ARGUMENT,
|
||||
"invalid container id".to_string(),
|
||||
)
|
||||
})?;
|
||||
|
||||
ctr.pause()
|
||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
|
||||
|
||||
let ctr = sandbox
|
||||
.get_container(&req.container_id)
|
||||
.map_ttrpc_err(ttrpc::Code::INVALID_ARGUMENT, "invalid container id")?;
|
||||
ctr.pause().map_ttrpc_err(same)?;
|
||||
Ok(Empty::new())
|
||||
}
|
||||
|
||||
@ -789,16 +791,10 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
is_allowed(&req).await?;
|
||||
|
||||
let mut sandbox = self.sandbox.lock().await;
|
||||
let ctr = sandbox.get_container(req.container_id()).ok_or_else(|| {
|
||||
ttrpc_error(
|
||||
ttrpc::Code::INVALID_ARGUMENT,
|
||||
"invalid container id".to_string(),
|
||||
)
|
||||
})?;
|
||||
|
||||
ctr.resume()
|
||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
|
||||
|
||||
let ctr = sandbox
|
||||
.get_container(&req.container_id)
|
||||
.map_ttrpc_err(ttrpc::Code::INVALID_ARGUMENT, "invalid container id")?;
|
||||
ctr.resume().map_ttrpc_err(same)?;
|
||||
Ok(Empty::new())
|
||||
}
|
||||
|
||||
@ -809,8 +805,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
) -> ttrpc::Result<Empty> {
|
||||
trace_rpc_call!(ctx, "remove_stale_virtiofs_share_mounts", req);
|
||||
is_allowed(&req).await?;
|
||||
let mount_infos = parse_mount_table("/proc/self/mountinfo")
|
||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
|
||||
let mount_infos = parse_mount_table("/proc/self/mountinfo").map_ttrpc_err(same)?;
|
||||
for m in &mount_infos {
|
||||
if m.mount_point.starts_with(KATA_GUEST_SHARE_DIR) {
|
||||
// stat the mount point, virtiofs daemon will remove the stale cache and release the fds if the mount point doesn't exist any more.
|
||||
@ -831,9 +826,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
req: protocols::agent::WriteStreamRequest,
|
||||
) -> ttrpc::Result<WriteStreamResponse> {
|
||||
is_allowed(&req).await?;
|
||||
self.do_write_stream(req)
|
||||
.await
|
||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))
|
||||
self.do_write_stream(req).await.map_ttrpc_err(same)
|
||||
}
|
||||
|
||||
async fn read_stdout(
|
||||
@ -842,9 +835,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
req: protocols::agent::ReadStreamRequest,
|
||||
) -> ttrpc::Result<ReadStreamResponse> {
|
||||
is_allowed(&req).await?;
|
||||
self.do_read_stream(req, true)
|
||||
.await
|
||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))
|
||||
self.do_read_stream(req, true).await.map_ttrpc_err(same)
|
||||
}
|
||||
|
||||
async fn read_stderr(
|
||||
@ -853,9 +844,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
req: protocols::agent::ReadStreamRequest,
|
||||
) -> ttrpc::Result<ReadStreamResponse> {
|
||||
is_allowed(&req).await?;
|
||||
self.do_read_stream(req, false)
|
||||
.await
|
||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))
|
||||
self.do_read_stream(req, false).await.map_ttrpc_err(same)
|
||||
}
|
||||
|
||||
async fn close_stdin(
|
||||
@ -902,23 +891,20 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
)
|
||||
})?;
|
||||
|
||||
if let Some(fd) = p.term_master {
|
||||
unsafe {
|
||||
let win = winsize {
|
||||
ws_row: req.row as c_ushort,
|
||||
ws_col: req.column as c_ushort,
|
||||
ws_xpixel: 0,
|
||||
ws_ypixel: 0,
|
||||
};
|
||||
let fd = p
|
||||
.term_master
|
||||
.map_ttrpc_err(ttrpc::Code::UNAVAILABLE, "no tty")?;
|
||||
let win = winsize {
|
||||
ws_row: req.row as c_ushort,
|
||||
ws_col: req.column as c_ushort,
|
||||
ws_xpixel: 0,
|
||||
ws_ypixel: 0,
|
||||
};
|
||||
|
||||
let err = libc::ioctl(fd, TIOCSWINSZ, &win);
|
||||
Errno::result(err).map(drop).map_err(|e| {
|
||||
ttrpc_error(ttrpc::Code::INTERNAL, format!("ioctl error: {:?}", e))
|
||||
})?;
|
||||
}
|
||||
} else {
|
||||
return Err(ttrpc_error(ttrpc::Code::UNAVAILABLE, "no tty".to_string()));
|
||||
}
|
||||
let err = unsafe { libc::ioctl(fd, TIOCSWINSZ, &win) };
|
||||
Errno::result(err)
|
||||
.map(drop)
|
||||
.map_ttrpc_err(|e| format!("ioctl error: {:?}", e))?;
|
||||
|
||||
Ok(Empty::new())
|
||||
}
|
||||
@ -931,12 +917,10 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
trace_rpc_call!(ctx, "update_interface", req);
|
||||
is_allowed(&req).await?;
|
||||
|
||||
let interface = req.interface.into_option().ok_or_else(|| {
|
||||
ttrpc_error(
|
||||
ttrpc::Code::INVALID_ARGUMENT,
|
||||
"empty update interface request".to_string(),
|
||||
)
|
||||
})?;
|
||||
let interface = req.interface.into_option().map_ttrpc_err(
|
||||
ttrpc::Code::INVALID_ARGUMENT,
|
||||
"empty update interface request",
|
||||
)?;
|
||||
|
||||
self.sandbox
|
||||
.lock()
|
||||
@ -944,9 +928,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
.rtnl
|
||||
.update_interface(&interface)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
ttrpc_error(ttrpc::Code::INTERNAL, format!("update interface: {:?}", e))
|
||||
})?;
|
||||
.map_ttrpc_err(|e| format!("update interface: {:?}", e))?;
|
||||
|
||||
Ok(interface)
|
||||
}
|
||||
@ -959,28 +941,25 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
trace_rpc_call!(ctx, "update_routes", req);
|
||||
is_allowed(&req).await?;
|
||||
|
||||
let new_routes = req.routes.into_option().map(|r| r.Routes).ok_or_else(|| {
|
||||
ttrpc_error(
|
||||
ttrpc::Code::INVALID_ARGUMENT,
|
||||
"empty update routes request".to_string(),
|
||||
)
|
||||
})?;
|
||||
let new_routes = req
|
||||
.routes
|
||||
.into_option()
|
||||
.map(|r| r.Routes)
|
||||
.map_ttrpc_err(ttrpc::Code::INVALID_ARGUMENT, "empty update routes request")?;
|
||||
|
||||
let mut sandbox = self.sandbox.lock().await;
|
||||
|
||||
sandbox.rtnl.update_routes(new_routes).await.map_err(|e| {
|
||||
ttrpc_error(
|
||||
ttrpc::Code::INTERNAL,
|
||||
format!("Failed to update routes: {:?}", e),
|
||||
)
|
||||
})?;
|
||||
sandbox
|
||||
.rtnl
|
||||
.update_routes(new_routes)
|
||||
.await
|
||||
.map_ttrpc_err(|e| format!("Failed to update routes: {:?}", e))?;
|
||||
|
||||
let list = sandbox.rtnl.list_routes().await.map_err(|e| {
|
||||
ttrpc_error(
|
||||
ttrpc::Code::INTERNAL,
|
||||
format!("Failed to list routes after update: {:?}", e),
|
||||
)
|
||||
})?;
|
||||
let list = sandbox
|
||||
.rtnl
|
||||
.list_routes()
|
||||
.await
|
||||
.map_ttrpc_err(|e| format!("Failed to list routes after update: {:?}", e))?;
|
||||
|
||||
Ok(protocols::agent::Routes {
|
||||
Routes: list,
|
||||
@ -996,13 +975,10 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
trace_rpc_call!(ctx, "update_mounts", req);
|
||||
is_allowed(&req).await?;
|
||||
|
||||
match update_ephemeral_mounts(sl(), &req.storages, &self.sandbox).await {
|
||||
Ok(_) => Ok(Empty::new()),
|
||||
Err(e) => Err(ttrpc_error(
|
||||
ttrpc::Code::INTERNAL,
|
||||
format!("Failed to update mounts: {:?}", e),
|
||||
)),
|
||||
}
|
||||
update_ephemeral_mounts(sl(), &req.storages, &self.sandbox)
|
||||
.await
|
||||
.map_ttrpc_err(|e| format!("Failed to update mounts: {:?}", e))?;
|
||||
Ok(Empty::new())
|
||||
}
|
||||
|
||||
async fn get_ip_tables(
|
||||
@ -1032,16 +1008,13 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
}
|
||||
.to_string();
|
||||
|
||||
match Command::new(cmd.clone()).output() {
|
||||
Ok(output) => Ok(GetIPTablesResponse {
|
||||
data: output.stdout,
|
||||
..Default::default()
|
||||
}),
|
||||
Err(e) => {
|
||||
warn!(sl(), "failed to run {}: {:?}", cmd, e.kind());
|
||||
return Err(ttrpc_error(ttrpc::Code::INTERNAL, e));
|
||||
}
|
||||
}
|
||||
let output = Command::new(cmd.clone())
|
||||
.output()
|
||||
.map_ttrpc_err_do(|e| warn!(sl(), "failed to run {}: {:?}", cmd, e.kind()))?;
|
||||
Ok(GetIPTablesResponse {
|
||||
data: output.stdout,
|
||||
..Default::default()
|
||||
})
|
||||
}
|
||||
|
||||
async fn set_ip_tables(
|
||||
@ -1071,20 +1044,14 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
}
|
||||
.to_string();
|
||||
|
||||
let mut child = match Command::new(cmd.clone())
|
||||
let mut child = Command::new(cmd.clone())
|
||||
.arg("--wait")
|
||||
.arg(IPTABLES_RESTORE_WAIT_SEC.to_string())
|
||||
.stdin(Stdio::piped())
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.spawn()
|
||||
{
|
||||
Ok(child) => child,
|
||||
Err(e) => {
|
||||
warn!(sl(), "failure to spawn {}: {:?}", cmd, e.kind());
|
||||
return Err(ttrpc_error(ttrpc::Code::INTERNAL, e));
|
||||
}
|
||||
};
|
||||
.map_ttrpc_err_do(|e| warn!(sl(), "failure to spawn {}: {:?}", cmd, e.kind()))?;
|
||||
|
||||
let mut stdin = match child.stdin.take() {
|
||||
Some(si) => si,
|
||||
@ -1092,7 +1059,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
println!("failed to get stdin from child");
|
||||
return Err(ttrpc_error(
|
||||
ttrpc::Code::INTERNAL,
|
||||
"failed to take stdin from child".to_string(),
|
||||
"failed to take stdin from child",
|
||||
));
|
||||
}
|
||||
};
|
||||
@ -1111,35 +1078,22 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
};
|
||||
});
|
||||
|
||||
if tokio::time::timeout(Duration::from_secs(IPTABLES_RESTORE_WAIT_SEC), rx)
|
||||
let _ = tokio::time::timeout(Duration::from_secs(IPTABLES_RESTORE_WAIT_SEC), rx)
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
return Err(ttrpc_error(
|
||||
ttrpc::Code::INTERNAL,
|
||||
"timeout waiting for stdin writer to complete".to_string(),
|
||||
));
|
||||
}
|
||||
.map_ttrpc_err(|_| "timeout waiting for stdin writer to complete")?;
|
||||
|
||||
if handle.await.is_err() {
|
||||
return Err(ttrpc_error(
|
||||
ttrpc::Code::INTERNAL,
|
||||
"stdin writer thread failure".to_string(),
|
||||
));
|
||||
}
|
||||
handle
|
||||
.await
|
||||
.map_ttrpc_err(|_| "stdin writer thread failure")?;
|
||||
|
||||
let output = match child.wait_with_output() {
|
||||
Ok(o) => o,
|
||||
Err(e) => {
|
||||
warn!(
|
||||
sl(),
|
||||
"failure waiting for spawned {} to complete: {:?}",
|
||||
cmd,
|
||||
e.kind()
|
||||
);
|
||||
return Err(ttrpc_error(ttrpc::Code::INTERNAL, e));
|
||||
}
|
||||
};
|
||||
let output = child.wait_with_output().map_ttrpc_err_do(|e| {
|
||||
warn!(
|
||||
sl(),
|
||||
"failure waiting for spawned {} to complete: {:?}",
|
||||
cmd,
|
||||
e.kind()
|
||||
)
|
||||
})?;
|
||||
|
||||
if !output.status.success() {
|
||||
warn!(sl(), "{} failed: {:?}", cmd, output.stderr);
|
||||
@ -1174,12 +1128,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
.rtnl
|
||||
.list_interfaces()
|
||||
.await
|
||||
.map_err(|e| {
|
||||
ttrpc_error(
|
||||
ttrpc::Code::INTERNAL,
|
||||
format!("Failed to list interfaces: {:?}", e),
|
||||
)
|
||||
})?;
|
||||
.map_ttrpc_err(|e| format!("Failed to list interfaces: {:?}", e))?;
|
||||
|
||||
Ok(protocols::agent::Interfaces {
|
||||
Interfaces: list,
|
||||
@ -1202,7 +1151,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
.rtnl
|
||||
.list_routes()
|
||||
.await
|
||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, format!("list routes: {:?}", e)))?;
|
||||
.map_ttrpc_err(|e| format!("list routes: {:?}", e))?;
|
||||
|
||||
Ok(protocols::agent::Routes {
|
||||
Routes: list,
|
||||
@ -1241,30 +1190,24 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
}
|
||||
|
||||
for m in req.kernel_modules.iter() {
|
||||
load_kernel_module(m).map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
|
||||
load_kernel_module(m).map_ttrpc_err(same)?;
|
||||
}
|
||||
|
||||
s.setup_shared_namespaces()
|
||||
.await
|
||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
|
||||
s.setup_shared_namespaces().await.map_ttrpc_err(same)?;
|
||||
}
|
||||
|
||||
match add_storages(sl(), &req.storages, &self.sandbox, None).await {
|
||||
Ok(m) => {
|
||||
self.sandbox.lock().await.mounts = m;
|
||||
}
|
||||
Err(e) => return Err(ttrpc_error(ttrpc::Code::INTERNAL, e)),
|
||||
};
|
||||
let m = add_storages(sl(), &req.storages, &self.sandbox, None)
|
||||
.await
|
||||
.map_ttrpc_err(same)?;
|
||||
self.sandbox.lock().await.mounts = m;
|
||||
|
||||
match setup_guest_dns(sl(), &req.dns) {
|
||||
Ok(_) => {
|
||||
let mut s = self.sandbox.lock().await;
|
||||
for dns in req.dns {
|
||||
s.network.set_dns(dns);
|
||||
}
|
||||
setup_guest_dns(sl(), &req.dns).map_ttrpc_err(same)?;
|
||||
{
|
||||
let mut s = self.sandbox.lock().await;
|
||||
for dns in req.dns {
|
||||
s.network.set_dns(dns);
|
||||
}
|
||||
Err(e) => return Err(ttrpc_error(ttrpc::Code::INTERNAL, e)),
|
||||
};
|
||||
}
|
||||
|
||||
Ok(Empty::new())
|
||||
}
|
||||
@ -1279,10 +1222,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
|
||||
let mut sandbox = self.sandbox.lock().await;
|
||||
// destroy all containers, clean up, notify agent to exit etc.
|
||||
sandbox
|
||||
.destroy()
|
||||
.await
|
||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
|
||||
sandbox.destroy().await.map_ttrpc_err(same)?;
|
||||
// Close get_oom_event connection,
|
||||
// otherwise it will block the shutdown of ttrpc.
|
||||
drop(sandbox.event_tx.take());
|
||||
@ -1290,14 +1230,12 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
sandbox
|
||||
.sender
|
||||
.take()
|
||||
.ok_or_else(|| {
|
||||
ttrpc_error(
|
||||
ttrpc::Code::INTERNAL,
|
||||
"failed to get sandbox sender channel".to_string(),
|
||||
)
|
||||
})?
|
||||
.map_ttrpc_err(
|
||||
ttrpc::Code::INTERNAL,
|
||||
"failed to get sandbox sender channel",
|
||||
)?
|
||||
.send(1)
|
||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
|
||||
.map_ttrpc_err(same)?;
|
||||
|
||||
Ok(Empty::new())
|
||||
}
|
||||
@ -1314,12 +1252,10 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
.neighbors
|
||||
.into_option()
|
||||
.map(|n| n.ARPNeighbors)
|
||||
.ok_or_else(|| {
|
||||
ttrpc_error(
|
||||
ttrpc::Code::INVALID_ARGUMENT,
|
||||
"empty add arp neighbours request".to_string(),
|
||||
)
|
||||
})?;
|
||||
.map_ttrpc_err(
|
||||
ttrpc::Code::INVALID_ARGUMENT,
|
||||
"empty add arp neighbours request",
|
||||
)?;
|
||||
|
||||
self.sandbox
|
||||
.lock()
|
||||
@ -1327,12 +1263,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
.rtnl
|
||||
.add_arp_neighbors(neighs)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
ttrpc_error(
|
||||
ttrpc::Code::INTERNAL,
|
||||
format!("Failed to add ARP neighbours: {:?}", e),
|
||||
)
|
||||
})?;
|
||||
.map_ttrpc_err(|e| format!("Failed to add ARP neighbours: {:?}", e))?;
|
||||
|
||||
Ok(Empty::new())
|
||||
}
|
||||
@ -1346,9 +1277,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
is_allowed(&req).await?;
|
||||
let sandbox = self.sandbox.lock().await;
|
||||
|
||||
sandbox
|
||||
.online_cpu_memory(&req)
|
||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
|
||||
sandbox.online_cpu_memory(&req).map_ttrpc_err(same)?;
|
||||
|
||||
Ok(Empty::new())
|
||||
}
|
||||
@ -1361,8 +1290,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
trace_rpc_call!(ctx, "reseed_random_dev", req);
|
||||
is_allowed(&req).await?;
|
||||
|
||||
random::reseed_rng(req.data.as_slice())
|
||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
|
||||
random::reseed_rng(req.data.as_slice()).map_ttrpc_err(same)?;
|
||||
|
||||
Ok(Empty::new())
|
||||
}
|
||||
@ -1378,21 +1306,16 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
info!(sl(), "get guest details!");
|
||||
let mut resp = GuestDetailsResponse::new();
|
||||
// to get memory block size
|
||||
match get_memory_info(
|
||||
let (u, v) = get_memory_info(
|
||||
req.mem_block_size,
|
||||
req.mem_hotplug_probe,
|
||||
SYSFS_MEMORY_BLOCK_SIZE_PATH,
|
||||
SYSFS_MEMORY_HOTPLUG_PROBE_PATH,
|
||||
) {
|
||||
Ok((u, v)) => {
|
||||
resp.mem_block_size_bytes = u;
|
||||
resp.support_mem_hotplug_probe = v;
|
||||
}
|
||||
Err(e) => {
|
||||
info!(sl(), "fail to get memory info!");
|
||||
return Err(ttrpc_error(ttrpc::Code::INTERNAL, e));
|
||||
}
|
||||
}
|
||||
)
|
||||
.map_ttrpc_err_do(|_| info!(sl(), "fail to get memory info!"))?;
|
||||
|
||||
resp.mem_block_size_bytes = u;
|
||||
resp.support_mem_hotplug_probe = v;
|
||||
|
||||
// to get agent details
|
||||
let detail = get_agent_details();
|
||||
@ -1409,8 +1332,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
trace_rpc_call!(ctx, "mem_hotplug_by_probe", req);
|
||||
is_allowed(&req).await?;
|
||||
|
||||
do_mem_hotplug_by_probe(&req.memHotplugProbeAddr)
|
||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
|
||||
do_mem_hotplug_by_probe(&req.memHotplugProbeAddr).map_ttrpc_err(same)?;
|
||||
|
||||
Ok(Empty::new())
|
||||
}
|
||||
@ -1423,8 +1345,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
trace_rpc_call!(ctx, "set_guest_date_time", req);
|
||||
is_allowed(&req).await?;
|
||||
|
||||
do_set_guest_date_time(req.Sec, req.Usec)
|
||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
|
||||
do_set_guest_date_time(req.Sec, req.Usec).map_ttrpc_err(same)?;
|
||||
|
||||
Ok(Empty::new())
|
||||
}
|
||||
@ -1437,7 +1358,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
trace_rpc_call!(ctx, "copy_file", req);
|
||||
is_allowed(&req).await?;
|
||||
|
||||
do_copy_file(&req).map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
|
||||
do_copy_file(&req).map_ttrpc_err(same)?;
|
||||
|
||||
Ok(Empty::new())
|
||||
}
|
||||
@ -1450,14 +1371,10 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
trace_rpc_call!(ctx, "get_metrics", req);
|
||||
is_allowed(&req).await?;
|
||||
|
||||
match get_metrics(&req) {
|
||||
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e)),
|
||||
Ok(s) => {
|
||||
let mut metrics = Metrics::new();
|
||||
metrics.set_metrics(s);
|
||||
Ok(metrics)
|
||||
}
|
||||
}
|
||||
let s = get_metrics(&req).map_ttrpc_err(same)?;
|
||||
let mut metrics = Metrics::new();
|
||||
metrics.set_metrics(s);
|
||||
Ok(metrics)
|
||||
}
|
||||
|
||||
async fn get_oom_event(
|
||||
@ -1471,16 +1388,16 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
let mut event_rx = event_rx.lock().await;
|
||||
drop(s);
|
||||
|
||||
if let Some(container_id) = event_rx.recv().await {
|
||||
info!(sl(), "get_oom_event return {}", &container_id);
|
||||
let container_id = event_rx
|
||||
.recv()
|
||||
.await
|
||||
.map_ttrpc_err(ttrpc::Code::INTERNAL, "")?;
|
||||
|
||||
let mut resp = OOMEvent::new();
|
||||
resp.container_id = container_id;
|
||||
info!(sl(), "get_oom_event return {}", &container_id);
|
||||
|
||||
return Ok(resp);
|
||||
}
|
||||
|
||||
Err(ttrpc_error(ttrpc::Code::INTERNAL, ""))
|
||||
let mut resp = OOMEvent::new();
|
||||
resp.container_id = container_id;
|
||||
Ok(resp)
|
||||
}
|
||||
|
||||
async fn get_volume_stats(
|
||||
@ -1495,28 +1412,23 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
let mut resp = VolumeStatsResponse::new();
|
||||
let mut condition = VolumeCondition::new();
|
||||
|
||||
match File::open(&req.volume_guest_path) {
|
||||
Ok(_) => {
|
||||
condition.abnormal = false;
|
||||
condition.message = String::from("OK");
|
||||
}
|
||||
Err(e) => {
|
||||
info!(sl(), "failed to open the volume");
|
||||
return Err(ttrpc_error(ttrpc::Code::INTERNAL, e));
|
||||
}
|
||||
};
|
||||
File::open(&req.volume_guest_path)
|
||||
.map_ttrpc_err_do(|_| info!(sl(), "failed to open the volume"))?;
|
||||
|
||||
condition.abnormal = false;
|
||||
condition.message = String::from("OK");
|
||||
|
||||
let mut usage_vec = Vec::new();
|
||||
|
||||
// to get volume capacity stats
|
||||
get_volume_capacity_stats(&req.volume_guest_path)
|
||||
.map(|u| usage_vec.push(u))
|
||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
|
||||
.map_ttrpc_err(same)?;
|
||||
|
||||
// to get volume inode stats
|
||||
get_volume_inode_stats(&req.volume_guest_path)
|
||||
.map(|u| usage_vec.push(u))
|
||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
|
||||
.map_ttrpc_err(same)?;
|
||||
|
||||
resp.usage = usage_vec;
|
||||
resp.volume_condition = MessageField::some(condition);
|
||||
@ -1531,9 +1443,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
trace_rpc_call!(ctx, "add_swap", req);
|
||||
is_allowed(&req).await?;
|
||||
|
||||
do_add_swap(&self.sandbox, &req)
|
||||
.await
|
||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
|
||||
do_add_swap(&self.sandbox, &req).await.map_ttrpc_err(same)?;
|
||||
|
||||
Ok(Empty::new())
|
||||
}
|
||||
@ -1552,7 +1462,7 @@ impl agent_ttrpc::AgentService for AgentService {
|
||||
.await
|
||||
.set_policy(&req.policy)
|
||||
.await
|
||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e))?;
|
||||
.map_ttrpc_err(same)?;
|
||||
|
||||
Ok(Empty::new())
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user