🔨 Fix the targetted typo

M. Mert Yildiran 2023-01-08 21:44:12 +03:00
parent ac6aee07f5
commit eaa21f0789
No known key found for this signature in database
GPG Key ID: DA5D6DCBB758A461
7 changed files with 62 additions and 62 deletions


@@ -71,9 +71,9 @@ func tap() {
         }
     }
-    log.Info().Strs("namespaces", state.targetNamespaces).Msg("Targetting pods in:")
-    if err := printTargettedPodsPreview(ctx, kubernetesProvider, state.targetNamespaces); err != nil {
+    log.Info().Strs("namespaces", state.targetNamespaces).Msg("Targeting pods in:")
+    if err := printTargetedPodsPreview(ctx, kubernetesProvider, state.targetNamespaces); err != nil {
         log.Error().Err(errormessage.FormatError(err)).Msg("Error listing pods!")
     }
@@ -113,15 +113,15 @@ This function is a bit problematic as it might be detached from the actual pods
 The alternative would be to wait for Hub to be ready and then query it for the pods it listens to, this has
 the arguably worse drawback of taking a relatively very long time before the user sees which pods are targeted, if any.
 */
-func printTargettedPodsPreview(ctx context.Context, kubernetesProvider *kubernetes.Provider, namespaces []string) error {
+func printTargetedPodsPreview(ctx context.Context, kubernetesProvider *kubernetes.Provider, namespaces []string) error {
     if matchingPods, err := kubernetesProvider.ListAllRunningPodsMatchingRegex(ctx, config.Config.Tap.PodRegex(), namespaces); err != nil {
         return err
     } else {
         if len(matchingPods) == 0 {
             printNoPodsFoundSuggestion(namespaces)
         }
-        for _, targettedPod := range matchingPods {
-            log.Info().Msg(fmt.Sprintf("New pod: %s", fmt.Sprintf(utils.Green, targettedPod.Name)))
+        for _, targetedPod := range matchingPods {
+            log.Info().Msg(fmt.Sprintf("New pod: %s", fmt.Sprintf(utils.Green, targetedPod.Name)))
         }
         return nil
     }
@@ -160,7 +160,7 @@ func startWorkerSyncer(ctx context.Context, cancel context.CancelFunc, provider
             log.Debug().Msg("workerSyncer pod changes channel closed, ending listener loop")
             return
         }
-        go connector.PostTargettedPodsToHub(workerSyncer.CurrentlyTargettedPods)
+        go connector.PostTargetedPodsToHub(workerSyncer.CurrentlyTargetedPods)
     case pod, ok := <-workerSyncer.WorkerPodsChanges:
         if !ok {
             log.Debug().Msg("workerSyncer worker status changed channel closed, ending listener loop")
@@ -188,7 +188,7 @@ func printNoPodsFoundSuggestion(targetNamespaces []string) {
 func getK8sTapManagerErrorText(err kubernetes.K8sTapManagerError) string {
     switch err.TapManagerReason {
     case kubernetes.TapManagerPodListError:
-        return fmt.Sprintf("Failed to update currently targetted pods: %v", err.OriginalError)
+        return fmt.Sprintf("Failed to update currently targeted pods: %v", err.OriginalError)
     case kubernetes.TapManagerPodWatchError:
         return fmt.Sprintf("Error occured in K8s pod watch: %v", err.OriginalError)
     case kubernetes.TapManagerWorkerUpdateError:
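Note on the preview hunk above (@ -113,15): printTargetedPodsPreview only lists the currently running pods that match the tap regex and prints each name, so the user gets feedback before Hub is up. A stripped-down, self-contained sketch of that preview step in plain Go, with the Kubernetes listing replaced by a slice of pod names; the no-match message is illustrative, since printNoPodsFoundSuggestion's real wording is not shown in this diff:

package main

import (
    "fmt"
    "regexp"
)

// printTargetedPodsPreview mimics the CLI preview: filter running pod names
// with the user's pod regex and print every match, or a suggestion when
// nothing matches. The runningPods slice stands in for the result of
// ListAllRunningPodsMatchingRegex, and the no-match message is illustrative.
func printTargetedPodsPreview(podRegex *regexp.Regexp, runningPods []string, namespaces []string) {
    matched := 0
    for _, name := range runningPods {
        if podRegex.MatchString(name) {
            fmt.Printf("New pod: %s\n", name)
            matched++
        }
    }
    if matched == 0 {
        fmt.Printf("Did not find any running pods matching %q in namespaces %v\n", podRegex.String(), namespaces)
    }
}

func main() {
    podRegex := regexp.MustCompile("catalo.*")
    runningPods := []string{"catalogue-b4d5fb54-q6wnf", "front-end-649fc5fd6-7gcg2"}
    printTargetedPodsPreview(podRegex, runningPods, []string{"sock-shop"})
}

The real function additionally colors the pod name with utils.Green and returns any listing error to its caller instead of printing it.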


@@ -18,7 +18,7 @@ func FormatError(err error) error {
     if k8serrors.IsForbidden(err) {
         errorNew = fmt.Errorf("insufficient permissions: %w. "+
             "supply the required permission or control %s's access to namespaces by setting %s "+
-            "in the config file or setting the targetted namespace with --%s %s=<NAMEPSACE>",
+            "in the config file or setting the targeted namespace with --%s %s=<NAMEPSACE>",
             err,
             misc.Software,
             config.SelfNamespaceConfigName,

go.mod

@@ -8,7 +8,7 @@ require (
     github.com/docker/go-connections v0.4.0
     github.com/docker/go-units v0.4.0
     github.com/google/go-github/v37 v37.0.0
-    github.com/kubeshark/base v0.5.0
+    github.com/kubeshark/base v0.5.1
     github.com/rs/zerolog v1.28.0
     github.com/spf13/cobra v1.3.0
     github.com/spf13/pflag v1.0.5

go.sum

@@ -414,8 +414,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
 github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
 github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
 github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
-github.com/kubeshark/base v0.5.0 h1:ZQ/wNIdmLeDwy143korRZyPorcqBgf1y/tQIiIenAL0=
-github.com/kubeshark/base v0.5.0/go.mod h1:/ZzBY+5KLaC7J6QUVXtZ0HZALhMcEDrU6Waux5/bHQc=
+github.com/kubeshark/base v0.5.1 h1:msy1iQLgWQK1COoicwWxEDbeXU9J5RuptA5fYeOEzfA=
+github.com/kubeshark/base v0.5.1/go.mod h1:/ZzBY+5KLaC7J6QUVXtZ0HZALhMcEDrU6Waux5/bHQc=
 github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de h1:9TO3cAIGXtEhnIaL+V+BEER86oLrvS+kWobKpbJuye0=
 github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de/go.mod h1:zAbeS9B/r2mtpb6U+EI2rYA5OAXxsYw6wTamcNW+zcE=
 github.com/lithammer/dedent v1.1.0/go.mod h1:jrXYCQtgg0nJiN+StA2KgR7w6CiQNv9Fd/Z9BP0jIOc=


@@ -116,22 +116,22 @@ func (connector *Connector) PostStorageLimitToHub(limit int64) {
     }
 }
-func (connector *Connector) PostTargettedPodsToHub(pods []core.Pod) {
-    postTargettedUrl := fmt.Sprintf("%s/pods/targetted", connector.url)
+func (connector *Connector) PostTargetedPodsToHub(pods []core.Pod) {
+    postTargetedUrl := fmt.Sprintf("%s/pods/targeted", connector.url)
     if podsMarshalled, err := json.Marshal(pods); err != nil {
-        log.Error().Err(err).Msg("Failed to marshal the targetted pods:")
+        log.Error().Err(err).Msg("Failed to marshal the targeted pods:")
     } else {
         ok := false
         for !ok {
-            if _, err = utils.Post(postTargettedUrl, "application/json", bytes.NewBuffer(podsMarshalled), connector.client); err != nil {
+            if _, err = utils.Post(postTargetedUrl, "application/json", bytes.NewBuffer(podsMarshalled), connector.client); err != nil {
                 if _, ok := err.(*url.Error); ok {
                     break
                 }
-                log.Debug().Err(err).Msg("Failed sending the targetted pods to Hub:")
+                log.Debug().Err(err).Msg("Failed sending the targeted pods to Hub:")
             } else {
                 ok = true
-                log.Debug().Int("pod-count", len(pods)).Msg("Reported targetted pods to Hub:")
+                log.Debug().Int("pod-count", len(pods)).Msg("Reported targeted pods to Hub:")
             }
             time.Sleep(time.Second)
         }
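The renamed PostTargetedPodsToHub keeps its retry behavior: marshal the pod list once, then POST it to the Hub's /pods/targeted endpoint roughly once per second until a request succeeds, giving up early on a *url.Error (Hub unreachable). A minimal standalone sketch of the same pattern using only the standard library; postTargetedPods, the Pod struct, and the example address are illustrative stand-ins, not the project's utils.Post wrapper or core.Pod type:

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "net/http"
    "net/url"
    "time"
)

// Pod is a stand-in for core.Pod from kubeshark/base; only the name matters here.
type Pod struct {
    Name string `json:"name"`
}

// postTargetedPods is a hypothetical helper that mirrors the retry loop in
// PostTargetedPodsToHub: marshal once, retry the POST every second, and stop
// as soon as the transport reports a *url.Error.
func postTargetedPods(hubURL string, pods []Pod, client *http.Client) error {
    postTargetedUrl := fmt.Sprintf("%s/pods/targeted", hubURL)

    podsMarshalled, err := json.Marshal(pods)
    if err != nil {
        return fmt.Errorf("failed to marshal the targeted pods: %w", err)
    }

    for {
        resp, err := client.Post(postTargetedUrl, "application/json", bytes.NewBuffer(podsMarshalled))
        if err != nil {
            // With net/http, transport failures arrive as *url.Error; the
            // original loop treats that as "Hub unreachable" and stops retrying.
            if _, isURLErr := err.(*url.Error); isURLErr {
                return err
            }
            time.Sleep(time.Second)
            continue
        }
        resp.Body.Close()
        if resp.StatusCode >= 200 && resp.StatusCode < 300 {
            return nil
        }
        // Hub answered but not with a 2xx; wait and retry, roughly the case the
        // original's non-url.Error branch covers.
        time.Sleep(time.Second)
    }
}

func main() {
    // Example invocation; the address and pod names are illustrative.
    pods := []Pod{{Name: "example-pod-1"}, {Name: "example-pod-2"}}
    if err := postTargetedPods("http://localhost:8898", pods, http.DefaultClient); err != nil {
        fmt.Println("could not report targeted pods:", err)
    }
}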


@@ -8,19 +8,19 @@ import (
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
-func GetNodeHostToTargettedPodsMap(targettedPods []core.Pod) models.NodeToPodsMap {
-    nodeToTargettedPodsMap := make(models.NodeToPodsMap)
-    for _, pod := range targettedPods {
+func GetNodeHostToTargetedPodsMap(targetedPods []core.Pod) models.NodeToPodsMap {
+    nodeToTargetedPodsMap := make(models.NodeToPodsMap)
+    for _, pod := range targetedPods {
         minimizedPod := getMinimizedPod(pod)
-        existingList := nodeToTargettedPodsMap[pod.Spec.NodeName]
+        existingList := nodeToTargetedPodsMap[pod.Spec.NodeName]
         if existingList == nil {
-            nodeToTargettedPodsMap[pod.Spec.NodeName] = []core.Pod{minimizedPod}
+            nodeToTargetedPodsMap[pod.Spec.NodeName] = []core.Pod{minimizedPod}
         } else {
-            nodeToTargettedPodsMap[pod.Spec.NodeName] = append(nodeToTargettedPodsMap[pod.Spec.NodeName], minimizedPod)
+            nodeToTargetedPodsMap[pod.Spec.NodeName] = append(nodeToTargetedPodsMap[pod.Spec.NodeName], minimizedPod)
         }
     }
-    return nodeToTargettedPodsMap
+    return nodeToTargetedPodsMap
 }
 func getMinimizedPod(fullPod core.Pod) core.Pod {
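GetNodeHostToTargetedPodsMap is a plain group-by: every targeted pod is reduced via getMinimizedPod and appended to the slice keyed by its Spec.NodeName, which later tells the syncer which nodes need a Worker. A self-contained sketch of the same grouping with simplified stand-in types (Pod and NodeToPodsMap here are not the models package types):

package main

import "fmt"

// Pod is a simplified stand-in for core.Pod: just a name and the node it runs on.
type Pod struct {
    Name     string
    NodeName string
}

// NodeToPodsMap mirrors the shape of models.NodeToPodsMap: node name -> pods on that node.
type NodeToPodsMap map[string][]Pod

// getNodeHostToTargetedPodsMap groups targeted pods by the node they are
// scheduled on, which is the structure the Worker DaemonSet update consumes.
func getNodeHostToTargetedPodsMap(targetedPods []Pod) NodeToPodsMap {
    nodeToTargetedPodsMap := make(NodeToPodsMap)
    for _, pod := range targetedPods {
        nodeToTargetedPodsMap[pod.NodeName] = append(nodeToTargetedPodsMap[pod.NodeName], pod)
    }
    return nodeToTargetedPodsMap
}

func main() {
    pods := []Pod{
        {Name: "front-end-1", NodeName: "node-a"},
        {Name: "carts-1", NodeName: "node-b"},
        {Name: "carts-2", NodeName: "node-a"},
    }
    for node, podsOnNode := range getNodeHostToTargetedPodsMap(pods) {
        fmt.Printf("%s -> %d targeted pod(s)\n", node, len(podsOnNode))
    }
}

Since append on a nil slice allocates a new one, the sketch collapses the original's explicit nil check into a single append; the resulting node-to-pods mapping is the same.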


@@ -17,23 +17,23 @@ import (
 const updateWorkersDelay = 5 * time.Second
-type TargettedPodChangeEvent struct {
+type TargetedPodChangeEvent struct {
     Added []v1.Pod
     Removed []v1.Pod
 }
 // WorkerSyncer uses a k8s pod watch to update Worker daemonsets when targeted pods are removed or created
 type WorkerSyncer struct {
     startTime time.Time
     context context.Context
-    CurrentlyTargettedPods []v1.Pod
+    CurrentlyTargetedPods []v1.Pod
     config WorkerSyncerConfig
     kubernetesProvider *Provider
-    TapPodChangesOut chan TargettedPodChangeEvent
+    TapPodChangesOut chan TargetedPodChangeEvent
     WorkerPodsChanges chan *v1.Pod
     ErrorOut chan K8sTapManagerError
-    nodeToTargettedPodMap models.NodeToPodsMap
-    targettedNodes []string
+    nodeToTargetedPodMap models.NodeToPodsMap
+    targetedNodes []string
 }
 type WorkerSyncerConfig struct {
@@ -51,17 +51,17 @@ type WorkerSyncerConfig struct {
 func CreateAndStartWorkerSyncer(ctx context.Context, kubernetesProvider *Provider, config WorkerSyncerConfig, startTime time.Time) (*WorkerSyncer, error) {
     syncer := &WorkerSyncer{
         startTime: startTime.Truncate(time.Second), // Round down because k8s CreationTimestamp is given in 1 sec resolution.
         context: ctx,
-        CurrentlyTargettedPods: make([]v1.Pod, 0),
+        CurrentlyTargetedPods: make([]v1.Pod, 0),
         config: config,
         kubernetesProvider: kubernetesProvider,
-        TapPodChangesOut: make(chan TargettedPodChangeEvent, 100),
+        TapPodChangesOut: make(chan TargetedPodChangeEvent, 100),
         WorkerPodsChanges: make(chan *v1.Pod, 100),
         ErrorOut: make(chan K8sTapManagerError, 100),
     }
-    if err, _ := syncer.updateCurrentlyTargettedPods(); err != nil {
+    if err, _ := syncer.updateCurrentlyTargetedPods(); err != nil {
         return nil, err
     }
@@ -69,7 +69,7 @@ func CreateAndStartWorkerSyncer(ctx context.Context, kubernetesProvider *Provide
         return nil, err
     }
-    go syncer.watchPodsForTargetting()
+    go syncer.watchPodsForTargeting()
     go syncer.watchWorkerEvents()
     go syncer.watchWorkerPods()
     return syncer, nil
@@ -179,12 +179,12 @@ func (workerSyncer *WorkerSyncer) watchWorkerEvents() {
     }
 }
-func (workerSyncer *WorkerSyncer) watchPodsForTargetting() {
+func (workerSyncer *WorkerSyncer) watchPodsForTargeting() {
     podWatchHelper := NewPodWatchHelper(workerSyncer.kubernetesProvider, &workerSyncer.config.PodFilterRegex)
     eventChan, errorChan := FilteredWatch(workerSyncer.context, podWatchHelper, workerSyncer.config.TargetNamespaces, podWatchHelper)
     handleChangeInPods := func() {
-        err, changeFound := workerSyncer.updateCurrentlyTargettedPods()
+        err, changeFound := workerSyncer.updateCurrentlyTargetedPods()
         if err != nil {
             workerSyncer.ErrorOut <- K8sTapManagerError{
                 OriginalError: err,
@@ -299,22 +299,22 @@ func (workerSyncer *WorkerSyncer) handleErrorInWatchLoop(err error, restartWorke
     }
 }
-func (workerSyncer *WorkerSyncer) updateCurrentlyTargettedPods() (err error, changesFound bool) {
+func (workerSyncer *WorkerSyncer) updateCurrentlyTargetedPods() (err error, changesFound bool) {
     if matchingPods, err := workerSyncer.kubernetesProvider.ListAllRunningPodsMatchingRegex(workerSyncer.context, &workerSyncer.config.PodFilterRegex, workerSyncer.config.TargetNamespaces); err != nil {
         return err, false
     } else {
         podsToTarget := excludeSelfPods(matchingPods)
-        addedPods, removedPods := getPodArrayDiff(workerSyncer.CurrentlyTargettedPods, podsToTarget)
+        addedPods, removedPods := getPodArrayDiff(workerSyncer.CurrentlyTargetedPods, podsToTarget)
         for _, addedPod := range addedPods {
-            log.Info().Str("pod", addedPod.Name).Msg("Currently targetting:")
+            log.Info().Str("pod", addedPod.Name).Msg("Currently targeting:")
         }
         for _, removedPod := range removedPods {
-            log.Info().Str("pod", removedPod.Name).Msg("Pod is no longer running. Targetting is stopped.")
+            log.Info().Str("pod", removedPod.Name).Msg("Pod is no longer running. Targeting is stopped.")
         }
         if len(addedPods) > 0 || len(removedPods) > 0 {
-            workerSyncer.CurrentlyTargettedPods = podsToTarget
-            workerSyncer.nodeToTargettedPodMap = GetNodeHostToTargettedPodsMap(workerSyncer.CurrentlyTargettedPods)
-            workerSyncer.TapPodChangesOut <- TargettedPodChangeEvent{
+            workerSyncer.CurrentlyTargetedPods = podsToTarget
+            workerSyncer.nodeToTargetedPodMap = GetNodeHostToTargetedPodsMap(workerSyncer.CurrentlyTargetedPods)
+            workerSyncer.TapPodChangesOut <- TargetedPodChangeEvent{
                 Added: addedPods,
                 Removed: removedPods,
             }
@@ -325,14 +325,14 @@ func (workerSyncer *WorkerSyncer) updateCurrentlyTargettedPods() (err error, cha
 }
 func (workerSyncer *WorkerSyncer) updateWorkers() error {
-    nodesToTarget := make([]string, len(workerSyncer.nodeToTargettedPodMap))
+    nodesToTarget := make([]string, len(workerSyncer.nodeToTargetedPodMap))
     i := 0
-    for node := range workerSyncer.nodeToTargettedPodMap {
+    for node := range workerSyncer.nodeToTargetedPodMap {
         nodesToTarget[i] = node
         i++
     }
-    if utils.EqualStringSlices(nodesToTarget, workerSyncer.targettedNodes) {
+    if utils.EqualStringSlices(nodesToTarget, workerSyncer.targetedNodes) {
         log.Debug().Msg("Skipping apply, DaemonSet is up to date")
         return nil
     }
@@ -341,7 +341,7 @@ func (workerSyncer *WorkerSyncer) updateWorkers() error {
     image := docker.GetWorkerImage()
-    if len(workerSyncer.nodeToTargettedPodMap) > 0 {
+    if len(workerSyncer.nodeToTargetedPodMap) > 0 {
         var serviceAccountName string
         if workerSyncer.config.SelfServiceAccountExists {
             serviceAccountName = ServiceAccountName
@@ -349,8 +349,8 @@ func (workerSyncer *WorkerSyncer) updateWorkers() error {
             serviceAccountName = ""
         }
-        nodeNames := make([]string, 0, len(workerSyncer.nodeToTargettedPodMap))
-        for nodeName := range workerSyncer.nodeToTargettedPodMap {
+        nodeNames := make([]string, 0, len(workerSyncer.nodeToTargetedPodMap))
+        for nodeName := range workerSyncer.nodeToTargetedPodMap {
             nodeNames = append(nodeNames, nodeName)
         }
@@ -371,7 +371,7 @@ func (workerSyncer *WorkerSyncer) updateWorkers() error {
             return err
         }
-        log.Debug().Int("worker-count", len(workerSyncer.nodeToTargettedPodMap)).Msg("Successfully created workers.")
+        log.Debug().Int("worker-count", len(workerSyncer.nodeToTargetedPodMap)).Msg("Successfully created workers.")
     } else {
         if err := workerSyncer.kubernetesProvider.ResetWorkerDaemonSet(
             workerSyncer.context,
@@ -385,7 +385,7 @@ func (workerSyncer *WorkerSyncer) updateWorkers() error {
         log.Debug().Msg("Successfully resetted Worker DaemonSet")
     }
-    workerSyncer.targettedNodes = nodesToTarget
+    workerSyncer.targetedNodes = nodesToTarget
     return nil
 }
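A closing note on updateWorkers (hunks @ -325 through @ -385 above): the Worker DaemonSet is only re-applied when the set of targeted nodes actually changes; otherwise the function logs "Skipping apply, DaemonSet is up to date" and returns early. A small sketch of that short-circuit, assuming an order-insensitive node comparison (the exact semantics of utils.EqualStringSlices are not shown in this diff):

package main

import (
    "fmt"
    "sort"
)

// equalNodeSets reports whether two node-name lists contain the same nodes,
// ignoring order. This is an assumption about what utils.EqualStringSlices
// checks; the point here is only the "skip apply when unchanged" pattern.
func equalNodeSets(a, b []string) bool {
    if len(a) != len(b) {
        return false
    }
    as := append([]string(nil), a...)
    bs := append([]string(nil), b...)
    sort.Strings(as)
    sort.Strings(bs)
    for i := range as {
        if as[i] != bs[i] {
            return false
        }
    }
    return true
}

// updateWorkers mirrors the control flow of WorkerSyncer.updateWorkers:
// derive the node list from the node->pods map, bail out early when nothing
// changed, otherwise "apply" and remember the new node set.
func updateWorkers(nodeToTargetedPodMap map[string][]string, targetedNodes *[]string) {
    nodesToTarget := make([]string, 0, len(nodeToTargetedPodMap))
    for node := range nodeToTargetedPodMap {
        nodesToTarget = append(nodesToTarget, node)
    }
    if equalNodeSets(nodesToTarget, *targetedNodes) {
        fmt.Println("Skipping apply, DaemonSet is up to date")
        return
    }
    fmt.Printf("Applying Worker DaemonSet to %d node(s)\n", len(nodesToTarget))
    *targetedNodes = nodesToTarget
}

func main() {
    var targetedNodes []string
    podsByNode := map[string][]string{"node-a": {"front-end-1"}, "node-b": {"carts-1"}}
    updateWorkers(podsByNode, &targetedNodes) // first call applies
    updateWorkers(podsByNode, &targetedNodes) // second call skips, nothing changed
}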