diff --git a/agent/main.go b/agent/main.go index e7196a211..fbe5ab677 100644 --- a/agent/main.go +++ b/agent/main.go @@ -459,7 +459,7 @@ func startMizuTapperSyncer(ctx context.Context) (*kubernetes.MizuTapperSyncer, e return } logger.Log.Fatalf("fatal tap syncer error: %v", syncerErr) - case _, ok := <-tapperSyncer.TapPodChangesOut: + case tapPodChangeEvent, ok := <-tapperSyncer.TapPodChangesOut: if !ok { logger.Log.Debug("mizuTapperSyncer pod changes channel closed, ending listener loop") return @@ -472,6 +472,7 @@ func startMizuTapperSyncer(ctx context.Context) (*kubernetes.MizuTapperSyncer, e } api.BroadcastToBrowserClients(serializedTapStatus) providers.TapStatus.Pods = tapStatus.Pods + providers.ExpectedTapperAmount = tapPodChangeEvent.ExpectedTapperAmount case <-ctx.Done(): logger.Log.Debug("mizuTapperSyncer event listener loop exiting due to context done") return diff --git a/agent/pkg/controllers/status_controller.go b/agent/pkg/controllers/status_controller.go index 6cd4deeff..305033ace 100644 --- a/agent/pkg/controllers/status_controller.go +++ b/agent/pkg/controllers/status_controller.go @@ -2,7 +2,9 @@ package controllers import ( "encoding/json" + "fmt" "mizuserver/pkg/api" + "mizuserver/pkg/config" "mizuserver/pkg/holder" "mizuserver/pkg/providers" "mizuserver/pkg/up9" @@ -15,6 +17,13 @@ import ( ) func HealthCheck(c *gin.Context) { + if config.Config.DaemonMode { + if providers.ExpectedTapperAmount != providers.TappersCount { + c.JSON(http.StatusInternalServerError, fmt.Sprintf("expecting more tappers than are actually connected (%d expected, %d connected)", providers.ExpectedTapperAmount, providers.TappersCount)) + return + } + } + response := shared.HealthResponse{ TapStatus: providers.TapStatus, TappersCount: providers.TappersCount, diff --git a/agent/pkg/providers/status_provider.go b/agent/pkg/providers/status_provider.go index 15928b9bf..bcc4c3f6e 100644 --- a/agent/pkg/providers/status_provider.go +++ b/agent/pkg/providers/status_provider.go @@ -19,7 +19,7 @@ var ( TapStatus shared.TapStatus authStatus *models.AuthStatus RecentTLSLinks = cache.New(tlsLinkRetainmentTime, tlsLinkRetainmentTime) - + ExpectedTapperAmount = -1 //only relevant in daemon mode as cli manages tappers otherwise tappersCountLock = sync.Mutex{} ) diff --git a/shared/kubernetes/mizuTapperSyncer.go b/shared/kubernetes/mizuTapperSyncer.go index 1d67e98cd..1526eadbd 100644 --- a/shared/kubernetes/mizuTapperSyncer.go +++ b/shared/kubernetes/mizuTapperSyncer.go @@ -16,18 +16,20 @@ import ( const updateTappersDelay = 5 * time.Second type TappedPodChangeEvent struct { - Added []core.Pod - Removed []core.Pod + Added []core.Pod + Removed []core.Pod + ExpectedTapperAmount int } // MizuTapperSyncer uses a k8s pod watch to update tapper daemonsets when targeted pods are removed or created type MizuTapperSyncer struct { - context context.Context - CurrentlyTappedPods []core.Pod - config TapperSyncerConfig - kubernetesProvider *Provider - TapPodChangesOut chan TappedPodChangeEvent - ErrorOut chan K8sTapManagerError + context context.Context + CurrentlyTappedPods []core.Pod + config TapperSyncerConfig + kubernetesProvider *Provider + TapPodChangesOut chan TappedPodChangeEvent + ErrorOut chan K8sTapManagerError + nodeToTappedPodIPMap map[string][]string } type TapperSyncerConfig struct { @@ -160,9 +162,11 @@ func (tapperSyncer *MizuTapperSyncer) updateCurrentlyTappedPods() (err error, ch } if len(addedPods) > 0 || len(removedPods) > 0 { tapperSyncer.CurrentlyTappedPods = podsToTap + tapperSyncer.nodeToTappedPodIPMap = GetNodeHostToTappedPodIpsMap(tapperSyncer.CurrentlyTappedPods) tapperSyncer.TapPodChangesOut <- TappedPodChangeEvent{ - Added: addedPods, - Removed: removedPods, + Added: addedPods, + Removed: removedPods, + ExpectedTapperAmount: len(tapperSyncer.nodeToTappedPodIPMap), } return nil, true } @@ -171,9 +175,7 @@ func (tapperSyncer *MizuTapperSyncer) updateCurrentlyTappedPods() (err error, ch } func (tapperSyncer *MizuTapperSyncer) updateMizuTappers() error { - nodeToTappedPodIPMap := GetNodeHostToTappedPodIpsMap(tapperSyncer.CurrentlyTappedPods) - - if len(nodeToTappedPodIPMap) > 0 { + if len(tapperSyncer.nodeToTappedPodIPMap) > 0 { var serviceAccountName string if tapperSyncer.config.MizuServiceAccountExists { serviceAccountName = ServiceAccountName @@ -188,7 +190,7 @@ func (tapperSyncer *MizuTapperSyncer) updateMizuTappers() error { tapperSyncer.config.AgentImage, TapperPodName, fmt.Sprintf("%s.%s.svc.cluster.local", ApiServerPodName, tapperSyncer.config.MizuResourcesNamespace), - nodeToTappedPodIPMap, + tapperSyncer.nodeToTappedPodIPMap, serviceAccountName, tapperSyncer.config.TapperResources, tapperSyncer.config.ImagePullPolicy, @@ -197,7 +199,7 @@ func (tapperSyncer *MizuTapperSyncer) updateMizuTappers() error { ); err != nil { return err } - logger.Log.Debugf("Successfully created %v tappers", len(nodeToTappedPodIPMap)) + logger.Log.Debugf("Successfully created %v tappers", len(tapperSyncer.nodeToTappedPodIPMap)) } else { if err := tapperSyncer.kubernetesProvider.RemoveDaemonSet(tapperSyncer.context, tapperSyncer.config.MizuResourcesNamespace, TapperDaemonSetName); err != nil { return err