Merge pull request #6672 from Yuan-Zhuo/add-monitor-in-kata-ctl

kata-ctl: add monitor subcommand for runtime-rs
This commit is contained in:
Fupan Li 2023-08-02 13:39:02 +08:00 committed by GitHub
commit a536d4a7bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 875 additions and 341 deletions

File diff suppressed because it is too large Load Diff

View File

@ -44,7 +44,12 @@ logging = { path = "../../libs/logging" }
slog = "2.7.0"
slog-scope = "4.4.0"
hyper = "0.14.20"
tokio = "1.28.1"
tokio = { version = "1.28.1", features = ["signal"] }
ttrpc = "0.6.0"
prometheus = { version = "0.13.0", features = ["process"] }
procfs = "0.12.0"
lazy_static = "1.2"
[target.'cfg(target_arch = "s390x")'.dependencies]
reqwest = { version = "0.11", default-features = false, features = ["json", "blocking", "native-tls"] }

View File

@ -56,6 +56,9 @@ pub enum Commands {
/// Gather metrics associated with infrastructure used to run a sandbox
Metrics(MetricsCommand),
/// Start a monitor to get metrics of Kata Containers
Monitor(MonitorArgument),
/// Display version details
Version,
}
@ -122,6 +125,12 @@ pub enum IpTablesArguments {
Metrics,
}
#[derive(Debug, Args)]
pub struct MonitorArgument {
/// The address to listen on for HTTP requests. (default "127.0.0.1:8090")
pub address: Option<String>,
}
#[derive(Debug, Args)]
pub struct DirectVolumeCommand {
#[clap(subcommand)]

View File

@ -3,9 +3,16 @@
// SPDX-License-Identifier: Apache-2.0
//
#[macro_use]
extern crate lazy_static;
#[macro_use]
extern crate slog;
mod arch;
mod args;
mod check;
mod monitor;
mod ops;
mod types;
mod utils;
@ -18,7 +25,7 @@ use std::process::exit;
use args::{Commands, KataCtlCli};
use ops::check_ops::{
handle_check, handle_factory, handle_iptables, handle_metrics, handle_version,
handle_check, handle_factory, handle_iptables, handle_metrics, handle_monitor, handle_version,
};
use ops::env_ops::handle_env;
use ops::exec_ops::handle_exec;
@ -52,6 +59,7 @@ fn real_main() -> Result<()> {
Commands::Factory => handle_factory(),
Commands::Iptables(args) => handle_iptables(args),
Commands::Metrics(args) => handle_metrics(args),
Commands::Monitor(args) => handle_monitor(args),
Commands::Version => handle_version(),
};

View File

@ -0,0 +1,181 @@
// Copyright 2022-2023 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
use crate::monitor::metrics::get_monitor_metrics;
use crate::sl;
use crate::utils::TIMEOUT;
use anyhow::{anyhow, Context, Result};
use hyper::body;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server, StatusCode};
use shim_interface::shim_mgmt::client::MgmtClient;
use slog::{self, info};
use std::collections::HashMap;
use std::net::SocketAddr;
const ROOT_URI: &str = "/";
const METRICS_URI: &str = "/metrics";
async fn handler_mux(req: Request<Body>) -> Result<Response<Body>> {
info!(
sl!(),
"mgmt-svr(mux): recv req, method: {}, uri: {}",
req.method(),
req.uri().path()
);
match (req.method(), req.uri().path()) {
(&Method::GET, ROOT_URI) => root_uri_handler(req).await,
(&Method::GET, METRICS_URI) => metrics_uri_handler(req).await,
_ => not_found_uri_handler(req).await,
}
.map_or_else(
|e| {
Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from(format!("{:?}\n", e)))
.map_err(|e| anyhow!("Failed to Build Response {:?}", e))
},
Ok,
)
}
pub async fn http_server_setup(socket_addr: &str) -> Result<()> {
let addr: SocketAddr = socket_addr
.parse()
.context("failed to parse http socket address")?;
let make_svc =
make_service_fn(|_conn| async { Ok::<_, anyhow::Error>(service_fn(handler_mux)) });
Server::bind(&addr).serve(make_svc).await?;
Ok(())
}
async fn root_uri_handler(_req: Request<Body>) -> Result<Response<Body>> {
Response::builder()
.status(StatusCode::OK)
.body(Body::from(
r#"Available HTTP endpoints:
/metrics : Get metrics from sandboxes.
"#,
))
.map_err(|e| anyhow!("Failed to Build Response {:?}", e))
}
async fn metrics_uri_handler(req: Request<Body>) -> Result<Response<Body>> {
let mut response_body = String::new();
response_body += &get_monitor_metrics().context("Failed to Get Monitor Metrics")?;
if let Some(uri_query) = req.uri().query() {
if let Ok(sandbox_id) = parse_sandbox_id(uri_query) {
response_body += &get_runtime_metrics(sandbox_id)
.await
.context(format!("{}\nFailed to Get Runtime Metrics", response_body))?;
}
}
Response::builder()
.status(StatusCode::OK)
.body(Body::from(response_body))
.map_err(|e| anyhow!("Failed to Build Response {:?}", e))
}
async fn get_runtime_metrics(sandbox_id: &str) -> Result<String> {
// build shim client
let shim_client =
MgmtClient::new(sandbox_id, Some(TIMEOUT)).context("failed to build shim mgmt client")?;
// get METRICS_URI
let shim_response = shim_client
.get(METRICS_URI)
.await
.context("failed to get METRICS_URI")?;
// get runtime_metrics
let runtime_metrics = String::from_utf8(body::to_bytes(shim_response).await?.to_vec())
.context("failed to get runtime_metrics")?;
Ok(runtime_metrics)
}
async fn not_found_uri_handler(_req: Request<Body>) -> Result<Response<Body>> {
Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::from("NOT FOUND"))
.map_err(|e| anyhow!("Failed to Build Response {:?}", e))
}
fn parse_sandbox_id(uri: &str) -> Result<&str> {
let uri_pairs: HashMap<_, _> = uri
.split_whitespace()
.map(|s| s.split_at(s.find('=').unwrap_or(0)))
.map(|(key, val)| (key, &val[1..]))
.collect();
match uri_pairs.get("sandbox") {
Some(sid) => Ok(sid.to_owned()),
None => Err(anyhow!("params sandbox not found")),
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_sandbox_id() {
assert!(parse_sandbox_id("sandbox=demo_sandbox").unwrap() == "demo_sandbox");
assert!(parse_sandbox_id("foo=bar").is_err());
}
#[tokio::test]
async fn test_root_uri_handler() {
let root_resp = handler_mux(
Request::builder()
.method("GET")
.uri("/")
.body(hyper::Body::from(""))
.unwrap(),
)
.await
.unwrap();
assert!(root_resp.status() == StatusCode::OK);
}
#[tokio::test]
async fn test_metrics_uri_handler() {
let metrics_resp = handler_mux(
Request::builder()
.method("GET")
.uri("/metrics?sandbox=demo_sandbox")
.body(hyper::Body::from(""))
.unwrap(),
)
.await
.unwrap();
assert!(metrics_resp.status() == StatusCode::INTERNAL_SERVER_ERROR);
}
#[tokio::test]
async fn test_not_found_uri_handler() {
let not_found_resp = handler_mux(
Request::builder()
.method("POST")
.uri("/metrics?sandbox=demo_sandbox")
.body(hyper::Body::from(""))
.unwrap(),
)
.await
.unwrap();
assert!(not_found_resp.status() == StatusCode::NOT_FOUND);
}
}

View File

@ -0,0 +1,91 @@
// Copyright 2022-2023 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
extern crate procfs;
use anyhow::{anyhow, Context, Result};
use prometheus::{Encoder, Gauge, IntCounter, Registry, TextEncoder};
use std::sync::Mutex;
const NAMESPACE_KATA_MONITOR: &str = "kata_ctl_monitor";
lazy_static! {
static ref REGISTERED: Mutex<bool> = Mutex::new(false);
// custom registry
static ref REGISTRY: Registry = Registry::new();
// monitor metrics
static ref MONITOR_SCRAPE_COUNT: IntCounter =
IntCounter::new(format!("{}_{}", NAMESPACE_KATA_MONITOR, "scrape_count"), "Monitor scrape count").unwrap();
static ref MONITOR_MAX_FDS: Gauge = Gauge::new(format!("{}_{}", NAMESPACE_KATA_MONITOR, "process_max_fds"), "Open FDs for monitor").unwrap();
static ref MONITOR_OPEN_FDS: Gauge = Gauge::new(format!("{}_{}", NAMESPACE_KATA_MONITOR, "process_open_fds"), "Open FDs for monitor").unwrap();
static ref MONITOR_RESIDENT_MEMORY: Gauge = Gauge::new(format!("{}_{}", NAMESPACE_KATA_MONITOR, "process_resident_memory_bytes"), "Resident memory size in bytes for monitor").unwrap();
}
/// get monitor metrics
pub fn get_monitor_metrics() -> Result<String> {
let mut registered = REGISTERED
.lock()
.map_err(|e| anyhow!("failed to check monitor metrics register status {:?}", e))?;
if !(*registered) {
register_monitor_metrics().context("failed to register monitor metrics")?;
*registered = true;
}
update_monitor_metrics().context("failed to update monitor metrics")?;
// gather all metrics and return as a String
let metric_families = REGISTRY.gather();
let mut buffer = Vec::new();
TextEncoder::new()
.encode(&metric_families, &mut buffer)
.context("failed to encode gathered metrics")?;
Ok(String::from_utf8(buffer)?)
}
fn register_monitor_metrics() -> Result<()> {
REGISTRY.register(Box::new(MONITOR_SCRAPE_COUNT.clone()))?;
REGISTRY.register(Box::new(MONITOR_MAX_FDS.clone()))?;
REGISTRY.register(Box::new(MONITOR_OPEN_FDS.clone()))?;
REGISTRY.register(Box::new(MONITOR_RESIDENT_MEMORY.clone()))?;
Ok(())
}
fn update_monitor_metrics() -> Result<()> {
MONITOR_SCRAPE_COUNT.inc();
let me = match procfs::process::Process::myself() {
Ok(p) => p,
Err(e) => {
eprintln!("failed to create process instance: {:?}", e);
return Ok(());
}
};
if let Ok(fds) = procfs::sys::fs::file_max() {
MONITOR_MAX_FDS.set(fds as f64);
}
if let Ok(fds) = me.fd_count() {
MONITOR_OPEN_FDS.set(fds as f64);
}
if let Ok(statm) = me.statm() {
MONITOR_RESIDENT_MEMORY.set(statm.resident as f64);
}
Ok(())
}

View File

@ -0,0 +1,8 @@
// Copyright 2022-2023 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
mod metrics;
pub mod http_server;

View File

@ -5,15 +5,21 @@
use crate::arch::arch_specific::get_checks;
use crate::args::{CheckArgument, CheckSubCommand, IptablesCommand, MetricsCommand};
use crate::args::{
CheckArgument, CheckSubCommand, IptablesCommand, MetricsCommand, MonitorArgument,
};
use crate::check;
use crate::monitor::http_server;
use crate::ops::version;
use crate::types::*;
use anyhow::{anyhow, Result};
use anyhow::{anyhow, Context, Result};
const MONITOR_DEFAULT_SOCK_ADDR: &str = "127.0.0.1:8090";
use slog::{info, o, warn};
@ -128,6 +134,17 @@ pub fn handle_metrics(_args: MetricsCommand) -> Result<()> {
Ok(())
}
pub fn handle_monitor(monitor_args: MonitorArgument) -> Result<()> {
tokio::runtime::Runtime::new()
.context("failed to new runtime for aync http server")?
.block_on(http_server::http_server_setup(
monitor_args
.address
.as_deref()
.unwrap_or(MONITOR_DEFAULT_SOCK_ADDR),
))
}
pub fn handle_version() -> Result<()> {
let version = version::get().unwrap();

View File

@ -25,6 +25,8 @@ use vmm_sys_util::terminal::Terminal;
use crate::args::ExecArguments;
use shim_interface::shim_mgmt::{client::MgmtClient, AGENT_URL};
use crate::utils::TIMEOUT;
const CMD_CONNECT: &str = "CONNECT";
const CMD_OK: &str = "OK";
const SCHEME_VSOCK: &str = "VSOCK";
@ -32,7 +34,6 @@ const SCHEME_HYBRID_VSOCK: &str = "HVSOCK";
const EPOLL_EVENTS_LEN: usize = 16;
const KATA_AGENT_VSOCK_TIMEOUT: u64 = 5;
const TIMEOUT: Duration = Duration::from_millis(2000);
type Result<T> = std::result::Result<T, Error>;

View File

@ -14,7 +14,7 @@ use kata_types::mount::{
use nix;
use reqwest::StatusCode;
use slog::{info, o};
use std::{fs, time::Duration};
use std::fs;
use url;
use agent::ResizeVolumeRequest;
@ -23,7 +23,8 @@ use shim_interface::shim_mgmt::{
DIRECT_VOLUME_PATH_KEY, DIRECT_VOLUME_RESIZE_URL, DIRECT_VOLUME_STATS_URL,
};
const TIMEOUT: Duration = Duration::from_millis(2000);
use crate::utils::TIMEOUT;
const CONTENT_TYPE_JSON: &str = "application/json";
macro_rules! sl {

View File

@ -8,10 +8,12 @@
use crate::arch::arch_specific;
use anyhow::{anyhow, Context, Result};
use std::fs;
use std::{fs, time::Duration};
const NON_PRIV_USER: &str = "nobody";
pub const TIMEOUT: Duration = Duration::from_millis(2000);
pub fn drop_privs() -> Result<()> {
if nix::unistd::Uid::effective().is_root() {
privdrop::PrivDrop::default()