diff --git a/README.md b/README.md index 4edafe423..4d34f6939 100644 --- a/README.md +++ b/README.md @@ -79,7 +79,7 @@ kubeshark tap "(catalo*|front-end*)" ### Specify the Namespace -By default, Kubeshark is deployed into the `default` namespace. +By default, Kubeshark targets the `default` namespace. To specify a different namespace: ``` @@ -88,8 +88,8 @@ kubeshark tap -n sock-shop ### Specify All Namespaces -The default deployment strategy of Kubeshark waits for the new pods -to be created. To simply deploy to all existing namespaces run: +The default strategy of Kubeshark waits for the new pods +to be created. To simply tap all existing namespaces run: ``` kubeshark tap -A diff --git a/cmd/checkRunner.go b/cmd/checkRunner.go index 20a981a3b..69765518b 100644 --- a/cmd/checkRunner.go +++ b/cmd/checkRunner.go @@ -17,7 +17,7 @@ var ( ) func runKubesharkCheck() { - log.Info().Msg("Checking the deployment...") + log.Info().Msg("Checking the Kubeshark resources...") ctx, cancel := context.WithCancel(context.Background()) defer cancel() // cancel will be called when this function exits @@ -48,8 +48,8 @@ func runKubesharkCheck() { } else { log.Error(). Str("command1", fmt.Sprintf("kubeshark %s", cleanCmd.Use)). - Str("command2", fmt.Sprintf("kubeshark %s", deployCmd.Use)). - Msg(fmt.Sprintf(utils.Red, "There are issues in your deployment! Run these commands:")) + Str("command2", fmt.Sprintf("kubeshark %s", tapCmd.Use)). + Msg(fmt.Sprintf(utils.Red, "There are issues in your Kubeshark resources! Run these commands:")) os.Exit(1) } } diff --git a/cmd/common.go b/cmd/common.go index 80864ff06..0ff6ae147 100644 --- a/cmd/common.go +++ b/cmd/common.go @@ -22,7 +22,7 @@ import ( ) func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, ctx context.Context, cancel context.CancelFunc, serviceName string, srcPort uint16, dstPort uint16, healthCheck string) { - httpServer, err := kubernetes.StartProxy(kubernetesProvider, config.Config.Deploy.ProxyHost, srcPort, dstPort, config.Config.ResourcesNamespace, serviceName, cancel) + httpServer, err := kubernetes.StartProxy(kubernetesProvider, config.Config.Tap.ProxyHost, srcPort, dstPort, config.Config.ResourcesNamespace, serviceName, cancel) if err != nil { log.Error(). Err(errormessage.FormatError(err)). @@ -115,7 +115,7 @@ func dumpLogsIfNeeded(ctx context.Context, kubernetesProvider *kubernetes.Provid } } -func getSerializedDeployConfig(conf *models.Config) (string, error) { +func getSerializedTapConfig(conf *models.Config) (string, error) { serializedConfig, err := json.Marshal(conf) if err != nil { return "", err diff --git a/cmd/deploy.go b/cmd/deploy.go deleted file mode 100644 index bc6349b33..000000000 --- a/cmd/deploy.go +++ /dev/null @@ -1,60 +0,0 @@ -package cmd - -import ( - "errors" - - "github.com/creasty/defaults" - "github.com/kubeshark/kubeshark/config" - "github.com/kubeshark/kubeshark/config/configStructs" - "github.com/kubeshark/kubeshark/errormessage" - "github.com/rs/zerolog/log" - "github.com/spf13/cobra" -) - -var deployCmd = &cobra.Command{ - Use: "deploy [POD REGEX]", - Short: "Deploy Kubeshark into your K8s cluster.", - Long: `Deploy Kubeshark into your K8s cluster to gain visibility.`, - RunE: func(cmd *cobra.Command, args []string) error { - deploy() - return nil - }, - PreRunE: func(cmd *cobra.Command, args []string) error { - if len(args) == 1 { - config.Config.Deploy.PodRegexStr = args[0] - } else if len(args) > 1 { - return errors.New("unexpected number of arguments") - } - - if err := config.Config.Deploy.Validate(); err != nil { - return errormessage.FormatError(err) - } - - log.Info(). - Str("limit", config.Config.Deploy.HumanMaxEntriesDBSize). - Msg("Kubeshark will store the traffic up to a limit. Oldest entries will be removed once the limit is reached.") - - return nil - }, -} - -func init() { - rootCmd.AddCommand(deployCmd) - - defaultDeployConfig := configStructs.DeployConfig{} - if err := defaults.Set(&defaultDeployConfig); err != nil { - log.Debug().Err(err).Send() - } - - deployCmd.Flags().StringP(configStructs.TagLabel, "t", defaultDeployConfig.Tag, "The tag of the Docker images that are going to be deployed.") - deployCmd.Flags().Uint16P(configStructs.ProxyPortLabel, "p", defaultDeployConfig.ProxyPort, "Provide a custom port for the web interface webserver.") - deployCmd.Flags().StringSliceP(configStructs.NamespacesLabel, "n", defaultDeployConfig.Namespaces, "Namespaces selector.") - deployCmd.Flags().BoolP(configStructs.AllNamespacesLabel, "A", defaultDeployConfig.AllNamespaces, "Deploy to all namespaces.") - deployCmd.Flags().Bool(configStructs.EnableRedactionLabel, defaultDeployConfig.EnableRedaction, "Enables redaction of potentially sensitive request/response headers and body values.") - deployCmd.Flags().String(configStructs.HumanMaxEntriesDBSizeLabel, defaultDeployConfig.HumanMaxEntriesDBSize, "Override the default max entries db size.") - deployCmd.Flags().String(configStructs.InsertionFilterName, defaultDeployConfig.InsertionFilter, "Set the insertion filter. Accepts string or a file path.") - deployCmd.Flags().Bool(configStructs.DryRunLabel, defaultDeployConfig.DryRun, "Preview of all pods matching the regex, without deploying workers on them.") - deployCmd.Flags().Bool(configStructs.ServiceMeshName, defaultDeployConfig.ServiceMesh, "Record decrypted traffic if the cluster is configured with a service mesh and with mtls.") - deployCmd.Flags().Bool(configStructs.TlsName, defaultDeployConfig.Tls, "Record tls traffic.") - deployCmd.Flags().Bool(configStructs.ProfilerName, defaultDeployConfig.Profiler, "Run pprof server.") -} diff --git a/cmd/openRunner.go b/cmd/openRunner.go index 5ee122430..38882ef51 100644 --- a/cmd/openRunner.go +++ b/cmd/openRunner.go @@ -34,7 +34,7 @@ func runOpen() { if !exists { log.Error(). Str("service", kubernetes.FrontServiceName). - Str("command", fmt.Sprintf("kubeshark %s", deployCmd.Use)). + Str("command", fmt.Sprintf("kubeshark %s", tapCmd.Use)). Msg("Service not found! You should run the command first:") cancel() return diff --git a/cmd/root.go b/cmd/root.go index dc8bbf7ce..5f793d49d 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -36,7 +36,7 @@ func init() { } // Execute adds all child commands to the root command and sets flags appropriately. -// This is called by main.main(). It only needs to happen once to the deployCmd. +// This is called by main.main(). It only needs to happen once to the tapCmd. func Execute() { cobra.CheckErr(rootCmd.Execute()) } diff --git a/cmd/tap.go b/cmd/tap.go new file mode 100644 index 000000000..c667c0cc5 --- /dev/null +++ b/cmd/tap.go @@ -0,0 +1,60 @@ +package cmd + +import ( + "errors" + + "github.com/creasty/defaults" + "github.com/kubeshark/kubeshark/config" + "github.com/kubeshark/kubeshark/config/configStructs" + "github.com/kubeshark/kubeshark/errormessage" + "github.com/rs/zerolog/log" + "github.com/spf13/cobra" +) + +var tapCmd = &cobra.Command{ + Use: "tap [POD REGEX]", + Short: "Record and see the network traffic in your Kubernetes cluster.", + Long: "Record and see the network traffic in your Kubernetes cluster.", + RunE: func(cmd *cobra.Command, args []string) error { + tap() + return nil + }, + PreRunE: func(cmd *cobra.Command, args []string) error { + if len(args) == 1 { + config.Config.Tap.PodRegexStr = args[0] + } else if len(args) > 1 { + return errors.New("unexpected number of arguments") + } + + if err := config.Config.Tap.Validate(); err != nil { + return errormessage.FormatError(err) + } + + log.Info(). + Str("limit", config.Config.Tap.HumanMaxEntriesDBSize). + Msg("Kubeshark will store the traffic up to a limit. Oldest entries will be removed once the limit is reached.") + + return nil + }, +} + +func init() { + rootCmd.AddCommand(tapCmd) + + defaultTapConfig := configStructs.TapConfig{} + if err := defaults.Set(&defaultTapConfig); err != nil { + log.Debug().Err(err).Send() + } + + tapCmd.Flags().StringP(configStructs.TagLabel, "t", defaultTapConfig.Tag, "The tag of the Docker images that are going to be pulled.") + tapCmd.Flags().Uint16P(configStructs.ProxyPortLabel, "p", defaultTapConfig.ProxyPort, "Provide a custom port for the web interface webserver.") + tapCmd.Flags().StringSliceP(configStructs.NamespacesLabel, "n", defaultTapConfig.Namespaces, "Namespaces selector.") + tapCmd.Flags().BoolP(configStructs.AllNamespacesLabel, "A", defaultTapConfig.AllNamespaces, "Tap all namespaces.") + tapCmd.Flags().Bool(configStructs.EnableRedactionLabel, defaultTapConfig.EnableRedaction, "Enables redaction of potentially sensitive request/response headers and body values.") + tapCmd.Flags().String(configStructs.HumanMaxEntriesDBSizeLabel, defaultTapConfig.HumanMaxEntriesDBSize, "Override the default max entries db size.") + tapCmd.Flags().String(configStructs.InsertionFilterName, defaultTapConfig.InsertionFilter, "Set the insertion filter. Accepts string or a file path.") + tapCmd.Flags().Bool(configStructs.DryRunLabel, defaultTapConfig.DryRun, "Preview of all pods matching the regex, without tapping them.") + tapCmd.Flags().Bool(configStructs.ServiceMeshName, defaultTapConfig.ServiceMesh, "Record decrypted traffic if the cluster is configured with a service mesh and with mtls.") + tapCmd.Flags().Bool(configStructs.TlsName, defaultTapConfig.Tls, "Record tls traffic.") + tapCmd.Flags().Bool(configStructs.ProfilerName, defaultTapConfig.Profiler, "Run pprof server.") +} diff --git a/cmd/deployRunner.go b/cmd/tapRunner.go similarity index 88% rename from cmd/deployRunner.go rename to cmd/tapRunner.go index 49149534c..5ca005bb8 100644 --- a/cmd/deployRunner.go +++ b/cmd/tapRunner.go @@ -28,21 +28,21 @@ import ( const cleanupTimeout = time.Minute -type deployState struct { +type tapState struct { startTime time.Time targetNamespaces []string kubesharkServiceAccountExists bool } -var state deployState +var state tapState var connector *connect.Connector var hubPodReady bool var frontPodReady bool var proxyDone bool -func deploy() { +func tap() { state.startTime = time.Now() - docker.SetTag(config.Config.Deploy.Tag) + docker.SetTag(config.Config.Tap.Tag) connector = connect.NewConnector(kubernetes.GetLocalhostOnPort(config.Config.Hub.PortForward.SrcPort), connect.DefaultRetries, connect.DefaultTimeout) @@ -56,8 +56,8 @@ func deploy() { state.targetNamespaces = getNamespaces(kubernetesProvider) - conf := getDeployConfig() - serializedKubesharkConfig, err := getSerializedDeployConfig(conf) + conf := getTapConfig() + serializedKubesharkConfig, err := getSerializedTapConfig(conf) if err != nil { log.Error().Err(errormessage.FormatError(err)).Msg("Error serializing Kubeshark config!") return @@ -76,12 +76,12 @@ func deploy() { log.Error().Err(errormessage.FormatError(err)).Msg("Error listing pods!") } - if config.Config.Deploy.DryRun { + if config.Config.Tap.DryRun { return } - log.Info().Msg("Waiting for Kubeshark deployment to finish...") - if state.kubesharkServiceAccountExists, err = resources.CreateHubResources(ctx, kubernetesProvider, serializedKubesharkConfig, config.Config.IsNsRestrictedMode(), config.Config.ResourcesNamespace, config.Config.Deploy.MaxEntriesDBSizeBytes(), config.Config.Deploy.HubResources, config.Config.ImagePullPolicy(), config.Config.LogLevel(), config.Config.Deploy.Profiler); err != nil { + log.Info().Msg("Waiting for the creation of Kubeshark resources...") + if state.kubesharkServiceAccountExists, err = resources.CreateHubResources(ctx, kubernetesProvider, serializedKubesharkConfig, config.Config.IsNsRestrictedMode(), config.Config.ResourcesNamespace, config.Config.Tap.MaxEntriesDBSizeBytes(), config.Config.Tap.HubResources, config.Config.ImagePullPolicy(), config.Config.LogLevel(), config.Config.Tap.Profiler); err != nil { var statusError *k8serrors.StatusError if errors.As(err, &statusError) && (statusError.ErrStatus.Reason == metav1.StatusReasonAlreadyExists) { log.Info().Msg("Kubeshark is already running in this namespace, change the `kubeshark-resources-namespace` configuration or run `kubeshark clean` to remove the currently running Kubeshark instance") @@ -93,7 +93,7 @@ func deploy() { return } - defer finishDeployExecution(kubernetesProvider) + defer finishTapExecution(kubernetesProvider) go goUtils.HandleExcWrapper(watchHubEvents, ctx, kubernetesProvider, cancel) go goUtils.HandleExcWrapper(watchHubPod, ctx, kubernetesProvider, cancel) @@ -103,16 +103,16 @@ func deploy() { utils.WaitForFinish(ctx, cancel) } -func finishDeployExecution(kubernetesProvider *kubernetes.Provider) { +func finishTapExecution(kubernetesProvider *kubernetes.Provider) { finishKubesharkExecution(kubernetesProvider, config.Config.IsNsRestrictedMode(), config.Config.ResourcesNamespace) } -func getDeployConfig() *models.Config { +func getTapConfig() *models.Config { conf := models.Config{ - MaxDBSizeBytes: config.Config.Deploy.MaxEntriesDBSizeBytes(), - InsertionFilter: config.Config.Deploy.GetInsertionFilter(), + MaxDBSizeBytes: config.Config.Tap.MaxEntriesDBSizeBytes(), + InsertionFilter: config.Config.Tap.GetInsertionFilter(), PullPolicy: config.Config.ImagePullPolicyStr, - WorkerResources: config.Config.Deploy.WorkerResources, + WorkerResources: config.Config.Tap.WorkerResources, ResourcesNamespace: config.Config.ResourcesNamespace, DatabasePath: models.DataDirPath, } @@ -126,7 +126,7 @@ The alternative would be to wait for Hub to be ready and then query it for the p the arguably worse drawback of taking a relatively very long time before the user sees which pods are targeted, if any. */ func printTargettedPodsPreview(ctx context.Context, kubernetesProvider *kubernetes.Provider, namespaces []string) error { - if matchingPods, err := kubernetesProvider.ListAllRunningPodsMatchingRegex(ctx, config.Config.Deploy.PodRegex(), namespaces); err != nil { + if matchingPods, err := kubernetesProvider.ListAllRunningPodsMatchingRegex(ctx, config.Config.Tap.PodRegex(), namespaces); err != nil { return err } else { if len(matchingPods) == 0 { @@ -142,17 +142,17 @@ func printTargettedPodsPreview(ctx context.Context, kubernetesProvider *kubernet func startWorkerSyncer(ctx context.Context, cancel context.CancelFunc, provider *kubernetes.Provider, targetNamespaces []string, startTime time.Time) error { workerSyncer, err := kubernetes.CreateAndStartWorkerSyncer(ctx, provider, kubernetes.WorkerSyncerConfig{ TargetNamespaces: targetNamespaces, - PodFilterRegex: *config.Config.Deploy.PodRegex(), + PodFilterRegex: *config.Config.Tap.PodRegex(), KubesharkResourcesNamespace: config.Config.ResourcesNamespace, - WorkerResources: config.Config.Deploy.WorkerResources, + WorkerResources: config.Config.Tap.WorkerResources, ImagePullPolicy: config.Config.ImagePullPolicy(), LogLevel: config.Config.LogLevel(), KubesharkApiFilteringOptions: api.TrafficFilteringOptions{ - IgnoredUserAgents: config.Config.Deploy.IgnoredUserAgents, + IgnoredUserAgents: config.Config.Tap.IgnoredUserAgents, }, KubesharkServiceAccountExists: state.kubesharkServiceAccountExists, - ServiceMesh: config.Config.Deploy.ServiceMesh, - Tls: config.Config.Deploy.Tls, + ServiceMesh: config.Config.Tap.ServiceMesh, + Tls: config.Config.Tap.Tls, }, startTime) if err != nil { @@ -167,9 +167,9 @@ func startWorkerSyncer(ctx context.Context, cancel context.CancelFunc, provider log.Debug().Msg("workerSyncer err channel closed, ending listener loop") return } - log.Error().Msg(getK8sDeployManagerErrorText(syncerErr)) + log.Error().Msg(getK8sTapManagerErrorText(syncerErr)) cancel() - case _, ok := <-workerSyncer.DeployPodChangesOut: + case _, ok := <-workerSyncer.TapPodChangesOut: if !ok { log.Debug().Msg("workerSyncer pod changes channel closed, ending listener loop") return @@ -203,16 +203,16 @@ func printNoPodsFoundSuggestion(targetNamespaces []string) { log.Warn().Msg(fmt.Sprintf("Did not find any currently running pods that match the regex argument, kubeshark will automatically target matching pods if any are created later%s", suggestionStr)) } -func getK8sDeployManagerErrorText(err kubernetes.K8sDeployManagerError) string { - switch err.DeployManagerReason { - case kubernetes.DeployManagerPodListError: +func getK8sTapManagerErrorText(err kubernetes.K8sTapManagerError) string { + switch err.TapManagerReason { + case kubernetes.TapManagerPodListError: return fmt.Sprintf("Failed to update currently targetted pods: %v", err.OriginalError) - case kubernetes.DeployManagerPodWatchError: + case kubernetes.TapManagerPodWatchError: return fmt.Sprintf("Error occured in K8s pod watch: %v", err.OriginalError) - case kubernetes.DeployManagerWorkerUpdateError: + case kubernetes.TapManagerWorkerUpdateError: return fmt.Sprintf("Error updating worker: %v", err.OriginalError) default: - return fmt.Sprintf("Unknown error occured in K8s deploy manager: %v", err.OriginalError) + return fmt.Sprintf("Unknown error occured in K8s tap manager: %v", err.OriginalError) } } @@ -470,10 +470,10 @@ func postFrontStarted(ctx context.Context, kubernetesProvider *kubernetes.Provid } func getNamespaces(kubernetesProvider *kubernetes.Provider) []string { - if config.Config.Deploy.AllNamespaces { + if config.Config.Tap.AllNamespaces { return []string{kubernetes.K8sAllNamespaces} - } else if len(config.Config.Deploy.Namespaces) > 0 { - return utils.Unique(config.Config.Deploy.Namespaces) + } else if len(config.Config.Tap.Namespaces) > 0 { + return utils.Unique(config.Config.Tap.Namespaces) } else { currentNamespace, err := kubernetesProvider.CurrentNamespace() if err != nil { diff --git a/config/configStruct.go b/config/configStruct.go index 0880f13e5..c45084451 100644 --- a/config/configStruct.go +++ b/config/configStruct.go @@ -56,7 +56,7 @@ func CreateDefaultConfig() ConfigStruct { type ConfigStruct struct { Hub HubConfig `yaml:"hub"` Front FrontConfig `yaml:"front"` - Deploy configStructs.DeployConfig `yaml:"deploy"` + Tap configStructs.TapConfig `yaml:"tap"` Logs configStructs.LogsConfig `yaml:"logs"` Config configStructs.ConfigConfig `yaml:"config,omitempty"` ImagePullPolicyStr string `yaml:"image-pull-policy" default:"Always"` diff --git a/config/configStructs/deployConfig.go b/config/configStructs/tapConfig.go similarity index 93% rename from config/configStructs/deployConfig.go rename to config/configStructs/tapConfig.go index 264c73fe6..a0bb71d02 100644 --- a/config/configStructs/deployConfig.go +++ b/config/configStructs/tapConfig.go @@ -26,7 +26,7 @@ const ( ProfilerName = "profiler" ) -type DeployConfig struct { +type TapConfig struct { Tag string `yaml:"tag" default:"latest"` PodRegexStr string `yaml:"regex" default:".*"` ProxyPort uint16 `yaml:"proxy-port" default:"8899"` @@ -53,17 +53,17 @@ type DeployConfig struct { Profiler bool `yaml:"profiler" default:"false"` } -func (config *DeployConfig) PodRegex() *regexp.Regexp { +func (config *TapConfig) PodRegex() *regexp.Regexp { podRegex, _ := regexp.Compile(config.PodRegexStr) return podRegex } -func (config *DeployConfig) MaxEntriesDBSizeBytes() int64 { +func (config *TapConfig) MaxEntriesDBSizeBytes() int64 { maxEntriesDBSizeBytes, _ := utils.HumanReadableToBytes(config.HumanMaxEntriesDBSize) return maxEntriesDBSizeBytes } -func (config *DeployConfig) GetInsertionFilter() string { +func (config *TapConfig) GetInsertionFilter() string { insertionFilter := config.InsertionFilter if fs.ValidPath(insertionFilter) { if _, err := os.Stat(insertionFilter); err == nil { @@ -87,7 +87,7 @@ func (config *DeployConfig) GetInsertionFilter() string { return insertionFilter } -func getRedactFilter(config *DeployConfig) string { +func getRedactFilter(config *TapConfig) string { if !config.EnableRedaction { return "" } @@ -118,7 +118,7 @@ func getRedactFilter(config *DeployConfig) string { return fmt.Sprintf("redact(\"%s\")", strings.Join(redactValues, "\",\"")) } -func (config *DeployConfig) Validate() error { +func (config *TapConfig) Validate() error { _, compileErr := regexp.Compile(config.PodRegexStr) if compileErr != nil { return fmt.Errorf("%s is not a valid regex %s", config.PodRegexStr, compileErr) diff --git a/kubernetes/errors.go b/kubernetes/errors.go index b3bb2c930..247ceaecc 100644 --- a/kubernetes/errors.go +++ b/kubernetes/errors.go @@ -1,20 +1,20 @@ package kubernetes -type K8sDeployManagerErrorReason string +type K8sTapManagerErrorReason string const ( - DeployManagerWorkerUpdateError K8sDeployManagerErrorReason = "WORKER_UPDATE_ERROR" - DeployManagerPodWatchError K8sDeployManagerErrorReason = "POD_WATCH_ERROR" - DeployManagerPodListError K8sDeployManagerErrorReason = "POD_LIST_ERROR" + TapManagerWorkerUpdateError K8sTapManagerErrorReason = "WORKER_UPDATE_ERROR" + TapManagerPodWatchError K8sTapManagerErrorReason = "POD_WATCH_ERROR" + TapManagerPodListError K8sTapManagerErrorReason = "POD_LIST_ERROR" ) -type K8sDeployManagerError struct { - OriginalError error - DeployManagerReason K8sDeployManagerErrorReason +type K8sTapManagerError struct { + OriginalError error + TapManagerReason K8sTapManagerErrorReason } -// K8sDeployManagerError implements the Error interface. -func (e *K8sDeployManagerError) Error() string { +// K8sTapManagerError implements the Error interface. +func (e *K8sTapManagerError) Error() string { return e.OriginalError.Error() } diff --git a/kubernetes/workerSyncer.go b/kubernetes/workerSyncer.go index 5936a1e35..c77102d22 100644 --- a/kubernetes/workerSyncer.go +++ b/kubernetes/workerSyncer.go @@ -30,9 +30,9 @@ type WorkerSyncer struct { CurrentlyTargettedPods []v1.Pod config WorkerSyncerConfig kubernetesProvider *Provider - DeployPodChangesOut chan TargettedPodChangeEvent + TapPodChangesOut chan TargettedPodChangeEvent WorkerPodsChanges chan *v1.Pod - ErrorOut chan K8sDeployManagerError + ErrorOut chan K8sTapManagerError nodeToTargettedPodMap models.NodeToPodsMap targettedNodes []string } @@ -57,9 +57,9 @@ func CreateAndStartWorkerSyncer(ctx context.Context, kubernetesProvider *Provide CurrentlyTargettedPods: make([]v1.Pod, 0), config: config, kubernetesProvider: kubernetesProvider, - DeployPodChangesOut: make(chan TargettedPodChangeEvent, 100), + TapPodChangesOut: make(chan TargettedPodChangeEvent, 100), WorkerPodsChanges: make(chan *v1.Pod, 100), - ErrorOut: make(chan K8sDeployManagerError, 100), + ErrorOut: make(chan K8sTapManagerError, 100), } if err, _ := syncer.updateCurrentlyTargettedPods(); err != nil { @@ -187,9 +187,9 @@ func (workerSyncer *WorkerSyncer) watchPodsForTargetting() { handleChangeInPods := func() { err, changeFound := workerSyncer.updateCurrentlyTargettedPods() if err != nil { - workerSyncer.ErrorOut <- K8sDeployManagerError{ - OriginalError: err, - DeployManagerReason: DeployManagerPodListError, + workerSyncer.ErrorOut <- K8sTapManagerError{ + OriginalError: err, + TapManagerReason: TapManagerPodListError, } } @@ -198,9 +198,9 @@ func (workerSyncer *WorkerSyncer) watchPodsForTargetting() { return } if err := workerSyncer.updateWorkers(); err != nil { - workerSyncer.ErrorOut <- K8sDeployManagerError{ - OriginalError: err, - DeployManagerReason: DeployManagerWorkerUpdateError, + workerSyncer.ErrorOut <- K8sTapManagerError{ + OriginalError: err, + TapManagerReason: TapManagerWorkerUpdateError, } } } @@ -294,9 +294,9 @@ func (workerSyncer *WorkerSyncer) watchPodsForTargetting() { func (workerSyncer *WorkerSyncer) handleErrorInWatchLoop(err error, restartWorkersDebouncer *debounce.Debouncer) { log.Error().Err(err).Msg("While watching pods, got an error! Stopping \"restart workers debouncer\"") restartWorkersDebouncer.Cancel() - workerSyncer.ErrorOut <- K8sDeployManagerError{ - OriginalError: err, - DeployManagerReason: DeployManagerPodWatchError, + workerSyncer.ErrorOut <- K8sTapManagerError{ + OriginalError: err, + TapManagerReason: TapManagerPodWatchError, } } @@ -315,7 +315,7 @@ func (workerSyncer *WorkerSyncer) updateCurrentlyTargettedPods() (err error, cha if len(addedPods) > 0 || len(removedPods) > 0 { workerSyncer.CurrentlyTargettedPods = podsToTarget workerSyncer.nodeToTargettedPodMap = GetNodeHostToTargettedPodsMap(workerSyncer.CurrentlyTargettedPods) - workerSyncer.DeployPodChangesOut <- TargettedPodChangeEvent{ + workerSyncer.TapPodChangesOut <- TargettedPodChangeEvent{ Added: addedPods, Removed: removedPods, }