diff --git a/cli/cmd/tapRunner.go b/cli/cmd/tapRunner.go index ee21fda7f..395963c56 100644 --- a/cli/cmd/tapRunner.go +++ b/cli/cmd/tapRunner.go @@ -9,19 +9,18 @@ import ( "strings" "time" - "github.com/up9inc/mizu/cli/cmd/goUtils" + "github.com/getkin/kin-openapi/openapi3" + "gopkg.in/yaml.v3" + core "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" - "github.com/getkin/kin-openapi/openapi3" "github.com/up9inc/mizu/cli/apiserver" + "github.com/up9inc/mizu/cli/cmd/goUtils" "github.com/up9inc/mizu/cli/config" "github.com/up9inc/mizu/cli/config/configStructs" "github.com/up9inc/mizu/cli/errormessage" - "gopkg.in/yaml.v3" - core "k8s.io/api/core/v1" - "github.com/up9inc/mizu/cli/mizu" "github.com/up9inc/mizu/cli/mizu/fsUtils" "github.com/up9inc/mizu/cli/uiUtils" @@ -555,7 +554,8 @@ func waitUntilNamespaceDeleted(ctx context.Context, cancel context.CancelFunc, k func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) { podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", kubernetes.ApiServerPodName)) - added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider, []string{config.Config.MizuResourcesNamespace}, podExactRegex) + podWatchHelper := kubernetes.NewPodWatchHelper(kubernetesProvider, podExactRegex) + added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.MizuResourcesNamespace}, podWatchHelper) isPodReady := false timeAfter := time.After(25 * time.Second) for { @@ -576,12 +576,19 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi logger.Log.Infof("%s removed", kubernetes.ApiServerPodName) cancel() return - case modifiedPod, ok := <-modified: + case wEvent, ok := <-modified: if !ok { modified = nil continue } + modifiedPod, err := wEvent.ToPod() + if err != nil { + logger.Log.Errorf(uiUtils.Error, err) + cancel() + continue + } + logger.Log.Debugf("Watching API Server pod loop, modified: %v", modifiedPod.Status.Phase) if modifiedPod.Status.Phase == core.PodPending { @@ -642,34 +649,57 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi func watchTapperPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) { podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", kubernetes.TapperDaemonSetName)) - added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider, []string{config.Config.MizuResourcesNamespace}, podExactRegex) + podWatchHelper := kubernetes.NewPodWatchHelper(kubernetesProvider, podExactRegex) + added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.MizuResourcesNamespace}, podWatchHelper) var prevPodPhase core.PodPhase for { select { - case addedPod, ok := <-added: + case wEvent, ok := <-added: if !ok { added = nil continue } + addedPod, err := wEvent.ToPod() + if err != nil { + logger.Log.Errorf(uiUtils.Error, err) + cancel() + continue + } + logger.Log.Debugf("Tapper is created [%s]", addedPod.Name) - case removedPod, ok := <-removed: + case wEvent, ok := <-removed: if !ok { removed = nil continue } + removedPod, err := wEvent.ToPod() + if err != nil { + logger.Log.Errorf(uiUtils.Error, err) + cancel() + continue + } + + logger.Log.Debugf("Tapper is removed [%s]", removedPod.Name) - case modifiedPod, ok := <-modified: + case wEvent, ok := <-modified: if !ok { modified = nil continue } + modifiedPod, err := wEvent.ToPod() + if err != nil { + logger.Log.Errorf(uiUtils.Error, err) + cancel() + continue + } + if modifiedPod.Status.Phase == core.PodPending && modifiedPod.Status.Conditions[0].Type == core.PodScheduled && modifiedPod.Status.Conditions[0].Status != core.ConditionTrue { logger.Log.Infof(uiUtils.Red, fmt.Sprintf("Wasn't able to deploy the tapper %s. Reason: \"%s\"", modifiedPod.Name, modifiedPod.Status.Conditions[0].Message)) cancel() - break + continue } podStatus := modifiedPod.Status diff --git a/shared/kubernetes/mizuTapperSyncer.go b/shared/kubernetes/mizuTapperSyncer.go index 1526eadbd..dca6877f4 100644 --- a/shared/kubernetes/mizuTapperSyncer.go +++ b/shared/kubernetes/mizuTapperSyncer.go @@ -68,7 +68,8 @@ func CreateAndStartMizuTapperSyncer(ctx context.Context, kubernetesProvider *Pro } func (tapperSyncer *MizuTapperSyncer) watchPodsForTapping() { - added, modified, removed, errorChan := FilteredWatch(tapperSyncer.context, tapperSyncer.kubernetesProvider, tapperSyncer.config.TargetNamespaces, &tapperSyncer.config.PodFilterRegex) + podWatchHelper := NewPodWatchHelper(tapperSyncer.kubernetesProvider, &tapperSyncer.config.PodFilterRegex) + added, modified, removed, errorChan := FilteredWatch(tapperSyncer.context, podWatchHelper, tapperSyncer.config.TargetNamespaces, podWatchHelper) restartTappers := func() { err, changeFound := tapperSyncer.updateCurrentlyTappedPods() @@ -94,28 +95,48 @@ func (tapperSyncer *MizuTapperSyncer) watchPodsForTapping() { for { select { - case pod, ok := <-added: + case wEvent, ok := <-added: if !ok { added = nil continue } + pod, err := wEvent.ToPod() + if err != nil { + tapperSyncer.handleErrorInWatchLoop(err, restartTappersDebouncer) + continue + } + + logger.Log.Debugf("Added matching pod %s, ns: %s", pod.Name, pod.Namespace) restartTappersDebouncer.SetOn() - case pod, ok := <-removed: + case wEvent, ok := <-removed: if !ok { removed = nil continue } + pod, err := wEvent.ToPod() + if err != nil { + tapperSyncer.handleErrorInWatchLoop(err, restartTappersDebouncer) + continue + } + logger.Log.Debugf("Removed matching pod %s, ns: %s", pod.Name, pod.Namespace) restartTappersDebouncer.SetOn() - case pod, ok := <-modified: + case wEvent, ok := <-modified: if !ok { modified = nil continue } + pod, err := wEvent.ToPod() + if err != nil { + tapperSyncer.handleErrorInWatchLoop(err, restartTappersDebouncer) + continue + } + + logger.Log.Debugf("Modified matching pod %s, ns: %s, phase: %s, ip: %s", pod.Name, pod.Namespace, pod.Status.Phase, pod.Status.PodIP) // Act only if the modified pod has already obtained an IP address. // After filtering for IPs, on a normal pod restart this includes the following events: @@ -132,12 +153,8 @@ func (tapperSyncer *MizuTapperSyncer) watchPodsForTapping() { continue } - logger.Log.Debugf("Watching pods loop, got error %v, stopping `restart tappers debouncer`", err) - restartTappersDebouncer.Cancel() - tapperSyncer.ErrorOut <- K8sTapManagerError{ - OriginalError: err, - TapManagerReason: TapManagerPodWatchError, - } + tapperSyncer.handleErrorInWatchLoop(err, restartTappersDebouncer) + continue case <-tapperSyncer.context.Done(): logger.Log.Debugf("Watching pods loop, context done, stopping `restart tappers debouncer`") @@ -148,6 +165,15 @@ func (tapperSyncer *MizuTapperSyncer) watchPodsForTapping() { } } +func (tapperSyncer *MizuTapperSyncer) handleErrorInWatchLoop(err error, restartTappersDebouncer *debounce.Debouncer) { + logger.Log.Debugf("Watching pods loop, got error %v, stopping `restart tappers debouncer`", err) + restartTappersDebouncer.Cancel() + tapperSyncer.ErrorOut <- K8sTapManagerError{ + OriginalError: err, + TapManagerReason: TapManagerPodWatchError, + } +} + func (tapperSyncer *MizuTapperSyncer) updateCurrentlyTappedPods() (err error, changesFound bool) { if matchingPods, err := tapperSyncer.kubernetesProvider.ListAllRunningPodsMatchingRegex(tapperSyncer.context, &tapperSyncer.config.PodFilterRegex, tapperSyncer.config.TargetNamespaces); err != nil { return err, false diff --git a/shared/kubernetes/podWatchHelper.go b/shared/kubernetes/podWatchHelper.go new file mode 100644 index 000000000..2184eeb43 --- /dev/null +++ b/shared/kubernetes/podWatchHelper.go @@ -0,0 +1,45 @@ +package kubernetes + +import ( + "context" + "regexp" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/watch" +) + +type PodWatchHelper struct { + kubernetesProvider *Provider + NameRegexFilter *regexp.Regexp +} + +func NewPodWatchHelper(kubernetesProvider *Provider, NameRegexFilter *regexp.Regexp) *PodWatchHelper { + return &PodWatchHelper{ + kubernetesProvider: kubernetesProvider, + NameRegexFilter: NameRegexFilter, + } +} + +// Implements the EventFilterer Interface +func (pwh *PodWatchHelper) Filter(wEvent *WatchEvent) (bool, error) { + pod, err := wEvent.ToPod() + if err != nil { + return false, nil + } + + if !pwh.NameRegexFilter.MatchString(pod.Name) { + return false, nil + } + + return true, nil +} + +// Implements the WatchCreator Interface +func (pwh *PodWatchHelper) NewWatcher(ctx context.Context, namespace string) (watch.Interface, error) { + watcher, err := pwh.kubernetesProvider.clientSet.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{Watch: true}) + if err != nil { + return nil, err + } + + return watcher, nil +} diff --git a/shared/kubernetes/provider.go b/shared/kubernetes/provider.go index be7b29834..d0647c3ef 100644 --- a/shared/kubernetes/provider.go +++ b/shared/kubernetes/provider.go @@ -153,14 +153,6 @@ func (provider *Provider) WaitUtilNamespaceDeleted(ctx context.Context, name str return err } -func (provider *Provider) GetPodWatcher(ctx context.Context, namespace string) watch.Interface { - watcher, err := provider.clientSet.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{Watch: true}) - if err != nil { - panic(err.Error()) - } - return watcher -} - func (provider *Provider) CreateNamespace(ctx context.Context, name string) (*core.Namespace, error) { namespaceSpec := &core.Namespace{ ObjectMeta: metav1.ObjectMeta{ diff --git a/shared/kubernetes/watch.go b/shared/kubernetes/watch.go index 9bd47ebe4..7e28e152a 100644 --- a/shared/kubernetes/watch.go +++ b/shared/kubernetes/watch.go @@ -6,19 +6,25 @@ import ( "fmt" "github.com/up9inc/mizu/shared/debounce" "github.com/up9inc/mizu/shared/logger" - "regexp" "sync" "time" - corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/watch" ) -func FilteredWatch(ctx context.Context, kubernetesProvider *Provider, targetNamespaces []string, podFilter *regexp.Regexp) (chan *corev1.Pod, chan *corev1.Pod, chan *corev1.Pod, chan error) { - addedChan := make(chan *corev1.Pod) - modifiedChan := make(chan *corev1.Pod) - removedChan := make(chan *corev1.Pod) +type EventFilterer interface { + Filter(*WatchEvent) (bool, error) +} + +type WatchCreator interface { + NewWatcher(ctx context.Context, namespace string) (watch.Interface, error) +} + +func FilteredWatch(ctx context.Context, watcherCreator WatchCreator, targetNamespaces []string, filterer EventFilterer) (chan *WatchEvent, chan *WatchEvent, chan *WatchEvent, chan error) { + addedChan := make(chan *WatchEvent) + modifiedChan := make(chan *WatchEvent) + removedChan := make(chan *WatchEvent) errorChan := make(chan error) var wg sync.WaitGroup @@ -31,8 +37,13 @@ func FilteredWatch(ctx context.Context, kubernetesProvider *Provider, targetName watchRestartDebouncer := debounce.NewDebouncer(1 * time.Minute, func() {}) for { - watcher := kubernetesProvider.GetPodWatcher(ctx, targetNamespace) - err := startWatchLoop(ctx, watcher, podFilter, addedChan, modifiedChan, removedChan) // blocking + watcher, err := watcherCreator.NewWatcher(ctx, targetNamespace) + if err != nil { + errorChan <- fmt.Errorf("error in k8 watch: %v", err) + break + } + + err = startWatchLoop(ctx, watcher, filterer, addedChan, modifiedChan, removedChan) // blocking watcher.Stop() select { @@ -72,7 +83,7 @@ func FilteredWatch(ctx context.Context, kubernetesProvider *Provider, targetName return addedChan, modifiedChan, removedChan, errorChan } -func startWatchLoop(ctx context.Context, watcher watch.Interface, podFilter *regexp.Regexp, addedChan chan *corev1.Pod, modifiedChan chan *corev1.Pod, removedChan chan *corev1.Pod) error { +func startWatchLoop(ctx context.Context, watcher watch.Interface, filterer EventFilterer, addedChan chan *WatchEvent, modifiedChan chan *WatchEvent, removedChan chan *WatchEvent) error { resultChan := watcher.ResultChan() for { select { @@ -81,26 +92,25 @@ func startWatchLoop(ctx context.Context, watcher watch.Interface, podFilter *reg return nil } - if e.Type == watch.Error { - return apierrors.FromObject(e.Object) + wEvent := WatchEvent(e) + + if wEvent.Type == watch.Error { + return apierrors.FromObject(wEvent.Object) } - pod, ok := e.Object.(*corev1.Pod) - if !ok { + if pass, err := filterer.Filter(&wEvent); err != nil { + return err + } else if !pass { continue } - if !podFilter.MatchString(pod.Name) { - continue - } - - switch e.Type { + switch wEvent.Type { case watch.Added: - addedChan <- pod + addedChan <- &wEvent case watch.Modified: - modifiedChan <- pod + modifiedChan <- &wEvent case watch.Deleted: - removedChan <- pod + removedChan <- &wEvent } case <-ctx.Done(): return nil diff --git a/shared/kubernetes/watchEvent.go b/shared/kubernetes/watchEvent.go new file mode 100644 index 000000000..fc1b7e11f --- /dev/null +++ b/shared/kubernetes/watchEvent.go @@ -0,0 +1,18 @@ +package kubernetes + +import ( + "fmt" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/watch" +) + +type WatchEvent watch.Event + +func (we *WatchEvent) ToPod() (*corev1.Pod, error) { + pod, ok := we.Object.(*corev1.Pod) + if !ok { + return nil, fmt.Errorf("Invalid object type on pod event stream") + } + + return pod, nil +}