From 2834ae1e85e6eed3eab4788eaf625cdf00f70453 Mon Sep 17 00:00:00 2001 From: Nimrod Gilboa Markevich <59927337+nimrod-up9@users.noreply.github.com> Date: Tue, 4 Jan 2022 09:48:22 +0200 Subject: [PATCH 1/5] Support custom RBAC resources (#572) Motivation: Allow users to change the default RBAC resources (ServiceAccount, ClusterRole, ClusterRoleBinding, Role and RoleBinding) without having Mizu delete them every run. Adds app.kubernetes.io/created-by and app.kubernetes.io/managed-by labels to all resources. The value of app.kubernetes.io/created-by is either mizu-cli or mizu-agent. The value of app.kubernetes.io/managed-by is mizu. When Mizu cleans resources (ctrl-c in tap cmd or mizu clean cmd) it removes all RBAC resources that have managed-by=mizu, and only those. A user may have a ClusterRole named mizu-clusterrole. If it doesn't have the label app.kubernetes.io/managed-by=mizu, then Mizu won't overwrite it and won't delete it. Other resources (deployments, services etc.) are always removed, regardless of their labels. --- cli/resources/cleanResources.go | 71 ++++++++++++------ shared/kubernetes/consts.go | 9 +++ shared/kubernetes/provider.go | 128 +++++++++++++++++++++++++++++--- 3 files changed, 175 insertions(+), 33 deletions(-) diff --git a/cli/resources/cleanResources.go b/cli/resources/cleanResources.go index 8fc3b9936..e92e1471d 100644 --- a/cli/resources/cleanResources.go +++ b/cli/resources/cleanResources.go @@ -42,14 +42,29 @@ func cleanUpNonRestrictedMode(ctx context.Context, cancel context.CancelFunc, ku defer waitUntilNamespaceDeleted(ctx, cancel, kubernetesProvider, mizuResourcesNamespace) } - if err := kubernetesProvider.RemoveClusterRole(ctx, kubernetes.ClusterRoleName); err != nil { - resourceDesc := fmt.Sprintf("ClusterRole %s", kubernetes.ClusterRoleName) + if resources, err := kubernetesProvider.ListManagedClusterRoles(ctx); err != nil { + resourceDesc := "ClusterRoles" handleDeletionError(err, resourceDesc, &leftoverResources) + } else { + for _, resource := range resources.Items { + if err := kubernetesProvider.RemoveClusterRole(ctx, resource.Name); err != nil { + resourceDesc := fmt.Sprintf("ClusterRole %s", resource.Name) + handleDeletionError(err, resourceDesc, &leftoverResources) + } + } } - if err := kubernetesProvider.RemoveClusterRoleBinding(ctx, kubernetes.ClusterRoleBindingName); err != nil { - resourceDesc := fmt.Sprintf("ClusterRoleBinding %s", kubernetes.ClusterRoleBindingName) + + if resources, err := kubernetesProvider.ListManagedClusterRoleBindings(ctx); err != nil { + resourceDesc := "ClusterRoleBindings" handleDeletionError(err, resourceDesc, &leftoverResources) + } else { + for _, resource := range resources.Items { + if err := kubernetesProvider.RemoveClusterRoleBinding(ctx, resource.Name); err != nil { + resourceDesc := fmt.Sprintf("ClusterRoleBinding %s", resource.Name) + handleDeletionError(err, resourceDesc, &leftoverResources) + } + } } return leftoverResources @@ -91,14 +106,40 @@ func cleanUpRestrictedMode(ctx context.Context, kubernetesProvider *kubernetes.P handleDeletionError(err, resourceDesc, &leftoverResources) } - if err := kubernetesProvider.RemoveServicAccount(ctx, mizuResourcesNamespace, kubernetes.ServiceAccountName); err != nil { - resourceDesc := fmt.Sprintf("Service Account %s in namespace %s", kubernetes.ServiceAccountName, mizuResourcesNamespace) + if resources, err := kubernetesProvider.ListManagedServiceAccounts(ctx, mizuResourcesNamespace); err != nil { + resourceDesc := fmt.Sprintf("ServiceAccounts in namespace %s", 
mizuResourcesNamespace) handleDeletionError(err, resourceDesc, &leftoverResources) + } else { + for _, resource := range resources.Items { + if err := kubernetesProvider.RemoveServicAccount(ctx, mizuResourcesNamespace, resource.Name); err != nil { + resourceDesc := fmt.Sprintf("ServiceAccount %s in namespace %s", resource.Name, mizuResourcesNamespace) + handleDeletionError(err, resourceDesc, &leftoverResources) + } + } } - if err := kubernetesProvider.RemoveRole(ctx, mizuResourcesNamespace, kubernetes.RoleName); err != nil { - resourceDesc := fmt.Sprintf("Role %s in namespace %s", kubernetes.RoleName, mizuResourcesNamespace) + if resources, err := kubernetesProvider.ListManagedRoles(ctx, mizuResourcesNamespace); err != nil { + resourceDesc := fmt.Sprintf("Roles in namespace %s", mizuResourcesNamespace) handleDeletionError(err, resourceDesc, &leftoverResources) + } else { + for _, resource := range resources.Items { + if err := kubernetesProvider.RemoveRole(ctx, mizuResourcesNamespace, resource.Name); err != nil { + resourceDesc := fmt.Sprintf("Role %s in namespace %s", resource.Name, mizuResourcesNamespace) + handleDeletionError(err, resourceDesc, &leftoverResources) + } + } + } + + if resources, err := kubernetesProvider.ListManagedRoleBindings(ctx, mizuResourcesNamespace); err != nil { + resourceDesc := fmt.Sprintf("RoleBindings in namespace %s", mizuResourcesNamespace) + handleDeletionError(err, resourceDesc, &leftoverResources) + } else { + for _, resource := range resources.Items { + if err := kubernetesProvider.RemoveRoleBinding(ctx, mizuResourcesNamespace, resource.Name); err != nil { + resourceDesc := fmt.Sprintf("RoleBinding %s in namespace %s", resource.Name, mizuResourcesNamespace) + handleDeletionError(err, resourceDesc, &leftoverResources) + } + } } if err := kubernetesProvider.RemovePod(ctx, mizuResourcesNamespace, kubernetes.ApiServerPodName); err != nil { @@ -107,10 +148,6 @@ func cleanUpRestrictedMode(ctx context.Context, kubernetesProvider *kubernetes.P } //install mode resources - if err := kubernetesProvider.RemoveRoleBinding(ctx, mizuResourcesNamespace, kubernetes.RoleBindingName); err != nil { - resourceDesc := fmt.Sprintf("RoleBinding %s in namespace %s", kubernetes.RoleBindingName, mizuResourcesNamespace) - handleDeletionError(err, resourceDesc, &leftoverResources) - } if err := kubernetesProvider.RemoveDeployment(ctx, mizuResourcesNamespace, kubernetes.ApiServerPodName); err != nil { resourceDesc := fmt.Sprintf("Deployment %s in namespace %s", kubernetes.ApiServerPodName, mizuResourcesNamespace) @@ -122,16 +159,6 @@ func cleanUpRestrictedMode(ctx context.Context, kubernetesProvider *kubernetes.P handleDeletionError(err, resourceDesc, &leftoverResources) } - if err := kubernetesProvider.RemoveRole(ctx, mizuResourcesNamespace, kubernetes.DaemonRoleName); err != nil { - resourceDesc := fmt.Sprintf("Role %s in namespace %s", kubernetes.DaemonRoleName, mizuResourcesNamespace) - handleDeletionError(err, resourceDesc, &leftoverResources) - } - - if err := kubernetesProvider.RemoveRoleBinding(ctx, mizuResourcesNamespace, kubernetes.DaemonRoleBindingName); err != nil { - resourceDesc := fmt.Sprintf("RoleBinding %s in namespace %s", kubernetes.DaemonRoleBindingName, mizuResourcesNamespace) - handleDeletionError(err, resourceDesc, &leftoverResources) - } - return leftoverResources } diff --git a/shared/kubernetes/consts.go b/shared/kubernetes/consts.go index 68acdd1ec..de84d5714 100644 --- a/shared/kubernetes/consts.go +++ b/shared/kubernetes/consts.go @@ -17,3 +17,12 @@ 
const ( PersistentVolumeClaimName = MizuResourcesPrefix + "volume-claim" MinKubernetesServerVersion = "1.16.0" ) + +const ( + LabelPrefixApp = "app.kubernetes.io/" + LabelManagedBy = LabelPrefixApp + "managed-by" + LabelCreatedBy = LabelPrefixApp + "created-by" + LabelValueMizu = "mizu" + LabelValueMizuCLI = "mizu-cli" + LabelValueMizuAgent = "mizu-agent" +) diff --git a/shared/kubernetes/provider.go b/shared/kubernetes/provider.go index a1a0195d1..d0a28cc48 100644 --- a/shared/kubernetes/provider.go +++ b/shared/kubernetes/provider.go @@ -43,6 +43,8 @@ type Provider struct { kubernetesConfig clientcmd.ClientConfig clientConfig restclient.Config Namespace string + managedBy string + createdBy string } const ( @@ -86,6 +88,8 @@ func NewProvider(kubeConfigPath string) (*Provider, error) { clientSet: clientSet, kubernetesConfig: kubernetesConfig, clientConfig: *restClientConfig, + managedBy: LabelValueMizu, + createdBy: LabelValueMizuCLI, }, nil } @@ -103,6 +107,8 @@ func NewProviderInCluster() (*Provider, error) { clientSet: clientSet, kubernetesConfig: nil, // not relevant in cluster clientConfig: *restClientConfig, + managedBy: LabelValueMizu, + createdBy: LabelValueMizuAgent, }, nil } @@ -158,6 +164,10 @@ func (provider *Provider) CreateNamespace(ctx context.Context, name string) (*co namespaceSpec := &core.Namespace{ ObjectMeta: metav1.ObjectMeta{ Name: name, + Labels: map[string]string{ + LabelManagedBy: provider.managedBy, + LabelCreatedBy: provider.createdBy, + }, }, } return provider.clientSet.CoreV1().Namespaces().Create(ctx, namespaceSpec, metav1.CreateOptions{}) @@ -243,7 +253,11 @@ func (provider *Provider) GetMizuApiServerPodObject(opts *ApiServerOptions, moun pod := &core.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: opts.PodName, - Labels: map[string]string{"app": opts.PodName}, + Labels: map[string]string{ + "app": opts.PodName, + LabelManagedBy: provider.managedBy, + LabelCreatedBy: provider.createdBy, + }, }, Spec: core.PodSpec{ Containers: []core.Container{ @@ -303,6 +317,10 @@ func (provider *Provider) CreateDeployment(ctx context.Context, namespace string deployment := &v1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Name: deploymentName, + Labels: map[string]string{ + LabelManagedBy: provider.managedBy, + LabelCreatedBy: provider.createdBy, + }, }, Spec: v1.DeploymentSpec{ Selector: &metav1.LabelSelector{ @@ -319,6 +337,10 @@ func (provider *Provider) CreateService(ctx context.Context, namespace string, s service := core.Service{ ObjectMeta: metav1.ObjectMeta{ Name: serviceName, + Labels: map[string]string{ + LabelManagedBy: provider.managedBy, + LabelCreatedBy: provider.createdBy, + }, }, Spec: core.ServiceSpec{ Ports: []core.ServicePort{{TargetPort: intstr.FromInt(shared.DefaultApiServerPort), Port: 80}}, @@ -351,13 +373,21 @@ func (provider *Provider) CreateMizuRBAC(ctx context.Context, namespace string, serviceAccount := &core.ServiceAccount{ ObjectMeta: metav1.ObjectMeta{ Name: serviceAccountName, - Labels: map[string]string{"mizu-cli-version": version}, + Labels: map[string]string{ + "mizu-cli-version": version, + LabelManagedBy: provider.managedBy, + LabelCreatedBy: provider.createdBy, + }, }, } clusterRole := &rbac.ClusterRole{ ObjectMeta: metav1.ObjectMeta{ Name: clusterRoleName, - Labels: map[string]string{"mizu-cli-version": version}, + Labels: map[string]string{ + "mizu-cli-version": version, + LabelManagedBy: provider.managedBy, + LabelCreatedBy: provider.createdBy, + }, }, Rules: []rbac.PolicyRule{ { @@ -370,7 +400,11 @@ func (provider *Provider) 
CreateMizuRBAC(ctx context.Context, namespace string, clusterRoleBinding := &rbac.ClusterRoleBinding{ ObjectMeta: metav1.ObjectMeta{ Name: clusterRoleBindingName, - Labels: map[string]string{"mizu-cli-version": version}, + Labels: map[string]string{ + "mizu-cli-version": version, + LabelManagedBy: provider.managedBy, + LabelCreatedBy: provider.createdBy, + }, }, RoleRef: rbac.RoleRef{ Name: clusterRoleName, @@ -404,13 +438,21 @@ func (provider *Provider) CreateMizuRBACNamespaceRestricted(ctx context.Context, serviceAccount := &core.ServiceAccount{ ObjectMeta: metav1.ObjectMeta{ Name: serviceAccountName, - Labels: map[string]string{"mizu-cli-version": version}, + Labels: map[string]string{ + "mizu-cli-version": version, + LabelManagedBy: provider.managedBy, + LabelCreatedBy: provider.createdBy, + }, }, } role := &rbac.Role{ ObjectMeta: metav1.ObjectMeta{ Name: roleName, - Labels: map[string]string{"mizu-cli-version": version}, + Labels: map[string]string{ + "mizu-cli-version": version, + LabelManagedBy: provider.managedBy, + LabelCreatedBy: provider.createdBy, + }, }, Rules: []rbac.PolicyRule{ { @@ -423,7 +465,11 @@ func (provider *Provider) CreateMizuRBACNamespaceRestricted(ctx context.Context, roleBinding := &rbac.RoleBinding{ ObjectMeta: metav1.ObjectMeta{ Name: roleBindingName, - Labels: map[string]string{"mizu-cli-version": version}, + Labels: map[string]string{ + "mizu-cli-version": version, + LabelManagedBy: provider.managedBy, + LabelCreatedBy: provider.createdBy, + }, }, RoleRef: rbac.RoleRef{ Name: roleName, @@ -457,7 +503,11 @@ func (provider *Provider) CreateDaemonsetRBAC(ctx context.Context, namespace str role := &rbac.Role{ ObjectMeta: metav1.ObjectMeta{ Name: roleName, - Labels: map[string]string{"mizu-cli-version": version}, + Labels: map[string]string{ + "mizu-cli-version": version, + LabelManagedBy: provider.managedBy, + LabelCreatedBy: provider.createdBy, + }, }, Rules: []rbac.PolicyRule{ { @@ -475,7 +525,11 @@ func (provider *Provider) CreateDaemonsetRBAC(ctx context.Context, namespace str roleBinding := &rbac.RoleBinding{ ObjectMeta: metav1.ObjectMeta{ Name: roleBindingName, - Labels: map[string]string{"mizu-cli-version": version}, + Labels: map[string]string{ + "mizu-cli-version": version, + LabelManagedBy: provider.managedBy, + LabelCreatedBy: provider.createdBy, + }, }, RoleRef: rbac.RoleRef{ Name: roleName, @@ -588,6 +642,10 @@ func (provider *Provider) CreateConfigMap(ctx context.Context, namespace string, }, ObjectMeta: metav1.ObjectMeta{ Name: configMapName, + Labels: map[string]string{ + LabelManagedBy: provider.managedBy, + LabelCreatedBy: provider.createdBy, + }, }, Data: configMapData, } @@ -746,14 +804,23 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac podSpec.WithVolumes(&configMapVolume, procfsVolume) podTemplate := applyconfcore.PodTemplateSpec() - podTemplate.WithLabels(map[string]string{"app": tapperPodName}) + podTemplate.WithLabels(map[string]string{ + "app": tapperPodName, + LabelManagedBy: provider.managedBy, + LabelCreatedBy: provider.createdBy, + }) podTemplate.WithSpec(podSpec) labelSelector := applyconfmeta.LabelSelector() labelSelector.WithMatchLabels(map[string]string{"app": tapperPodName}) daemonSet := applyconfapp.DaemonSet(daemonSetName, namespace) - daemonSet.WithSpec(applyconfapp.DaemonSetSpec().WithSelector(labelSelector).WithTemplate(podTemplate)) + daemonSet. + WithLabels(map[string]string{ + LabelManagedBy: provider.managedBy, + LabelCreatedBy: provider.createdBy, + }). 
+ WithSpec(applyconfapp.DaemonSetSpec().WithSelector(labelSelector).WithTemplate(podTemplate)) _, err = provider.clientSet.AppsV1().DaemonSets(namespace).Apply(ctx, daemonSet, metav1.ApplyOptions{FieldManager: fieldManagerName}) return err @@ -827,6 +894,41 @@ func (provider *Provider) GetNamespaceEvents(ctx context.Context, namespace stri return eventList.String(), nil } +func (provider *Provider) ListManagedServiceAccounts(ctx context.Context, namespace string) (*core.ServiceAccountList, error) { + listOptions := metav1.ListOptions{ + LabelSelector: fmt.Sprintf("%s=%s", LabelManagedBy, provider.managedBy), + } + return provider.clientSet.CoreV1().ServiceAccounts(namespace).List(ctx, listOptions) +} + +func (provider *Provider) ListManagedClusterRoles(ctx context.Context) (*rbac.ClusterRoleList, error) { + listOptions := metav1.ListOptions{ + LabelSelector: fmt.Sprintf("%s=%s", LabelManagedBy, provider.managedBy), + } + return provider.clientSet.RbacV1().ClusterRoles().List(ctx, listOptions) +} + +func (provider *Provider) ListManagedClusterRoleBindings(ctx context.Context) (*rbac.ClusterRoleBindingList, error) { + listOptions := metav1.ListOptions{ + LabelSelector: fmt.Sprintf("%s=%s", LabelManagedBy, provider.managedBy), + } + return provider.clientSet.RbacV1().ClusterRoleBindings().List(ctx, listOptions) +} + +func (provider *Provider) ListManagedRoles(ctx context.Context, namespace string) (*rbac.RoleList, error) { + listOptions := metav1.ListOptions{ + LabelSelector: fmt.Sprintf("%s=%s", LabelManagedBy, provider.managedBy), + } + return provider.clientSet.RbacV1().Roles(namespace).List(ctx, listOptions) +} + +func (provider *Provider) ListManagedRoleBindings(ctx context.Context, namespace string) (*rbac.RoleBindingList, error) { + listOptions := metav1.ListOptions{ + LabelSelector: fmt.Sprintf("%s=%s", LabelManagedBy, provider.managedBy), + } + return provider.clientSet.RbacV1().RoleBindings(namespace).List(ctx, listOptions) +} + func (provider *Provider) IsDefaultStorageProviderAvailable(ctx context.Context) (bool, error) { storageClassList, err := provider.clientSet.StorageV1().StorageClasses().List(ctx, metav1.ListOptions{}) if err != nil { @@ -845,6 +947,10 @@ func (provider *Provider) CreatePersistentVolumeClaim(ctx context.Context, names volumeClaim := &core.PersistentVolumeClaim{ ObjectMeta: metav1.ObjectMeta{ Name: volumeClaimName, + Labels: map[string]string{ + LabelManagedBy: provider.managedBy, + LabelCreatedBy: provider.createdBy, + }, }, Spec: core.PersistentVolumeClaimSpec{ AccessModes: []core.PersistentVolumeAccessMode{core.ReadWriteOnce}, From 02b2cbaa03a7ce581315fa0fd4def336816d04f5 Mon Sep 17 00:00:00 2001 From: RoyUP9 <87927115+RoyUP9@users.noreply.github.com> Date: Tue, 4 Jan 2022 17:27:14 +0200 Subject: [PATCH 2/5] Added update config route for install mode (#581) --- agent/main.go | 75 +--------- agent/pkg/api/socket_routes.go | 9 +- .../pkg/controllers/standalone_controller.go | 133 ++++++++++++++++++ agent/pkg/controllers/status_controller.go | 15 +- agent/pkg/models/models.go | 4 + agent/pkg/providers/status_provider.go | 1 - agent/pkg/routes/standalone_routes.go | 13 ++ shared/kubernetes/mizuTapperSyncer.go | 8 +- 8 files changed, 179 insertions(+), 79 deletions(-) create mode 100644 agent/pkg/controllers/standalone_controller.go create mode 100644 agent/pkg/routes/standalone_routes.go diff --git a/agent/main.go b/agent/main.go index e9609c586..40b92fd54 100644 --- a/agent/main.go +++ b/agent/main.go @@ -1,7 +1,6 @@ package main import ( - "context" 
"encoding/json" "errors" "flag" @@ -11,7 +10,6 @@ import ( "mizuserver/pkg/config" "mizuserver/pkg/controllers" "mizuserver/pkg/models" - "mizuserver/pkg/providers" "mizuserver/pkg/routes" "mizuserver/pkg/up9" "mizuserver/pkg/utils" @@ -22,14 +20,12 @@ import ( "path" "path/filepath" "plugin" - "regexp" "sort" "strconv" "strings" "syscall" "time" - "github.com/up9inc/mizu/shared/kubernetes" v1 "k8s.io/api/core/v1" "github.com/antelman107/net-wait-go/wait" @@ -256,13 +252,18 @@ func hostApi(socketHarOutputChannel chan<- *tapApi.OutputChannelItem) { app.Use(DisableRootStaticCache()) if err := setUIMode(); err != nil { - logger.Log.Panicf("Error setting ui mode, err: %v", err) + logger.Log.Errorf("Error setting ui mode, err: %v", err) } app.Use(static.ServeRoot("/", "./site")) app.Use(CORSMiddleware()) // This has to be called after the static middleware, does not work if its called before api.WebSocketRoutes(app, &eventHandlers, startTime) + + if config.Config.StandaloneMode { + routes.StandaloneRoutes(app) + } + routes.QueryRoutes(app) routes.EntriesRoutes(app) routes.MetadataRoutes(app) @@ -471,67 +472,3 @@ func handleIncomingMessageAsTapper(socketConnection *websocket.Conn) { } } } - -func startMizuTapperSyncer(ctx context.Context, provider *kubernetes.Provider, targetNamespaces []string, podFilterRegex regexp.Regexp, ignoredUserAgents []string, mizuApiFilteringOptions tapApi.TrafficFilteringOptions, istio bool) (*kubernetes.MizuTapperSyncer, error) { - tapperSyncer, err := kubernetes.CreateAndStartMizuTapperSyncer(ctx, provider, kubernetes.TapperSyncerConfig{ - TargetNamespaces: targetNamespaces, - PodFilterRegex: podFilterRegex, - MizuResourcesNamespace: config.Config.MizuResourcesNamespace, - AgentImage: config.Config.AgentImage, - TapperResources: config.Config.TapperResources, - ImagePullPolicy: v1.PullPolicy(config.Config.PullPolicy), - LogLevel: config.Config.LogLevel, - IgnoredUserAgents: ignoredUserAgents, - MizuApiFilteringOptions: mizuApiFilteringOptions, - MizuServiceAccountExists: true, //assume service account exists since install mode will not function without it anyway - Istio: istio, - }, time.Now()) - - if err != nil { - return nil, err - } - - // handle tapperSyncer events (pod changes and errors) - go func() { - for { - select { - case syncerErr, ok := <-tapperSyncer.ErrorOut: - if !ok { - logger.Log.Debug("mizuTapperSyncer err channel closed, ending listener loop") - return - } - logger.Log.Fatalf("fatal tap syncer error: %v", syncerErr) - case tapPodChangeEvent, ok := <-tapperSyncer.TapPodChangesOut: - if !ok { - logger.Log.Debug("mizuTapperSyncer pod changes channel closed, ending listener loop") - return - } - providers.TapStatus = shared.TapStatus{Pods: kubernetes.GetPodInfosForPods(tapperSyncer.CurrentlyTappedPods)} - - tappedPodsStatus := utils.GetTappedPodsStatus() - - serializedTapStatus, err := json.Marshal(shared.CreateWebSocketStatusMessage(tappedPodsStatus)) - if err != nil { - logger.Log.Fatalf("error serializing tap status: %v", err) - } - api.BroadcastToBrowserClients(serializedTapStatus) - providers.ExpectedTapperAmount = tapPodChangeEvent.ExpectedTapperAmount - case tapperStatus, ok := <-tapperSyncer.TapperStatusChangedOut: - if !ok { - logger.Log.Debug("mizuTapperSyncer tapper status changed channel closed, ending listener loop") - return - } - if providers.TappersStatus == nil { - providers.TappersStatus = make(map[string]shared.TapperStatus) - } - providers.TappersStatus[tapperStatus.NodeName] = tapperStatus - - case <-ctx.Done(): - 
logger.Log.Debug("mizuTapperSyncer event listener loop exiting due to context done") - return - } - } - }() - - return tapperSyncer, nil -} diff --git a/agent/pkg/api/socket_routes.go b/agent/pkg/api/socket_routes.go index a96bb11ff..89e8f45f0 100644 --- a/agent/pkg/api/socket_routes.go +++ b/agent/pkg/api/socket_routes.go @@ -83,10 +83,10 @@ func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers Even meta := make(chan []byte) defer func() { + socketCleanup(socketId, connectedWebsockets[socketId]) data <- []byte(basenine.CloseChannel) meta <- []byte(basenine.CloseChannel) connection.Close() - socketCleanup(socketId, connectedWebsockets[socketId]) }() eventHandlers.WebSocketConnect(socketId, isTapper) @@ -97,7 +97,12 @@ func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers Even for { _, msg, err := ws.ReadMessage() if err != nil { - logger.Log.Errorf("Error reading message, socket id: %d, error: %v", socketId, err) + if _, ok := err.(*websocket.CloseError); ok { + logger.Log.Debugf("Received websocket close message, socket id: %d", socketId) + } else { + logger.Log.Errorf("Error reading message, socket id: %d, error: %v", socketId, err) + } + break } diff --git a/agent/pkg/controllers/standalone_controller.go b/agent/pkg/controllers/standalone_controller.go new file mode 100644 index 000000000..ea9468772 --- /dev/null +++ b/agent/pkg/controllers/standalone_controller.go @@ -0,0 +1,133 @@ +package controllers + +import ( + "context" + "github.com/gin-gonic/gin" + "github.com/up9inc/mizu/shared" + "github.com/up9inc/mizu/shared/kubernetes" + "github.com/up9inc/mizu/shared/logger" + tapApi "github.com/up9inc/mizu/tap/api" + v1 "k8s.io/api/core/v1" + "mizuserver/pkg/config" + "mizuserver/pkg/models" + "mizuserver/pkg/providers" + "net/http" + "regexp" + "time" +) + +var globalTapConfig *models.StandaloneTapConfig +var cancelTapperSyncer context.CancelFunc +var kubernetesProvider *kubernetes.Provider + +func PostTapConfig(c *gin.Context) { + tapConfig := &models.StandaloneTapConfig{} + + if err := c.Bind(tapConfig); err != nil { + c.JSON(http.StatusBadRequest, err) + return + } + + if cancelTapperSyncer != nil { + cancelTapperSyncer() + + providers.TapStatus = shared.TapStatus{} + providers.TappersStatus = make(map[string]shared.TapperStatus) + + broadcastTappedPodsStatus() + } + + if kubernetesProvider == nil { + var err error + kubernetesProvider, err = kubernetes.NewProviderInCluster() + if err != nil { + c.JSON(http.StatusBadRequest, err) + return + } + } + + ctx, cancel := context.WithCancel(context.Background()) + + var tappedNamespaces []string + for namespace, tapped := range tapConfig.TappedNamespaces { + if tapped { + tappedNamespaces = append(tappedNamespaces, namespace) + } + } + + podRegex, _ := regexp.Compile(".*") + + if _, err := startMizuTapperSyncer(ctx, kubernetesProvider, tappedNamespaces, *podRegex, []string{} , tapApi.TrafficFilteringOptions{}, false); err != nil { + c.JSON(http.StatusBadRequest, err) + cancel() + return + } + + cancelTapperSyncer = cancel + globalTapConfig = tapConfig + + c.JSON(http.StatusOK, "OK") +} + +func GetTapConfig(c *gin.Context) { + if globalTapConfig != nil { + c.JSON(http.StatusOK, globalTapConfig) + } + + c.JSON(http.StatusBadRequest, "Not config found") +} + +func startMizuTapperSyncer(ctx context.Context, provider *kubernetes.Provider, targetNamespaces []string, podFilterRegex regexp.Regexp, ignoredUserAgents []string, mizuApiFilteringOptions tapApi.TrafficFilteringOptions, istio bool) 
(*kubernetes.MizuTapperSyncer, error) { + tapperSyncer, err := kubernetes.CreateAndStartMizuTapperSyncer(ctx, provider, kubernetes.TapperSyncerConfig{ + TargetNamespaces: targetNamespaces, + PodFilterRegex: podFilterRegex, + MizuResourcesNamespace: config.Config.MizuResourcesNamespace, + AgentImage: config.Config.AgentImage, + TapperResources: config.Config.TapperResources, + ImagePullPolicy: v1.PullPolicy(config.Config.PullPolicy), + LogLevel: config.Config.LogLevel, + IgnoredUserAgents: ignoredUserAgents, + MizuApiFilteringOptions: mizuApiFilteringOptions, + MizuServiceAccountExists: true, //assume service account exists since install mode will not function without it anyway + Istio: istio, + }, time.Now()) + + if err != nil { + return nil, err + } + + // handle tapperSyncer events (pod changes and errors) + go func() { + for { + select { + case syncerErr, ok := <-tapperSyncer.ErrorOut: + if !ok { + logger.Log.Debug("mizuTapperSyncer err channel closed, ending listener loop") + return + } + logger.Log.Fatalf("fatal tap syncer error: %v", syncerErr) + case _, ok := <-tapperSyncer.TapPodChangesOut: + if !ok { + logger.Log.Debug("mizuTapperSyncer pod changes channel closed, ending listener loop") + return + } + + providers.TapStatus = shared.TapStatus{Pods: kubernetes.GetPodInfosForPods(tapperSyncer.CurrentlyTappedPods)} + broadcastTappedPodsStatus() + case tapperStatus, ok := <-tapperSyncer.TapperStatusChangedOut: + if !ok { + logger.Log.Debug("mizuTapperSyncer tapper status changed channel closed, ending listener loop") + return + } + + addTapperStatus(tapperStatus) + broadcastTappedPodsStatus() + case <-ctx.Done(): + logger.Log.Debug("mizuTapperSyncer event listener loop exiting due to context done") + return + } + } + }() + + return tapperSyncer, nil +} diff --git a/agent/pkg/controllers/status_controller.go b/agent/pkg/controllers/status_controller.go index 8819238db..7ffef36b6 100644 --- a/agent/pkg/controllers/status_controller.go +++ b/agent/pkg/controllers/status_controller.go @@ -54,21 +54,28 @@ func broadcastTappedPodsStatus() { } } +func addTapperStatus(tapperStatus shared.TapperStatus) { + if providers.TappersStatus == nil { + providers.TappersStatus = make(map[string]shared.TapperStatus) + } + + providers.TappersStatus[tapperStatus.NodeName] = tapperStatus +} + func PostTapperStatus(c *gin.Context) { tapperStatus := &shared.TapperStatus{} if err := c.Bind(tapperStatus); err != nil { c.JSON(http.StatusBadRequest, err) return } + if err := validation.Validate(tapperStatus); err != nil { c.JSON(http.StatusBadRequest, err) return } + logger.Log.Infof("[Status] POST request, tapper status: %v", tapperStatus) - if providers.TappersStatus == nil { - providers.TappersStatus = make(map[string]shared.TapperStatus) - } - providers.TappersStatus[tapperStatus.NodeName] = *tapperStatus + addTapperStatus(*tapperStatus) broadcastTappedPodsStatus() } diff --git a/agent/pkg/models/models.go b/agent/pkg/models/models.go index 84f8e9842..fc55d5ac6 100644 --- a/agent/pkg/models/models.go +++ b/agent/pkg/models/models.go @@ -16,6 +16,10 @@ func GetEntry(r *tapApi.MizuEntry, v tapApi.DataUnmarshaler) error { return v.UnmarshalData(r) } +type StandaloneTapConfig struct { + TappedNamespaces map[string]bool `json:"tappedNamespaces"` +} + type EntriesRequest struct { LeftOff int `form:"leftOff" validate:"required,min=-1"` Direction int `form:"direction" validate:"required,oneof='1' '-1'"` diff --git a/agent/pkg/providers/status_provider.go b/agent/pkg/providers/status_provider.go index 
9ed1a2a46..2f90b6d75 100644 --- a/agent/pkg/providers/status_provider.go +++ b/agent/pkg/providers/status_provider.go @@ -20,7 +20,6 @@ var ( TappersStatus map[string]shared.TapperStatus authStatus *models.AuthStatus RecentTLSLinks = cache.New(tlsLinkRetainmentTime, tlsLinkRetainmentTime) - ExpectedTapperAmount = -1 //only relevant in install mode as cli manages tappers otherwise tappersCountLock = sync.Mutex{} ) diff --git a/agent/pkg/routes/standalone_routes.go b/agent/pkg/routes/standalone_routes.go new file mode 100644 index 000000000..1245c6506 --- /dev/null +++ b/agent/pkg/routes/standalone_routes.go @@ -0,0 +1,13 @@ +package routes + +import ( + "github.com/gin-gonic/gin" + "mizuserver/pkg/controllers" +) + +func StandaloneRoutes(ginApp *gin.Engine) { + routeGroup := ginApp.Group("/standalone") + + routeGroup.POST("/tapConfig", controllers.PostTapConfig) + routeGroup.GET("/tapConfig", controllers.GetTapConfig) +} diff --git a/shared/kubernetes/mizuTapperSyncer.go b/shared/kubernetes/mizuTapperSyncer.go index 2f9748a57..8b79540a9 100644 --- a/shared/kubernetes/mizuTapperSyncer.go +++ b/shared/kubernetes/mizuTapperSyncer.go @@ -18,7 +18,6 @@ const updateTappersDelay = 5 * time.Second type TappedPodChangeEvent struct { Added []core.Pod Removed []core.Pod - ExpectedTapperAmount int } // MizuTapperSyncer uses a k8s pod watch to update tapper daemonsets when targeted pods are removed or created @@ -93,6 +92,10 @@ func (tapperSyncer *MizuTapperSyncer) watchTapperPods() { continue } + if tapperSyncer.startTime.After(pod.CreationTimestamp.Time) { + continue + } + logger.Log.Debugf("Watching tapper pods loop, tapper: %v, node: %v, status: %v", pod.Name, pod.Spec.NodeName, pod.Status.Phase) if pod.Spec.NodeName != "" { tapperStatus := shared.TapperStatus{TapperName: pod.Name, NodeName: pod.Spec.NodeName, Status: string(pod.Status.Phase)} @@ -147,7 +150,7 @@ func (tapperSyncer *MizuTapperSyncer) watchTapperEvents() { pod, err1 := tapperSyncer.kubernetesProvider.GetPod(tapperSyncer.context, tapperSyncer.config.MizuResourcesNamespace, event.Regarding.Name) if err1 != nil { - logger.Log.Debugf(fmt.Sprintf("Failed to get tapper pod %s", event.Regarding.Name)) + logger.Log.Debugf(fmt.Sprintf("Couldn't get tapper pod %s", event.Regarding.Name)) continue } @@ -284,7 +287,6 @@ func (tapperSyncer *MizuTapperSyncer) updateCurrentlyTappedPods() (err error, ch tapperSyncer.TapPodChangesOut <- TappedPodChangeEvent{ Added: addedPods, Removed: removedPods, - ExpectedTapperAmount: len(tapperSyncer.nodeToTappedPodMap), } return nil, true } From 282baee881584b553fa0953273bf3f4061771cac Mon Sep 17 00:00:00 2001 From: lirazyehezkel <61656597+lirazyehezkel@users.noreply.github.com> Date: Tue, 4 Jan 2022 17:51:07 +0200 Subject: [PATCH 3/5] is standalone variable (#585) --- ui/public/index.html | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/ui/public/index.html b/ui/public/index.html index 8485870fb..a8268a85a 100644 --- a/ui/public/index.html +++ b/ui/public/index.html @@ -25,6 +25,14 @@ Learn how to configure a non-root public URL by running `npm run build`. -->
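
The first patch's cleanup flow replaces hard-coded resource names with a label-selector query, so Mizu deletes only the RBAC objects carrying app.kubernetes.io/managed-by=mizu and leaves user-supplied ones alone. A minimal standalone sketch of the same list-then-delete pattern, assuming a client-go clientset (the helper name listAndDeleteManagedRoles is illustrative, not the project's API):

package cleanup

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

const (
	labelManagedBy = "app.kubernetes.io/managed-by"
	labelValueMizu = "mizu"
)

// listAndDeleteManagedRoles removes only the Roles that Mizu labeled as
// its own; an unlabeled Role named "mizu-role" would be left untouched.
func listAndDeleteManagedRoles(ctx context.Context, clientSet kubernetes.Interface, namespace string) []string {
	var leftovers []string
	opts := metav1.ListOptions{
		LabelSelector: fmt.Sprintf("%s=%s", labelManagedBy, labelValueMizu),
	}
	roles, err := clientSet.RbacV1().Roles(namespace).List(ctx, opts)
	if err != nil {
		return append(leftovers, fmt.Sprintf("Roles in namespace %s", namespace))
	}
	for _, role := range roles.Items {
		// Each deletion is attempted independently, so one failure only
		// records a leftover instead of aborting the rest of the cleanup.
		if err := clientSet.RbacV1().Roles(namespace).Delete(ctx, role.Name, metav1.DeleteOptions{}); err != nil {
			leftovers = append(leftovers, fmt.Sprintf("Role %s in namespace %s", role.Name, namespace))
		}
	}
	return leftovers
}

Collecting failures per resource rather than returning on the first error mirrors the patch's handleDeletionError approach: the user gets one report of every leftover resource at the end of the run.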
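
The second patch adds POST and GET handlers under /standalone/tapConfig that accept a models.StandaloneTapConfig payload. A sketch of a client driving that route, assuming the agent listens on localhost:8899 (the address and namespaces are illustrative; the JSON shape matches the patch's tappedNamespaces map):

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// standaloneTapConfig mirrors models.StandaloneTapConfig from the patch:
// namespace name -> whether the agent should tap it.
type standaloneTapConfig struct {
	TappedNamespaces map[string]bool `json:"tappedNamespaces"`
}

func main() {
	cfg := standaloneTapConfig{
		TappedNamespaces: map[string]bool{"sock-shop": true, "kube-system": false},
	}
	body, err := json.Marshal(cfg)
	if err != nil {
		panic(err)
	}
	// POSTing a config cancels any running tapper syncer and starts a new
	// one for the requested namespaces; GET returns the active config.
	resp, err := http.Post("http://localhost:8899/standalone/tapConfig", "application/json", bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status)
}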
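
The same patch also softens the websocket read loop: an orderly close is logged at debug level instead of as an error. The distinction is a type assertion on the returned error; a sketch assuming the gorilla/websocket package, which matches the *websocket.CloseError asserted in the patch:

package main

import (
	"errors"
	"log"

	"github.com/gorilla/websocket"
)

// logReadError keeps orderly closes quiet while surfacing real failures,
// matching the patch's behavior in the websocket read loop.
func logReadError(socketId int, err error) {
	if _, ok := err.(*websocket.CloseError); ok {
		log.Printf("debug: received websocket close message, socket id: %d", socketId)
	} else {
		log.Printf("error: reading message, socket id: %d, error: %v", socketId, err)
	}
}

func main() {
	logReadError(1, &websocket.CloseError{Code: websocket.CloseNormalClosure})
	logReadError(2, errors.New("connection reset"))
}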
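
Finally, the tapper-pod watch in mizuTapperSyncer.go now skips pods whose CreationTimestamp predates the syncer's start time, so leftovers from a previous run cannot repopulate the status maps. The check reduces to a single time comparison; a minimal sketch with an illustrative pod:

package main

import (
	"fmt"
	"time"

	core "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// isStale reports whether a tapper pod was created before this syncer
// run began and should therefore be ignored by the watch loop.
func isStale(pod *core.Pod, startTime time.Time) bool {
	return startTime.After(pod.CreationTimestamp.Time)
}

func main() {
	pod := &core.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:              "mizu-tapper-abc12",
			CreationTimestamp: metav1.NewTime(time.Now().Add(-time.Hour)),
		},
	}
	fmt.Println(isStale(pod, time.Now())) // true: pod predates this run
}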