tracing: Add tracing to runtime-rs
Introduce tracing into runtime-rs; only some functions are instrumented.

Fixes: #5239

Signed-off-by: Ji-Xinyou <jerryji0414@outlook.com>
Signed-off-by: Yushuo <y-shuo@linux.alibaba.com>
This commit is contained in: parent 58e921eace, commit ed23b47c71
src/runtime-rs/Cargo.lock (generated): 1050 lines changed. File diff suppressed because it is too large.
@@ -19,6 +19,7 @@ slog = "2.5.2"
slog-scope = "4.4.0"
ttrpc = { version = "0.7.1" }
tokio = { version = "1.28.1", features = ["fs", "rt"] }
tracing = "0.1.36"
url = "2.2.2"
nix = "0.24.2"
@@ -6,6 +6,7 @@
use anyhow::{Context, Result};
use async_trait::async_trait;
use tracing::instrument;
use ttrpc::context as ttrpc_ctx;

use kata_types::config::Agent as AgentConfig;

@@ -22,6 +23,7 @@ fn new_ttrpc_ctx(timeout: i64) -> ttrpc_ctx::Context {

#[async_trait]
impl AgentManager for KataAgent {
    #[instrument]
    async fn start(&self, address: &str) -> Result<()> {
        info!(sl!(), "begin to connect agent {:?}", address);
        self.set_socket_address(address)
@@ -73,6 +75,7 @@ macro_rules! impl_agent {
    ($($name: tt | $req: ty | $resp: ty | $new_timeout: expr),*) => {
        #[async_trait]
        impl Agent for KataAgent {
            #[instrument(skip(req))]
            $(async fn $name(&self, req: $req) -> Result<$resp> {
                let r = req.into();
                let (client, mut timeout, _) = self.get_agent_client().await.context("get client")?;
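Each agent RPC generated by `impl_agent!` now runs inside its own span; `skip(req)` keeps the request payload out of the recorded fields and avoids requiring `Debug` on it. A minimal, hedged sketch of the same attribute outside the macro, using made-up types:

```rust
use tracing::instrument;

#[derive(Debug)]
struct Client;

// Hypothetical request type; note it deliberately has no Debug impl.
struct CreateRequest {
    id: String,
}

// `skip(req)` keeps the span from recording `req`, which would otherwise
// require `CreateRequest: Debug`; `client` is still recorded via Debug.
#[instrument(skip(req))]
async fn create(client: &Client, req: CreateRequest) -> Result<(), String> {
    let _ = (client, req.id);
    Ok(())
}

#[tokio::main]
async fn main() {
    // A plain console subscriber is enough to see the `create` span.
    tracing_subscriber::fmt().init();
    create(&Client, CreateRequest { id: "c1".into() }).await.unwrap();
}
```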
@@ -44,6 +44,19 @@ pub(crate) struct KataAgentInner {
    log_forwarder: LogForwarder,
}

impl std::fmt::Debug for KataAgentInner {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("KataAgentInner")
            .field("client_fd", &self.client_fd)
            .field("socket_address", &self.socket_address)
            .field("config", &self.config)
            .finish()
    }
}

unsafe impl Send for KataAgent {}
unsafe impl Sync for KataAgent {}
#[derive(Debug)]
pub struct KataAgent {
    pub(crate) inner: Arc<RwLock<KataAgentInner>>,
}
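The hand-written `Debug` impls added throughout this commit exist because `#[instrument]` records a function's arguments, `&self` included, via their `Debug` implementations; an instrumented receiver type therefore needs `Debug` even when one of its fields cannot derive it. A small illustrative sketch of that constraint, with hypothetical names:

```rust
use std::fmt;
use tracing::instrument;

// Pretend this third-party handle does not implement Debug.
struct RawHandle(i32);

struct Agent {
    fd: i32,
    handle: RawHandle, // this field blocks #[derive(Debug)]
}

// Hand-written Debug that only exposes the printable field.
impl fmt::Debug for Agent {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Agent").field("fd", &self.fd).finish()
    }
}

impl Agent {
    // Without the Debug impl above this would not compile, because the
    // generated span records `self` as a field using its Debug output.
    #[instrument]
    fn connect(&self) {
        let _ = &self.handle;
    }
}

fn main() {
    // No subscriber is installed, so the span is a no-op, but the example
    // still demonstrates the Debug requirement at compile time.
    Agent { fd: 3, handle: RawHandle(0) }.connect();
}
```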
@@ -28,6 +28,7 @@ vmm-sys-util = "0.11.0"
rand = "0.8.4"
path-clean = "1.0.1"
lazy_static = "1.4"
tracing = "0.1.36"

kata-sys-util = { path = "../../../libs/kata-sys-util" }
kata-types = { path = "../../../libs/kata-types" }
@@ -19,14 +19,20 @@ use async_trait::async_trait;
use kata_types::capabilities::Capabilities;
use kata_types::config::hypervisor::Hypervisor as HypervisorConfig;
use tokio::sync::RwLock;
use tracing::instrument;

use crate::{DeviceType, Hypervisor, VcpuThreadIds};

#[derive(Debug)]
pub struct Dragonball {
    inner: Arc<RwLock<DragonballInner>>,
}

impl std::fmt::Debug for Dragonball {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Dragonball").finish()
    }
}

impl Default for Dragonball {
    fn default() -> Self {
        Self::new()

@@ -48,11 +54,13 @@ impl Dragonball {

#[async_trait]
impl Hypervisor for Dragonball {
    #[instrument]
    async fn prepare_vm(&self, id: &str, netns: Option<String>) -> Result<()> {
        let mut inner = self.inner.write().await;
        inner.prepare_vm(id, netns).await
    }

    #[instrument]
    async fn start_vm(&self, timeout: i32) -> Result<()> {
        let mut inner = self.inner.write().await;
        inner.start_vm(timeout).await
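Since `prepare_vm` and `start_vm` are instrumented at the trait-impl level, any instrumented call made while those spans are entered is reported as a child span, which is what makes the resulting trace hierarchical. A hedged, self-contained sketch of that nesting with illustrative functions and a plain console subscriber:

```rust
use tracing::instrument;

#[instrument]
async fn start_vm(timeout: i32) {
    // The await keeps the `start_vm` span entered, so the span created by
    // `wait_for_vmm` is reported as its child in the trace tree.
    wait_for_vmm(timeout).await;
}

#[instrument]
async fn wait_for_vmm(timeout: i32) {
    tracing::info!(timeout, "vmm ready");
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().init();
    start_vm(10).await;
}
```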
@@ -31,6 +31,7 @@ serde_json = "1.0.82"
slog = "2.5.2"
slog-scope = "4.4.0"
tokio = { version = "1.28.1", features = ["process"] }
tracing = "0.1.36"
uuid = { version = "0.4", features = ["v4"] }

agent = { path = "../agent" }
@@ -17,6 +17,7 @@ use kata_types::mount::Mount;
use oci::{Linux, LinuxResources};
use persist::sandbox_persist::Persist;
use tokio::sync::RwLock;
use tracing::instrument;

use crate::network::NetworkConfig;
use crate::resource_persist::ResourceState;

@@ -34,6 +35,12 @@ pub struct ResourceManager {
    inner: Arc<RwLock<ResourceManagerInner>>,
}

impl std::fmt::Debug for ResourceManager {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ResourceManager").finish()
    }
}

impl ResourceManager {
    pub async fn new(
        sid: &str,

@@ -58,6 +65,7 @@ impl ResourceManager {
        inner.get_device_manager()
    }

    #[instrument]
    pub async fn prepare_before_start_vm(&self, device_configs: Vec<ResourceConfig>) -> Result<()> {
        let mut inner = self.inner.write().await;
        inner.prepare_before_start_vm(device_configs).await

@@ -68,6 +76,7 @@ impl ResourceManager {
        inner.handle_network(network_config).await
    }

    #[instrument]
    pub async fn setup_after_start_vm(&self) -> Result<()> {
        let mut inner = self.inner.write().await;
        inner.setup_after_start_vm().await
@@ -12,6 +12,11 @@ netns-rs = "0.1.0"
slog = "2.5.2"
slog-scope = "4.4.0"
tokio = { version = "1.28.1", features = ["rt-multi-thread"] }
tracing = "0.1.36"
tracing-opentelemetry = "0.18.0"
opentelemetry = { version = "0.18.0", features = ["rt-tokio-current-thread", "trace", "rt-tokio"] }
opentelemetry-jaeger = { version = "0.17.0", features = ["rt-tokio", "hyper_collector_client", "collector_client"] }
tracing-subscriber = { version = "0.3", features = ["registry", "std"] }
hyper = { version = "0.14.20", features = ["stream", "server", "http1"] }
hyperlocal = "0.8"
serde_json = "1.0.88"
@@ -13,6 +13,15 @@ pub struct SandboxNetworkEnv {
    pub network_created: bool,
}

impl std::fmt::Debug for SandboxNetworkEnv {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SandboxNetworkEnv")
            .field("netns", &self.netns)
            .field("network_created", &self.network_created)
            .finish()
    }
}

#[async_trait]
pub trait Sandbox: Send + Sync {
    async fn start(

@@ -13,3 +13,4 @@ pub mod manager;
pub use manager::RuntimeHandlerManager;
pub use shim_interface;
mod shim_mgmt;
pub mod tracer;
@@ -24,7 +24,8 @@ use persist::sandbox_persist::Persist;
use resource::{cpu_mem::initial_size::InitialSizeManager, network::generate_netns_name};
use shim_interface::shim_mgmt::ERR_NO_SHIM_SERVER;
use tokio::fs;
use tokio::sync::{mpsc::Sender, RwLock};
use tokio::sync::{mpsc::Sender, Mutex, RwLock};
use tracing::instrument;
#[cfg(feature = "virt")]
use virt_container::{
    sandbox::{SandboxRestoreArgs, VirtSandbox},
@@ -34,23 +35,39 @@
#[cfg(feature = "wasm")]
use wasm_container::WasmContainer;

use crate::shim_mgmt::server::MgmtServer;
use crate::{
    shim_mgmt::server::MgmtServer,
    tracer::{KataTracer, ROOTSPAN},
};

struct RuntimeHandlerManagerInner {
    id: String,
    msg_sender: Sender<Message>,
    kata_tracer: Arc<Mutex<KataTracer>>,
    runtime_instance: Option<Arc<RuntimeInstance>>,
}

impl std::fmt::Debug for RuntimeHandlerManagerInner {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RuntimeHandlerManagerInner")
            .field("id", &self.id)
            .field("msg_sender", &self.msg_sender)
            .finish()
    }
}

impl RuntimeHandlerManagerInner {
    fn new(id: &str, msg_sender: Sender<Message>) -> Result<Self> {
        let tracer = KataTracer::new();
        Ok(Self {
            id: id.to_string(),
            msg_sender,
            kata_tracer: Arc::new(Mutex::new(tracer)),
            runtime_instance: None,
        })
    }

    #[instrument]
    async fn init_runtime_handler(
        &mut self,
        spec: &oci::Spec,
@@ -72,10 +89,23 @@ impl RuntimeHandlerManagerInner {
            _ => return Err(anyhow!("Unsupported runtime: {}", &config.runtime.name)),
        };
        let runtime_instance = runtime_handler
            .new_instance(&self.id, self.msg_sender.clone(), config)
            .new_instance(&self.id, self.msg_sender.clone(), config.clone())
            .await
            .context("new runtime instance")?;

        // initialize the trace subscriber
        if config.runtime.enable_tracing {
            let mut tracer = self.kata_tracer.lock().await;
            if let Err(e) = tracer.trace_setup(
                &self.id,
                &config.runtime.jaeger_endpoint,
                &config.runtime.jaeger_user,
                &config.runtime.jaeger_password,
            ) {
                warn!(sl!(), "failed to setup tracing, {:?}", e);
            }
        }

        // start sandbox
        runtime_instance
            .sandbox
@@ -86,6 +116,7 @@ impl RuntimeHandlerManagerInner {
        Ok(())
    }

    #[instrument]
    async fn try_init(
        &mut self,
        spec: &oci::Spec,
@@ -149,6 +180,7 @@ impl RuntimeHandlerManagerInner {
            netns,
            network_created,
        };

        self.init_runtime_handler(spec, state, network_env, dns, Arc::new(config))
            .await
            .context("init runtime handler")?;
@@ -171,12 +203,23 @@ impl RuntimeHandlerManagerInner {
    fn get_runtime_instance(&self) -> Option<Arc<RuntimeInstance>> {
        self.runtime_instance.clone()
    }

    fn get_kata_tracer(&self) -> Arc<Mutex<KataTracer>> {
        self.kata_tracer.clone()
    }
}

pub struct RuntimeHandlerManager {
    inner: Arc<RwLock<RuntimeHandlerManagerInner>>,
}

// todo: a more detailed impl for fmt::Debug
impl std::fmt::Debug for RuntimeHandlerManager {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RuntimeHandlerManager").finish()
    }
}

impl RuntimeHandlerManager {
    pub async fn new(id: &str, msg_sender: Sender<Message>) -> Result<Self> {
        Ok(Self {
@@ -243,6 +286,12 @@ impl RuntimeHandlerManager {
            .ok_or_else(|| anyhow!("runtime not ready"))
    }

    async fn get_kata_tracer(&self) -> Result<Arc<Mutex<KataTracer>>> {
        let inner = self.inner.read().await;
        Ok(inner.get_kata_tracer())
    }

    #[instrument]
    async fn try_init_runtime_instance(
        &self,
        spec: &oci::Spec,
@@ -253,6 +302,7 @@ impl RuntimeHandlerManager {
        inner.try_init(spec, state, options).await
    }

    #[instrument(parent = &*(ROOTSPAN))]
    pub async fn handler_message(&self, req: Request) -> Result<Response> {
        if let Request::CreateContainer(container_config) = req {
            // get oci spec

@@ -291,6 +341,7 @@ impl RuntimeHandlerManager {
        }
    }

    #[instrument(parent = &(*ROOTSPAN))]
    pub async fn handler_request(&self, req: Request) -> Result<Response> {
        let instance = self
            .get_runtime_instance()
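`parent = &*(ROOTSPAN)` pins the handler span to the long-lived root span rather than to whatever span is current on the calling task, so every request lands under one sandbox-wide trace. A minimal sketch of the pattern (names are illustrative):

```rust
use lazy_static::lazy_static;
use tracing::{instrument, span, Level, Span};

lazy_static! {
    // A phantom root span that outlives any single request.
    static ref ROOTSPAN: Span = span!(Level::TRACE, "root-span");
}

// Every call becomes a direct child of ROOTSPAN, no matter which span the
// caller happens to be in at the time.
#[instrument(parent = &*ROOTSPAN)]
fn handle_request(id: u32) {
    tracing::info!(id, "handling request");
}

fn main() {
    // Raise the max level so the TRACE-level root span is actually enabled.
    tracing_subscriber::fmt().with_max_level(Level::TRACE).init();
    handle_request(1);
    handle_request(2);
}
```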
@@ -320,6 +371,11 @@ impl RuntimeHandlerManager {
            Request::ShutdownContainer(req) => {
                if cm.need_shutdown_sandbox(&req).await {
                    sandbox.shutdown().await.context("do shutdown")?;

                    // stop the tracer collector
                    let kata_tracer = self.get_kata_tracer().await.context("get kata tracer")?;
                    let tracer = kata_tracer.lock().await;
                    tracer.trace_end();
                }
                Ok(Response::ShutdownContainer)
            }
@@ -388,6 +444,7 @@
/// 3. shimv2 create task option
/// 4. If above three are not set, then get default path from DEFAULT_RUNTIME_CONFIGURATIONS
/// in kata-containers/src/libs/kata-types/src/config/default.rs, in array order.
#[instrument]
fn load_config(spec: &oci::Spec, option: &Option<Vec<u8>>) -> Result<TomlConfig> {
    const KATA_CONF_FILE: &str = "KATA_CONF_FILE";
    let annotation = Annotation::new(spec.annotations.clone());
src/runtime-rs/crates/runtimes/src/tracer.rs (new file, 169 lines)

@@ -0,0 +1,169 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//

use std::cmp::min;
use std::sync::Arc;

use anyhow::Result;
use lazy_static::lazy_static;
use opentelemetry::global;
use opentelemetry::runtime::Tokio;
use tracing::{span, subscriber::NoSubscriber, Span, Subscriber};
use tracing_subscriber::prelude::*;
use tracing_subscriber::Registry;

const DEFAULT_JAEGER_URL: &str = "http://localhost:14268/api/traces";

lazy_static! {
    /// ROOTSPAN is a phantom span that is entered in the background by calling [`trace_enter_root()`]
    /// once the configuration is read and config.runtime.enable_tracing is enabled.
    /// ROOTSPAN is exited by calling [`trace_exit_root()`] on the shutdown request sent from containerd.
    ///
    /// NOTE:
    /// This allows other threads which are not directly running under some span to be tracked easily
    /// within the entire sandbox's lifetime.
    /// To do this, just add the attribute #[instrument(parent = &(*ROOTSPAN))].
    pub static ref ROOTSPAN: Span = span!(tracing::Level::TRACE, "root-span");
}

/// The tracer wrapper for kata-containers.
/// The fields and member methods should ALWAYS be PRIVATE and only be exposed
/// to other modules in a safe way.
unsafe impl Send for KataTracer {}
unsafe impl Sync for KataTracer {}
pub struct KataTracer {
    subscriber: Arc<dyn Subscriber + Send + Sync>,
    enabled: bool,
}

impl Default for KataTracer {
    fn default() -> Self {
        Self::new()
    }
}

impl KataTracer {
    /// Constructor of KataTracer; this is a dummy implementation for static initialization.
    pub fn new() -> Self {
        Self {
            subscriber: Arc::new(NoSubscriber::default()),
            enabled: false,
        }
    }

    /// Set the tracing enabled flag.
    fn enable(&mut self) {
        self.enabled = true;
    }

    /// Return whether tracing is enabled; it is enabled by [`trace_setup`].
    fn enabled(&self) -> bool {
        self.enabled
    }

    /// Called when tracing is enabled (set in the toml configuration file).
    /// This sets up the subscriber, which maintains the spans' information, both globally
    /// and inside KATA_TRACER.
    ///
    /// Note that spans will be no-ops (not collected) if an invalid subscriber is set.
    pub fn trace_setup(
        &mut self,
        sid: &str,
        jaeger_endpoint: &str,
        jaeger_username: &str,
        jaeger_password: &str,
    ) -> Result<()> {
        // If verifying the jaeger config returns an error, tracing should not be enabled
        let endpoint = verify_jaeger_config(jaeger_endpoint, jaeger_username, jaeger_password)?;

        // derive a subscriber to collect span info
        let tracer = opentelemetry_jaeger::new_collector_pipeline()
            .with_service_name(format!("kata-sb-{}", &sid[0..min(8, sid.len())]))
            .with_endpoint(endpoint)
            .with_username(jaeger_username)
            .with_password(jaeger_password)
            .with_hyper()
            .install_batch(Tokio)?;

        let layer = tracing_opentelemetry::layer().with_tracer(tracer);

        let sub = Registry::default().with(layer);

        // we use Arc so that the global subscriber and the KataTracer SHARE the SAME subscriber;
        // this records the subscriber into the global variable KATA_TRACER for further use
        let subscriber = Arc::new(sub);
        tracing::subscriber::set_global_default(subscriber.clone())?;
        self.subscriber = subscriber;

        // enter the rootspan
        self.trace_enter_root();

        // modify the enabled state; note that we have successfully enabled tracing
        self.enable();

        info!(sl!(), "Tracing enabled successfully");
        Ok(())
    }

    /// Shutdown the tracer and emit the span info to the jaeger agent.
    /// Before this function is called, the tracing information is only partially uploaded
    /// to the jaeger agent.
    pub fn trace_end(&self) {
        if self.enabled() {
            // exit the rootspan
            self.trace_exit_root();

            global::shutdown_tracer_provider();
        }
    }

    /// Enter the global ROOTSPAN.
    /// This function is a hack around the tracing library's guard approach, letting the span
    /// be entered without a RAII guard that exits it. It should only be called once,
    /// paired with [`trace_exit_root()`].
    fn trace_enter_root(&self) {
        self.enter_span(&ROOTSPAN);
    }

    /// Exit the global ROOTSPAN.
    /// This should be called paired with [`trace_enter_root()`].
    fn trace_exit_root(&self) {
        self.exit_span(&ROOTSPAN);
    }

    /// Let the subscriber enter the span; this has to be called in pair with exit(span).
    /// This allows a **cross-function span** to run without a span guard.
    fn enter_span(&self, span: &Span) {
        let id: Option<span::Id> = span.into();
        self.subscriber.enter(&id.unwrap());
    }

    /// Let the subscriber exit the span; this has to be called in pair with enter(span).
    fn exit_span(&self, span: &Span) {
        let id: Option<span::Id> = span.into();
        self.subscriber.exit(&id.unwrap());
    }
}

/// Verify the jaeger configuration and set up default values.
fn verify_jaeger_config(endpoint: &str, username: &str, passwd: &str) -> Result<String> {
    if username.is_empty() && !passwd.is_empty() {
        warn!(
            sl!(),
            "Jaeger password with empty username is not allowed, tracing is NOT enabled"
        );
        return Err(anyhow::anyhow!("Empty username with non-empty password"));
    }

    // set the default endpoint address; this expects a jaeger-collector running on localhost:14268
    let endpt = if endpoint.is_empty() {
        DEFAULT_JAEGER_URL
    } else {
        endpoint
    }
    .to_owned();

    Ok(endpt)
}
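The behaviour encoded in `verify_jaeger_config` is small but easy to pin down: an empty endpoint falls back to the local collector URL, and a password without a username is rejected. A hedged test sketch (not part of this commit) that exercises those two paths:

```rust
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn empty_endpoint_falls_back_to_default() {
        let endpoint = verify_jaeger_config("", "", "").unwrap();
        assert_eq!(endpoint, "http://localhost:14268/api/traces");
    }

    #[test]
    fn password_without_username_is_rejected() {
        // Assumes the crate's global slog logger is set up, since the error
        // path emits a warning before returning Err.
        assert!(verify_jaeger_config("", "", "secret").is_err());
    }
}
```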
@@ -24,6 +24,7 @@ tokio = { version = "1.28.1" }
toml = "0.4.2"
url = "2.1.1"
async-std = "1.12.0"
tracing = "0.1.36"

agent = { path = "../../agent" }
common = { path = "../common" }
@@ -24,6 +24,7 @@ use oci::Process as OCIProcess;
use resource::network::NetnsGuard;
use resource::ResourceManager;
use tokio::sync::RwLock;
use tracing::instrument;

use kata_sys_util::hooks::HookStates;

@@ -38,6 +39,15 @@ pub struct VirtContainerManager {
    hypervisor: Arc<dyn Hypervisor>,
}

impl std::fmt::Debug for VirtContainerManager {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("VirtContainerManager")
            .field("sid", &self.sid)
            .field("pid", &self.pid)
            .finish()
    }
}

impl VirtContainerManager {
    pub fn new(
        sid: &str,
@@ -59,6 +69,7 @@ impl VirtContainerManager {

#[async_trait]
impl ContainerManager for VirtContainerManager {
    #[instrument]
    async fn create_container(&self, config: ContainerConfig, spec: oci::Spec) -> Result<PID> {
        let container = Container::new(
            self.pid,

@@ -100,6 +111,7 @@ impl ContainerManager for VirtContainerManager {
        Ok(PID { pid: self.pid })
    }

    #[instrument]
    async fn close_process_io(&self, process: &ContainerProcess) -> Result<()> {
        let containers = self.containers.read().await;
        let container_id = &process.container_id.to_string();

@@ -111,6 +123,7 @@ impl ContainerManager for VirtContainerManager {
        Ok(())
    }

    #[instrument]
    async fn delete_process(&self, process: &ContainerProcess) -> Result<ProcessStateInfo> {
        let container_id = &process.container_id.container_id;
        match process.process_type {

@@ -155,6 +168,7 @@ impl ContainerManager for VirtContainerManager {
        }
    }

    #[instrument]
    async fn exec_process(&self, req: ExecProcessRequest) -> Result<()> {
        if req.spec_type_url.is_empty() {
            return Err(anyhow!("invalid type url"));

@@ -180,6 +194,7 @@ impl ContainerManager for VirtContainerManager {
        Ok(())
    }

    #[instrument]
    async fn kill_process(&self, req: &KillRequest) -> Result<()> {
        let containers = self.containers.read().await;
        let container_id = &req.process.container_id.container_id;

@@ -199,6 +214,7 @@ impl ContainerManager for VirtContainerManager {
        Ok(())
    }

    #[instrument]
    async fn wait_process(&self, process: &ContainerProcess) -> Result<ProcessExitStatus> {
        let logger = logger_with_process(process);

@@ -235,6 +251,7 @@ impl ContainerManager for VirtContainerManager {
        Ok(status.clone())
    }

    #[instrument]
    async fn start_process(&self, process: &ContainerProcess) -> Result<PID> {
        let containers = self.containers.read().await;
        let container_id = &process.container_id.container_id;

@@ -265,6 +282,7 @@ impl ContainerManager for VirtContainerManager {
        Ok(PID { pid: self.pid })
    }

    #[instrument]
    async fn state_process(&self, process: &ContainerProcess) -> Result<ProcessStateInfo> {
        let containers = self.containers.read().await;
        let container_id = &process.container_id.container_id;

@@ -275,6 +293,7 @@ impl ContainerManager for VirtContainerManager {
        Ok(state)
    }

    #[instrument]
    async fn pause_container(&self, id: &ContainerID) -> Result<()> {
        let containers = self.containers.read().await;
        let c = containers

@@ -284,6 +303,7 @@ impl ContainerManager for VirtContainerManager {
        Ok(())
    }

    #[instrument]
    async fn resume_container(&self, id: &ContainerID) -> Result<()> {
        let containers = self.containers.read().await;
        let c = containers

@@ -293,6 +313,7 @@ impl ContainerManager for VirtContainerManager {
        Ok(())
    }

    #[instrument]
    async fn resize_process_pty(&self, req: &ResizePTYRequest) -> Result<()> {
        let containers = self.containers.read().await;
        let c = containers

@@ -306,6 +327,7 @@ impl ContainerManager for VirtContainerManager {
        Ok(())
    }

    #[instrument]
    async fn stats_container(&self, id: &ContainerID) -> Result<StatsInfo> {
        let containers = self.containers.read().await;
        let c = containers

@@ -315,6 +337,7 @@ impl ContainerManager for VirtContainerManager {
        Ok(StatsInfo::from(stats))
    }

    #[instrument]
    async fn update_container(&self, req: UpdateRequest) -> Result<()> {
        let resource = serde_json::from_slice::<oci::LinuxResources>(&req.value)
            .context("deserialize LinuxResource")?;

@@ -326,18 +349,22 @@ impl ContainerManager for VirtContainerManager {
        c.update(&resource).await.context("update_container")
    }

    #[instrument]
    async fn pid(&self) -> Result<PID> {
        Ok(PID { pid: self.pid })
    }

    #[instrument]
    async fn connect_container(&self, _id: &ContainerID) -> Result<PID> {
        Ok(PID { pid: self.pid })
    }

    #[instrument]
    async fn need_shutdown_sandbox(&self, req: &ShutdownRequest) -> bool {
        req.is_now || self.containers.read().await.is_empty() || self.sid == req.container_id
    }

    #[instrument]
    async fn is_sandbox_container(&self, process: &ContainerProcess) -> bool {
        process.process_type == ProcessType::Container
            && process.container_id.container_id == self.sid
@@ -34,7 +34,11 @@ use kata_types::config::{hypervisor::HYPERVISOR_NAME_CH, CloudHypervisorConfig};
use resource::ResourceManager;
use sandbox::VIRTCONTAINER;
use tokio::sync::mpsc::Sender;
use tracing::instrument;

unsafe impl Send for VirtContainer {}
unsafe impl Sync for VirtContainer {}
#[derive(Debug)]
pub struct VirtContainer {}

#[async_trait]

@@ -64,6 +68,7 @@ impl RuntimeHandler for VirtContainer {
        Arc::new(VirtContainer {})
    }

    #[instrument]
    async fn new_instance(
        &self,
        sid: &str,
@@ -26,6 +26,7 @@ use resource::{
    ResourceConfig, ResourceManager,
};
use tokio::sync::{mpsc::Sender, Mutex, RwLock};
use tracing::instrument;

use crate::health_check::HealthCheck;
use persist::{self, sandbox_persist::Persist};

@@ -67,6 +68,15 @@ pub struct VirtSandbox {
    monitor: Arc<HealthCheck>,
}

impl std::fmt::Debug for VirtSandbox {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("VirtSandbox")
            .field("sid", &self.sid)
            .field("msg_sender", &self.msg_sender)
            .finish()
    }
}

impl VirtSandbox {
    pub async fn new(
        sid: &str,

@@ -88,7 +98,8 @@ impl VirtSandbox {
        })
    }

    async fn prepare_config_for_sandbox(
    #[instrument]
    async fn prepare_for_start_sandbox(
        &self,
        _id: &str,
        network_env: SandboxNetworkEnv,

@@ -173,6 +184,7 @@ impl VirtSandbox {

#[async_trait]
impl Sandbox for VirtSandbox {
    #[instrument(name = "sb: start")]
    async fn start(
        &self,
        dns: Vec<String>,

@@ -198,7 +210,7 @@ impl Sandbox for VirtSandbox {
        // generate device and setup before start vm
        // should run after hypervisor.prepare_vm
        let resources = self
            .prepare_config_for_sandbox(id, network_env.clone())
            .prepare_for_start_sandbox(id, network_env.clone())
            .await?;
        self.resource_manager
            .prepare_before_start_vm(resources)
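`name = "sb: start"` overrides the default span name, which would otherwise be the function name `start`, so the sandbox start is easier to spot in Jaeger. A small illustrative sketch of a custom span name, assuming a plain fmt subscriber:

```rust
use tracing::instrument;

// The span is reported as "sb: start" instead of the default name "start".
#[instrument(name = "sb: start")]
async fn start(sid: &str) {
    tracing::info!(sid, "starting sandbox");
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().init();
    start("sandbox-1234").await;
}
```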
@@ -11,6 +11,7 @@ async-trait = "0.1.48"
slog = "2.5.2"
slog-scope = "4.4.0"
tokio = { version = "1.28.1", features = ["rt-multi-thread"] }
tracing = "0.1.36"
ttrpc = { version = "0.7.1" }

common = { path = "../runtimes/common" }
@@ -39,6 +39,19 @@ pub struct ServiceManager {
    namespace: String,
}

impl std::fmt::Debug for ServiceManager {
    // todo: somehow implement Debug for the handler
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ServiceManager")
            .field("receiver", &self.receiver)
            .field("task_server.is_some()", &self.task_server.is_some())
            .field("binary", &self.binary)
            .field("address", &self.address)
            .field("namespace", &self.namespace)
            .finish()
    }
}

async fn send_event(
    containerd_binary: String,
    address: String,
@@ -29,12 +29,15 @@ slog-stdlog = "4.1.0"
thiserror = "1.0.30"
tokio = { version = "1.28.1", features = [ "rt", "rt-multi-thread" ] }
unix_socket2 = "0.5.4"
tracing = "0.1.36"
tracing-opentelemetry = "0.18.0"

kata-types = { path = "../../../libs/kata-types"}
kata-sys-util = { path = "../../../libs/kata-sys-util"}
logging = { path = "../../../libs/logging"}
oci = { path = "../../../libs/oci" }
service = { path = "../service" }
runtimes = { path = "../runtimes" }

[dev-dependencies]
tempfile = "3.2.0"
@@ -142,7 +142,7 @@ fn real_main() -> Result<()> {
        Action::Delete(args) => {
            let mut shim = ShimExecutor::new(args);
            let rt = get_tokio_runtime().context("get tokio runtime")?;
            rt.block_on(shim.delete())?
            rt.block_on(shim.delete())?;
        }
        Action::Run(args) => {
            // set mnt namespace

@@ -151,7 +151,7 @@ fn real_main() -> Result<()> {

            let mut shim = ShimExecutor::new(args);
            let rt = get_tokio_runtime().context("get tokio runtime")?;
            rt.block_on(shim.run())?
            rt.block_on(shim.run())?;
        }
        Action::Help => show_help(&args[0]),
        Action::Version => show_version(None),
@@ -20,6 +20,7 @@ const SHIM_PID_FILE: &str = "shim.pid";
pub(crate) const ENV_KATA_RUNTIME_BIND_FD: &str = "KATA_RUNTIME_BIND_FD";

/// Command executor for shim.
#[derive(Debug)]
pub struct ShimExecutor {
    pub(crate) args: Args,
}