Add tests for event type rule identification

Add tests that verify that the event type identification functionality
is working. Notable changes:

 - Modify falco_test.py to additionally check for warnings when loading
   any set of rules and verify that the event types for each rule match
   expected values. This is controlled by the new multiplex fields
   "rules_warning" and "rules_events".

 - Instead of starting with an empty falco_tests.yaml from scratch from
   the downloaded trace files, use a checked-in version which defines
   two tests:
     - Loading the checked-in falco_rules.yaml and verify that no rules
       have warnings.
     - A sample falco_rules_warnings.yaml that has ~30 different
       mutations of rule filtering expressions. The test verifies for each
       rule whether or not the rule should result in a warning and what the
       extracted event types are.
   The generated tests from the trace files are appended to this file.

- Add an empty .scap file to use with the above tests.
This commit is contained in:
Mark Stemm
2016-07-13 18:42:34 -07:00
parent ddedf595ba
commit 7b68fc2692
5 changed files with 326 additions and 13 deletions

View File

@@ -3,6 +3,7 @@
import os
import re
import json
import sets
from avocado import Test
from avocado.utils import process
@@ -16,9 +17,34 @@ class FalcoTest(Test):
"""
self.falcodir = self.params.get('falcodir', '/', default=os.path.join(self.basedir, '../build'))
self.should_detect = self.params.get('detect', '*')
self.should_detect = self.params.get('detect', '*', default=False)
self.trace_file = self.params.get('trace_file', '*')
self.json_output = self.params.get('json_output', '*')
if not os.path.isabs(self.trace_file):
self.trace_file = os.path.join(self.basedir, self.trace_file)
self.json_output = self.params.get('json_output', '*', default=False)
self.rules_file = self.params.get('rules_file', '*', default=os.path.join(self.basedir, '../rules/falco_rules.yaml'))
if not os.path.isabs(self.rules_file):
self.rules_file = os.path.join(self.basedir, self.rules_file)
self.rules_warning = self.params.get('rules_warning', '*', default=False)
if self.rules_warning == False:
self.rules_warning = sets.Set()
else:
self.rules_warning = sets.Set(self.rules_warning)
# Maps from rule name to set of evttypes
self.rules_events = self.params.get('rules_events', '*', default=False)
if self.rules_events == False:
self.rules_events = {}
else:
events = {}
for item in self.rules_events:
for item2 in item:
events[item2[0]] = sets.Set(item2[1])
self.rules_events = events
if self.should_detect:
self.detect_level = self.params.get('detect_level', '*')
@@ -33,21 +59,38 @@ class FalcoTest(Test):
self.str_variant = self.trace_file
def test(self):
self.log.info("Trace file %s", self.trace_file)
def check_rules_warnings(self, res):
# Run the provided trace file though falco
cmd = '{}/userspace/falco/falco -r {}/../rules/falco_rules.yaml -c {}/../falco.yaml -e {} -o json_output={}'.format(
self.falcodir, self.falcodir, self.falcodir, self.trace_file, self.json_output)
found_warning = sets.Set()
self.falco_proc = process.SubProcess(cmd)
for match in re.finditer('Rule ([^:]+): warning \(([^)]+)\):', res.stderr):
rule = match.group(1)
warning = match.group(2)
found_warning.add(rule)
res = self.falco_proc.run(timeout=180, sig=9)
self.log.debug("Expected warning rules: {}".format(self.rules_warning))
self.log.debug("Actual warning rules: {}".format(found_warning))
if res.exit_status != 0:
self.error("Falco command \"{}\" exited with non-zero return value {}".format(
cmd, res.exit_status))
if found_warning != self.rules_warning:
self.fail("Expected rules with warnings {} does not match actual rules with warnings {}".format(self.rules_warning, found_warning))
def check_rules_events(self, res):
found_events = {}
for match in re.finditer('Event types for rule ([^:]+): (\S+)', res.stderr):
rule = match.group(1)
events = sets.Set(match.group(2).split(","))
found_events[rule] = events
self.log.debug("Expected events for rules: {}".format(self.rules_events))
self.log.debug("Actual events for rules: {}".format(found_events))
for rule in found_events.keys():
if found_events.get(rule) != self.rules_events.get(rule):
self.fail("rule {}: expected events {} differs from actual events {}".format(rule, self.rules_events.get(rule), found_events.get(rule)))
def check_detections(self, res):
# Get the number of events detected.
match = re.search('Events detected: (\d+)', res.stdout)
if match is None:
@@ -73,6 +116,7 @@ class FalcoTest(Test):
if not events_detected > 0:
self.fail("Detected {} events at level {} when should have detected > 0".format(events_detected, self.detect_level))
def check_json_output(self, res):
if self.json_output:
# Just verify that any lines starting with '{' are valid json objects.
# Doesn't do any deep inspection of the contents.
@@ -82,6 +126,27 @@ class FalcoTest(Test):
for attr in ['time', 'rule', 'priority', 'output']:
if not attr in obj:
self.fail("Falco JSON object {} does not contain property \"{}\"".format(line, attr))
def test(self):
self.log.info("Trace file %s", self.trace_file)
# Run the provided trace file though falco
cmd = '{}/userspace/falco/falco -r {} -c {}/../falco.yaml -e {} -o json_output={} -v'.format(
self.falcodir, self.rules_file, self.falcodir, self.trace_file, self.json_output)
self.falco_proc = process.SubProcess(cmd)
res = self.falco_proc.run(timeout=180, sig=9)
if res.exit_status != 0:
self.error("Falco command \"{}\" exited with non-zero return value {}".format(
cmd, res.exit_status))
self.check_rules_warnings(res)
if len(self.rules_events) > 0:
self.check_rules_events(res)
self.check_detections(res)
self.check_json_output(res)
pass