kubeshark/kubernetes/watch.go
2022-11-29 03:55:50 +03:00

110 lines
2.3 KiB
Go

package kubernetes
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/kubeshark/kubeshark/debounce"
"github.com/rs/zerolog/log"
"k8s.io/apimachinery/pkg/watch"
)
type EventFilterer interface {
Filter(*WatchEvent) (bool, error)
}
type WatchCreator interface {
NewWatcher(ctx context.Context, namespace string) (watch.Interface, error)
}
func FilteredWatch(ctx context.Context, watcherCreator WatchCreator, targetNamespaces []string, filterer EventFilterer) (<-chan *WatchEvent, <-chan error) {
eventChan := make(chan *WatchEvent)
errorChan := make(chan error)
var wg sync.WaitGroup
for _, targetNamespace := range targetNamespaces {
wg.Add(1)
go func(targetNamespace string) {
defer wg.Done()
watchRestartDebouncer := debounce.NewDebouncer(1*time.Minute, func() {})
for {
watcher, err := watcherCreator.NewWatcher(ctx, targetNamespace)
if err != nil {
errorChan <- fmt.Errorf("error in k8s watch: %v", err)
break
}
err = startWatchLoop(ctx, watcher, filterer, eventChan) // blocking
watcher.Stop()
select {
case <-ctx.Done():
return
default:
break
}
if err != nil {
errorChan <- fmt.Errorf("error in k8s watch: %v", err)
break
} else {
if !watchRestartDebouncer.IsOn() {
if err := watchRestartDebouncer.SetOn(); err != nil {
log.Error().Err(err).Send()
}
log.Warn().Msg("K8s watch channel closed, restarting watcher...")
time.Sleep(time.Second * 5)
continue
} else {
errorChan <- errors.New("K8s watch unstable, closes frequently")
break
}
}
}
}(targetNamespace)
}
go func() {
<-ctx.Done()
wg.Wait()
close(eventChan)
close(errorChan)
}()
return eventChan, errorChan
}
func startWatchLoop(ctx context.Context, watcher watch.Interface, filterer EventFilterer, eventChan chan<- *WatchEvent) error {
resultChan := watcher.ResultChan()
for {
select {
case e, isChannelOpen := <-resultChan:
if !isChannelOpen {
return nil
}
wEvent := WatchEvent(e)
if wEvent.Type == watch.Error {
return wEvent.ToError()
}
if pass, err := filterer.Filter(&wEvent); err != nil {
return err
} else if !pass {
continue
}
eventChan <- &wEvent
case <-ctx.Done():
return nil
}
}
}