Refactor watch pods to allow reusing watch wrapper (#470)

Currently shared/kubernetes/watch.go:FilteredWatch only watches pods.
This PR makes it reusable for other types of resources.
This is done in preparation for watching k8s events.
This commit is contained in:
Nimrod Gilboa Markevich 2021-11-18 11:53:11 +02:00 committed by GitHub
parent dd53a36d5f
commit 2e75834dd0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 172 additions and 51 deletions

View File

@ -9,19 +9,18 @@ import (
"strings"
"time"
"github.com/up9inc/mizu/cli/cmd/goUtils"
"github.com/getkin/kin-openapi/openapi3"
"gopkg.in/yaml.v3"
core "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"github.com/getkin/kin-openapi/openapi3"
"github.com/up9inc/mizu/cli/apiserver"
"github.com/up9inc/mizu/cli/cmd/goUtils"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/config/configStructs"
"github.com/up9inc/mizu/cli/errormessage"
"gopkg.in/yaml.v3"
core "k8s.io/api/core/v1"
"github.com/up9inc/mizu/cli/mizu"
"github.com/up9inc/mizu/cli/mizu/fsUtils"
"github.com/up9inc/mizu/cli/uiUtils"
@ -555,7 +554,8 @@ func waitUntilNamespaceDeleted(ctx context.Context, cancel context.CancelFunc, k
func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", kubernetes.ApiServerPodName))
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider, []string{config.Config.MizuResourcesNamespace}, podExactRegex)
podWatchHelper := kubernetes.NewPodWatchHelper(kubernetesProvider, podExactRegex)
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.MizuResourcesNamespace}, podWatchHelper)
isPodReady := false
timeAfter := time.After(25 * time.Second)
for {
@ -576,12 +576,19 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
logger.Log.Infof("%s removed", kubernetes.ApiServerPodName)
cancel()
return
case modifiedPod, ok := <-modified:
case wEvent, ok := <-modified:
if !ok {
modified = nil
continue
}
modifiedPod, err := wEvent.ToPod()
if err != nil {
logger.Log.Errorf(uiUtils.Error, err)
cancel()
continue
}
logger.Log.Debugf("Watching API Server pod loop, modified: %v", modifiedPod.Status.Phase)
if modifiedPod.Status.Phase == core.PodPending {
@ -642,34 +649,57 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
func watchTapperPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", kubernetes.TapperDaemonSetName))
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider, []string{config.Config.MizuResourcesNamespace}, podExactRegex)
podWatchHelper := kubernetes.NewPodWatchHelper(kubernetesProvider, podExactRegex)
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.MizuResourcesNamespace}, podWatchHelper)
var prevPodPhase core.PodPhase
for {
select {
case addedPod, ok := <-added:
case wEvent, ok := <-added:
if !ok {
added = nil
continue
}
addedPod, err := wEvent.ToPod()
if err != nil {
logger.Log.Errorf(uiUtils.Error, err)
cancel()
continue
}
logger.Log.Debugf("Tapper is created [%s]", addedPod.Name)
case removedPod, ok := <-removed:
case wEvent, ok := <-removed:
if !ok {
removed = nil
continue
}
removedPod, err := wEvent.ToPod()
if err != nil {
logger.Log.Errorf(uiUtils.Error, err)
cancel()
continue
}
logger.Log.Debugf("Tapper is removed [%s]", removedPod.Name)
case modifiedPod, ok := <-modified:
case wEvent, ok := <-modified:
if !ok {
modified = nil
continue
}
modifiedPod, err := wEvent.ToPod()
if err != nil {
logger.Log.Errorf(uiUtils.Error, err)
cancel()
continue
}
if modifiedPod.Status.Phase == core.PodPending && modifiedPod.Status.Conditions[0].Type == core.PodScheduled && modifiedPod.Status.Conditions[0].Status != core.ConditionTrue {
logger.Log.Infof(uiUtils.Red, fmt.Sprintf("Wasn't able to deploy the tapper %s. Reason: \"%s\"", modifiedPod.Name, modifiedPod.Status.Conditions[0].Message))
cancel()
break
continue
}
podStatus := modifiedPod.Status

View File

@ -68,7 +68,8 @@ func CreateAndStartMizuTapperSyncer(ctx context.Context, kubernetesProvider *Pro
}
func (tapperSyncer *MizuTapperSyncer) watchPodsForTapping() {
added, modified, removed, errorChan := FilteredWatch(tapperSyncer.context, tapperSyncer.kubernetesProvider, tapperSyncer.config.TargetNamespaces, &tapperSyncer.config.PodFilterRegex)
podWatchHelper := NewPodWatchHelper(tapperSyncer.kubernetesProvider, &tapperSyncer.config.PodFilterRegex)
added, modified, removed, errorChan := FilteredWatch(tapperSyncer.context, podWatchHelper, tapperSyncer.config.TargetNamespaces, podWatchHelper)
restartTappers := func() {
err, changeFound := tapperSyncer.updateCurrentlyTappedPods()
@ -94,28 +95,48 @@ func (tapperSyncer *MizuTapperSyncer) watchPodsForTapping() {
for {
select {
case pod, ok := <-added:
case wEvent, ok := <-added:
if !ok {
added = nil
continue
}
pod, err := wEvent.ToPod()
if err != nil {
tapperSyncer.handleErrorInWatchLoop(err, restartTappersDebouncer)
continue
}
logger.Log.Debugf("Added matching pod %s, ns: %s", pod.Name, pod.Namespace)
restartTappersDebouncer.SetOn()
case pod, ok := <-removed:
case wEvent, ok := <-removed:
if !ok {
removed = nil
continue
}
pod, err := wEvent.ToPod()
if err != nil {
tapperSyncer.handleErrorInWatchLoop(err, restartTappersDebouncer)
continue
}
logger.Log.Debugf("Removed matching pod %s, ns: %s", pod.Name, pod.Namespace)
restartTappersDebouncer.SetOn()
case pod, ok := <-modified:
case wEvent, ok := <-modified:
if !ok {
modified = nil
continue
}
pod, err := wEvent.ToPod()
if err != nil {
tapperSyncer.handleErrorInWatchLoop(err, restartTappersDebouncer)
continue
}
logger.Log.Debugf("Modified matching pod %s, ns: %s, phase: %s, ip: %s", pod.Name, pod.Namespace, pod.Status.Phase, pod.Status.PodIP)
// Act only if the modified pod has already obtained an IP address.
// After filtering for IPs, on a normal pod restart this includes the following events:
@ -132,12 +153,8 @@ func (tapperSyncer *MizuTapperSyncer) watchPodsForTapping() {
continue
}
logger.Log.Debugf("Watching pods loop, got error %v, stopping `restart tappers debouncer`", err)
restartTappersDebouncer.Cancel()
tapperSyncer.ErrorOut <- K8sTapManagerError{
OriginalError: err,
TapManagerReason: TapManagerPodWatchError,
}
tapperSyncer.handleErrorInWatchLoop(err, restartTappersDebouncer)
continue
case <-tapperSyncer.context.Done():
logger.Log.Debugf("Watching pods loop, context done, stopping `restart tappers debouncer`")
@ -148,6 +165,15 @@ func (tapperSyncer *MizuTapperSyncer) watchPodsForTapping() {
}
}
func (tapperSyncer *MizuTapperSyncer) handleErrorInWatchLoop(err error, restartTappersDebouncer *debounce.Debouncer) {
logger.Log.Debugf("Watching pods loop, got error %v, stopping `restart tappers debouncer`", err)
restartTappersDebouncer.Cancel()
tapperSyncer.ErrorOut <- K8sTapManagerError{
OriginalError: err,
TapManagerReason: TapManagerPodWatchError,
}
}
func (tapperSyncer *MizuTapperSyncer) updateCurrentlyTappedPods() (err error, changesFound bool) {
if matchingPods, err := tapperSyncer.kubernetesProvider.ListAllRunningPodsMatchingRegex(tapperSyncer.context, &tapperSyncer.config.PodFilterRegex, tapperSyncer.config.TargetNamespaces); err != nil {
return err, false

View File

@ -0,0 +1,45 @@
package kubernetes
import (
"context"
"regexp"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
)
type PodWatchHelper struct {
kubernetesProvider *Provider
NameRegexFilter *regexp.Regexp
}
func NewPodWatchHelper(kubernetesProvider *Provider, NameRegexFilter *regexp.Regexp) *PodWatchHelper {
return &PodWatchHelper{
kubernetesProvider: kubernetesProvider,
NameRegexFilter: NameRegexFilter,
}
}
// Implements the EventFilterer Interface
func (pwh *PodWatchHelper) Filter(wEvent *WatchEvent) (bool, error) {
pod, err := wEvent.ToPod()
if err != nil {
return false, nil
}
if !pwh.NameRegexFilter.MatchString(pod.Name) {
return false, nil
}
return true, nil
}
// Implements the WatchCreator Interface
func (pwh *PodWatchHelper) NewWatcher(ctx context.Context, namespace string) (watch.Interface, error) {
watcher, err := pwh.kubernetesProvider.clientSet.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{Watch: true})
if err != nil {
return nil, err
}
return watcher, nil
}

View File

@ -153,14 +153,6 @@ func (provider *Provider) WaitUtilNamespaceDeleted(ctx context.Context, name str
return err
}
func (provider *Provider) GetPodWatcher(ctx context.Context, namespace string) watch.Interface {
watcher, err := provider.clientSet.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{Watch: true})
if err != nil {
panic(err.Error())
}
return watcher
}
func (provider *Provider) CreateNamespace(ctx context.Context, name string) (*core.Namespace, error) {
namespaceSpec := &core.Namespace{
ObjectMeta: metav1.ObjectMeta{

View File

@ -6,19 +6,25 @@ import (
"fmt"
"github.com/up9inc/mizu/shared/debounce"
"github.com/up9inc/mizu/shared/logger"
"regexp"
"sync"
"time"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/watch"
)
func FilteredWatch(ctx context.Context, kubernetesProvider *Provider, targetNamespaces []string, podFilter *regexp.Regexp) (chan *corev1.Pod, chan *corev1.Pod, chan *corev1.Pod, chan error) {
addedChan := make(chan *corev1.Pod)
modifiedChan := make(chan *corev1.Pod)
removedChan := make(chan *corev1.Pod)
type EventFilterer interface {
Filter(*WatchEvent) (bool, error)
}
type WatchCreator interface {
NewWatcher(ctx context.Context, namespace string) (watch.Interface, error)
}
func FilteredWatch(ctx context.Context, watcherCreator WatchCreator, targetNamespaces []string, filterer EventFilterer) (chan *WatchEvent, chan *WatchEvent, chan *WatchEvent, chan error) {
addedChan := make(chan *WatchEvent)
modifiedChan := make(chan *WatchEvent)
removedChan := make(chan *WatchEvent)
errorChan := make(chan error)
var wg sync.WaitGroup
@ -31,8 +37,13 @@ func FilteredWatch(ctx context.Context, kubernetesProvider *Provider, targetName
watchRestartDebouncer := debounce.NewDebouncer(1 * time.Minute, func() {})
for {
watcher := kubernetesProvider.GetPodWatcher(ctx, targetNamespace)
err := startWatchLoop(ctx, watcher, podFilter, addedChan, modifiedChan, removedChan) // blocking
watcher, err := watcherCreator.NewWatcher(ctx, targetNamespace)
if err != nil {
errorChan <- fmt.Errorf("error in k8 watch: %v", err)
break
}
err = startWatchLoop(ctx, watcher, filterer, addedChan, modifiedChan, removedChan) // blocking
watcher.Stop()
select {
@ -72,7 +83,7 @@ func FilteredWatch(ctx context.Context, kubernetesProvider *Provider, targetName
return addedChan, modifiedChan, removedChan, errorChan
}
func startWatchLoop(ctx context.Context, watcher watch.Interface, podFilter *regexp.Regexp, addedChan chan *corev1.Pod, modifiedChan chan *corev1.Pod, removedChan chan *corev1.Pod) error {
func startWatchLoop(ctx context.Context, watcher watch.Interface, filterer EventFilterer, addedChan chan *WatchEvent, modifiedChan chan *WatchEvent, removedChan chan *WatchEvent) error {
resultChan := watcher.ResultChan()
for {
select {
@ -81,26 +92,25 @@ func startWatchLoop(ctx context.Context, watcher watch.Interface, podFilter *reg
return nil
}
if e.Type == watch.Error {
return apierrors.FromObject(e.Object)
wEvent := WatchEvent(e)
if wEvent.Type == watch.Error {
return apierrors.FromObject(wEvent.Object)
}
pod, ok := e.Object.(*corev1.Pod)
if !ok {
if pass, err := filterer.Filter(&wEvent); err != nil {
return err
} else if !pass {
continue
}
if !podFilter.MatchString(pod.Name) {
continue
}
switch e.Type {
switch wEvent.Type {
case watch.Added:
addedChan <- pod
addedChan <- &wEvent
case watch.Modified:
modifiedChan <- pod
modifiedChan <- &wEvent
case watch.Deleted:
removedChan <- pod
removedChan <- &wEvent
}
case <-ctx.Done():
return nil

View File

@ -0,0 +1,18 @@
package kubernetes
import (
"fmt"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/watch"
)
type WatchEvent watch.Event
func (we *WatchEvent) ToPod() (*corev1.Pod, error) {
pod, ok := we.Object.(*corev1.Pod)
if !ok {
return nil, fmt.Errorf("Invalid object type on pod event stream")
}
return pod, nil
}