From 620f046a26bcc4d772f8f66179c3e5b12da346b6 Mon Sep 17 00:00:00 2001
From: nimrod-up9 <59927337+nimrod-up9@users.noreply.github.com>
Date: Wed, 26 May 2021 17:25:12 +0300
Subject: [PATCH] TRA-3257 Dynamic tappers (#57)

* Defer cleanup.

* Split createMizuResources into two functions.

* Re-create daemonset when changes to tapped pods occur.

* Reordered imports.

* Use Printf instead of Println.

* Workaround for variable scope.

* WIP Apply daemonset instead of create.

* Whitespaces.

* Fixed: Using the right types for Apply.

* Fixed missing pod IP by adding a delay.

* Debounce pod restart.

* Proper field manager name.
---
Reviewer amendments to the submitted change:
 - RunMizuTap keeps the existing "Error getting pods to tap" message instead
   of returning silently.
 - restartTappers returns after cancel(), so a failed lookup no longer goes on
   to apply a daemonset built from stale or missing data.
 - Debouncer guards its state with a mutex: the callback runs on the timer
   goroutine while SetOn is called from the watch loop.
 - Added doc comments; dropped a duplicated blank line in provider.go.
Line breaks and indentation were reconstructed from a whitespace-flattened
copy of this patch; the index lines are those of the original submission.

 cli/cmd/tapRunner.go       | 168 ++++++++++++++++++++++++++++----------
 cli/debounce/debounce.go   |  60 ++++++++++++++
 cli/kubernetes/provider.go | 114 +++++++++++--------------
 3 files changed, 234 insertions(+), 108 deletions(-)
 create mode 100644 cli/debounce/debounce.go

diff --git a/cli/cmd/tapRunner.go b/cli/cmd/tapRunner.go
index 43d780963..040957ecb 100644
--- a/cli/cmd/tapRunner.go
+++ b/cli/cmd/tapRunner.go
@@ -3,14 +3,24 @@ package cmd
 import (
 	"context"
 	"fmt"
-	"github.com/up9inc/mizu/cli/kubernetes"
-	core "k8s.io/api/core/v1"
-	"github.com/up9inc/mizu/cli/mizu"
 	"os"
 	"os/signal"
 	"regexp"
 	"syscall"
 	"time"
+
+	core "k8s.io/api/core/v1"
+
+	"github.com/up9inc/mizu/cli/debounce"
+	"github.com/up9inc/mizu/cli/kubernetes"
+	"github.com/up9inc/mizu/cli/mizu"
+)
+
+var mizuServiceAccountExists bool
+var aggregatorService *core.Service
+
+const (
+	updateTappersDelay = 5 * time.Second
 )
 
 var currentlyTappedPods []core.Pod
@@ -18,59 +28,95 @@ var currentlyTappedPods []core.Pod
 func RunMizuTap(podRegexQuery *regexp.Regexp, tappingOptions *MizuTapOptions) {
 	kubernetesProvider := kubernetes.NewProvider(tappingOptions.KubeConfigPath, tappingOptions.Namespace)
 
+	// Deferred first so that it runs last, after cancel(), on every exit path.
+	defer cleanUpMizuResources(kubernetesProvider)
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel() // cancel will be called when this function exits
 
 	matchingPods, err := kubernetesProvider.GetAllPodsMatchingRegex(ctx, podRegexQuery)
 	if err != nil {
 		fmt.Printf("Error getting pods to tap %v\n", err)
 		return
 	}
 	currentlyTappedPods = matchingPods
 
-	nodeToTappedPodIPMap, err := getNodeHostToTappedPodIpsMap(ctx, kubernetesProvider, matchingPods)
+	nodeToTappedPodIPMap, err := getNodeHostToTappedPodIpsMap(ctx, kubernetesProvider, currentlyTappedPods)
 	if err != nil {
-		cleanUpMizuResources(kubernetesProvider)
-		return
-	}
-	err = createMizuResources(ctx, kubernetesProvider, nodeToTappedPodIPMap, tappingOptions)
-	if err != nil {
-		cleanUpMizuResources(kubernetesProvider)
 		return
 	}
 
-	go portForwardApiPod(ctx, kubernetesProvider, cancel, tappingOptions) //TODO convert this to job for built in pod ttl or have the running app handle this
+	if err := createMizuResources(ctx, kubernetesProvider, nodeToTappedPodIPMap, tappingOptions); err != nil {
+		return
+	}
+
+	go portForwardApiPod(ctx, kubernetesProvider, cancel, tappingOptions) // TODO convert this to job for built in pod ttl or have the running app handle this
+	go watchPodsForTapping(ctx, kubernetesProvider, cancel, podRegexQuery, tappingOptions)
 	go syncApiStatus(ctx, cancel, tappingOptions)
-	waitForFinish(ctx, cancel) //block until exit signal or error
+
+	//block until exit signal or error
+	waitForFinish(ctx, cancel)
 
 	// TODO handle incoming traffic from tapper using a channel
-
-	//cleanup
-	fmt.Printf("\nRemoving mizu resources\n")
-	cleanUpMizuResources(kubernetesProvider)
 }
 
 func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, tappingOptions *MizuTapOptions) error {
-	mizuServiceAccountExists := createRBACIfNecessary(ctx, kubernetesProvider)
-	_, err := kubernetesProvider.CreateMizuAggregatorPod(ctx, mizu.ResourcesNamespace, mizu.AggregatorPodName, tappingOptions.MizuImage, mizuServiceAccountExists)
+	if err := createMizuAggregator(ctx, kubernetesProvider, tappingOptions); err != nil {
+		return err
+	}
+
+	if err := createMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap, tappingOptions); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// createMizuAggregator sets up RBAC if necessary and creates the aggregator pod
+// and its service. The results are stored in the package-level
+// mizuServiceAccountExists and aggregatorService, which createMizuTappers reads.
+func createMizuAggregator(ctx context.Context, kubernetesProvider *kubernetes.Provider, tappingOptions *MizuTapOptions) error {
+	var err error
+
+	mizuServiceAccountExists = createRBACIfNecessary(ctx, kubernetesProvider)
+	_, err = kubernetesProvider.CreateMizuAggregatorPod(ctx, mizu.ResourcesNamespace, mizu.AggregatorPodName, tappingOptions.MizuImage, mizuServiceAccountExists)
 	if err != nil {
 		fmt.Printf("Error creating mizu collector pod: %v\n", err)
 		return err
 	}
-	aggregatorService, err := kubernetesProvider.CreateService(ctx, mizu.ResourcesNamespace, mizu.AggregatorPodName, mizu.AggregatorPodName)
+
+	aggregatorService, err = kubernetesProvider.CreateService(ctx, mizu.ResourcesNamespace, mizu.AggregatorPodName, mizu.AggregatorPodName)
 	if err != nil {
 		fmt.Printf("Error creating mizu collector service: %v\n", err)
 		return err
 	}
-	err = kubernetesProvider.CreateMizuTapperDaemonSet(ctx, mizu.ResourcesNamespace, mizu.TapperDaemonSetName, tappingOptions.MizuImage, mizu.TapperPodName, fmt.Sprintf("%s.%s.svc.cluster.local", aggregatorService.Name, aggregatorService.Namespace), nodeToTappedPodIPMap, mizuServiceAccountExists)
-	if err != nil {
+
+	return nil
+}
+
+// createMizuTappers applies the tapper daemonset for the given node-to-pod-IPs
+// map. It must run after createMizuAggregator, because it reads the
+// package-level aggregatorService and mizuServiceAccountExists set there.
+func createMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, tappingOptions *MizuTapOptions) error {
+	if err := kubernetesProvider.ApplyMizuTapperDaemonSet(
+		ctx,
+		mizu.ResourcesNamespace,
+		mizu.TapperDaemonSetName,
+		tappingOptions.MizuImage,
+		mizu.TapperPodName,
+		fmt.Sprintf("%s.%s.svc.cluster.local", aggregatorService.Name, aggregatorService.Namespace),
+		nodeToTappedPodIPMap,
+		mizuServiceAccountExists,
+	); err != nil {
 		fmt.Printf("Error creating mizu tapper daemonset: %v\n", err)
 		return err
 	}
+
 	return nil
 }
 
 func cleanUpMizuResources(kubernetesProvider *kubernetes.Provider) {
+	fmt.Printf("\nRemoving mizu resources\n")
+
 	removalCtx, _ := context.WithTimeout(context.Background(), 5 * time.Second)
 	if err := kubernetesProvider.RemovePod(removalCtx, mizu.ResourcesNamespace, mizu.AggregatorPodName); err != nil {
 		fmt.Printf("Error removing Pod %s in namespace %s: %s (%v,%+v)\n", mizu.AggregatorPodName, mizu.ResourcesNamespace, err, err, err);
@@ -83,28 +129,64 @@ func cleanUpMizuResources(kubernetesProvider *kubernetes.Provider) {
 	}
 }
 
-// will be relevant in the future
-//func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, podRegex *regexp.Regexp) {
-//	added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, kubernetesProvider.Namespace), podRegex)
-//	for {
-//		select {
-//		case newTarget := <- added:
-//			fmt.Printf("+%s\n", newTarget.Name)
-//
-//		case removedTarget := <- removed:
-//			fmt.Printf("-%s\n", removedTarget.Name)
-//
-//		case <- modified:
-//			continue
-//
-//		case <- errorChan:
-//			cancel()
-//
-//		case <- ctx.Done():
-//			return
-//		}
-//	}
-//}
+// watchPodsForTapping watches pods matching podRegex and re-applies the tapper
+// daemonset, debounced by updateTappersDelay, when a matching pod is removed or
+// is modified while holding an IP. It calls cancel on errors and returns once ctx is done.
+func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, podRegex *regexp.Regexp, tappingOptions *MizuTapOptions) {
+	added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, kubernetesProvider.Namespace), podRegex)
+
+	restartTappers := func() {
+		matchingPods, err := kubernetesProvider.GetAllPodsMatchingRegex(ctx, podRegex)
+		if err != nil {
+			fmt.Printf("Error getting pods by regex: %s (%v,%+v)\n", err, err, err)
+			cancel()
+			return
+		}
+		currentlyTappedPods = matchingPods
+
+		nodeToTappedPodIPMap, err := getNodeHostToTappedPodIpsMap(ctx, kubernetesProvider, currentlyTappedPods)
+		if err != nil {
+			fmt.Printf("Error building node to ips map: %s (%v,%+v)\n", err, err, err)
+			cancel()
+			return
+		}
+
+		if err := createMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap, tappingOptions); err != nil {
+			fmt.Printf("Error updating daemonset: %s (%v,%+v)\n", err, err, err)
+			cancel()
+		}
+	}
+	restartTappersDebouncer := debounce.NewDebouncer(updateTappersDelay, restartTappers)
+
+	for {
+		select {
+		case newTarget := <-added:
+			fmt.Printf("+%s\n", newTarget.Name)
+
+		case removedTarget := <-removed:
+			fmt.Printf("-%s\n", removedTarget.Name)
+			restartTappersDebouncer.SetOn()
+
+		case modifiedTarget := <-modified:
+			// Act only if the modified pod has already obtained an IP address.
+			// After filtering for IPs, on a normal pod restart this includes the following events:
+			// - Pod deletion
+			// - Pod reaches start state
+			// - Pod reaches ready state
+			// Ready/unready transitions might also trigger this event.
+			if modifiedTarget.Status.PodIP != "" {
+				restartTappersDebouncer.SetOn()
+			}
+
+		case <-errorChan:
+			// TODO: Does this also perform cleanup?
+			cancel()
+
+		case <-ctx.Done():
+			return
+		}
+	}
+}
 
 func portForwardApiPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, tappingOptions *MizuTapOptions) {
 	podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", mizu.AggregatorPodName))
diff --git a/cli/debounce/debounce.go b/cli/debounce/debounce.go
new file mode 100644
index 000000000..7772591ef
--- /dev/null
+++ b/cli/debounce/debounce.go
@@ -0,0 +1,60 @@
+// Package debounce collapses bursts of triggers into one delayed callback run.
+package debounce
+
+import (
+	"sync"
+	"time"
+)
+
+// NewDebouncer returns a Debouncer that runs callback once timeout has elapsed
+// after the first SetOn call of a burst.
+func NewDebouncer(timeout time.Duration, callback func()) *Debouncer {
+	var debouncer Debouncer
+	debouncer.setTimeout(timeout)
+	debouncer.setCallback(callback)
+	return &debouncer
+}
+
+// Debouncer runs its callback at most once per burst of SetOn calls. It holds a
+// mutex and must not be copied; use the pointer returned by NewDebouncer.
+type Debouncer struct {
+	mu       sync.Mutex // guards running and timer
+	callback func()
+	running  bool
+	timeout  time.Duration
+	timer    *time.Timer
+}
+
+// setTimeout sets the delay between SetOn and the callback run.
+func (d *Debouncer) setTimeout(timeout time.Duration) {
+	// TODO: Return err if d.running
+	d.timeout = timeout
+}
+
+// setCallback stores callback wrapped so that the debouncer re-arms itself once
+// the callback has returned.
+func (d *Debouncer) setCallback(callback func()) {
+	d.callback = func() {
+		callback()
+
+		// time.AfterFunc runs this on its own goroutine, so the flag is
+		// cleared under the same lock that SetOn takes.
+		d.mu.Lock()
+		d.running = false
+		d.mu.Unlock()
+	}
+}
+
+// SetOn schedules the callback to run after the configured timeout. Calls made
+// while a run is pending or the callback is still executing are ignored.
+func (d *Debouncer) SetOn() {
+	d.mu.Lock()
+	defer d.mu.Unlock()
+
+	if d.running {
+		return
+	}
+
+	d.running = true
+	d.timer = time.AfterFunc(d.timeout, d.callback)
+}
diff --git a/cli/kubernetes/provider.go b/cli/kubernetes/provider.go
index 62b39a1d5..348068b4c 100644
--- a/cli/kubernetes/provider.go
+++ b/cli/kubernetes/provider.go
@@ -6,7 +6,13 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
-	apps "k8s.io/api/apps/v1"
+
+	"path/filepath"
+	"regexp"
+
+	applyconfapp "k8s.io/client-go/applyconfigurations/apps/v1"
+	applyconfmeta "k8s.io/client-go/applyconfigurations/meta/v1"
+	applyconfcore "k8s.io/client-go/applyconfigurations/core/v1"
 	core "k8s.io/api/core/v1"
 	rbac "k8s.io/api/rbac/v1"
 	k8serrors "k8s.io/apimachinery/pkg/api/errors"
@@ -22,8 +28,6 @@ import (
 	"k8s.io/client-go/tools/clientcmd"
 	_ "k8s.io/client-go/tools/portforward"
 	"k8s.io/client-go/util/homedir"
-	"path/filepath"
-	"regexp"
 )
 
 type Provider struct {
@@ -34,7 +38,8 @@ type Provider struct {
 }
 
 const (
-	serviceAccountName = "mizu-service-account"
+	serviceAccountName = "mizu-service-account"
+	fieldManagerName   = "mizu-manager"
 )
 
 func NewProvider(kubeConfigPath string, overrideNamespace string) *Provider {
@@ -104,6 +109,7 @@ func (provider *Provider) CreateMizuAggregatorPod(ctx context.Context, namespace
 			},
 			DNSPolicy: "ClusterFirstWithHostNet",
 			TerminationGracePeriodSeconds: new(int64),
+			// Affinity: TODO: define node selector for all relevant nodes for this mizu instance
 		},
 	}
 	//define the service account only when it exists to prevent pod crash
@@ -212,75 +218,53 @@ func (provider *Provider) RemoveDaemonSet(ctx context.Context, namespace string,
 	return provider.clientSet.AppsV1().DaemonSets(namespace).Delete(ctx, daemonSetName, metav1.DeleteOptions{})
 }
 
-func (provider *Provider) CreateMizuTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string, aggregatorPodIp string, nodeToTappedPodIPMap map[string][]string, linkServiceAccount bool) error {
+// ApplyMizuTapperDaemonSet creates or updates the tapper daemonset through the
+// clientset's Apply call, using fieldManagerName as the field manager. The
+// node-to-tapped-pod-IPs map is handed to the tappers as JSON in the
+// TAPPED_ADDRESSES_PER_HOST environment variable.
+func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string, aggregatorPodIp string, nodeToTappedPodIPMap map[string][]string, linkServiceAccount bool) error {
 	nodeToTappedPodIPMapJsonStr, err := json.Marshal(nodeToTappedPodIPMap)
 	if err != nil {
 		return err
 	}
 
 	privileged := true
-	podTemplate := core.PodTemplateSpec{
-		ObjectMeta: metav1.ObjectMeta{
-			Labels: map[string]string{"app": tapperPodName},
-		},
-		Spec: core.PodSpec{
-			HostNetwork: true, // very important to make passive tapper see traffic
-			Containers: []core.Container{
-				{
-					Name: tapperPodName,
-					Image: podImage,
-					ImagePullPolicy: core.PullAlways,
-					SecurityContext: &core.SecurityContext{
-						Privileged: &privileged, // must be privileged to get node level traffic
-					},
-					Command: []string {"./mizuagent", "-i", "any", "--tap", "--hardump", "--aggregator-address", fmt.Sprintf("ws://%s/wsTapper", aggregatorPodIp)},
-					Env: []core.EnvVar{
-						{
-							Name: "HOST_MODE",
-							Value: "1",
-						},
-						{
-							Name: "AGGREGATOR_ADDRESS",
-							Value: aggregatorPodIp,
-						},
-						{
-							Name: "TAPPED_ADDRESSES_PER_HOST",
-							Value: string(nodeToTappedPodIPMapJsonStr),
-						},
-						{
-							Name: "NODE_NAME",
-							ValueFrom: &core.EnvVarSource{
-								FieldRef: &core.ObjectFieldSelector {
-									APIVersion: "v1",
-									FieldPath: "spec.nodeName",
-								},
-							},
-						},
-					},
-				},
-			},
-			DNSPolicy: "ClusterFirstWithHostNet",
-			TerminationGracePeriodSeconds: new(int64),
-			// Affinity: TODO: define node selector for all relevant nodes for this mizu instance
-		},
-	}
+	agentContainer := applyconfcore.Container()
+	agentContainer.WithName(tapperPodName)
+	agentContainer.WithImage(podImage)
+	agentContainer.WithImagePullPolicy(core.PullAlways)
+	agentContainer.WithSecurityContext(applyconfcore.SecurityContext().WithPrivileged(privileged))
+	agentContainer.WithCommand("./mizuagent", "-i", "any", "--tap", "--hardump", "--aggregator-address", fmt.Sprintf("ws://%s/wsTapper", aggregatorPodIp))
+	agentContainer.WithEnv(
+		applyconfcore.EnvVar().WithName("HOST_MODE").WithValue("1"),
+		applyconfcore.EnvVar().WithName("AGGREGATOR_ADDRESS").WithValue(aggregatorPodIp),
+		applyconfcore.EnvVar().WithName("TAPPED_ADDRESSES_PER_HOST").WithValue(string(nodeToTappedPodIPMapJsonStr)),
+	)
+	agentContainer.WithEnv(
+		applyconfcore.EnvVar().WithName("NODE_NAME").WithValueFrom(
+			applyconfcore.EnvVarSource().WithFieldRef(
+				applyconfcore.ObjectFieldSelector().WithAPIVersion("v1").WithFieldPath("spec.nodeName"),
+			),
+		),
+	)
+
+	podSpec := applyconfcore.PodSpec().WithHostNetwork(true).WithDNSPolicy("ClusterFirstWithHostNet").WithTerminationGracePeriodSeconds(0)
 	if linkServiceAccount {
-		podTemplate.Spec.ServiceAccountName = serviceAccountName
+		podSpec.WithServiceAccountName(serviceAccountName)
 	}
-	labelSelector := metav1.LabelSelector{
-		MatchLabels: map[string]string{"app": tapperPodName},
-	}
-	daemonSet := apps.DaemonSet{
-		ObjectMeta: metav1.ObjectMeta{
-			Name: daemonSetName,
-			Namespace: namespace,
-		},
-		Spec: apps.DaemonSetSpec{
-			Selector: &labelSelector,
-			Template: podTemplate,
-		},
-	}
-	_, err = provider.clientSet.AppsV1().DaemonSets(namespace).Create(ctx, &daemonSet, metav1.CreateOptions{})
+	podSpec.WithContainers(agentContainer)
+
+	podTemplate := applyconfcore.PodTemplateSpec()
+	podTemplate.WithLabels(map[string]string{"app": tapperPodName})
+	podTemplate.WithSpec(podSpec)
+
+	labelSelector := applyconfmeta.LabelSelector()
+	labelSelector.WithMatchLabels(map[string]string{"app": tapperPodName})
+
+	daemonSet := applyconfapp.DaemonSet(daemonSetName, namespace)
+	daemonSet.WithSpec(applyconfapp.DaemonSetSpec().WithSelector(labelSelector).WithTemplate(podTemplate))
+
+	_, err = provider.clientSet.AppsV1().DaemonSets(namespace).Apply(ctx, daemonSet, metav1.ApplyOptions{FieldManager: fieldManagerName})
 	return err
 }
 