From cf54a1b0e1d868e63c2844c23d4e1ed850e3f578 Mon Sep 17 00:00:00 2001 From: Leonard Cohnen Date: Tue, 3 Dec 2024 20:52:24 +0100 Subject: [PATCH] agent: move policy module into separate crate The policy module augments the policy generated with genpolicy by keeping and providing state to each invocation. Therefore, it is not sufficient anymore to test the passing of requests in the genpolicy crate. Since in Rust, integration tests cannot call functions that are not exposed publicly, this commit factors out the policy module of the agent into its own crate and exposes the necessary functions to be consumed by the agent and an integration test. The integration test itself is implemented in the following commits. Signed-off-by: Leonard Cohnen --- src/agent/Cargo.lock | 18 +++ src/agent/Cargo.toml | 3 +- src/agent/policy/Cargo.toml | 33 +++++ src/agent/policy/src/lib.rs | 6 + src/agent/policy/src/policy.rs | 243 +++++++++++++++++++++++++++++++++ src/agent/src/main.rs | 14 +- src/agent/src/policy.rs | 225 +----------------------------- 7 files changed, 315 insertions(+), 227 deletions(-) create mode 100644 src/agent/policy/Cargo.toml create mode 100644 src/agent/policy/src/lib.rs create mode 100644 src/agent/policy/src/policy.rs diff --git a/src/agent/Cargo.lock b/src/agent/Cargo.lock index e87c24e308..e80742b740 100644 --- a/src/agent/Cargo.lock +++ b/src/agent/Cargo.lock @@ -3020,6 +3020,7 @@ dependencies = [ "image-rs", "ipnetwork", "json-patch", + "kata-agent-policy", "kata-sys-util", "kata-types", "lazy_static", @@ -3069,6 +3070,23 @@ dependencies = [ "which", ] +[[package]] +name = "kata-agent-policy" +version = "0.1.0" +dependencies = [ + "anyhow", + "json-patch", + "logging", + "regorus", + "serde", + "serde_json", + "slog", + "slog-scope", + "slog-term", + "tokio", + "tokio-vsock 0.3.4", +] + [[package]] name = "kata-sys-util" version = "0.1.0" diff --git a/src/agent/Cargo.toml b/src/agent/Cargo.toml index 20737f60c6..d06c52b105 100644 --- 
a/src/agent/Cargo.toml +++ b/src/agent/Cargo.toml @@ -88,6 +88,7 @@ regorus = { version = "0.2.6", default-features = false, features = [ ], optional = true } cdi = { git = "https://github.com/cncf-tags/container-device-interface-rs", rev = "fba5677a8e7cc962fc6e495fcec98d7d765e332a" } json-patch = "2.0.0" +kata-agent-policy = { path = "policy" } [dev-dependencies] tempfile = "3.1.0" @@ -97,7 +98,7 @@ rstest = "0.18.0" async-std = { version = "1.12.0", features = ["attributes"] } [workspace] -members = ["rustjail"] +members = ["rustjail", "policy"] [profile.release] lto = true diff --git a/src/agent/policy/Cargo.toml b/src/agent/policy/Cargo.toml new file mode 100644 index 0000000000..4bf98fb192 --- /dev/null +++ b/src/agent/policy/Cargo.toml @@ -0,0 +1,33 @@ +[package] +name = "kata-agent-policy" +version = "0.1.0" +authors = ["The Kata Containers community "] +edition = "2018" +license = "Apache-2.0" + +[dependencies] +# Async runtime +tokio = { version = "1.39.0", features = ["full"] } +tokio-vsock = "0.3.4" + +anyhow = "1" + +# Configuration +serde = { version = "1.0.129", features = ["derive"] } +serde_json = "1.0.39" + +# Agent Policy +regorus = { version = "0.2.8", default-features = false, features = [ + "arc", + "regex", + "std", +] } +json-patch = "2.0.0" + + +# Note: this crate sets the slog 'max_*' features which allows the log level +# to be modified at runtime. 
+logging = { path = "../../libs/logging" } +slog = "2.5.2" +slog-scope = "4.1.2" +slog-term = "2.9.0" diff --git a/src/agent/policy/src/lib.rs b/src/agent/policy/src/lib.rs new file mode 100644 index 0000000000..994f67969e --- /dev/null +++ b/src/agent/policy/src/lib.rs @@ -0,0 +1,6 @@ +// Copyright (c) 2024 Edgeless Systems GmbH +// +// SPDX-License-Identifier: Apache-2.0 +// + +pub mod policy; diff --git a/src/agent/policy/src/policy.rs b/src/agent/policy/src/policy.rs new file mode 100644 index 0000000000..34f1073853 --- /dev/null +++ b/src/agent/policy/src/policy.rs @@ -0,0 +1,243 @@ +// Copyright (c) 2023 Microsoft Corporation +// Copyright (c) 2024 Edgeless Systems GmbH +// +// SPDX-License-Identifier: Apache-2.0 +// + +//! Policy evaluation for the kata-agent. + +use anyhow::{bail, Result}; +use slog::{debug, error, info, warn}; +use tokio::io::AsyncWriteExt; + +static POLICY_LOG_FILE: &str = "/tmp/policy.txt"; +static POLICY_DEFAULT_FILE: &str = "/etc/kata-opa/default-policy.rego"; + +/// Convenience macro to obtain the scope logger +macro_rules! sl { + () => { + slog_scope::logger() + }; +} + +/// Singleton policy object. +#[derive(Debug, Default)] +pub struct AgentPolicy { + /// When true policy errors are ignored, for debug purposes. + allow_failures: bool, + + /// "/tmp/policy.txt" log file for policy activity. + log_file: Option, + + /// Regorus engine + engine: regorus::Engine, +} + +#[derive(serde::Deserialize, Debug)] +struct MetadataResponse { + allowed: bool, + ops: Option, +} + +impl AgentPolicy { + /// Create AgentPolicy object. 
+ pub fn new() -> Self { + Self { + allow_failures: false, + engine: Self::new_engine(), + ..Default::default() + } + } + + fn new_engine() -> regorus::Engine { + let mut engine = regorus::Engine::new(); + engine.set_strict_builtin_errors(false); + engine.set_gather_prints(true); + // assign a slice of the engine data "pstate" to be used as policy state + engine + .add_data( + regorus::Value::from_json_str( + r#"{ + "pstate": {} + }"#, + ) + .unwrap(), + ) + .unwrap(); + engine + } + + /// Initialize regorus. + pub async fn initialize( + &mut self, + log_level: usize, + default_policy_file: String, + log_file: Option, + ) -> Result<()> { + // log file path + let log_file_path = match log_file { + Some(path) => path, + None => POLICY_LOG_FILE.to_string(), + }; + let log_file_path = log_file_path.as_str(); + + if log_level >= slog::Level::Debug.as_usize() { + self.log_file = Some( + tokio::fs::OpenOptions::new() + .write(true) + .truncate(true) + .create(true) + .open(&log_file_path) + .await?, + ); + debug!(sl!(), "policy: log file: {}", log_file_path); + } + + // Check if policy file has been set via AgentConfig + // If empty, use default file. 
+ let mut default_policy_file = default_policy_file; + if default_policy_file.is_empty() { + default_policy_file = POLICY_DEFAULT_FILE.to_string(); + } + info!(sl!(), "default policy: {default_policy_file}"); + + self.engine.add_policy_from_file(default_policy_file)?; + self.update_allow_failures_flag().await?; + Ok(()) + } + + async fn apply_patch_to_state(&mut self, patch: json_patch::Patch) -> Result<()> { + // Convert the current engine data to a JSON value + let mut state = serde_json::to_value(self.engine.get_data())?; + + // Apply the patch to the state + json_patch::patch(&mut state, &patch)?; + + // Clear the existing data in the engine + self.engine.clear_data(); + + // Add the patched state back to the engine + self.engine + .add_data(regorus::Value::from_json_str(&state.to_string())?)?; + + Ok(()) + } + + /// Ask regorus if an API call should be allowed or not. + pub async fn allow_request(&mut self, ep: &str, ep_input: &str) -> Result<(bool, String)> { + debug!(sl!(), "policy check: {ep}"); + self.log_eval_input(ep, ep_input).await; + + let query = format!("data.agent_policy.{ep}"); + self.engine.set_input_json(ep_input)?; + + let results = self.engine.eval_query(query, false)?; + + let prints = match self.engine.take_prints() { + Ok(p) => p.join(" "), + Err(e) => format!("Failed to get policy log: {e}"), + }; + + if results.result.len() != 1 { + // Results are empty when AllowRequestsFailingPolicy is used to allow a Request that hasn't been defined in the policy + if self.allow_failures { + return Ok((true, prints)); + } + bail!( + "policy check: unexpected eval_query result len {:?}", + results + ); + } + + if results.result[0].expressions.len() != 1 { + bail!( + "policy check: unexpected eval_query result expressions {:?}", + results + ); + } + + let mut allow = match &results.result[0].expressions[0].value { + regorus::Value::Bool(b) => *b, + + // Match against a specific variant that could be interpreted as MetadataResponse + 
regorus::Value::Object(obj) => { + let json_str = serde_json::to_string(obj)?; + + self.log_eval_input(ep, &json_str).await; + + let metadata_response: MetadataResponse = serde_json::from_str(&json_str)?; + + if metadata_response.allowed { + if let Some(ops) = metadata_response.ops { + self.apply_patch_to_state(ops).await?; + } + } + metadata_response.allowed + } + + _ => { + error!(sl!(), "allow_request: unexpected eval_query result type"); + bail!( + "policy check: unexpected eval_query result type {:?}", + results + ); + } + }; + + if !allow && self.allow_failures { + warn!(sl!(), "policy: ignoring error for {ep}"); + allow = true; + } + + Ok((allow, prints)) + } + + /// Replace the Policy in regorus. + pub async fn set_policy(&mut self, policy: &str) -> Result<()> { + self.engine = Self::new_engine(); + self.engine + .add_policy("agent_policy".to_string(), policy.to_string())?; + self.update_allow_failures_flag().await?; + Ok(()) + } + + async fn log_eval_input(&mut self, ep: &str, input: &str) { + if let Some(log_file) = &mut self.log_file { + match ep { + "StatsContainerRequest" | "ReadStreamRequest" | "SetPolicyRequest" => { + // - StatsContainerRequest and ReadStreamRequest are called + // relatively often, so we're not logging them, to avoid + // growing this log file too much. + // - Confidential Containers Policy documents are relatively + // large, so we're not logging them here, for SetPolicyRequest. + // The Policy text can be obtained directly from the pod YAML. 
+ } + _ => { + let log_entry = format!("[\"ep\":\"{ep}\",{input}],\n\n"); + + if let Err(e) = log_file.write_all(log_entry.as_bytes()).await { + warn!(sl!(), "policy: log_eval_input: write_all failed: {}", e); + } else if let Err(e) = log_file.flush().await { + warn!(sl!(), "policy: log_eval_input: flush failed: {}", e); + } + } + } + } + } + + async fn update_allow_failures_flag(&mut self) -> Result<()> { + self.allow_failures = match self.allow_request("AllowRequestsFailingPolicy", "{}").await { + Ok((allowed, _prints)) => { + if allowed { + warn!( + sl!(), + "policy: AllowRequestsFailingPolicy is enabled - will ignore errors" + ); + } + allowed + } + Err(_) => false, + }; + Ok(()) + } +} diff --git a/src/agent/src/main.rs b/src/agent/src/main.rs index f8a1a24da2..2e7698706c 100644 --- a/src/agent/src/main.rs +++ b/src/agent/src/main.rs @@ -134,7 +134,7 @@ lazy_static! { #[cfg(feature = "agent-policy")] lazy_static! { - static ref AGENT_POLICY: Mutex = Mutex::new(AgentPolicy::new()); + static ref AGENT_POLICY: Mutex = Mutex::new(AgentPolicy::new()); } #[derive(Parser)] @@ -633,7 +633,15 @@ fn init_agent_as_init(logger: &Logger, unified_cgroup_hierarchy: bool) -> Result #[cfg(feature = "agent-policy")] async fn initialize_policy() -> Result<()> { - AGENT_POLICY.lock().await.initialize().await + AGENT_POLICY + .lock() + .await + .initialize( + AGENT_CONFIG.log_level.as_usize(), + AGENT_CONFIG.policy_file.clone(), + None, + ) + .await } // The Rust standard library had suppressed the default SIGPIPE behavior, @@ -651,7 +659,7 @@ use crate::config::AgentConfig; use std::os::unix::io::{FromRawFd, RawFd}; #[cfg(feature = "agent-policy")] -use crate::policy::AgentPolicy; +use kata_agent_policy::policy::AgentPolicy; #[cfg(test)] mod tests { diff --git a/src/agent/src/policy.rs b/src/agent/src/policy.rs index 08587a6d03..1b8f391c0b 100644 --- a/src/agent/src/policy.rs +++ b/src/agent/src/policy.rs @@ -3,22 +3,11 @@ // SPDX-License-Identifier: Apache-2.0 // -use 
anyhow::{bail, Result}; use protobuf::MessageDyn; -use tokio::io::AsyncWriteExt; use crate::rpc::ttrpc_error; -use crate::{AGENT_CONFIG, AGENT_POLICY}; - -static POLICY_LOG_FILE: &str = "/tmp/policy.txt"; -static POLICY_DEFAULT_FILE: &str = "/etc/kata-opa/default-policy.rego"; - -/// Convenience macro to obtain the scope logger -macro_rules! sl { - () => { - slog_scope::logger() - }; -} +use crate::AGENT_POLICY; +use kata_agent_policy::policy::AgentPolicy; async fn allow_request(policy: &mut AgentPolicy, ep: &str, request: &str) -> ttrpc::Result<()> { match policy.allow_request(ep, request).await { @@ -54,213 +43,3 @@ pub async fn do_set_policy(req: &protocols::agent::SetPolicyRequest) -> ttrpc::R .await .map_err(|e| ttrpc_error(ttrpc::Code::INVALID_ARGUMENT, e)) } - -/// Singleton policy object. -#[derive(Debug, Default)] -pub struct AgentPolicy { - /// When true policy errors are ignored, for debug purposes. - allow_failures: bool, - - /// "/tmp/policy.txt" log file for policy activity. - log_file: Option, - - /// Regorus engine - engine: regorus::Engine, -} - -#[derive(serde::Deserialize, Debug)] -struct MetadataResponse { - allowed: bool, - ops: Option, -} - -impl AgentPolicy { - /// Create AgentPolicy object. - pub fn new() -> Self { - Self { - allow_failures: false, - engine: Self::new_engine(), - ..Default::default() - } - } - - fn new_engine() -> regorus::Engine { - let mut engine = regorus::Engine::new(); - engine.set_strict_builtin_errors(false); - engine.set_gather_prints(true); - // assign a slice of the engine data "pstate" to be used as policy state - engine - .add_data( - regorus::Value::from_json_str( - r#"{ - "pstate": {} - }"#, - ) - .unwrap(), - ) - .unwrap(); - engine - } - - /// Initialize regorus. 
- pub async fn initialize(&mut self) -> Result<()> { - if AGENT_CONFIG.log_level.as_usize() >= slog::Level::Debug.as_usize() { - self.log_file = Some( - tokio::fs::OpenOptions::new() - .write(true) - .truncate(true) - .create(true) - .open(POLICY_LOG_FILE) - .await?, - ); - debug!(sl!(), "policy: log file: {}", POLICY_LOG_FILE); - } - - // Check if policy file has been set via AgentConfig - // If empty, use default file. - let mut default_policy_file = AGENT_CONFIG.policy_file.clone(); - if default_policy_file.is_empty() { - default_policy_file = POLICY_DEFAULT_FILE.to_string(); - } - info!(sl!(), "default policy: {default_policy_file}"); - - self.engine.add_policy_from_file(default_policy_file)?; - self.update_allow_failures_flag().await?; - Ok(()) - } - - async fn apply_patch_to_state(&mut self, patch: json_patch::Patch) -> Result<()> { - // Convert the current engine data to a JSON value - let mut state = serde_json::to_value(self.engine.get_data())?; - - // Apply the patch to the state - json_patch::patch(&mut state, &patch)?; - - // Clear the existing data in the engine - self.engine.clear_data(); - - // Add the patched state back to the engine - self.engine - .add_data(regorus::Value::from_json_str(&state.to_string())?)?; - - Ok(()) - } - - /// Ask regorus if an API call should be allowed or not. 
- async fn allow_request(&mut self, ep: &str, ep_input: &str) -> Result<(bool, String)> { - debug!(sl!(), "policy check: {ep}"); - self.log_eval_input(ep, ep_input).await; - - let query = format!("data.agent_policy.{ep}"); - self.engine.set_input_json(ep_input)?; - - let results = self.engine.eval_query(query, false)?; - - let prints = match self.engine.take_prints() { - Ok(p) => p.join(" "), - Err(e) => format!("Failed to get policy log: {e}"), - }; - - if results.result.len() != 1 { - // Results are empty when AllowRequestsFailingPolicy is used to allow a Request that hasn't been defined in the policy - if self.allow_failures { - return Ok((true, prints)); - } - bail!( - "policy check: unexpected eval_query result len {:?}", - results - ); - } - - if results.result[0].expressions.len() != 1 { - bail!( - "policy check: unexpected eval_query result expressions {:?}", - results - ); - } - - let mut allow = match &results.result[0].expressions[0].value { - regorus::Value::Bool(b) => *b, - - // Match against a specific variant that could be interpreted as MetadataResponse - regorus::Value::Object(obj) => { - let json_str = serde_json::to_string(obj)?; - - self.log_eval_input(ep, &json_str).await; - - let metadata_response: MetadataResponse = serde_json::from_str(&json_str)?; - - if metadata_response.allowed { - if let Some(ops) = metadata_response.ops { - self.apply_patch_to_state(ops).await?; - } - } - metadata_response.allowed - } - - _ => { - error!(sl!(), "allow_request: unexpected eval_query result type"); - bail!( - "policy check: unexpected eval_query result type {:?}", - results - ); - } - }; - - if !allow && self.allow_failures { - warn!(sl!(), "policy: ignoring error for {ep}"); - allow = true; - } - - Ok((allow, prints)) - } - - /// Replace the Policy in regorus. 
- pub async fn set_policy(&mut self, policy: &str) -> Result<()> { - self.engine = Self::new_engine(); - self.engine - .add_policy("agent_policy".to_string(), policy.to_string())?; - self.update_allow_failures_flag().await?; - Ok(()) - } - - async fn log_eval_input(&mut self, ep: &str, input: &str) { - if let Some(log_file) = &mut self.log_file { - match ep { - "StatsContainerRequest" | "ReadStreamRequest" | "SetPolicyRequest" => { - // - StatsContainerRequest and ReadStreamRequest are called - // relatively often, so we're not logging them, to avoid - // growing this log file too much. - // - Confidential Containers Policy documents are relatively - // large, so we're not logging them here, for SetPolicyRequest. - // The Policy text can be obtained directly from the pod YAML. - } - _ => { - let log_entry = format!("[\"ep\":\"{ep}\",{input}],\n\n"); - - if let Err(e) = log_file.write_all(log_entry.as_bytes()).await { - warn!(sl!(), "policy: log_eval_input: write_all failed: {}", e); - } else if let Err(e) = log_file.flush().await { - warn!(sl!(), "policy: log_eval_input: flush failed: {}", e); - } - } - } - } - } - - async fn update_allow_failures_flag(&mut self) -> Result<()> { - self.allow_failures = match self.allow_request("AllowRequestsFailingPolicy", "{}").await { - Ok((allowed, _prints)) => { - if allowed { - warn!( - sl!(), - "policy: AllowRequestsFailingPolicy is enabled - will ignore errors" - ); - } - allowed - } - Err(_) => false, - }; - Ok(()) - } -}