Fix: [EKS] server could not find the requested resourceevents.events.k8s.io (#565)

This commit is contained in:
Igor Gov
2021-12-28 13:24:51 +02:00
committed by GitHub
parent 1e1b5f0c0f
commit b039c2abad
3 changed files with 52 additions and 9 deletions

View File

@@ -70,9 +70,49 @@ func CreateAndStartMizuTapperSyncer(ctx context.Context, kubernetesProvider *Pro
go syncer.watchPodsForTapping()
go syncer.watchTapperEvents()
go syncer.watchTapperPods()
return syncer, nil
}
func (tapperSyncer *MizuTapperSyncer) watchTapperPods() {
mizuResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", TapperPodName))
podWatchHelper := NewPodWatchHelper(tapperSyncer.kubernetesProvider, mizuResourceRegex)
eventChan, errorChan := FilteredWatch(tapperSyncer.context, podWatchHelper, []string{tapperSyncer.config.MizuResourcesNamespace}, podWatchHelper)
for {
select {
case wEvent, ok := <-eventChan:
if !ok {
eventChan = nil
continue
}
pod, err := wEvent.ToPod()
if err != nil {
logger.Log.Debugf("[ERROR] parsing Mizu resource pod: %+v", err)
continue
}
logger.Log.Debugf("Watching tapper pods loop, tapper: %v, node: %v, status: %v", pod.Name, pod.Spec.NodeName, pod.Status.Phase)
if pod.Spec.NodeName != "" {
tapperStatus := shared.TapperStatus{TapperName: pod.Name, NodeName: pod.Spec.NodeName, Status: string(pod.Status.Phase)}
tapperSyncer.TapperStatusChangedOut <- tapperStatus
}
case err, ok := <-errorChan:
if !ok {
errorChan = nil
continue
}
logger.Log.Debugf("[ERROR] Watching tapper pods loop, error: %+v", err)
case <-tapperSyncer.context.Done():
logger.Log.Debugf("Watching tapper pods loop, ctx done")
return
}
}
}
func (tapperSyncer *MizuTapperSyncer) watchTapperEvents() {
mizuResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", TapperPodName))
eventWatchHelper := NewEventWatchHelper(tapperSyncer.kubernetesProvider, mizuResourceRegex, "pod")
@@ -88,7 +128,8 @@ func (tapperSyncer *MizuTapperSyncer) watchTapperEvents() {
event, err := wEvent.ToEvent()
if err != nil {
logger.Log.Errorf(fmt.Sprintf("Error parsing Mizu resource event: %+v", err))
logger.Log.Debugf("[ERROR] parsing Mizu resource event: %+v", err)
continue
}
if tapperSyncer.startTime.After(event.CreationTimestamp.Time) {
@@ -117,8 +158,8 @@ func (tapperSyncer *MizuTapperSyncer) watchTapperEvents() {
nodeName = pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchFields[0].Values[0]
}
taperStatus := shared.TapperStatus{TapperName: pod.Name, NodeName: nodeName, Status: event.Reason}
tapperSyncer.TapperStatusChangedOut <- taperStatus
tapperStatus := shared.TapperStatus{TapperName: pod.Name, NodeName: nodeName, Status: string(pod.Status.Phase)}
tapperSyncer.TapperStatusChangedOut <- tapperStatus
case err, ok := <-errorChan:
if !ok {
@@ -126,7 +167,7 @@ func (tapperSyncer *MizuTapperSyncer) watchTapperEvents() {
continue
}
logger.Log.Errorf("Watching tapper events loop, error: %+v", err)
logger.Log.Debugf("[ERROR] Watching tapper events loop, error: %+v", err)
case <-tapperSyncer.context.Done():
logger.Log.Debugf("Watching tapper events loop, ctx done")