Merge pull request #4527 from Tim-0731-Hzt/rund-new/netlink

runtime-rs:refactor network model with netlink
This commit is contained in:
Bin Liu 2022-07-01 11:12:54 +08:00 committed by GitHub
commit 18093251ec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 104 additions and 60 deletions

View File

@ -12,11 +12,11 @@ cgroups-rs = "0.2.9"
futures = "0.3.11" futures = "0.3.11"
lazy_static = "1.4.0" lazy_static = "1.4.0"
libc = ">=0.2.39" libc = ">=0.2.39"
netlink-sys = "0.8.2" netlink-sys = "0.8.3"
netlink-packet-route = "0.11.0" netlink-packet-route = "0.12.0"
nix = "0.16.0" nix = "0.16.0"
rand = "^0.7.2" rand = "^0.7.2"
rtnetlink = "0.9.1" rtnetlink = "0.10.0"
scopeguard = "1.0.0" scopeguard = "1.0.0"
slog = "2.5.2" slog = "2.5.2"
slog-scope = "4.4.0" slog-scope = "4.4.0"
@ -29,5 +29,5 @@ kata-types = { path = "../../../libs/kata-types" }
kata-sys-util = { path = "../../../libs/kata-sys-util" } kata-sys-util = { path = "../../../libs/kata-sys-util" }
logging = { path = "../../../libs/logging" } logging = { path = "../../../libs/logging" }
oci = { path = "../../../libs/oci" } oci = { path = "../../../libs/oci" }
actix-rt = "2.7.0"
[features] [features]

View File

@ -7,7 +7,7 @@
pub mod none_model; pub mod none_model;
pub mod route_model; pub mod route_model;
pub mod tc_filter_model; pub mod tc_filter_model;
pub mod test_network_model;
use std::sync::Arc; use std::sync::Arc;
use anyhow::{Context, Result}; use anyhow::{Context, Result};

View File

@ -4,9 +4,10 @@
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
// //
use anyhow::{anyhow, Context, Result}; use anyhow::{Context, Result};
use async_trait::async_trait; use async_trait::async_trait;
use tokio::process::Command; use rtnetlink::Handle;
use scopeguard::defer;
use super::{NetworkModel, NetworkModelType}; use super::{NetworkModel, NetworkModelType};
use crate::network::NetworkPair; use crate::network::NetworkPair;
@ -19,7 +20,6 @@ impl TcFilterModel {
Ok(Self {}) Ok(Self {})
} }
} }
#[async_trait] #[async_trait]
impl NetworkModel for TcFilterModel { impl NetworkModel for TcFilterModel {
fn model_type(&self) -> NetworkModelType { fn model_type(&self) -> NetworkModelType {
@ -27,69 +27,74 @@ impl NetworkModel for TcFilterModel {
} }
async fn add(&self, pair: &NetworkPair) -> Result<()> { async fn add(&self, pair: &NetworkPair) -> Result<()> {
let tap_name = &pair.tap.tap_iface.name; let (connection, handle, _) = rtnetlink::new_connection().context("new connection")?;
let virt_name = &pair.virt_iface.name; let thread_handler = tokio::spawn(connection);
add_qdisc_ingress(tap_name) defer!({
.await thread_handler.abort();
.context("add qdisc ingress for tap link")?; });
add_qdisc_ingress(virt_name)
.await
.context("add qdisc ingress")?;
add_redirect_tcfilter(tap_name, virt_name) let tap_index = fetch_index(&handle, pair.tap.tap_iface.name.as_str())
.await .await
.context("add tc filter for tap")?; .context("fetch tap by index")?;
add_redirect_tcfilter(virt_name, tap_name) let virt_index = fetch_index(&handle, pair.virt_iface.name.as_str())
.await .await
.context("add tc filter")?; .context("fetch virt by index")?;
handle
.qdisc()
.add(tap_index as i32)
.ingress()
.execute()
.await
.context("add tap ingress")?;
handle
.qdisc()
.add(virt_index as i32)
.ingress()
.execute()
.await
.context("add virt ingress")?;
handle
.traffic_filter(tap_index as i32)
.add()
.protocol(0x0003)
.egress()
.redirect(virt_index)
.execute()
.await
.context("add tap egress")?;
handle
.traffic_filter(virt_index as i32)
.add()
.protocol(0x0003)
.egress()
.redirect(tap_index)
.execute()
.await
.context("add virt egress")?;
Ok(()) Ok(())
} }
async fn del(&self, pair: &NetworkPair) -> Result<()> { async fn del(&self, pair: &NetworkPair) -> Result<()> {
del_qdisc(&pair.virt_iface.name) let (connection, handle, _) = rtnetlink::new_connection().context("new connection")?;
.await let thread_handler = tokio::spawn(connection);
.context("del qdisc")?; defer!({
thread_handler.abort();
});
let virt_index = fetch_index(&handle, &pair.virt_iface.name).await?;
handle.qdisc().del(virt_index as i32).execute().await?;
Ok(()) Ok(())
} }
} }
// TODO: use netlink replace tc command pub async fn fetch_index(handle: &Handle, name: &str) -> Result<u32> {
async fn add_qdisc_ingress(dev: &str) -> Result<()> { let link = crate::network::network_pair::get_link_by_name(handle, name)
let output = Command::new("/sbin/tc")
.args(&["qdisc", "add", "dev", dev, "handle", "ffff:", "ingress"])
.output()
.await .await
.context("add tc")?; .context("get link by name")?;
if !output.status.success() { let base = link.attrs();
return Err(anyhow!("{}", String::from_utf8(output.stderr)?)); Ok(base.index)
}
Ok(())
}
async fn add_redirect_tcfilter(src: &str, dst: &str) -> Result<()> {
let output = Command::new("/sbin/tc")
.args(&[
"filter", "add", "dev", src, "parent", "ffff:", "protocol", "all", "u32", "match",
"u8", "0", "0", "action", "mirred", "egress", "redirect", "dev", dst,
])
.output()
.await
.context("add redirect tcfilter")?;
if !output.status.success() {
return Err(anyhow!("{}", String::from_utf8(output.stderr)?));
}
Ok(())
}
async fn del_qdisc(dev: &str) -> Result<()> {
let output = Command::new("/sbin/tc")
.args(&["qdisc", "del", "dev", dev, "handle", "ffff:", "ingress"])
.output()
.await
.context("del qdisc")?;
if !output.status.success() {
return Err(anyhow!("{}", String::from_utf8(output.stderr)?));
}
Ok(())
} }

View File

@ -0,0 +1,39 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
#[cfg(test)]
mod tests {
use crate::network::{
network_model::{tc_filter_model::fetch_index, TC_FILTER_NET_MODEL_STR},
network_pair::NetworkPair,
};
use anyhow::Context;
use scopeguard::defer;
#[actix_rt::test]
async fn test_tc_redirect_network() {
if let Ok((connection, handle, _)) = rtnetlink::new_connection().context("new connection") {
let thread_handler = tokio::spawn(connection);
defer!({
thread_handler.abort();
});
handle
.link()
.add()
.veth("foo".to_string(), "bar".to_string());
if let Ok(net_pair) =
NetworkPair::new(&handle, 1, "bar", TC_FILTER_NET_MODEL_STR, 2).await
{
if let Ok(index) = fetch_index(&handle, "bar").await {
assert!(net_pair.add_network_model().await.is_ok());
assert!(net_pair.del_network_model().await.is_ok());
assert!(handle.link().del(index).execute().await.is_ok());
}
}
}
}
}