From 683d199774c7a969ab7a6d09c2ce5e7bc6daadae Mon Sep 17 00:00:00 2001 From: RoyUP9 <87927115+RoyUP9@users.noreply.github.com> Date: Thu, 5 Aug 2021 11:19:29 +0300 Subject: [PATCH] added support of multiple namespaces (#167) --- cli/cmd/tap.go | 2 +- cli/cmd/tapRunner.go | 49 ++++++++++--------- cli/kubernetes/provider.go | 16 ++++-- cli/kubernetes/watch.go | 75 +++++++++++++++++------------ cli/mizu/configStructs/tapConfig.go | 4 +- 5 files changed, 84 insertions(+), 62 deletions(-) diff --git a/cli/cmd/tap.go b/cli/cmd/tap.go index bfb62dcc6..5538a0fd4 100644 --- a/cli/cmd/tap.go +++ b/cli/cmd/tap.go @@ -56,7 +56,7 @@ func init() { defaults.Set(&defaultTapConfig) tapCmd.Flags().Uint16P(configStructs.GuiPortTapName, "p", defaultTapConfig.GuiPort, "Provide a custom port for the web interface webserver") - tapCmd.Flags().StringP(configStructs.NamespaceTapName, "n", defaultTapConfig.Namespace, "Namespace selector") + tapCmd.Flags().StringArrayP(configStructs.NamespacesTapName, "n", defaultTapConfig.Namespaces, "Namespaces selector") tapCmd.Flags().Bool(configStructs.AnalysisTapName, defaultTapConfig.Analysis, "Uploads traffic to UP9 for further analysis (Beta)") tapCmd.Flags().BoolP(configStructs.AllNamespacesTapName, "A", defaultTapConfig.AllNamespaces, "Tap all namespaces") tapCmd.Flags().StringP(configStructs.KubeConfigPathTapName, "k", defaultTapConfig.KubeConfigPath, "Path to kube-config file") diff --git a/cli/cmd/tapRunner.go b/cli/cmd/tapRunner.go index 91c277376..c4f2ac23d 100644 --- a/cli/cmd/tapRunner.go +++ b/cli/cmd/tapRunner.go @@ -11,6 +11,7 @@ import ( "os/signal" "path" "regexp" + "strings" "syscall" "time" @@ -65,24 +66,25 @@ func RunMizuTap() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() // cancel will be called when this function exits - targetNamespace := getNamespace(kubernetesProvider) + targetNamespaces := getNamespaces(kubernetesProvider) + var namespacesStr string - if targetNamespace != mizu.K8sAllNamespaces { - namespacesStr = fmt.Sprintf("namespace \"%s\"", targetNamespace) + if targetNamespaces[0] != mizu.K8sAllNamespaces { + namespacesStr = fmt.Sprintf("namespaces \"%s\"", strings.Join(targetNamespaces, "\", \"")) } else { namespacesStr = "all namespaces" } mizu.CheckNewerVersion() mizu.Log.Infof("Tapping pods in %s", namespacesStr) - if err, _ := updateCurrentlyTappedPods(kubernetesProvider, ctx, targetNamespace); err != nil { + if err, _ := updateCurrentlyTappedPods(kubernetesProvider, ctx, targetNamespaces); err != nil { mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error getting pods by regex: %v", errormessage.FormatError(err))) return } if len(state.currentlyTappedPods) == 0 { var suggestionStr string - if targetNamespace != mizu.K8sAllNamespaces { + if targetNamespaces[0] != mizu.K8sAllNamespaces { suggestionStr = ". Select a different namespace with -n or tap all namespaces with -A" } mizu.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Did not find any pods matching the regex argument%s", suggestionStr)) @@ -100,7 +102,7 @@ func RunMizuTap() { } go createProxyToApiServerPod(ctx, kubernetesProvider, cancel) - go watchPodsForTapping(ctx, kubernetesProvider, cancel) + go watchPodsForTapping(ctx, kubernetesProvider, targetNamespaces, cancel) //block until exit signal or error waitForFinish(ctx, cancel) @@ -166,13 +168,13 @@ func createMizuApiServer(ctx context.Context, kubernetesProvider *kubernetes.Pro } opts := &kubernetes.ApiServerOptions{ - Namespace: mizu.Config.ResourcesNamespace(), - PodName: mizu.ApiServerPodName, - PodImage: mizu.Config.MizuImage, - ServiceAccountName: serviceAccountName, - IsNamespaceRestricted: !mizu.Config.IsOwnNamespace(), + Namespace: mizu.Config.ResourcesNamespace(), + PodName: mizu.ApiServerPodName, + PodImage: mizu.Config.MizuImage, + ServiceAccountName: serviceAccountName, + IsNamespaceRestricted: !mizu.Config.IsOwnNamespace(), MizuApiFilteringOptions: mizuApiFilteringOptions, - MaxEntriesDBSizeBytes: mizu.Config.Tap.MaxEntriesDBSizeBytes(), + MaxEntriesDBSizeBytes: mizu.Config.Tap.MaxEntriesDBSizeBytes(), } _, err = kubernetesProvider.CreateMizuApiServerPod(ctx, opts) if err != nil { @@ -347,12 +349,11 @@ func reportTappedPods() { } } -func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) { - targetNamespace := getNamespace(kubernetesProvider) - added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, targetNamespace), mizu.Config.Tap.PodRegex()) +func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Provider, targetNamespaces []string, cancel context.CancelFunc) { + added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider, targetNamespaces, mizu.Config.Tap.PodRegex()) restartTappers := func() { - err, changeFound := updateCurrentlyTappedPods(kubernetesProvider, ctx, targetNamespace) + err, changeFound := updateCurrentlyTappedPods(kubernetesProvider, ctx, targetNamespaces) if err != nil { mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error getting pods by regex: %v", errormessage.FormatError(err))) cancel() @@ -407,9 +408,9 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro } } -func updateCurrentlyTappedPods(kubernetesProvider *kubernetes.Provider, ctx context.Context, targetNamespace string) (error, bool) { +func updateCurrentlyTappedPods(kubernetesProvider *kubernetes.Provider, ctx context.Context, targetNamespaces []string) (error, bool) { changeFound := false - if matchingPods, err := kubernetesProvider.GetAllRunningPodsMatchingRegex(ctx, mizu.Config.Tap.PodRegex(), targetNamespace); err != nil { + if matchingPods, err := kubernetesProvider.GetAllRunningPodsMatchingRegex(ctx, mizu.Config.Tap.PodRegex(), targetNamespaces); err != nil { return err, false } else { addedPods, removedPods := getPodArrayDiff(state.currentlyTappedPods, matchingPods) @@ -454,7 +455,7 @@ func getMissingPods(pods1 []core.Pod, pods2 []core.Pod) []core.Pod { func createProxyToApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) { podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", mizu.ApiServerPodName)) - added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, mizu.Config.ResourcesNamespace()), podExactRegex) + added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider, []string{mizu.Config.ResourcesNamespace()}, podExactRegex) isPodReady := false timeAfter := time.After(25 * time.Second) for { @@ -567,12 +568,12 @@ func waitForFinish(ctx context.Context, cancel context.CancelFunc) { } } -func getNamespace(kubernetesProvider *kubernetes.Provider) string { +func getNamespaces(kubernetesProvider *kubernetes.Provider) []string { if mizu.Config.Tap.AllNamespaces { - return mizu.K8sAllNamespaces - } else if len(mizu.Config.Tap.Namespace) > 0 { - return mizu.Config.Tap.Namespace + return []string{mizu.K8sAllNamespaces} + } else if len(mizu.Config.Tap.Namespaces) > 0 { + return mizu.Config.Tap.Namespaces } else { - return kubernetesProvider.CurrentNamespace() + return []string{kubernetesProvider.CurrentNamespace()} } } diff --git a/cli/kubernetes/provider.go b/cli/kubernetes/provider.go index e31f54327..026c77d94 100644 --- a/cli/kubernetes/provider.go +++ b/cli/kubernetes/provider.go @@ -691,13 +691,19 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac return err } -func (provider *Provider) GetAllRunningPodsMatchingRegex(ctx context.Context, regex *regexp.Regexp, namespace string) ([]core.Pod, error) { - pods, err := provider.clientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{}) - if err != nil { - return nil, err +func (provider *Provider) GetAllRunningPodsMatchingRegex(ctx context.Context, regex *regexp.Regexp, namespaces []string) ([]core.Pod, error) { + var pods []core.Pod + for _, namespace := range namespaces { + namespacePods, err := provider.clientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to get pods in ns: %s, %w", namespace, err) + } + + pods = append(pods, namespacePods.Items...) } + matchingPods := make([]core.Pod, 0) - for _, pod := range pods.Items { + for _, pod := range pods { if regex.MatchString(pod.Name) && isPodRunning(&pod) { matchingPods = append(matchingPods, pod) } diff --git a/cli/kubernetes/watch.go b/cli/kubernetes/watch.go index 9d914d4e5..7916feef6 100644 --- a/cli/kubernetes/watch.go +++ b/cli/kubernetes/watch.go @@ -4,49 +4,64 @@ import ( "context" "errors" "regexp" + "sync" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/watch" ) -func FilteredWatch(ctx context.Context, watcher watch.Interface, podFilter *regexp.Regexp) (chan *corev1.Pod, chan *corev1.Pod, chan *corev1.Pod, chan error) { +func FilteredWatch(ctx context.Context, kubernetesProvider *Provider, targetNamespaces []string, podFilter *regexp.Regexp) (chan *corev1.Pod, chan *corev1.Pod, chan *corev1.Pod, chan error) { addedChan := make(chan *corev1.Pod) modifiedChan := make(chan *corev1.Pod) removedChan := make(chan *corev1.Pod) errorChan := make(chan error) - go func() { - for { - select { - case e := <-watcher.ResultChan(): - if e.Object == nil { - errorChan <- errors.New("kubernetes pod watch failed") + var wg sync.WaitGroup + + for _, targetNamespace := range targetNamespaces { + wg.Add(1) + + go func(targetNamespace string) { + defer wg.Done() + watcher := kubernetesProvider.GetPodWatcher(ctx, targetNamespace) + + for { + select { + case e := <-watcher.ResultChan(): + if e.Object == nil { + errorChan <- errors.New("kubernetes pod watch failed") + return + } + + pod := e.Object.(*corev1.Pod) + + if !podFilter.MatchString(pod.Name) { + continue + } + + switch e.Type { + case watch.Added: + addedChan <- pod + case watch.Modified: + modifiedChan <- pod + case watch.Deleted: + removedChan <- pod + } + case <-ctx.Done(): + watcher.Stop() return } - - pod := e.Object.(*corev1.Pod) - - if !podFilter.MatchString(pod.Name) { - continue - } - - switch e.Type { - case watch.Added: - addedChan <- pod - case watch.Modified: - modifiedChan <- pod - case watch.Deleted: - removedChan <- pod - } - case <-ctx.Done(): - watcher.Stop() - close(addedChan) - close(modifiedChan) - close(removedChan) - close(errorChan) - return } - } + }(targetNamespace) + } + + go func() { + <-ctx.Done() + wg.Wait() + close(addedChan) + close(modifiedChan) + close(removedChan) + close(errorChan) }() return addedChan, modifiedChan, removedChan, errorChan diff --git a/cli/mizu/configStructs/tapConfig.go b/cli/mizu/configStructs/tapConfig.go index d00d5097a..2ac9352cd 100644 --- a/cli/mizu/configStructs/tapConfig.go +++ b/cli/mizu/configStructs/tapConfig.go @@ -11,7 +11,7 @@ import ( const ( GuiPortTapName = "gui-port" - NamespaceTapName = "namespace" + NamespacesTapName = "namespaces" AnalysisTapName = "analysis" AllNamespacesTapName = "all-namespaces" KubeConfigPathTapName = "kube-config" @@ -29,7 +29,7 @@ type TapConfig struct { SleepIntervalSec int `yaml:"upload-interval" default:"10"` PodRegexStr string `yaml:"regex" default:".*"` GuiPort uint16 `yaml:"gui-port" default:"8899"` - Namespace string `yaml:"namespace"` + Namespaces []string `yaml:"namespaces"` Analysis bool `yaml:"analysis" default:"false"` AllNamespaces bool `yaml:"all-namespaces" default:"false"` KubeConfigPath string `yaml:"kube-config"`