diff --git a/cmd/tapRunner.go b/cmd/tapRunner.go index 092225eb2..25172ddd9 100644 --- a/cmd/tapRunner.go +++ b/cmd/tapRunner.go @@ -71,9 +71,9 @@ func tap() { } } - log.Info().Strs("namespaces", state.targetNamespaces).Msg("Targetting pods in:") + log.Info().Strs("namespaces", state.targetNamespaces).Msg("Targeting pods in:") - if err := printTargettedPodsPreview(ctx, kubernetesProvider, state.targetNamespaces); err != nil { + if err := printTargetedPodsPreview(ctx, kubernetesProvider, state.targetNamespaces); err != nil { log.Error().Err(errormessage.FormatError(err)).Msg("Error listing pods!") } @@ -113,15 +113,15 @@ This function is a bit problematic as it might be detached from the actual pods The alternative would be to wait for Hub to be ready and then query it for the pods it listens to, this has the arguably worse drawback of taking a relatively very long time before the user sees which pods are targeted, if any. */ -func printTargettedPodsPreview(ctx context.Context, kubernetesProvider *kubernetes.Provider, namespaces []string) error { +func printTargetedPodsPreview(ctx context.Context, kubernetesProvider *kubernetes.Provider, namespaces []string) error { if matchingPods, err := kubernetesProvider.ListAllRunningPodsMatchingRegex(ctx, config.Config.Tap.PodRegex(), namespaces); err != nil { return err } else { if len(matchingPods) == 0 { printNoPodsFoundSuggestion(namespaces) } - for _, targettedPod := range matchingPods { - log.Info().Msg(fmt.Sprintf("New pod: %s", fmt.Sprintf(utils.Green, targettedPod.Name))) + for _, targetedPod := range matchingPods { + log.Info().Msg(fmt.Sprintf("New pod: %s", fmt.Sprintf(utils.Green, targetedPod.Name))) } return nil } @@ -160,7 +160,7 @@ func startWorkerSyncer(ctx context.Context, cancel context.CancelFunc, provider log.Debug().Msg("workerSyncer pod changes channel closed, ending listener loop") return } - go connector.PostTargettedPodsToHub(workerSyncer.CurrentlyTargettedPods) + go connector.PostTargetedPodsToHub(workerSyncer.CurrentlyTargetedPods) case pod, ok := <-workerSyncer.WorkerPodsChanges: if !ok { log.Debug().Msg("workerSyncer worker status changed channel closed, ending listener loop") @@ -188,7 +188,7 @@ func printNoPodsFoundSuggestion(targetNamespaces []string) { func getK8sTapManagerErrorText(err kubernetes.K8sTapManagerError) string { switch err.TapManagerReason { case kubernetes.TapManagerPodListError: - return fmt.Sprintf("Failed to update currently targetted pods: %v", err.OriginalError) + return fmt.Sprintf("Failed to update currently targeted pods: %v", err.OriginalError) case kubernetes.TapManagerPodWatchError: return fmt.Sprintf("Error occured in K8s pod watch: %v", err.OriginalError) case kubernetes.TapManagerWorkerUpdateError: diff --git a/errormessage/errormessage.go b/errormessage/errormessage.go index fa7c3dff4..d2b4f4230 100644 --- a/errormessage/errormessage.go +++ b/errormessage/errormessage.go @@ -18,7 +18,7 @@ func FormatError(err error) error { if k8serrors.IsForbidden(err) { errorNew = fmt.Errorf("insufficient permissions: %w. "+ "supply the required permission or control %s's access to namespaces by setting %s "+ - "in the config file or setting the targetted namespace with --%s %s=", + "in the config file or setting the targeted namespace with --%s %s=", err, misc.Software, config.SelfNamespaceConfigName, diff --git a/go.mod b/go.mod index 43fdee21b..fe82bb749 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/docker/go-connections v0.4.0 github.com/docker/go-units v0.4.0 github.com/google/go-github/v37 v37.0.0 - github.com/kubeshark/base v0.5.0 + github.com/kubeshark/base v0.5.1 github.com/rs/zerolog v1.28.0 github.com/spf13/cobra v1.3.0 github.com/spf13/pflag v1.0.5 diff --git a/go.sum b/go.sum index 17f0a95f5..23273adf7 100644 --- a/go.sum +++ b/go.sum @@ -414,8 +414,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/kubeshark/base v0.5.0 h1:ZQ/wNIdmLeDwy143korRZyPorcqBgf1y/tQIiIenAL0= -github.com/kubeshark/base v0.5.0/go.mod h1:/ZzBY+5KLaC7J6QUVXtZ0HZALhMcEDrU6Waux5/bHQc= +github.com/kubeshark/base v0.5.1 h1:msy1iQLgWQK1COoicwWxEDbeXU9J5RuptA5fYeOEzfA= +github.com/kubeshark/base v0.5.1/go.mod h1:/ZzBY+5KLaC7J6QUVXtZ0HZALhMcEDrU6Waux5/bHQc= github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de h1:9TO3cAIGXtEhnIaL+V+BEER86oLrvS+kWobKpbJuye0= github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de/go.mod h1:zAbeS9B/r2mtpb6U+EI2rYA5OAXxsYw6wTamcNW+zcE= github.com/lithammer/dedent v1.1.0/go.mod h1:jrXYCQtgg0nJiN+StA2KgR7w6CiQNv9Fd/Z9BP0jIOc= diff --git a/internal/connect/hub.go b/internal/connect/hub.go index 1bc4f147b..1631a084d 100644 --- a/internal/connect/hub.go +++ b/internal/connect/hub.go @@ -116,22 +116,22 @@ func (connector *Connector) PostStorageLimitToHub(limit int64) { } } -func (connector *Connector) PostTargettedPodsToHub(pods []core.Pod) { - postTargettedUrl := fmt.Sprintf("%s/pods/targetted", connector.url) +func (connector *Connector) PostTargetedPodsToHub(pods []core.Pod) { + postTargetedUrl := fmt.Sprintf("%s/pods/targeted", connector.url) if podsMarshalled, err := json.Marshal(pods); err != nil { - log.Error().Err(err).Msg("Failed to marshal the targetted pods:") + log.Error().Err(err).Msg("Failed to marshal the targeted pods:") } else { ok := false for !ok { - if _, err = utils.Post(postTargettedUrl, "application/json", bytes.NewBuffer(podsMarshalled), connector.client); err != nil { + if _, err = utils.Post(postTargetedUrl, "application/json", bytes.NewBuffer(podsMarshalled), connector.client); err != nil { if _, ok := err.(*url.Error); ok { break } - log.Debug().Err(err).Msg("Failed sending the targetted pods to Hub:") + log.Debug().Err(err).Msg("Failed sending the targeted pods to Hub:") } else { ok = true - log.Debug().Int("pod-count", len(pods)).Msg("Reported targetted pods to Hub:") + log.Debug().Int("pod-count", len(pods)).Msg("Reported targeted pods to Hub:") } time.Sleep(time.Second) } diff --git a/kubernetes/utils.go b/kubernetes/utils.go index 1fe11e7ed..a2e5aa471 100644 --- a/kubernetes/utils.go +++ b/kubernetes/utils.go @@ -8,19 +8,19 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -func GetNodeHostToTargettedPodsMap(targettedPods []core.Pod) models.NodeToPodsMap { - nodeToTargettedPodsMap := make(models.NodeToPodsMap) - for _, pod := range targettedPods { +func GetNodeHostToTargetedPodsMap(targetedPods []core.Pod) models.NodeToPodsMap { + nodeToTargetedPodsMap := make(models.NodeToPodsMap) + for _, pod := range targetedPods { minimizedPod := getMinimizedPod(pod) - existingList := nodeToTargettedPodsMap[pod.Spec.NodeName] + existingList := nodeToTargetedPodsMap[pod.Spec.NodeName] if existingList == nil { - nodeToTargettedPodsMap[pod.Spec.NodeName] = []core.Pod{minimizedPod} + nodeToTargetedPodsMap[pod.Spec.NodeName] = []core.Pod{minimizedPod} } else { - nodeToTargettedPodsMap[pod.Spec.NodeName] = append(nodeToTargettedPodsMap[pod.Spec.NodeName], minimizedPod) + nodeToTargetedPodsMap[pod.Spec.NodeName] = append(nodeToTargetedPodsMap[pod.Spec.NodeName], minimizedPod) } } - return nodeToTargettedPodsMap + return nodeToTargetedPodsMap } func getMinimizedPod(fullPod core.Pod) core.Pod { diff --git a/kubernetes/workerSyncer.go b/kubernetes/workerSyncer.go index 85fea2a9d..1084148ab 100644 --- a/kubernetes/workerSyncer.go +++ b/kubernetes/workerSyncer.go @@ -17,23 +17,23 @@ import ( const updateWorkersDelay = 5 * time.Second -type TargettedPodChangeEvent struct { +type TargetedPodChangeEvent struct { Added []v1.Pod Removed []v1.Pod } // WorkerSyncer uses a k8s pod watch to update Worker daemonsets when targeted pods are removed or created type WorkerSyncer struct { - startTime time.Time - context context.Context - CurrentlyTargettedPods []v1.Pod - config WorkerSyncerConfig - kubernetesProvider *Provider - TapPodChangesOut chan TargettedPodChangeEvent - WorkerPodsChanges chan *v1.Pod - ErrorOut chan K8sTapManagerError - nodeToTargettedPodMap models.NodeToPodsMap - targettedNodes []string + startTime time.Time + context context.Context + CurrentlyTargetedPods []v1.Pod + config WorkerSyncerConfig + kubernetesProvider *Provider + TapPodChangesOut chan TargetedPodChangeEvent + WorkerPodsChanges chan *v1.Pod + ErrorOut chan K8sTapManagerError + nodeToTargetedPodMap models.NodeToPodsMap + targetedNodes []string } type WorkerSyncerConfig struct { @@ -51,17 +51,17 @@ type WorkerSyncerConfig struct { func CreateAndStartWorkerSyncer(ctx context.Context, kubernetesProvider *Provider, config WorkerSyncerConfig, startTime time.Time) (*WorkerSyncer, error) { syncer := &WorkerSyncer{ - startTime: startTime.Truncate(time.Second), // Round down because k8s CreationTimestamp is given in 1 sec resolution. - context: ctx, - CurrentlyTargettedPods: make([]v1.Pod, 0), - config: config, - kubernetesProvider: kubernetesProvider, - TapPodChangesOut: make(chan TargettedPodChangeEvent, 100), - WorkerPodsChanges: make(chan *v1.Pod, 100), - ErrorOut: make(chan K8sTapManagerError, 100), + startTime: startTime.Truncate(time.Second), // Round down because k8s CreationTimestamp is given in 1 sec resolution. + context: ctx, + CurrentlyTargetedPods: make([]v1.Pod, 0), + config: config, + kubernetesProvider: kubernetesProvider, + TapPodChangesOut: make(chan TargetedPodChangeEvent, 100), + WorkerPodsChanges: make(chan *v1.Pod, 100), + ErrorOut: make(chan K8sTapManagerError, 100), } - if err, _ := syncer.updateCurrentlyTargettedPods(); err != nil { + if err, _ := syncer.updateCurrentlyTargetedPods(); err != nil { return nil, err } @@ -69,7 +69,7 @@ func CreateAndStartWorkerSyncer(ctx context.Context, kubernetesProvider *Provide return nil, err } - go syncer.watchPodsForTargetting() + go syncer.watchPodsForTargeting() go syncer.watchWorkerEvents() go syncer.watchWorkerPods() return syncer, nil @@ -179,12 +179,12 @@ func (workerSyncer *WorkerSyncer) watchWorkerEvents() { } } -func (workerSyncer *WorkerSyncer) watchPodsForTargetting() { +func (workerSyncer *WorkerSyncer) watchPodsForTargeting() { podWatchHelper := NewPodWatchHelper(workerSyncer.kubernetesProvider, &workerSyncer.config.PodFilterRegex) eventChan, errorChan := FilteredWatch(workerSyncer.context, podWatchHelper, workerSyncer.config.TargetNamespaces, podWatchHelper) handleChangeInPods := func() { - err, changeFound := workerSyncer.updateCurrentlyTargettedPods() + err, changeFound := workerSyncer.updateCurrentlyTargetedPods() if err != nil { workerSyncer.ErrorOut <- K8sTapManagerError{ OriginalError: err, @@ -299,22 +299,22 @@ func (workerSyncer *WorkerSyncer) handleErrorInWatchLoop(err error, restartWorke } } -func (workerSyncer *WorkerSyncer) updateCurrentlyTargettedPods() (err error, changesFound bool) { +func (workerSyncer *WorkerSyncer) updateCurrentlyTargetedPods() (err error, changesFound bool) { if matchingPods, err := workerSyncer.kubernetesProvider.ListAllRunningPodsMatchingRegex(workerSyncer.context, &workerSyncer.config.PodFilterRegex, workerSyncer.config.TargetNamespaces); err != nil { return err, false } else { podsToTarget := excludeSelfPods(matchingPods) - addedPods, removedPods := getPodArrayDiff(workerSyncer.CurrentlyTargettedPods, podsToTarget) + addedPods, removedPods := getPodArrayDiff(workerSyncer.CurrentlyTargetedPods, podsToTarget) for _, addedPod := range addedPods { - log.Info().Str("pod", addedPod.Name).Msg("Currently targetting:") + log.Info().Str("pod", addedPod.Name).Msg("Currently targeting:") } for _, removedPod := range removedPods { - log.Info().Str("pod", removedPod.Name).Msg("Pod is no longer running. Targetting is stopped.") + log.Info().Str("pod", removedPod.Name).Msg("Pod is no longer running. Targeting is stopped.") } if len(addedPods) > 0 || len(removedPods) > 0 { - workerSyncer.CurrentlyTargettedPods = podsToTarget - workerSyncer.nodeToTargettedPodMap = GetNodeHostToTargettedPodsMap(workerSyncer.CurrentlyTargettedPods) - workerSyncer.TapPodChangesOut <- TargettedPodChangeEvent{ + workerSyncer.CurrentlyTargetedPods = podsToTarget + workerSyncer.nodeToTargetedPodMap = GetNodeHostToTargetedPodsMap(workerSyncer.CurrentlyTargetedPods) + workerSyncer.TapPodChangesOut <- TargetedPodChangeEvent{ Added: addedPods, Removed: removedPods, } @@ -325,14 +325,14 @@ func (workerSyncer *WorkerSyncer) updateCurrentlyTargettedPods() (err error, cha } func (workerSyncer *WorkerSyncer) updateWorkers() error { - nodesToTarget := make([]string, len(workerSyncer.nodeToTargettedPodMap)) + nodesToTarget := make([]string, len(workerSyncer.nodeToTargetedPodMap)) i := 0 - for node := range workerSyncer.nodeToTargettedPodMap { + for node := range workerSyncer.nodeToTargetedPodMap { nodesToTarget[i] = node i++ } - if utils.EqualStringSlices(nodesToTarget, workerSyncer.targettedNodes) { + if utils.EqualStringSlices(nodesToTarget, workerSyncer.targetedNodes) { log.Debug().Msg("Skipping apply, DaemonSet is up to date") return nil } @@ -341,7 +341,7 @@ func (workerSyncer *WorkerSyncer) updateWorkers() error { image := docker.GetWorkerImage() - if len(workerSyncer.nodeToTargettedPodMap) > 0 { + if len(workerSyncer.nodeToTargetedPodMap) > 0 { var serviceAccountName string if workerSyncer.config.SelfServiceAccountExists { serviceAccountName = ServiceAccountName @@ -349,8 +349,8 @@ func (workerSyncer *WorkerSyncer) updateWorkers() error { serviceAccountName = "" } - nodeNames := make([]string, 0, len(workerSyncer.nodeToTargettedPodMap)) - for nodeName := range workerSyncer.nodeToTargettedPodMap { + nodeNames := make([]string, 0, len(workerSyncer.nodeToTargetedPodMap)) + for nodeName := range workerSyncer.nodeToTargetedPodMap { nodeNames = append(nodeNames, nodeName) } @@ -371,7 +371,7 @@ func (workerSyncer *WorkerSyncer) updateWorkers() error { return err } - log.Debug().Int("worker-count", len(workerSyncer.nodeToTargettedPodMap)).Msg("Successfully created workers.") + log.Debug().Int("worker-count", len(workerSyncer.nodeToTargetedPodMap)).Msg("Successfully created workers.") } else { if err := workerSyncer.kubernetesProvider.ResetWorkerDaemonSet( workerSyncer.context, @@ -385,7 +385,7 @@ func (workerSyncer *WorkerSyncer) updateWorkers() error { log.Debug().Msg("Successfully resetted Worker DaemonSet") } - workerSyncer.targettedNodes = nodesToTarget + workerSyncer.targetedNodes = nodesToTarget return nil }