From f9396e01ca415e1b5d3c967455123100fdb61b95 Mon Sep 17 00:00:00 2001 From: RamiBerm <54766858+RamiBerm@users.noreply.github.com> Date: Tue, 3 Aug 2021 15:02:31 +0300 Subject: [PATCH] TRA-3415 ignore unready pods (#160) * Update tapRunner.go and provider.go * Update tapRunner.go * Update tapRunner.go * Update tapRunner.go * Update tapRunner.go and provider.go Co-authored-by: RamiBerm --- cli/cmd/tapRunner.go | 72 ++++++++++++++++++++++++++++---------- cli/kubernetes/provider.go | 8 +++-- 2 files changed, 59 insertions(+), 21 deletions(-) diff --git a/cli/cmd/tapRunner.go b/cli/cmd/tapRunner.go index f2177d628..5e4ddc29f 100644 --- a/cli/cmd/tapRunner.go +++ b/cli/cmd/tapRunner.go @@ -54,12 +54,15 @@ func RunMizuTap(podRegexQuery *regexp.Regexp, tappingOptions *MizuTapOptions) { defer cancel() // cancel will be called when this function exits targetNamespace := getNamespace(tappingOptions, kubernetesProvider) - if matchingPods, err := kubernetesProvider.GetAllPodsMatchingRegex(ctx, podRegexQuery, targetNamespace); err != nil { + if err := updateCurrentlyTappedPods(kubernetesProvider, ctx, podRegexQuery, targetNamespace); err != nil { mizu.Log.Infof("Error listing pods: %v", err) return - } else { - currentlyTappedPods = matchingPods } + urlReadyChan := make(chan string) + go func() { + mizu.Log.Infof("Mizu is available at http://%s", <-urlReadyChan) + }() + var namespacesStr string if targetNamespace != mizu.K8sAllNamespaces { @@ -86,10 +89,9 @@ func RunMizuTap(podRegexQuery *regexp.Regexp, tappingOptions *MizuTapOptions) { return } - urlReadyChan := make(chan string) mizu.CheckNewerVersion() go portForwardApiPod(ctx, kubernetesProvider, cancel, tappingOptions, urlReadyChan) // TODO convert this to job for built in pod ttl or have the running app handle this - go watchPodsForTapping(ctx, kubernetesProvider, cancel, podRegexQuery, tappingOptions, urlReadyChan) + go watchPodsForTapping(ctx, kubernetesProvider, cancel, podRegexQuery, tappingOptions) go syncApiStatus(ctx, cancel, tappingOptions) //block until exit signal or error @@ -232,17 +234,15 @@ func cleanUpMizuResources(kubernetesProvider *kubernetes.Provider) { } } -func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, podRegex *regexp.Regexp, tappingOptions *MizuTapOptions, urlReadyChan chan string) { +func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, podRegex *regexp.Regexp, tappingOptions *MizuTapOptions) { targetNamespace := getNamespace(tappingOptions, kubernetesProvider) added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, targetNamespace), podRegex) restartTappers := func() { - if matchingPods, err := kubernetesProvider.GetAllPodsMatchingRegex(ctx, podRegex, targetNamespace); err != nil { + if err := updateCurrentlyTappedPods(kubernetesProvider, ctx, podRegex, targetNamespace); err != nil { mizu.Log.Infof("Error getting pods by regex: %s (%v,%+v)", err, err, err) cancel() - } else { - currentlyTappedPods = matchingPods } nodeToTappedPodIPMap, err := getNodeHostToTappedPodIpsMap(currentlyTappedPods) @@ -257,19 +257,11 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro } } restartTappersDebouncer := debounce.NewDebouncer(updateTappersDelay, restartTappers) - timer := time.AfterFunc(time.Second*10, func() { - mizu.Log.Debugf("Waiting for URL...") - mizu.Log.Infof("Mizu is available at http://%s", <-urlReadyChan) - }) for { select { - case newTarget := <-added: - mizu.Log.Infof(uiUtils.Green, fmt.Sprintf("+%s", newTarget.Name)) - timer.Reset(time.Second * 2) - case removedTarget := <-removed: - mizu.Log.Infof(uiUtils.Red, fmt.Sprintf("-%s", removedTarget.Name)) - timer.Reset(time.Second * 2) + case <-added: + case <-removed: restartTappersDebouncer.SetOn() case modifiedTarget := <-modified: // Act only if the modified pod has already obtained an IP address. @@ -292,6 +284,48 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro } } +func updateCurrentlyTappedPods(kubernetesProvider *kubernetes.Provider, ctx context.Context, podRegex *regexp.Regexp, targetNamespace string) error { + if matchingPods, err := kubernetesProvider.GetAllRunningPodsMatchingRegex(ctx, podRegex, targetNamespace); err != nil { + mizu.Log.Infof("Error getting pods by regex: %s (%v,%+v)", err, err, err) + return err + } else { + addedPods, removedPods := getPodArrayDiff(currentlyTappedPods, matchingPods) + for _, addedPod := range addedPods { + mizu.Log.Infof(uiUtils.Green, fmt.Sprintf("+%s", addedPod.Name)) + } + for _, removedPod := range removedPods { + mizu.Log.Infof(uiUtils.Red, fmt.Sprintf("-%s", removedPod.Name)) + } + currentlyTappedPods = matchingPods + } + return nil +} + +func getPodArrayDiff(oldPods []core.Pod, newPods []core.Pod) (added []core.Pod, removed []core.Pod) { + added = getMissingPods(newPods, oldPods) + removed = getMissingPods(oldPods, newPods) + + return added, removed +} + +//returns pods present in pods1 array and missing in pods2 array +func getMissingPods(pods1 []core.Pod, pods2 []core.Pod) []core.Pod { + missingPods := make([]core.Pod, 0) + for _, pod1 := range pods1 { + var found = false + for _, pod2 := range pods2 { + if pod1.UID == pod2.UID { + found = true + break + } + } + if !found { + missingPods = append(missingPods, pod1) + } + } + return missingPods +} + func portForwardApiPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, tappingOptions *MizuTapOptions, urlReadyChan chan string) { podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", mizu.ApiServerPodName)) added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, mizu.ResourcesNamespace), podExactRegex) diff --git a/cli/kubernetes/provider.go b/cli/kubernetes/provider.go index 1fa057162..3ef165fb7 100644 --- a/cli/kubernetes/provider.go +++ b/cli/kubernetes/provider.go @@ -587,14 +587,14 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac return err } -func (provider *Provider) GetAllPodsMatchingRegex(ctx context.Context, regex *regexp.Regexp, namespace string) ([]core.Pod, error) { +func (provider *Provider) GetAllRunningPodsMatchingRegex(ctx context.Context, regex *regexp.Regexp, namespace string) ([]core.Pod, error) { pods, err := provider.clientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{}) if err != nil { return nil, err } matchingPods := make([]core.Pod, 0) for _, pod := range pods.Items { - if regex.MatchString(pod.Name) { + if regex.MatchString(pod.Name) && isPodRunning(&pod) { matchingPods = append(matchingPods, pod) } } @@ -635,3 +635,7 @@ func loadKubernetesConfiguration(kubeConfigPath string) clientcmd.ClientConfig { }, ) } + +func isPodRunning(pod *core.Pod) bool { + return pod.Status.Phase == core.PodRunning +}