diff --git a/cli/kubernetes/watch.go b/cli/kubernetes/watch.go index b2df708db..37e5bd576 100644 --- a/cli/kubernetes/watch.go +++ b/cli/kubernetes/watch.go @@ -3,10 +3,15 @@ package kubernetes import ( "context" "errors" + "fmt" + "github.com/up9inc/mizu/shared/debounce" + "github.com/up9inc/mizu/shared/logger" "regexp" "sync" + "time" corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/watch" ) @@ -16,6 +21,7 @@ func FilteredWatch(ctx context.Context, kubernetesProvider *Provider, targetName removedChan := make(chan *corev1.Pod) errorChan := make(chan error) + var wg sync.WaitGroup for _, targetNamespace := range targetNamespaces { @@ -23,36 +29,33 @@ func FilteredWatch(ctx context.Context, kubernetesProvider *Provider, targetName go func(targetNamespace string) { defer wg.Done() - watcher := kubernetesProvider.GetPodWatcher(ctx, targetNamespace) + watchRestartDebouncer := debounce.NewDebouncer(1 * time.Minute, func() {}) for { + watcher := kubernetesProvider.GetPodWatcher(ctx, targetNamespace) + err := startWatchLoop(ctx, watcher, podFilter, addedChan, modifiedChan, removedChan) // blocking + watcher.Stop() + select { - case e := <-watcher.ResultChan(): - if e.Object == nil { - errorChan <- errors.New("kubernetes pod watch failed") - return - } - - pod, ok := e.Object.(*corev1.Pod) - if !ok { - continue - } - - if !podFilter.MatchString(pod.Name) { - continue - } - - switch e.Type { - case watch.Added: - addedChan <- pod - case watch.Modified: - modifiedChan <- pod - case watch.Deleted: - removedChan <- pod - } - case <-ctx.Done(): - watcher.Stop() + case <- ctx.Done(): return + default: + break + } + + if err != nil { + errorChan <- fmt.Errorf("error in k8 watch: %v", err) + break + } else { + if !watchRestartDebouncer.IsOn() { + watchRestartDebouncer.SetOn() + logger.Log.Debug("k8s watch channel closed, restarting watcher") + time.Sleep(time.Second * 5) + continue + } else { + errorChan <- errors.New("k8s watch unstable, closes frequently") + break + } } } }(targetNamespace) @@ -69,3 +72,39 @@ func FilteredWatch(ctx context.Context, kubernetesProvider *Provider, targetName return addedChan, modifiedChan, removedChan, errorChan } + +func startWatchLoop(ctx context.Context, watcher watch.Interface, podFilter *regexp.Regexp, addedChan chan *corev1.Pod, modifiedChan chan *corev1.Pod, removedChan chan *corev1.Pod) error { + resultChan := watcher.ResultChan() + for { + select { + case e, isChannelOpen := <-resultChan: + if !isChannelOpen { + return nil + } + + if e.Type == watch.Error { + return apierrors.FromObject(e.Object) + } + + pod, ok := e.Object.(*corev1.Pod) + if !ok { + continue + } + + if !podFilter.MatchString(pod.Name) { + continue + } + + switch e.Type { + case watch.Added: + addedChan <- pod + case watch.Modified: + modifiedChan <- pod + case watch.Deleted: + removedChan <- pod + } + case <-ctx.Done(): + return nil + } + } +} diff --git a/shared/debounce/debounce.go b/shared/debounce/debounce.go index 74e15e2c6..9070016bc 100644 --- a/shared/debounce/debounce.go +++ b/shared/debounce/debounce.go @@ -52,3 +52,7 @@ func (d *Debouncer) SetOn() error { d.timer = time.AfterFunc(d.timeout, d.callback) return nil } + +func (d *Debouncer) IsOn() bool { + return d.running +}