genpolicy: validate container sandbox names

Make sure all container sandbox names match the sandbox name of the first container.

Signed-off-by: Saul Paredes <saulparedes@microsoft.com>
This commit is contained in:
Saul Paredes 2024-10-07 16:42:04 -07:00
parent 52d1aea1f7
commit c207312260

View File

@ -51,7 +51,7 @@ default WriteStreamRequest := false
# them and inspect OPA logs for the root cause of a failure.
default AllowRequestsFailingPolicy := false
CreateContainerRequest {
CreateContainerRequest:= {"ops": ops, "allowed": true} {
# Check if the input request should be rejected even before checking the
# policy_data.containers information.
allow_create_container_input
@ -60,6 +60,14 @@ CreateContainerRequest {
i_storages := input.storages
i_devices := input.devices
# array of possible state operations
ops_builder := []
# check sandbox name
sandbox_name = i_oci.Annotations["io.kubernetes.cri.sandbox-name"]
add_sandbox_name_to_state := state_allows("sandbox_name", sandbox_name)
ops := concat_op_if_not_null(ops_builder, add_sandbox_name_to_state)
# Check if any element from the policy_data.containers array allows the input request.
some p_container in policy_data.containers
print("======== CreateContainerRequest: trying next policy container")
@ -121,6 +129,47 @@ allow_create_container_input {
print("allow_create_container_input: true")
}
# value hasn't been seen before, save it to state
state_allows(key, value) = action {
state := get_state()
not state[key]
print("state_allows: saving to state key =", key, "value =", value)
path := get_state_path(key)
action := {
"op": "add",
"path": path,
"value": value,
}
}
# value matches what's in state, allow it
state_allows(key, value) = action {
state := get_state()
value == state[key]
print("state_allows: found key =", key, "value =", value, " in state")
action := null
}
# helper functions to interact with the state
get_state() = state {
state := data["pstate"]
}
get_state_path(key) = path {
path := concat("/", ["", key]) # prepend "/" to key
}
# Helper functions to conditionally concatenate if op is not null
concat_op_if_not_null(ops, op) = result {
op == null
result := ops
}
concat_op_if_not_null(ops, op) = result {
op != null
result := array.concat(ops, [op])
}
# Reject unexpected annotations.
allow_anno(p_oci, i_oci) {
print("allow_anno 1: start")