diff --git a/cli/cmd/tapRunner.go b/cli/cmd/tapRunner.go index ee2777c65..0363e1b24 100644 --- a/cli/cmd/tapRunner.go +++ b/cli/cmd/tapRunner.go @@ -379,13 +379,28 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro for { select { - case pod := <-added: + case pod, ok := <-added: + if !ok { + added = nil + continue + } + logger.Log.Debugf("Added matching pod %s, ns: %s", pod.Name, pod.Namespace) restartTappersDebouncer.SetOn() - case pod := <-removed: + case pod, ok := <-removed: + if !ok { + removed = nil + continue + } + logger.Log.Debugf("Removed matching pod %s, ns: %s", pod.Name, pod.Namespace) restartTappersDebouncer.SetOn() - case pod := <-modified: + case pod, ok := <-modified: + if !ok { + modified = nil + continue + } + logger.Log.Debugf("Modified matching pod %s, ns: %s, phase: %s, ip: %s", pod.Name, pod.Namespace, pod.Status.Phase, pod.Status.PodIP) // Act only if the modified pod has already obtained an IP address. // After filtering for IPs, on a normal pod restart this includes the following events: @@ -396,8 +411,12 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro if pod.Status.PodIP != "" { restartTappersDebouncer.SetOn() } + case err, ok := <-errorChan: + if !ok { + errorChan = nil + continue + } - case err := <-errorChan: logger.Log.Debugf("Watching pods loop, got error %v, stopping `restart tappers debouncer`", err) restartTappersDebouncer.Cancel() // TODO: Does this also perform cleanup? @@ -477,21 +496,28 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi timeAfter := time.After(25 * time.Second) for { select { - case <-ctx.Done(): - logger.Log.Debugf("Watching API Server pod loop, ctx done") - return - case <-added: + case _, ok := <-added: + if !ok { + added = nil + continue + } + logger.Log.Debugf("Watching API Server pod loop, added") - continue - case <-removed: + case _, ok := <-removed: + if !ok { + removed = nil + continue + } + logger.Log.Infof("%s removed", mizu.ApiServerPodName) cancel() return - case modifiedPod := <-modified: - if modifiedPod == nil { - logger.Log.Debugf("Watching API Server pod loop, modifiedPod with nil") + case modifiedPod, ok := <-modified: + if !ok { + modified = nil continue } + logger.Log.Debugf("Watching API Server pod loop, modified: %v", modifiedPod.Status.Phase) if modifiedPod.Status.Phase == core.PodRunning && !isPodReady { isPodReady = true @@ -510,14 +536,23 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi logger.Log.Debugf("[Error] failed update tapped pods %v", err) } } + case _, ok := <-errorChan: + if !ok { + errorChan = nil + continue + } + + logger.Log.Debugf("[ERROR] Agent creation, watching %v namespace", config.Config.MizuResourcesNamespace) + cancel() + case <-timeAfter: if !isPodReady { logger.Log.Errorf(uiUtils.Error, "Mizu API server was not ready in time") cancel() } - case <-errorChan: - logger.Log.Debugf("[ERROR] Agent creation, watching %v namespace", config.Config.MizuResourcesNamespace) - cancel() + case <-ctx.Done(): + logger.Log.Debugf("Watching API Server pod loop, ctx done") + return } } }