agent: move policy module into separate crate

The policy module augments the policy generated with genpolicy by keeping and
providing state to each invocation.
Therefore, it is not sufficient anymore to test the passing of requests in
the genpolicy crate.

Since in Rust, integration tests cannot call functions that are not exposed
publicly, this commit factors out the policy module of the agent into its
own crate and exposes the necessary functions to be consumed by the agent
and an integration tests. The integration test itself is implemented in the
following commits.

Signed-off-by: Leonard Cohnen <lc@edgeless.systems>
This commit is contained in:
Leonard Cohnen 2024-12-03 20:52:24 +01:00
parent ec7b2aa441
commit cf54a1b0e1
7 changed files with 315 additions and 227 deletions

18
src/agent/Cargo.lock generated
View File

@ -3020,6 +3020,7 @@ dependencies = [
"image-rs",
"ipnetwork",
"json-patch",
"kata-agent-policy",
"kata-sys-util",
"kata-types",
"lazy_static",
@ -3069,6 +3070,23 @@ dependencies = [
"which",
]
[[package]]
name = "kata-agent-policy"
version = "0.1.0"
dependencies = [
"anyhow",
"json-patch",
"logging",
"regorus",
"serde",
"serde_json",
"slog",
"slog-scope",
"slog-term",
"tokio",
"tokio-vsock 0.3.4",
]
[[package]]
name = "kata-sys-util"
version = "0.1.0"

View File

@ -88,6 +88,7 @@ regorus = { version = "0.2.6", default-features = false, features = [
], optional = true }
cdi = { git = "https://github.com/cncf-tags/container-device-interface-rs", rev = "fba5677a8e7cc962fc6e495fcec98d7d765e332a" }
json-patch = "2.0.0"
kata-agent-policy = { path = "policy" }
[dev-dependencies]
tempfile = "3.1.0"
@ -97,7 +98,7 @@ rstest = "0.18.0"
async-std = { version = "1.12.0", features = ["attributes"] }
[workspace]
members = ["rustjail"]
members = ["rustjail", "policy"]
[profile.release]
lto = true

View File

@ -0,0 +1,33 @@
[package]
name = "kata-agent-policy"
version = "0.1.0"
authors = ["The Kata Containers community <kata-dev@lists.katacontainers.io>"]
edition = "2018"
license = "Apache-2.0"
[dependencies]
# Async runtime
tokio = { version = "1.39.0", features = ["full"] }
tokio-vsock = "0.3.4"
anyhow = "1"
# Configuration
serde = { version = "1.0.129", features = ["derive"] }
serde_json = "1.0.39"
# Agent Policy
regorus = { version = "0.2.8", default-features = false, features = [
"arc",
"regex",
"std",
] }
json-patch = "2.0.0"
# Note: this crate sets the slog 'max_*' features which allows the log level
# to be modified at runtime.
logging = { path = "../../libs/logging" }
slog = "2.5.2"
slog-scope = "4.1.2"
slog-term = "2.9.0"

View File

@ -0,0 +1,6 @@
// Copyright (c) 2024 Edgeless Systems GmbH
//
// SPDX-License-Identifier: Apache-2.0
//
pub mod policy;

View File

@ -0,0 +1,243 @@
// Copyright (c) 2023 Microsoft Corporation
// Copyright (c) 2024 Edgeless Systems GmbH
//
// SPDX-License-Identifier: Apache-2.0
//
//! Policy evaluation for the kata-agent.
use anyhow::{bail, Result};
use slog::{debug, error, info, warn};
use tokio::io::AsyncWriteExt;
static POLICY_LOG_FILE: &str = "/tmp/policy.txt";
static POLICY_DEFAULT_FILE: &str = "/etc/kata-opa/default-policy.rego";
/// Convenience macro to obtain the scope logger
macro_rules! sl {
() => {
slog_scope::logger()
};
}
/// Singleton policy object.
#[derive(Debug, Default)]
pub struct AgentPolicy {
/// When true policy errors are ignored, for debug purposes.
allow_failures: bool,
/// "/tmp/policy.txt" log file for policy activity.
log_file: Option<tokio::fs::File>,
/// Regorus engine
engine: regorus::Engine,
}
#[derive(serde::Deserialize, Debug)]
struct MetadataResponse {
allowed: bool,
ops: Option<json_patch::Patch>,
}
impl AgentPolicy {
/// Create AgentPolicy object.
pub fn new() -> Self {
Self {
allow_failures: false,
engine: Self::new_engine(),
..Default::default()
}
}
fn new_engine() -> regorus::Engine {
let mut engine = regorus::Engine::new();
engine.set_strict_builtin_errors(false);
engine.set_gather_prints(true);
// assign a slice of the engine data "pstate" to be used as policy state
engine
.add_data(
regorus::Value::from_json_str(
r#"{
"pstate": {}
}"#,
)
.unwrap(),
)
.unwrap();
engine
}
/// Initialize regorus.
pub async fn initialize(
&mut self,
log_level: usize,
default_policy_file: String,
log_file: Option<String>,
) -> Result<()> {
// log file path
let log_file_path = match log_file {
Some(path) => path,
None => POLICY_LOG_FILE.to_string(),
};
let log_file_path = log_file_path.as_str();
if log_level >= slog::Level::Debug.as_usize() {
self.log_file = Some(
tokio::fs::OpenOptions::new()
.write(true)
.truncate(true)
.create(true)
.open(&log_file_path)
.await?,
);
debug!(sl!(), "policy: log file: {}", log_file_path);
}
// Check if policy file has been set via AgentConfig
// If empty, use default file.
let mut default_policy_file = default_policy_file;
if default_policy_file.is_empty() {
default_policy_file = POLICY_DEFAULT_FILE.to_string();
}
info!(sl!(), "default policy: {default_policy_file}");
self.engine.add_policy_from_file(default_policy_file)?;
self.update_allow_failures_flag().await?;
Ok(())
}
async fn apply_patch_to_state(&mut self, patch: json_patch::Patch) -> Result<()> {
// Convert the current engine data to a JSON value
let mut state = serde_json::to_value(self.engine.get_data())?;
// Apply the patch to the state
json_patch::patch(&mut state, &patch)?;
// Clear the existing data in the engine
self.engine.clear_data();
// Add the patched state back to the engine
self.engine
.add_data(regorus::Value::from_json_str(&state.to_string())?)?;
Ok(())
}
/// Ask regorus if an API call should be allowed or not.
pub async fn allow_request(&mut self, ep: &str, ep_input: &str) -> Result<(bool, String)> {
debug!(sl!(), "policy check: {ep}");
self.log_eval_input(ep, ep_input).await;
let query = format!("data.agent_policy.{ep}");
self.engine.set_input_json(ep_input)?;
let results = self.engine.eval_query(query, false)?;
let prints = match self.engine.take_prints() {
Ok(p) => p.join(" "),
Err(e) => format!("Failed to get policy log: {e}"),
};
if results.result.len() != 1 {
// Results are empty when AllowRequestsFailingPolicy is used to allow a Request that hasn't been defined in the policy
if self.allow_failures {
return Ok((true, prints));
}
bail!(
"policy check: unexpected eval_query result len {:?}",
results
);
}
if results.result[0].expressions.len() != 1 {
bail!(
"policy check: unexpected eval_query result expressions {:?}",
results
);
}
let mut allow = match &results.result[0].expressions[0].value {
regorus::Value::Bool(b) => *b,
// Match against a specific variant that could be interpreted as MetadataResponse
regorus::Value::Object(obj) => {
let json_str = serde_json::to_string(obj)?;
self.log_eval_input(ep, &json_str).await;
let metadata_response: MetadataResponse = serde_json::from_str(&json_str)?;
if metadata_response.allowed {
if let Some(ops) = metadata_response.ops {
self.apply_patch_to_state(ops).await?;
}
}
metadata_response.allowed
}
_ => {
error!(sl!(), "allow_request: unexpected eval_query result type");
bail!(
"policy check: unexpected eval_query result type {:?}",
results
);
}
};
if !allow && self.allow_failures {
warn!(sl!(), "policy: ignoring error for {ep}");
allow = true;
}
Ok((allow, prints))
}
/// Replace the Policy in regorus.
pub async fn set_policy(&mut self, policy: &str) -> Result<()> {
self.engine = Self::new_engine();
self.engine
.add_policy("agent_policy".to_string(), policy.to_string())?;
self.update_allow_failures_flag().await?;
Ok(())
}
async fn log_eval_input(&mut self, ep: &str, input: &str) {
if let Some(log_file) = &mut self.log_file {
match ep {
"StatsContainerRequest" | "ReadStreamRequest" | "SetPolicyRequest" => {
// - StatsContainerRequest and ReadStreamRequest are called
// relatively often, so we're not logging them, to avoid
// growing this log file too much.
// - Confidential Containers Policy documents are relatively
// large, so we're not logging them here, for SetPolicyRequest.
// The Policy text can be obtained directly from the pod YAML.
}
_ => {
let log_entry = format!("[\"ep\":\"{ep}\",{input}],\n\n");
if let Err(e) = log_file.write_all(log_entry.as_bytes()).await {
warn!(sl!(), "policy: log_eval_input: write_all failed: {}", e);
} else if let Err(e) = log_file.flush().await {
warn!(sl!(), "policy: log_eval_input: flush failed: {}", e);
}
}
}
}
}
async fn update_allow_failures_flag(&mut self) -> Result<()> {
self.allow_failures = match self.allow_request("AllowRequestsFailingPolicy", "{}").await {
Ok((allowed, _prints)) => {
if allowed {
warn!(
sl!(),
"policy: AllowRequestsFailingPolicy is enabled - will ignore errors"
);
}
allowed
}
Err(_) => false,
};
Ok(())
}
}

View File

@ -134,7 +134,7 @@ lazy_static! {
#[cfg(feature = "agent-policy")]
lazy_static! {
static ref AGENT_POLICY: Mutex<policy::AgentPolicy> = Mutex::new(AgentPolicy::new());
static ref AGENT_POLICY: Mutex<AgentPolicy> = Mutex::new(AgentPolicy::new());
}
#[derive(Parser)]
@ -633,7 +633,15 @@ fn init_agent_as_init(logger: &Logger, unified_cgroup_hierarchy: bool) -> Result
#[cfg(feature = "agent-policy")]
async fn initialize_policy() -> Result<()> {
AGENT_POLICY.lock().await.initialize().await
AGENT_POLICY
.lock()
.await
.initialize(
AGENT_CONFIG.log_level.as_usize(),
AGENT_CONFIG.policy_file.clone(),
None,
)
.await
}
// The Rust standard library had suppressed the default SIGPIPE behavior,
@ -651,7 +659,7 @@ use crate::config::AgentConfig;
use std::os::unix::io::{FromRawFd, RawFd};
#[cfg(feature = "agent-policy")]
use crate::policy::AgentPolicy;
use kata_agent_policy::policy::AgentPolicy;
#[cfg(test)]
mod tests {

View File

@ -3,22 +3,11 @@
// SPDX-License-Identifier: Apache-2.0
//
use anyhow::{bail, Result};
use protobuf::MessageDyn;
use tokio::io::AsyncWriteExt;
use crate::rpc::ttrpc_error;
use crate::{AGENT_CONFIG, AGENT_POLICY};
static POLICY_LOG_FILE: &str = "/tmp/policy.txt";
static POLICY_DEFAULT_FILE: &str = "/etc/kata-opa/default-policy.rego";
/// Convenience macro to obtain the scope logger
macro_rules! sl {
() => {
slog_scope::logger()
};
}
use crate::AGENT_POLICY;
use kata_agent_policy::policy::AgentPolicy;
async fn allow_request(policy: &mut AgentPolicy, ep: &str, request: &str) -> ttrpc::Result<()> {
match policy.allow_request(ep, request).await {
@ -54,213 +43,3 @@ pub async fn do_set_policy(req: &protocols::agent::SetPolicyRequest) -> ttrpc::R
.await
.map_err(|e| ttrpc_error(ttrpc::Code::INVALID_ARGUMENT, e))
}
/// Singleton policy object.
#[derive(Debug, Default)]
pub struct AgentPolicy {
/// When true policy errors are ignored, for debug purposes.
allow_failures: bool,
/// "/tmp/policy.txt" log file for policy activity.
log_file: Option<tokio::fs::File>,
/// Regorus engine
engine: regorus::Engine,
}
#[derive(serde::Deserialize, Debug)]
struct MetadataResponse {
allowed: bool,
ops: Option<json_patch::Patch>,
}
impl AgentPolicy {
/// Create AgentPolicy object.
pub fn new() -> Self {
Self {
allow_failures: false,
engine: Self::new_engine(),
..Default::default()
}
}
fn new_engine() -> regorus::Engine {
let mut engine = regorus::Engine::new();
engine.set_strict_builtin_errors(false);
engine.set_gather_prints(true);
// assign a slice of the engine data "pstate" to be used as policy state
engine
.add_data(
regorus::Value::from_json_str(
r#"{
"pstate": {}
}"#,
)
.unwrap(),
)
.unwrap();
engine
}
/// Initialize regorus.
pub async fn initialize(&mut self) -> Result<()> {
if AGENT_CONFIG.log_level.as_usize() >= slog::Level::Debug.as_usize() {
self.log_file = Some(
tokio::fs::OpenOptions::new()
.write(true)
.truncate(true)
.create(true)
.open(POLICY_LOG_FILE)
.await?,
);
debug!(sl!(), "policy: log file: {}", POLICY_LOG_FILE);
}
// Check if policy file has been set via AgentConfig
// If empty, use default file.
let mut default_policy_file = AGENT_CONFIG.policy_file.clone();
if default_policy_file.is_empty() {
default_policy_file = POLICY_DEFAULT_FILE.to_string();
}
info!(sl!(), "default policy: {default_policy_file}");
self.engine.add_policy_from_file(default_policy_file)?;
self.update_allow_failures_flag().await?;
Ok(())
}
async fn apply_patch_to_state(&mut self, patch: json_patch::Patch) -> Result<()> {
// Convert the current engine data to a JSON value
let mut state = serde_json::to_value(self.engine.get_data())?;
// Apply the patch to the state
json_patch::patch(&mut state, &patch)?;
// Clear the existing data in the engine
self.engine.clear_data();
// Add the patched state back to the engine
self.engine
.add_data(regorus::Value::from_json_str(&state.to_string())?)?;
Ok(())
}
/// Ask regorus if an API call should be allowed or not.
async fn allow_request(&mut self, ep: &str, ep_input: &str) -> Result<(bool, String)> {
debug!(sl!(), "policy check: {ep}");
self.log_eval_input(ep, ep_input).await;
let query = format!("data.agent_policy.{ep}");
self.engine.set_input_json(ep_input)?;
let results = self.engine.eval_query(query, false)?;
let prints = match self.engine.take_prints() {
Ok(p) => p.join(" "),
Err(e) => format!("Failed to get policy log: {e}"),
};
if results.result.len() != 1 {
// Results are empty when AllowRequestsFailingPolicy is used to allow a Request that hasn't been defined in the policy
if self.allow_failures {
return Ok((true, prints));
}
bail!(
"policy check: unexpected eval_query result len {:?}",
results
);
}
if results.result[0].expressions.len() != 1 {
bail!(
"policy check: unexpected eval_query result expressions {:?}",
results
);
}
let mut allow = match &results.result[0].expressions[0].value {
regorus::Value::Bool(b) => *b,
// Match against a specific variant that could be interpreted as MetadataResponse
regorus::Value::Object(obj) => {
let json_str = serde_json::to_string(obj)?;
self.log_eval_input(ep, &json_str).await;
let metadata_response: MetadataResponse = serde_json::from_str(&json_str)?;
if metadata_response.allowed {
if let Some(ops) = metadata_response.ops {
self.apply_patch_to_state(ops).await?;
}
}
metadata_response.allowed
}
_ => {
error!(sl!(), "allow_request: unexpected eval_query result type");
bail!(
"policy check: unexpected eval_query result type {:?}",
results
);
}
};
if !allow && self.allow_failures {
warn!(sl!(), "policy: ignoring error for {ep}");
allow = true;
}
Ok((allow, prints))
}
/// Replace the Policy in regorus.
pub async fn set_policy(&mut self, policy: &str) -> Result<()> {
self.engine = Self::new_engine();
self.engine
.add_policy("agent_policy".to_string(), policy.to_string())?;
self.update_allow_failures_flag().await?;
Ok(())
}
async fn log_eval_input(&mut self, ep: &str, input: &str) {
if let Some(log_file) = &mut self.log_file {
match ep {
"StatsContainerRequest" | "ReadStreamRequest" | "SetPolicyRequest" => {
// - StatsContainerRequest and ReadStreamRequest are called
// relatively often, so we're not logging them, to avoid
// growing this log file too much.
// - Confidential Containers Policy documents are relatively
// large, so we're not logging them here, for SetPolicyRequest.
// The Policy text can be obtained directly from the pod YAML.
}
_ => {
let log_entry = format!("[\"ep\":\"{ep}\",{input}],\n\n");
if let Err(e) = log_file.write_all(log_entry.as_bytes()).await {
warn!(sl!(), "policy: log_eval_input: write_all failed: {}", e);
} else if let Err(e) = log_file.flush().await {
warn!(sl!(), "policy: log_eval_input: flush failed: {}", e);
}
}
}
}
}
async fn update_allow_failures_flag(&mut self) -> Result<()> {
self.allow_failures = match self.allow_request("AllowRequestsFailingPolicy", "{}").await {
Ok((allowed, _prints)) => {
if allowed {
warn!(
sl!(),
"policy: AllowRequestsFailingPolicy is enabled - will ignore errors"
);
}
allowed
}
Err(_) => false,
};
Ok(())
}
}