Update main.go, messageSensitiveDataCleaner.go, and 6 more files...

This commit is contained in:
RamiBerm
2021-06-01 14:25:52 +03:00
parent 47237f05a5
commit 4bc16fa0b4
8 changed files with 114 additions and 53 deletions

View File

@@ -3,6 +3,7 @@ package cmd
import (
"context"
"fmt"
"github.com/up9inc/mizu/shared"
"os"
"os/signal"
"regexp"
@@ -26,6 +27,10 @@ const (
var currentlyTappedPods []core.Pod
func RunMizuTap(podRegexQuery *regexp.Regexp, tappingOptions *MizuTapOptions) {
mizuApiFilteringOptions, err := getMizuApiFilteringOptions(tappingOptions)
if err != nil {
return
}
kubernetesProvider := kubernetes.NewProvider(tappingOptions.KubeConfigPath, tappingOptions.Namespace)
defer cleanUpMizuResources(kubernetesProvider)
@@ -43,7 +48,7 @@ func RunMizuTap(podRegexQuery *regexp.Regexp, tappingOptions *MizuTapOptions) {
return
}
if err := createMizuResources(ctx, kubernetesProvider, nodeToTappedPodIPMap, tappingOptions); err != nil {
if err := createMizuResources(ctx, kubernetesProvider, nodeToTappedPodIPMap, tappingOptions, mizuApiFilteringOptions); err != nil {
return
}
@@ -57,8 +62,8 @@ func RunMizuTap(podRegexQuery *regexp.Regexp, tappingOptions *MizuTapOptions) {
// TODO handle incoming traffic from tapper using a channel
}
func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, tappingOptions *MizuTapOptions) error {
if err := createMizuAggregator(ctx, kubernetesProvider, tappingOptions); err != nil {
func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, tappingOptions *MizuTapOptions, mizuApiFilteringOptions *shared.FilteringOptions) error {
if err := createMizuAggregator(ctx, kubernetesProvider, tappingOptions, mizuApiFilteringOptions); err != nil {
return err
}
@@ -69,11 +74,11 @@ func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Pro
return nil
}
func createMizuAggregator(ctx context.Context, kubernetesProvider *kubernetes.Provider, tappingOptions *MizuTapOptions) error {
func createMizuAggregator(ctx context.Context, kubernetesProvider *kubernetes.Provider, tappingOptions *MizuTapOptions, mizuApiFilteringOptions *shared.FilteringOptions) error {
var err error
mizuServiceAccountExists = createRBACIfNecessary(ctx, kubernetesProvider)
_, err = kubernetesProvider.CreateMizuAggregatorPod(ctx, mizu.ResourcesNamespace, mizu.AggregatorPodName, tappingOptions.MizuImage, mizuServiceAccountExists)
_, err = kubernetesProvider.CreateMizuAggregatorPod(ctx, mizu.ResourcesNamespace, mizu.AggregatorPodName, tappingOptions.MizuImage, mizuServiceAccountExists, mizuApiFilteringOptions)
if err != nil {
fmt.Printf("Error creating mizu collector pod: %v\n", err)
return err
@@ -88,6 +93,24 @@ func createMizuAggregator(ctx context.Context, kubernetesProvider *kubernetes.Pr
return nil
}
func getMizuApiFilteringOptions(tappingOptions *MizuTapOptions) (*shared.FilteringOptions, error) {
if tappingOptions.PlainTextFilterRegexes == nil || len(tappingOptions.PlainTextFilterRegexes) == 0 {
return nil, nil
}
compiledRegexSlice := make([]*shared.SerializableRegexp, 0)
for _, regexStr := range tappingOptions.PlainTextFilterRegexes {
compiledRegex, err := shared.CompileRegexToSerializableRegexp(regexStr)
if err != nil {
fmt.Printf("Regex %s is invalid: %v", regexStr, err)
return nil, err
}
compiledRegexSlice = append(compiledRegexSlice, compiledRegex)
}
return &shared.FilteringOptions{PlainTextFilterRegexes: compiledRegexSlice}, nil
}
func createMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, tappingOptions *MizuTapOptions) error {
if err := kubernetesProvider.ApplyMizuTapperDaemonSet(
ctx,

View File

@@ -6,19 +6,20 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/up9inc/mizu/shared"
"path/filepath"
"regexp"
applyconfapp "k8s.io/client-go/applyconfigurations/apps/v1"
applyconfmeta "k8s.io/client-go/applyconfigurations/meta/v1"
applyconfcore "k8s.io/client-go/applyconfigurations/core/v1"
core "k8s.io/api/core/v1"
rbac "k8s.io/api/rbac/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/watch"
applyconfapp "k8s.io/client-go/applyconfigurations/apps/v1"
applyconfcore "k8s.io/client-go/applyconfigurations/core/v1"
applyconfmeta "k8s.io/client-go/applyconfigurations/meta/v1"
"k8s.io/client-go/kubernetes"
_ "k8s.io/client-go/plugin/pkg/client/auth/azure"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
@@ -85,7 +86,11 @@ func (provider *Provider) GetPods(ctx context.Context, namespace string) {
fmt.Printf("There are %d pods in Namespace %s\n", len(pods.Items), namespace)
}
func (provider *Provider) CreateMizuAggregatorPod(ctx context.Context, namespace string, podName string, podImage string, linkServiceAccount bool) (*core.Pod, error) {
func (provider *Provider) CreateMizuAggregatorPod(ctx context.Context, namespace string, podName string, podImage string, linkServiceAccount bool, mizuApiFilteringOptions *shared.FilteringOptions) (*core.Pod, error) {
marshaledFilteringOptions, err := json.Marshal(mizuApiFilteringOptions)
if err != nil {
return nil, err
}
pod := &core.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
@@ -101,9 +106,13 @@ func (provider *Provider) CreateMizuAggregatorPod(ctx context.Context, namespace
Command: []string {"./mizuagent", "--aggregator"},
Env: []core.EnvVar{
{
Name: "HOST_MODE",
Name: shared.HostModeEnvVar,
Value: "1",
},
{
Name: shared.MizuFilteringOptionsEnvVar,
Value: string(marshaledFilteringOptions),
},
},
},
},
@@ -232,12 +241,11 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
agentContainer.WithSecurityContext(applyconfcore.SecurityContext().WithPrivileged(privileged))
agentContainer.WithCommand("./mizuagent", "-i", "any", "--tap", "--hardump", "--aggregator-address", fmt.Sprintf("ws://%s/wsTapper", aggregatorPodIp))
agentContainer.WithEnv(
applyconfcore.EnvVar().WithName("HOST_MODE").WithValue("1"),
applyconfcore.EnvVar().WithName("AGGREGATOR_ADDRESS").WithValue(aggregatorPodIp),
applyconfcore.EnvVar().WithName("TAPPED_ADDRESSES_PER_HOST").WithValue(string(nodeToTappedPodIPMapJsonStr)),
applyconfcore.EnvVar().WithName(shared.HostModeEnvVar).WithValue("1"),
applyconfcore.EnvVar().WithName(shared.TappedAddressesPerNodeDictEnvVar).WithValue(string(nodeToTappedPodIPMapJsonStr)),
)
agentContainer.WithEnv(
applyconfcore.EnvVar().WithName("NODE_NAME").WithValueFrom(
applyconfcore.EnvVar().WithName(shared.NodeNameEnvVar).WithValueFrom(
applyconfcore.EnvVarSource().WithFieldRef(
applyconfcore.ObjectFieldSelector().WithAPIVersion("v1").WithFieldPath("spec.nodeName"),
),