diff --git a/agent/main.go b/agent/main.go index c3a87fe69..ef2998f00 100644 --- a/agent/main.go +++ b/agent/main.go @@ -493,19 +493,19 @@ func watchMizuEvents(ctx context.Context, kubernetesProvider *kubernetes.Provide mizuResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", kubernetes.MizuResourcesPrefix)) eventWatchHelper := kubernetes.NewEventWatchHelper(kubernetesProvider, mizuResourceRegex) - added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, eventWatchHelper, []string{config.Config.MizuResourcesNamespace}, eventWatchHelper) + eventChan, errorChan := kubernetes.FilteredWatch(ctx, eventWatchHelper, []string{config.Config.MizuResourcesNamespace}, eventWatchHelper) for { select { - case wEvent, ok := <-added: + case wEvent, ok := <-eventChan: if !ok { - added = nil + eventChan = nil continue } event, err := wEvent.ToEvent() if err != nil { - logger.Log.Errorf("error parsing Mizu resource added event: %+v", err) + logger.Log.Errorf("error parsing Mizu resource event: %+v", err) cancel() } @@ -514,45 +514,7 @@ func watchMizuEvents(ctx context.Context, kubernetesProvider *kubernetes.Provide } if event.Type == v1.EventTypeWarning { - logger.Log.Warningf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note) - } - case wEvent, ok := <-removed: - if !ok { - removed = nil - continue - } - - event, err := wEvent.ToEvent() - if err != nil { - logger.Log.Errorf("error parsing Mizu resource removed event: %+v", err) - cancel() - } - - if startTime.After(event.CreationTimestamp.Time) { - continue - } - - if event.Type == v1.EventTypeWarning { - logger.Log.Warningf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note) - } - case wEvent, ok := <-modified: - if !ok { - modified = nil - continue - } - - event, err := wEvent.ToEvent() - if err != nil { - logger.Log.Errorf("error parsing Mizu resource modified event: %+v", err) - cancel() - } - - if startTime.After(event.CreationTimestamp.Time) { - continue - } - - if event.Type == v1.EventTypeWarning { - logger.Log.Warningf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note) + logger.Log.Warningf("resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note) } case err, ok := <-errorChan: if !ok { diff --git a/cli/cmd/tapRunner.go b/cli/cmd/tapRunner.go index b1ffc5ff2..4efd9cd4a 100644 --- a/cli/cmd/tapRunner.go +++ b/cli/cmd/tapRunner.go @@ -559,76 +559,73 @@ func waitUntilNamespaceDeleted(ctx context.Context, cancel context.CancelFunc, k func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) { podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", kubernetes.ApiServerPodName)) podWatchHelper := kubernetes.NewPodWatchHelper(kubernetesProvider, podExactRegex) - added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.MizuResourcesNamespace}, podWatchHelper) + eventChan, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.MizuResourcesNamespace}, podWatchHelper) isPodReady := false timeAfter := time.After(25 * time.Second) for { select { - case _, ok := <-added: + case wEvent, ok := <-eventChan: if !ok { - added = nil + eventChan = nil continue } - logger.Log.Debugf("Watching API Server pod loop, added") - case _, ok := <-removed: - if !ok { - removed = nil - continue - } - - logger.Log.Infof("%s removed", kubernetes.ApiServerPodName) - cancel() - return - case wEvent, ok := <-modified: - if !ok { - modified = nil - continue - } - - modifiedPod, err := wEvent.ToPod() - if err != nil { - logger.Log.Errorf(uiUtils.Error, err) + switch wEvent.Type { + case kubernetes.EventAdded: + logger.Log.Debugf("Watching API Server pod loop, added") + case kubernetes.EventDeleted: + logger.Log.Infof("%s removed", kubernetes.ApiServerPodName) cancel() - continue - } - - logger.Log.Debugf("Watching API Server pod loop, modified: %v", modifiedPod.Status.Phase) - - if modifiedPod.Status.Phase == core.PodPending { - if modifiedPod.Status.Conditions[0].Type == core.PodScheduled && modifiedPod.Status.Conditions[0].Status != core.ConditionTrue { - logger.Log.Debugf("Wasn't able to deploy the API server. Reason: \"%s\"", modifiedPod.Status.Conditions[0].Message) - logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Wasn't able to deploy the API server, for more info check logs at %s", fsUtils.GetLogFilePath())) + return + case kubernetes.EventModified: + modifiedPod, err := wEvent.ToPod() + if err != nil { + logger.Log.Errorf(uiUtils.Error, err) cancel() - break + continue } - if len(modifiedPod.Status.ContainerStatuses) > 0 && modifiedPod.Status.ContainerStatuses[0].State.Waiting != nil && modifiedPod.Status.ContainerStatuses[0].State.Waiting.Reason == "ErrImagePull" { - logger.Log.Debugf("Wasn't able to deploy the API server. (ErrImagePull) Reason: \"%s\"", modifiedPod.Status.ContainerStatuses[0].State.Waiting.Message) - logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Wasn't able to deploy the API server: failed to pull the image, for more info check logs at %v", fsUtils.GetLogFilePath())) - cancel() - break - } - } + logger.Log.Debugf("Watching API Server pod loop, modified: %v", modifiedPod.Status.Phase) - if modifiedPod.Status.Phase == core.PodRunning && !isPodReady { - isPodReady = true - go startProxyReportErrorIfAny(kubernetesProvider, cancel) + if modifiedPod.Status.Phase == core.PodPending { + if modifiedPod.Status.Conditions[0].Type == core.PodScheduled && modifiedPod.Status.Conditions[0].Status != core.ConditionTrue { + logger.Log.Debugf("Wasn't able to deploy the API server. Reason: \"%s\"", modifiedPod.Status.Conditions[0].Message) + logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Wasn't able to deploy the API server, for more info check logs at %s", fsUtils.GetLogFilePath())) + cancel() + break + } - url := GetApiServerUrl() - if err := apiProvider.TestConnection(); err != nil { - logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Couldn't connect to API server, for more info check logs at %s", fsUtils.GetLogFilePath())) - cancel() - break + if len(modifiedPod.Status.ContainerStatuses) > 0 && modifiedPod.Status.ContainerStatuses[0].State.Waiting != nil && modifiedPod.Status.ContainerStatuses[0].State.Waiting.Reason == "ErrImagePull" { + logger.Log.Debugf("Wasn't able to deploy the API server. (ErrImagePull) Reason: \"%s\"", modifiedPod.Status.ContainerStatuses[0].State.Waiting.Message) + logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Wasn't able to deploy the API server: failed to pull the image, for more info check logs at %v", fsUtils.GetLogFilePath())) + cancel() + break + } } - logger.Log.Infof("Mizu is available at %s", url) - if !config.Config.HeadlessMode { - uiUtils.OpenBrowser(url) - } - if err := apiProvider.ReportTappedPods(state.tapperSyncer.CurrentlyTappedPods); err != nil { - logger.Log.Debugf("[Error] failed update tapped pods %v", err) + if modifiedPod.Status.Phase == core.PodRunning && !isPodReady { + isPodReady = true + go startProxyReportErrorIfAny(kubernetesProvider, cancel) + + url := GetApiServerUrl() + if err := apiProvider.TestConnection(); err != nil { + logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Couldn't connect to API server, for more info check logs at %s", fsUtils.GetLogFilePath())) + cancel() + break + } + + logger.Log.Infof("Mizu is available at %s", url) + if !config.Config.HeadlessMode { + uiUtils.OpenBrowser(url) + } + if err := apiProvider.ReportTappedPods(state.tapperSyncer.CurrentlyTappedPods); err != nil { + logger.Log.Debugf("[Error] failed update tapped pods %v", err) + } } + case kubernetes.EventBookmark: + break + case kubernetes.EventError: + break } case err, ok := <-errorChan: if !ok { @@ -654,70 +651,53 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi func watchTapperPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) { podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", kubernetes.TapperDaemonSetName)) podWatchHelper := kubernetes.NewPodWatchHelper(kubernetesProvider, podExactRegex) - added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.MizuResourcesNamespace}, podWatchHelper) + eventChan, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.MizuResourcesNamespace}, podWatchHelper) + for { select { - case wEvent, ok := <-added: + case wEvent, ok := <-eventChan: if !ok { - added = nil + eventChan = nil continue } - addedPod, err := wEvent.ToPod() + pod, err := wEvent.ToPod() if err != nil { logger.Log.Errorf(uiUtils.Error, err) cancel() continue } - logger.Log.Debugf("Tapper is created [%s]", addedPod.Name) - case wEvent, ok := <-removed: - if !ok { - removed = nil - continue - } + switch wEvent.Type { + case kubernetes.EventAdded: + logger.Log.Debugf("Tapper is created [%s]", pod.Name) + case kubernetes.EventDeleted: + logger.Log.Debugf("Tapper is removed [%s]", pod.Name) + case kubernetes.EventModified: + if pod.Status.Phase == core.PodPending && pod.Status.Conditions[0].Type == core.PodScheduled && pod.Status.Conditions[0].Status != core.ConditionTrue { + logger.Log.Infof(uiUtils.Red, fmt.Sprintf("Wasn't able to deploy the tapper %s. Reason: \"%s\"", pod.Name, pod.Status.Conditions[0].Message)) + cancel() + continue + } - removedPod, err := wEvent.ToPod() - if err != nil { - logger.Log.Errorf(uiUtils.Error, err) - cancel() - continue - } + podStatus := pod.Status - - logger.Log.Debugf("Tapper is removed [%s]", removedPod.Name) - case wEvent, ok := <-modified: - if !ok { - modified = nil - continue - } - - modifiedPod, err := wEvent.ToPod() - if err != nil { - logger.Log.Errorf(uiUtils.Error, err) - cancel() - continue - } - - if modifiedPod.Status.Phase == core.PodPending && modifiedPod.Status.Conditions[0].Type == core.PodScheduled && modifiedPod.Status.Conditions[0].Status != core.ConditionTrue { - logger.Log.Infof(uiUtils.Red, fmt.Sprintf("Wasn't able to deploy the tapper %s. Reason: \"%s\"", modifiedPod.Name, modifiedPod.Status.Conditions[0].Message)) - cancel() - continue - } - - podStatus := modifiedPod.Status - - if podStatus.Phase == core.PodRunning { - state := podStatus.ContainerStatuses[0].State - if state.Terminated != nil { - switch state.Terminated.Reason { - case "OOMKilled": - logger.Log.Infof(uiUtils.Red, fmt.Sprintf("Tapper %s was terminated (reason: OOMKilled). You should consider increasing machine resources.", modifiedPod.Name)) + if podStatus.Phase == core.PodRunning { + state := podStatus.ContainerStatuses[0].State + if state.Terminated != nil { + switch state.Terminated.Reason { + case "OOMKilled": + logger.Log.Infof(uiUtils.Red, fmt.Sprintf("Tapper %s was terminated (reason: OOMKilled). You should consider increasing machine resources.", pod.Name)) + } } } - } - logger.Log.Debugf("Tapper %s is %s", modifiedPod.Name, strings.ToLower(string(podStatus.Phase))) + logger.Log.Debugf("Tapper %s is %s", pod.Name, strings.ToLower(string(podStatus.Phase))) + case kubernetes.EventBookmark: + break + case kubernetes.EventError: + break + } case err, ok := <-errorChan: if !ok { errorChan = nil @@ -740,57 +720,19 @@ func watchMizuEvents(ctx context.Context, kubernetesProvider *kubernetes.Provide mizuResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", kubernetes.MizuResourcesPrefix)) eventWatchHelper := kubernetes.NewEventWatchHelper(kubernetesProvider, mizuResourceRegex) - added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, eventWatchHelper, []string{config.Config.MizuResourcesNamespace}, eventWatchHelper) + eventChan, errorChan := kubernetes.FilteredWatch(ctx, eventWatchHelper, []string{config.Config.MizuResourcesNamespace}, eventWatchHelper) for { select { - case wEvent, ok := <-added: + case wEvent, ok := <-eventChan: if !ok { - added = nil + eventChan = nil continue } event, err := wEvent.ToEvent() if err != nil { - logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("error parsing Mizu resource added event: %+v", err)) - cancel() - } - - if startTime.After(event.CreationTimestamp.Time) { - continue - } - - if event.Type == core.EventTypeWarning { - logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note)) - } - case wEvent, ok := <-removed: - if !ok { - removed = nil - continue - } - - event, err := wEvent.ToEvent() - if err != nil { - logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("error parsing Mizu resource removed event: %+v", err)) - cancel() - } - - if startTime.After(event.CreationTimestamp.Time) { - continue - } - - if event.Type == core.EventTypeWarning { - logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note)) - } - case wEvent, ok := <-modified: - if !ok { - modified = nil - continue - } - - event, err := wEvent.ToEvent() - if err != nil { - logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("error parsing Mizu resource modified event: %+v", err)) + logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("error parsing Mizu resource event: %+v", err)) cancel() } diff --git a/shared/kubernetes/mizuTapperSyncer.go b/shared/kubernetes/mizuTapperSyncer.go index 10b8a5334..ee1f86ad4 100644 --- a/shared/kubernetes/mizuTapperSyncer.go +++ b/shared/kubernetes/mizuTapperSyncer.go @@ -70,7 +70,7 @@ func CreateAndStartMizuTapperSyncer(ctx context.Context, kubernetesProvider *Pro func (tapperSyncer *MizuTapperSyncer) watchPodsForTapping() { podWatchHelper := NewPodWatchHelper(tapperSyncer.kubernetesProvider, &tapperSyncer.config.PodFilterRegex) - added, modified, removed, errorChan := FilteredWatch(tapperSyncer.context, podWatchHelper, tapperSyncer.config.TargetNamespaces, podWatchHelper) + eventChan, errorChan := FilteredWatch(tapperSyncer.context, podWatchHelper, tapperSyncer.config.TargetNamespaces, podWatchHelper) restartTappers := func() { err, changeFound := tapperSyncer.updateCurrentlyTappedPods() @@ -96,9 +96,9 @@ func (tapperSyncer *MizuTapperSyncer) watchPodsForTapping() { for { select { - case wEvent, ok := <-added: + case wEvent, ok := <-eventChan: if !ok { - added = nil + eventChan = nil continue } @@ -109,44 +109,28 @@ func (tapperSyncer *MizuTapperSyncer) watchPodsForTapping() { } - logger.Log.Debugf("Added matching pod %s, ns: %s", pod.Name, pod.Namespace) - restartTappersDebouncer.SetOn() - case wEvent, ok := <-removed: - if !ok { - removed = nil - continue - } - - pod, err := wEvent.ToPod() - if err != nil { - tapperSyncer.handleErrorInWatchLoop(err, restartTappersDebouncer) - continue - } - - logger.Log.Debugf("Removed matching pod %s, ns: %s", pod.Name, pod.Namespace) - restartTappersDebouncer.SetOn() - case wEvent, ok := <-modified: - if !ok { - modified = nil - continue - } - - pod, err := wEvent.ToPod() - if err != nil { - tapperSyncer.handleErrorInWatchLoop(err, restartTappersDebouncer) - continue - } - - - logger.Log.Debugf("Modified matching pod %s, ns: %s, phase: %s, ip: %s", pod.Name, pod.Namespace, pod.Status.Phase, pod.Status.PodIP) - // Act only if the modified pod has already obtained an IP address. - // After filtering for IPs, on a normal pod restart this includes the following events: - // - Pod deletion - // - Pod reaches start state - // - Pod reaches ready state - // Ready/unready transitions might also trigger this event. - if pod.Status.PodIP != "" { + switch wEvent.Type { + case EventAdded: + logger.Log.Debugf("Added matching pod %s, ns: %s", pod.Name, pod.Namespace) restartTappersDebouncer.SetOn() + case EventDeleted: + logger.Log.Debugf("Removed matching pod %s, ns: %s", pod.Name, pod.Namespace) + restartTappersDebouncer.SetOn() + case EventModified: + logger.Log.Debugf("Modified matching pod %s, ns: %s, phase: %s, ip: %s", pod.Name, pod.Namespace, pod.Status.Phase, pod.Status.PodIP) + // Act only if the modified pod has already obtained an IP address. + // After filtering for IPs, on a normal pod restart this includes the following events: + // - Pod deletion + // - Pod reaches start state + // - Pod reaches ready state + // Ready/unready transitions might also trigger this event. + if pod.Status.PodIP != "" { + restartTappersDebouncer.SetOn() + } + case EventBookmark: + break + case EventError: + break } case err, ok := <-errorChan: if !ok { diff --git a/shared/kubernetes/watch.go b/shared/kubernetes/watch.go index a9588aa19..6acc3ef7f 100644 --- a/shared/kubernetes/watch.go +++ b/shared/kubernetes/watch.go @@ -20,10 +20,8 @@ type WatchCreator interface { NewWatcher(ctx context.Context, namespace string) (watch.Interface, error) } -func FilteredWatch(ctx context.Context, watcherCreator WatchCreator, targetNamespaces []string, filterer EventFilterer) (chan *WatchEvent, chan *WatchEvent, chan *WatchEvent, chan error) { - addedChan := make(chan *WatchEvent) - modifiedChan := make(chan *WatchEvent) - removedChan := make(chan *WatchEvent) +func FilteredWatch(ctx context.Context, watcherCreator WatchCreator, targetNamespaces []string, filterer EventFilterer) (<-chan *WatchEvent, <-chan error) { + eventChan := make(chan *WatchEvent) errorChan := make(chan error) var wg sync.WaitGroup @@ -42,7 +40,7 @@ func FilteredWatch(ctx context.Context, watcherCreator WatchCreator, targetNames break } - err = startWatchLoop(ctx, watcher, filterer, addedChan, modifiedChan, removedChan) // blocking + err = startWatchLoop(ctx, watcher, filterer, eventChan) // blocking watcher.Stop() select { @@ -73,16 +71,14 @@ func FilteredWatch(ctx context.Context, watcherCreator WatchCreator, targetNames go func() { <-ctx.Done() wg.Wait() - close(addedChan) - close(modifiedChan) - close(removedChan) + close(eventChan) close(errorChan) }() - return addedChan, modifiedChan, removedChan, errorChan + return eventChan, errorChan } -func startWatchLoop(ctx context.Context, watcher watch.Interface, filterer EventFilterer, addedChan chan *WatchEvent, modifiedChan chan *WatchEvent, removedChan chan *WatchEvent) error { +func startWatchLoop(ctx context.Context, watcher watch.Interface, filterer EventFilterer, eventChan chan<- *WatchEvent) error { resultChan := watcher.ResultChan() for { select { @@ -103,14 +99,7 @@ func startWatchLoop(ctx context.Context, watcher watch.Interface, filterer Event continue } - switch wEvent.Type { - case watch.Added: - addedChan <- &wEvent - case watch.Modified: - modifiedChan <- &wEvent - case watch.Deleted: - removedChan <- &wEvent - } + eventChan <- &wEvent case <-ctx.Done(): return nil } diff --git a/shared/kubernetes/watchEvent.go b/shared/kubernetes/watchEvent.go index ed07e4cf1..81e1d0b09 100644 --- a/shared/kubernetes/watchEvent.go +++ b/shared/kubernetes/watchEvent.go @@ -10,6 +10,14 @@ import ( "k8s.io/apimachinery/pkg/watch" ) +const ( + EventAdded watch.EventType = watch.Added + EventModified watch.EventType = watch.Modified + EventDeleted watch.EventType = watch.Deleted + EventBookmark watch.EventType = watch.Bookmark + EventError watch.EventType = watch.Error +) + type InvalidObjectType struct { RequestedType reflect.Type }