runtime-rs: add support for gathering metrics in runtime-rs

1. Implemented metrics collection for runtime-rs shim and dragonball hypervisor.
2. Described the currently supported metrics in runtime-rs. (docs/design/kata-metrics-in-runtime-rs.md)

Fixes: #5017

Signed-off-by: Yuan-Zhuo <yuanzhuo0118@outlook.com>
This commit is contained in:
Yuan-Zhuo
2023-07-28 17:16:51 +08:00
parent 61a8eabf8e
commit 02cc4fe9db
26 changed files with 914 additions and 1935 deletions

View File

@@ -121,5 +121,6 @@ impl_agent!(
set_ip_tables | crate::SetIPTablesRequest | crate::SetIPTablesResponse | None,
get_volume_stats | crate::VolumeStatsRequest | crate::VolumeStatsResponse | None,
resize_volume | crate::ResizeVolumeRequest | crate::Empty | None,
online_cpu_mem | crate::OnlineCPUMemRequest | crate::Empty | None
online_cpu_mem | crate::OnlineCPUMemRequest | crate::Empty | None,
get_metrics | crate::Empty | crate::MetricsResponse | None
);

View File

@@ -7,7 +7,7 @@
use std::convert::Into;
use protocols::{
agent::{self, OOMEvent},
agent::{self, Metrics, OOMEvent},
csi, empty, health, types,
};
@@ -19,13 +19,13 @@ use crate::{
Empty, ExecProcessRequest, FSGroup, FSGroupChangePolicy, GetIPTablesRequest,
GetIPTablesResponse, GuestDetailsResponse, HealthCheckResponse, HugetlbStats, IPAddress,
IPFamily, Interface, Interfaces, KernelModule, MemHotplugByProbeRequest, MemoryData,
MemoryStats, NetworkStats, OnlineCPUMemRequest, PidsStats, ReadStreamRequest,
ReadStreamResponse, RemoveContainerRequest, ReseedRandomDevRequest, ResizeVolumeRequest,
Route, Routes, SetGuestDateTimeRequest, SetIPTablesRequest, SetIPTablesResponse,
SignalProcessRequest, StatsContainerResponse, Storage, StringUser, ThrottlingData,
TtyWinResizeRequest, UpdateContainerRequest, UpdateInterfaceRequest, UpdateRoutesRequest,
VersionCheckResponse, VolumeStatsRequest, VolumeStatsResponse, WaitProcessRequest,
WriteStreamRequest,
MemoryStats, MetricsResponse, NetworkStats, OnlineCPUMemRequest, PidsStats,
ReadStreamRequest, ReadStreamResponse, RemoveContainerRequest, ReseedRandomDevRequest,
ResizeVolumeRequest, Route, Routes, SetGuestDateTimeRequest, SetIPTablesRequest,
SetIPTablesResponse, SignalProcessRequest, StatsContainerResponse, Storage, StringUser,
ThrottlingData, TtyWinResizeRequest, UpdateContainerRequest, UpdateInterfaceRequest,
UpdateRoutesRequest, VersionCheckResponse, VolumeStatsRequest, VolumeStatsResponse,
WaitProcessRequest, WriteStreamRequest,
},
OomEventResponse, WaitProcessResponse, WriteStreamResponse,
};
@@ -755,6 +755,14 @@ impl From<agent::WaitProcessResponse> for WaitProcessResponse {
}
}
impl From<Empty> for agent::GetMetricsRequest {
    // `Empty` carries no payload, so the default request is all that is needed.
    fn from(_: Empty) -> Self {
        Self::default()
    }
}
impl From<Empty> for agent::GetOOMEventRequest {
fn from(_: Empty) -> Self {
Self {
@@ -789,6 +797,14 @@ impl From<health::VersionCheckResponse> for VersionCheckResponse {
}
}
impl From<agent::Metrics> for MetricsResponse {
fn from(from: Metrics) -> Self {
Self {
metrics: from.metrics,
}
}
}
impl From<agent::OOMEvent> for OomEventResponse {
fn from(from: OOMEvent) -> Self {
Self {

View File

@@ -18,13 +18,14 @@ pub use types::{
CloseStdinRequest, ContainerID, ContainerProcessID, CopyFileRequest, CreateContainerRequest,
CreateSandboxRequest, Empty, ExecProcessRequest, GetGuestDetailsRequest, GetIPTablesRequest,
GetIPTablesResponse, GuestDetailsResponse, HealthCheckResponse, IPAddress, IPFamily, Interface,
Interfaces, ListProcessesRequest, MemHotplugByProbeRequest, OnlineCPUMemRequest,
OomEventResponse, ReadStreamRequest, ReadStreamResponse, RemoveContainerRequest,
ReseedRandomDevRequest, ResizeVolumeRequest, Route, Routes, SetGuestDateTimeRequest,
SetIPTablesRequest, SetIPTablesResponse, SignalProcessRequest, StatsContainerResponse, Storage,
TtyWinResizeRequest, UpdateContainerRequest, UpdateInterfaceRequest, UpdateRoutesRequest,
VersionCheckResponse, VolumeStatsRequest, VolumeStatsResponse, WaitProcessRequest,
WaitProcessResponse, WriteStreamRequest, WriteStreamResponse,
Interfaces, ListProcessesRequest, MemHotplugByProbeRequest, MetricsResponse,
OnlineCPUMemRequest, OomEventResponse, ReadStreamRequest, ReadStreamResponse,
RemoveContainerRequest, ReseedRandomDevRequest, ResizeVolumeRequest, Route, Routes,
SetGuestDateTimeRequest, SetIPTablesRequest, SetIPTablesResponse, SignalProcessRequest,
StatsContainerResponse, Storage, TtyWinResizeRequest, UpdateContainerRequest,
UpdateInterfaceRequest, UpdateRoutesRequest, VersionCheckResponse, VolumeStatsRequest,
VolumeStatsResponse, WaitProcessRequest, WaitProcessResponse, WriteStreamRequest,
WriteStreamResponse,
};
use anyhow::Result;
@@ -86,6 +87,7 @@ pub trait Agent: AgentManager + HealthService + Send + Sync {
// utils
async fn copy_file(&self, req: CopyFileRequest) -> Result<Empty>;
async fn get_metrics(&self, req: Empty) -> Result<MetricsResponse>;
async fn get_oom_event(&self, req: Empty) -> Result<OomEventResponse>;
async fn get_ip_tables(&self, req: GetIPTablesRequest) -> Result<GetIPTablesResponse>;
async fn set_ip_tables(&self, req: SetIPTablesRequest) -> Result<SetIPTablesResponse>;

View File

@@ -556,6 +556,11 @@ pub struct VersionCheckResponse {
pub agent_version: String,
}
#[derive(PartialEq, Clone, Default, Debug)]
/// Response carrying the metrics gathered by the guest agent.
pub struct MetricsResponse {
    // Metrics payload as a single string (presumably Prometheus text
    // format, matching the shim-side encoder — confirm against the agent).
    pub metrics: String,
}
#[derive(PartialEq, Clone, Default, Debug)]
pub struct OomEventResponse {
pub container_id: String,

View File

@@ -536,6 +536,10 @@ impl CloudHypervisorInner {
caps.set(CapabilityBits::FsSharingSupport);
Ok(caps)
}
pub(crate) async fn get_hypervisor_metrics(&self) -> Result<String> {
todo!()
}
}
// Log all output from the CH process until a shutdown signal is received.

View File

@@ -152,6 +152,11 @@ impl Hypervisor for CloudHypervisor {
let inner = self.inner.read().await;
inner.capabilities().await
}
async fn get_hypervisor_metrics(&self) -> Result<String> {
    // Gathering metrics is read-only, so a read lock is sufficient.
    self.inner.read().await.get_hypervisor_metrics().await
}
}
#[async_trait]

View File

@@ -92,6 +92,11 @@ impl DragonballInner {
))
}
/// Fetch the hypervisor metrics from the running Dragonball VMM instance.
pub(crate) async fn get_hypervisor_metrics(&self) -> Result<String> {
    info!(sl!(), "get hypervisor metrics");
    // Delegates to the VMM instance, which issues a synchronous
    // GetHypervisorMetrics request to the Dragonball API.
    self.vmm_instance.get_hypervisor_metrics()
}
pub(crate) async fn disconnect(&mut self) {
self.state = VmmState::NotReady;
}

View File

@@ -160,6 +160,11 @@ impl Hypervisor for Dragonball {
let inner = self.inner.read().await;
inner.capabilities().await
}
async fn get_hypervisor_metrics(&self) -> Result<String> {
    // Gathering metrics is read-only, so a read lock is sufficient.
    self.inner.read().await.get_hypervisor_metrics().await
}
}
#[async_trait]

View File

@@ -267,6 +267,15 @@ impl VmmInstance {
std::process::id()
}
/// Ask the VMM for its metrics via a synchronous API request.
///
/// Any response other than `VmmData::HypervisorMetrics` (including an
/// error from the request itself) is reported as a failure.
pub fn get_hypervisor_metrics(&self) -> Result<String> {
    match self.handle_request(Request::Sync(VmmAction::GetHypervisorMetrics)) {
        Ok(VmmData::HypervisorMetrics(metrics)) => Ok(metrics),
        _ => Err(anyhow!("Failed to get hypervisor metrics")),
    }
}
pub fn stop(&mut self) -> Result<()> {
self.handle_request(Request::Sync(VmmAction::ShutdownMicroVm))
.map_err(|e| {

View File

@@ -97,4 +97,5 @@ pub trait Hypervisor: std::fmt::Debug + Send + Sync {
async fn get_jailer_root(&self) -> Result<String>;
async fn save_state(&self) -> Result<HypervisorState>;
async fn capabilities(&self) -> Result<Capabilities>;
async fn get_hypervisor_metrics(&self) -> Result<String>;
}

View File

@@ -136,6 +136,10 @@ impl QemuInner {
info!(sl!(), "QemuInner::hypervisor_config()");
self.config.clone()
}
pub(crate) async fn get_hypervisor_metrics(&self) -> Result<String> {
todo!()
}
}
use crate::device::DeviceType;

View File

@@ -147,4 +147,9 @@ impl Hypervisor for Qemu {
let inner = self.inner.read().await;
inner.capabilities().await
}
async fn get_hypervisor_metrics(&self) -> Result<String> {
    // Gathering metrics is read-only, so a read lock is sufficient.
    self.inner.read().await.get_hypervisor_metrics().await
}
}

View File

@@ -22,6 +22,8 @@ hyperlocal = "0.8"
serde_json = "1.0.88"
nix = "0.25.0"
url = "2.3.1"
procfs = "0.12.0"
prometheus = { version = "0.13.0", features = ["process"] }
agent = { path = "../agent" }
common = { path = "./common" }

View File

@@ -41,4 +41,8 @@ pub trait Sandbox: Send + Sync {
async fn direct_volume_stats(&self, volume_path: &str) -> Result<String>;
async fn direct_volume_resize(&self, resize_req: agent::ResizeVolumeRequest) -> Result<()>;
async fn agent_sock(&self) -> Result<String>;
// metrics function
async fn agent_metrics(&self) -> Result<String>;
async fn hypervisor_metrics(&self) -> Result<String>;
}

View File

@@ -4,6 +4,9 @@
// SPDX-License-Identifier: Apache-2.0
//
#[macro_use(lazy_static)]
extern crate lazy_static;
#[macro_use]
extern crate slog;
@@ -12,5 +15,6 @@ logging::logger_with_subsystem!(sl, "runtimes");
pub mod manager;
pub use manager::RuntimeHandlerManager;
pub use shim_interface;
mod shim_metrics;
mod shim_mgmt;
pub mod tracer;

View File

@@ -0,0 +1,235 @@
// Copyright 2021-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
extern crate procfs;
use anyhow::{anyhow, Result};
use prometheus::{Encoder, Gauge, GaugeVec, Opts, Registry, TextEncoder};
use slog::warn;
use std::sync::Mutex;
// Metric-name prefix shared by every shim-level metric below.
const NAMESPACE_KATA_SHIM: &str = "kata_shim";

// Convenience macro to obtain the scope logger for this subsystem.
macro_rules! sl {
    () => {
        slog_scope::logger().new(o!("subsystem" => "metrics"))
    };
}

lazy_static! {
    // Tracks whether the collectors below have been registered with
    // REGISTRY; the guard is also held across gathering (see
    // get_shim_metrics), which serializes concurrent scrapes.
    static ref REGISTERED: Mutex<bool> = Mutex::new(false);
    // custom registry (kept separate from prometheus' default registry)
    static ref REGISTRY: Registry = Registry::new();
    // shim metrics
    // Number of OS threads of the shim process (from /proc stat).
    static ref SHIM_THREADS: Gauge = Gauge::new(format!("{}_{}", NAMESPACE_KATA_SHIM, "threads"),"Kata containerd shim v2 process threads.").unwrap();
    // Memory/context-switch fields from /proc/self/status, keyed by "item".
    static ref SHIM_PROC_STATUS: GaugeVec =
        GaugeVec::new(Opts::new(format!("{}_{}",NAMESPACE_KATA_SHIM,"proc_status"), "Kata containerd shim v2 process status."), &["item"]).unwrap();
    // CPU-time fields from /proc/self/stat, keyed by "item".
    static ref SHIM_PROC_STAT: GaugeVec = GaugeVec::new(Opts::new(format!("{}_{}",NAMESPACE_KATA_SHIM,"proc_stat"), "Kata containerd shim v2 process statistics."), &["item"]).unwrap();
    // Per-interface counters from /proc/net/dev, keyed by interface + item.
    static ref SHIM_NETDEV: GaugeVec = GaugeVec::new(Opts::new(format!("{}_{}",NAMESPACE_KATA_SHIM,"netdev"), "Kata containerd shim v2 network devices statistics."), &["interface", "item"]).unwrap();
    // I/O counters from /proc/self/io, keyed by "item".
    static ref SHIM_IO_STAT: GaugeVec = GaugeVec::new(Opts::new(format!("{}_{}",NAMESPACE_KATA_SHIM,"io_stat"), "Kata containerd shim v2 process IO statistics."), &["item"]).unwrap();
    // Count of open file descriptors of the shim process.
    static ref SHIM_OPEN_FDS: Gauge = Gauge::new(format!("{}_{}", NAMESPACE_KATA_SHIM, "fds"), "Kata containerd shim v2 open FDs.").unwrap();
}
/// Gather all shim-level metrics and return them encoded in the
/// Prometheus text exposition format.
///
/// On first use the collectors are registered with the custom registry.
/// The REGISTERED guard is held for the whole function, so concurrent
/// scrapes are serialized.
pub fn get_shim_metrics() -> Result<String> {
    let mut registered = REGISTERED
        .lock()
        .map_err(|e| anyhow!("failed to check shim metrics register status {:?}", e))?;

    // One-time registration of every collector.
    if !*registered {
        register_shim_metrics()?;
        *registered = true;
    }

    // Refresh gauges from the current /proc state, then encode.
    update_shim_metrics()?;

    let mut encoded = Vec::new();
    TextEncoder::new().encode(&REGISTRY.gather(), &mut encoded)?;
    let text = String::from_utf8(encoded)?;
    Ok(text)
}
fn register_shim_metrics() -> Result<()> {
REGISTRY.register(Box::new(SHIM_THREADS.clone()))?;
REGISTRY.register(Box::new(SHIM_PROC_STATUS.clone()))?;
REGISTRY.register(Box::new(SHIM_PROC_STAT.clone()))?;
REGISTRY.register(Box::new(SHIM_NETDEV.clone()))?;
REGISTRY.register(Box::new(SHIM_IO_STAT.clone()))?;
REGISTRY.register(Box::new(SHIM_OPEN_FDS.clone()))?;
// TODO:
// REGISTRY.register(Box::new(RPC_DURATIONS_HISTOGRAM.clone()))?;
// REGISTRY.register(Box::new(SHIM_POD_OVERHEAD_CPU.clone()))?;
// REGISTRY.register(Box::new(SHIM_POD_OVERHEAD_MEMORY.clone()))?;
Ok(())
}
/// Refresh every registered shim gauge from the current /proc state.
///
/// Individual probe failures are logged and skipped so that one
/// unreadable /proc source does not block the remaining metrics; the
/// function always returns Ok(()) after the initial process handle is
/// obtained (and even when it cannot be, it logs and returns Ok).
fn update_shim_metrics() -> Result<()> {
    let me = procfs::process::Process::myself();
    let me = match me {
        Ok(p) => p,
        Err(e) => {
            warn!(sl!(), "failed to create process instance: {:?}", e);
            return Ok(());
        }
    };

    // `stat` here is the snapshot field captured when the Process handle
    // was created; `me.stat()` below re-reads /proc — NOTE(review):
    // mixed field/method use looks intentional for procfs 0.12, confirm.
    SHIM_THREADS.set(me.stat.num_threads as f64);

    // /proc/self/status: memory and context-switch figures.
    match me.status() {
        Err(err) => error!(sl!(), "failed to get process status: {:?}", err),
        Ok(status) => set_gauge_vec_proc_status(&SHIM_PROC_STATUS, &status),
    }

    // /proc/self/stat: CPU-time counters.
    match me.stat() {
        Err(err) => {
            error!(sl!(), "failed to get process stat: {:?}", err);
        }
        Ok(stat) => {
            set_gauge_vec_proc_stat(&SHIM_PROC_STAT, &stat);
        }
    }

    // /proc/net/dev: per-interface traffic counters (host-wide).
    match procfs::net::dev_status() {
        Err(err) => {
            error!(sl!(), "failed to get host net::dev_status: {:?}", err);
        }
        Ok(devs) => {
            for (_, status) in devs {
                set_gauge_vec_netdev(&SHIM_NETDEV, &status);
            }
        }
    }

    // /proc/self/io: process I/O counters.
    match me.io() {
        Err(err) => {
            error!(sl!(), "failed to get process io stat: {:?}", err);
        }
        Ok(io) => {
            set_gauge_vec_proc_io(&SHIM_IO_STAT, &io);
        }
    }

    // /proc/self/fd: count of open file descriptors.
    match me.fd_count() {
        Err(err) => {
            error!(sl!(), "failed to get process open fds number: {:?}", err);
        }
        Ok(fds) => {
            SHIM_OPEN_FDS.set(fds as f64);
        }
    }

    // TODO:
    // RPC_DURATIONS_HISTOGRAM & SHIM_POD_OVERHEAD_CPU & SHIM_POD_OVERHEAD_MEMORY
    Ok(())
}
/// Publish the /proc status fields into the gauge vector, one labelled
/// sample per field; absent optional fields report 0.
fn set_gauge_vec_proc_status(gv: &prometheus::GaugeVec, status: &procfs::process::Status) {
    // Table of (label, optional value) pairs mirroring /proc/<pid>/status.
    let items: [(&str, Option<u64>); 18] = [
        ("vmpeak", status.vmpeak),
        ("vmsize", status.vmsize),
        ("vmlck", status.vmlck),
        ("vmpin", status.vmpin),
        ("vmhwm", status.vmhwm),
        ("vmrss", status.vmrss),
        ("rssanon", status.rssanon),
        ("rssfile", status.rssfile),
        ("rssshmem", status.rssshmem),
        ("vmdata", status.vmdata),
        ("vmstk", status.vmstk),
        ("vmexe", status.vmexe),
        ("vmlib", status.vmlib),
        ("vmpte", status.vmpte),
        ("vmswap", status.vmswap),
        ("hugetlbpages", status.hugetlbpages),
        ("voluntary_ctxt_switches", status.voluntary_ctxt_switches),
        ("nonvoluntary_ctxt_switches", status.nonvoluntary_ctxt_switches),
    ];
    for &(label, value) in items.iter() {
        gv.with_label_values(&[label]).set(value.unwrap_or(0) as f64);
    }
}
/// Publish the CPU-time counters from /proc stat (self and reaped
/// children) into the gauge vector, one labelled sample per field.
fn set_gauge_vec_proc_stat(gv: &prometheus::GaugeVec, stat: &procfs::process::Stat) {
    let items: [(&str, f64); 4] = [
        ("utime", stat.utime as f64),
        ("stime", stat.stime as f64),
        ("cutime", stat.cutime as f64),
        ("cstime", stat.cstime as f64),
    ];
    for &(label, value) in items.iter() {
        gv.with_label_values(&[label]).set(value);
    }
}
/// Publish one network interface's /proc/net/dev counters into the gauge
/// vector, labelled by interface name and counter item.
fn set_gauge_vec_netdev(gv: &prometheus::GaugeVec, status: &procfs::net::DeviceStatus) {
    let name = status.name.as_str();
    let counters: [(&str, u64); 16] = [
        ("recv_bytes", status.recv_bytes),
        ("recv_packets", status.recv_packets),
        ("recv_errs", status.recv_errs),
        ("recv_drop", status.recv_drop),
        ("recv_fifo", status.recv_fifo),
        ("recv_frame", status.recv_frame),
        ("recv_compressed", status.recv_compressed),
        ("recv_multicast", status.recv_multicast),
        ("sent_bytes", status.sent_bytes),
        ("sent_packets", status.sent_packets),
        ("sent_errs", status.sent_errs),
        ("sent_drop", status.sent_drop),
        ("sent_fifo", status.sent_fifo),
        ("sent_colls", status.sent_colls),
        ("sent_carrier", status.sent_carrier),
        ("sent_compressed", status.sent_compressed),
    ];
    for &(item, value) in counters.iter() {
        gv.with_label_values(&[name, item]).set(value as f64);
    }
}
/// Publish the /proc io counters into the gauge vector, one labelled
/// sample per field.
fn set_gauge_vec_proc_io(gv: &prometheus::GaugeVec, io_stat: &procfs::process::Io) {
    let items: [(&str, u64); 7] = [
        ("rchar", io_stat.rchar),
        ("wchar", io_stat.wchar),
        ("syscr", io_stat.syscr),
        ("syscw", io_stat.syscw),
        ("read_bytes", io_stat.read_bytes),
        ("write_bytes", io_stat.write_bytes),
        ("cancelled_write_bytes", io_stat.cancelled_write_bytes),
    ];
    for &(label, value) in items.iter() {
        gv.with_label_values(&[label]).set(value as f64);
    }
}

View File

@@ -7,6 +7,7 @@
// This defines the handlers corresponding to the url when a request is sent to destined url,
// the handler function should be invoked, and the corresponding data will be in the response
use crate::shim_metrics::get_shim_metrics;
use agent::ResizeVolumeRequest;
use anyhow::{anyhow, Context, Result};
use common::Sandbox;
@@ -16,7 +17,7 @@ use url::Url;
use shim_interface::shim_mgmt::{
AGENT_URL, DIRECT_VOLUME_PATH_KEY, DIRECT_VOLUME_RESIZE_URL, DIRECT_VOLUME_STATS_URL,
IP6_TABLE_URL, IP_TABLE_URL,
IP6_TABLE_URL, IP_TABLE_URL, METRICS_URL,
};
// main router for response, this works as a multiplexer on
@@ -43,6 +44,7 @@ pub(crate) async fn handler_mux(
(&Method::POST, DIRECT_VOLUME_RESIZE_URL) => {
direct_volume_resize_handler(sandbox, req).await
}
(&Method::GET, METRICS_URL) => metrics_url_handler(sandbox, req).await,
_ => Ok(not_found(req).await),
}
}
@@ -146,3 +148,19 @@ async fn direct_volume_resize_handler(
_ => Err(anyhow!("handler: Failed to resize volume")),
}
}
// Handles GET on the metrics url: returns agent, hypervisor, and shim
// metrics concatenated into one response body.
async fn metrics_url_handler(
    sandbox: Arc<dyn Sandbox>,
    _req: Request<Body>,
) -> Result<Response<Body>> {
    // Each source degrades to an empty string on failure so a single
    // failing component does not fail the whole endpoint.
    let mut body = sandbox.agent_metrics().await.unwrap_or_default();
    body.push_str(&sandbox.hypervisor_metrics().await.unwrap_or_default());
    body.push_str(&get_shim_metrics().unwrap_or_default());
    Ok(Response::new(Body::from(body)))
}

View File

@@ -459,6 +459,18 @@ impl Sandbox for VirtSandbox {
.context("sandbox: failed to get iptables")?;
Ok(resp.data)
}
async fn agent_metrics(&self) -> Result<String> {
    // Ask the guest agent for its metrics text; wrap any RPC failure
    // with context.
    match self.agent.get_metrics(agent::Empty::new()).await {
        Ok(resp) => Ok(resp.metrics),
        Err(err) => Err(anyhow!("failed to get agent metrics {:?}", err)),
    }
}
// Forward the metrics request to the configured hypervisor implementation.
async fn hypervisor_metrics(&self) -> Result<String> {
    self.hypervisor.get_hypervisor_metrics().await
}
}
#[async_trait]