mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-06-26 08:14:42 +00:00
TRA-3803 handle k8s watch timeouts (#372)
* Update watch.go and debounce.go * Update debounce.go * Update watch.go * Update watch.go * Update watch.go * Update watch.go * Update watch.go Co-authored-by: Rami <rami@rami-work>
This commit is contained in:
parent
145e7cda01
commit
0473181f0a
@ -3,10 +3,15 @@ package kubernetes
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/up9inc/mizu/shared/debounce"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
"regexp"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
)
|
||||
|
||||
@ -16,6 +21,7 @@ func FilteredWatch(ctx context.Context, kubernetesProvider *Provider, targetName
|
||||
removedChan := make(chan *corev1.Pod)
|
||||
errorChan := make(chan error)
|
||||
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for _, targetNamespace := range targetNamespaces {
|
||||
@ -23,36 +29,33 @@ func FilteredWatch(ctx context.Context, kubernetesProvider *Provider, targetName
|
||||
|
||||
go func(targetNamespace string) {
|
||||
defer wg.Done()
|
||||
watcher := kubernetesProvider.GetPodWatcher(ctx, targetNamespace)
|
||||
watchRestartDebouncer := debounce.NewDebouncer(1 * time.Minute, func() {})
|
||||
|
||||
for {
|
||||
watcher := kubernetesProvider.GetPodWatcher(ctx, targetNamespace)
|
||||
err := startWatchLoop(ctx, watcher, podFilter, addedChan, modifiedChan, removedChan) // blocking
|
||||
watcher.Stop()
|
||||
|
||||
select {
|
||||
case e := <-watcher.ResultChan():
|
||||
if e.Object == nil {
|
||||
errorChan <- errors.New("kubernetes pod watch failed")
|
||||
return
|
||||
}
|
||||
|
||||
pod, ok := e.Object.(*corev1.Pod)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
if !podFilter.MatchString(pod.Name) {
|
||||
continue
|
||||
}
|
||||
|
||||
switch e.Type {
|
||||
case watch.Added:
|
||||
addedChan <- pod
|
||||
case watch.Modified:
|
||||
modifiedChan <- pod
|
||||
case watch.Deleted:
|
||||
removedChan <- pod
|
||||
}
|
||||
case <-ctx.Done():
|
||||
watcher.Stop()
|
||||
case <- ctx.Done():
|
||||
return
|
||||
default:
|
||||
break
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
errorChan <- fmt.Errorf("error in k8 watch: %v", err)
|
||||
break
|
||||
} else {
|
||||
if !watchRestartDebouncer.IsOn() {
|
||||
watchRestartDebouncer.SetOn()
|
||||
logger.Log.Debug("k8s watch channel closed, restarting watcher")
|
||||
time.Sleep(time.Second * 5)
|
||||
continue
|
||||
} else {
|
||||
errorChan <- errors.New("k8s watch unstable, closes frequently")
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}(targetNamespace)
|
||||
@ -69,3 +72,39 @@ func FilteredWatch(ctx context.Context, kubernetesProvider *Provider, targetName
|
||||
|
||||
return addedChan, modifiedChan, removedChan, errorChan
|
||||
}
|
||||
|
||||
func startWatchLoop(ctx context.Context, watcher watch.Interface, podFilter *regexp.Regexp, addedChan chan *corev1.Pod, modifiedChan chan *corev1.Pod, removedChan chan *corev1.Pod) error {
|
||||
resultChan := watcher.ResultChan()
|
||||
for {
|
||||
select {
|
||||
case e, isChannelOpen := <-resultChan:
|
||||
if !isChannelOpen {
|
||||
return nil
|
||||
}
|
||||
|
||||
if e.Type == watch.Error {
|
||||
return apierrors.FromObject(e.Object)
|
||||
}
|
||||
|
||||
pod, ok := e.Object.(*corev1.Pod)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
if !podFilter.MatchString(pod.Name) {
|
||||
continue
|
||||
}
|
||||
|
||||
switch e.Type {
|
||||
case watch.Added:
|
||||
addedChan <- pod
|
||||
case watch.Modified:
|
||||
modifiedChan <- pod
|
||||
case watch.Deleted:
|
||||
removedChan <- pod
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -52,3 +52,7 @@ func (d *Debouncer) SetOn() error {
|
||||
d.timer = time.AfterFunc(d.timeout, d.callback)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *Debouncer) IsOn() bool {
|
||||
return d.running
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user