mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-07-02 18:23:12 +00:00
Merge pull request #2517 from sameo/topic/agent-config
Agent configuration file and API restriction
This commit is contained in:
commit
011c58d626
19
src/agent/Cargo.lock
generated
19
src/agent/Cargo.lock
generated
@ -544,6 +544,7 @@ dependencies = [
|
||||
"rustjail",
|
||||
"scan_fmt",
|
||||
"scopeguard",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"slog",
|
||||
"slog-scope",
|
||||
@ -552,6 +553,7 @@ dependencies = [
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-vsock",
|
||||
"toml",
|
||||
"tracing",
|
||||
"tracing-opentelemetry",
|
||||
"tracing-subscriber",
|
||||
@ -1323,18 +1325,18 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
|
||||
|
||||
[[package]]
|
||||
name = "serde"
|
||||
version = "1.0.126"
|
||||
version = "1.0.129"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ec7505abeacaec74ae4778d9d9328fe5a5d04253220a85c4ee022239fc996d03"
|
||||
checksum = "d1f72836d2aa753853178eda473a3b9d8e4eefdaf20523b919677e6de489f8f1"
|
||||
dependencies = [
|
||||
"serde_derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_derive"
|
||||
version = "1.0.126"
|
||||
version = "1.0.129"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "963a7dbc9895aeac7ac90e74f34a5d5261828f79df35cbed41e10189d3804d43"
|
||||
checksum = "e57ae87ad533d9a56427558b516d0adac283614e347abf85b0dc0cbbf0a249f3"
|
||||
dependencies = [
|
||||
"proc-macro2 1.0.26",
|
||||
"quote 1.0.9",
|
||||
@ -1618,6 +1620,15 @@ dependencies = [
|
||||
"vsock",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "toml"
|
||||
version = "0.5.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a31142970826733df8241ef35dc040ef98c679ab14d7c3e54d827099b3acecaa"
|
||||
dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing"
|
||||
version = "0.1.26"
|
||||
|
@ -58,6 +58,10 @@ tracing-opentelemetry = "0.13.0"
|
||||
opentelemetry = { version = "0.14.0", features = ["rt-tokio-current-thread"]}
|
||||
vsock-exporter = { path = "vsock-exporter" }
|
||||
|
||||
# Configuration
|
||||
serde = { version = "1.0.129", features = ["derive"] }
|
||||
toml = "0.5.8"
|
||||
|
||||
[dev-dependencies]
|
||||
tempfile = "3.1.0"
|
||||
|
||||
|
21
src/agent/samples/configuration-all-endpoints.toml
Normal file
21
src/agent/samples/configuration-all-endpoints.toml
Normal file
@ -0,0 +1,21 @@
|
||||
# This is an agent configuration file example.
|
||||
dev_mode = true
|
||||
server_addr = 'vsock://8:2048'
|
||||
|
||||
[endpoints]
|
||||
# All endpoints are allowed
|
||||
allowed = [ "CreateContainer", "StartContainer", "RemoveContainer",
|
||||
"ExecProcess", "SignalProcess", "WaitProcess",
|
||||
"UpdateContainer", "StatsContainer", "PauseContainer", "ResumeContainer",
|
||||
"WriteStdin", "ReadStdout", "ReadStderr", "CloseStdin", "TtyWinResize",
|
||||
"UpdateInterface", "UpdateRoutes", "ListInterfaces", "ListRoutes", "AddARPNeighbors",
|
||||
"StartTracing", "StopTracing", "GetMetrics",
|
||||
"CreateSandbox", "DestroySandbox",
|
||||
"OnlineCPUMem",
|
||||
"ReseedRandomDev",
|
||||
"GetGuestDetails",
|
||||
"MemHotplugByProbe",
|
||||
"SetGuestDateTime",
|
||||
"CopyFile",
|
||||
"GetOOMEvent",
|
||||
"AddSwap"]
|
@ -4,8 +4,11 @@
|
||||
//
|
||||
use crate::tracer;
|
||||
use anyhow::{bail, ensure, Context, Result};
|
||||
use serde::Deserialize;
|
||||
use std::collections::HashSet;
|
||||
use std::env;
|
||||
use std::fs;
|
||||
use std::str::FromStr;
|
||||
use std::time;
|
||||
use tracing::instrument;
|
||||
|
||||
@ -19,6 +22,7 @@ const DEBUG_CONSOLE_VPORT_OPTION: &str = "agent.debug_console_vport";
|
||||
const LOG_VPORT_OPTION: &str = "agent.log_vport";
|
||||
const CONTAINER_PIPE_SIZE_OPTION: &str = "agent.container_pipe_size";
|
||||
const UNIFIED_CGROUP_HIERARCHY_OPTION: &str = "agent.unified_cgroup_hierarchy";
|
||||
const CONFIG_FILE: &str = "agent.config_file";
|
||||
|
||||
const DEFAULT_LOG_LEVEL: slog::Level = slog::Level::Info;
|
||||
const DEFAULT_HOTPLUG_TIMEOUT: time::Duration = time::Duration::from_secs(3);
|
||||
@ -47,6 +51,17 @@ const ERR_INVALID_CONTAINER_PIPE_SIZE_PARAM: &str = "unable to parse container p
|
||||
const ERR_INVALID_CONTAINER_PIPE_SIZE_KEY: &str = "invalid container pipe size key name";
|
||||
const ERR_INVALID_CONTAINER_PIPE_NEGATIVE: &str = "container pipe size should not be negative";
|
||||
|
||||
#[derive(Debug, Default, Deserialize)]
|
||||
pub struct EndpointsConfig {
|
||||
pub allowed: Vec<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct AgentEndpoints {
|
||||
pub allowed: HashSet<String>,
|
||||
pub all_allowed: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct AgentConfig {
|
||||
pub debug_console: bool,
|
||||
@ -59,6 +74,36 @@ pub struct AgentConfig {
|
||||
pub server_addr: String,
|
||||
pub unified_cgroup_hierarchy: bool,
|
||||
pub tracing: tracer::TraceType,
|
||||
pub endpoints: AgentEndpoints,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct AgentConfigBuilder {
|
||||
pub debug_console: Option<bool>,
|
||||
pub dev_mode: Option<bool>,
|
||||
pub log_level: Option<String>,
|
||||
pub hotplug_timeout: Option<time::Duration>,
|
||||
pub debug_console_vport: Option<i32>,
|
||||
pub log_vport: Option<i32>,
|
||||
pub container_pipe_size: Option<i32>,
|
||||
pub server_addr: Option<String>,
|
||||
pub unified_cgroup_hierarchy: Option<bool>,
|
||||
pub tracing: Option<tracer::TraceType>,
|
||||
pub endpoints: Option<EndpointsConfig>,
|
||||
}
|
||||
|
||||
macro_rules! config_override {
|
||||
($builder:ident, $config:ident, $field:ident) => {
|
||||
if let Some(v) = $builder.$field {
|
||||
$config.$field = v;
|
||||
}
|
||||
};
|
||||
|
||||
($builder:ident, $config:ident, $field:ident, $func: ident) => {
|
||||
if let Some(v) = $builder.$field {
|
||||
$config.$field = $func(&v)?;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
// parse_cmdline_param parse commandline parameters.
|
||||
@ -91,8 +136,8 @@ macro_rules! parse_cmdline_param {
|
||||
};
|
||||
}
|
||||
|
||||
impl AgentConfig {
|
||||
pub fn new() -> AgentConfig {
|
||||
impl Default for AgentConfig {
|
||||
fn default() -> Self {
|
||||
AgentConfig {
|
||||
debug_console: false,
|
||||
dev_mode: false,
|
||||
@ -104,33 +149,82 @@ impl AgentConfig {
|
||||
server_addr: format!("{}:{}", VSOCK_ADDR, VSOCK_PORT),
|
||||
unified_cgroup_hierarchy: false,
|
||||
tracing: tracer::TraceType::Disabled,
|
||||
endpoints: Default::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl FromStr for AgentConfig {
|
||||
type Err = anyhow::Error;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
let agent_config_builder: AgentConfigBuilder =
|
||||
toml::from_str(s).map_err(anyhow::Error::new)?;
|
||||
let mut agent_config: AgentConfig = Default::default();
|
||||
|
||||
// Overwrite default values with the configuration files ones.
|
||||
config_override!(agent_config_builder, agent_config, debug_console);
|
||||
config_override!(agent_config_builder, agent_config, dev_mode);
|
||||
config_override!(
|
||||
agent_config_builder,
|
||||
agent_config,
|
||||
log_level,
|
||||
logrus_to_slog_level
|
||||
);
|
||||
config_override!(agent_config_builder, agent_config, hotplug_timeout);
|
||||
config_override!(agent_config_builder, agent_config, debug_console_vport);
|
||||
config_override!(agent_config_builder, agent_config, log_vport);
|
||||
config_override!(agent_config_builder, agent_config, container_pipe_size);
|
||||
config_override!(agent_config_builder, agent_config, server_addr);
|
||||
config_override!(agent_config_builder, agent_config, unified_cgroup_hierarchy);
|
||||
config_override!(agent_config_builder, agent_config, tracing);
|
||||
|
||||
// Populate the allowed endpoints hash set, if we got any from the config file.
|
||||
if let Some(endpoints) = agent_config_builder.endpoints {
|
||||
for ep in endpoints.allowed {
|
||||
agent_config.endpoints.allowed.insert(ep);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(agent_config)
|
||||
}
|
||||
}
|
||||
|
||||
impl AgentConfig {
|
||||
#[instrument]
|
||||
pub fn parse_cmdline(&mut self, file: &str) -> Result<()> {
|
||||
pub fn from_cmdline(file: &str) -> Result<AgentConfig> {
|
||||
let mut config: AgentConfig = Default::default();
|
||||
let cmdline = fs::read_to_string(file)?;
|
||||
let params: Vec<&str> = cmdline.split_ascii_whitespace().collect();
|
||||
for param in params.iter() {
|
||||
// If we get a configuration file path from the command line, we
|
||||
// generate our config from it.
|
||||
// The agent will fail to start if the configuration file is not present,
|
||||
// or if it can't be parsed properly.
|
||||
if param.starts_with(format!("{}=", CONFIG_FILE).as_str()) {
|
||||
let config_file = get_string_value(param)?;
|
||||
return AgentConfig::from_config_file(&config_file);
|
||||
}
|
||||
|
||||
// parse cmdline flags
|
||||
parse_cmdline_param!(param, DEBUG_CONSOLE_FLAG, self.debug_console);
|
||||
parse_cmdline_param!(param, DEV_MODE_FLAG, self.dev_mode);
|
||||
parse_cmdline_param!(param, DEBUG_CONSOLE_FLAG, config.debug_console);
|
||||
parse_cmdline_param!(param, DEV_MODE_FLAG, config.dev_mode);
|
||||
|
||||
// Support "bare" tracing option for backwards compatibility with
|
||||
// Kata 1.x.
|
||||
if param == &TRACE_MODE_OPTION {
|
||||
self.tracing = tracer::TraceType::Isolated;
|
||||
config.tracing = tracer::TraceType::Isolated;
|
||||
continue;
|
||||
}
|
||||
|
||||
parse_cmdline_param!(param, TRACE_MODE_OPTION, self.tracing, get_trace_type);
|
||||
parse_cmdline_param!(param, TRACE_MODE_OPTION, config.tracing, get_trace_type);
|
||||
|
||||
// parse cmdline options
|
||||
parse_cmdline_param!(param, LOG_LEVEL_OPTION, self.log_level, get_log_level);
|
||||
parse_cmdline_param!(param, LOG_LEVEL_OPTION, config.log_level, get_log_level);
|
||||
parse_cmdline_param!(
|
||||
param,
|
||||
SERVER_ADDR_OPTION,
|
||||
self.server_addr,
|
||||
config.server_addr,
|
||||
get_string_value
|
||||
);
|
||||
|
||||
@ -138,7 +232,7 @@ impl AgentConfig {
|
||||
parse_cmdline_param!(
|
||||
param,
|
||||
HOTPLUG_TIMOUT_OPTION,
|
||||
self.hotplug_timeout,
|
||||
config.hotplug_timeout,
|
||||
get_hotplug_timeout,
|
||||
|hotplug_timeout: time::Duration| hotplug_timeout.as_secs() > 0
|
||||
);
|
||||
@ -147,14 +241,14 @@ impl AgentConfig {
|
||||
parse_cmdline_param!(
|
||||
param,
|
||||
DEBUG_CONSOLE_VPORT_OPTION,
|
||||
self.debug_console_vport,
|
||||
config.debug_console_vport,
|
||||
get_vsock_port,
|
||||
|port| port > 0
|
||||
);
|
||||
parse_cmdline_param!(
|
||||
param,
|
||||
LOG_VPORT_OPTION,
|
||||
self.log_vport,
|
||||
config.log_vport,
|
||||
get_vsock_port,
|
||||
|port| port > 0
|
||||
);
|
||||
@ -162,34 +256,47 @@ impl AgentConfig {
|
||||
parse_cmdline_param!(
|
||||
param,
|
||||
CONTAINER_PIPE_SIZE_OPTION,
|
||||
self.container_pipe_size,
|
||||
config.container_pipe_size,
|
||||
get_container_pipe_size
|
||||
);
|
||||
parse_cmdline_param!(
|
||||
param,
|
||||
UNIFIED_CGROUP_HIERARCHY_OPTION,
|
||||
self.unified_cgroup_hierarchy,
|
||||
config.unified_cgroup_hierarchy,
|
||||
get_bool_value
|
||||
);
|
||||
}
|
||||
|
||||
if let Ok(addr) = env::var(SERVER_ADDR_ENV_VAR) {
|
||||
self.server_addr = addr;
|
||||
config.server_addr = addr;
|
||||
}
|
||||
|
||||
if let Ok(addr) = env::var(LOG_LEVEL_ENV_VAR) {
|
||||
if let Ok(level) = logrus_to_slog_level(&addr) {
|
||||
self.log_level = level;
|
||||
config.log_level = level;
|
||||
}
|
||||
}
|
||||
|
||||
if let Ok(value) = env::var(TRACE_TYPE_ENV_VAR) {
|
||||
if let Ok(result) = value.parse::<tracer::TraceType>() {
|
||||
self.tracing = result;
|
||||
config.tracing = result;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
// We did not get a configuration file: allow all endpoints.
|
||||
config.endpoints.all_allowed = true;
|
||||
|
||||
Ok(config)
|
||||
}
|
||||
|
||||
#[instrument]
|
||||
pub fn from_config_file(file: &str) -> Result<AgentConfig> {
|
||||
let config = fs::read_to_string(file)?;
|
||||
AgentConfig::from_str(&config)
|
||||
}
|
||||
|
||||
pub fn is_allowed_endpoint(&self, ep: &str) -> bool {
|
||||
self.endpoints.all_allowed || self.endpoints.allowed.contains(ep)
|
||||
}
|
||||
}
|
||||
|
||||
@ -371,7 +478,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_new() {
|
||||
let config = AgentConfig::new();
|
||||
let config: AgentConfig = Default::default();
|
||||
assert!(!config.debug_console);
|
||||
assert!(!config.dev_mode);
|
||||
assert_eq!(config.log_level, DEFAULT_LOG_LEVEL);
|
||||
@ -379,7 +486,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_cmdline() {
|
||||
fn test_from_cmdline() {
|
||||
const TEST_SERVER_ADDR: &str = "vsock://-1:1024";
|
||||
|
||||
#[derive(Debug)]
|
||||
@ -716,15 +823,6 @@ mod tests {
|
||||
|
||||
let dir = tempdir().expect("failed to create tmpdir");
|
||||
|
||||
// First, check a missing file is handled
|
||||
let file_path = dir.path().join("enoent");
|
||||
|
||||
let filename = file_path.to_str().expect("failed to create filename");
|
||||
|
||||
let mut config = AgentConfig::new();
|
||||
let result = config.parse_cmdline(&filename.to_owned());
|
||||
assert!(result.is_err());
|
||||
|
||||
// Now, test various combinations of file contents and environment
|
||||
// variables.
|
||||
for (i, d) in tests.iter().enumerate() {
|
||||
@ -753,22 +851,7 @@ mod tests {
|
||||
vars_to_unset.push(name);
|
||||
}
|
||||
|
||||
let mut config = AgentConfig::new();
|
||||
assert!(!config.debug_console, "{}", msg);
|
||||
assert!(!config.dev_mode, "{}", msg);
|
||||
assert!(!config.unified_cgroup_hierarchy, "{}", msg);
|
||||
assert_eq!(
|
||||
config.hotplug_timeout,
|
||||
time::Duration::from_secs(3),
|
||||
"{}",
|
||||
msg
|
||||
);
|
||||
assert_eq!(config.container_pipe_size, 0, "{}", msg);
|
||||
assert_eq!(config.server_addr, TEST_SERVER_ADDR, "{}", msg);
|
||||
assert_eq!(config.tracing, tracer::TraceType::Disabled, "{}", msg);
|
||||
|
||||
let result = config.parse_cmdline(filename);
|
||||
assert!(result.is_ok(), "{}", msg);
|
||||
let config = AgentConfig::from_cmdline(filename).expect("Failed to parse command line");
|
||||
|
||||
assert_eq!(d.debug_console, config.debug_console, "{}", msg);
|
||||
assert_eq!(d.dev_mode, config.dev_mode, "{}", msg);
|
||||
@ -1276,4 +1359,35 @@ Caused by:
|
||||
assert_result!(d.result, result, msg);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_config_builder_from_string() {
|
||||
let config = AgentConfig::from_str(
|
||||
r#"
|
||||
dev_mode = true
|
||||
server_addr = 'vsock://8:2048'
|
||||
|
||||
[endpoints]
|
||||
allowed = ["CreateContainer", "StartContainer"]
|
||||
"#,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// Verify that the all_allowed flag is false
|
||||
assert!(!config.endpoints.all_allowed);
|
||||
|
||||
// Verify that the override worked
|
||||
assert!(config.dev_mode);
|
||||
assert_eq!(config.server_addr, "vsock://8:2048");
|
||||
assert_eq!(
|
||||
config.endpoints.allowed,
|
||||
vec!["CreateContainer".to_string(), "StartContainer".to_string()]
|
||||
.iter()
|
||||
.cloned()
|
||||
.collect()
|
||||
);
|
||||
|
||||
// Verify that the default values are valid
|
||||
assert_eq!(config.hotplug_timeout, DEFAULT_HOTPLUG_TIMEOUT);
|
||||
}
|
||||
}
|
||||
|
@ -77,11 +77,11 @@ mod rpc;
|
||||
mod tracer;
|
||||
|
||||
const NAME: &str = "kata-agent";
|
||||
const KERNEL_CMDLINE_FILE: &str = "/proc/cmdline";
|
||||
|
||||
lazy_static! {
|
||||
static ref AGENT_CONFIG: Arc<RwLock<AgentConfig>> =
|
||||
Arc::new(RwLock::new(config::AgentConfig::new()));
|
||||
static ref AGENT_CONFIG: Arc<RwLock<AgentConfig>> = Arc::new(RwLock::new(
|
||||
AgentConfig::from_cmdline("/proc/cmdline").unwrap()
|
||||
));
|
||||
}
|
||||
|
||||
#[instrument]
|
||||
@ -134,15 +134,11 @@ async fn real_main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||
|
||||
console::initialize();
|
||||
|
||||
lazy_static::initialize(&AGENT_CONFIG);
|
||||
|
||||
// support vsock log
|
||||
let (rfd, wfd) = unistd::pipe2(OFlag::O_CLOEXEC)?;
|
||||
|
||||
let (shutdown_tx, shutdown_rx) = channel(true);
|
||||
|
||||
let agent_config = AGENT_CONFIG.clone();
|
||||
|
||||
let init_mode = unistd::getpid() == Pid::from_raw(1);
|
||||
if init_mode {
|
||||
// dup a new file descriptor for this temporary logger writer,
|
||||
@ -163,20 +159,15 @@ async fn real_main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||
e
|
||||
})?;
|
||||
|
||||
let mut config = agent_config.write().await;
|
||||
config.parse_cmdline(KERNEL_CMDLINE_FILE)?;
|
||||
lazy_static::initialize(&AGENT_CONFIG);
|
||||
|
||||
init_agent_as_init(&logger, config.unified_cgroup_hierarchy)?;
|
||||
init_agent_as_init(&logger, AGENT_CONFIG.read().await.unified_cgroup_hierarchy)?;
|
||||
drop(logger_async_guard);
|
||||
} else {
|
||||
// once parsed cmdline and set the config, release the write lock
|
||||
// as soon as possible in case other thread would get read lock on
|
||||
// it.
|
||||
let mut config = agent_config.write().await;
|
||||
config.parse_cmdline(KERNEL_CMDLINE_FILE)?;
|
||||
lazy_static::initialize(&AGENT_CONFIG);
|
||||
}
|
||||
let config = agent_config.read().await;
|
||||
|
||||
let config = AGENT_CONFIG.read().await;
|
||||
let log_vport = config.log_vport as u32;
|
||||
|
||||
let log_handle = tokio::spawn(create_logger_task(rfd, log_vport, shutdown_rx.clone()));
|
||||
|
@ -20,7 +20,7 @@ use ttrpc::{
|
||||
|
||||
use anyhow::{anyhow, Context, Result};
|
||||
use oci::{LinuxNamespace, Root, Spec};
|
||||
use protobuf::{RepeatedField, SingularPtrField};
|
||||
use protobuf::{Message, RepeatedField, SingularPtrField};
|
||||
use protocols::agent::{
|
||||
AddSwapRequest, AgentDetails, CopyFileRequest, GuestDetailsResponse, Interfaces, Metrics,
|
||||
OOMEvent, ReadStreamResponse, Routes, StatsContainerResponse, WaitProcessResponse,
|
||||
@ -86,6 +86,21 @@ macro_rules! sl {
|
||||
};
|
||||
}
|
||||
|
||||
macro_rules! is_allowed {
|
||||
($req:ident) => {
|
||||
if !AGENT_CONFIG
|
||||
.read()
|
||||
.await
|
||||
.is_allowed_endpoint($req.descriptor().name())
|
||||
{
|
||||
return Err(ttrpc_error(
|
||||
ttrpc::Code::UNIMPLEMENTED,
|
||||
format!("{} is blocked", $req.descriptor().name()),
|
||||
));
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct AgentService {
|
||||
sandbox: Arc<Mutex<Sandbox>>,
|
||||
@ -531,6 +546,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
req: protocols::agent::CreateContainerRequest,
|
||||
) -> ttrpc::Result<Empty> {
|
||||
trace_rpc_call!(ctx, "create_container", req);
|
||||
is_allowed!(req);
|
||||
match self.do_create_container(req).await {
|
||||
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())),
|
||||
Ok(_) => Ok(Empty::new()),
|
||||
@ -543,6 +559,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
req: protocols::agent::StartContainerRequest,
|
||||
) -> ttrpc::Result<Empty> {
|
||||
trace_rpc_call!(ctx, "start_container", req);
|
||||
is_allowed!(req);
|
||||
match self.do_start_container(req).await {
|
||||
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())),
|
||||
Ok(_) => Ok(Empty::new()),
|
||||
@ -555,6 +572,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
req: protocols::agent::RemoveContainerRequest,
|
||||
) -> ttrpc::Result<Empty> {
|
||||
trace_rpc_call!(ctx, "remove_container", req);
|
||||
is_allowed!(req);
|
||||
match self.do_remove_container(req).await {
|
||||
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())),
|
||||
Ok(_) => Ok(Empty::new()),
|
||||
@ -567,6 +585,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
req: protocols::agent::ExecProcessRequest,
|
||||
) -> ttrpc::Result<Empty> {
|
||||
trace_rpc_call!(ctx, "exec_process", req);
|
||||
is_allowed!(req);
|
||||
match self.do_exec_process(req).await {
|
||||
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())),
|
||||
Ok(_) => Ok(Empty::new()),
|
||||
@ -579,6 +598,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
req: protocols::agent::SignalProcessRequest,
|
||||
) -> ttrpc::Result<Empty> {
|
||||
trace_rpc_call!(ctx, "signal_process", req);
|
||||
is_allowed!(req);
|
||||
match self.do_signal_process(req).await {
|
||||
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())),
|
||||
Ok(_) => Ok(Empty::new()),
|
||||
@ -591,6 +611,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
req: protocols::agent::WaitProcessRequest,
|
||||
) -> ttrpc::Result<WaitProcessResponse> {
|
||||
trace_rpc_call!(ctx, "wait_process", req);
|
||||
is_allowed!(req);
|
||||
self.do_wait_process(req)
|
||||
.await
|
||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))
|
||||
@ -602,6 +623,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
req: protocols::agent::UpdateContainerRequest,
|
||||
) -> ttrpc::Result<Empty> {
|
||||
trace_rpc_call!(ctx, "update_container", req);
|
||||
is_allowed!(req);
|
||||
let cid = req.container_id.clone();
|
||||
let res = req.resources;
|
||||
|
||||
@ -637,6 +659,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
req: protocols::agent::StatsContainerRequest,
|
||||
) -> ttrpc::Result<StatsContainerResponse> {
|
||||
trace_rpc_call!(ctx, "stats_container", req);
|
||||
is_allowed!(req);
|
||||
let cid = req.container_id;
|
||||
let s = Arc::clone(&self.sandbox);
|
||||
let mut sandbox = s.lock().await;
|
||||
@ -658,6 +681,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
req: protocols::agent::PauseContainerRequest,
|
||||
) -> ttrpc::Result<protocols::empty::Empty> {
|
||||
trace_rpc_call!(ctx, "pause_container", req);
|
||||
is_allowed!(req);
|
||||
let cid = req.get_container_id();
|
||||
let s = Arc::clone(&self.sandbox);
|
||||
let mut sandbox = s.lock().await;
|
||||
@ -681,6 +705,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
req: protocols::agent::ResumeContainerRequest,
|
||||
) -> ttrpc::Result<protocols::empty::Empty> {
|
||||
trace_rpc_call!(ctx, "resume_container", req);
|
||||
is_allowed!(req);
|
||||
let cid = req.get_container_id();
|
||||
let s = Arc::clone(&self.sandbox);
|
||||
let mut sandbox = s.lock().await;
|
||||
@ -703,6 +728,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
_ctx: &TtrpcContext,
|
||||
req: protocols::agent::WriteStreamRequest,
|
||||
) -> ttrpc::Result<WriteStreamResponse> {
|
||||
is_allowed!(req);
|
||||
self.do_write_stream(req)
|
||||
.await
|
||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))
|
||||
@ -713,6 +739,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
_ctx: &TtrpcContext,
|
||||
req: protocols::agent::ReadStreamRequest,
|
||||
) -> ttrpc::Result<ReadStreamResponse> {
|
||||
is_allowed!(req);
|
||||
self.do_read_stream(req, true)
|
||||
.await
|
||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))
|
||||
@ -723,6 +750,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
_ctx: &TtrpcContext,
|
||||
req: protocols::agent::ReadStreamRequest,
|
||||
) -> ttrpc::Result<ReadStreamResponse> {
|
||||
is_allowed!(req);
|
||||
self.do_read_stream(req, false)
|
||||
.await
|
||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))
|
||||
@ -734,6 +762,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
req: protocols::agent::CloseStdinRequest,
|
||||
) -> ttrpc::Result<Empty> {
|
||||
trace_rpc_call!(ctx, "close_stdin", req);
|
||||
is_allowed!(req);
|
||||
|
||||
let cid = req.container_id.clone();
|
||||
let eid = req.exec_id;
|
||||
@ -770,6 +799,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
req: protocols::agent::TtyWinResizeRequest,
|
||||
) -> ttrpc::Result<Empty> {
|
||||
trace_rpc_call!(ctx, "tty_win_resize", req);
|
||||
is_allowed!(req);
|
||||
|
||||
let cid = req.container_id.clone();
|
||||
let eid = req.exec_id.clone();
|
||||
@ -810,6 +840,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
req: protocols::agent::UpdateInterfaceRequest,
|
||||
) -> ttrpc::Result<Interface> {
|
||||
trace_rpc_call!(ctx, "update_interface", req);
|
||||
is_allowed!(req);
|
||||
|
||||
let interface = req.interface.into_option().ok_or_else(|| {
|
||||
ttrpc_error(
|
||||
@ -837,6 +868,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
req: protocols::agent::UpdateRoutesRequest,
|
||||
) -> ttrpc::Result<Routes> {
|
||||
trace_rpc_call!(ctx, "update_routes", req);
|
||||
is_allowed!(req);
|
||||
|
||||
let new_routes = req
|
||||
.routes
|
||||
@ -877,6 +909,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
req: protocols::agent::ListInterfacesRequest,
|
||||
) -> ttrpc::Result<Interfaces> {
|
||||
trace_rpc_call!(ctx, "list_interfaces", req);
|
||||
is_allowed!(req);
|
||||
|
||||
let list = self
|
||||
.sandbox
|
||||
@ -904,6 +937,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
req: protocols::agent::ListRoutesRequest,
|
||||
) -> ttrpc::Result<Routes> {
|
||||
trace_rpc_call!(ctx, "list_routes", req);
|
||||
is_allowed!(req);
|
||||
|
||||
let list = self
|
||||
.sandbox
|
||||
@ -926,14 +960,16 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
req: protocols::agent::StartTracingRequest,
|
||||
) -> ttrpc::Result<Empty> {
|
||||
info!(sl!(), "start_tracing {:?}", req);
|
||||
is_allowed!(req);
|
||||
Ok(Empty::new())
|
||||
}
|
||||
|
||||
async fn stop_tracing(
|
||||
&self,
|
||||
_ctx: &TtrpcContext,
|
||||
_req: protocols::agent::StopTracingRequest,
|
||||
req: protocols::agent::StopTracingRequest,
|
||||
) -> ttrpc::Result<Empty> {
|
||||
is_allowed!(req);
|
||||
Ok(Empty::new())
|
||||
}
|
||||
|
||||
@ -943,6 +979,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
req: protocols::agent::CreateSandboxRequest,
|
||||
) -> ttrpc::Result<Empty> {
|
||||
trace_rpc_call!(ctx, "create_sandbox", req);
|
||||
is_allowed!(req);
|
||||
|
||||
{
|
||||
let sandbox = self.sandbox.clone();
|
||||
@ -1008,6 +1045,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
req: protocols::agent::DestroySandboxRequest,
|
||||
) -> ttrpc::Result<Empty> {
|
||||
trace_rpc_call!(ctx, "destroy_sandbox", req);
|
||||
is_allowed!(req);
|
||||
|
||||
let s = Arc::clone(&self.sandbox);
|
||||
let mut sandbox = s.lock().await;
|
||||
@ -1029,6 +1067,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
req: protocols::agent::AddARPNeighborsRequest,
|
||||
) -> ttrpc::Result<Empty> {
|
||||
trace_rpc_call!(ctx, "add_arp_neighbors", req);
|
||||
is_allowed!(req);
|
||||
|
||||
let neighs = req
|
||||
.neighbors
|
||||
@ -1062,6 +1101,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
ctx: &TtrpcContext,
|
||||
req: protocols::agent::OnlineCPUMemRequest,
|
||||
) -> ttrpc::Result<Empty> {
|
||||
is_allowed!(req);
|
||||
let s = Arc::clone(&self.sandbox);
|
||||
let sandbox = s.lock().await;
|
||||
trace_rpc_call!(ctx, "online_cpu_mem", req);
|
||||
@ -1079,6 +1119,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
req: protocols::agent::ReseedRandomDevRequest,
|
||||
) -> ttrpc::Result<Empty> {
|
||||
trace_rpc_call!(ctx, "reseed_random_dev", req);
|
||||
is_allowed!(req);
|
||||
|
||||
random::reseed_rng(req.data.as_slice())
|
||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?;
|
||||
@ -1092,6 +1133,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
req: protocols::agent::GuestDetailsRequest,
|
||||
) -> ttrpc::Result<GuestDetailsResponse> {
|
||||
trace_rpc_call!(ctx, "get_guest_details", req);
|
||||
is_allowed!(req);
|
||||
|
||||
info!(sl!(), "get guest details!");
|
||||
let mut resp = GuestDetailsResponse::new();
|
||||
@ -1120,6 +1162,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
req: protocols::agent::MemHotplugByProbeRequest,
|
||||
) -> ttrpc::Result<Empty> {
|
||||
trace_rpc_call!(ctx, "mem_hotplug_by_probe", req);
|
||||
is_allowed!(req);
|
||||
|
||||
do_mem_hotplug_by_probe(&req.memHotplugProbeAddr)
|
||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?;
|
||||
@ -1133,6 +1176,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
req: protocols::agent::SetGuestDateTimeRequest,
|
||||
) -> ttrpc::Result<Empty> {
|
||||
trace_rpc_call!(ctx, "set_guest_date_time", req);
|
||||
is_allowed!(req);
|
||||
|
||||
do_set_guest_date_time(req.Sec, req.Usec)
|
||||
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?;
|
||||
@ -1146,6 +1190,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
req: protocols::agent::CopyFileRequest,
|
||||
) -> ttrpc::Result<Empty> {
|
||||
trace_rpc_call!(ctx, "copy_file", req);
|
||||
is_allowed!(req);
|
||||
|
||||
do_copy_file(&req).map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?;
|
||||
|
||||
@ -1158,6 +1203,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
req: protocols::agent::GetMetricsRequest,
|
||||
) -> ttrpc::Result<Metrics> {
|
||||
trace_rpc_call!(ctx, "get_metrics", req);
|
||||
is_allowed!(req);
|
||||
|
||||
match get_metrics(&req) {
|
||||
Err(e) => Err(ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())),
|
||||
@ -1172,8 +1218,9 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
async fn get_oom_event(
|
||||
&self,
|
||||
_ctx: &TtrpcContext,
|
||||
_req: protocols::agent::GetOOMEventRequest,
|
||||
req: protocols::agent::GetOOMEventRequest,
|
||||
) -> ttrpc::Result<OOMEvent> {
|
||||
is_allowed!(req);
|
||||
let sandbox = self.sandbox.clone();
|
||||
let s = sandbox.lock().await;
|
||||
let event_rx = &s.event_rx.clone();
|
||||
@ -1199,6 +1246,7 @@ impl protocols::agent_ttrpc::AgentService for AgentService {
|
||||
req: protocols::agent::AddSwapRequest,
|
||||
) -> ttrpc::Result<Empty> {
|
||||
trace_rpc_call!(ctx, "add_swap", req);
|
||||
is_allowed!(req);
|
||||
|
||||
do_add_swap(&self.sandbox, &req)
|
||||
.await
|
||||
|
@ -7,6 +7,7 @@ use crate::config::AgentConfig;
|
||||
use anyhow::Result;
|
||||
use opentelemetry::sdk::propagation::TraceContextPropagator;
|
||||
use opentelemetry::{global, sdk::trace::Config, trace::TracerProvider};
|
||||
use serde::Deserialize;
|
||||
use slog::{info, o, Logger};
|
||||
use std::collections::HashMap;
|
||||
use std::error::Error;
|
||||
@ -17,7 +18,7 @@ use tracing_subscriber::layer::SubscriberExt;
|
||||
use tracing_subscriber::Registry;
|
||||
use ttrpc::r#async::TtrpcContext;
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
#[derive(Debug, Deserialize, PartialEq)]
|
||||
pub enum TraceType {
|
||||
Disabled,
|
||||
Isolated,
|
||||
|
Loading…
Reference in New Issue
Block a user