mirror of
https://github.com/falcosecurity/falco.git
synced 2025-10-22 12:27:10 +00:00
Pass the travis branch to run_regression_tests.sh. When downloading trace files, first look for a file traces-XXX-$BRANCH and if found download it. This allows testing out a set of changes with a trace file specifically for that branch, that can be moved to the normal file once the PR is merged. Also increase the timeout for the spawned falco process from 1 to 3 minutes. In debug mode, the kubernetes demo was taking slightly over 1 minute.
90 lines
3.4 KiB
Python
90 lines
3.4 KiB
Python
#!/usr/bin/env python
|
|
|
|
import os
|
|
import re
|
|
import json
|
|
|
|
from avocado import Test
|
|
from avocado.utils import process
|
|
from avocado.utils import linux_modules
|
|
|
|
class FalcoTest(Test):
    """
    Run falco against a captured trace file and check what it reports.

    Test parameters read in setUp: 'falcodir', 'detect', 'detect_level'
    (only read when 'detect' is truthy), 'trace_file' and 'json_output'.
    """

    def setUp(self):
        """
        Read the test parameters and load the sysdig kernel module if not
        already loaded.
        """
        self.falcodir = self.params.get('falcodir', '/', default=os.path.join(self.basedir, '../build'))

        self.should_detect = self.params.get('detect', '*')
        self.trace_file = self.params.get('trace_file', '*')
        self.json_output = self.params.get('json_output', '*')

        # The detection level is only meaningful (and only consulted by
        # test()) when events are expected.
        if self.should_detect:
            self.detect_level = self.params.get('detect_level', '*')

        # Doing this in 2 steps instead of simply using
        # module_is_loaded to avoid logging lsmod output to the log.
        lsmod_output = process.system_output("lsmod", verbose=False)

        if linux_modules.parse_lsmod_for_module(lsmod_output, 'sysdig_probe') == {}:
            self.log.debug("Loading sysdig kernel module")
            process.run('sudo insmod {}/driver/sysdig-probe.ko'.format(self.falcodir))

        self.str_variant = self.trace_file

    def test(self):
        """
        Run falco on the trace file and validate its output.

        Errors the test if falco exits with a non-zero status; fails it if
        the detected event counts do not match the 'detect' /
        'detect_level' parameters or, when 'json_output' is set, if a JSON
        line is malformed or lacks a required property.
        """
        self.log.info("Trace file %s", self.trace_file)

        # Run the provided trace file through falco
        cmd = '{}/userspace/falco/falco -r {}/../rules/falco_rules.yaml -c {}/../falco.yaml -e {} -o json_output={}'.format(
            self.falcodir, self.falcodir, self.falcodir, self.trace_file, self.json_output)

        self.falco_proc = process.SubProcess(cmd)

        # Allow falco up to 3 minutes; signal 9 is passed as the signal to
        # use once the timeout expires.
        res = self.falco_proc.run(timeout=180, sig=9)

        if res.exit_status != 0:
            self.error("Falco command \"{}\" exited with non-zero return value {}".format(
                cmd, res.exit_status))

        self._check_detections(res.stdout)

        if self.json_output:
            self._check_json_output(res.stdout)

    def _check_detections(self, output):
        """Fail unless the event counts in falco's output match expectations."""
        # Get the number of events detected.
        match = re.search(r'Events detected: (\d+)', output)
        if match is None:
            self.fail("Could not find a line 'Events detected: <count>' in falco output")

        events_detected = int(match.group(1))

        if not self.should_detect:
            if events_detected > 0:
                self.fail("Detected {} events when should have detected none".format(events_detected))
            return

        if events_detected == 0:
            self.fail("Detected {} events when should have detected > 0".format(events_detected))

        # Some events were detected; now require at least one at the
        # expected level, using the per-level count line.
        level_line = r'{}: (\d+)'.format(self.detect_level)
        match = re.search(level_line, output)

        if match is None:
            self.fail("Could not find a line '{}: <count>' in falco output".format(self.detect_level))

        events_detected = int(match.group(1))

        if not events_detected > 0:
            self.fail("Detected {} events at level {} when should have detected > 0".format(events_detected, self.detect_level))

    def _check_json_output(self, output):
        """Fail if a line starting with '{' is not JSON or lacks a required key."""
        # Just verify that any lines starting with '{' are valid json objects.
        # Doesn't do any deep inspection of the contents.
        for line in output.splitlines():
            if not line.startswith('{'):
                continue

            try:
                obj = json.loads(line)
            except ValueError as err:
                # Report malformed JSON as a test failure rather than
                # letting the exception escape as an unrelated error.
                self.fail("Falco output line {} is not valid JSON: {}".format(line, err))

            for attr in ['time', 'rule', 'priority', 'output']:
                if attr not in obj:
                    self.fail("Falco JSON object {} does not contain property \"{}\"".format(line, attr))
|
|
|
|
|
|
if __name__ == "__main__":
    # `main` is not imported or defined anywhere at module level, so the
    # bare call raised NameError when this file was run as a script.
    # Import avocado's standalone entry point here, where it is needed.
    from avocado import main

    main()
|