diff --git a/agent/main.go b/agent/main.go index ef2998f00..a9d94f998 100644 --- a/agent/main.go +++ b/agent/main.go @@ -22,7 +22,6 @@ import ( "path" "path/filepath" "plugin" - "regexp" "sort" "syscall" "time" @@ -273,8 +272,6 @@ func hostApi(socketHarOutputChannel chan<- *tapApi.OutputChannelItem) { if _, err := startMizuTapperSyncer(ctx, kubernetesProvider); err != nil { logger.Log.Fatalf("error initializing tapper syncer: %+v", err) } - - go watchMizuEvents(ctx, kubernetesProvider, cancel) } utils.StartServer(app) @@ -447,7 +444,7 @@ func startMizuTapperSyncer(ctx context.Context, provider *kubernetes.Provider) ( MizuApiFilteringOptions: config.Config.MizuApiFilteringOptions, MizuServiceAccountExists: true, //assume service account exists since daemon mode will not function without it anyway Istio: config.Config.Istio, - }) + }, time.Now()) if err != nil { return nil, err @@ -477,6 +474,16 @@ func startMizuTapperSyncer(ctx context.Context, provider *kubernetes.Provider) ( api.BroadcastToBrowserClients(serializedTapStatus) providers.TapStatus.Pods = tapStatus.Pods providers.ExpectedTapperAmount = tapPodChangeEvent.ExpectedTapperAmount + case tapperStatus, ok := <-tapperSyncer.TapperStatusChangedOut: + if !ok { + logger.Log.Debug("mizuTapperSyncer tapper status changed channel closed, ending listener loop") + return + } + if providers.TappersStatus == nil { + providers.TappersStatus = make(map[string]shared.TapperStatus) + } + providers.TappersStatus[tapperStatus.NodeName] = tapperStatus + case <-ctx.Done(): logger.Log.Debug("mizuTapperSyncer event listener loop exiting due to context done") return @@ -486,48 +493,3 @@ func startMizuTapperSyncer(ctx context.Context, provider *kubernetes.Provider) ( return tapperSyncer, nil } - -func watchMizuEvents(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) { - // Round down because k8s CreationTimestamp is given in 1 sec resolution. - startTime := time.Now().Truncate(time.Second) - - mizuResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", kubernetes.MizuResourcesPrefix)) - eventWatchHelper := kubernetes.NewEventWatchHelper(kubernetesProvider, mizuResourceRegex) - eventChan, errorChan := kubernetes.FilteredWatch(ctx, eventWatchHelper, []string{config.Config.MizuResourcesNamespace}, eventWatchHelper) - - for { - select { - case wEvent, ok := <-eventChan: - if !ok { - eventChan = nil - continue - } - - event, err := wEvent.ToEvent() - if err != nil { - logger.Log.Errorf("error parsing Mizu resource event: %+v", err) - cancel() - } - - if startTime.After(event.CreationTimestamp.Time) { - continue - } - - if event.Type == v1.EventTypeWarning { - logger.Log.Warningf("resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note) - } - case err, ok := <-errorChan: - if !ok { - errorChan = nil - continue - } - - logger.Log.Errorf("error in watch mizu resource events loop: %+v", err) - cancel() - - case <-ctx.Done(): - logger.Log.Debugf("watching Mizu resource events loop, ctx done") - return - } - } -} diff --git a/agent/pkg/controllers/status_controller.go b/agent/pkg/controllers/status_controller.go index 305033ace..b86c1a56e 100644 --- a/agent/pkg/controllers/status_controller.go +++ b/agent/pkg/controllers/status_controller.go @@ -24,14 +24,19 @@ func HealthCheck(c *gin.Context) { } } + tappers := make([]shared.TapperStatus, len(providers.TappersStatus)) + for _, value := range providers.TappersStatus { + tappers = append(tappers, value) + } + response := shared.HealthResponse{ - TapStatus: providers.TapStatus, - TappersCount: providers.TappersCount, + TapStatus: providers.TapStatus, + TappersCount: providers.TappersCount, + TappersStatus: tappers, } c.JSON(http.StatusOK, response) } - func PostTappedPods(c *gin.Context) { tapStatus := &shared.TapStatus{} if err := c.Bind(tapStatus); err != nil { @@ -52,6 +57,23 @@ func PostTappedPods(c *gin.Context) { } } +func PostTapperStatus(c *gin.Context) { + tapperStatus := &shared.TapperStatus{} + if err := c.Bind(tapperStatus); err != nil { + c.JSON(http.StatusBadRequest, err) + return + } + if err := validation.Validate(tapperStatus); err != nil { + c.JSON(http.StatusBadRequest, err) + return + } + logger.Log.Infof("[Status] POST request, tapper status: %v", tapperStatus) + if providers.TappersStatus == nil { + providers.TappersStatus = make(map[string]shared.TapperStatus) + } + providers.TappersStatus[tapperStatus.NodeName] = *tapperStatus +} + func GetTappersCount(c *gin.Context) { c.JSON(http.StatusOK, providers.TappersCount) } diff --git a/agent/pkg/providers/status_provider.go b/agent/pkg/providers/status_provider.go index bcc4c3f6e..85fe8793b 100644 --- a/agent/pkg/providers/status_provider.go +++ b/agent/pkg/providers/status_provider.go @@ -15,12 +15,13 @@ import ( const tlsLinkRetainmentTime = time.Minute * 15 var ( - TappersCount int - TapStatus shared.TapStatus - authStatus *models.AuthStatus - RecentTLSLinks = cache.New(tlsLinkRetainmentTime, tlsLinkRetainmentTime) + TappersCount int + TapStatus shared.TapStatus + TappersStatus map[string]shared.TapperStatus + authStatus *models.AuthStatus + RecentTLSLinks = cache.New(tlsLinkRetainmentTime, tlsLinkRetainmentTime) ExpectedTapperAmount = -1 //only relevant in daemon mode as cli manages tappers otherwise - tappersCountLock = sync.Mutex{} + tappersCountLock = sync.Mutex{} ) func GetAuthStatus() (*models.AuthStatus, error) { diff --git a/agent/pkg/routes/status_routes.go b/agent/pkg/routes/status_routes.go index 505e0b5f5..dd601213d 100644 --- a/agent/pkg/routes/status_routes.go +++ b/agent/pkg/routes/status_routes.go @@ -11,6 +11,7 @@ func StatusRoutes(ginApp *gin.Engine) { routeGroup.GET("/health", controllers.HealthCheck) routeGroup.POST("/tappedPods", controllers.PostTappedPods) + routeGroup.POST("/tapperStatus", controllers.PostTapperStatus) routeGroup.GET("/tappersCount", controllers.GetTappersCount) routeGroup.GET("/tap", controllers.GetTappingStatus) diff --git a/cli/apiserver/provider.go b/cli/apiserver/provider.go index dee6c225e..dc5dff5e9 100644 --- a/cli/apiserver/provider.go +++ b/cli/apiserver/provider.go @@ -42,7 +42,7 @@ func (provider *Provider) TestConnection() error { retriesLeft := provider.retries for retriesLeft > 0 { if _, err := provider.GetHealthStatus(); err != nil { - logger.Log.Debugf("[ERROR] api server not ready yet %v", err) + logger.Log.Debugf("api server not ready yet %v", err) } else { logger.Log.Debugf("connection test to api server passed successfully") break @@ -81,6 +81,23 @@ func (provider *Provider) GetHealthStatus() (*shared.HealthResponse, error) { } } +func (provider *Provider) ReportTapperStatus(tapperStatus shared.TapperStatus) error { + tapperStatusUrl := fmt.Sprintf("%s/status/tapperStatus", provider.url) + + if jsonValue, err := json.Marshal(tapperStatus); err != nil { + return fmt.Errorf("failed Marshal the tapper status %w", err) + } else { + if response, err := provider.client.Post(tapperStatusUrl, "application/json", bytes.NewBuffer(jsonValue)); err != nil { + return fmt.Errorf("failed sending to API server the tapped pods %w", err) + } else if response.StatusCode != 200 { + return fmt.Errorf("failed sending to API server the tapper status, response status code %v", response.StatusCode) + } else { + logger.Log.Debugf("Reported to server API about tapper status: %v", tapperStatus) + return nil + } + } +} + func (provider *Provider) ReportTappedPods(pods []core.Pod) error { tappedPodsUrl := fmt.Sprintf("%s/status/tappedPods", provider.url) diff --git a/cli/cmd/tapRunner.go b/cli/cmd/tapRunner.go index 4efd9cd4a..d4fa44ce7 100644 --- a/cli/cmd/tapRunner.go +++ b/cli/cmd/tapRunner.go @@ -33,6 +33,9 @@ import ( const cleanupTimeout = time.Minute type tapState struct { + startTime time.Time + targetNamespaces []string + apiServerService *core.Service tapperSyncer *kubernetes.MizuTapperSyncer mizuServiceAccountExists bool @@ -42,7 +45,7 @@ var state tapState var apiProvider *apiserver.Provider func RunMizuTap() { - startTime := time.Now() + state.startTime = time.Now() mizuApiFilteringOptions, err := getMizuApiFilteringOptions() apiProvider = apiserver.NewProvider(GetApiServerUrl(), apiserver.DefaultRetries, apiserver.DefaultTimeout) @@ -92,16 +95,16 @@ func RunMizuTap() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() // cancel will be called when this function exits - targetNamespaces := getNamespaces(kubernetesProvider) + state.targetNamespaces = getNamespaces(kubernetesProvider) - serializedMizuConfig, err := config.GetSerializedMizuAgentConfig(targetNamespaces, mizuApiFilteringOptions) + serializedMizuConfig, err := config.GetSerializedMizuAgentConfig(state.targetNamespaces, mizuApiFilteringOptions) if err != nil { logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error composing mizu config: %v", errormessage.FormatError(err))) return } if config.Config.IsNsRestrictedMode() { - if len(targetNamespaces) != 1 || !shared.Contains(targetNamespaces, config.Config.MizuResourcesNamespace) { + if len(state.targetNamespaces) != 1 || !shared.Contains(state.targetNamespaces, config.Config.MizuResourcesNamespace) { logger.Log.Errorf("Not supported mode. Mizu can't resolve IPs in other namespaces when running in namespace restricted mode.\n"+ "You can use the same namespace for --%s and --%s", configStructs.NamespacesTapName, config.MizuResourcesNamespaceConfigName) return @@ -109,18 +112,19 @@ func RunMizuTap() { } var namespacesStr string - if !shared.Contains(targetNamespaces, kubernetes.K8sAllNamespaces) { - namespacesStr = fmt.Sprintf("namespaces \"%s\"", strings.Join(targetNamespaces, "\", \"")) + if !shared.Contains(state.targetNamespaces, kubernetes.K8sAllNamespaces) { + namespacesStr = fmt.Sprintf("namespaces \"%s\"", strings.Join(state.targetNamespaces, "\", \"")) } else { namespacesStr = "all namespaces" } logger.Log.Infof("Tapping pods in %s", namespacesStr) + if err := printTappedPodsPreview(ctx, kubernetesProvider, state.targetNamespaces); err != nil { + logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error listing pods: %v", errormessage.FormatError(err))) + } + if config.Config.Tap.DryRun { - if err := printTappedPodsPreview(ctx, kubernetesProvider, targetNamespaces); err != nil { - logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error listing pods: %v", errormessage.FormatError(err))) - } return } @@ -136,7 +140,7 @@ func RunMizuTap() { return } if config.Config.Tap.DaemonMode { - if err := handleDaemonModePostCreation(ctx, cancel, kubernetesProvider, targetNamespaces); err != nil { + if err := handleDaemonModePostCreation(ctx, cancel, kubernetesProvider, state.targetNamespaces); err != nil { defer finishMizuExecution(kubernetesProvider, apiProvider) cancel() } else { @@ -145,14 +149,7 @@ func RunMizuTap() { } else { defer finishMizuExecution(kubernetesProvider, apiProvider) - if err = startTapperSyncer(ctx, cancel, kubernetesProvider, targetNamespaces, *mizuApiFilteringOptions); err != nil { - logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error starting mizu tapper syncer: %v", err)) - cancel() - } - go goUtils.HandleExcWrapper(watchApiServerPod, ctx, kubernetesProvider, cancel) - go goUtils.HandleExcWrapper(watchTapperPod, ctx, kubernetesProvider, cancel) - go goUtils.HandleExcWrapper(watchMizuEvents, ctx, kubernetesProvider, cancel, startTime) // block until exit signal or error waitForFinish(ctx, cancel) @@ -185,7 +182,6 @@ func printTappedPodsPreview(ctx context.Context, kubernetesProvider *kubernetes. if len(matchingPods) == 0 { printNoPodsFoundSuggestion(namespaces) } - logger.Log.Info("Pods that match the provided criteria at this instant:") for _, tappedPod := range matchingPods { logger.Log.Infof(uiUtils.Green, fmt.Sprintf("+%s", tappedPod.Name)) } @@ -205,7 +201,7 @@ func waitForDaemonModeToBeReady(cancel context.CancelFunc, kubernetesProvider *k return nil } -func startTapperSyncer(ctx context.Context, cancel context.CancelFunc, provider *kubernetes.Provider, targetNamespaces []string, mizuApiFilteringOptions api.TrafficFilteringOptions) error { +func startTapperSyncer(ctx context.Context, cancel context.CancelFunc, provider *kubernetes.Provider, targetNamespaces []string, mizuApiFilteringOptions api.TrafficFilteringOptions, startTime time.Time) error { tapperSyncer, err := kubernetes.CreateAndStartMizuTapperSyncer(ctx, provider, kubernetes.TapperSyncerConfig{ TargetNamespaces: targetNamespaces, PodFilterRegex: *config.Config.Tap.PodRegex(), @@ -218,20 +214,12 @@ func startTapperSyncer(ctx context.Context, cancel context.CancelFunc, provider MizuApiFilteringOptions: mizuApiFilteringOptions, MizuServiceAccountExists: state.mizuServiceAccountExists, Istio: config.Config.Tap.Istio, - }) + }, startTime) if err != nil { return err } - for _, tappedPod := range tapperSyncer.CurrentlyTappedPods { - logger.Log.Infof(uiUtils.Green, fmt.Sprintf("+%s", tappedPod.Name)) - } - - if len(tapperSyncer.CurrentlyTappedPods) == 0 { - printNoPodsFoundSuggestion(targetNamespaces) - } - go func() { for { select { @@ -250,6 +238,14 @@ func startTapperSyncer(ctx context.Context, cancel context.CancelFunc, provider if err := apiProvider.ReportTappedPods(tapperSyncer.CurrentlyTappedPods); err != nil { logger.Log.Debugf("[Error] failed update tapped pods %v", err) } + case tapperStatus, ok := <-tapperSyncer.TapperStatusChangedOut: + if !ok { + logger.Log.Debug("mizuTapperSyncer tapper status changed channel closed, ending listener loop") + return + } + if err := apiProvider.ReportTapperStatus(tapperStatus); err != nil { + logger.Log.Debugf("[Error] failed update tapper status %v", err) + } case <-ctx.Done(): logger.Log.Debug("mizuTapperSyncer event listener loop exiting due to context done") return @@ -557,171 +553,9 @@ func waitUntilNamespaceDeleted(ctx context.Context, cancel context.CancelFunc, k } func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) { - podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", kubernetes.ApiServerPodName)) - podWatchHelper := kubernetes.NewPodWatchHelper(kubernetesProvider, podExactRegex) - eventChan, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.MizuResourcesNamespace}, podWatchHelper) - isPodReady := false - timeAfter := time.After(25 * time.Second) - for { - select { - case wEvent, ok := <-eventChan: - if !ok { - eventChan = nil - continue - } - - switch wEvent.Type { - case kubernetes.EventAdded: - logger.Log.Debugf("Watching API Server pod loop, added") - case kubernetes.EventDeleted: - logger.Log.Infof("%s removed", kubernetes.ApiServerPodName) - cancel() - return - case kubernetes.EventModified: - modifiedPod, err := wEvent.ToPod() - if err != nil { - logger.Log.Errorf(uiUtils.Error, err) - cancel() - continue - } - - logger.Log.Debugf("Watching API Server pod loop, modified: %v", modifiedPod.Status.Phase) - - if modifiedPod.Status.Phase == core.PodPending { - if modifiedPod.Status.Conditions[0].Type == core.PodScheduled && modifiedPod.Status.Conditions[0].Status != core.ConditionTrue { - logger.Log.Debugf("Wasn't able to deploy the API server. Reason: \"%s\"", modifiedPod.Status.Conditions[0].Message) - logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Wasn't able to deploy the API server, for more info check logs at %s", fsUtils.GetLogFilePath())) - cancel() - break - } - - if len(modifiedPod.Status.ContainerStatuses) > 0 && modifiedPod.Status.ContainerStatuses[0].State.Waiting != nil && modifiedPod.Status.ContainerStatuses[0].State.Waiting.Reason == "ErrImagePull" { - logger.Log.Debugf("Wasn't able to deploy the API server. (ErrImagePull) Reason: \"%s\"", modifiedPod.Status.ContainerStatuses[0].State.Waiting.Message) - logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Wasn't able to deploy the API server: failed to pull the image, for more info check logs at %v", fsUtils.GetLogFilePath())) - cancel() - break - } - } - - if modifiedPod.Status.Phase == core.PodRunning && !isPodReady { - isPodReady = true - go startProxyReportErrorIfAny(kubernetesProvider, cancel) - - url := GetApiServerUrl() - if err := apiProvider.TestConnection(); err != nil { - logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Couldn't connect to API server, for more info check logs at %s", fsUtils.GetLogFilePath())) - cancel() - break - } - - logger.Log.Infof("Mizu is available at %s", url) - if !config.Config.HeadlessMode { - uiUtils.OpenBrowser(url) - } - if err := apiProvider.ReportTappedPods(state.tapperSyncer.CurrentlyTappedPods); err != nil { - logger.Log.Debugf("[Error] failed update tapped pods %v", err) - } - } - case kubernetes.EventBookmark: - break - case kubernetes.EventError: - break - } - case err, ok := <-errorChan: - if !ok { - errorChan = nil - continue - } - - logger.Log.Errorf("[ERROR] Agent creation, watching %v namespace, error: %v", config.Config.MizuResourcesNamespace, err) - cancel() - - case <-timeAfter: - if !isPodReady { - logger.Log.Errorf(uiUtils.Error, "Mizu API server was not ready in time") - cancel() - } - case <-ctx.Done(): - logger.Log.Debugf("Watching API Server pod loop, ctx done") - return - } - } -} - -func watchTapperPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) { - podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", kubernetes.TapperDaemonSetName)) - podWatchHelper := kubernetes.NewPodWatchHelper(kubernetesProvider, podExactRegex) - eventChan, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.MizuResourcesNamespace}, podWatchHelper) - - for { - select { - case wEvent, ok := <-eventChan: - if !ok { - eventChan = nil - continue - } - - pod, err := wEvent.ToPod() - if err != nil { - logger.Log.Errorf(uiUtils.Error, err) - cancel() - continue - } - - switch wEvent.Type { - case kubernetes.EventAdded: - logger.Log.Debugf("Tapper is created [%s]", pod.Name) - case kubernetes.EventDeleted: - logger.Log.Debugf("Tapper is removed [%s]", pod.Name) - case kubernetes.EventModified: - if pod.Status.Phase == core.PodPending && pod.Status.Conditions[0].Type == core.PodScheduled && pod.Status.Conditions[0].Status != core.ConditionTrue { - logger.Log.Infof(uiUtils.Red, fmt.Sprintf("Wasn't able to deploy the tapper %s. Reason: \"%s\"", pod.Name, pod.Status.Conditions[0].Message)) - cancel() - continue - } - - podStatus := pod.Status - - if podStatus.Phase == core.PodRunning { - state := podStatus.ContainerStatuses[0].State - if state.Terminated != nil { - switch state.Terminated.Reason { - case "OOMKilled": - logger.Log.Infof(uiUtils.Red, fmt.Sprintf("Tapper %s was terminated (reason: OOMKilled). You should consider increasing machine resources.", pod.Name)) - } - } - } - - logger.Log.Debugf("Tapper %s is %s", pod.Name, strings.ToLower(string(podStatus.Phase))) - case kubernetes.EventBookmark: - break - case kubernetes.EventError: - break - } - case err, ok := <-errorChan: - if !ok { - errorChan = nil - continue - } - - logger.Log.Errorf("[Error] Error in mizu tapper pod watch, err: %v", err) - cancel() - - case <-ctx.Done(): - logger.Log.Debugf("Watching tapper pod loop, ctx done") - return - } - } -} - -func watchMizuEvents(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, startTime time.Time) { - // Round down because k8s CreationTimestamp is given in 1 sec resolution. - startTime = startTime.Truncate(time.Second) - - mizuResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", kubernetes.MizuResourcesPrefix)) - eventWatchHelper := kubernetes.NewEventWatchHelper(kubernetesProvider, mizuResourceRegex) + podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s", kubernetes.ApiServerPodName)) + eventWatchHelper := kubernetes.NewEventWatchHelper(kubernetesProvider, podExactRegex, "pod") eventChan, errorChan := kubernetes.FilteredWatch(ctx, eventWatchHelper, []string{config.Config.MizuResourcesNamespace}, eventWatchHelper) - for { select { case wEvent, ok := <-eventChan: @@ -732,16 +566,46 @@ func watchMizuEvents(ctx context.Context, kubernetesProvider *kubernetes.Provide event, err := wEvent.ToEvent() if err != nil { - logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("error parsing Mizu resource event: %+v", err)) - cancel() + logger.Log.Errorf(fmt.Sprintf("Error parsing Mizu resource event: %+v", err)) } - if startTime.After(event.CreationTimestamp.Time) { + if state.startTime.After(event.CreationTimestamp.Time) { continue } - if event.Type == core.EventTypeWarning { - logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note)) + logger.Log.Debugf( + fmt.Sprintf("Watching API server events loop, event %s, time: %v, resource: %s (%s), reason: %s, note: %s", + event.Name, + event.CreationTimestamp.Time, + event.Regarding.Name, + event.Regarding.Kind, + event.Reason, + event.Note)) + + switch event.Reason { + case "Started": + go startProxyReportErrorIfAny(kubernetesProvider, cancel) + + url := GetApiServerUrl() + if err := apiProvider.TestConnection(); err != nil { + logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Couldn't connect to API server, for more info check logs at %s", fsUtils.GetLogFilePath())) + cancel() + break + } + options, _ := getMizuApiFilteringOptions() + if err = startTapperSyncer(ctx, cancel, kubernetesProvider, state.targetNamespaces, *options, state.startTime); err != nil { + logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error starting mizu tapper syncer: %v", err)) + cancel() + } + + logger.Log.Infof("Mizu is available at %s", url) + if !config.Config.HeadlessMode { + uiUtils.OpenBrowser(url) + } + case "FailedScheduling", "Failed", "Killing": + logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Mizu API Server status: %s - %s", event.Reason, event.Note)) + cancel() + break } case err, ok := <-errorChan: if !ok { @@ -749,11 +613,9 @@ func watchMizuEvents(ctx context.Context, kubernetesProvider *kubernetes.Provide continue } - logger.Log.Errorf("error in watch mizu resource events loop: %+v", err) - cancel() - + logger.Log.Errorf("Watching API server events loop, error: %+v", err) case <-ctx.Done(): - logger.Log.Debugf("watching Mizu resource events loop, ctx done") + logger.Log.Debugf("Watching API server events loop, ctx done") return } } diff --git a/shared/kubernetes/eventWatchHelper.go b/shared/kubernetes/eventWatchHelper.go index 3ec3cb956..332fdd4e6 100644 --- a/shared/kubernetes/eventWatchHelper.go +++ b/shared/kubernetes/eventWatchHelper.go @@ -3,6 +3,7 @@ package kubernetes import ( "context" "regexp" + "strings" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/watch" @@ -10,13 +11,15 @@ import ( type EventWatchHelper struct { kubernetesProvider *Provider - NameRegexFilter *regexp.Regexp + NameRegexFilter *regexp.Regexp + Kind string } -func NewEventWatchHelper(kubernetesProvider *Provider, NameRegexFilter *regexp.Regexp) *EventWatchHelper { +func NewEventWatchHelper(kubernetesProvider *Provider, NameRegexFilter *regexp.Regexp, kind string) *EventWatchHelper { return &EventWatchHelper{ kubernetesProvider: kubernetesProvider, - NameRegexFilter: NameRegexFilter, + NameRegexFilter: NameRegexFilter, + Kind: kind, } } @@ -31,6 +34,10 @@ func (wh *EventWatchHelper) Filter(wEvent *WatchEvent) (bool, error) { return false, nil } + if strings.ToLower(event.Regarding.Kind) != strings.ToLower(wh.Kind) { + return false, nil + } + return true, nil } diff --git a/shared/kubernetes/mizuTapperSyncer.go b/shared/kubernetes/mizuTapperSyncer.go index ee1f86ad4..fd11ded3e 100644 --- a/shared/kubernetes/mizuTapperSyncer.go +++ b/shared/kubernetes/mizuTapperSyncer.go @@ -23,13 +23,15 @@ type TappedPodChangeEvent struct { // MizuTapperSyncer uses a k8s pod watch to update tapper daemonsets when targeted pods are removed or created type MizuTapperSyncer struct { - context context.Context - CurrentlyTappedPods []core.Pod - config TapperSyncerConfig - kubernetesProvider *Provider - TapPodChangesOut chan TappedPodChangeEvent - ErrorOut chan K8sTapManagerError - nodeToTappedPodIPMap map[string][]string + startTime time.Time + context context.Context + CurrentlyTappedPods []core.Pod + config TapperSyncerConfig + kubernetesProvider *Provider + TapPodChangesOut chan TappedPodChangeEvent + TapperStatusChangedOut chan shared.TapperStatus + ErrorOut chan K8sTapManagerError + nodeToTappedPodIPMap map[string][]string } type TapperSyncerConfig struct { @@ -46,14 +48,16 @@ type TapperSyncerConfig struct { Istio bool } -func CreateAndStartMizuTapperSyncer(ctx context.Context, kubernetesProvider *Provider, config TapperSyncerConfig) (*MizuTapperSyncer, error) { +func CreateAndStartMizuTapperSyncer(ctx context.Context, kubernetesProvider *Provider, config TapperSyncerConfig, startTime time.Time) (*MizuTapperSyncer, error) { syncer := &MizuTapperSyncer{ - context: ctx, - CurrentlyTappedPods: make([]core.Pod, 0), - config: config, - kubernetesProvider: kubernetesProvider, - TapPodChangesOut: make(chan TappedPodChangeEvent, 100), - ErrorOut: make(chan K8sTapManagerError, 100), + startTime: startTime.Truncate(time.Second), // Round down because k8s CreationTimestamp is given in 1 sec resolution. + context: ctx, + CurrentlyTappedPods: make([]core.Pod, 0), + config: config, + kubernetesProvider: kubernetesProvider, + TapPodChangesOut: make(chan TappedPodChangeEvent, 100), + TapperStatusChangedOut: make(chan shared.TapperStatus, 100), + ErrorOut: make(chan K8sTapManagerError, 100), } if err, _ := syncer.updateCurrentlyTappedPods(); err != nil { @@ -65,9 +69,72 @@ func CreateAndStartMizuTapperSyncer(ctx context.Context, kubernetesProvider *Pro } go syncer.watchPodsForTapping() + go syncer.watchTapperEvents() return syncer, nil } +func (tapperSyncer *MizuTapperSyncer) watchTapperEvents() { + mizuResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", TapperPodName)) + eventWatchHelper := NewEventWatchHelper(tapperSyncer.kubernetesProvider, mizuResourceRegex, "pod") + eventChan, errorChan := FilteredWatch(tapperSyncer.context, eventWatchHelper, []string{tapperSyncer.config.MizuResourcesNamespace}, eventWatchHelper) + + for { + select { + case wEvent, ok := <-eventChan: + if !ok { + eventChan = nil + continue + } + + event, err := wEvent.ToEvent() + if err != nil { + logger.Log.Errorf(fmt.Sprintf("Error parsing Mizu resource event: %+v", err)) + } + + if tapperSyncer.startTime.After(event.CreationTimestamp.Time) { + continue + } + + logger.Log.Debugf( + fmt.Sprintf("Watching tapper events loop, event %s, time: %v, resource: %s (%s), reason: %s, note: %s", + event.Name, + event.CreationTimestamp.Time, + event.Regarding.Name, + event.Regarding.Kind, + event.Reason, + event.Note)) + + pod, err1 := tapperSyncer.kubernetesProvider.GetPod(tapperSyncer.context, tapperSyncer.config.MizuResourcesNamespace, event.Regarding.Name) + if err1 != nil { + logger.Log.Debugf(fmt.Sprintf("Failed to get tapper pod %s", event.Regarding.Name)) + continue + } + + nodeName := "" + if event.Reason != "FailedScheduling" { + nodeName = pod.Spec.NodeName + } else { + nodeName = pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchFields[0].Values[0] + } + + taperStatus := shared.TapperStatus{TapperName: pod.Name, NodeName: nodeName, Status: event.Reason} + tapperSyncer.TapperStatusChangedOut <- taperStatus + + case err, ok := <-errorChan: + if !ok { + errorChan = nil + continue + } + + logger.Log.Errorf("Watching tapper events loop, error: %+v", err) + + case <-tapperSyncer.context.Done(): + logger.Log.Debugf("Watching tapper events loop, ctx done") + return + } + } +} + func (tapperSyncer *MizuTapperSyncer) watchPodsForTapping() { podWatchHelper := NewPodWatchHelper(tapperSyncer.kubernetesProvider, &tapperSyncer.config.PodFilterRegex) eventChan, errorChan := FilteredWatch(tapperSyncer.context, podWatchHelper, tapperSyncer.config.TargetNamespaces, podWatchHelper) @@ -108,7 +175,6 @@ func (tapperSyncer *MizuTapperSyncer) watchPodsForTapping() { continue } - switch wEvent.Type { case EventAdded: logger.Log.Debugf("Added matching pod %s, ns: %s", pod.Name, pod.Namespace) diff --git a/shared/kubernetes/provider.go b/shared/kubernetes/provider.go index 523b9c503..41f32214f 100644 --- a/shared/kubernetes/provider.go +++ b/shared/kubernetes/provider.go @@ -351,8 +351,8 @@ func (provider *Provider) CreateService(ctx context.Context, namespace string, s } func (provider *Provider) DoesServicesExist(ctx context.Context, namespace string, name string) (bool, error) { - resource, err := provider.clientSet.CoreV1().Services(namespace).Get(ctx, name, metav1.GetOptions{}) - return provider.doesResourceExist(resource, err) + serviceResource, err := provider.clientSet.CoreV1().Services(namespace).Get(ctx, name, metav1.GetOptions{}) + return provider.doesResourceExist(serviceResource, err) } func (provider *Provider) doesResourceExist(resource interface{}, err error) (bool, error) { @@ -642,7 +642,7 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac "--api-server-address", fmt.Sprintf("ws://%s/wsTapper", apiServerPodIp), "--nodefrag", } - + if istio { mizuCmd = append(mizuCmd, "--procfs", procfsMountPath, "--istio") } @@ -653,13 +653,13 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac agentContainer.WithImagePullPolicy(imagePullPolicy) caps := applyconfcore.Capabilities().WithDrop("ALL").WithAdd("NET_RAW").WithAdd("NET_ADMIN") - + if istio { - caps = caps.WithAdd("SYS_ADMIN") // for reading /proc/PID/net/ns - caps = caps.WithAdd("SYS_PTRACE") // for setting netns to other process + caps = caps.WithAdd("SYS_ADMIN") // for reading /proc/PID/net/ns + caps = caps.WithAdd("SYS_PTRACE") // for setting netns to other process caps = caps.WithAdd("DAC_OVERRIDE") // for reading /proc/PID/environ } - + agentContainer.WithSecurityContext(applyconfcore.SecurityContext().WithCapabilities(caps)) agentContainer.WithCommand(mizuCmd...) @@ -780,10 +780,10 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac return err } -func (provider *Provider) ListAllPodsMatchingRegex(ctx context.Context, regex *regexp.Regexp, namespaces []string) ([]core.Pod, error) { +func (provider *Provider) listPodsImpl(ctx context.Context, regex *regexp.Regexp, namespaces []string, listOptions metav1.ListOptions) ([]core.Pod, error) { var pods []core.Pod for _, namespace := range namespaces { - namespacePods, err := provider.clientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{}) + namespacePods, err := provider.clientSet.CoreV1().Pods(namespace).List(ctx, listOptions) if err != nil { return nil, fmt.Errorf("failed to get pods in ns: [%s], %w", namespace, err) } @@ -800,6 +800,14 @@ func (provider *Provider) ListAllPodsMatchingRegex(ctx context.Context, regex *r return matchingPods, nil } +func (provider *Provider) ListAllPodsMatchingRegex(ctx context.Context, regex *regexp.Regexp, namespaces []string) ([]core.Pod, error) { + return provider.listPodsImpl(ctx, regex, namespaces, metav1.ListOptions{}) +} + +func (provider *Provider) GetPod(ctx context.Context, namespaces string, podName string) (*core.Pod, error) { + return provider.clientSet.CoreV1().Pods(namespaces).Get(ctx, podName, metav1.GetOptions{}) +} + func (provider *Provider) ListAllRunningPodsMatchingRegex(ctx context.Context, regex *regexp.Regexp, namespaces []string) ([]core.Pod, error) { pods, err := provider.ListAllPodsMatchingRegex(ctx, regex, namespaces) if err != nil { diff --git a/shared/kubernetes/utils.go b/shared/kubernetes/utils.go index 6a020e259..8aa84344b 100644 --- a/shared/kubernetes/utils.go +++ b/shared/kubernetes/utils.go @@ -57,11 +57,10 @@ func getMissingPods(pods1 []core.Pod, pods2 []core.Pod) []core.Pod { return missingPods } - func GetPodInfosForPods(pods []core.Pod) []shared.PodInfo { podInfos := make([]shared.PodInfo, 0) for _, pod := range pods { - podInfos = append(podInfos, shared.PodInfo{Name: pod.Name, Namespace: pod.Namespace}) + podInfos = append(podInfos, shared.PodInfo{Name: pod.Name, Namespace: pod.Namespace, NodeName: pod.Spec.NodeName}) } return podInfos } diff --git a/shared/models.go b/shared/models.go index 1d3893274..c12edcb72 100644 --- a/shared/models.go +++ b/shared/models.go @@ -67,6 +67,12 @@ type WebSocketStatusMessage struct { TappingStatus TapStatus `json:"tappingStatus"` } +type TapperStatus struct { + TapperName string `json:"tapperName"` + NodeName string `json:"nodeName"` + Status string `json:"status"` +} + type TapStatus struct { Pods []PodInfo `json:"pods"` TLSLinks []TLSLinkInfo `json:"tlsLinks"` @@ -75,6 +81,7 @@ type TapStatus struct { type PodInfo struct { Namespace string `json:"namespace"` Name string `json:"name"` + NodeName string `json:"nodeName"` } type TLSLinkInfo struct { @@ -110,8 +117,9 @@ func CreateWebSocketMessageTypeAnalyzeStatus(analyzeStatus AnalyzeStatus) WebSoc } type HealthResponse struct { - TapStatus TapStatus `json:"tapStatus"` - TappersCount int `json:"tappersCount"` + TapStatus TapStatus `json:"tapStatus"` + TappersCount int `json:"tappersCount"` + TappersStatus []TapperStatus `json:"tappersStatus"` } type VersionResponse struct {