From 33367be4c70c7a84a96b15d737d2c5dc639b8e69 Mon Sep 17 00:00:00 2001 From: Maksym Pavlenko Date: Tue, 19 Jan 2021 09:48:51 -0800 Subject: [PATCH] agent: Integrate netlink This patch integrates new netlink module routines with the agent (mainly replaces calls to old netlink module with the new one). Fixes: #1294 Signed-off-by: Maksym Pavlenko --- src/agent/src/main.rs | 9 +- src/agent/src/rpc.rs | 179 +++++++++++++++++++-------------------- src/agent/src/sandbox.rs | 15 ++-- 3 files changed, 96 insertions(+), 107 deletions(-) diff --git a/src/agent/src/main.rs b/src/agent/src/main.rs index 2a13d16ae1..4675f785ab 100644 --- a/src/agent/src/main.rs +++ b/src/agent/src/main.rs @@ -26,7 +26,6 @@ extern crate scopeguard; extern crate slog; extern crate netlink; -use crate::netlink::{RtnlHandle, NETLINK_ROUTE}; use anyhow::{anyhow, Context, Result}; use nix::fcntl::{self, OFlag}; use nix::fcntl::{FcntlArg, FdFlag}; @@ -277,13 +276,9 @@ async fn start_sandbox(logger: &Logger, config: &agentConfig, init_mode: bool) - }; // Initialize unique sandbox structure. - let mut s = Sandbox::new(&logger).context("Failed to create sandbox")?; - + let s = Sandbox::new(&logger).context("Failed to create sandbox")?; if init_mode { - let mut rtnl = RtnlHandle::new(NETLINK_ROUTE, 0).unwrap(); - rtnl.handle_localhost()?; - - s.rtnl = Some(rtnl); + s.rtnl.handle_localhost().await?; } let sandbox = Arc::new(Mutex::new(s)); diff --git a/src/agent/src/rpc.rs b/src/agent/src/rpc.rs index 7d9a657ade..b06e48fa4a 100644 --- a/src/agent/src/rpc.rs +++ b/src/agent/src/rpc.rs @@ -51,7 +51,6 @@ use crate::random; use crate::sandbox::Sandbox; use crate::version::{AGENT_VERSION, API_VERSION}; use crate::AGENT_CONFIG; -use netlink::{RtnlHandle, NETLINK_ROUTE}; use libc::{self, c_ushort, pid_t, winsize, TIOCSWINSZ}; use std::convert::TryFrom; @@ -850,30 +849,24 @@ impl protocols::agent_ttrpc::AgentService for agentService { _ctx: &TtrpcContext, req: protocols::agent::UpdateInterfaceRequest, ) -> ttrpc::Result { - if req.interface.is_none() { - return Err(ttrpc_error( + let interface = req.interface.into_option().ok_or_else(|| { + ttrpc_error( ttrpc::Code::INVALID_ARGUMENT, "empty update interface request".to_string(), - )); - } + ) + })?; - let interface = req.interface; - let s = Arc::clone(&self.sandbox); - let mut sandbox = s.lock().await; - - if sandbox.rtnl.is_none() { - sandbox.rtnl = Some(RtnlHandle::new(NETLINK_ROUTE, 0).unwrap()); - } - - let rtnl = sandbox.rtnl.as_mut().unwrap(); - - let iface = rtnl - .update_interface(interface.as_ref().unwrap()) + self.sandbox + .lock() + .await + .rtnl + .update_interface(&interface) + .await .map_err(|e| { ttrpc_error(ttrpc::Code::INTERNAL, format!("update interface: {:?}", e)) })?; - Ok(iface) + Ok(interface) } async fn update_routes( @@ -881,38 +874,37 @@ impl protocols::agent_ttrpc::AgentService for agentService { _ctx: &TtrpcContext, req: protocols::agent::UpdateRoutesRequest, ) -> ttrpc::Result { - let mut routes = protocols::agent::Routes::new(); - if req.routes.is_none() { - return Err(ttrpc_error( - ttrpc::Code::INVALID_ARGUMENT, - "empty update routes request".to_string(), - )); - } + let new_routes = req + .routes + .into_option() + .map(|r| r.Routes.into_vec()) + .ok_or_else(|| { + ttrpc_error( + ttrpc::Code::INVALID_ARGUMENT, + "empty update routes request".to_string(), + ) + })?; - let rs = req.routes.unwrap().Routes.into_vec(); + let mut sandbox = self.sandbox.lock().await; - let s = Arc::clone(&self.sandbox); - let mut sandbox = s.lock().await; + sandbox.rtnl.update_routes(new_routes).await.map_err(|e| { + ttrpc_error( + ttrpc::Code::INTERNAL, + format!("Failed to update routes: {:?}", e), + ) + })?; - if sandbox.rtnl.is_none() { - sandbox.rtnl = Some(RtnlHandle::new(NETLINK_ROUTE, 0).unwrap()); - } + let list = sandbox.rtnl.list_routes().await.map_err(|e| { + ttrpc_error( + ttrpc::Code::INTERNAL, + format!("Failed to list routes after update: {:?}", e), + ) + })?; - let rtnl = sandbox.rtnl.as_mut().unwrap(); - - // get current routes to return when error out - let crs = rtnl - .list_routes() - .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, format!("update routes: {:?}", e)))?; - - let v = match rtnl.update_routes(rs.as_ref()) { - Ok(value) => value, - Err(_) => crs, - }; - - routes.set_Routes(RepeatedField::from_vec(v)); - - Ok(routes) + Ok(protocols::agent::Routes { + Routes: RepeatedField::from_vec(list), + ..Default::default() + }) } async fn list_interfaces( @@ -920,22 +912,24 @@ impl protocols::agent_ttrpc::AgentService for agentService { _ctx: &TtrpcContext, _req: protocols::agent::ListInterfacesRequest, ) -> ttrpc::Result { - let mut interface = protocols::agent::Interfaces::new(); - let s = Arc::clone(&self.sandbox); - let mut sandbox = s.lock().await; - - if sandbox.rtnl.is_none() { - sandbox.rtnl = Some(RtnlHandle::new(NETLINK_ROUTE, 0).unwrap()); - } - - let rtnl = sandbox.rtnl.as_mut().unwrap(); - let v = rtnl + let list = self + .sandbox + .lock() + .await + .rtnl .list_interfaces() - .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, format!("list interface: {:?}", e)))?; + .await + .map_err(|e| { + ttrpc_error( + ttrpc::Code::INTERNAL, + format!("Failed to list interfaces: {:?}", e), + ) + })?; - interface.set_Interfaces(RepeatedField::from_vec(v)); - - Ok(interface) + Ok(protocols::agent::Interfaces { + Interfaces: RepeatedField::from_vec(list), + ..Default::default() + }) } async fn list_routes( @@ -943,23 +937,19 @@ impl protocols::agent_ttrpc::AgentService for agentService { _ctx: &TtrpcContext, _req: protocols::agent::ListRoutesRequest, ) -> ttrpc::Result { - let mut routes = protocols::agent::Routes::new(); - let s = Arc::clone(&self.sandbox); - let mut sandbox = s.lock().await; - - if sandbox.rtnl.is_none() { - sandbox.rtnl = Some(RtnlHandle::new(NETLINK_ROUTE, 0).unwrap()); - } - - let rtnl = sandbox.rtnl.as_mut().unwrap(); - - let v = rtnl + let list = self + .sandbox + .lock() + .await + .rtnl .list_routes() + .await .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, format!("list routes: {:?}", e)))?; - routes.set_Routes(RepeatedField::from_vec(v)); - - Ok(routes) + Ok(protocols::agent::Routes { + Routes: RepeatedField::from_vec(list), + ..Default::default() + }) } async fn start_tracing( @@ -1062,26 +1052,29 @@ impl protocols::agent_ttrpc::AgentService for agentService { _ctx: &TtrpcContext, req: protocols::agent::AddARPNeighborsRequest, ) -> ttrpc::Result { - if req.neighbors.is_none() { - return Err(ttrpc_error( - ttrpc::Code::INVALID_ARGUMENT, - "empty add arp neighbours request".to_string(), - )); - } + let neighs = req + .neighbors + .into_option() + .map(|n| n.ARPNeighbors.into_vec()) + .ok_or_else(|| { + ttrpc_error( + ttrpc::Code::INVALID_ARGUMENT, + "empty add arp neighbours request".to_string(), + ) + })?; - let neighs = req.neighbors.unwrap().ARPNeighbors.into_vec(); - - let s = Arc::clone(&self.sandbox); - let mut sandbox = s.lock().await; - - if sandbox.rtnl.is_none() { - sandbox.rtnl = Some(RtnlHandle::new(NETLINK_ROUTE, 0).unwrap()); - } - - let rtnl = sandbox.rtnl.as_mut().unwrap(); - - rtnl.add_arp_neighbors(neighs.as_ref()) - .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?; + self.sandbox + .lock() + .await + .rtnl + .add_arp_neighbors(neighs) + .await + .map_err(|e| { + ttrpc_error( + ttrpc::Code::INTERNAL, + format!("Failed to add ARP neighbours: {:?}", e), + ) + })?; Ok(Empty::new()) } diff --git a/src/agent/src/sandbox.rs b/src/agent/src/sandbox.rs index 93533d1b64..bbdfd39652 100644 --- a/src/agent/src/sandbox.rs +++ b/src/agent/src/sandbox.rs @@ -6,10 +6,10 @@ use crate::linux_abi::*; use crate::mount::{get_mount_fs_type, remove_mounts, TYPEROOTFS}; use crate::namespace::Namespace; +use crate::netlink2::Handle; use crate::network::Network; use anyhow::{anyhow, Context, Result}; use libc::pid_t; -use netlink::{RtnlHandle, NETLINK_ROUTE}; use oci::{Hook, Hooks}; use protocols::agent::OnlineCPUMemRequest; use regex::Regex; @@ -44,7 +44,7 @@ pub struct Sandbox { pub running: bool, pub no_pivot_root: bool, pub sender: Option>, - pub rtnl: Option, + pub rtnl: Handle, pub hooks: Option, pub event_rx: Arc>>, pub event_tx: Sender, @@ -73,7 +73,7 @@ impl Sandbox { running: false, no_pivot_root: fs_type.eq(TYPEROOTFS), sender: None, - rtnl: Some(RtnlHandle::new(NETLINK_ROUTE, 0).unwrap()), + rtnl: Handle::new()?, hooks: None, event_rx, event_tx: tx, @@ -680,8 +680,8 @@ mod tests { assert!(cnt.is_none()); } - #[test] - fn add_and_get_container() { + #[tokio::test] + async fn add_and_get_container() { skip_if_not_root!(); let logger = slog::Logger::root(slog::Discard, o!()); let mut s = Sandbox::new(&logger).unwrap(); @@ -707,8 +707,9 @@ mod tests { let ns_path = format!("/proc/{}/ns/pid", test_pid); assert_eq!(s.sandbox_pidns.unwrap().path, ns_path); } - #[test] - fn add_guest_hooks() { + + #[tokio::test] + async fn add_guest_hooks() { let logger = slog::Logger::root(slog::Discard, o!()); let mut s = Sandbox::new(&logger).unwrap(); let tmpdir = Builder::new().tempdir().unwrap();