diff --git a/src/agent/Cargo.lock b/src/agent/Cargo.lock
index e87c24e308..e80742b740 100644
--- a/src/agent/Cargo.lock
+++ b/src/agent/Cargo.lock
@@ -3020,6 +3020,7 @@ dependencies = [
  "image-rs",
  "ipnetwork",
  "json-patch",
+ "kata-agent-policy",
  "kata-sys-util",
  "kata-types",
  "lazy_static",
@@ -3069,6 +3070,23 @@ dependencies = [
  "which",
 ]
 
+[[package]]
+name = "kata-agent-policy"
+version = "0.1.0"
+dependencies = [
+ "anyhow",
+ "json-patch",
+ "logging",
+ "regorus",
+ "serde",
+ "serde_json",
+ "slog",
+ "slog-scope",
+ "slog-term",
+ "tokio",
+ "tokio-vsock 0.3.4",
+]
+
 [[package]]
 name = "kata-sys-util"
 version = "0.1.0"
diff --git a/src/agent/Cargo.toml b/src/agent/Cargo.toml
index 20737f60c6..d06c52b105 100644
--- a/src/agent/Cargo.toml
+++ b/src/agent/Cargo.toml
@@ -88,6 +88,7 @@ regorus = { version = "0.2.6", default-features = false, features = [
 ], optional = true }
 cdi = { git = "https://github.com/cncf-tags/container-device-interface-rs", rev = "fba5677a8e7cc962fc6e495fcec98d7d765e332a" }
 json-patch = "2.0.0"
+kata-agent-policy = { path = "policy" }
 
 [dev-dependencies]
 tempfile = "3.1.0"
@@ -97,7 +98,7 @@ rstest = "0.18.0"
 async-std = { version = "1.12.0", features = ["attributes"] }
 
 [workspace]
-members = ["rustjail"]
+members = ["rustjail", "policy"]
 
 [profile.release]
 lto = true
diff --git a/src/agent/policy/Cargo.toml b/src/agent/policy/Cargo.toml
new file mode 100644
index 0000000000..4bf98fb192
--- /dev/null
+++ b/src/agent/policy/Cargo.toml
@@ -0,0 +1,33 @@
+[package]
+name = "kata-agent-policy"
+version = "0.1.0"
+authors = ["The Kata Containers community "]
+edition = "2018"
+license = "Apache-2.0"
+
+[dependencies]
+# Async runtime
+tokio = { version = "1.39.0", features = ["full"] }
+tokio-vsock = "0.3.4"
+
+anyhow = "1"
+
+# Configuration
+serde = { version = "1.0.129", features = ["derive"] }
+serde_json = "1.0.39"
+
+# Agent Policy
+regorus = { version = "0.2.8", default-features = false, features = [
+    "arc",
+    "regex",
+    "std",
+] }
+json-patch = "2.0.0"
+
+
+# Note: this crate sets the slog 'max_*' features which allows the log level
+# to be modified at runtime.
+logging = { path = "../../libs/logging" }
+slog = "2.5.2"
+slog-scope = "4.1.2"
+slog-term = "2.9.0"
diff --git a/src/agent/policy/src/lib.rs b/src/agent/policy/src/lib.rs
new file mode 100644
index 0000000000..994f67969e
--- /dev/null
+++ b/src/agent/policy/src/lib.rs
@@ -0,0 +1,6 @@
+// Copyright (c) 2024 Edgeless Systems GmbH
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+
+pub mod policy;
diff --git a/src/agent/policy/src/policy.rs b/src/agent/policy/src/policy.rs
new file mode 100644
index 0000000000..34f1073853
--- /dev/null
+++ b/src/agent/policy/src/policy.rs
@@ -0,0 +1,267 @@
+// Copyright (c) 2023 Microsoft Corporation
+// Copyright (c) 2024 Edgeless Systems GmbH
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+
+//! Policy evaluation for the kata-agent.
+
+use anyhow::{bail, Result};
+use slog::{debug, error, info, warn};
+use tokio::io::AsyncWriteExt;
+
+/// Log file used by `AgentPolicy::initialize` when no path is supplied.
+static POLICY_LOG_FILE: &str = "/tmp/policy.txt";
+/// Policy file loaded by `AgentPolicy::initialize` when given an empty path.
+static POLICY_DEFAULT_FILE: &str = "/etc/kata-opa/default-policy.rego";
+
+/// Convenience macro to obtain the scope logger
+macro_rules! sl {
+    () => {
+        slog_scope::logger()
+    };
+}
+
+/// Singleton policy object.
+#[derive(Debug, Default)]
+pub struct AgentPolicy {
+    /// When true policy errors are ignored, for debug purposes.
+    allow_failures: bool,
+
+    /// Log file for policy activity ("/tmp/policy.txt" unless overridden);
+    /// opened by `initialize` only when `log_level` is at least Debug.
+    log_file: Option<tokio::fs::File>,
+
+    /// Regorus engine
+    engine: regorus::Engine,
+}
+
+/// Object-shaped policy result: the verdict, plus an optional JSON patch
+/// that is applied to the engine data when the request is allowed.
+#[derive(serde::Deserialize, Debug)]
+struct MetadataResponse {
+    allowed: bool,
+    ops: Option<json_patch::Patch>,
+}
+
+impl AgentPolicy {
+    /// Create AgentPolicy object.
+    pub fn new() -> Self {
+        Self {
+            allow_failures: false,
+            engine: Self::new_engine(),
+            ..Default::default()
+        }
+    }
+
+    /// Build a regorus engine that gathers policy `print` output and whose
+    /// data holds an empty "pstate" object.
+    fn new_engine() -> regorus::Engine {
+        let mut engine = regorus::Engine::new();
+        engine.set_strict_builtin_errors(false);
+        engine.set_gather_prints(true);
+        // assign a slice of the engine data "pstate" to be used as policy state
+        engine
+            .add_data(
+                regorus::Value::from_json_str(
+                    r#"{
+                        "pstate": {}
+                    }"#,
+                )
+                .unwrap(),
+            )
+            .unwrap();
+        engine
+    }
+
+    /// Initialize regorus.
+    ///
+    /// * `log_level` - policy activity is logged to a file when this is at
+    ///   least `slog::Level::Debug`.
+    /// * `default_policy_file` - policy file to load; an empty string selects
+    ///   "/etc/kata-opa/default-policy.rego".
+    /// * `log_file` - log file path; `None` selects "/tmp/policy.txt".
+    pub async fn initialize(
+        &mut self,
+        log_level: usize,
+        default_policy_file: String,
+        log_file: Option<String>,
+    ) -> Result<()> {
+        // log file path
+        let log_file_path = log_file.unwrap_or_else(|| POLICY_LOG_FILE.to_string());
+
+        if log_level >= slog::Level::Debug.as_usize() {
+            self.log_file = Some(
+                tokio::fs::OpenOptions::new()
+                    .write(true)
+                    .truncate(true)
+                    .create(true)
+                    .open(&log_file_path)
+                    .await?,
+            );
+            debug!(sl!(), "policy: log file: {}", log_file_path);
+        }
+
+        // Check if policy file has been set via AgentConfig
+        // If empty, use default file.
+        let mut default_policy_file = default_policy_file;
+        if default_policy_file.is_empty() {
+            default_policy_file = POLICY_DEFAULT_FILE.to_string();
+        }
+        info!(sl!(), "default policy: {default_policy_file}");
+
+        self.engine.add_policy_from_file(default_policy_file)?;
+        self.update_allow_failures_flag().await?;
+        Ok(())
+    }
+
+    /// Apply a JSON patch to the engine data and load the result back.
+    async fn apply_patch_to_state(&mut self, patch: json_patch::Patch) -> Result<()> {
+        // Convert the current engine data to a JSON value
+        let mut state = serde_json::to_value(self.engine.get_data())?;
+
+        // Apply the patch to the state
+        json_patch::patch(&mut state, &patch)?;
+
+        // Clear the existing data in the engine
+        self.engine.clear_data();
+
+        // Add the patched state back to the engine
+        self.engine
+            .add_data(regorus::Value::from_json_str(&state.to_string())?)?;
+
+        Ok(())
+    }
+
+    /// Ask regorus if an API call should be allowed or not.
+    ///
+    /// Evaluates `data.agent_policy.<ep>` with `ep_input` as the input JSON
+    /// and returns the verdict together with the gathered policy prints.
+    pub async fn allow_request(&mut self, ep: &str, ep_input: &str) -> Result<(bool, String)> {
+        debug!(sl!(), "policy check: {ep}");
+        self.log_eval_input(ep, ep_input).await;
+
+        let query = format!("data.agent_policy.{ep}");
+        self.engine.set_input_json(ep_input)?;
+
+        let results = self.engine.eval_query(query, false)?;
+
+        let prints = match self.engine.take_prints() {
+            Ok(p) => p.join(" "),
+            Err(e) => format!("Failed to get policy log: {e}"),
+        };
+
+        if results.result.len() != 1 {
+            // Results are empty when AllowRequestsFailingPolicy is used to allow a Request that hasn't been defined in the policy
+            if self.allow_failures {
+                return Ok((true, prints));
+            }
+            bail!(
+                "policy check: unexpected eval_query result len {:?}",
+                results
+            );
+        }
+
+        if results.result[0].expressions.len() != 1 {
+            bail!(
+                "policy check: unexpected eval_query result expressions {:?}",
+                results
+            );
+        }
+
+        let mut allow = match &results.result[0].expressions[0].value {
+            regorus::Value::Bool(b) => *b,
+
+            // Match against a specific variant that could be interpreted as MetadataResponse
+            regorus::Value::Object(obj) => {
+                let json_str = serde_json::to_string(obj)?;
+
+                self.log_eval_input(ep, &json_str).await;
+
+                let metadata_response: MetadataResponse = serde_json::from_str(&json_str)?;
+
+                if metadata_response.allowed {
+                    if let Some(ops) = metadata_response.ops {
+                        self.apply_patch_to_state(ops).await?;
+                    }
+                }
+                metadata_response.allowed
+            }
+
+            _ => {
+                error!(sl!(), "allow_request: unexpected eval_query result type");
+                bail!(
+                    "policy check: unexpected eval_query result type {:?}",
+                    results
+                );
+            }
+        };
+
+        if !allow && self.allow_failures {
+            warn!(sl!(), "policy: ignoring error for {ep}");
+            allow = true;
+        }
+
+        Ok((allow, prints))
+    }
+
+    /// Replace the Policy in regorus.
+    pub async fn set_policy(&mut self, policy: &str) -> Result<()> {
+        self.engine = Self::new_engine();
+        self.engine
+            .add_policy("agent_policy".to_string(), policy.to_string())?;
+        self.update_allow_failures_flag().await?;
+        Ok(())
+    }
+
+    /// Append the endpoint name and its input to the policy log file, if one
+    /// is open. Write failures are logged and otherwise ignored.
+    async fn log_eval_input(&mut self, ep: &str, input: &str) {
+        if let Some(log_file) = &mut self.log_file {
+            match ep {
+                "StatsContainerRequest" | "ReadStreamRequest" | "SetPolicyRequest" => {
+                    // - StatsContainerRequest and ReadStreamRequest are called
+                    //   relatively often, so we're not logging them, to avoid
+                    //   growing this log file too much.
+                    // - Confidential Containers Policy documents are relatively
+                    //   large, so we're not logging them here, for SetPolicyRequest.
+                    //   The Policy text can be obtained directly from the pod YAML.
+                }
+                _ => {
+                    let log_entry = format!("[\"ep\":\"{ep}\",{input}],\n\n");
+
+                    if let Err(e) = log_file.write_all(log_entry.as_bytes()).await {
+                        warn!(sl!(), "policy: log_eval_input: write_all failed: {}", e);
+                    } else if let Err(e) = log_file.flush().await {
+                        warn!(sl!(), "policy: log_eval_input: flush failed: {}", e);
+                    }
+                }
+            }
+        }
+    }
+
+    /// Re-evaluate `AllowRequestsFailingPolicy` against the current engine
+    /// and store the verdict in `allow_failures`.
+    async fn update_allow_failures_flag(&mut self) -> Result<()> {
+        // Evaluate with the flag cleared: allow_request() turns a denial into
+        // "allowed" while allow_failures is set, so a value left over from a
+        // previous policy would otherwise keep itself enabled.
+        self.allow_failures = false;
+        self.allow_failures = match self.allow_request("AllowRequestsFailingPolicy", "{}").await {
+            Ok((allowed, _prints)) => {
+                if allowed {
+                    warn!(
+                        sl!(),
+                        "policy: AllowRequestsFailingPolicy is enabled - will ignore errors"
+                    );
+                }
+                allowed
+            }
+            Err(e) => {
+                debug!(sl!(), "policy: AllowRequestsFailingPolicy check failed: {e}");
+                false
+            }
+        };
+        Ok(())
+    }
+}
diff --git a/src/agent/src/main.rs b/src/agent/src/main.rs
index f8a1a24da2..2e7698706c 100644
--- a/src/agent/src/main.rs
+++ b/src/agent/src/main.rs
@@ -134,7 +134,7 @@ lazy_static! {
 
 #[cfg(feature = "agent-policy")]
 lazy_static! {
-    static ref AGENT_POLICY: Mutex = Mutex::new(AgentPolicy::new());
+    static ref AGENT_POLICY: Mutex = Mutex::new(AgentPolicy::new());
 }
 
 #[derive(Parser)]
@@ -633,7 +633,15 @@ fn init_agent_as_init(logger: &Logger, unified_cgroup_hierarchy: bool) -> Result
 
 #[cfg(feature = "agent-policy")]
 async fn initialize_policy() -> Result<()> {
-    AGENT_POLICY.lock().await.initialize().await
+    AGENT_POLICY
+        .lock()
+        .await
+        .initialize(
+            AGENT_CONFIG.log_level.as_usize(),
+            AGENT_CONFIG.policy_file.clone(),
+            None,
+        )
+        .await
 }
 
 // The Rust standard library had suppressed the default SIGPIPE behavior,
@@ -651,7 +659,7 @@ use crate::config::AgentConfig;
 use std::os::unix::io::{FromRawFd, RawFd};
 
 #[cfg(feature = "agent-policy")]
-use crate::policy::AgentPolicy;
+use kata_agent_policy::policy::AgentPolicy;
 
 #[cfg(test)]
 mod tests {
diff --git a/src/agent/src/policy.rs b/src/agent/src/policy.rs
index 08587a6d03..1b8f391c0b 100644
--- a/src/agent/src/policy.rs
+++ b/src/agent/src/policy.rs
@@ -3,22 +3,11 @@
 // SPDX-License-Identifier: Apache-2.0
 //
 
-use
anyhow::{bail, Result}; use protobuf::MessageDyn; -use tokio::io::AsyncWriteExt; use crate::rpc::ttrpc_error; -use crate::{AGENT_CONFIG, AGENT_POLICY}; - -static POLICY_LOG_FILE: &str = "/tmp/policy.txt"; -static POLICY_DEFAULT_FILE: &str = "/etc/kata-opa/default-policy.rego"; - -/// Convenience macro to obtain the scope logger -macro_rules! sl { - () => { - slog_scope::logger() - }; -} +use crate::AGENT_POLICY; +use kata_agent_policy::policy::AgentPolicy; async fn allow_request(policy: &mut AgentPolicy, ep: &str, request: &str) -> ttrpc::Result<()> { match policy.allow_request(ep, request).await { @@ -54,213 +43,3 @@ pub async fn do_set_policy(req: &protocols::agent::SetPolicyRequest) -> ttrpc::R .await .map_err(|e| ttrpc_error(ttrpc::Code::INVALID_ARGUMENT, e)) } - -/// Singleton policy object. -#[derive(Debug, Default)] -pub struct AgentPolicy { - /// When true policy errors are ignored, for debug purposes. - allow_failures: bool, - - /// "/tmp/policy.txt" log file for policy activity. - log_file: Option, - - /// Regorus engine - engine: regorus::Engine, -} - -#[derive(serde::Deserialize, Debug)] -struct MetadataResponse { - allowed: bool, - ops: Option, -} - -impl AgentPolicy { - /// Create AgentPolicy object. - pub fn new() -> Self { - Self { - allow_failures: false, - engine: Self::new_engine(), - ..Default::default() - } - } - - fn new_engine() -> regorus::Engine { - let mut engine = regorus::Engine::new(); - engine.set_strict_builtin_errors(false); - engine.set_gather_prints(true); - // assign a slice of the engine data "pstate" to be used as policy state - engine - .add_data( - regorus::Value::from_json_str( - r#"{ - "pstate": {} - }"#, - ) - .unwrap(), - ) - .unwrap(); - engine - } - - /// Initialize regorus. 
- pub async fn initialize(&mut self) -> Result<()> { - if AGENT_CONFIG.log_level.as_usize() >= slog::Level::Debug.as_usize() { - self.log_file = Some( - tokio::fs::OpenOptions::new() - .write(true) - .truncate(true) - .create(true) - .open(POLICY_LOG_FILE) - .await?, - ); - debug!(sl!(), "policy: log file: {}", POLICY_LOG_FILE); - } - - // Check if policy file has been set via AgentConfig - // If empty, use default file. - let mut default_policy_file = AGENT_CONFIG.policy_file.clone(); - if default_policy_file.is_empty() { - default_policy_file = POLICY_DEFAULT_FILE.to_string(); - } - info!(sl!(), "default policy: {default_policy_file}"); - - self.engine.add_policy_from_file(default_policy_file)?; - self.update_allow_failures_flag().await?; - Ok(()) - } - - async fn apply_patch_to_state(&mut self, patch: json_patch::Patch) -> Result<()> { - // Convert the current engine data to a JSON value - let mut state = serde_json::to_value(self.engine.get_data())?; - - // Apply the patch to the state - json_patch::patch(&mut state, &patch)?; - - // Clear the existing data in the engine - self.engine.clear_data(); - - // Add the patched state back to the engine - self.engine - .add_data(regorus::Value::from_json_str(&state.to_string())?)?; - - Ok(()) - } - - /// Ask regorus if an API call should be allowed or not. 
- async fn allow_request(&mut self, ep: &str, ep_input: &str) -> Result<(bool, String)> { - debug!(sl!(), "policy check: {ep}"); - self.log_eval_input(ep, ep_input).await; - - let query = format!("data.agent_policy.{ep}"); - self.engine.set_input_json(ep_input)?; - - let results = self.engine.eval_query(query, false)?; - - let prints = match self.engine.take_prints() { - Ok(p) => p.join(" "), - Err(e) => format!("Failed to get policy log: {e}"), - }; - - if results.result.len() != 1 { - // Results are empty when AllowRequestsFailingPolicy is used to allow a Request that hasn't been defined in the policy - if self.allow_failures { - return Ok((true, prints)); - } - bail!( - "policy check: unexpected eval_query result len {:?}", - results - ); - } - - if results.result[0].expressions.len() != 1 { - bail!( - "policy check: unexpected eval_query result expressions {:?}", - results - ); - } - - let mut allow = match &results.result[0].expressions[0].value { - regorus::Value::Bool(b) => *b, - - // Match against a specific variant that could be interpreted as MetadataResponse - regorus::Value::Object(obj) => { - let json_str = serde_json::to_string(obj)?; - - self.log_eval_input(ep, &json_str).await; - - let metadata_response: MetadataResponse = serde_json::from_str(&json_str)?; - - if metadata_response.allowed { - if let Some(ops) = metadata_response.ops { - self.apply_patch_to_state(ops).await?; - } - } - metadata_response.allowed - } - - _ => { - error!(sl!(), "allow_request: unexpected eval_query result type"); - bail!( - "policy check: unexpected eval_query result type {:?}", - results - ); - } - }; - - if !allow && self.allow_failures { - warn!(sl!(), "policy: ignoring error for {ep}"); - allow = true; - } - - Ok((allow, prints)) - } - - /// Replace the Policy in regorus. 
- pub async fn set_policy(&mut self, policy: &str) -> Result<()> { - self.engine = Self::new_engine(); - self.engine - .add_policy("agent_policy".to_string(), policy.to_string())?; - self.update_allow_failures_flag().await?; - Ok(()) - } - - async fn log_eval_input(&mut self, ep: &str, input: &str) { - if let Some(log_file) = &mut self.log_file { - match ep { - "StatsContainerRequest" | "ReadStreamRequest" | "SetPolicyRequest" => { - // - StatsContainerRequest and ReadStreamRequest are called - // relatively often, so we're not logging them, to avoid - // growing this log file too much. - // - Confidential Containers Policy documents are relatively - // large, so we're not logging them here, for SetPolicyRequest. - // The Policy text can be obtained directly from the pod YAML. - } - _ => { - let log_entry = format!("[\"ep\":\"{ep}\",{input}],\n\n"); - - if let Err(e) = log_file.write_all(log_entry.as_bytes()).await { - warn!(sl!(), "policy: log_eval_input: write_all failed: {}", e); - } else if let Err(e) = log_file.flush().await { - warn!(sl!(), "policy: log_eval_input: flush failed: {}", e); - } - } - } - } - } - - async fn update_allow_failures_flag(&mut self) -> Result<()> { - self.allow_failures = match self.allow_request("AllowRequestsFailingPolicy", "{}").await { - Ok((allowed, _prints)) => { - if allowed { - warn!( - sl!(), - "policy: AllowRequestsFailingPolicy is enabled - will ignore errors" - ); - } - allowed - } - Err(_) => false, - }; - Ok(()) - } -}