runtime-rs: enable nerdctl cni plugin

1. when we use nerdctl to setup network for kata, no netns is created by
nerdctl, kata need to create netns by its own

2. after start VM, nerdctl will call cni plugin via oci hook, we need to
rescan the netns after the interfaces have been created, and hotplug
the network device into the VM

Fixes:#4693
Signed-off-by: Zhongtao Hu <zhongtaohu.tim@linux.alibaba.com>
This commit is contained in:
Zhongtao Hu 2023-04-09 21:44:45 +08:00
parent 3bfaafbf44
commit b31f103d12
12 changed files with 205 additions and 82 deletions

View File

@ -1734,6 +1734,16 @@ dependencies = [
"tokio",
]
[[package]]
name = "netns-rs"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23541694f1d7d18cd1a0da3a1352a6ea48b01cbb4a8e7a6e547963823fd5276e"
dependencies = [
"nix 0.23.2",
"thiserror",
]
[[package]]
name = "nix"
version = "0.23.2"
@ -2401,6 +2411,7 @@ dependencies = [
"byte-unit 4.0.18",
"cgroups-rs",
"futures 0.3.26",
"hex",
"hypervisor",
"kata-sys-util",
"kata-types",
@ -2464,9 +2475,11 @@ dependencies = [
"lazy_static",
"linux_container",
"logging",
"netns-rs",
"nix 0.25.1",
"oci",
"persist",
"resource",
"serde_json",
"shim-interface",
"slog",

View File

@ -16,6 +16,7 @@ bitflags = "1.2.1"
byte-unit = "4.0.14"
cgroups-rs = "0.3.2"
futures = "0.3.11"
hex = "0.4.3"
lazy_static = "1.4.0"
libc = ">=0.2.39"
netlink-sys = "0.8.3"

View File

@ -4,6 +4,7 @@
// SPDX-License-Identifier: Apache-2.0
//
use crate::network::NetworkConfig;
use crate::resource_persist::ResourceState;
use crate::{manager_inner::ResourceManagerInner, rootfs::Rootfs, volume::Volume, ResourceConfig};
use agent::{Agent, Storage};
@ -55,6 +56,11 @@ impl ResourceManager {
inner.prepare_before_start_vm(device_configs).await
}
pub async fn handle_network(&self, network_config: NetworkConfig) -> Result<()> {
let mut inner = self.inner.write().await;
inner.handle_network(network_config).await
}
pub async fn setup_after_start_vm(&self) -> Result<()> {
let mut inner = self.inner.write().await;
inner.setup_after_start_vm().await

View File

@ -6,7 +6,7 @@
use std::{sync::Arc, thread};
use crate::resource_persist::ResourceState;
use crate::{network::NetworkConfig, resource_persist::ResourceState};
use agent::{Agent, Storage};
use anyhow::{anyhow, Context, Ok, Result};
use async_trait::async_trait;
@ -89,32 +89,9 @@ impl ResourceManagerInner {
};
}
ResourceConfig::Network(c) => {
// 1. When using Rust asynchronous programming, we use .await to
// allow other task to run instead of waiting for the completion of the current task.
// 2. Also, when handling the pod network, we need to set the shim threads
// into the network namespace to perform those operations.
// However, as the increase of the I/O intensive tasks, two issues could be caused by the two points above:
// a. When the future is blocked, the current thread (which is in the pod netns)
// might be take over by other tasks. After the future is finished, the thread take over
// the current task might not be in the pod netns. But the current task still need to run in pod netns
// b. When finish setting up the network, the current thread will be set back to the host namespace.
// In Rust Async, if the current thread is taken over by other task, the netns is dropped on another thread,
// but it is not in netns. So, the previous thread would still remain in the pod netns.
// The solution is to block the future on the current thread, it is enabled by spawn an os thread, create a
// tokio runtime, and block the task on it.
let hypervisor = self.hypervisor.clone();
let network = thread::spawn(move || -> Result<Arc<dyn Network>> {
let rt = runtime::Builder::new_current_thread().enable_io().build()?;
let d = rt.block_on(network::new(&c)).context("new network")?;
rt.block_on(d.setup(hypervisor.as_ref()))
.context("setup network")?;
Ok(d)
})
.join()
.map_err(|e| anyhow!("{:?}", e))
.context("Couldn't join on the associated thread")?
.context("failed to set up network")?;
self.network = Some(network);
self.handle_network(c)
.await
.context("failed to handle network")?;
}
};
}
@ -122,6 +99,38 @@ impl ResourceManagerInner {
Ok(())
}
pub async fn handle_network(&mut self, network_config: NetworkConfig) -> Result<()> {
// 1. When using Rust asynchronous programming, we use .await to
// allow other task to run instead of waiting for the completion of the current task.
// 2. Also, when handling the pod network, we need to set the shim threads
// into the network namespace to perform those operations.
// However, as the increase of the I/O intensive tasks, two issues could be caused by the two points above:
// a. When the future is blocked, the current thread (which is in the pod netns)
// might be take over by other tasks. After the future is finished, the thread take over
// the current task might not be in the pod netns. But the current task still need to run in pod netns
// b. When finish setting up the network, the current thread will be set back to the host namespace.
// In Rust Async, if the current thread is taken over by other task, the netns is dropped on another thread,
// but it is not in netns. So, the previous thread would still remain in the pod netns.
// The solution is to block the future on the current thread, it is enabled by spawn an os thread, create a
// tokio runtime, and block the task on it.
let hypervisor = self.hypervisor.clone();
let network = thread::spawn(move || -> Result<Arc<dyn Network>> {
let rt = runtime::Builder::new_current_thread().enable_io().build()?;
let d = rt
.block_on(network::new(&network_config))
.context("new network")?;
rt.block_on(d.setup(hypervisor.as_ref()))
.context("setup network")?;
Ok(d)
})
.join()
.map_err(|e| anyhow!("{:?}", e))
.context("Couldn't join on the associated thread")?
.context("failed to set up network")?;
self.network = Some(network);
Ok(())
}
async fn handle_interfaces(&self, network: &dyn Network) -> Result<()> {
for i in network.interfaces().await.context("get interfaces")? {
// update interface

View File

@ -18,7 +18,7 @@ use network_with_netns::NetworkWithNetns;
mod network_pair;
use network_pair::NetworkPair;
mod utils;
pub use utils::netns::NetnsGuard;
pub use utils::netns::{generate_netns_name, NetnsGuard};
use std::sync::Arc;

View File

@ -33,6 +33,7 @@ pub struct NetworkWithNetNsConfig {
pub network_model: String,
pub netns_path: String,
pub queues: usize,
pub network_created: bool,
}
struct NetworkWithNetnsInner {

View File

@ -9,6 +9,7 @@ use std::{fs::File, os::unix::io::AsRawFd};
use anyhow::{Context, Result};
use nix::sched::{setns, CloneFlags};
use nix::unistd::{getpid, gettid};
use rand::Rng;
pub struct NetnsGuard {
old_netns: Option<File>,
@ -50,6 +51,20 @@ impl Drop for NetnsGuard {
}
}
// generate the network namespace name
pub fn generate_netns_name() -> String {
let mut rng = rand::thread_rng();
let random_bytes: [u8; 16] = rng.gen();
format!(
"cnitest-{}-{}-{}-{}-{}",
hex::encode(&random_bytes[..4]),
hex::encode(&random_bytes[4..6]),
hex::encode(&random_bytes[6..8]),
hex::encode(&random_bytes[8..10]),
hex::encode(&random_bytes[10..])
)
}
#[cfg(test)]
mod tests {
use super::*;
@ -67,4 +82,14 @@ mod tests {
let empty_path = "";
assert!(NetnsGuard::new(empty_path).unwrap().old_netns.is_none());
}
#[test]
fn test_generate_netns_name() {
let name1 = generate_netns_name();
let name2 = generate_netns_name();
let name3 = generate_netns_name();
assert_ne!(name1, name2);
assert_ne!(name2, name3);
assert_ne!(name1, name3);
}
}

View File

@ -8,6 +8,7 @@ license = "Apache-2.0"
[dependencies]
anyhow = "^1.0"
lazy_static = "1.4.0"
netns-rs = "0.1.0"
slog = "2.5.2"
slog-scope = "4.4.0"
tokio = { version = "1.8.0", features = ["rt-multi-thread"] }
@ -26,6 +27,8 @@ oci = { path = "../../../libs/oci" }
shim-interface = { path = "../../../libs/shim-interface" }
persist = { path = "../persist" }
hypervisor = { path = "../hypervisor" }
resource = { path = "../resource" }
# runtime handler
linux_container = { path = "./linux_container", optional = true }
virt_container = { path = "./virt_container", optional = true }

View File

@ -11,5 +11,5 @@ pub mod message;
mod runtime_handler;
pub use runtime_handler::{RuntimeHandler, RuntimeInstance};
mod sandbox;
pub use sandbox::Sandbox;
pub use sandbox::{Sandbox, SandboxNetworkEnv};
pub mod types;

View File

@ -7,14 +7,20 @@
use anyhow::Result;
use async_trait::async_trait;
#[derive(Clone)]
pub struct SandboxNetworkEnv {
pub netns: Option<String>,
pub network_created: bool,
}
#[async_trait]
pub trait Sandbox: Send + Sync {
async fn start(
&self,
netns: Option<String>,
dns: Vec<String>,
spec: &oci::Spec,
state: &oci::State,
network_env: SandboxNetworkEnv,
) -> Result<()>;
async fn stop(&self) -> Result<()>;
async fn cleanup(&self) -> Result<()>;

View File

@ -4,20 +4,21 @@
// SPDX-License-Identifier: Apache-2.0
//
use std::{str::from_utf8, sync::Arc};
use anyhow::{anyhow, Context, Result};
use std::{path::PathBuf, str::from_utf8, sync::Arc};
use crate::{shim_mgmt::server::MgmtServer, static_resource::StaticResourceManager};
use anyhow::{anyhow, Context, Result};
use common::{
message::Message,
types::{Request, Response},
RuntimeHandler, RuntimeInstance, Sandbox,
RuntimeHandler, RuntimeInstance, Sandbox, SandboxNetworkEnv,
};
use hypervisor::Param;
use kata_types::{
annotations::Annotation, config::default::DEFAULT_GUEST_DNS_FILE, config::TomlConfig,
};
use netns_rs::NetNs;
use resource::network::generate_netns_name;
#[cfg(feature = "linux")]
use linux_container::LinuxContainer;
@ -53,7 +54,7 @@ impl RuntimeHandlerManagerInner {
&mut self,
spec: &oci::Spec,
state: &oci::State,
netns: Option<String>,
network_env: SandboxNetworkEnv,
dns: Vec<String>,
config: Arc<TomlConfig>,
) -> Result<()> {
@ -77,7 +78,7 @@ impl RuntimeHandlerManagerInner {
// start sandbox
runtime_instance
.sandbox
.start(netns, dns, spec, state)
.start(dns, spec, state, network_env)
.await
.context("start sandbox")?;
self.runtime_instance = Some(Arc::new(runtime_instance));
@ -104,23 +105,6 @@ impl RuntimeHandlerManagerInner {
#[cfg(feature = "virt")]
VirtContainer::init().context("init virt container")?;
let netns = if let Some(linux) = &spec.linux {
let mut netns = None;
for ns in &linux.namespaces {
if ns.r#type.as_str() != oci::NETWORKNAMESPACE {
continue;
}
if !ns.path.is_empty() {
netns = Some(ns.path.clone());
break;
}
}
netns
} else {
None
};
for m in &spec.mounts {
if m.destination == DEFAULT_GUEST_DNS_FILE {
let contents = fs::read_to_string(&m.source).await?;
@ -129,7 +113,42 @@ impl RuntimeHandlerManagerInner {
}
let config = load_config(spec, options).context("load config")?;
self.init_runtime_handler(spec, state, netns, dns, Arc::new(config))
let mut network_created = false;
// set netns to None if we want no network for the VM
let netns = if config.runtime.disable_new_netns {
None
} else {
let mut netns_path = None;
if let Some(linux) = &spec.linux {
for ns in &linux.namespaces {
if ns.r#type.as_str() != oci::NETWORKNAMESPACE {
continue;
}
// get netns path from oci spec
if !ns.path.is_empty() {
netns_path = Some(ns.path.clone());
}
// if we get empty netns from oci spec, we need to create netns for the VM
else {
let ns_name = generate_netns_name();
let netns = NetNs::new(ns_name)?;
let path = PathBuf::from(netns.path()).to_str().map(|s| s.to_string());
info!(sl!(), "the netns path is {:?}", path);
netns_path = path;
network_created = true;
}
break;
}
}
netns_path
};
let network_env = SandboxNetworkEnv {
netns,
network_created,
};
self.init_runtime_handler(spec, state, network_env, dns, Arc::new(config))
.await
.context("init runtime handler")?;

View File

@ -14,7 +14,7 @@ use anyhow::{anyhow, Context, Result};
use async_trait::async_trait;
use common::{
message::{Action, Message},
Sandbox,
Sandbox, SandboxNetworkEnv,
};
use containerd_shim_protos::events::task::TaskOOM;
use hypervisor::{dragonball::Dragonball, Hypervisor, HYPERVISOR_DRAGONBALL};
@ -86,30 +86,21 @@ impl VirtSandbox {
})
}
async fn prepare_for_start_sandbox(
async fn prepare_config_for_sandbox(
&self,
_id: &str,
netns: Option<String>,
network_env: SandboxNetworkEnv,
) -> Result<Vec<ResourceConfig>> {
let mut resource_configs = vec![];
let config = self.resource_manager.config().await;
if let Some(netns_path) = netns {
let network_config = ResourceConfig::Network(NetworkConfig::NetworkResourceWithNetNs(
NetworkWithNetNsConfig {
network_model: config.runtime.internetworking_model.clone(),
netns_path,
queues: self
.hypervisor
.hypervisor_config()
.await
.network_info
.network_queues as usize,
},
));
resource_configs.push(network_config);
if !network_env.network_created {
if let Some(netns_path) = network_env.netns {
let network_config = ResourceConfig::Network(
self.prepare_network_config(netns_path, network_env.network_created)
.await,
);
resource_configs.push(network_config);
}
}
let hypervisor_config = self.hypervisor.hypervisor_config().await;
let virtio_fs_config = ResourceConfig::ShareFs(hypervisor_config.shared_fs);
resource_configs.push(virtio_fs_config);
@ -149,16 +140,43 @@ impl VirtSandbox {
Ok(())
}
async fn prepare_network_config(
&self,
netns_path: String,
network_created: bool,
) -> NetworkConfig {
let config = self.resource_manager.config().await;
NetworkConfig::NetworkResourceWithNetNs(NetworkWithNetNsConfig {
network_model: config.runtime.internetworking_model.clone(),
netns_path,
queues: self
.hypervisor
.hypervisor_config()
.await
.network_info
.network_queues as usize,
network_created,
})
}
fn has_prestart_hooks(
&self,
prestart_hooks: Vec<oci::Hook>,
create_runtime_hooks: Vec<oci::Hook>,
) -> bool {
!prestart_hooks.is_empty() || !create_runtime_hooks.is_empty()
}
}
#[async_trait]
impl Sandbox for VirtSandbox {
async fn start(
&self,
netns: Option<String>,
dns: Vec<String>,
spec: &oci::Spec,
state: &oci::State,
network_env: SandboxNetworkEnv,
) -> Result<()> {
let id = &self.sid;
@ -171,13 +189,15 @@ impl Sandbox for VirtSandbox {
}
self.hypervisor
.prepare_vm(id, netns.clone())
.prepare_vm(id, network_env.netns.clone())
.await
.context("prepare vm")?;
// generate device and setup before start vm
// should after hypervisor.prepare_vm
let resources = self.prepare_for_start_sandbox(id, netns).await?;
let resources = self
.prepare_config_for_sandbox(id, network_env.clone())
.await?;
self.resource_manager
.prepare_before_start_vm(resources)
.await
@ -188,15 +208,35 @@ impl Sandbox for VirtSandbox {
info!(sl!(), "start vm");
// execute pre-start hook functions, including Prestart Hooks and CreateRuntime Hooks
let (prestart_hooks, create_runtime_hooks, _has_oci_hook) = match spec.hooks.as_ref() {
Some(hooks) => (hooks.prestart.clone(), hooks.create_runtime.clone(), true),
None => (Vec::new(), Vec::new(), false),
let (prestart_hooks, create_runtime_hooks) = match spec.hooks.as_ref() {
Some(hooks) => (hooks.prestart.clone(), hooks.create_runtime.clone()),
None => (Vec::new(), Vec::new()),
};
self.execute_oci_hook_functions(&prestart_hooks, &create_runtime_hooks, state)
.await?;
// TODO: if prestart_hooks is not empty, rescan the network endpoints(rely on hotplug endpoints).
// see: https://github.com/kata-containers/kata-containers/issues/6378
// 1. if there are pre-start hook functions, network config might have been changed.
// We need to rescan the netns to handle the change.
// 2. Do not scan the netns if we want no network for the VM.
// TODO In case of vm factory, scan the netns to hotplug interfaces after the VM is started.
if self.has_prestart_hooks(prestart_hooks, create_runtime_hooks)
&& !self
.resource_manager
.config()
.await
.runtime
.disable_new_netns
{
if let Some(netns_path) = network_env.netns {
let network_resource = self
.prepare_network_config(netns_path, network_env.network_created)
.await;
self.resource_manager
.handle_network(network_resource)
.await
.context("set up device after start vm")?;
}
}
// connect agent
// set agent socket