diff --git a/agent/pkg/utils/utils.go b/agent/pkg/utils/utils.go index 95b92e288..a783d128a 100644 --- a/agent/pkg/utils/utils.go +++ b/agent/pkg/utils/utils.go @@ -48,7 +48,8 @@ func StartServer(app *gin.Engine) { func GetTappedPodsStatus() []shared.TappedPodStatus { tappedPodsStatus := make([]shared.TappedPodStatus, 0) for _, pod := range providers.TapStatus.Pods { - isTapped := strings.ToLower(providers.TappersStatus[pod.NodeName].Status) == "started" + status := strings.ToLower(providers.TappersStatus[pod.NodeName].Status) + isTapped := status == "running" tappedPodsStatus = append(tappedPodsStatus, shared.TappedPodStatus{Name: pod.Name, Namespace: pod.Namespace, IsTapped: isTapped}) } return tappedPodsStatus diff --git a/cli/cmd/tapRunner.go b/cli/cmd/tapRunner.go index 28c0bd67e..9b53f7f17 100644 --- a/cli/cmd/tapRunner.go +++ b/cli/cmd/tapRunner.go @@ -33,8 +33,8 @@ import ( const cleanupTimeout = time.Minute type tapState struct { - startTime time.Time - targetNamespaces []string + startTime time.Time + targetNamespaces []string mizuServiceAccountExists bool } @@ -420,7 +420,8 @@ func watchApiServerEvents(ctx context.Context, kubernetesProvider *kubernetes.Pr event, err := wEvent.ToEvent() if err != nil { - logger.Log.Errorf(fmt.Sprintf("Error parsing Mizu resource event: %+v", err)) + logger.Log.Debugf("[ERROR] parsing Mizu resource event: %+v", err) + continue } if state.startTime.After(event.CreationTimestamp.Time) { @@ -448,7 +449,7 @@ func watchApiServerEvents(ctx context.Context, kubernetesProvider *kubernetes.Pr continue } - logger.Log.Errorf("Watching API server events loop, error: %+v", err) + logger.Log.Debugf("[Error] Watching API server events loop, error: %+v", err) case <-ctx.Done(): logger.Log.Debugf("Watching API server events loop, ctx done") return diff --git a/shared/kubernetes/mizuTapperSyncer.go b/shared/kubernetes/mizuTapperSyncer.go index 31dba52d1..2f9748a57 100644 --- a/shared/kubernetes/mizuTapperSyncer.go +++ b/shared/kubernetes/mizuTapperSyncer.go @@ -70,9 +70,49 @@ func CreateAndStartMizuTapperSyncer(ctx context.Context, kubernetesProvider *Pro go syncer.watchPodsForTapping() go syncer.watchTapperEvents() + go syncer.watchTapperPods() return syncer, nil } +func (tapperSyncer *MizuTapperSyncer) watchTapperPods() { + mizuResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", TapperPodName)) + podWatchHelper := NewPodWatchHelper(tapperSyncer.kubernetesProvider, mizuResourceRegex) + eventChan, errorChan := FilteredWatch(tapperSyncer.context, podWatchHelper, []string{tapperSyncer.config.MizuResourcesNamespace}, podWatchHelper) + + for { + select { + case wEvent, ok := <-eventChan: + if !ok { + eventChan = nil + continue + } + + pod, err := wEvent.ToPod() + if err != nil { + logger.Log.Debugf("[ERROR] parsing Mizu resource pod: %+v", err) + continue + } + + logger.Log.Debugf("Watching tapper pods loop, tapper: %v, node: %v, status: %v", pod.Name, pod.Spec.NodeName, pod.Status.Phase) + if pod.Spec.NodeName != "" { + tapperStatus := shared.TapperStatus{TapperName: pod.Name, NodeName: pod.Spec.NodeName, Status: string(pod.Status.Phase)} + tapperSyncer.TapperStatusChangedOut <- tapperStatus + } + + case err, ok := <-errorChan: + if !ok { + errorChan = nil + continue + } + logger.Log.Debugf("[ERROR] Watching tapper pods loop, error: %+v", err) + + case <-tapperSyncer.context.Done(): + logger.Log.Debugf("Watching tapper pods loop, ctx done") + return + } + } +} + func (tapperSyncer *MizuTapperSyncer) watchTapperEvents() { mizuResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", TapperPodName)) eventWatchHelper := NewEventWatchHelper(tapperSyncer.kubernetesProvider, mizuResourceRegex, "pod") @@ -88,7 +128,8 @@ func (tapperSyncer *MizuTapperSyncer) watchTapperEvents() { event, err := wEvent.ToEvent() if err != nil { - logger.Log.Errorf(fmt.Sprintf("Error parsing Mizu resource event: %+v", err)) + logger.Log.Debugf("[ERROR] parsing Mizu resource event: %+v", err) + continue } if tapperSyncer.startTime.After(event.CreationTimestamp.Time) { @@ -117,8 +158,8 @@ func (tapperSyncer *MizuTapperSyncer) watchTapperEvents() { nodeName = pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchFields[0].Values[0] } - taperStatus := shared.TapperStatus{TapperName: pod.Name, NodeName: nodeName, Status: event.Reason} - tapperSyncer.TapperStatusChangedOut <- taperStatus + tapperStatus := shared.TapperStatus{TapperName: pod.Name, NodeName: nodeName, Status: string(pod.Status.Phase)} + tapperSyncer.TapperStatusChangedOut <- tapperStatus case err, ok := <-errorChan: if !ok { @@ -126,7 +167,7 @@ func (tapperSyncer *MizuTapperSyncer) watchTapperEvents() { continue } - logger.Log.Errorf("Watching tapper events loop, error: %+v", err) + logger.Log.Debugf("[ERROR] Watching tapper events loop, error: %+v", err) case <-tapperSyncer.context.Done(): logger.Log.Debugf("Watching tapper events loop, ctx done")