tracing: Add tracing to runtime-rs

Introduce tracing into runtime-rs, only some functions are instrumented.

Fixes: #5239

Signed-off-by: Ji-Xinyou <jerryji0414@outlook.com>
Signed-off-by: Yushuo <y-shuo@linux.alibaba.com>
This commit is contained in:
Ji-Xinyou 2022-09-23 17:56:58 +08:00 committed by Yushuo
parent 58e921eace
commit ed23b47c71
22 changed files with 942 additions and 464 deletions

1050
src/runtime-rs/Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -19,6 +19,7 @@ slog = "2.5.2"
slog-scope = "4.4.0" slog-scope = "4.4.0"
ttrpc = { version = "0.7.1" } ttrpc = { version = "0.7.1" }
tokio = { version = "1.28.1", features = ["fs", "rt"] } tokio = { version = "1.28.1", features = ["fs", "rt"] }
tracing = "0.1.36"
url = "2.2.2" url = "2.2.2"
nix = "0.24.2" nix = "0.24.2"

View File

@ -6,6 +6,7 @@
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use async_trait::async_trait; use async_trait::async_trait;
use tracing::instrument;
use ttrpc::context as ttrpc_ctx; use ttrpc::context as ttrpc_ctx;
use kata_types::config::Agent as AgentConfig; use kata_types::config::Agent as AgentConfig;
@ -22,6 +23,7 @@ fn new_ttrpc_ctx(timeout: i64) -> ttrpc_ctx::Context {
#[async_trait] #[async_trait]
impl AgentManager for KataAgent { impl AgentManager for KataAgent {
#[instrument]
async fn start(&self, address: &str) -> Result<()> { async fn start(&self, address: &str) -> Result<()> {
info!(sl!(), "begin to connect agent {:?}", address); info!(sl!(), "begin to connect agent {:?}", address);
self.set_socket_address(address) self.set_socket_address(address)
@ -73,6 +75,7 @@ macro_rules! impl_agent {
($($name: tt | $req: ty | $resp: ty | $new_timeout: expr),*) => { ($($name: tt | $req: ty | $resp: ty | $new_timeout: expr),*) => {
#[async_trait] #[async_trait]
impl Agent for KataAgent { impl Agent for KataAgent {
#[instrument(skip(req))]
$(async fn $name(&self, req: $req) -> Result<$resp> { $(async fn $name(&self, req: $req) -> Result<$resp> {
let r = req.into(); let r = req.into();
let (client, mut timeout, _) = self.get_agent_client().await.context("get client")?; let (client, mut timeout, _) = self.get_agent_client().await.context("get client")?;

View File

@ -44,6 +44,19 @@ pub(crate) struct KataAgentInner {
log_forwarder: LogForwarder, log_forwarder: LogForwarder,
} }
impl std::fmt::Debug for KataAgentInner {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("KataAgentInner")
.field("client_fd", &self.client_fd)
.field("socket_address", &self.socket_address)
.field("config", &self.config)
.finish()
}
}
unsafe impl Send for KataAgent {}
unsafe impl Sync for KataAgent {}
#[derive(Debug)]
pub struct KataAgent { pub struct KataAgent {
pub(crate) inner: Arc<RwLock<KataAgentInner>>, pub(crate) inner: Arc<RwLock<KataAgentInner>>,
} }

View File

@ -28,6 +28,7 @@ vmm-sys-util = "0.11.0"
rand = "0.8.4" rand = "0.8.4"
path-clean = "1.0.1" path-clean = "1.0.1"
lazy_static = "1.4" lazy_static = "1.4"
tracing = "0.1.36"
kata-sys-util = { path = "../../../libs/kata-sys-util" } kata-sys-util = { path = "../../../libs/kata-sys-util" }
kata-types = { path = "../../../libs/kata-types" } kata-types = { path = "../../../libs/kata-types" }

View File

@ -19,14 +19,20 @@ use async_trait::async_trait;
use kata_types::capabilities::Capabilities; use kata_types::capabilities::Capabilities;
use kata_types::config::hypervisor::Hypervisor as HypervisorConfig; use kata_types::config::hypervisor::Hypervisor as HypervisorConfig;
use tokio::sync::RwLock; use tokio::sync::RwLock;
use tracing::instrument;
use crate::{DeviceType, Hypervisor, VcpuThreadIds}; use crate::{DeviceType, Hypervisor, VcpuThreadIds};
#[derive(Debug)]
pub struct Dragonball { pub struct Dragonball {
inner: Arc<RwLock<DragonballInner>>, inner: Arc<RwLock<DragonballInner>>,
} }
impl std::fmt::Debug for Dragonball {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Dragonball").finish()
}
}
impl Default for Dragonball { impl Default for Dragonball {
fn default() -> Self { fn default() -> Self {
Self::new() Self::new()
@ -48,11 +54,13 @@ impl Dragonball {
#[async_trait] #[async_trait]
impl Hypervisor for Dragonball { impl Hypervisor for Dragonball {
#[instrument]
async fn prepare_vm(&self, id: &str, netns: Option<String>) -> Result<()> { async fn prepare_vm(&self, id: &str, netns: Option<String>) -> Result<()> {
let mut inner = self.inner.write().await; let mut inner = self.inner.write().await;
inner.prepare_vm(id, netns).await inner.prepare_vm(id, netns).await
} }
#[instrument]
async fn start_vm(&self, timeout: i32) -> Result<()> { async fn start_vm(&self, timeout: i32) -> Result<()> {
let mut inner = self.inner.write().await; let mut inner = self.inner.write().await;
inner.start_vm(timeout).await inner.start_vm(timeout).await

View File

@ -31,6 +31,7 @@ serde_json = "1.0.82"
slog = "2.5.2" slog = "2.5.2"
slog-scope = "4.4.0" slog-scope = "4.4.0"
tokio = { version = "1.28.1", features = ["process"] } tokio = { version = "1.28.1", features = ["process"] }
tracing = "0.1.36"
uuid = { version = "0.4", features = ["v4"] } uuid = { version = "0.4", features = ["v4"] }
agent = { path = "../agent" } agent = { path = "../agent" }

View File

@ -17,6 +17,7 @@ use kata_types::mount::Mount;
use oci::{Linux, LinuxResources}; use oci::{Linux, LinuxResources};
use persist::sandbox_persist::Persist; use persist::sandbox_persist::Persist;
use tokio::sync::RwLock; use tokio::sync::RwLock;
use tracing::instrument;
use crate::network::NetworkConfig; use crate::network::NetworkConfig;
use crate::resource_persist::ResourceState; use crate::resource_persist::ResourceState;
@ -34,6 +35,12 @@ pub struct ResourceManager {
inner: Arc<RwLock<ResourceManagerInner>>, inner: Arc<RwLock<ResourceManagerInner>>,
} }
impl std::fmt::Debug for ResourceManager {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ResourceManager").finish()
}
}
impl ResourceManager { impl ResourceManager {
pub async fn new( pub async fn new(
sid: &str, sid: &str,
@ -58,6 +65,7 @@ impl ResourceManager {
inner.get_device_manager() inner.get_device_manager()
} }
#[instrument]
pub async fn prepare_before_start_vm(&self, device_configs: Vec<ResourceConfig>) -> Result<()> { pub async fn prepare_before_start_vm(&self, device_configs: Vec<ResourceConfig>) -> Result<()> {
let mut inner = self.inner.write().await; let mut inner = self.inner.write().await;
inner.prepare_before_start_vm(device_configs).await inner.prepare_before_start_vm(device_configs).await
@ -68,6 +76,7 @@ impl ResourceManager {
inner.handle_network(network_config).await inner.handle_network(network_config).await
} }
#[instrument]
pub async fn setup_after_start_vm(&self) -> Result<()> { pub async fn setup_after_start_vm(&self) -> Result<()> {
let mut inner = self.inner.write().await; let mut inner = self.inner.write().await;
inner.setup_after_start_vm().await inner.setup_after_start_vm().await

View File

@ -12,6 +12,11 @@ netns-rs = "0.1.0"
slog = "2.5.2" slog = "2.5.2"
slog-scope = "4.4.0" slog-scope = "4.4.0"
tokio = { version = "1.28.1", features = ["rt-multi-thread"] } tokio = { version = "1.28.1", features = ["rt-multi-thread"] }
tracing = "0.1.36"
tracing-opentelemetry = "0.18.0"
opentelemetry = { version = "0.18.0", features = ["rt-tokio-current-thread", "trace", "rt-tokio"] }
opentelemetry-jaeger = { version = "0.17.0", features = ["rt-tokio", "hyper_collector_client", "collector_client"] }
tracing-subscriber = { version = "0.3", features = ["registry", "std"] }
hyper = { version = "0.14.20", features = ["stream", "server", "http1"] } hyper = { version = "0.14.20", features = ["stream", "server", "http1"] }
hyperlocal = "0.8" hyperlocal = "0.8"
serde_json = "1.0.88" serde_json = "1.0.88"

View File

@ -13,6 +13,15 @@ pub struct SandboxNetworkEnv {
pub network_created: bool, pub network_created: bool,
} }
impl std::fmt::Debug for SandboxNetworkEnv {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SandboxNetworkEnv")
.field("netns", &self.netns)
.field("network_created", &self.network_created)
.finish()
}
}
#[async_trait] #[async_trait]
pub trait Sandbox: Send + Sync { pub trait Sandbox: Send + Sync {
async fn start( async fn start(

View File

@ -13,3 +13,4 @@ pub mod manager;
pub use manager::RuntimeHandlerManager; pub use manager::RuntimeHandlerManager;
pub use shim_interface; pub use shim_interface;
mod shim_mgmt; mod shim_mgmt;
pub mod tracer;

View File

@ -24,7 +24,8 @@ use persist::sandbox_persist::Persist;
use resource::{cpu_mem::initial_size::InitialSizeManager, network::generate_netns_name}; use resource::{cpu_mem::initial_size::InitialSizeManager, network::generate_netns_name};
use shim_interface::shim_mgmt::ERR_NO_SHIM_SERVER; use shim_interface::shim_mgmt::ERR_NO_SHIM_SERVER;
use tokio::fs; use tokio::fs;
use tokio::sync::{mpsc::Sender, RwLock}; use tokio::sync::{mpsc::Sender, Mutex, RwLock};
use tracing::instrument;
#[cfg(feature = "virt")] #[cfg(feature = "virt")]
use virt_container::{ use virt_container::{
sandbox::{SandboxRestoreArgs, VirtSandbox}, sandbox::{SandboxRestoreArgs, VirtSandbox},
@ -34,23 +35,39 @@ use virt_container::{
#[cfg(feature = "wasm")] #[cfg(feature = "wasm")]
use wasm_container::WasmContainer; use wasm_container::WasmContainer;
use crate::shim_mgmt::server::MgmtServer; use crate::{
shim_mgmt::server::MgmtServer,
tracer::{KataTracer, ROOTSPAN},
};
struct RuntimeHandlerManagerInner { struct RuntimeHandlerManagerInner {
id: String, id: String,
msg_sender: Sender<Message>, msg_sender: Sender<Message>,
kata_tracer: Arc<Mutex<KataTracer>>,
runtime_instance: Option<Arc<RuntimeInstance>>, runtime_instance: Option<Arc<RuntimeInstance>>,
} }
impl std::fmt::Debug for RuntimeHandlerManagerInner {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RuntimeHandlerManagerInner")
.field("id", &self.id)
.field("msg_sender", &self.msg_sender)
.finish()
}
}
impl RuntimeHandlerManagerInner { impl RuntimeHandlerManagerInner {
fn new(id: &str, msg_sender: Sender<Message>) -> Result<Self> { fn new(id: &str, msg_sender: Sender<Message>) -> Result<Self> {
let tracer = KataTracer::new();
Ok(Self { Ok(Self {
id: id.to_string(), id: id.to_string(),
msg_sender, msg_sender,
kata_tracer: Arc::new(Mutex::new(tracer)),
runtime_instance: None, runtime_instance: None,
}) })
} }
#[instrument]
async fn init_runtime_handler( async fn init_runtime_handler(
&mut self, &mut self,
spec: &oci::Spec, spec: &oci::Spec,
@ -72,10 +89,23 @@ impl RuntimeHandlerManagerInner {
_ => return Err(anyhow!("Unsupported runtime: {}", &config.runtime.name)), _ => return Err(anyhow!("Unsupported runtime: {}", &config.runtime.name)),
}; };
let runtime_instance = runtime_handler let runtime_instance = runtime_handler
.new_instance(&self.id, self.msg_sender.clone(), config) .new_instance(&self.id, self.msg_sender.clone(), config.clone())
.await .await
.context("new runtime instance")?; .context("new runtime instance")?;
// initilize the trace subscriber
if config.runtime.enable_tracing {
let mut tracer = self.kata_tracer.lock().await;
if let Err(e) = tracer.trace_setup(
&self.id,
&config.runtime.jaeger_endpoint,
&config.runtime.jaeger_user,
&config.runtime.jaeger_password,
) {
warn!(sl!(), "failed to setup tracing, {:?}", e);
}
}
// start sandbox // start sandbox
runtime_instance runtime_instance
.sandbox .sandbox
@ -86,6 +116,7 @@ impl RuntimeHandlerManagerInner {
Ok(()) Ok(())
} }
#[instrument]
async fn try_init( async fn try_init(
&mut self, &mut self,
spec: &oci::Spec, spec: &oci::Spec,
@ -149,6 +180,7 @@ impl RuntimeHandlerManagerInner {
netns, netns,
network_created, network_created,
}; };
self.init_runtime_handler(spec, state, network_env, dns, Arc::new(config)) self.init_runtime_handler(spec, state, network_env, dns, Arc::new(config))
.await .await
.context("init runtime handler")?; .context("init runtime handler")?;
@ -171,12 +203,23 @@ impl RuntimeHandlerManagerInner {
fn get_runtime_instance(&self) -> Option<Arc<RuntimeInstance>> { fn get_runtime_instance(&self) -> Option<Arc<RuntimeInstance>> {
self.runtime_instance.clone() self.runtime_instance.clone()
} }
fn get_kata_tracer(&self) -> Arc<Mutex<KataTracer>> {
self.kata_tracer.clone()
}
} }
pub struct RuntimeHandlerManager { pub struct RuntimeHandlerManager {
inner: Arc<RwLock<RuntimeHandlerManagerInner>>, inner: Arc<RwLock<RuntimeHandlerManagerInner>>,
} }
// todo: a more detailed impl for fmt::Debug
impl std::fmt::Debug for RuntimeHandlerManager {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RuntimeHandlerManager").finish()
}
}
impl RuntimeHandlerManager { impl RuntimeHandlerManager {
pub async fn new(id: &str, msg_sender: Sender<Message>) -> Result<Self> { pub async fn new(id: &str, msg_sender: Sender<Message>) -> Result<Self> {
Ok(Self { Ok(Self {
@ -243,6 +286,12 @@ impl RuntimeHandlerManager {
.ok_or_else(|| anyhow!("runtime not ready")) .ok_or_else(|| anyhow!("runtime not ready"))
} }
async fn get_kata_tracer(&self) -> Result<Arc<Mutex<KataTracer>>> {
let inner = self.inner.read().await;
Ok(inner.get_kata_tracer())
}
#[instrument]
async fn try_init_runtime_instance( async fn try_init_runtime_instance(
&self, &self,
spec: &oci::Spec, spec: &oci::Spec,
@ -253,6 +302,7 @@ impl RuntimeHandlerManager {
inner.try_init(spec, state, options).await inner.try_init(spec, state, options).await
} }
#[instrument(parent = &*(ROOTSPAN))]
pub async fn handler_message(&self, req: Request) -> Result<Response> { pub async fn handler_message(&self, req: Request) -> Result<Response> {
if let Request::CreateContainer(container_config) = req { if let Request::CreateContainer(container_config) = req {
// get oci spec // get oci spec
@ -291,6 +341,7 @@ impl RuntimeHandlerManager {
} }
} }
#[instrument(parent = &(*ROOTSPAN))]
pub async fn handler_request(&self, req: Request) -> Result<Response> { pub async fn handler_request(&self, req: Request) -> Result<Response> {
let instance = self let instance = self
.get_runtime_instance() .get_runtime_instance()
@ -320,6 +371,11 @@ impl RuntimeHandlerManager {
Request::ShutdownContainer(req) => { Request::ShutdownContainer(req) => {
if cm.need_shutdown_sandbox(&req).await { if cm.need_shutdown_sandbox(&req).await {
sandbox.shutdown().await.context("do shutdown")?; sandbox.shutdown().await.context("do shutdown")?;
// stop the tracer collector
let kata_tracer = self.get_kata_tracer().await.context("get kata tracer")?;
let tracer = kata_tracer.lock().await;
tracer.trace_end();
} }
Ok(Response::ShutdownContainer) Ok(Response::ShutdownContainer)
} }
@ -388,6 +444,7 @@ impl RuntimeHandlerManager {
/// 3. shimv2 create task option /// 3. shimv2 create task option
/// 4. If above three are not set, then get default path from DEFAULT_RUNTIME_CONFIGURATIONS /// 4. If above three are not set, then get default path from DEFAULT_RUNTIME_CONFIGURATIONS
/// in kata-containers/src/libs/kata-types/src/config/default.rs, in array order. /// in kata-containers/src/libs/kata-types/src/config/default.rs, in array order.
#[instrument]
fn load_config(spec: &oci::Spec, option: &Option<Vec<u8>>) -> Result<TomlConfig> { fn load_config(spec: &oci::Spec, option: &Option<Vec<u8>>) -> Result<TomlConfig> {
const KATA_CONF_FILE: &str = "KATA_CONF_FILE"; const KATA_CONF_FILE: &str = "KATA_CONF_FILE";
let annotation = Annotation::new(spec.annotations.clone()); let annotation = Annotation::new(spec.annotations.clone());

View File

@ -0,0 +1,169 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
use std::cmp::min;
use std::sync::Arc;
use anyhow::Result;
use lazy_static::lazy_static;
use opentelemetry::global;
use opentelemetry::runtime::Tokio;
use tracing::{span, subscriber::NoSubscriber, Span, Subscriber};
use tracing_subscriber::prelude::*;
use tracing_subscriber::Registry;
const DEFAULT_JAEGER_URL: &str = "http://localhost:14268/api/traces";
lazy_static! {
/// The ROOTSPAN is a phantom span that is running by calling [`trace_enter_root()`] at the background
/// once the configuration is read and config.runtime.enable_tracing is enabled
/// The ROOTSPAN exits by calling [`trace_exit_root()`] on shutdown request sent from containerd
///
/// NOTE:
/// This allows other threads which are not directly running under some spans to be tracked easily
/// within the entire sandbox's lifetime.
/// To do this, you just need to add attribute #[instrment(parent=&(*ROOTSPAN))]
pub static ref ROOTSPAN: Span = span!(tracing::Level::TRACE, "root-span");
}
/// The tracer wrapper for kata-containers
/// The fields and member methods should ALWAYS be PRIVATE and be exposed in a safe
/// way to other modules
unsafe impl Send for KataTracer {}
unsafe impl Sync for KataTracer {}
pub struct KataTracer {
subscriber: Arc<dyn Subscriber + Send + Sync>,
enabled: bool,
}
impl Default for KataTracer {
fn default() -> Self {
Self::new()
}
}
impl KataTracer {
/// Constructor of KataTracer, this is a dummy implementation for static initialization
pub fn new() -> Self {
Self {
subscriber: Arc::new(NoSubscriber::default()),
enabled: false,
}
}
/// Set the tracing enabled flag
fn enable(&mut self) {
self.enabled = true;
}
/// Return whether the tracing is enabled, enabled by [`trace_setup`]
fn enabled(&self) -> bool {
self.enabled
}
/// Call when the tracing is enabled (set in toml configuration file)
/// This setup the subscriber, which maintains the span's information, to global and
/// inside KATA_TRACER.
///
/// Note that the span will be noop(not collected) if a invalid subscriber is set
pub fn trace_setup(
&mut self,
sid: &str,
jaeger_endpoint: &str,
jaeger_username: &str,
jaeger_password: &str,
) -> Result<()> {
// If varify jaeger config returns an error, it means that the tracing should not be enabled
let endpoint = verify_jaeger_config(jaeger_endpoint, jaeger_username, jaeger_password)?;
// derive a subscriber to collect span info
let tracer = opentelemetry_jaeger::new_collector_pipeline()
.with_service_name(format!("kata-sb-{}", &sid[0..min(8, sid.len())]))
.with_endpoint(endpoint)
.with_username(jaeger_username)
.with_password(jaeger_password)
.with_hyper()
.install_batch(Tokio)?;
let layer = tracing_opentelemetry::layer().with_tracer(tracer);
let sub = Registry::default().with(layer);
// we use Arc to let global subscriber and katatracer to SHARE the SAME subscriber
// this is for record the global subscriber into a global variable KATA_TRACER for more usages
let subscriber = Arc::new(sub);
tracing::subscriber::set_global_default(subscriber.clone())?;
self.subscriber = subscriber;
// enter the rootspan
self.trace_enter_root();
// modity the enable state, note that we have successfully enable tracing
self.enable();
info!(sl!(), "Tracing enabled successfully");
Ok(())
}
/// Shutdown the tracer and emit the span info to jaeger agent
/// The tracing information is only partially update to jaeger agent before this function is called
pub fn trace_end(&self) {
if self.enabled() {
// exit the rootspan
self.trace_exit_root();
global::shutdown_tracer_provider();
}
}
/// Enter the global ROOTSPAN
/// This function is a hack on tracing library's guard approach, letting the span
/// to enter without using a RAII guard to exit. This function should only be called
/// once, and also in paired with [`trace_exit_root()`].
fn trace_enter_root(&self) {
self.enter_span(&ROOTSPAN);
}
/// Exit the global ROOTSPAN
/// This should be called in paired with [`trace_enter_root()`].
fn trace_exit_root(&self) {
self.exit_span(&ROOTSPAN);
}
/// let the subscriber enter the span, this has to be called in pair with exit(span)
/// This function allows **cross function span** to run without span guard
fn enter_span(&self, span: &Span) {
let id: Option<span::Id> = span.into();
self.subscriber.enter(&id.unwrap());
}
/// let the subscriber exit the span, this has to be called in pair to enter(span)
fn exit_span(&self, span: &Span) {
let id: Option<span::Id> = span.into();
self.subscriber.exit(&id.unwrap());
}
}
/// Verifying the configuration of jaeger and setup the default value
fn verify_jaeger_config(endpoint: &str, username: &str, passwd: &str) -> Result<String> {
if username.is_empty() && !passwd.is_empty() {
warn!(
sl!(),
"Jaeger password with empty username is not allowed, tracing is NOT enabled"
);
return Err(anyhow::anyhow!("Empty username with non-empty password"));
}
// set the default endpoint address, this expects a jaeger-collector running on localhost:14268
let endpt = if endpoint.is_empty() {
DEFAULT_JAEGER_URL
} else {
endpoint
}
.to_owned();
Ok(endpt)
}

View File

@ -24,6 +24,7 @@ tokio = { version = "1.28.1" }
toml = "0.4.2" toml = "0.4.2"
url = "2.1.1" url = "2.1.1"
async-std = "1.12.0" async-std = "1.12.0"
tracing = "0.1.36"
agent = { path = "../../agent" } agent = { path = "../../agent" }
common = { path = "../common" } common = { path = "../common" }

View File

@ -24,6 +24,7 @@ use oci::Process as OCIProcess;
use resource::network::NetnsGuard; use resource::network::NetnsGuard;
use resource::ResourceManager; use resource::ResourceManager;
use tokio::sync::RwLock; use tokio::sync::RwLock;
use tracing::instrument;
use kata_sys_util::hooks::HookStates; use kata_sys_util::hooks::HookStates;
@ -38,6 +39,15 @@ pub struct VirtContainerManager {
hypervisor: Arc<dyn Hypervisor>, hypervisor: Arc<dyn Hypervisor>,
} }
impl std::fmt::Debug for VirtContainerManager {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("VirtContainerManager")
.field("sid", &self.sid)
.field("pid", &self.pid)
.finish()
}
}
impl VirtContainerManager { impl VirtContainerManager {
pub fn new( pub fn new(
sid: &str, sid: &str,
@ -59,6 +69,7 @@ impl VirtContainerManager {
#[async_trait] #[async_trait]
impl ContainerManager for VirtContainerManager { impl ContainerManager for VirtContainerManager {
#[instrument]
async fn create_container(&self, config: ContainerConfig, spec: oci::Spec) -> Result<PID> { async fn create_container(&self, config: ContainerConfig, spec: oci::Spec) -> Result<PID> {
let container = Container::new( let container = Container::new(
self.pid, self.pid,
@ -100,6 +111,7 @@ impl ContainerManager for VirtContainerManager {
Ok(PID { pid: self.pid }) Ok(PID { pid: self.pid })
} }
#[instrument]
async fn close_process_io(&self, process: &ContainerProcess) -> Result<()> { async fn close_process_io(&self, process: &ContainerProcess) -> Result<()> {
let containers = self.containers.read().await; let containers = self.containers.read().await;
let container_id = &process.container_id.to_string(); let container_id = &process.container_id.to_string();
@ -111,6 +123,7 @@ impl ContainerManager for VirtContainerManager {
Ok(()) Ok(())
} }
#[instrument]
async fn delete_process(&self, process: &ContainerProcess) -> Result<ProcessStateInfo> { async fn delete_process(&self, process: &ContainerProcess) -> Result<ProcessStateInfo> {
let container_id = &process.container_id.container_id; let container_id = &process.container_id.container_id;
match process.process_type { match process.process_type {
@ -155,6 +168,7 @@ impl ContainerManager for VirtContainerManager {
} }
} }
#[instrument]
async fn exec_process(&self, req: ExecProcessRequest) -> Result<()> { async fn exec_process(&self, req: ExecProcessRequest) -> Result<()> {
if req.spec_type_url.is_empty() { if req.spec_type_url.is_empty() {
return Err(anyhow!("invalid type url")); return Err(anyhow!("invalid type url"));
@ -180,6 +194,7 @@ impl ContainerManager for VirtContainerManager {
Ok(()) Ok(())
} }
#[instrument]
async fn kill_process(&self, req: &KillRequest) -> Result<()> { async fn kill_process(&self, req: &KillRequest) -> Result<()> {
let containers = self.containers.read().await; let containers = self.containers.read().await;
let container_id = &req.process.container_id.container_id; let container_id = &req.process.container_id.container_id;
@ -199,6 +214,7 @@ impl ContainerManager for VirtContainerManager {
Ok(()) Ok(())
} }
#[instrument]
async fn wait_process(&self, process: &ContainerProcess) -> Result<ProcessExitStatus> { async fn wait_process(&self, process: &ContainerProcess) -> Result<ProcessExitStatus> {
let logger = logger_with_process(process); let logger = logger_with_process(process);
@ -235,6 +251,7 @@ impl ContainerManager for VirtContainerManager {
Ok(status.clone()) Ok(status.clone())
} }
#[instrument]
async fn start_process(&self, process: &ContainerProcess) -> Result<PID> { async fn start_process(&self, process: &ContainerProcess) -> Result<PID> {
let containers = self.containers.read().await; let containers = self.containers.read().await;
let container_id = &process.container_id.container_id; let container_id = &process.container_id.container_id;
@ -265,6 +282,7 @@ impl ContainerManager for VirtContainerManager {
Ok(PID { pid: self.pid }) Ok(PID { pid: self.pid })
} }
#[instrument]
async fn state_process(&self, process: &ContainerProcess) -> Result<ProcessStateInfo> { async fn state_process(&self, process: &ContainerProcess) -> Result<ProcessStateInfo> {
let containers = self.containers.read().await; let containers = self.containers.read().await;
let container_id = &process.container_id.container_id; let container_id = &process.container_id.container_id;
@ -275,6 +293,7 @@ impl ContainerManager for VirtContainerManager {
Ok(state) Ok(state)
} }
#[instrument]
async fn pause_container(&self, id: &ContainerID) -> Result<()> { async fn pause_container(&self, id: &ContainerID) -> Result<()> {
let containers = self.containers.read().await; let containers = self.containers.read().await;
let c = containers let c = containers
@ -284,6 +303,7 @@ impl ContainerManager for VirtContainerManager {
Ok(()) Ok(())
} }
#[instrument]
async fn resume_container(&self, id: &ContainerID) -> Result<()> { async fn resume_container(&self, id: &ContainerID) -> Result<()> {
let containers = self.containers.read().await; let containers = self.containers.read().await;
let c = containers let c = containers
@ -293,6 +313,7 @@ impl ContainerManager for VirtContainerManager {
Ok(()) Ok(())
} }
#[instrument]
async fn resize_process_pty(&self, req: &ResizePTYRequest) -> Result<()> { async fn resize_process_pty(&self, req: &ResizePTYRequest) -> Result<()> {
let containers = self.containers.read().await; let containers = self.containers.read().await;
let c = containers let c = containers
@ -306,6 +327,7 @@ impl ContainerManager for VirtContainerManager {
Ok(()) Ok(())
} }
#[instrument]
async fn stats_container(&self, id: &ContainerID) -> Result<StatsInfo> { async fn stats_container(&self, id: &ContainerID) -> Result<StatsInfo> {
let containers = self.containers.read().await; let containers = self.containers.read().await;
let c = containers let c = containers
@ -315,6 +337,7 @@ impl ContainerManager for VirtContainerManager {
Ok(StatsInfo::from(stats)) Ok(StatsInfo::from(stats))
} }
#[instrument]
async fn update_container(&self, req: UpdateRequest) -> Result<()> { async fn update_container(&self, req: UpdateRequest) -> Result<()> {
let resource = serde_json::from_slice::<oci::LinuxResources>(&req.value) let resource = serde_json::from_slice::<oci::LinuxResources>(&req.value)
.context("deserialize LinuxResource")?; .context("deserialize LinuxResource")?;
@ -326,18 +349,22 @@ impl ContainerManager for VirtContainerManager {
c.update(&resource).await.context("update_container") c.update(&resource).await.context("update_container")
} }
#[instrument]
async fn pid(&self) -> Result<PID> { async fn pid(&self) -> Result<PID> {
Ok(PID { pid: self.pid }) Ok(PID { pid: self.pid })
} }
#[instrument]
async fn connect_container(&self, _id: &ContainerID) -> Result<PID> { async fn connect_container(&self, _id: &ContainerID) -> Result<PID> {
Ok(PID { pid: self.pid }) Ok(PID { pid: self.pid })
} }
#[instrument]
async fn need_shutdown_sandbox(&self, req: &ShutdownRequest) -> bool { async fn need_shutdown_sandbox(&self, req: &ShutdownRequest) -> bool {
req.is_now || self.containers.read().await.is_empty() || self.sid == req.container_id req.is_now || self.containers.read().await.is_empty() || self.sid == req.container_id
} }
#[instrument]
async fn is_sandbox_container(&self, process: &ContainerProcess) -> bool { async fn is_sandbox_container(&self, process: &ContainerProcess) -> bool {
process.process_type == ProcessType::Container process.process_type == ProcessType::Container
&& process.container_id.container_id == self.sid && process.container_id.container_id == self.sid

View File

@ -34,7 +34,11 @@ use kata_types::config::{hypervisor::HYPERVISOR_NAME_CH, CloudHypervisorConfig};
use resource::ResourceManager; use resource::ResourceManager;
use sandbox::VIRTCONTAINER; use sandbox::VIRTCONTAINER;
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::Sender;
use tracing::instrument;
unsafe impl Send for VirtContainer {}
unsafe impl Sync for VirtContainer {}
#[derive(Debug)]
pub struct VirtContainer {} pub struct VirtContainer {}
#[async_trait] #[async_trait]
@ -64,6 +68,7 @@ impl RuntimeHandler for VirtContainer {
Arc::new(VirtContainer {}) Arc::new(VirtContainer {})
} }
#[instrument]
async fn new_instance( async fn new_instance(
&self, &self,
sid: &str, sid: &str,

View File

@ -26,6 +26,7 @@ use resource::{
ResourceConfig, ResourceManager, ResourceConfig, ResourceManager,
}; };
use tokio::sync::{mpsc::Sender, Mutex, RwLock}; use tokio::sync::{mpsc::Sender, Mutex, RwLock};
use tracing::instrument;
use crate::health_check::HealthCheck; use crate::health_check::HealthCheck;
use persist::{self, sandbox_persist::Persist}; use persist::{self, sandbox_persist::Persist};
@ -67,6 +68,15 @@ pub struct VirtSandbox {
monitor: Arc<HealthCheck>, monitor: Arc<HealthCheck>,
} }
impl std::fmt::Debug for VirtSandbox {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("VirtSandbox")
.field("sid", &self.sid)
.field("msg_sender", &self.msg_sender)
.finish()
}
}
impl VirtSandbox { impl VirtSandbox {
pub async fn new( pub async fn new(
sid: &str, sid: &str,
@ -88,7 +98,8 @@ impl VirtSandbox {
}) })
} }
async fn prepare_config_for_sandbox( #[instrument]
async fn prepare_for_start_sandbox(
&self, &self,
_id: &str, _id: &str,
network_env: SandboxNetworkEnv, network_env: SandboxNetworkEnv,
@ -173,6 +184,7 @@ impl VirtSandbox {
#[async_trait] #[async_trait]
impl Sandbox for VirtSandbox { impl Sandbox for VirtSandbox {
#[instrument(name = "sb: start")]
async fn start( async fn start(
&self, &self,
dns: Vec<String>, dns: Vec<String>,
@ -198,7 +210,7 @@ impl Sandbox for VirtSandbox {
// generate device and setup before start vm // generate device and setup before start vm
// should after hypervisor.prepare_vm // should after hypervisor.prepare_vm
let resources = self let resources = self
.prepare_config_for_sandbox(id, network_env.clone()) .prepare_for_start_sandbox(id, network_env.clone())
.await?; .await?;
self.resource_manager self.resource_manager
.prepare_before_start_vm(resources) .prepare_before_start_vm(resources)

View File

@ -11,6 +11,7 @@ async-trait = "0.1.48"
slog = "2.5.2" slog = "2.5.2"
slog-scope = "4.4.0" slog-scope = "4.4.0"
tokio = { version = "1.28.1", features = ["rt-multi-thread"] } tokio = { version = "1.28.1", features = ["rt-multi-thread"] }
tracing = "0.1.36"
ttrpc = { version = "0.7.1" } ttrpc = { version = "0.7.1" }
common = { path = "../runtimes/common" } common = { path = "../runtimes/common" }

View File

@ -39,6 +39,19 @@ pub struct ServiceManager {
namespace: String, namespace: String,
} }
impl std::fmt::Debug for ServiceManager {
// todo: some how to implement debug for handler
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ServiceManager")
.field("receiver", &self.receiver)
.field("task_server.is_some()", &self.task_server.is_some())
.field("binary", &self.binary)
.field("address", &self.address)
.field("namespace", &self.namespace)
.finish()
}
}
async fn send_event( async fn send_event(
containerd_binary: String, containerd_binary: String,
address: String, address: String,

View File

@ -29,12 +29,15 @@ slog-stdlog = "4.1.0"
thiserror = "1.0.30" thiserror = "1.0.30"
tokio = { version = "1.28.1", features = [ "rt", "rt-multi-thread" ] } tokio = { version = "1.28.1", features = [ "rt", "rt-multi-thread" ] }
unix_socket2 = "0.5.4" unix_socket2 = "0.5.4"
tracing = "0.1.36"
tracing-opentelemetry = "0.18.0"
kata-types = { path = "../../../libs/kata-types"} kata-types = { path = "../../../libs/kata-types"}
kata-sys-util = { path = "../../../libs/kata-sys-util"} kata-sys-util = { path = "../../../libs/kata-sys-util"}
logging = { path = "../../../libs/logging"} logging = { path = "../../../libs/logging"}
oci = { path = "../../../libs/oci" } oci = { path = "../../../libs/oci" }
service = { path = "../service" } service = { path = "../service" }
runtimes = { path = "../runtimes" }
[dev-dependencies] [dev-dependencies]
tempfile = "3.2.0" tempfile = "3.2.0"

View File

@ -142,7 +142,7 @@ fn real_main() -> Result<()> {
Action::Delete(args) => { Action::Delete(args) => {
let mut shim = ShimExecutor::new(args); let mut shim = ShimExecutor::new(args);
let rt = get_tokio_runtime().context("get tokio runtime")?; let rt = get_tokio_runtime().context("get tokio runtime")?;
rt.block_on(shim.delete())? rt.block_on(shim.delete())?;
} }
Action::Run(args) => { Action::Run(args) => {
// set mnt namespace // set mnt namespace
@ -151,7 +151,7 @@ fn real_main() -> Result<()> {
let mut shim = ShimExecutor::new(args); let mut shim = ShimExecutor::new(args);
let rt = get_tokio_runtime().context("get tokio runtime")?; let rt = get_tokio_runtime().context("get tokio runtime")?;
rt.block_on(shim.run())? rt.block_on(shim.run())?;
} }
Action::Help => show_help(&args[0]), Action::Help => show_help(&args[0]),
Action::Version => show_version(None), Action::Version => show_version(None),

View File

@ -20,6 +20,7 @@ const SHIM_PID_FILE: &str = "shim.pid";
pub(crate) const ENV_KATA_RUNTIME_BIND_FD: &str = "KATA_RUNTIME_BIND_FD"; pub(crate) const ENV_KATA_RUNTIME_BIND_FD: &str = "KATA_RUNTIME_BIND_FD";
/// Command executor for shim. /// Command executor for shim.
#[derive(Debug)]
pub struct ShimExecutor { pub struct ShimExecutor {
pub(crate) args: Args, pub(crate) args: Args,
} }