new(test): add "output_strictly_contains" option

Signed-off-by: Leonardo Grasso <me@leonardograsso.com>
This commit is contained in:
Leonardo Grasso 2020-10-09 12:44:23 +02:00 committed by poiana
parent 8b56360f8c
commit b0942f8774

View File

@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python
#
# Copyright (C) 2019 The Falco Authors.
#
@ -31,6 +31,7 @@ from avocado.utils import process
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler
class FalcoTest(Test):
def setUp(self):
@ -49,17 +50,20 @@ class FalcoTest(Test):
self.stdout_is = self.params.get('stdout_is', '*', default='')
self.stderr_is = self.params.get('stderr_is', '*', default='')
self.stdout_contains = self.params.get('stdout_contains', '*', default='')
self.stdout_contains = self.params.get(
'stdout_contains', '*', default='')
if not isinstance(self.stdout_contains, list):
self.stdout_contains = [self.stdout_contains]
self.stderr_contains = self.params.get('stderr_contains', '*', default='')
self.stderr_contains = self.params.get(
'stderr_contains', '*', default='')
if not isinstance(self.stderr_contains, list):
self.stderr_contains = [self.stderr_contains]
self.stdout_not_contains = self.params.get('stdout_not_contains', '*', default='')
self.stdout_not_contains = self.params.get(
'stdout_not_contains', '*', default='')
if not isinstance(self.stdout_not_contains, list):
if self.stdout_not_contains == '':
@ -67,7 +71,8 @@ class FalcoTest(Test):
else:
self.stdout_not_contains = [self.stdout_not_contains]
self.stderr_not_contains = self.params.get('stderr_not_contains', '*', default='')
self.stderr_not_contains = self.params.get(
'stderr_not_contains', '*', default='')
if not isinstance(self.stderr_not_contains, list):
if self.stderr_not_contains == '':
@ -83,15 +88,18 @@ class FalcoTest(Test):
self.trace_file = os.path.join(build_dir, "test", self.trace_file)
self.json_output = self.params.get('json_output', '*', default=False)
self.json_include_output_property = self.params.get('json_include_output_property', '*', default=True)
self.json_include_output_property = self.params.get(
'json_include_output_property', '*', default=True)
self.all_events = self.params.get('all_events', '*', default=False)
self.priority = self.params.get('priority', '*', default='debug')
self.rules_file = self.params.get('rules_file', '*', default=os.path.join(self.basedir, '../rules/falco_rules.yaml'))
self.rules_file = self.params.get(
'rules_file', '*', default=os.path.join(self.basedir, '../rules/falco_rules.yaml'))
if not isinstance(self.rules_file, list):
self.rules_file = [self.rules_file]
self.validate_rules_file = self.params.get('validate_rules_file', '*', default=False)
self.validate_rules_file = self.params.get(
'validate_rules_file', '*', default=False)
if self.validate_rules_file == False:
self.validate_rules_file = []
@ -118,13 +126,15 @@ class FalcoTest(Test):
file = os.path.join(self.basedir, file)
self.rules_args = self.rules_args + "-r " + file + " "
self.conf_file = self.params.get('conf_file', '*', default=os.path.join(self.basedir, '../falco.yaml'))
self.conf_file = self.params.get(
'conf_file', '*', default=os.path.join(self.basedir, '../falco.yaml'))
if not os.path.isabs(self.conf_file):
self.conf_file = os.path.join(self.basedir, self.conf_file)
self.run_duration = self.params.get('run_duration', '*', default='')
self.disabled_rules = self.params.get('disabled_rules', '*', default='')
self.disabled_rules = self.params.get(
'disabled_rules', '*', default='')
if self.disabled_rules == '':
self.disabled_rules = []
@ -137,7 +147,8 @@ class FalcoTest(Test):
for rule in self.disabled_rules:
self.disabled_args = self.disabled_args + "-D " + rule + " "
self.detect_counts = self.params.get('detect_counts', '*', default=False)
self.detect_counts = self.params.get(
'detect_counts', '*', default=False)
if self.detect_counts == False:
self.detect_counts = {}
else:
@ -147,7 +158,8 @@ class FalcoTest(Test):
detect_counts[key] = value
self.detect_counts = detect_counts
self.rules_warning = self.params.get('rules_warning', '*', default=False)
self.rules_warning = self.params.get(
'rules_warning', '*', default=False)
if self.rules_warning == False:
self.rules_warning = set()
else:
@ -172,9 +184,11 @@ class FalcoTest(Test):
self.package = self.params.get('package', '*', default='None')
self.addl_docker_run_args = self.params.get('addl_docker_run_args', '*', default='')
self.addl_docker_run_args = self.params.get(
'addl_docker_run_args', '*', default='')
self.copy_local_driver = self.params.get('copy_local_driver', '*', default=False)
self.copy_local_driver = self.params.get(
'copy_local_driver', '*', default=False)
# Used by possibly_copy_local_driver as well as docker run
self.module_dir = os.path.expanduser("~/.falco")
@ -197,9 +211,29 @@ class FalcoTest(Test):
os.makedirs(filedir)
self.outputs = outputs
self.output_strictly_contains = self.params.get(
'output_strictly_contains', '*', default='')
if self.output_strictly_contains == '':
self.output_strictly_contains = {}
else:
output_strictly_contains = []
for item in self.output_strictly_contains:
for key, value in list(item.items()):
output = {}
output['actual'] = key
output['expected'] = value
output_strictly_contains.append(output)
filedir = os.path.dirname(output['expected'])
# Create the parent directory for the file if it doesn't exist.
if not os.path.isdir(filedir):
os.makedirs(filedir)
self.output_strictly_contains = output_strictly_contains
self.grpcurl_res = None
self.grpc_observer = None
self.grpc_address = self.params.get('address', 'grpc/*', default='/var/run/falco.sock')
self.grpc_address = self.params.get(
'address', 'grpc/*', default='/var/run/falco.sock')
if self.grpc_address.startswith("unix://"):
self.is_grpc_using_unix_socket = True
self.grpc_address = self.grpc_address[len("unix://"):]
@ -218,14 +252,15 @@ class FalcoTest(Test):
self.disable_tags = self.params.get('disable_tags', '*', default='')
if self.disable_tags == '':
self.disable_tags=[]
self.disable_tags = []
self.run_tags = self.params.get('run_tags', '*', default='')
if self.run_tags == '':
self.run_tags=[]
self.run_tags = []
self.time_iso_8601 = self.params.get('time_iso_8601', '*', default=False)
self.time_iso_8601 = self.params.get(
'time_iso_8601', '*', default=False)
def tearDown(self):
if self.package != 'None':
@ -244,7 +279,8 @@ class FalcoTest(Test):
self.log.debug("Actual warning rules: {}".format(found_warning))
if found_warning != self.rules_warning:
self.fail("Expected rules with warnings {} does not match actual rules with warnings {}".format(self.rules_warning, found_warning))
self.fail("Expected rules with warnings {} does not match actual rules with warnings {}".format(
self.rules_warning, found_warning))
def check_rules_events(self, res):
@ -255,50 +291,60 @@ class FalcoTest(Test):
events = set(match.group(2).split(","))
found_events[rule] = events
self.log.debug("Expected events for rules: {}".format(self.rules_events))
self.log.debug(
"Expected events for rules: {}".format(self.rules_events))
self.log.debug("Actual events for rules: {}".format(found_events))
for rule in list(found_events.keys()):
if found_events.get(rule) != self.rules_events.get(rule):
self.fail("rule {}: expected events {} differs from actual events {}".format(rule, self.rules_events.get(rule), found_events.get(rule)))
self.fail("rule {}: expected events {} differs from actual events {}".format(
rule, self.rules_events.get(rule), found_events.get(rule)))
def check_detections(self, res):
# Get the number of events detected.
match = re.search('Events detected: (\d+)', res.stdout.decode("utf-8"))
if match is None:
self.fail("Could not find a line 'Events detected: <count>' in falco output")
self.fail(
"Could not find a line 'Events detected: <count>' in falco output")
events_detected = int(match.group(1))
if not self.should_detect and events_detected > 0:
self.fail("Detected {} events when should have detected none".format(events_detected))
self.fail("Detected {} events when should have detected none".format(
events_detected))
if self.should_detect:
if events_detected == 0:
self.fail("Detected {} events when should have detected > 0".format(events_detected))
self.fail("Detected {} events when should have detected > 0".format(
events_detected))
for level in self.detect_level:
level_line = '(?i){}: (\d+)'.format(level)
match = re.search(level_line, res.stdout.decode("utf-8"))
if match is None:
self.fail("Could not find a line '{}: <count>' in falco output".format(level))
self.fail(
"Could not find a line '{}: <count>' in falco output".format(level))
events_detected = int(match.group(1))
if not events_detected > 0:
self.fail("Detected {} events at level {} when should have detected > 0".format(events_detected, level))
self.fail("Detected {} events at level {} when should have detected > 0".format(
events_detected, level))
def check_detections_by_rule(self, res):
# Get the number of events detected for each rule. Must match the expected counts.
match = re.search('Triggered rules by rule name:(.*)', res.stdout.decode("utf-8"), re.DOTALL)
match = re.search('Triggered rules by rule name:(.*)',
res.stdout.decode("utf-8"), re.DOTALL)
if match is None:
self.fail("Could not find a block 'Triggered rules by rule name: ...' in falco output")
self.fail(
"Could not find a block 'Triggered rules by rule name: ...' in falco output")
triggered_rules = match.group(1)
for rule, count in list(self.detect_counts.items()):
expected = '\s{}: (\d+)'.format(re.sub(r'([$\.*+?()[\]{}|^])', r'\\\1', rule))
expected = '\s{}: (\d+)'.format(
re.sub(r'([$\.*+?()[\]{}|^])', r'\\\1', rule))
match = re.search(expected, triggered_rules)
if match is None:
@ -307,9 +353,11 @@ class FalcoTest(Test):
actual_count = int(match.group(1))
if actual_count != count:
self.fail("Different counts for rule {}: expected={}, actual={}".format(rule, count, actual_count))
self.fail("Different counts for rule {}: expected={}, actual={}".format(
rule, count, actual_count))
else:
self.log.debug("Found expected count for rule {}: {}".format(rule, count))
self.log.debug(
"Found expected count for rule {}: {}".format(rule, count))
def check_outputs(self):
for output in self.outputs:
@ -324,7 +372,8 @@ class FalcoTest(Test):
found = True
if found == False:
self.fail("Could not find a line '{}' in file '{}'".format(output['line'], output['file']))
self.fail("Could not find a line '{}' in file '{}'".format(
output['line'], output['file']))
return True
@ -341,7 +390,27 @@ class FalcoTest(Test):
attrs = ['time', 'rule', 'priority']
for attr in attrs:
if not attr in obj:
self.fail("Falco JSON object {} does not contain property \"{}\"".format(line, attr))
self.fail(
"Falco JSON object {} does not contain property \"{}\"".format(line, attr))
def check_output_strictly_contains(self, res):
for output in self.output_strictly_contains:
# Read the expected output (from a file) and actual output (either from a file or the stdout),
# then check if the actual one strictly contains the expected one.
expected = open(output['expected']).read()
if output['actual'] == 'stdout':
actual = res.stdout.decode("utf-8")
else:
actual = open(output['actual']).read()
if expected not in actual:
self.fail("Output '{}' does not strictly contains the expected content '{}'".format(
output['actual'], output['expected']))
return False
return True
def install_package(self):
@ -360,35 +429,39 @@ class FalcoTest(Test):
self.module_dir, self.addl_docker_run_args, image)
elif self.package.endswith(".deb"):
self.falco_binary_path = '/usr/bin/falco';
self.falco_binary_path = '/usr/bin/falco'
package_glob = "{}/{}".format(self.falcodir, self.package)
matches = glob.glob(package_glob)
if len(matches) != 1:
self.fail("Package path {} did not match exactly 1 file. Instead it matched: {}", package_glob, ",".join(matches))
self.fail("Package path {} did not match exactly 1 file. Instead it matched: {}",
package_glob, ",".join(matches))
package_path = matches[0]
cmdline = "dpkg -i {}".format(package_path)
self.log.debug("Installing debian package via \"{}\"".format(cmdline))
self.log.debug(
"Installing debian package via \"{}\"".format(cmdline))
res = process.run(cmdline, timeout=120, sudo=True)
elif self.package.endswith(".rpm"):
self.falco_binary_path = '/usr/bin/falco';
self.falco_binary_path = '/usr/bin/falco'
package_glob = "{}/{}".format(self.falcodir, self.package)
matches = glob.glob(package_glob)
if len(matches) != 1:
self.fail("Package path {} did not match exactly 1 file. Instead it matched: {}", package_glob, ",".join(matches))
self.fail("Package path {} did not match exactly 1 file. Instead it matched: {}",
package_glob, ",".join(matches))
package_path = matches[0]
cmdline = "rpm -i --nodeps --noscripts {}".format(package_path)
self.log.debug("Installing centos package via \"{}\"".format(cmdline))
self.log.debug(
"Installing centos package via \"{}\"".format(cmdline))
res = process.run(cmdline, timeout=120, sudo=True)
def uninstall_package(self):
@ -398,25 +471,29 @@ class FalcoTest(Test):
elif self.package.endswith(".rpm"):
cmdline = "rpm -e --noscripts --nodeps falco"
self.log.debug("Uninstalling centos package via \"{}\"".format(cmdline))
self.log.debug(
"Uninstalling centos package via \"{}\"".format(cmdline))
res = process.run(cmdline, timeout=120, sudo=True)
elif self.package.endswith(".deb"):
cmdline = "dpkg --purge falco"
self.log.debug("Uninstalling debian package via \"{}\"".format(cmdline))
self.log.debug(
"Uninstalling debian package via \"{}\"".format(cmdline))
res = process.run(cmdline, timeout=120, sudo=True)
def possibly_copy_driver(self):
# Remove the contents of ~/.falco regardless of copy_local_driver.
self.log.debug("Checking for module dir {}".format(self.module_dir))
if os.path.isdir(self.module_dir):
self.log.info("Removing files below directory {}".format(self.module_dir))
self.log.info(
"Removing files below directory {}".format(self.module_dir))
for rmfile in glob.glob(self.module_dir + "/*"):
self.log.debug("Removing file {}".format(rmfile))
os.remove(rmfile)
if self.copy_local_driver:
verlines = [str.strip() for str in subprocess.check_output([self.falco_binary_path, "--version"]).splitlines()]
verlines = [str.strip() for str in subprocess.check_output(
[self.falco_binary_path, "--version"]).splitlines()]
verstr = verlines[0].decode("utf-8")
self.log.info("verstr {}".format(verstr))
falco_version = verstr.split(" ")[2]
@ -428,10 +505,12 @@ class FalcoTest(Test):
# falco-driver-loader has a more comprehensive set of ways to
# find the config hash. We only look at /boot/config-<kernel release>
md5_output = subprocess.check_output(["md5sum", "/boot/config-{}".format(kernel_release)]).rstrip()
md5_output = subprocess.check_output(
["md5sum", "/boot/config-{}".format(kernel_release)]).rstrip()
config_hash = md5_output.split(" ")[0]
probe_filename = "falco-{}-{}-{}-{}.ko".format(falco_version, arch, kernel_release, config_hash)
probe_filename = "falco-{}-{}-{}-{}.ko".format(
falco_version, arch, kernel_release, config_hash)
driver_path = os.path.join(self.falcodir, "driver", "falco.ko")
module_path = os.path.join(self.module_dir, probe_filename)
self.log.debug("Copying {} to {}".format(driver_path, module_path))
@ -444,9 +523,11 @@ class FalcoTest(Test):
self.fail("This test suite supports gRPC with unix socket only")
cmdline = "grpcurl -import-path ../userspace/falco " \
"-proto {} -plaintext -unix {} " \
"{}/{}".format(self.grpc_proto, self.grpc_address, self.grpc_service, self.grpc_method)
"-proto {} -plaintext -unix {} " \
"{}/{}".format(self.grpc_proto, self.grpc_address,
self.grpc_service, self.grpc_method)
that = self
class GRPCUnixSocketEventHandler(PatternMatchingEventHandler):
def on_created(self, event):
# that.log.info("EVENT: {}", event)
@ -455,7 +536,7 @@ class FalcoTest(Test):
path = os.path.dirname(self.grpc_address)
process.run("mkdir -p {}".format(path))
event_handler = GRPCUnixSocketEventHandler(patterns=['*'],
ignore_directories=True)
ignore_directories=True)
self.grpc_observer = Observer()
self.grpc_observer.schedule(event_handler, path, recursive=False)
self.grpc_observer.start()
@ -476,13 +557,14 @@ class FalcoTest(Test):
found = True
if found == False:
self.fail("Could not find a line '{}' in gRPC responses".format(exp_result))
self.fail(
"Could not find a line '{}' in gRPC responses".format(exp_result))
def test(self):
self.log.info("Trace file %s", self.trace_file)
self.falco_binary_path = '{}/userspace/falco/falco'.format(self.falcodir)
self.falco_binary_path = '{}/userspace/falco/falco'.format(
self.falcodir)
self.possibly_copy_driver()
@ -501,9 +583,11 @@ class FalcoTest(Test):
if self.psp_file != "":
if not os.path.isfile(self.psp_conv_path):
self.log.info("Downloading {} to {}".format(self.psp_conv_url, self.psp_conv_path))
self.log.info("Downloading {} to {}".format(
self.psp_conv_url, self.psp_conv_path))
urllib.request.urlretrieve(self.psp_conv_url, self.psp_conv_path)
urllib.request.urlretrieve(
self.psp_conv_url, self.psp_conv_path)
os.chmod(self.psp_conv_path, stat.S_IEXEC)
conv_cmd = '{} convert psp --psp-path {} --rules-path {}'.format(
@ -521,7 +605,6 @@ class FalcoTest(Test):
psp_rules = myfile.read()
self.log.debug("Converted Rules: {}".format(psp_rules))
# Run falco
cmd = '{} {} {} -c {} {} -o json_output={} -o json_include_output_property={} -o priority={} -v'.format(
self.falco_binary_path, self.rules_args, self.disabled_args, self.conf_file, trace_arg, self.json_output, self.json_include_output_property, self.priority)
@ -557,22 +640,26 @@ class FalcoTest(Test):
for pattern in self.stderr_contains:
match = re.search(pattern, res.stderr.decode("utf-8"))
if match is None:
self.fail("Stderr of falco process did not contain content matching {}".format(pattern))
self.fail(
"Stderr of falco process did not contain content matching {}".format(pattern))
for pattern in self.stdout_contains:
match = re.search(pattern, res.stdout.decode("utf-8"))
if match is None:
self.fail("Stdout of falco process '{}' did not contain content matching {}".format(res.stdout.decode("utf-8"), pattern))
self.fail("Stdout of falco process '{}' did not contain content matching {}".format(
res.stdout.decode("utf-8"), pattern))
for pattern in self.stderr_not_contains:
match = re.search(pattern, res.stderr.decode("utf-8"))
if match is not None:
self.fail("Stderr of falco process contained content matching {} when it should have not".format(pattern))
self.fail(
"Stderr of falco process contained content matching {} when it should have not".format(pattern))
for pattern in self.stdout_not_contains:
match = re.search(pattern, res.stdout.decode("utf-8"))
if match is not None:
self.fail("Stdout of falco process '{}' did contain content matching {} when it should have not".format(res.stdout.decode("utf-8"), pattern))
self.fail("Stdout of falco process '{}' did contain content matching {} when it should have not".format(
res.stdout.decode("utf-8"), pattern))
if res.exit_status != self.exit_status:
self.error("Falco command \"{}\" exited with unexpected return value {} (!= {})".format(
@ -590,6 +677,7 @@ class FalcoTest(Test):
self.check_detections_by_rule(res)
self.check_json_output(res)
self.check_outputs()
self.check_output_strictly_contains(res)
self.check_grpc()
pass