From 36828bcc1db8d2e550fd436f4893c7894ce5661c Mon Sep 17 00:00:00 2001
From: Igor Gov
Date: Sun, 19 Dec 2021 09:29:09 +0200
Subject: [PATCH] Bringing back the pod watch api server events to make acceptance test more stable (#541)

---
 cli/cmd/tapRunner.go | 122 ++++++++++++++++++++++++++++++++++++-------
 1 file changed, 103 insertions(+), 19 deletions(-)

diff --git a/cli/cmd/tapRunner.go b/cli/cmd/tapRunner.go
index 442bae8e0..a4307fa46 100644
--- a/cli/cmd/tapRunner.go
+++ b/cli/cmd/tapRunner.go
@@ -151,6 +151,7 @@ func RunMizuTap() {
 	} else {
 		defer finishMizuExecution(kubernetesProvider, apiProvider)
 
+		go goUtils.HandleExcWrapper(watchApiServerEvents, ctx, kubernetesProvider, cancel)
 		go goUtils.HandleExcWrapper(watchApiServerPod, ctx, kubernetesProvider, cancel)
 
 		// block until exit signal or error
@@ -564,6 +565,77 @@ func waitUntilNamespaceDeleted(ctx context.Context, cancel context.CancelFunc, k
 }
 
+// watchApiServerPod watches the pod whose name matches kubernetes.ApiServerPodName
+// exactly in the Mizu resources namespace. The first time the pod is reported
+// Running it calls postApiServerStarted. It calls cancel when the pod is deleted,
+// an event cannot be converted to a pod, a watch error arrives, or 25s pass unready.
 func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
+	podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", kubernetes.ApiServerPodName))
+	podWatchHelper := kubernetes.NewPodWatchHelper(kubernetesProvider, podExactRegex)
+	eventChan, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.MizuResourcesNamespace}, podWatchHelper)
+	isPodReady := false
+	timeAfter := time.After(25 * time.Second)
+	for {
+		select {
+		case wEvent, ok := <-eventChan:
+			if !ok {
+				// A nil channel is never ready in a select, so this disables the case.
+				eventChan = nil
+				continue
+			}
+
+			switch wEvent.Type {
+			case kubernetes.EventAdded:
+				logger.Log.Debugf("Watching API Server pod loop, added")
+			case kubernetes.EventDeleted:
+				logger.Log.Infof("%s removed", kubernetes.ApiServerPodName)
+				cancel()
+				return
+			case kubernetes.EventModified:
+				modifiedPod, err := wEvent.ToPod()
+				if err != nil {
+					logger.Log.Errorf(uiUtils.Error, err)
+					cancel()
+					continue
+				}
+
+				logger.Log.Debugf("Watching API Server pod loop, modified: %v", modifiedPod.Status.Phase)
+
+				// isPodReady makes the start-up sequence run only once; err is nil here.
+				if modifiedPod.Status.Phase == core.PodRunning && !isPodReady {
+					isPodReady = true
+					postApiServerStarted(ctx, kubernetesProvider, cancel, err)
+				}
+			case kubernetes.EventBookmark:
+				break
+			case kubernetes.EventError:
+				break
+			}
+		case err, ok := <-errorChan:
+			if !ok {
+				errorChan = nil
+				continue
+			}
+
+			logger.Log.Errorf("[ERROR] Agent creation, watching %v namespace, error: %v", config.Config.MizuResourcesNamespace, err)
+			cancel()
+
+		case <-timeAfter:
+			// time.After delivers a single value, so this check happens once.
+			if !isPodReady {
+				logger.Log.Errorf(uiUtils.Error, "Mizu API server was not ready in time")
+				cancel()
+			}
+		case <-ctx.Done():
+			logger.Log.Debugf("Watching API Server pod loop, ctx done")
+			return
+		}
+	}
+}
+
+// watchApiServerEvents follows events in the Mizu resources namespace using an
+// event watch helper built from a prefix match on kubernetes.ApiServerPodName
+// and "pod"; a "FailedScheduling" or "Failed" reason logs an error and cancels.
+func watchApiServerEvents(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
 	podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s", kubernetes.ApiServerPodName))
 	eventWatchHelper := kubernetes.NewEventWatchHelper(kubernetesProvider, podExactRegex, "pod")
 	eventChan, errorChan := kubernetes.FilteredWatch(ctx, eventWatchHelper, []string{config.Config.MizuResourcesNamespace}, eventWatchHelper)
@@ -594,25 +666,6 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
 					event.Note))
 
 			switch event.Reason {
-			case "Started":
-				go startProxyReportErrorIfAny(kubernetesProvider, cancel)
-
-				url := GetApiServerUrl()
-				if err := apiProvider.TestConnection(); err != nil {
-					logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Couldn't connect to API server, for more info check logs at %s", fsUtils.GetLogFilePath()))
-					cancel()
-					break
-				}
-				options, _ := getMizuApiFilteringOptions()
-				if err = startTapperSyncer(ctx, cancel, kubernetesProvider, state.targetNamespaces, *options, state.startTime); err != nil {
-					logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error starting mizu tapper syncer: %v", err))
-					cancel()
-				}
-
-				logger.Log.Infof("Mizu is available at %s", url)
-				if !config.Config.HeadlessMode {
-					uiUtils.OpenBrowser(url)
-				}
 			case "FailedScheduling", "Failed":
 				logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Mizu API Server status: %s - %s", event.Reason, event.Note))
 				cancel()
@@ -632,6 +685,37 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
 	}
 }
 
+// postApiServerStarted runs the start-up sequence for a running API server pod:
+// it launches startProxyReportErrorIfAny in a goroutine, tests the connection
+// through apiProvider, starts the tapper syncer and then logs the URL, opening
+// a browser unless config.Config.HeadlessMode is set. A failed connection test
+// or a failed tapper syncer start calls cancel and returns early. The err
+// parameter is only reused as a scratch variable; its incoming value is not read.
+func postApiServerStarted(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, err error) {
+	go startProxyReportErrorIfAny(kubernetesProvider, cancel)
+
+	url := GetApiServerUrl()
+	if err := apiProvider.TestConnection(); err != nil {
+		logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Couldn't connect to API server, for more info check logs at %s", fsUtils.GetLogFilePath()))
+		cancel()
+		return
+	}
+	// NOTE(review): the second result is discarded, yet options is dereferenced
+	// below - confirm getMizuApiFilteringOptions cannot return nil options here.
+	options, _ := getMizuApiFilteringOptions()
+	if err = startTapperSyncer(ctx, cancel, kubernetesProvider, state.targetNamespaces, *options, state.startTime); err != nil {
+		logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error starting mizu tapper syncer: %v", err))
+		cancel()
+		// Do not advertise the URL or open a browser after a failed start.
+		return
+	}
+
+	logger.Log.Infof("Mizu is available at %s", url)
+	if !config.Config.HeadlessMode {
+		uiUtils.OpenBrowser(url)
+	}
+}
+
 func getNamespaces(kubernetesProvider *kubernetes.Provider) []string {
 	if config.Config.Tap.AllNamespaces {
 		return []string{kubernetes.K8sAllNamespaces}