diff --git a/cmd/check/kubernetesPermissions.go b/cmd/check/kubernetesPermissions.go index 657461631..94d00c820 100644 --- a/cmd/check/kubernetesPermissions.go +++ b/cmd/check/kubernetesPermissions.go @@ -12,7 +12,7 @@ import ( "k8s.io/client-go/kubernetes/scheme" ) -func TapKubernetesPermissions(ctx context.Context, embedFS embed.FS, kubernetesProvider *kubernetes.Provider) bool { +func KubernetesPermissions(ctx context.Context, embedFS embed.FS, kubernetesProvider *kubernetes.Provider) bool { log.Info().Str("procedure", "kubernetes-permissions").Msg("Checking:") var filePath string diff --git a/cmd/check/kubernetesResources.go b/cmd/check/kubernetesResources.go index 36a73ac4a..5e2b85e45 100644 --- a/cmd/check/kubernetesResources.go +++ b/cmd/check/kubernetesResources.go @@ -66,33 +66,33 @@ func checkPodResourcesExist(ctx context.Context, kubernetesProvider *kubernetes. Str("name", kubernetes.HubPodName). Msg("Pod is running.") - if pods, err := kubernetesProvider.ListPodsByAppLabel(ctx, config.Config.ResourcesNamespace, kubernetes.TapperPodName); err != nil { + if pods, err := kubernetesProvider.ListPodsByAppLabel(ctx, config.Config.ResourcesNamespace, kubernetes.WorkerPodName); err != nil { log.Error(). - Str("name", kubernetes.TapperPodName). + Str("name", kubernetes.WorkerPodName). Err(err). Msg("While checking if pods are running!") return false } else { - tappers := 0 - notRunningTappers := 0 + workers := 0 + notRunningWorkers := 0 for _, pod := range pods { - tappers += 1 + workers += 1 if !kubernetes.IsPodRunning(&pod) { - notRunningTappers += 1 + notRunningWorkers += 1 } } - if notRunningTappers > 0 { + if notRunningWorkers > 0 { log.Error(). - Str("name", kubernetes.TapperPodName). - Msg(fmt.Sprintf("%d/%d pods are not running!", notRunningTappers, tappers)) + Str("name", kubernetes.WorkerPodName). + Msg(fmt.Sprintf("%d/%d pods are not running!", notRunningWorkers, workers)) return false } log.Info(). - Str("name", kubernetes.TapperPodName). - Msg(fmt.Sprintf("All %d pods are running.", tappers)) + Str("name", kubernetes.WorkerPodName). + Msg(fmt.Sprintf("All %d pods are running.", workers)) return true } } diff --git a/cmd/checkRunner.go b/cmd/checkRunner.go index 110ae42e6..20a981a3b 100644 --- a/cmd/checkRunner.go +++ b/cmd/checkRunner.go @@ -29,7 +29,7 @@ func runKubesharkCheck() { } if checkPassed { - checkPassed = check.TapKubernetesPermissions(ctx, embedFS, kubernetesProvider) + checkPassed = check.KubernetesPermissions(ctx, embedFS, kubernetesProvider) } if checkPassed { @@ -47,8 +47,8 @@ func runKubesharkCheck() { log.Info().Msg(fmt.Sprintf(utils.Green, "All checks are passed.")) } else { log.Error(). - Str("command1", "kubeshark clean"). - Str("command2", "kubeshark tap"). + Str("command1", fmt.Sprintf("kubeshark %s", cleanCmd.Use)). + Str("command2", fmt.Sprintf("kubeshark %s", deployCmd.Use)). Msg(fmt.Sprintf(utils.Red, "There are issues in your deployment! Run these commands:")) os.Exit(1) } diff --git a/cmd/common.go b/cmd/common.go index f714ae3d8..7e1d835f3 100644 --- a/cmd/common.go +++ b/cmd/common.go @@ -26,7 +26,7 @@ func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, ctx con if err != nil { log.Error(). Err(errormessage.FormatError(err)). - Msg(fmt.Sprintf("Error occured while running k8s proxy. Try setting different port by using --%s", configStructs.GuiPortTapName)) + Msg(fmt.Sprintf("Error occured while running k8s proxy. Try setting different port by using --%s", configStructs.ProxyPortLabel)) cancel() return } @@ -45,7 +45,7 @@ func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, ctx con log.Error(). Str("pod-regex", podRegex.String()). Err(errormessage.FormatError(err)). - Msg(fmt.Sprintf("Error occured while running port forward. Try setting different port by using --%s", configStructs.GuiPortTapName)) + Msg(fmt.Sprintf("Error occured while running port forward. Try setting different port by using --%s", configStructs.ProxyPortLabel)) cancel() return } diff --git a/cmd/deploy.go b/cmd/deploy.go index f393092fe..1cc6f832e 100644 --- a/cmd/deploy.go +++ b/cmd/deploy.go @@ -16,7 +16,7 @@ var deployCmd = &cobra.Command{ Short: "Deploy Kubeshark into your K8s cluster.", Long: `Deploy Kubeshark into your K8s cluster to gain visibility.`, RunE: func(cmd *cobra.Command, args []string) error { - RunKubesharkTap() + deploy() return nil }, PreRunE: func(cmd *cobra.Command, args []string) error { @@ -46,15 +46,15 @@ func init() { log.Debug().Err(err).Send() } - deployCmd.Flags().Uint16P(configStructs.GuiPortTapName, "p", defaultDeployConfig.GuiPort, "Provide a custom port for the web interface webserver") - deployCmd.Flags().StringSliceP(configStructs.NamespacesTapName, "n", defaultDeployConfig.Namespaces, "Namespaces selector") - deployCmd.Flags().BoolP(configStructs.AllNamespacesTapName, "A", defaultDeployConfig.AllNamespaces, "Tap all namespaces") - deployCmd.Flags().Bool(configStructs.EnableRedactionTapName, defaultDeployConfig.EnableRedaction, "Enables redaction of potentially sensitive request/response headers and body values") - deployCmd.Flags().String(configStructs.HumanMaxEntriesDBSizeTapName, defaultDeployConfig.HumanMaxEntriesDBSize, "Override the default max entries db size") + deployCmd.Flags().Uint16P(configStructs.ProxyPortLabel, "p", defaultDeployConfig.ProxyPort, "Provide a custom port for the web interface webserver.") + deployCmd.Flags().StringSliceP(configStructs.NamespacesLabel, "n", defaultDeployConfig.Namespaces, "Namespaces selector.") + deployCmd.Flags().BoolP(configStructs.AllNamespacesLabel, "A", defaultDeployConfig.AllNamespaces, "Deploy to all namespaces.") + deployCmd.Flags().Bool(configStructs.EnableRedactionLabel, defaultDeployConfig.EnableRedaction, "Enables redaction of potentially sensitive request/response headers and body values.") + deployCmd.Flags().String(configStructs.HumanMaxEntriesDBSizeLabel, defaultDeployConfig.HumanMaxEntriesDBSize, "Override the default max entries db size.") deployCmd.Flags().String(configStructs.InsertionFilterName, defaultDeployConfig.InsertionFilter, "Set the insertion filter. Accepts string or a file path.") - deployCmd.Flags().Bool(configStructs.DryRunTapName, defaultDeployConfig.DryRun, "Preview of all pods matching the regex, without tapping them") - deployCmd.Flags().Bool(configStructs.ServiceMeshName, defaultDeployConfig.ServiceMesh, "Record decrypted traffic if the cluster is configured with a service mesh and with mtls") - deployCmd.Flags().Bool(configStructs.TlsName, defaultDeployConfig.Tls, "Record tls traffic") - deployCmd.Flags().Bool(configStructs.ProfilerName, defaultDeployConfig.Profiler, "Run pprof server") - deployCmd.Flags().Int(configStructs.MaxLiveStreamsName, defaultDeployConfig.MaxLiveStreams, "Maximum live tcp streams to handle concurrently") + deployCmd.Flags().Bool(configStructs.DryRunLabel, defaultDeployConfig.DryRun, "Preview of all pods matching the regex, without deploying workers on them.") + deployCmd.Flags().Bool(configStructs.ServiceMeshName, defaultDeployConfig.ServiceMesh, "Record decrypted traffic if the cluster is configured with a service mesh and with mtls.") + deployCmd.Flags().Bool(configStructs.TlsName, defaultDeployConfig.Tls, "Record tls traffic.") + deployCmd.Flags().Bool(configStructs.ProfilerName, defaultDeployConfig.Profiler, "Run pprof server.") + deployCmd.Flags().Int(configStructs.MaxLiveStreamsName, defaultDeployConfig.MaxLiveStreams, "Maximum live tcp streams to handle concurrently.") } diff --git a/cmd/deployRunner.go b/cmd/deployRunner.go index e6e370a73..ed130645d 100644 --- a/cmd/deployRunner.go +++ b/cmd/deployRunner.go @@ -27,19 +27,19 @@ import ( const cleanupTimeout = time.Minute -type tapState struct { +type deployState struct { startTime time.Time targetNamespaces []string kubesharkServiceAccountExists bool } -var state tapState +var state deployState var connector *connect.Connector var hubPodReady bool var frontPodReady bool var proxyDone bool -func RunKubesharkTap() { +func deploy() { state.startTime = time.Now() connector = connect.NewConnector(kubernetes.GetLocalhostOnPort(config.Config.Hub.PortForward.SrcPort), connect.DefaultRetries, connect.DefaultTimeout) @@ -63,14 +63,14 @@ func RunKubesharkTap() { if config.Config.IsNsRestrictedMode() { if len(state.targetNamespaces) != 1 || !utils.Contains(state.targetNamespaces, config.Config.ResourcesNamespace) { - log.Error().Msg(fmt.Sprintf("Kubeshark can't resolve IPs in other namespaces when running in namespace restricted mode. You can use the same namespace for --%s and --%s", configStructs.NamespacesTapName, config.ResourcesNamespaceConfigName)) + log.Error().Msg(fmt.Sprintf("Kubeshark can't resolve IPs in other namespaces when running in namespace restricted mode. You can use the same namespace for --%s and --%s", configStructs.NamespacesLabel, config.ResourcesNamespaceConfigName)) return } } log.Info().Strs("namespaces", state.targetNamespaces).Msg("Targetting pods in:") - if err := printTappedPodsPreview(ctx, kubernetesProvider, state.targetNamespaces); err != nil { + if err := printTargettedPodsPreview(ctx, kubernetesProvider, state.targetNamespaces); err != nil { log.Error().Err(errormessage.FormatError(err)).Msg("Error listing pods!") } @@ -79,7 +79,7 @@ func RunKubesharkTap() { } log.Info().Msg("Waiting for Kubeshark deployment to finish...") - if state.kubesharkServiceAccountExists, err = resources.CreateTapKubesharkResources(ctx, kubernetesProvider, serializedKubesharkConfig, config.Config.IsNsRestrictedMode(), config.Config.ResourcesNamespace, config.Config.Deploy.MaxEntriesDBSizeBytes(), config.Config.Deploy.HubResources, config.Config.ImagePullPolicy(), config.Config.LogLevel(), config.Config.Deploy.Profiler); err != nil { + if state.kubesharkServiceAccountExists, err = resources.CreateHubResources(ctx, kubernetesProvider, serializedKubesharkConfig, config.Config.IsNsRestrictedMode(), config.Config.ResourcesNamespace, config.Config.Deploy.MaxEntriesDBSizeBytes(), config.Config.Deploy.HubResources, config.Config.ImagePullPolicy(), config.Config.LogLevel(), config.Config.Deploy.Profiler); err != nil { var statusError *k8serrors.StatusError if errors.As(err, &statusError) && (statusError.ErrStatus.Reason == metav1.StatusReasonAlreadyExists) { log.Info().Msg("Kubeshark is already running in this namespace, change the `kubeshark-resources-namespace` configuration or run `kubeshark clean` to remove the currently running Kubeshark instance") @@ -91,7 +91,7 @@ func RunKubesharkTap() { return } - defer finishTapExecution(kubernetesProvider) + defer finishDeployExecution(kubernetesProvider) go goUtils.HandleExcWrapper(watchHubEvents, ctx, kubernetesProvider, cancel) go goUtils.HandleExcWrapper(watchHubPod, ctx, kubernetesProvider, cancel) @@ -101,7 +101,7 @@ func RunKubesharkTap() { utils.WaitForFinish(ctx, cancel) } -func finishTapExecution(kubernetesProvider *kubernetes.Provider) { +func finishDeployExecution(kubernetesProvider *kubernetes.Provider) { finishKubesharkExecution(kubernetesProvider, config.Config.IsNsRestrictedMode(), config.Config.ResourcesNamespace) } @@ -110,7 +110,7 @@ func getDeployConfig() *models.Config { MaxDBSizeBytes: config.Config.Deploy.MaxEntriesDBSizeBytes(), InsertionFilter: config.Config.Deploy.GetInsertionFilter(), PullPolicy: config.Config.ImagePullPolicyStr, - TapperResources: config.Config.Deploy.TapperResources, + TapperResources: config.Config.Deploy.WorkerResources, KubesharkResourcesNamespace: config.Config.ResourcesNamespace, AgentDatabasePath: models.DataDirPath, ServiceMap: config.Config.ServiceMap, @@ -121,30 +121,30 @@ func getDeployConfig() *models.Config { } /* -this function is a bit problematic as it might be detached from the actual pods the Kubeshark Hub will tap. +This function is a bit problematic as it might be detached from the actual pods the Kubeshark that targets. The alternative would be to wait for Hub to be ready and then query it for the pods it listens to, this has the arguably worse drawback of taking a relatively very long time before the user sees which pods are targeted, if any. */ -func printTappedPodsPreview(ctx context.Context, kubernetesProvider *kubernetes.Provider, namespaces []string) error { +func printTargettedPodsPreview(ctx context.Context, kubernetesProvider *kubernetes.Provider, namespaces []string) error { if matchingPods, err := kubernetesProvider.ListAllRunningPodsMatchingRegex(ctx, config.Config.Deploy.PodRegex(), namespaces); err != nil { return err } else { if len(matchingPods) == 0 { printNoPodsFoundSuggestion(namespaces) } - for _, tappedPod := range matchingPods { - log.Info().Msg(fmt.Sprintf("New pod: %s", fmt.Sprintf(utils.Green, tappedPod.Name))) + for _, targettedPod := range matchingPods { + log.Info().Msg(fmt.Sprintf("New pod: %s", fmt.Sprintf(utils.Green, targettedPod.Name))) } return nil } } -func startTapperSyncer(ctx context.Context, cancel context.CancelFunc, provider *kubernetes.Provider, targetNamespaces []string, startTime time.Time) error { - tapperSyncer, err := kubernetes.CreateAndStartKubesharkTapperSyncer(ctx, provider, kubernetes.TapperSyncerConfig{ +func startWorkerSyncer(ctx context.Context, cancel context.CancelFunc, provider *kubernetes.Provider, targetNamespaces []string, startTime time.Time) error { + workerSyncer, err := kubernetes.CreateAndStartWorkerSyncer(ctx, provider, kubernetes.WorkerSyncerConfig{ TargetNamespaces: targetNamespaces, PodFilterRegex: *config.Config.Deploy.PodRegex(), KubesharkResourcesNamespace: config.Config.ResourcesNamespace, - TapperResources: config.Config.Deploy.TapperResources, + WorkerResources: config.Config.Deploy.WorkerResources, ImagePullPolicy: config.Config.ImagePullPolicy(), LogLevel: config.Config.LogLevel(), KubesharkApiFilteringOptions: api.TrafficFilteringOptions{ @@ -163,31 +163,31 @@ func startTapperSyncer(ctx context.Context, cancel context.CancelFunc, provider go func() { for { select { - case syncerErr, ok := <-tapperSyncer.ErrorOut: + case syncerErr, ok := <-workerSyncer.ErrorOut: if !ok { - log.Debug().Msg("kubesharkTapperSyncer err channel closed, ending listener loop") + log.Debug().Msg("workerSyncer err channel closed, ending listener loop") return } - log.Error().Msg(getErrorDisplayTextForK8sTapManagerError(syncerErr)) + log.Error().Msg(getK8sDeployManagerErrorText(syncerErr)) cancel() - case _, ok := <-tapperSyncer.TapPodChangesOut: + case _, ok := <-workerSyncer.DeployPodChangesOut: if !ok { - log.Debug().Msg("kubesharkTapperSyncer pod changes channel closed, ending listener loop") + log.Debug().Msg("workerSyncer pod changes channel closed, ending listener loop") return } - if err := connector.ReportTappedPods(tapperSyncer.CurrentlyTappedPods); err != nil { - log.Error().Err(err).Msg("failed update tapped pods.") + if err := connector.ReportTargettedPods(workerSyncer.CurrentlyTargettedPods); err != nil { + log.Error().Err(err).Msg("failed update targetted pods.") } - case tapperStatus, ok := <-tapperSyncer.TapperStatusChangedOut: + case workerStatus, ok := <-workerSyncer.WorkerStatusChangedOut: if !ok { - log.Debug().Msg("kubesharkTapperSyncer tapper status changed channel closed, ending listener loop") + log.Debug().Msg("workerSyncer worker status changed channel closed, ending listener loop") return } - if err := connector.ReportTapperStatus(tapperStatus); err != nil { - log.Error().Err(err).Msg("failed update tapper status.") + if err := connector.ReportWorkerStatus(workerStatus); err != nil { + log.Error().Err(err).Msg("failed update worker status.") } case <-ctx.Done(): - log.Debug().Msg("kubesharkTapperSyncer event listener loop exiting due to context done") + log.Debug().Msg("workerSyncer event listener loop exiting due to context done") return } } @@ -199,21 +199,21 @@ func startTapperSyncer(ctx context.Context, cancel context.CancelFunc, provider func printNoPodsFoundSuggestion(targetNamespaces []string) { var suggestionStr string if !utils.Contains(targetNamespaces, kubernetes.K8sAllNamespaces) { - suggestionStr = ". You can also try selecting a different namespace with -n or tap all namespaces with -A" + suggestionStr = ". You can also try selecting a different namespace with -n or target all namespaces with -A" } - log.Warn().Msg(fmt.Sprintf("Did not find any currently running pods that match the regex argument, kubeshark will automatically tap matching pods if any are created later%s", suggestionStr)) + log.Warn().Msg(fmt.Sprintf("Did not find any currently running pods that match the regex argument, kubeshark will automatically target matching pods if any are created later%s", suggestionStr)) } -func getErrorDisplayTextForK8sTapManagerError(err kubernetes.K8sTapManagerError) string { - switch err.TapManagerReason { - case kubernetes.TapManagerPodListError: - return fmt.Sprintf("Failed to update currently tapped pods: %v", err.OriginalError) - case kubernetes.TapManagerPodWatchError: - return fmt.Sprintf("Error occured in k8s pod watch: %v", err.OriginalError) - case kubernetes.TapManagerTapperUpdateError: - return fmt.Sprintf("Error updating tappers: %v", err.OriginalError) +func getK8sDeployManagerErrorText(err kubernetes.K8sDeployManagerError) string { + switch err.DeployManagerReason { + case kubernetes.DeployManagerPodListError: + return fmt.Sprintf("Failed to update currently targetted pods: %v", err.OriginalError) + case kubernetes.DeployManagerPodWatchError: + return fmt.Sprintf("Error occured in K8s pod watch: %v", err.OriginalError) + case kubernetes.DeployManagerWorkerUpdateError: + return fmt.Sprintf("Error updating worker: %v", err.OriginalError) default: - return fmt.Sprintf("Unknown error occured in k8s tap manager: %v", err.OriginalError) + return fmt.Sprintf("Unknown error occured in K8s deploy manager: %v", err.OriginalError) } } @@ -450,8 +450,8 @@ func watchHubEvents(ctx context.Context, kubernetesProvider *kubernetes.Provider func postHubStarted(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) { startProxyReportErrorIfAny(kubernetesProvider, ctx, cancel, kubernetes.HubServiceName, config.Config.Hub.PortForward.SrcPort, config.Config.Hub.PortForward.DstPort, "/echo") - if err := startTapperSyncer(ctx, cancel, kubernetesProvider, state.targetNamespaces, state.startTime); err != nil { - log.Error().Err(errormessage.FormatError(err)).Msg("Error starting kubeshark tapper syncer") + if err := startWorkerSyncer(ctx, cancel, kubernetesProvider, state.targetNamespaces, state.startTime); err != nil { + log.Error().Err(errormessage.FormatError(err)).Msg("Error starting kubeshark worker syncer") cancel() } diff --git a/cmd/openRunner.go b/cmd/openRunner.go index 5a2acdc06..5ee122430 100644 --- a/cmd/openRunner.go +++ b/cmd/openRunner.go @@ -34,7 +34,7 @@ func runOpen() { if !exists { log.Error(). Str("service", kubernetes.FrontServiceName). - Str("command", "kubeshark tap"). + Str("command", fmt.Sprintf("kubeshark %s", deployCmd.Use)). Msg("Service not found! You should run the command first:") cancel() return diff --git a/cmd/root.go b/cmd/root.go index 5f793d49d..dc8bbf7ce 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -36,7 +36,7 @@ func init() { } // Execute adds all child commands to the root command and sets flags appropriately. -// This is called by main.main(). It only needs to happen once to the tapCmd. +// This is called by main.main(). It only needs to happen once to the deployCmd. func Execute() { cobra.CheckErr(rootCmd.Execute()) } diff --git a/config/configStructs/tapConfig.go b/config/configStructs/tapConfig.go index 33a763914..e2ddb1f15 100644 --- a/config/configStructs/tapConfig.go +++ b/config/configStructs/tapConfig.go @@ -13,22 +13,22 @@ import ( ) const ( - GuiPortTapName = "gui-port" - NamespacesTapName = "namespaces" - AllNamespacesTapName = "all-namespaces" - EnableRedactionTapName = "redact" - HumanMaxEntriesDBSizeTapName = "max-entries-db-size" - InsertionFilterName = "insertion-filter" - DryRunTapName = "dry-run" - ServiceMeshName = "service-mesh" - TlsName = "tls" - ProfilerName = "profiler" - MaxLiveStreamsName = "max-live-streams" + ProxyPortLabel = "proxy-port" + NamespacesLabel = "namespaces" + AllNamespacesLabel = "all-namespaces" + EnableRedactionLabel = "redact" + HumanMaxEntriesDBSizeLabel = "max-entries-db-size" + InsertionFilterName = "insertion-filter" + DryRunLabel = "dry-run" + ServiceMeshName = "service-mesh" + TlsName = "tls" + ProfilerName = "profiler" + MaxLiveStreamsName = "max-live-streams" ) type DeployConfig struct { PodRegexStr string `yaml:"regex" default:".*"` - GuiPort uint16 `yaml:"gui-port" default:"8899"` + ProxyPort uint16 `yaml:"proxy-port" default:"8899"` ProxyHost string `yaml:"proxy-host" default:"127.0.0.1"` Namespaces []string `yaml:"namespaces"` AllNamespaces bool `yaml:"all-namespaces" default:"false"` @@ -45,7 +45,7 @@ type DeployConfig struct { InsertionFilter string `yaml:"insertion-filter" default:""` DryRun bool `yaml:"dry-run" default:"false"` HubResources models.Resources `yaml:"hub-resources"` - TapperResources models.Resources `yaml:"tapper-resources"` + WorkerResources models.Resources `yaml:"worker-resources"` ServiceMesh bool `yaml:"service-mesh" default:"false"` Tls bool `yaml:"tls" default:"false"` PacketCapture string `yaml:"packet-capture" default:"libpcap"` @@ -126,7 +126,7 @@ func (config *DeployConfig) Validate() error { _, parseHumanDataSizeErr := utils.HumanReadableToBytes(config.HumanMaxEntriesDBSize) if parseHumanDataSizeErr != nil { - return fmt.Errorf("Could not parse --%s value %s", HumanMaxEntriesDBSizeTapName, config.HumanMaxEntriesDBSize) + return fmt.Errorf("Could not parse --%s value %s", HumanMaxEntriesDBSizeLabel, config.HumanMaxEntriesDBSize) } return nil diff --git a/errormessage/errormessage.go b/errormessage/errormessage.go index 31681850c..a18e15a3d 100644 --- a/errormessage/errormessage.go +++ b/errormessage/errormessage.go @@ -17,7 +17,7 @@ func FormatError(err error) error { if k8serrors.IsForbidden(err) { errorNew = fmt.Errorf("insufficient permissions: %w. "+ "supply the required permission or control Kubeshark's access to namespaces by setting %s "+ - "in the config file or setting the tapped namespace with --%s %s=", + "in the config file or setting the targetted namespace with --%s %s=", err, config.ResourcesNamespaceConfigName, config.SetCommandName, diff --git a/internal/connect/hub.go b/internal/connect/hub.go index 1b8be6dec..f912eed72 100644 --- a/internal/connect/hub.go +++ b/internal/connect/hub.go @@ -62,31 +62,31 @@ func (connector *Connector) isReachable(path string) (bool, error) { } } -func (connector *Connector) ReportTapperStatus(tapperStatus models.TapperStatus) error { - tapperStatusUrl := fmt.Sprintf("%s/status/tapperStatus", connector.url) +func (connector *Connector) ReportWorkerStatus(workerStatus models.TapperStatus) error { + workerStatusUrl := fmt.Sprintf("%s/status/tapperStatus", connector.url) - if jsonValue, err := json.Marshal(tapperStatus); err != nil { - return fmt.Errorf("Failed Marshal the tapper status %w", err) + if jsonValue, err := json.Marshal(workerStatus); err != nil { + return fmt.Errorf("Failed Marshal the worker status %w", err) } else { - if _, err := utils.Post(tapperStatusUrl, "application/json", bytes.NewBuffer(jsonValue), connector.client); err != nil { - return fmt.Errorf("Failed sending to Hub the tapped pods %w", err) + if _, err := utils.Post(workerStatusUrl, "application/json", bytes.NewBuffer(jsonValue), connector.client); err != nil { + return fmt.Errorf("Failed sending to Hub the targetted pods %w", err) } else { - log.Debug().Interface("tapper-status", tapperStatus).Msg("Reported to Hub about tapper status:") + log.Debug().Interface("worker-status", workerStatus).Msg("Reported to Hub about Worker status:") return nil } } } -func (connector *Connector) ReportTappedPods(pods []core.Pod) error { - tappedPodsUrl := fmt.Sprintf("%s/status/tappedPods", connector.url) +func (connector *Connector) ReportTargettedPods(pods []core.Pod) error { + targettedPodsUrl := fmt.Sprintf("%s/status/tappedPods", connector.url) if jsonValue, err := json.Marshal(pods); err != nil { - return fmt.Errorf("Failed Marshal the tapped pods %w", err) + return fmt.Errorf("Failed Marshal the targetted pods %w", err) } else { - if _, err := utils.Post(tappedPodsUrl, "application/json", bytes.NewBuffer(jsonValue), connector.client); err != nil { - return fmt.Errorf("Failed sending to Hub the tapped pods %w", err) + if _, err := utils.Post(targettedPodsUrl, "application/json", bytes.NewBuffer(jsonValue), connector.client); err != nil { + return fmt.Errorf("Failed sending to Hub the targetted pods %w", err) } else { - log.Debug().Int("pod-count", len(pods)).Msg("Reported to Hub about tapped pod count:") + log.Debug().Int("pod-count", len(pods)).Msg("Reported to Hub about targetted pod count:") return nil } } diff --git a/kubernetes/consts.go b/kubernetes/consts.go index 3e4fe7c1b..dab5ae019 100644 --- a/kubernetes/consts.go +++ b/kubernetes/consts.go @@ -12,8 +12,8 @@ const ( RoleBindingName = KubesharkResourcesPrefix + "role-binding" RoleName = KubesharkResourcesPrefix + "role" ServiceAccountName = KubesharkResourcesPrefix + "service-account" - TapperDaemonSetName = KubesharkResourcesPrefix + "worker-daemon-set" - TapperPodName = KubesharkResourcesPrefix + "worker" + WorkerDaemonSetName = KubesharkResourcesPrefix + "worker-daemon-set" + WorkerPodName = KubesharkResourcesPrefix + "worker" ConfigMapName = KubesharkResourcesPrefix + "config" MinKubernetesServerVersion = "1.16.0" ) diff --git a/kubernetes/errors.go b/kubernetes/errors.go index 5b65e2d17..6a71ab50b 100644 --- a/kubernetes/errors.go +++ b/kubernetes/errors.go @@ -1,20 +1,20 @@ package kubernetes -type K8sTapManagerErrorReason string +type K8sDeployManagerErrorReason string const ( - TapManagerTapperUpdateError K8sTapManagerErrorReason = "TAPPER_UPDATE_ERROR" - TapManagerPodWatchError K8sTapManagerErrorReason = "POD_WATCH_ERROR" - TapManagerPodListError K8sTapManagerErrorReason = "POD_LIST_ERROR" + DeployManagerWorkerUpdateError K8sDeployManagerErrorReason = "TAPPER_UPDATE_ERROR" + DeployManagerPodWatchError K8sDeployManagerErrorReason = "POD_WATCH_ERROR" + DeployManagerPodListError K8sDeployManagerErrorReason = "POD_LIST_ERROR" ) -type K8sTapManagerError struct { - OriginalError error - TapManagerReason K8sTapManagerErrorReason +type K8sDeployManagerError struct { + OriginalError error + DeployManagerReason K8sDeployManagerErrorReason } -// K8sTapManagerError implements the Error interface. -func (e *K8sTapManagerError) Error() string { +// K8sDeployManagerError implements the Error interface. +func (e *K8sDeployManagerError) Error() string { return e.OriginalError.Error() } diff --git a/kubernetes/kubesharkTapperSyncer.go b/kubernetes/kubesharkTapperSyncer.go index 60d6af816..f09c86161 100644 --- a/kubernetes/kubesharkTapperSyncer.go +++ b/kubernetes/kubesharkTapperSyncer.go @@ -15,32 +15,32 @@ import ( core "k8s.io/api/core/v1" ) -const updateTappersDelay = 5 * time.Second +const updateWorkersDelay = 5 * time.Second -type TappedPodChangeEvent struct { +type TargettedPodChangeEvent struct { Added []core.Pod Removed []core.Pod } -// KubesharkTapperSyncer uses a k8s pod watch to update tapper daemonsets when targeted pods are removed or created -type KubesharkTapperSyncer struct { +// WorkerSyncer uses a k8s pod watch to update Worker daemonsets when targeted pods are removed or created +type WorkerSyncer struct { startTime time.Time context context.Context - CurrentlyTappedPods []core.Pod - config TapperSyncerConfig + CurrentlyTargettedPods []core.Pod + config WorkerSyncerConfig kubernetesProvider *Provider - TapPodChangesOut chan TappedPodChangeEvent - TapperStatusChangedOut chan models.TapperStatus - ErrorOut chan K8sTapManagerError - nodeToTappedPodMap models.NodeToPodsMap - tappedNodes []string + DeployPodChangesOut chan TargettedPodChangeEvent + WorkerStatusChangedOut chan models.TapperStatus + ErrorOut chan K8sDeployManagerError + nodeToTargettedPodMap models.NodeToPodsMap + targettedNodes []string } -type TapperSyncerConfig struct { +type WorkerSyncerConfig struct { TargetNamespaces []string PodFilterRegex regexp.Regexp KubesharkResourcesNamespace string - TapperResources models.Resources + WorkerResources models.Resources ImagePullPolicy core.PullPolicy LogLevel zerolog.Level KubesharkApiFilteringOptions api.TrafficFilteringOptions @@ -50,36 +50,36 @@ type TapperSyncerConfig struct { MaxLiveStreams int } -func CreateAndStartKubesharkTapperSyncer(ctx context.Context, kubernetesProvider *Provider, config TapperSyncerConfig, startTime time.Time) (*KubesharkTapperSyncer, error) { - syncer := &KubesharkTapperSyncer{ +func CreateAndStartWorkerSyncer(ctx context.Context, kubernetesProvider *Provider, config WorkerSyncerConfig, startTime time.Time) (*WorkerSyncer, error) { + syncer := &WorkerSyncer{ startTime: startTime.Truncate(time.Second), // Round down because k8s CreationTimestamp is given in 1 sec resolution. context: ctx, - CurrentlyTappedPods: make([]core.Pod, 0), + CurrentlyTargettedPods: make([]core.Pod, 0), config: config, kubernetesProvider: kubernetesProvider, - TapPodChangesOut: make(chan TappedPodChangeEvent, 100), - TapperStatusChangedOut: make(chan models.TapperStatus, 100), - ErrorOut: make(chan K8sTapManagerError, 100), + DeployPodChangesOut: make(chan TargettedPodChangeEvent, 100), + WorkerStatusChangedOut: make(chan models.TapperStatus, 100), + ErrorOut: make(chan K8sDeployManagerError, 100), } - if err, _ := syncer.updateCurrentlyTappedPods(); err != nil { + if err, _ := syncer.updateCurrentlyTargettedPods(); err != nil { return nil, err } - if err := syncer.updateKubesharkTappers(); err != nil { + if err := syncer.updateWorkers(); err != nil { return nil, err } - go syncer.watchPodsForTapping() - go syncer.watchTapperEvents() - go syncer.watchTapperPods() + go syncer.watchPodsForTargetting() + go syncer.watchWorkerEvents() + go syncer.watchWorkerPods() return syncer, nil } -func (tapperSyncer *KubesharkTapperSyncer) watchTapperPods() { - kubesharkResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", TapperPodName)) - podWatchHelper := NewPodWatchHelper(tapperSyncer.kubernetesProvider, kubesharkResourceRegex) - eventChan, errorChan := FilteredWatch(tapperSyncer.context, podWatchHelper, []string{tapperSyncer.config.KubesharkResourcesNamespace}, podWatchHelper) +func (workerSyncer *WorkerSyncer) watchWorkerPods() { + kubesharkResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", WorkerPodName)) + podWatchHelper := NewPodWatchHelper(workerSyncer.kubernetesProvider, kubesharkResourceRegex) + eventChan, errorChan := FilteredWatch(workerSyncer.context, podWatchHelper, []string{workerSyncer.config.KubesharkResourcesNamespace}, podWatchHelper) for { select { @@ -91,7 +91,7 @@ func (tapperSyncer *KubesharkTapperSyncer) watchTapperPods() { pod, err := wEvent.ToPod() if err != nil { - log.Error().Str("pod", TapperPodName).Err(err).Msg("While parsing Kubeshark resource!") + log.Error().Str("pod", WorkerPodName).Err(err).Msg("While parsing Kubeshark resource!") continue } @@ -101,8 +101,8 @@ func (tapperSyncer *KubesharkTapperSyncer) watchTapperPods() { Interface("phase", pod.Status.Phase). Msg("Watching pod events...") if pod.Spec.NodeName != "" { - tapperStatus := models.TapperStatus{TapperName: pod.Name, NodeName: pod.Spec.NodeName, Status: string(pod.Status.Phase)} - tapperSyncer.TapperStatusChangedOut <- tapperStatus + workerStatus := models.TapperStatus{TapperName: pod.Name, NodeName: pod.Spec.NodeName, Status: string(pod.Status.Phase)} + workerSyncer.WorkerStatusChangedOut <- workerStatus } case err, ok := <-errorChan: @@ -110,21 +110,21 @@ func (tapperSyncer *KubesharkTapperSyncer) watchTapperPods() { errorChan = nil continue } - log.Error().Str("pod", TapperPodName).Err(err).Msg("While watching pod!") + log.Error().Str("pod", WorkerPodName).Err(err).Msg("While watching pod!") - case <-tapperSyncer.context.Done(): + case <-workerSyncer.context.Done(): log.Debug(). - Str("pod", TapperPodName). + Str("pod", WorkerPodName). Msg("Watching pod, context done.") return } } } -func (tapperSyncer *KubesharkTapperSyncer) watchTapperEvents() { - kubesharkResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", TapperPodName)) - eventWatchHelper := NewEventWatchHelper(tapperSyncer.kubernetesProvider, kubesharkResourceRegex, "pod") - eventChan, errorChan := FilteredWatch(tapperSyncer.context, eventWatchHelper, []string{tapperSyncer.config.KubesharkResourcesNamespace}, eventWatchHelper) +func (workerSyncer *WorkerSyncer) watchWorkerEvents() { + kubesharkResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", WorkerPodName)) + eventWatchHelper := NewEventWatchHelper(workerSyncer.kubernetesProvider, kubesharkResourceRegex, "pod") + eventChan, errorChan := FilteredWatch(workerSyncer.context, eventWatchHelper, []string{workerSyncer.config.KubesharkResourcesNamespace}, eventWatchHelper) for { select { @@ -137,14 +137,14 @@ func (tapperSyncer *KubesharkTapperSyncer) watchTapperEvents() { event, err := wEvent.ToEvent() if err != nil { log.Error(). - Str("pod", TapperPodName). + Str("pod", WorkerPodName). Err(err). Msg("Parsing resource event.") continue } log.Debug(). - Str("pod", TapperPodName). + Str("pod", WorkerPodName). Str("event", event.Name). Time("time", event.CreationTimestamp.Time). Str("name", event.Regarding.Name). @@ -153,7 +153,7 @@ func (tapperSyncer *KubesharkTapperSyncer) watchTapperEvents() { Str("note", event.Note). Msg("Watching events.") - pod, err1 := tapperSyncer.kubernetesProvider.GetPod(tapperSyncer.context, tapperSyncer.config.KubesharkResourcesNamespace, event.Regarding.Name) + pod, err1 := workerSyncer.kubernetesProvider.GetPod(workerSyncer.context, workerSyncer.config.KubesharkResourcesNamespace, event.Regarding.Name) if err1 != nil { log.Error().Str("name", event.Regarding.Name).Msg("Couldn't get pod") continue @@ -166,8 +166,8 @@ func (tapperSyncer *KubesharkTapperSyncer) watchTapperEvents() { nodeName = pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchFields[0].Values[0] } - tapperStatus := models.TapperStatus{TapperName: pod.Name, NodeName: nodeName, Status: string(pod.Status.Phase)} - tapperSyncer.TapperStatusChangedOut <- tapperStatus + workerStatus := models.TapperStatus{TapperName: pod.Name, NodeName: nodeName, Status: string(pod.Status.Phase)} + workerSyncer.WorkerStatusChangedOut <- workerStatus case err, ok := <-errorChan: if !ok { @@ -176,44 +176,44 @@ func (tapperSyncer *KubesharkTapperSyncer) watchTapperEvents() { } log.Error(). - Str("pod", TapperPodName). + Str("pod", WorkerPodName). Err(err). Msg("While watching events.") - case <-tapperSyncer.context.Done(): + case <-workerSyncer.context.Done(): log.Debug(). - Str("pod", TapperPodName). + Str("pod", WorkerPodName). Msg("Watching pod events, context done.") return } } } -func (tapperSyncer *KubesharkTapperSyncer) watchPodsForTapping() { - podWatchHelper := NewPodWatchHelper(tapperSyncer.kubernetesProvider, &tapperSyncer.config.PodFilterRegex) - eventChan, errorChan := FilteredWatch(tapperSyncer.context, podWatchHelper, tapperSyncer.config.TargetNamespaces, podWatchHelper) +func (workerSyncer *WorkerSyncer) watchPodsForTargetting() { + podWatchHelper := NewPodWatchHelper(workerSyncer.kubernetesProvider, &workerSyncer.config.PodFilterRegex) + eventChan, errorChan := FilteredWatch(workerSyncer.context, podWatchHelper, workerSyncer.config.TargetNamespaces, podWatchHelper) handleChangeInPods := func() { - err, changeFound := tapperSyncer.updateCurrentlyTappedPods() + err, changeFound := workerSyncer.updateCurrentlyTargettedPods() if err != nil { - tapperSyncer.ErrorOut <- K8sTapManagerError{ - OriginalError: err, - TapManagerReason: TapManagerPodListError, + workerSyncer.ErrorOut <- K8sDeployManagerError{ + OriginalError: err, + DeployManagerReason: DeployManagerPodListError, } } if !changeFound { - log.Debug().Msg("Nothing changed. Updating tappers is not needed.") + log.Debug().Msg("Nothing changed. Updating workers is not needed.") return } - if err := tapperSyncer.updateKubesharkTappers(); err != nil { - tapperSyncer.ErrorOut <- K8sTapManagerError{ - OriginalError: err, - TapManagerReason: TapManagerTapperUpdateError, + if err := workerSyncer.updateWorkers(); err != nil { + workerSyncer.ErrorOut <- K8sDeployManagerError{ + OriginalError: err, + DeployManagerReason: DeployManagerWorkerUpdateError, } } } - restartTappersDebouncer := debounce.NewDebouncer(updateTappersDelay, handleChangeInPods) + restartWorkersDebouncer := debounce.NewDebouncer(updateWorkersDelay, handleChangeInPods) for { select { @@ -225,7 +225,7 @@ func (tapperSyncer *KubesharkTapperSyncer) watchPodsForTapping() { pod, err := wEvent.ToPod() if err != nil { - tapperSyncer.handleErrorInWatchLoop(err, restartTappersDebouncer) + workerSyncer.handleErrorInWatchLoop(err, restartWorkersDebouncer) continue } @@ -235,24 +235,24 @@ func (tapperSyncer *KubesharkTapperSyncer) watchPodsForTapping() { Str("pod", pod.Name). Str("namespace", pod.Namespace). Msg("Added matching pod.") - if err := restartTappersDebouncer.SetOn(); err != nil { + if err := restartWorkersDebouncer.SetOn(); err != nil { log.Error(). Str("pod", pod.Name). Str("namespace", pod.Namespace). Err(err). - Msg("While restarting tappers!") + Msg("While restarting workers!") } case EventDeleted: log.Debug(). Str("pod", pod.Name). Str("namespace", pod.Namespace). Msg("Removed matching pod.") - if err := restartTappersDebouncer.SetOn(); err != nil { + if err := restartWorkersDebouncer.SetOn(); err != nil { log.Error(). Str("pod", pod.Name). Str("namespace", pod.Namespace). Err(err). - Msg("While restarting tappers!") + Msg("While restarting workers!") } case EventModified: log.Debug(). @@ -269,12 +269,12 @@ func (tapperSyncer *KubesharkTapperSyncer) watchPodsForTapping() { // - Pod reaches ready state // Ready/unready transitions might also trigger this event. if pod.Status.PodIP != "" { - if err := restartTappersDebouncer.SetOn(); err != nil { + if err := restartWorkersDebouncer.SetOn(); err != nil { log.Error(). Str("pod", pod.Name). Str("namespace", pod.Namespace). Err(err). - Msg("While restarting tappers!") + Msg("While restarting workers!") } } case EventBookmark: @@ -288,33 +288,33 @@ func (tapperSyncer *KubesharkTapperSyncer) watchPodsForTapping() { continue } - tapperSyncer.handleErrorInWatchLoop(err, restartTappersDebouncer) + workerSyncer.handleErrorInWatchLoop(err, restartWorkersDebouncer) continue - case <-tapperSyncer.context.Done(): - log.Debug().Msg("Watching pods, context done. Stopping \"restart tappers debouncer\"") - restartTappersDebouncer.Cancel() + case <-workerSyncer.context.Done(): + log.Debug().Msg("Watching pods, context done. Stopping \"restart workers debouncer\"") + restartWorkersDebouncer.Cancel() // TODO: Does this also perform cleanup? return } } } -func (tapperSyncer *KubesharkTapperSyncer) handleErrorInWatchLoop(err error, restartTappersDebouncer *debounce.Debouncer) { - log.Error().Err(err).Msg("While watching pods, got an error! Stopping \"restart tappers debouncer\"") - restartTappersDebouncer.Cancel() - tapperSyncer.ErrorOut <- K8sTapManagerError{ - OriginalError: err, - TapManagerReason: TapManagerPodWatchError, +func (workerSyncer *WorkerSyncer) handleErrorInWatchLoop(err error, restartWorkersDebouncer *debounce.Debouncer) { + log.Error().Err(err).Msg("While watching pods, got an error! Stopping \"restart workers debouncer\"") + restartWorkersDebouncer.Cancel() + workerSyncer.ErrorOut <- K8sDeployManagerError{ + OriginalError: err, + DeployManagerReason: DeployManagerPodWatchError, } } -func (tapperSyncer *KubesharkTapperSyncer) updateCurrentlyTappedPods() (err error, changesFound bool) { - if matchingPods, err := tapperSyncer.kubernetesProvider.ListAllRunningPodsMatchingRegex(tapperSyncer.context, &tapperSyncer.config.PodFilterRegex, tapperSyncer.config.TargetNamespaces); err != nil { +func (workerSyncer *WorkerSyncer) updateCurrentlyTargettedPods() (err error, changesFound bool) { + if matchingPods, err := workerSyncer.kubernetesProvider.ListAllRunningPodsMatchingRegex(workerSyncer.context, &workerSyncer.config.PodFilterRegex, workerSyncer.config.TargetNamespaces); err != nil { return err, false } else { - podsToTap := excludeKubesharkPods(matchingPods) - addedPods, removedPods := getPodArrayDiff(tapperSyncer.CurrentlyTappedPods, podsToTap) + podsToTarget := excludeSelfPods(matchingPods) + addedPods, removedPods := getPodArrayDiff(workerSyncer.CurrentlyTargettedPods, podsToTarget) for _, addedPod := range addedPods { log.Info().Str("pod", addedPod.Name).Msg("Currently targetting:") } @@ -322,9 +322,9 @@ func (tapperSyncer *KubesharkTapperSyncer) updateCurrentlyTappedPods() (err erro log.Info().Str("pod", removedPod.Name).Msg("Pod is no longer running. Targetting is stopped.") } if len(addedPods) > 0 || len(removedPods) > 0 { - tapperSyncer.CurrentlyTappedPods = podsToTap - tapperSyncer.nodeToTappedPodMap = GetNodeHostToTappedPodsMap(tapperSyncer.CurrentlyTappedPods) - tapperSyncer.TapPodChangesOut <- TappedPodChangeEvent{ + workerSyncer.CurrentlyTargettedPods = podsToTarget + workerSyncer.nodeToTargettedPodMap = GetNodeHostToTargettedPodsMap(workerSyncer.CurrentlyTargettedPods) + workerSyncer.DeployPodChangesOut <- TargettedPodChangeEvent{ Added: addedPods, Removed: removedPods, } @@ -334,70 +334,70 @@ func (tapperSyncer *KubesharkTapperSyncer) updateCurrentlyTappedPods() (err erro } } -func (tapperSyncer *KubesharkTapperSyncer) updateKubesharkTappers() error { - nodesToTap := make([]string, len(tapperSyncer.nodeToTappedPodMap)) +func (workerSyncer *WorkerSyncer) updateWorkers() error { + nodesToTarget := make([]string, len(workerSyncer.nodeToTargettedPodMap)) i := 0 - for node := range tapperSyncer.nodeToTappedPodMap { - nodesToTap[i] = node + for node := range workerSyncer.nodeToTargettedPodMap { + nodesToTarget[i] = node i++ } - if utils.EqualStringSlices(nodesToTap, tapperSyncer.tappedNodes) { + if utils.EqualStringSlices(nodesToTarget, workerSyncer.targettedNodes) { log.Debug().Msg("Skipping apply, DaemonSet is up to date") return nil } - log.Debug().Strs("nodes", nodesToTap).Msg("Updating DaemonSet to run on nodes.") + log.Debug().Strs("nodes", nodesToTarget).Msg("Updating DaemonSet to run on nodes.") image := "kubeshark/worker:latest" - if len(tapperSyncer.nodeToTappedPodMap) > 0 { + if len(workerSyncer.nodeToTargettedPodMap) > 0 { var serviceAccountName string - if tapperSyncer.config.KubesharkServiceAccountExists { + if workerSyncer.config.KubesharkServiceAccountExists { serviceAccountName = ServiceAccountName } else { serviceAccountName = "" } - nodeNames := make([]string, 0, len(tapperSyncer.nodeToTappedPodMap)) - for nodeName := range tapperSyncer.nodeToTappedPodMap { + nodeNames := make([]string, 0, len(workerSyncer.nodeToTargettedPodMap)) + for nodeName := range workerSyncer.nodeToTargettedPodMap { nodeNames = append(nodeNames, nodeName) } - if err := tapperSyncer.kubernetesProvider.ApplyKubesharkTapperDaemonSet( - tapperSyncer.context, - tapperSyncer.config.KubesharkResourcesNamespace, - TapperDaemonSetName, + if err := workerSyncer.kubernetesProvider.ApplyWorkerDaemonSet( + workerSyncer.context, + workerSyncer.config.KubesharkResourcesNamespace, + WorkerDaemonSetName, image, - TapperPodName, - fmt.Sprintf("%s.%s.svc", HubPodName, tapperSyncer.config.KubesharkResourcesNamespace), + WorkerPodName, + fmt.Sprintf("%s.%s.svc", HubPodName, workerSyncer.config.KubesharkResourcesNamespace), nodeNames, serviceAccountName, - tapperSyncer.config.TapperResources, - tapperSyncer.config.ImagePullPolicy, - tapperSyncer.config.KubesharkApiFilteringOptions, - tapperSyncer.config.LogLevel, - tapperSyncer.config.ServiceMesh, - tapperSyncer.config.Tls, - tapperSyncer.config.MaxLiveStreams); err != nil { + workerSyncer.config.WorkerResources, + workerSyncer.config.ImagePullPolicy, + workerSyncer.config.KubesharkApiFilteringOptions, + workerSyncer.config.LogLevel, + workerSyncer.config.ServiceMesh, + workerSyncer.config.Tls, + workerSyncer.config.MaxLiveStreams); err != nil { return err } - log.Debug().Int("tapper-count", len(tapperSyncer.nodeToTappedPodMap)).Msg("Successfully created tappers.") + log.Debug().Int("worker-count", len(workerSyncer.nodeToTargettedPodMap)).Msg("Successfully created workers.") } else { - if err := tapperSyncer.kubernetesProvider.ResetKubesharkTapperDaemonSet( - tapperSyncer.context, - tapperSyncer.config.KubesharkResourcesNamespace, - TapperDaemonSetName, + if err := workerSyncer.kubernetesProvider.ResetWorkerDaemonSet( + workerSyncer.context, + workerSyncer.config.KubesharkResourcesNamespace, + WorkerDaemonSetName, image, - TapperPodName); err != nil { + WorkerPodName); err != nil { return err } - log.Debug().Msg("Successfully reset tapper daemon set") + log.Debug().Msg("Successfully resetted Worker DaemonSet") } - tapperSyncer.tappedNodes = nodesToTap + workerSyncer.targettedNodes = nodesToTarget return nil } diff --git a/kubernetes/provider.go b/kubernetes/provider.go index 3b9435b0b..9b5eecf1d 100644 --- a/kubernetes/provider.go +++ b/kubernetes/provider.go @@ -810,17 +810,17 @@ func (provider *Provider) CreateConfigMap(ctx context.Context, namespace string, return nil } -func (provider *Provider) ApplyKubesharkTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string, hubPodIp string, nodeNames []string, serviceAccountName string, resources models.Resources, imagePullPolicy core.PullPolicy, kubesharkApiFilteringOptions api.TrafficFilteringOptions, logLevel zerolog.Level, serviceMesh bool, tls bool, maxLiveStreams int) error { +func (provider *Provider) ApplyWorkerDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, workerPodName string, hubPodIp string, nodeNames []string, serviceAccountName string, resources models.Resources, imagePullPolicy core.PullPolicy, kubesharkApiFilteringOptions api.TrafficFilteringOptions, logLevel zerolog.Level, serviceMesh bool, tls bool, maxLiveStreams int) error { log.Debug(). Int("node-count", len(nodeNames)). Str("namespace", namespace). Str("daemonset-name", daemonSetName). Str("image", podImage). - Str("pod", tapperPodName). - Msg("Applying tapper DaemonSets.") + Str("pod", workerPodName). + Msg("Applying worker DaemonSets.") if len(nodeNames) == 0 { - return fmt.Errorf("daemon set %s must tap at least 1 pod", daemonSetName) + return fmt.Errorf("DaemonSet %s must target at least 1 pod", daemonSetName) } kubesharkApiFilteringOptionsJsonStr, err := json.Marshal(kubesharkApiFilteringOptions) @@ -849,7 +849,7 @@ func (provider *Provider) ApplyKubesharkTapperDaemonSet(ctx context.Context, nam } workerContainer := applyconfcore.Container() - workerContainer.WithName(tapperPodName) + workerContainer.WithName(workerPodName) workerContainer.WithImage(podImage) workerContainer.WithImagePullPolicy(imagePullPolicy) @@ -887,19 +887,19 @@ func (provider *Provider) ApplyKubesharkTapperDaemonSet(ctx context.Context, nam ) cpuLimit, err := resource.ParseQuantity(resources.CpuLimit) if err != nil { - return fmt.Errorf("invalid cpu limit for %s container", tapperPodName) + return fmt.Errorf("invalid cpu limit for %s container", workerPodName) } memLimit, err := resource.ParseQuantity(resources.MemoryLimit) if err != nil { - return fmt.Errorf("invalid memory limit for %s container", tapperPodName) + return fmt.Errorf("invalid memory limit for %s container", workerPodName) } cpuRequests, err := resource.ParseQuantity(resources.CpuRequests) if err != nil { - return fmt.Errorf("invalid cpu request for %s container", tapperPodName) + return fmt.Errorf("invalid cpu request for %s container", workerPodName) } memRequests, err := resource.ParseQuantity(resources.MemoryRequests) if err != nil { - return fmt.Errorf("invalid memory request for %s container", tapperPodName) + return fmt.Errorf("invalid memory request for %s container", workerPodName) } workerResourceLimits := core.ResourceList{ "cpu": cpuLimit, @@ -967,14 +967,14 @@ func (provider *Provider) ApplyKubesharkTapperDaemonSet(ctx context.Context, nam podTemplate := applyconfcore.PodTemplateSpec() podTemplate.WithLabels(map[string]string{ - "app": tapperPodName, + "app": workerPodName, LabelManagedBy: provider.managedBy, LabelCreatedBy: provider.createdBy, }) podTemplate.WithSpec(podSpec) labelSelector := applyconfmeta.LabelSelector() - labelSelector.WithMatchLabels(map[string]string{"app": tapperPodName}) + labelSelector.WithMatchLabels(map[string]string{"app": workerPodName}) applyOptions := metav1.ApplyOptions{ Force: true, @@ -993,9 +993,9 @@ func (provider *Provider) ApplyKubesharkTapperDaemonSet(ctx context.Context, nam return err } -func (provider *Provider) ResetKubesharkTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string) error { +func (provider *Provider) ResetWorkerDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, workerPodName string) error { workerContainer := applyconfcore.Container() - workerContainer.WithName(tapperPodName) + workerContainer.WithName(workerPodName) workerContainer.WithImage(podImage) nodeSelectorRequirement := applyconfcore.NodeSelectorRequirement() @@ -1016,14 +1016,14 @@ func (provider *Provider) ResetKubesharkTapperDaemonSet(ctx context.Context, nam podTemplate := applyconfcore.PodTemplateSpec() podTemplate.WithLabels(map[string]string{ - "app": tapperPodName, + "app": workerPodName, LabelManagedBy: provider.managedBy, LabelCreatedBy: provider.createdBy, }) podTemplate.WithSpec(podSpec) labelSelector := applyconfmeta.LabelSelector() - labelSelector.WithMatchLabels(map[string]string{"app": tapperPodName}) + labelSelector.WithMatchLabels(map[string]string{"app": workerPodName}) applyOptions := metav1.ApplyOptions{ Force: true, diff --git a/kubernetes/utils.go b/kubernetes/utils.go index 1aa9df09e..a1e6766d2 100644 --- a/kubernetes/utils.go +++ b/kubernetes/utils.go @@ -8,19 +8,19 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -func GetNodeHostToTappedPodsMap(tappedPods []core.Pod) models.NodeToPodsMap { - nodeToTappedPodMap := make(models.NodeToPodsMap) - for _, pod := range tappedPods { +func GetNodeHostToTargettedPodsMap(targettedPods []core.Pod) models.NodeToPodsMap { + nodeToTargettedPodsMap := make(models.NodeToPodsMap) + for _, pod := range targettedPods { minimizedPod := getMinimizedPod(pod) - existingList := nodeToTappedPodMap[pod.Spec.NodeName] + existingList := nodeToTargettedPodsMap[pod.Spec.NodeName] if existingList == nil { - nodeToTappedPodMap[pod.Spec.NodeName] = []core.Pod{minimizedPod} + nodeToTargettedPodsMap[pod.Spec.NodeName] = []core.Pod{minimizedPod} } else { - nodeToTappedPodMap[pod.Spec.NodeName] = append(nodeToTappedPodMap[pod.Spec.NodeName], minimizedPod) + nodeToTargettedPodsMap[pod.Spec.NodeName] = append(nodeToTargettedPodsMap[pod.Spec.NodeName], minimizedPod) } } - return nodeToTappedPodMap + return nodeToTargettedPodsMap } func getMinimizedPod(fullPod core.Pod) core.Pod { @@ -48,7 +48,7 @@ func getMinimizedContainerStatuses(fullPod core.Pod) []core.ContainerStatus { return result } -func excludeKubesharkPods(pods []core.Pod) []core.Pod { +func excludeSelfPods(pods []core.Pod) []core.Pod { kubesharkPrefixRegex := regexp.MustCompile("^" + KubesharkResourcesPrefix) nonKubesharkPods := make([]core.Pod, 0) diff --git a/resources/cleanResources.go b/resources/cleanResources.go index 4d55291ec..b4daaca19 100644 --- a/resources/cleanResources.go +++ b/resources/cleanResources.go @@ -107,8 +107,8 @@ func cleanUpRestrictedMode(ctx context.Context, kubernetesProvider *kubernetes.P handleDeletionError(err, resourceDesc, &leftoverResources) } - if err := kubernetesProvider.RemoveDaemonSet(ctx, kubesharkResourcesNamespace, kubernetes.TapperDaemonSetName); err != nil { - resourceDesc := fmt.Sprintf("DaemonSet %s in namespace %s", kubernetes.TapperDaemonSetName, kubesharkResourcesNamespace) + if err := kubernetesProvider.RemoveDaemonSet(ctx, kubesharkResourcesNamespace, kubernetes.WorkerDaemonSetName); err != nil { + resourceDesc := fmt.Sprintf("DaemonSet %s in namespace %s", kubernetes.WorkerDaemonSetName, kubesharkResourcesNamespace) handleDeletionError(err, resourceDesc, &leftoverResources) } diff --git a/resources/createResources.go b/resources/createResources.go index b756fa1c9..b18731092 100644 --- a/resources/createResources.go +++ b/resources/createResources.go @@ -13,7 +13,7 @@ import ( core "k8s.io/api/core/v1" ) -func CreateTapKubesharkResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, serializedKubesharkConfig string, isNsRestrictedMode bool, kubesharkResourcesNamespace string, maxEntriesDBSizeBytes int64, hubResources models.Resources, imagePullPolicy core.PullPolicy, logLevel zerolog.Level, profiler bool) (bool, error) { +func CreateHubResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, serializedKubesharkConfig string, isNsRestrictedMode bool, kubesharkResourcesNamespace string, maxEntriesDBSizeBytes int64, hubResources models.Resources, imagePullPolicy core.PullPolicy, logLevel zerolog.Level, profiler bool) (bool, error) { if !isNsRestrictedMode { if err := createKubesharkNamespace(ctx, kubernetesProvider, kubesharkResourcesNamespace); err != nil { return false, err