Use one channel for events instead of three (#495)

Use one channel for events instead of three separate channels by event type
This commit is contained in:
Nimrod Gilboa Markevich
2021-11-23 15:06:27 +02:00
committed by GitHub
parent 01d6005a7b
commit 1c18eb1b84
5 changed files with 128 additions and 243 deletions

View File

@@ -493,19 +493,19 @@ func watchMizuEvents(ctx context.Context, kubernetesProvider *kubernetes.Provide
mizuResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", kubernetes.MizuResourcesPrefix)) mizuResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", kubernetes.MizuResourcesPrefix))
eventWatchHelper := kubernetes.NewEventWatchHelper(kubernetesProvider, mizuResourceRegex) eventWatchHelper := kubernetes.NewEventWatchHelper(kubernetesProvider, mizuResourceRegex)
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, eventWatchHelper, []string{config.Config.MizuResourcesNamespace}, eventWatchHelper) eventChan, errorChan := kubernetes.FilteredWatch(ctx, eventWatchHelper, []string{config.Config.MizuResourcesNamespace}, eventWatchHelper)
for { for {
select { select {
case wEvent, ok := <-added: case wEvent, ok := <-eventChan:
if !ok { if !ok {
added = nil eventChan = nil
continue continue
} }
event, err := wEvent.ToEvent() event, err := wEvent.ToEvent()
if err != nil { if err != nil {
logger.Log.Errorf("error parsing Mizu resource added event: %+v", err) logger.Log.Errorf("error parsing Mizu resource event: %+v", err)
cancel() cancel()
} }
@@ -514,45 +514,7 @@ func watchMizuEvents(ctx context.Context, kubernetesProvider *kubernetes.Provide
} }
if event.Type == v1.EventTypeWarning { if event.Type == v1.EventTypeWarning {
logger.Log.Warningf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note) logger.Log.Warningf("resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note)
}
case wEvent, ok := <-removed:
if !ok {
removed = nil
continue
}
event, err := wEvent.ToEvent()
if err != nil {
logger.Log.Errorf("error parsing Mizu resource removed event: %+v", err)
cancel()
}
if startTime.After(event.CreationTimestamp.Time) {
continue
}
if event.Type == v1.EventTypeWarning {
logger.Log.Warningf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note)
}
case wEvent, ok := <-modified:
if !ok {
modified = nil
continue
}
event, err := wEvent.ToEvent()
if err != nil {
logger.Log.Errorf("error parsing Mizu resource modified event: %+v", err)
cancel()
}
if startTime.After(event.CreationTimestamp.Time) {
continue
}
if event.Type == v1.EventTypeWarning {
logger.Log.Warningf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note)
} }
case err, ok := <-errorChan: case err, ok := <-errorChan:
if !ok { if !ok {

View File

@@ -559,76 +559,73 @@ func waitUntilNamespaceDeleted(ctx context.Context, cancel context.CancelFunc, k
func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) { func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", kubernetes.ApiServerPodName)) podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", kubernetes.ApiServerPodName))
podWatchHelper := kubernetes.NewPodWatchHelper(kubernetesProvider, podExactRegex) podWatchHelper := kubernetes.NewPodWatchHelper(kubernetesProvider, podExactRegex)
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.MizuResourcesNamespace}, podWatchHelper) eventChan, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.MizuResourcesNamespace}, podWatchHelper)
isPodReady := false isPodReady := false
timeAfter := time.After(25 * time.Second) timeAfter := time.After(25 * time.Second)
for { for {
select { select {
case _, ok := <-added: case wEvent, ok := <-eventChan:
if !ok { if !ok {
added = nil eventChan = nil
continue continue
} }
logger.Log.Debugf("Watching API Server pod loop, added") switch wEvent.Type {
case _, ok := <-removed: case kubernetes.EventAdded:
if !ok { logger.Log.Debugf("Watching API Server pod loop, added")
removed = nil case kubernetes.EventDeleted:
continue logger.Log.Infof("%s removed", kubernetes.ApiServerPodName)
}
logger.Log.Infof("%s removed", kubernetes.ApiServerPodName)
cancel()
return
case wEvent, ok := <-modified:
if !ok {
modified = nil
continue
}
modifiedPod, err := wEvent.ToPod()
if err != nil {
logger.Log.Errorf(uiUtils.Error, err)
cancel() cancel()
continue return
} case kubernetes.EventModified:
modifiedPod, err := wEvent.ToPod()
logger.Log.Debugf("Watching API Server pod loop, modified: %v", modifiedPod.Status.Phase) if err != nil {
logger.Log.Errorf(uiUtils.Error, err)
if modifiedPod.Status.Phase == core.PodPending {
if modifiedPod.Status.Conditions[0].Type == core.PodScheduled && modifiedPod.Status.Conditions[0].Status != core.ConditionTrue {
logger.Log.Debugf("Wasn't able to deploy the API server. Reason: \"%s\"", modifiedPod.Status.Conditions[0].Message)
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Wasn't able to deploy the API server, for more info check logs at %s", fsUtils.GetLogFilePath()))
cancel() cancel()
break continue
} }
if len(modifiedPod.Status.ContainerStatuses) > 0 && modifiedPod.Status.ContainerStatuses[0].State.Waiting != nil && modifiedPod.Status.ContainerStatuses[0].State.Waiting.Reason == "ErrImagePull" { logger.Log.Debugf("Watching API Server pod loop, modified: %v", modifiedPod.Status.Phase)
logger.Log.Debugf("Wasn't able to deploy the API server. (ErrImagePull) Reason: \"%s\"", modifiedPod.Status.ContainerStatuses[0].State.Waiting.Message)
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Wasn't able to deploy the API server: failed to pull the image, for more info check logs at %v", fsUtils.GetLogFilePath()))
cancel()
break
}
}
if modifiedPod.Status.Phase == core.PodRunning && !isPodReady { if modifiedPod.Status.Phase == core.PodPending {
isPodReady = true if modifiedPod.Status.Conditions[0].Type == core.PodScheduled && modifiedPod.Status.Conditions[0].Status != core.ConditionTrue {
go startProxyReportErrorIfAny(kubernetesProvider, cancel) logger.Log.Debugf("Wasn't able to deploy the API server. Reason: \"%s\"", modifiedPod.Status.Conditions[0].Message)
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Wasn't able to deploy the API server, for more info check logs at %s", fsUtils.GetLogFilePath()))
cancel()
break
}
url := GetApiServerUrl() if len(modifiedPod.Status.ContainerStatuses) > 0 && modifiedPod.Status.ContainerStatuses[0].State.Waiting != nil && modifiedPod.Status.ContainerStatuses[0].State.Waiting.Reason == "ErrImagePull" {
if err := apiProvider.TestConnection(); err != nil { logger.Log.Debugf("Wasn't able to deploy the API server. (ErrImagePull) Reason: \"%s\"", modifiedPod.Status.ContainerStatuses[0].State.Waiting.Message)
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Couldn't connect to API server, for more info check logs at %s", fsUtils.GetLogFilePath())) logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Wasn't able to deploy the API server: failed to pull the image, for more info check logs at %v", fsUtils.GetLogFilePath()))
cancel() cancel()
break break
}
} }
logger.Log.Infof("Mizu is available at %s", url) if modifiedPod.Status.Phase == core.PodRunning && !isPodReady {
if !config.Config.HeadlessMode { isPodReady = true
uiUtils.OpenBrowser(url) go startProxyReportErrorIfAny(kubernetesProvider, cancel)
}
if err := apiProvider.ReportTappedPods(state.tapperSyncer.CurrentlyTappedPods); err != nil { url := GetApiServerUrl()
logger.Log.Debugf("[Error] failed update tapped pods %v", err) if err := apiProvider.TestConnection(); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Couldn't connect to API server, for more info check logs at %s", fsUtils.GetLogFilePath()))
cancel()
break
}
logger.Log.Infof("Mizu is available at %s", url)
if !config.Config.HeadlessMode {
uiUtils.OpenBrowser(url)
}
if err := apiProvider.ReportTappedPods(state.tapperSyncer.CurrentlyTappedPods); err != nil {
logger.Log.Debugf("[Error] failed update tapped pods %v", err)
}
} }
case kubernetes.EventBookmark:
break
case kubernetes.EventError:
break
} }
case err, ok := <-errorChan: case err, ok := <-errorChan:
if !ok { if !ok {
@@ -654,70 +651,53 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
func watchTapperPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) { func watchTapperPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", kubernetes.TapperDaemonSetName)) podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", kubernetes.TapperDaemonSetName))
podWatchHelper := kubernetes.NewPodWatchHelper(kubernetesProvider, podExactRegex) podWatchHelper := kubernetes.NewPodWatchHelper(kubernetesProvider, podExactRegex)
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.MizuResourcesNamespace}, podWatchHelper) eventChan, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.MizuResourcesNamespace}, podWatchHelper)
for { for {
select { select {
case wEvent, ok := <-added: case wEvent, ok := <-eventChan:
if !ok { if !ok {
added = nil eventChan = nil
continue continue
} }
addedPod, err := wEvent.ToPod() pod, err := wEvent.ToPod()
if err != nil { if err != nil {
logger.Log.Errorf(uiUtils.Error, err) logger.Log.Errorf(uiUtils.Error, err)
cancel() cancel()
continue continue
} }
logger.Log.Debugf("Tapper is created [%s]", addedPod.Name) switch wEvent.Type {
case wEvent, ok := <-removed: case kubernetes.EventAdded:
if !ok { logger.Log.Debugf("Tapper is created [%s]", pod.Name)
removed = nil case kubernetes.EventDeleted:
continue logger.Log.Debugf("Tapper is removed [%s]", pod.Name)
} case kubernetes.EventModified:
if pod.Status.Phase == core.PodPending && pod.Status.Conditions[0].Type == core.PodScheduled && pod.Status.Conditions[0].Status != core.ConditionTrue {
logger.Log.Infof(uiUtils.Red, fmt.Sprintf("Wasn't able to deploy the tapper %s. Reason: \"%s\"", pod.Name, pod.Status.Conditions[0].Message))
cancel()
continue
}
removedPod, err := wEvent.ToPod() podStatus := pod.Status
if err != nil {
logger.Log.Errorf(uiUtils.Error, err)
cancel()
continue
}
if podStatus.Phase == core.PodRunning {
logger.Log.Debugf("Tapper is removed [%s]", removedPod.Name) state := podStatus.ContainerStatuses[0].State
case wEvent, ok := <-modified: if state.Terminated != nil {
if !ok { switch state.Terminated.Reason {
modified = nil case "OOMKilled":
continue logger.Log.Infof(uiUtils.Red, fmt.Sprintf("Tapper %s was terminated (reason: OOMKilled). You should consider increasing machine resources.", pod.Name))
} }
modifiedPod, err := wEvent.ToPod()
if err != nil {
logger.Log.Errorf(uiUtils.Error, err)
cancel()
continue
}
if modifiedPod.Status.Phase == core.PodPending && modifiedPod.Status.Conditions[0].Type == core.PodScheduled && modifiedPod.Status.Conditions[0].Status != core.ConditionTrue {
logger.Log.Infof(uiUtils.Red, fmt.Sprintf("Wasn't able to deploy the tapper %s. Reason: \"%s\"", modifiedPod.Name, modifiedPod.Status.Conditions[0].Message))
cancel()
continue
}
podStatus := modifiedPod.Status
if podStatus.Phase == core.PodRunning {
state := podStatus.ContainerStatuses[0].State
if state.Terminated != nil {
switch state.Terminated.Reason {
case "OOMKilled":
logger.Log.Infof(uiUtils.Red, fmt.Sprintf("Tapper %s was terminated (reason: OOMKilled). You should consider increasing machine resources.", modifiedPod.Name))
} }
} }
}
logger.Log.Debugf("Tapper %s is %s", modifiedPod.Name, strings.ToLower(string(podStatus.Phase))) logger.Log.Debugf("Tapper %s is %s", pod.Name, strings.ToLower(string(podStatus.Phase)))
case kubernetes.EventBookmark:
break
case kubernetes.EventError:
break
}
case err, ok := <-errorChan: case err, ok := <-errorChan:
if !ok { if !ok {
errorChan = nil errorChan = nil
@@ -740,57 +720,19 @@ func watchMizuEvents(ctx context.Context, kubernetesProvider *kubernetes.Provide
mizuResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", kubernetes.MizuResourcesPrefix)) mizuResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", kubernetes.MizuResourcesPrefix))
eventWatchHelper := kubernetes.NewEventWatchHelper(kubernetesProvider, mizuResourceRegex) eventWatchHelper := kubernetes.NewEventWatchHelper(kubernetesProvider, mizuResourceRegex)
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, eventWatchHelper, []string{config.Config.MizuResourcesNamespace}, eventWatchHelper) eventChan, errorChan := kubernetes.FilteredWatch(ctx, eventWatchHelper, []string{config.Config.MizuResourcesNamespace}, eventWatchHelper)
for { for {
select { select {
case wEvent, ok := <-added: case wEvent, ok := <-eventChan:
if !ok { if !ok {
added = nil eventChan = nil
continue continue
} }
event, err := wEvent.ToEvent() event, err := wEvent.ToEvent()
if err != nil { if err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("error parsing Mizu resource added event: %+v", err)) logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("error parsing Mizu resource event: %+v", err))
cancel()
}
if startTime.After(event.CreationTimestamp.Time) {
continue
}
if event.Type == core.EventTypeWarning {
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note))
}
case wEvent, ok := <-removed:
if !ok {
removed = nil
continue
}
event, err := wEvent.ToEvent()
if err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("error parsing Mizu resource removed event: %+v", err))
cancel()
}
if startTime.After(event.CreationTimestamp.Time) {
continue
}
if event.Type == core.EventTypeWarning {
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note))
}
case wEvent, ok := <-modified:
if !ok {
modified = nil
continue
}
event, err := wEvent.ToEvent()
if err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("error parsing Mizu resource modified event: %+v", err))
cancel() cancel()
} }

View File

@@ -70,7 +70,7 @@ func CreateAndStartMizuTapperSyncer(ctx context.Context, kubernetesProvider *Pro
func (tapperSyncer *MizuTapperSyncer) watchPodsForTapping() { func (tapperSyncer *MizuTapperSyncer) watchPodsForTapping() {
podWatchHelper := NewPodWatchHelper(tapperSyncer.kubernetesProvider, &tapperSyncer.config.PodFilterRegex) podWatchHelper := NewPodWatchHelper(tapperSyncer.kubernetesProvider, &tapperSyncer.config.PodFilterRegex)
added, modified, removed, errorChan := FilteredWatch(tapperSyncer.context, podWatchHelper, tapperSyncer.config.TargetNamespaces, podWatchHelper) eventChan, errorChan := FilteredWatch(tapperSyncer.context, podWatchHelper, tapperSyncer.config.TargetNamespaces, podWatchHelper)
restartTappers := func() { restartTappers := func() {
err, changeFound := tapperSyncer.updateCurrentlyTappedPods() err, changeFound := tapperSyncer.updateCurrentlyTappedPods()
@@ -96,9 +96,9 @@ func (tapperSyncer *MizuTapperSyncer) watchPodsForTapping() {
for { for {
select { select {
case wEvent, ok := <-added: case wEvent, ok := <-eventChan:
if !ok { if !ok {
added = nil eventChan = nil
continue continue
} }
@@ -109,44 +109,28 @@ func (tapperSyncer *MizuTapperSyncer) watchPodsForTapping() {
} }
logger.Log.Debugf("Added matching pod %s, ns: %s", pod.Name, pod.Namespace) switch wEvent.Type {
restartTappersDebouncer.SetOn() case EventAdded:
case wEvent, ok := <-removed: logger.Log.Debugf("Added matching pod %s, ns: %s", pod.Name, pod.Namespace)
if !ok {
removed = nil
continue
}
pod, err := wEvent.ToPod()
if err != nil {
tapperSyncer.handleErrorInWatchLoop(err, restartTappersDebouncer)
continue
}
logger.Log.Debugf("Removed matching pod %s, ns: %s", pod.Name, pod.Namespace)
restartTappersDebouncer.SetOn()
case wEvent, ok := <-modified:
if !ok {
modified = nil
continue
}
pod, err := wEvent.ToPod()
if err != nil {
tapperSyncer.handleErrorInWatchLoop(err, restartTappersDebouncer)
continue
}
logger.Log.Debugf("Modified matching pod %s, ns: %s, phase: %s, ip: %s", pod.Name, pod.Namespace, pod.Status.Phase, pod.Status.PodIP)
// Act only if the modified pod has already obtained an IP address.
// After filtering for IPs, on a normal pod restart this includes the following events:
// - Pod deletion
// - Pod reaches start state
// - Pod reaches ready state
// Ready/unready transitions might also trigger this event.
if pod.Status.PodIP != "" {
restartTappersDebouncer.SetOn() restartTappersDebouncer.SetOn()
case EventDeleted:
logger.Log.Debugf("Removed matching pod %s, ns: %s", pod.Name, pod.Namespace)
restartTappersDebouncer.SetOn()
case EventModified:
logger.Log.Debugf("Modified matching pod %s, ns: %s, phase: %s, ip: %s", pod.Name, pod.Namespace, pod.Status.Phase, pod.Status.PodIP)
// Act only if the modified pod has already obtained an IP address.
// After filtering for IPs, on a normal pod restart this includes the following events:
// - Pod deletion
// - Pod reaches start state
// - Pod reaches ready state
// Ready/unready transitions might also trigger this event.
if pod.Status.PodIP != "" {
restartTappersDebouncer.SetOn()
}
case EventBookmark:
break
case EventError:
break
} }
case err, ok := <-errorChan: case err, ok := <-errorChan:
if !ok { if !ok {

View File

@@ -20,10 +20,8 @@ type WatchCreator interface {
NewWatcher(ctx context.Context, namespace string) (watch.Interface, error) NewWatcher(ctx context.Context, namespace string) (watch.Interface, error)
} }
func FilteredWatch(ctx context.Context, watcherCreator WatchCreator, targetNamespaces []string, filterer EventFilterer) (chan *WatchEvent, chan *WatchEvent, chan *WatchEvent, chan error) { func FilteredWatch(ctx context.Context, watcherCreator WatchCreator, targetNamespaces []string, filterer EventFilterer) (<-chan *WatchEvent, <-chan error) {
addedChan := make(chan *WatchEvent) eventChan := make(chan *WatchEvent)
modifiedChan := make(chan *WatchEvent)
removedChan := make(chan *WatchEvent)
errorChan := make(chan error) errorChan := make(chan error)
var wg sync.WaitGroup var wg sync.WaitGroup
@@ -42,7 +40,7 @@ func FilteredWatch(ctx context.Context, watcherCreator WatchCreator, targetNames
break break
} }
err = startWatchLoop(ctx, watcher, filterer, addedChan, modifiedChan, removedChan) // blocking err = startWatchLoop(ctx, watcher, filterer, eventChan) // blocking
watcher.Stop() watcher.Stop()
select { select {
@@ -73,16 +71,14 @@ func FilteredWatch(ctx context.Context, watcherCreator WatchCreator, targetNames
go func() { go func() {
<-ctx.Done() <-ctx.Done()
wg.Wait() wg.Wait()
close(addedChan) close(eventChan)
close(modifiedChan)
close(removedChan)
close(errorChan) close(errorChan)
}() }()
return addedChan, modifiedChan, removedChan, errorChan return eventChan, errorChan
} }
func startWatchLoop(ctx context.Context, watcher watch.Interface, filterer EventFilterer, addedChan chan *WatchEvent, modifiedChan chan *WatchEvent, removedChan chan *WatchEvent) error { func startWatchLoop(ctx context.Context, watcher watch.Interface, filterer EventFilterer, eventChan chan<- *WatchEvent) error {
resultChan := watcher.ResultChan() resultChan := watcher.ResultChan()
for { for {
select { select {
@@ -103,14 +99,7 @@ func startWatchLoop(ctx context.Context, watcher watch.Interface, filterer Event
continue continue
} }
switch wEvent.Type { eventChan <- &wEvent
case watch.Added:
addedChan <- &wEvent
case watch.Modified:
modifiedChan <- &wEvent
case watch.Deleted:
removedChan <- &wEvent
}
case <-ctx.Done(): case <-ctx.Done():
return nil return nil
} }

View File

@@ -10,6 +10,14 @@ import (
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
) )
const (
EventAdded watch.EventType = watch.Added
EventModified watch.EventType = watch.Modified
EventDeleted watch.EventType = watch.Deleted
EventBookmark watch.EventType = watch.Bookmark
EventError watch.EventType = watch.Error
)
type InvalidObjectType struct { type InvalidObjectType struct {
RequestedType reflect.Type RequestedType reflect.Type
} }