diff --git a/src/agent/src/rpc.rs b/src/agent/src/rpc.rs index ac9b5c5c3b..ee3872d797 100644 --- a/src/agent/src/rpc.rs +++ b/src/agent/src/rpc.rs @@ -6,7 +6,7 @@ use std::path::Path; use std::sync::mpsc::channel; use std::sync::{Arc, Mutex}; -use ttrpc; +use ttrpc::{self, error::get_rpc_status as ttrpc_error}; use anyhow::{anyhow, Context, Result}; use oci::{LinuxNamespace, Root, Spec}; @@ -519,10 +519,7 @@ impl protocols::agent_ttrpc::AgentService for agentService { req: protocols::agent::CreateContainerRequest, ) -> ttrpc::Result { match self.do_create_container(req) { - Err(e) => Err(ttrpc::Error::RpcStatus(ttrpc::get_status( - ttrpc::Code::INTERNAL, - e.to_string(), - ))), + Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())), Ok(_) => Ok(Empty::new()), } } @@ -533,10 +530,7 @@ impl protocols::agent_ttrpc::AgentService for agentService { req: protocols::agent::StartContainerRequest, ) -> ttrpc::Result { match self.do_start_container(req) { - Err(e) => Err(ttrpc::Error::RpcStatus(ttrpc::get_status( - ttrpc::Code::INTERNAL, - e.to_string(), - ))), + Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())), Ok(_) => Ok(Empty::new()), } } @@ -547,10 +541,7 @@ impl protocols::agent_ttrpc::AgentService for agentService { req: protocols::agent::RemoveContainerRequest, ) -> ttrpc::Result { match self.do_remove_container(req) { - Err(e) => Err(ttrpc::Error::RpcStatus(ttrpc::get_status( - ttrpc::Code::INTERNAL, - e.to_string(), - ))), + Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())), Ok(_) => Ok(Empty::new()), } } @@ -561,10 +552,7 @@ impl protocols::agent_ttrpc::AgentService for agentService { req: protocols::agent::ExecProcessRequest, ) -> ttrpc::Result { match self.do_exec_process(req) { - Err(e) => Err(ttrpc::Error::RpcStatus(ttrpc::get_status( - ttrpc::Code::INTERNAL, - e.to_string(), - ))), + Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())), Ok(_) => Ok(Empty::new()), } } @@ -575,10 +563,7 @@ impl protocols::agent_ttrpc::AgentService for agentService { req: protocols::agent::SignalProcessRequest, ) -> ttrpc::Result { match self.do_signal_process(req) { - Err(e) => Err(ttrpc::Error::RpcStatus(ttrpc::get_status( - ttrpc::Code::INTERNAL, - e.to_string(), - ))), + Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())), Ok(_) => Ok(Empty::new()), } } @@ -588,9 +573,8 @@ impl protocols::agent_ttrpc::AgentService for agentService { _ctx: &ttrpc::TtrpcContext, req: protocols::agent::WaitProcessRequest, ) -> ttrpc::Result { - self.do_wait_process(req).map_err(|e| { - ttrpc::Error::RpcStatus(ttrpc::get_status(ttrpc::Code::INTERNAL, e.to_string())) - }) + self.do_wait_process(req) + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())) } fn list_processes( @@ -606,12 +590,10 @@ impl protocols::agent_ttrpc::AgentService for agentService { let s = Arc::clone(&self.sandbox); let mut sandbox = s.lock().unwrap(); - let ctr = sandbox - .get_container(&cid) - .ok_or(ttrpc::Error::RpcStatus(ttrpc::get_status( - ttrpc::Code::INVALID_ARGUMENT, - "invalid container id".to_string(), - )))?; + let ctr = sandbox.get_container(&cid).ok_or(ttrpc_error( + ttrpc::Code::INVALID_ARGUMENT, + "invalid container id".to_string(), + ))?; let pids = ctr.processes().unwrap(); @@ -622,10 +604,10 @@ impl protocols::agent_ttrpc::AgentService for agentService { return Ok(resp); } _ => { - return Err(ttrpc::Error::RpcStatus(ttrpc::get_status( + return Err(ttrpc_error( ttrpc::Code::INVALID_ARGUMENT, "invalid format!".to_string(), - ))); + )); } } @@ -688,12 +670,10 @@ impl protocols::agent_ttrpc::AgentService for agentService { let s = Arc::clone(&self.sandbox); let mut sandbox = s.lock().unwrap(); - let ctr = sandbox - .get_container(&cid) - .ok_or(ttrpc::Error::RpcStatus(ttrpc::get_status( - ttrpc::Code::INVALID_ARGUMENT, - "invalid container id".to_string(), - )))?; + let ctr = sandbox.get_container(&cid).ok_or(ttrpc_error( + ttrpc::Code::INVALID_ARGUMENT, + "invalid container id".to_string(), + ))?; let resp = Empty::new(); @@ -701,10 +681,7 @@ impl protocols::agent_ttrpc::AgentService for agentService { let ociRes = rustjail::resources_grpc_to_oci(&res.unwrap()); match ctr.set(ociRes) { Err(e) => { - return Err(ttrpc::Error::RpcStatus(ttrpc::get_status( - ttrpc::Code::INTERNAL, - e.to_string(), - ))); + return Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())); } Ok(_) => return Ok(resp), @@ -723,16 +700,13 @@ impl protocols::agent_ttrpc::AgentService for agentService { let s = Arc::clone(&self.sandbox); let mut sandbox = s.lock().unwrap(); - let ctr = sandbox - .get_container(&cid) - .ok_or(ttrpc::Error::RpcStatus(ttrpc::get_status( - ttrpc::Code::INVALID_ARGUMENT, - "invalid container id".to_string(), - )))?; + let ctr = sandbox.get_container(&cid).ok_or(ttrpc_error( + ttrpc::Code::INVALID_ARGUMENT, + "invalid container id".to_string(), + ))?; - ctr.stats().map_err(|e| { - ttrpc::Error::RpcStatus(ttrpc::get_status(ttrpc::Code::INTERNAL, e.to_string())) - }) + ctr.stats() + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())) } fn pause_container( @@ -744,16 +718,13 @@ impl protocols::agent_ttrpc::AgentService for agentService { let s = Arc::clone(&self.sandbox); let mut sandbox = s.lock().unwrap(); - let ctr = sandbox - .get_container(&cid) - .ok_or(ttrpc::Error::RpcStatus(ttrpc::get_status( - ttrpc::Code::INVALID_ARGUMENT, - "invalid container id".to_string(), - )))?; + let ctr = sandbox.get_container(&cid).ok_or(ttrpc_error( + ttrpc::Code::INVALID_ARGUMENT, + "invalid container id".to_string(), + ))?; - ctr.pause().map_err(|e| { - ttrpc::Error::RpcStatus(ttrpc::get_status(ttrpc::Code::INTERNAL, e.to_string())) - })?; + ctr.pause() + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?; Ok(Empty::new()) } @@ -767,16 +738,13 @@ impl protocols::agent_ttrpc::AgentService for agentService { let s = Arc::clone(&self.sandbox); let mut sandbox = s.lock().unwrap(); - let ctr = sandbox - .get_container(&cid) - .ok_or(ttrpc::Error::RpcStatus(ttrpc::get_status( - ttrpc::Code::INVALID_ARGUMENT, - "invalid container id".to_string(), - )))?; + let ctr = sandbox.get_container(&cid).ok_or(ttrpc_error( + ttrpc::Code::INVALID_ARGUMENT, + "invalid container id".to_string(), + ))?; - ctr.resume().map_err(|e| { - ttrpc::Error::RpcStatus(ttrpc::get_status(ttrpc::Code::INTERNAL, e.to_string())) - })?; + ctr.resume() + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?; Ok(Empty::new()) } @@ -786,9 +754,8 @@ impl protocols::agent_ttrpc::AgentService for agentService { _ctx: &ttrpc::TtrpcContext, req: protocols::agent::WriteStreamRequest, ) -> ttrpc::Result { - self.do_write_stream(req).map_err(|e| { - ttrpc::Error::RpcStatus(ttrpc::get_status(ttrpc::Code::INTERNAL, e.to_string())) - }) + self.do_write_stream(req) + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())) } fn read_stdout( @@ -796,9 +763,8 @@ impl protocols::agent_ttrpc::AgentService for agentService { _ctx: &ttrpc::TtrpcContext, req: protocols::agent::ReadStreamRequest, ) -> ttrpc::Result { - self.do_read_stream(req, true).map_err(|e| { - ttrpc::Error::RpcStatus(ttrpc::get_status(ttrpc::Code::INTERNAL, e.to_string())) - }) + self.do_read_stream(req, true) + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())) } fn read_stderr( @@ -806,9 +772,8 @@ impl protocols::agent_ttrpc::AgentService for agentService { _ctx: &ttrpc::TtrpcContext, req: protocols::agent::ReadStreamRequest, ) -> ttrpc::Result { - self.do_read_stream(req, false).map_err(|e| { - ttrpc::Error::RpcStatus(ttrpc::get_status(ttrpc::Code::INTERNAL, e.to_string())) - }) + self.do_read_stream(req, false) + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())) } fn close_stdin( @@ -822,10 +787,10 @@ impl protocols::agent_ttrpc::AgentService for agentService { let mut sandbox = s.lock().unwrap(); let p = find_process(&mut sandbox, cid.as_str(), eid.as_str(), false).map_err(|e| { - ttrpc::Error::RpcStatus(ttrpc::get_status( + ttrpc_error( ttrpc::Code::INVALID_ARGUMENT, format!("invalid argument: {:?}", e), - )) + ) })?; if p.term_master.is_some() { @@ -851,17 +816,14 @@ impl protocols::agent_ttrpc::AgentService for agentService { let s = Arc::clone(&self.sandbox); let mut sandbox = s.lock().unwrap(); let p = find_process(&mut sandbox, cid.as_str(), eid.as_str(), false).map_err(|e| { - ttrpc::Error::RpcStatus(ttrpc::get_status( + ttrpc_error( ttrpc::Code::UNAVAILABLE, format!("invalid argument: {:?}", e), - )) + ) })?; if p.term_master.is_none() { - return Err(ttrpc::Error::RpcStatus(ttrpc::get_status( - ttrpc::Code::UNAVAILABLE, - "no tty".to_string(), - ))); + return Err(ttrpc_error(ttrpc::Code::UNAVAILABLE, "no tty".to_string())); } let fd = p.term_master.unwrap(); @@ -874,12 +836,9 @@ impl protocols::agent_ttrpc::AgentService for agentService { }; let err = libc::ioctl(fd, TIOCSWINSZ, &win); - Errno::result(err).map(drop).map_err(|e| { - ttrpc::Error::RpcStatus(ttrpc::get_status( - ttrpc::Code::INTERNAL, - format!("ioctl error: {:?}", e), - )) - })?; + Errno::result(err) + .map(drop) + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, format!("ioctl error: {:?}", e)))?; } Ok(Empty::new()) @@ -891,10 +850,10 @@ impl protocols::agent_ttrpc::AgentService for agentService { req: protocols::agent::UpdateInterfaceRequest, ) -> ttrpc::Result { if req.interface.is_none() { - return Err(ttrpc::Error::RpcStatus(ttrpc::get_status( + return Err(ttrpc_error( ttrpc::Code::INVALID_ARGUMENT, format!("empty update interface request"), - ))); + )); } let interface = req.interface.clone(); @@ -910,10 +869,7 @@ impl protocols::agent_ttrpc::AgentService for agentService { let iface = rtnl .update_interface(interface.as_ref().unwrap()) .map_err(|e| { - ttrpc::Error::RpcStatus(ttrpc::get_status( - ttrpc::Code::INTERNAL, - format!("update interface: {:?}", e), - )) + ttrpc_error(ttrpc::Code::INTERNAL, format!("update interface: {:?}", e)) })?; Ok(iface) @@ -926,10 +882,10 @@ impl protocols::agent_ttrpc::AgentService for agentService { ) -> ttrpc::Result { let mut routes = protocols::agent::Routes::new(); if req.routes.is_none() { - return Err(ttrpc::Error::RpcStatus(ttrpc::get_status( + return Err(ttrpc_error( ttrpc::Code::INVALID_ARGUMENT, format!("empty update routes request"), - ))); + )); } let rs = req.routes.clone().unwrap().Routes.into_vec(); @@ -944,12 +900,9 @@ impl protocols::agent_ttrpc::AgentService for agentService { let rtnl = sandbox.rtnl.as_mut().unwrap(); // get current routes to return when error out - let crs = rtnl.list_routes().map_err(|e| { - ttrpc::Error::RpcStatus(ttrpc::get_status( - ttrpc::Code::INTERNAL, - format!("update routes: {:?}", e), - )) - })?; + let crs = rtnl + .list_routes() + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, format!("update routes: {:?}", e)))?; let v = match rtnl.update_routes(rs.as_ref()) { Ok(value) => value, @@ -975,12 +928,9 @@ impl protocols::agent_ttrpc::AgentService for agentService { } let rtnl = sandbox.rtnl.as_mut().unwrap(); - let v = rtnl.list_interfaces().map_err(|e| { - ttrpc::Error::RpcStatus(ttrpc::get_status( - ttrpc::Code::INTERNAL, - format!("list interface: {:?}", e), - )) - })?; + let v = rtnl + .list_interfaces() + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, format!("list interface: {:?}", e)))?; interface.set_Interfaces(RepeatedField::from_vec(v)); @@ -1002,12 +952,9 @@ impl protocols::agent_ttrpc::AgentService for agentService { let rtnl = sandbox.rtnl.as_mut().unwrap(); - let v = rtnl.list_routes().map_err(|e| { - ttrpc::Error::RpcStatus(ttrpc::get_status( - ttrpc::Code::INTERNAL, - format!("list routes: {:?}", e), - )) - })?; + let v = rtnl + .list_routes() + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, format!("list routes: {:?}", e)))?; routes.set_Routes(RepeatedField::from_vec(v)); @@ -1060,14 +1007,12 @@ impl protocols::agent_ttrpc::AgentService for agentService { } for m in req.kernel_modules.iter() { - let _ = load_kernel_module(m).map_err(|e| { - ttrpc::Error::RpcStatus(ttrpc::get_status(ttrpc::Code::INTERNAL, e.to_string())) - })?; + let _ = load_kernel_module(m) + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?; } - s.setup_shared_namespaces().map_err(|e| { - ttrpc::Error::RpcStatus(ttrpc::get_status(ttrpc::Code::INTERNAL, e.to_string())) - })?; + s.setup_shared_namespaces() + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?; } match add_storages(sl!(), req.storages.to_vec(), self.sandbox.clone()) { @@ -1076,12 +1021,7 @@ impl protocols::agent_ttrpc::AgentService for agentService { let mut s = sandbox.lock().unwrap(); s.mounts = m } - Err(e) => { - return Err(ttrpc::Error::RpcStatus(ttrpc::get_status( - ttrpc::Code::INTERNAL, - e.to_string(), - ))) - } + Err(e) => return Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())), }; match setup_guest_dns(sl!(), req.dns.to_vec()) { @@ -1094,12 +1034,7 @@ impl protocols::agent_ttrpc::AgentService for agentService { .iter() .map(|dns| s.network.set_dns(dns.to_string())); } - Err(e) => { - return Err(ttrpc::Error::RpcStatus(ttrpc::get_status( - ttrpc::Code::INTERNAL, - e.to_string(), - ))) - } + Err(e) => return Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())), }; Ok(Empty::new()) @@ -1128,10 +1063,10 @@ impl protocols::agent_ttrpc::AgentService for agentService { req: protocols::agent::AddARPNeighborsRequest, ) -> ttrpc::Result { if req.neighbors.is_none() { - return Err(ttrpc::Error::RpcStatus(ttrpc::get_status( + return Err(ttrpc_error( ttrpc::Code::INVALID_ARGUMENT, format!("empty add arp neighbours request"), - ))); + )); } let neighs = req.neighbors.clone().unwrap().ARPNeighbors.into_vec(); @@ -1145,9 +1080,8 @@ impl protocols::agent_ttrpc::AgentService for agentService { let rtnl = sandbox.rtnl.as_mut().unwrap(); - rtnl.add_arp_neighbors(neighs.as_ref()).map_err(|e| { - ttrpc::Error::RpcStatus(ttrpc::get_status(ttrpc::Code::INTERNAL, e.to_string())) - })?; + rtnl.add_arp_neighbors(neighs.as_ref()) + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?; Ok(Empty::new()) } @@ -1160,9 +1094,9 @@ impl protocols::agent_ttrpc::AgentService for agentService { let s = Arc::clone(&self.sandbox); let sandbox = s.lock().unwrap(); - sandbox.online_cpu_memory(&req).map_err(|e| { - ttrpc::Error::RpcStatus(ttrpc::get_status(ttrpc::Code::INTERNAL, e.to_string())) - })?; + sandbox + .online_cpu_memory(&req) + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?; Ok(Empty::new()) } @@ -1172,9 +1106,8 @@ impl protocols::agent_ttrpc::AgentService for agentService { _ctx: &ttrpc::TtrpcContext, req: protocols::agent::ReseedRandomDevRequest, ) -> ttrpc::Result { - random::reseed_rng(req.data.as_slice()).map_err(|e| { - ttrpc::Error::RpcStatus(ttrpc::get_status(ttrpc::Code::INTERNAL, e.to_string())) - })?; + random::reseed_rng(req.data.as_slice()) + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?; Ok(Empty::new()) } @@ -1194,10 +1127,7 @@ impl protocols::agent_ttrpc::AgentService for agentService { } Err(e) => { info!(sl!(), "fail to get memory info!"); - return Err(ttrpc::Error::RpcStatus(ttrpc::get_status( - ttrpc::Code::INTERNAL, - e.to_string(), - ))); + return Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())); } } @@ -1213,9 +1143,8 @@ impl protocols::agent_ttrpc::AgentService for agentService { _ctx: &ttrpc::TtrpcContext, req: protocols::agent::MemHotplugByProbeRequest, ) -> ttrpc::Result { - do_mem_hotplug_by_probe(&req.memHotplugProbeAddr).map_err(|e| { - ttrpc::Error::RpcStatus(ttrpc::get_status(ttrpc::Code::INTERNAL, e.to_string())) - })?; + do_mem_hotplug_by_probe(&req.memHotplugProbeAddr) + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?; Ok(Empty::new()) } @@ -1225,9 +1154,8 @@ impl protocols::agent_ttrpc::AgentService for agentService { _ctx: &ttrpc::TtrpcContext, req: protocols::agent::SetGuestDateTimeRequest, ) -> ttrpc::Result { - do_set_guest_date_time(req.Sec, req.Usec).map_err(|e| { - ttrpc::Error::RpcStatus(ttrpc::get_status(ttrpc::Code::INTERNAL, e.to_string())) - })?; + do_set_guest_date_time(req.Sec, req.Usec) + .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?; Ok(Empty::new()) } @@ -1237,9 +1165,7 @@ impl protocols::agent_ttrpc::AgentService for agentService { _ctx: &ttrpc::TtrpcContext, req: protocols::agent::CopyFileRequest, ) -> ttrpc::Result { - do_copy_file(&req).map_err(|e| { - ttrpc::Error::RpcStatus(ttrpc::get_status(ttrpc::Code::INTERNAL, e.to_string())) - })?; + do_copy_file(&req).map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?; Ok(Empty::new()) } @@ -1250,10 +1176,7 @@ impl protocols::agent_ttrpc::AgentService for agentService { req: protocols::agent::GetMetricsRequest, ) -> ttrpc::Result { match get_metrics(&req) { - Err(e) => Err(ttrpc::Error::RpcStatus(ttrpc::get_status( - ttrpc::Code::INTERNAL, - e.to_string(), - ))), + Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())), Ok(s) => { let mut metrics = Metrics::new(); metrics.set_metrics(s); @@ -1275,12 +1198,7 @@ impl protocols::agent_ttrpc::AgentService for agentService { drop(sandbox); match event_rx.recv() { - Err(err) => { - return Err(ttrpc::Error::RpcStatus(ttrpc::get_status( - ttrpc::Code::INTERNAL, - err.to_string(), - ))) - } + Err(err) => return Err(ttrpc_error(ttrpc::Code::INTERNAL, err.to_string())), Ok(container_id) => { info!(sl!(), "get_oom_event return {}", &container_id); let mut resp = OOMEvent::new();