agent: Integrate netlink

This patch integrates new netlink module routines with the agent (mainly
replaces calls to old netlink module with the new one).

Fixes: #1294

Signed-off-by: Maksym Pavlenko <pavlenko.maksym@gmail.com>
This commit is contained in:
Maksym Pavlenko 2021-01-19 09:48:51 -08:00
parent 23f3aefa1d
commit 33367be4c7
3 changed files with 96 additions and 107 deletions

View File

@ -26,7 +26,6 @@ extern crate scopeguard;
extern crate slog; extern crate slog;
extern crate netlink; extern crate netlink;
use crate::netlink::{RtnlHandle, NETLINK_ROUTE};
use anyhow::{anyhow, Context, Result}; use anyhow::{anyhow, Context, Result};
use nix::fcntl::{self, OFlag}; use nix::fcntl::{self, OFlag};
use nix::fcntl::{FcntlArg, FdFlag}; use nix::fcntl::{FcntlArg, FdFlag};
@ -277,13 +276,9 @@ async fn start_sandbox(logger: &Logger, config: &agentConfig, init_mode: bool) -
}; };
// Initialize unique sandbox structure. // Initialize unique sandbox structure.
let mut s = Sandbox::new(&logger).context("Failed to create sandbox")?; let s = Sandbox::new(&logger).context("Failed to create sandbox")?;
if init_mode { if init_mode {
let mut rtnl = RtnlHandle::new(NETLINK_ROUTE, 0).unwrap(); s.rtnl.handle_localhost().await?;
rtnl.handle_localhost()?;
s.rtnl = Some(rtnl);
} }
let sandbox = Arc::new(Mutex::new(s)); let sandbox = Arc::new(Mutex::new(s));

View File

@ -51,7 +51,6 @@ use crate::random;
use crate::sandbox::Sandbox; use crate::sandbox::Sandbox;
use crate::version::{AGENT_VERSION, API_VERSION}; use crate::version::{AGENT_VERSION, API_VERSION};
use crate::AGENT_CONFIG; use crate::AGENT_CONFIG;
use netlink::{RtnlHandle, NETLINK_ROUTE};
use libc::{self, c_ushort, pid_t, winsize, TIOCSWINSZ}; use libc::{self, c_ushort, pid_t, winsize, TIOCSWINSZ};
use std::convert::TryFrom; use std::convert::TryFrom;
@ -850,30 +849,24 @@ impl protocols::agent_ttrpc::AgentService for agentService {
_ctx: &TtrpcContext, _ctx: &TtrpcContext,
req: protocols::agent::UpdateInterfaceRequest, req: protocols::agent::UpdateInterfaceRequest,
) -> ttrpc::Result<Interface> { ) -> ttrpc::Result<Interface> {
if req.interface.is_none() { let interface = req.interface.into_option().ok_or_else(|| {
return Err(ttrpc_error( ttrpc_error(
ttrpc::Code::INVALID_ARGUMENT, ttrpc::Code::INVALID_ARGUMENT,
"empty update interface request".to_string(), "empty update interface request".to_string(),
)); )
} })?;
let interface = req.interface; self.sandbox
let s = Arc::clone(&self.sandbox); .lock()
let mut sandbox = s.lock().await; .await
.rtnl
if sandbox.rtnl.is_none() { .update_interface(&interface)
sandbox.rtnl = Some(RtnlHandle::new(NETLINK_ROUTE, 0).unwrap()); .await
}
let rtnl = sandbox.rtnl.as_mut().unwrap();
let iface = rtnl
.update_interface(interface.as_ref().unwrap())
.map_err(|e| { .map_err(|e| {
ttrpc_error(ttrpc::Code::INTERNAL, format!("update interface: {:?}", e)) ttrpc_error(ttrpc::Code::INTERNAL, format!("update interface: {:?}", e))
})?; })?;
Ok(iface) Ok(interface)
} }
async fn update_routes( async fn update_routes(
@ -881,38 +874,37 @@ impl protocols::agent_ttrpc::AgentService for agentService {
_ctx: &TtrpcContext, _ctx: &TtrpcContext,
req: protocols::agent::UpdateRoutesRequest, req: protocols::agent::UpdateRoutesRequest,
) -> ttrpc::Result<Routes> { ) -> ttrpc::Result<Routes> {
let mut routes = protocols::agent::Routes::new(); let new_routes = req
if req.routes.is_none() { .routes
return Err(ttrpc_error( .into_option()
.map(|r| r.Routes.into_vec())
.ok_or_else(|| {
ttrpc_error(
ttrpc::Code::INVALID_ARGUMENT, ttrpc::Code::INVALID_ARGUMENT,
"empty update routes request".to_string(), "empty update routes request".to_string(),
)); )
} })?;
let rs = req.routes.unwrap().Routes.into_vec(); let mut sandbox = self.sandbox.lock().await;
let s = Arc::clone(&self.sandbox); sandbox.rtnl.update_routes(new_routes).await.map_err(|e| {
let mut sandbox = s.lock().await; ttrpc_error(
ttrpc::Code::INTERNAL,
format!("Failed to update routes: {:?}", e),
)
})?;
if sandbox.rtnl.is_none() { let list = sandbox.rtnl.list_routes().await.map_err(|e| {
sandbox.rtnl = Some(RtnlHandle::new(NETLINK_ROUTE, 0).unwrap()); ttrpc_error(
} ttrpc::Code::INTERNAL,
format!("Failed to list routes after update: {:?}", e),
)
})?;
let rtnl = sandbox.rtnl.as_mut().unwrap(); Ok(protocols::agent::Routes {
Routes: RepeatedField::from_vec(list),
// get current routes to return when error out ..Default::default()
let crs = rtnl })
.list_routes()
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, format!("update routes: {:?}", e)))?;
let v = match rtnl.update_routes(rs.as_ref()) {
Ok(value) => value,
Err(_) => crs,
};
routes.set_Routes(RepeatedField::from_vec(v));
Ok(routes)
} }
async fn list_interfaces( async fn list_interfaces(
@ -920,22 +912,24 @@ impl protocols::agent_ttrpc::AgentService for agentService {
_ctx: &TtrpcContext, _ctx: &TtrpcContext,
_req: protocols::agent::ListInterfacesRequest, _req: protocols::agent::ListInterfacesRequest,
) -> ttrpc::Result<Interfaces> { ) -> ttrpc::Result<Interfaces> {
let mut interface = protocols::agent::Interfaces::new(); let list = self
let s = Arc::clone(&self.sandbox); .sandbox
let mut sandbox = s.lock().await; .lock()
.await
if sandbox.rtnl.is_none() { .rtnl
sandbox.rtnl = Some(RtnlHandle::new(NETLINK_ROUTE, 0).unwrap());
}
let rtnl = sandbox.rtnl.as_mut().unwrap();
let v = rtnl
.list_interfaces() .list_interfaces()
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, format!("list interface: {:?}", e)))?; .await
.map_err(|e| {
ttrpc_error(
ttrpc::Code::INTERNAL,
format!("Failed to list interfaces: {:?}", e),
)
})?;
interface.set_Interfaces(RepeatedField::from_vec(v)); Ok(protocols::agent::Interfaces {
Interfaces: RepeatedField::from_vec(list),
Ok(interface) ..Default::default()
})
} }
async fn list_routes( async fn list_routes(
@ -943,23 +937,19 @@ impl protocols::agent_ttrpc::AgentService for agentService {
_ctx: &TtrpcContext, _ctx: &TtrpcContext,
_req: protocols::agent::ListRoutesRequest, _req: protocols::agent::ListRoutesRequest,
) -> ttrpc::Result<Routes> { ) -> ttrpc::Result<Routes> {
let mut routes = protocols::agent::Routes::new(); let list = self
let s = Arc::clone(&self.sandbox); .sandbox
let mut sandbox = s.lock().await; .lock()
.await
if sandbox.rtnl.is_none() { .rtnl
sandbox.rtnl = Some(RtnlHandle::new(NETLINK_ROUTE, 0).unwrap());
}
let rtnl = sandbox.rtnl.as_mut().unwrap();
let v = rtnl
.list_routes() .list_routes()
.await
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, format!("list routes: {:?}", e)))?; .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, format!("list routes: {:?}", e)))?;
routes.set_Routes(RepeatedField::from_vec(v)); Ok(protocols::agent::Routes {
Routes: RepeatedField::from_vec(list),
Ok(routes) ..Default::default()
})
} }
async fn start_tracing( async fn start_tracing(
@ -1062,26 +1052,29 @@ impl protocols::agent_ttrpc::AgentService for agentService {
_ctx: &TtrpcContext, _ctx: &TtrpcContext,
req: protocols::agent::AddARPNeighborsRequest, req: protocols::agent::AddARPNeighborsRequest,
) -> ttrpc::Result<Empty> { ) -> ttrpc::Result<Empty> {
if req.neighbors.is_none() { let neighs = req
return Err(ttrpc_error( .neighbors
.into_option()
.map(|n| n.ARPNeighbors.into_vec())
.ok_or_else(|| {
ttrpc_error(
ttrpc::Code::INVALID_ARGUMENT, ttrpc::Code::INVALID_ARGUMENT,
"empty add arp neighbours request".to_string(), "empty add arp neighbours request".to_string(),
)); )
} })?;
let neighs = req.neighbors.unwrap().ARPNeighbors.into_vec(); self.sandbox
.lock()
let s = Arc::clone(&self.sandbox); .await
let mut sandbox = s.lock().await; .rtnl
.add_arp_neighbors(neighs)
if sandbox.rtnl.is_none() { .await
sandbox.rtnl = Some(RtnlHandle::new(NETLINK_ROUTE, 0).unwrap()); .map_err(|e| {
} ttrpc_error(
ttrpc::Code::INTERNAL,
let rtnl = sandbox.rtnl.as_mut().unwrap(); format!("Failed to add ARP neighbours: {:?}", e),
)
rtnl.add_arp_neighbors(neighs.as_ref()) })?;
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?;
Ok(Empty::new()) Ok(Empty::new())
} }

View File

@ -6,10 +6,10 @@
use crate::linux_abi::*; use crate::linux_abi::*;
use crate::mount::{get_mount_fs_type, remove_mounts, TYPEROOTFS}; use crate::mount::{get_mount_fs_type, remove_mounts, TYPEROOTFS};
use crate::namespace::Namespace; use crate::namespace::Namespace;
use crate::netlink2::Handle;
use crate::network::Network; use crate::network::Network;
use anyhow::{anyhow, Context, Result}; use anyhow::{anyhow, Context, Result};
use libc::pid_t; use libc::pid_t;
use netlink::{RtnlHandle, NETLINK_ROUTE};
use oci::{Hook, Hooks}; use oci::{Hook, Hooks};
use protocols::agent::OnlineCPUMemRequest; use protocols::agent::OnlineCPUMemRequest;
use regex::Regex; use regex::Regex;
@ -44,7 +44,7 @@ pub struct Sandbox {
pub running: bool, pub running: bool,
pub no_pivot_root: bool, pub no_pivot_root: bool,
pub sender: Option<tokio::sync::oneshot::Sender<i32>>, pub sender: Option<tokio::sync::oneshot::Sender<i32>>,
pub rtnl: Option<RtnlHandle>, pub rtnl: Handle,
pub hooks: Option<Hooks>, pub hooks: Option<Hooks>,
pub event_rx: Arc<Mutex<Receiver<String>>>, pub event_rx: Arc<Mutex<Receiver<String>>>,
pub event_tx: Sender<String>, pub event_tx: Sender<String>,
@ -73,7 +73,7 @@ impl Sandbox {
running: false, running: false,
no_pivot_root: fs_type.eq(TYPEROOTFS), no_pivot_root: fs_type.eq(TYPEROOTFS),
sender: None, sender: None,
rtnl: Some(RtnlHandle::new(NETLINK_ROUTE, 0).unwrap()), rtnl: Handle::new()?,
hooks: None, hooks: None,
event_rx, event_rx,
event_tx: tx, event_tx: tx,
@ -680,8 +680,8 @@ mod tests {
assert!(cnt.is_none()); assert!(cnt.is_none());
} }
#[test] #[tokio::test]
fn add_and_get_container() { async fn add_and_get_container() {
skip_if_not_root!(); skip_if_not_root!();
let logger = slog::Logger::root(slog::Discard, o!()); let logger = slog::Logger::root(slog::Discard, o!());
let mut s = Sandbox::new(&logger).unwrap(); let mut s = Sandbox::new(&logger).unwrap();
@ -707,8 +707,9 @@ mod tests {
let ns_path = format!("/proc/{}/ns/pid", test_pid); let ns_path = format!("/proc/{}/ns/pid", test_pid);
assert_eq!(s.sandbox_pidns.unwrap().path, ns_path); assert_eq!(s.sandbox_pidns.unwrap().path, ns_path);
} }
#[test]
fn add_guest_hooks() { #[tokio::test]
async fn add_guest_hooks() {
let logger = slog::Logger::root(slog::Discard, o!()); let logger = slog::Logger::root(slog::Discard, o!());
let mut s = Sandbox::new(&logger).unwrap(); let mut s = Sandbox::new(&logger).unwrap();
let tmpdir = Builder::new().tempdir().unwrap(); let tmpdir = Builder::new().tempdir().unwrap();