From 4e3233ade8242a51357c31064013c29182bfba43 Mon Sep 17 00:00:00 2001 From: "M. Mert Yildiran" Date: Sun, 22 Jan 2023 04:14:44 +0300 Subject: [PATCH] :sparkles: Deploy workers DaemonSet without NodeSelector and call `POST /pods/regex` to set the pod regex in Hub --- cmd/tapRunner.go | 65 +++------ internal/connect/hub.go | 25 ++-- kubernetes/provider.go | 21 --- kubernetes/utils.go | 40 ------ kubernetes/workerSyncer.go | 282 ------------------------------------- kubernetes/workers.go | 52 +++++++ 6 files changed, 86 insertions(+), 399 deletions(-) delete mode 100644 kubernetes/workerSyncer.go create mode 100644 kubernetes/workers.go diff --git a/cmd/tapRunner.go b/cmd/tapRunner.go index e2b53bf5d..25b0d9a7d 100644 --- a/cmd/tapRunner.go +++ b/cmd/tapRunner.go @@ -121,56 +121,12 @@ func printTargetedPodsPreview(ctx context.Context, kubernetesProvider *kubernete printNoPodsFoundSuggestion(namespaces) } for _, targetedPod := range matchingPods { - log.Info().Msg(fmt.Sprintf("New pod: %s", fmt.Sprintf(utils.Cyan, targetedPod.Name))) + log.Info().Msg(fmt.Sprintf("Targeted pod: %s", fmt.Sprintf(utils.Green, targetedPod.Name))) } return nil } } -func startWorkerSyncer(ctx context.Context, cancel context.CancelFunc, provider *kubernetes.Provider, targetNamespaces []string, startTime time.Time) error { - workerSyncer, err := kubernetes.CreateAndStartWorkerSyncer(ctx, provider, kubernetes.WorkerSyncerConfig{ - TargetNamespaces: targetNamespaces, - PodFilterRegex: *config.Config.Tap.PodRegex(), - SelfNamespace: config.Config.SelfNamespace, - WorkerResources: config.Config.Tap.Resources.Worker, - ImagePullPolicy: config.Config.ImagePullPolicy(), - ImagePullSecrets: config.Config.ImagePullSecrets(), - SelfServiceAccountExists: state.selfServiceAccountExists, - ServiceMesh: config.Config.Tap.ServiceMesh, - Tls: config.Config.Tap.Tls, - Debug: config.Config.Tap.Debug, - }, startTime) - - if err != nil { - return err - } - - go func() { - for { - select { - case syncerErr, ok := <-workerSyncer.ErrorOut: - if !ok { - log.Debug().Msg("workerSyncer err channel closed, ending listener loop") - return - } - log.Error().Msg(getK8sTapManagerErrorText(syncerErr)) - cancel() - case _, ok := <-workerSyncer.TapPodChangesOut: - if !ok { - log.Debug().Msg("workerSyncer pod changes channel closed, ending listener loop") - return - } - go connector.PostTargetedPodsToHub(workerSyncer.CurrentlyTargetedPods) - case <-ctx.Done(): - log.Debug().Msg("workerSyncer event listener loop exiting due to context done") - return - } - } - }() - - return nil -} - func printNoPodsFoundSuggestion(targetNamespaces []string) { var suggestionStr string if !utils.Contains(targetNamespaces, kubernetes.K8sAllNamespaces) { @@ -433,11 +389,24 @@ func postHubStarted(ctx context.Context, kubernetesProvider *kubernetes.Provider "/echo", ) - if err := startWorkerSyncer(ctx, cancel, kubernetesProvider, state.targetNamespaces, state.startTime); err != nil { - log.Error().Err(errormessage.FormatError(err)).Msg("Error starting worker syncer") - cancel() + err := kubernetes.CreateWorkers( + kubernetesProvider, + state.selfServiceAccountExists, + ctx, + config.Config.SelfNamespace, + config.Config.Tap.Resources.Worker, + config.Config.ImagePullPolicy(), + config.Config.ImagePullSecrets(), + config.Config.Tap.ServiceMesh, + config.Config.Tap.Tls, + config.Config.Tap.Debug, + ) + if err != nil { + log.Error().Err(err).Send() } + connector.PostRegexToHub(config.Config.Tap.PodRegexStr, state.targetNamespaces) + url := 
kubernetes.GetLocalhostOnPort(config.Config.Tap.Proxy.Hub.SrcPort) log.Info().Str("url", url).Msg(fmt.Sprintf(utils.Green, "Hub is available at:")) } diff --git a/internal/connect/hub.go b/internal/connect/hub.go index e1e0ee083..7404c650f 100644 --- a/internal/connect/hub.go +++ b/internal/connect/hub.go @@ -12,7 +12,6 @@ import ( "github.com/kubeshark/kubeshark/utils" "github.com/rs/zerolog/log" - core "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" ) @@ -115,22 +114,32 @@ func (connector *Connector) PostStorageLimitToHub(limit int64) { } } -func (connector *Connector) PostTargetedPodsToHub(pods []core.Pod) { - postTargetedUrl := fmt.Sprintf("%s/pods/targeted", connector.url) +type postRegexRequest struct { + Regex string `json:"regex"` + Namespaces []string `json:"namespaces"` +} - if podsMarshalled, err := json.Marshal(pods); err != nil { - log.Error().Err(err).Msg("Failed to marshal the targeted pods:") +func (connector *Connector) PostRegexToHub(regex string, namespaces []string) { + postRegexUrl := fmt.Sprintf("%s/pods/regex", connector.url) + + payload := postRegexRequest{ + Regex: regex, + Namespaces: namespaces, + } + + if payloadMarshalled, err := json.Marshal(payload); err != nil { + log.Error().Err(err).Msg("Failed to marshal the payload:") } else { ok := false for !ok { - if _, err = utils.Post(postTargetedUrl, "application/json", bytes.NewBuffer(podsMarshalled), connector.client); err != nil { + if _, err = utils.Post(postRegexUrl, "application/json", bytes.NewBuffer(payloadMarshalled), connector.client); err != nil { if _, ok := err.(*url.Error); ok { break } - log.Debug().Err(err).Msg("Failed sending the targeted pods to Hub:") + log.Debug().Err(err).Msg("Failed sending the payload to Hub:") } else { ok = true - log.Debug().Int("pod-count", len(pods)).Msg("Reported targeted pods to Hub:") + log.Debug().Str("regex", regex).Strs("namespaces", namespaces).Msg("Reported payload to Hub:") } time.Sleep(time.Second) } diff --git a/kubernetes/provider.go b/kubernetes/provider.go index 93ee09284..06da7a1da 100644 --- a/kubernetes/provider.go +++ b/kubernetes/provider.go @@ -652,7 +652,6 @@ func (provider *Provider) ApplyWorkerDaemonSet( daemonSetName string, podImage string, workerPodName string, - nodeNames []string, serviceAccountName string, resources Resources, imagePullPolicy core.PullPolicy, @@ -662,17 +661,12 @@ func (provider *Provider) ApplyWorkerDaemonSet( debug bool, ) error { log.Debug(). - Int("node-count", len(nodeNames)). Str("namespace", namespace). Str("daemonset-name", daemonSetName). Str("image", podImage). Str("pod", workerPodName). 
Msg("Applying worker DaemonSets.") - if len(nodeNames) == 0 { - return fmt.Errorf("DaemonSet %s must target at least 1 pod", daemonSetName) - } - command := []string{"./worker", "-i", "any", "-port", "8897"} if debug { @@ -752,22 +746,7 @@ func (provider *Provider) ApplyWorkerDaemonSet( workerResources := applyconfcore.ResourceRequirements().WithRequests(workerResourceRequests).WithLimits(workerResourceLimits) workerContainer.WithResources(workerResources) - matchFields := make([]*applyconfcore.NodeSelectorTermApplyConfiguration, 0) - for _, nodeName := range nodeNames { - nodeSelectorRequirement := applyconfcore.NodeSelectorRequirement() - nodeSelectorRequirement.WithKey("metadata.name") - nodeSelectorRequirement.WithOperator(core.NodeSelectorOpIn) - nodeSelectorRequirement.WithValues(nodeName) - - nodeSelectorTerm := applyconfcore.NodeSelectorTerm() - nodeSelectorTerm.WithMatchFields(nodeSelectorRequirement) - matchFields = append(matchFields, nodeSelectorTerm) - } - - nodeSelector := applyconfcore.NodeSelector() - nodeSelector.WithNodeSelectorTerms(matchFields...) nodeAffinity := applyconfcore.NodeAffinity() - nodeAffinity.WithRequiredDuringSchedulingIgnoredDuringExecution(nodeSelector) affinity := applyconfcore.Affinity() affinity.WithNodeAffinity(nodeAffinity) diff --git a/kubernetes/utils.go b/kubernetes/utils.go index a2e5aa471..da6c2098a 100644 --- a/kubernetes/utils.go +++ b/kubernetes/utils.go @@ -1,8 +1,6 @@ package kubernetes import ( - "regexp" - "github.com/kubeshark/base/pkg/models" core "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -48,44 +46,6 @@ func getMinimizedContainerStatuses(fullPod core.Pod) []core.ContainerStatus { return result } -func excludeSelfPods(pods []core.Pod) []core.Pod { - selfPrefixRegex := regexp.MustCompile("^" + SelfResourcesPrefix) - - nonSelfPods := make([]core.Pod, 0) - for _, pod := range pods { - if !selfPrefixRegex.MatchString(pod.Name) { - nonSelfPods = append(nonSelfPods, pod) - } - } - - return nonSelfPods -} - -func getPodArrayDiff(oldPods []core.Pod, newPods []core.Pod) (added []core.Pod, removed []core.Pod) { - added = getMissingPods(newPods, oldPods) - removed = getMissingPods(oldPods, newPods) - - return added, removed -} - -//returns pods present in pods1 array and missing in pods2 array -func getMissingPods(pods1 []core.Pod, pods2 []core.Pod) []core.Pod { - missingPods := make([]core.Pod, 0) - for _, pod1 := range pods1 { - var found = false - for _, pod2 := range pods2 { - if pod1.UID == pod2.UID { - found = true - break - } - } - if !found { - missingPods = append(missingPods, pod1) - } - } - return missingPods -} - func GetPodInfosForPods(pods []core.Pod) []*models.PodInfo { podInfos := make([]*models.PodInfo, 0) for _, pod := range pods { diff --git a/kubernetes/workerSyncer.go b/kubernetes/workerSyncer.go deleted file mode 100644 index 6217f5049..000000000 --- a/kubernetes/workerSyncer.go +++ /dev/null @@ -1,282 +0,0 @@ -package kubernetes - -import ( - "context" - "fmt" - "regexp" - "time" - - "github.com/kubeshark/base/pkg/models" - "github.com/kubeshark/kubeshark/debounce" - "github.com/kubeshark/kubeshark/docker" - "github.com/kubeshark/kubeshark/utils" - "github.com/rs/zerolog/log" - v1 "k8s.io/api/core/v1" -) - -const updateWorkersDelay = 5 * time.Second - -type TargetedPodChangeEvent struct { - Added []v1.Pod - Removed []v1.Pod -} - -// WorkerSyncer uses a k8s pod watch to update Worker daemonsets when targeted pods are removed or created -type WorkerSyncer struct { - startTime time.Time - context 
context.Context - CurrentlyTargetedPods []v1.Pod - config WorkerSyncerConfig - kubernetesProvider *Provider - TapPodChangesOut chan TargetedPodChangeEvent - ErrorOut chan K8sTapManagerError - nodeToTargetedPodMap models.NodeToPodsMap - targetedNodes []string -} - -type WorkerSyncerConfig struct { - TargetNamespaces []string - PodFilterRegex regexp.Regexp - SelfNamespace string - WorkerResources Resources - ImagePullPolicy v1.PullPolicy - ImagePullSecrets []v1.LocalObjectReference - SelfServiceAccountExists bool - ServiceMesh bool - Tls bool - Debug bool -} - -func CreateAndStartWorkerSyncer(ctx context.Context, kubernetesProvider *Provider, config WorkerSyncerConfig, startTime time.Time) (*WorkerSyncer, error) { - syncer := &WorkerSyncer{ - startTime: startTime.Truncate(time.Second), // Round down because k8s CreationTimestamp is given in 1 sec resolution. - context: ctx, - CurrentlyTargetedPods: make([]v1.Pod, 0), - config: config, - kubernetesProvider: kubernetesProvider, - TapPodChangesOut: make(chan TargetedPodChangeEvent, 100), - ErrorOut: make(chan K8sTapManagerError, 100), - } - - if err, _ := syncer.updateCurrentlyTargetedPods(); err != nil { - return nil, err - } - - if err := syncer.updateWorkers(); err != nil { - return nil, err - } - - go syncer.watchPodsForTargeting() - return syncer, nil -} - -func (workerSyncer *WorkerSyncer) watchPodsForTargeting() { - podWatchHelper := NewPodWatchHelper(workerSyncer.kubernetesProvider, &workerSyncer.config.PodFilterRegex) - eventChan, errorChan := FilteredWatch(workerSyncer.context, podWatchHelper, workerSyncer.config.TargetNamespaces, podWatchHelper) - - handleChangeInPods := func() { - err, changeFound := workerSyncer.updateCurrentlyTargetedPods() - if err != nil { - workerSyncer.ErrorOut <- K8sTapManagerError{ - OriginalError: err, - TapManagerReason: TapManagerPodListError, - } - } - - if !changeFound { - log.Debug().Msg("Nothing changed. Updating workers is not needed.") - return - } - if err := workerSyncer.updateWorkers(); err != nil { - workerSyncer.ErrorOut <- K8sTapManagerError{ - OriginalError: err, - TapManagerReason: TapManagerWorkerUpdateError, - } - } - } - restartWorkersDebouncer := debounce.NewDebouncer(updateWorkersDelay, handleChangeInPods) - - for { - select { - case wEvent, ok := <-eventChan: - if !ok { - eventChan = nil - continue - } - - pod, err := wEvent.ToPod() - if err != nil { - workerSyncer.handleErrorInWatchLoop(err, restartWorkersDebouncer) - continue - } - - switch wEvent.Type { - case EventAdded: - log.Debug(). - Str("pod", pod.Name). - Str("namespace", pod.Namespace). - Msg("Added matching pod.") - if err := restartWorkersDebouncer.SetOn(); err != nil { - log.Error(). - Str("pod", pod.Name). - Str("namespace", pod.Namespace). - Err(err). - Msg("While restarting workers!") - } - case EventDeleted: - log.Debug(). - Str("pod", pod.Name). - Str("namespace", pod.Namespace). - Msg("Removed matching pod.") - if err := restartWorkersDebouncer.SetOn(); err != nil { - log.Error(). - Str("pod", pod.Name). - Str("namespace", pod.Namespace). - Err(err). - Msg("While restarting workers!") - } - case EventModified: - log.Debug(). - Str("pod", pod.Name). - Str("namespace", pod.Namespace). - Str("ip", pod.Status.PodIP). - Interface("phase", pod.Status.Phase). - Msg("Modified matching pod.") - - // Act only if the modified pod has already obtained an IP address. 
- // After filtering for IPs, on a normal pod restart this includes the following events: - // - Pod deletion - // - Pod reaches start state - // - Pod reaches ready state - // Ready/unready transitions might also trigger this event. - if pod.Status.PodIP != "" { - if err := restartWorkersDebouncer.SetOn(); err != nil { - log.Error(). - Str("pod", pod.Name). - Str("namespace", pod.Namespace). - Err(err). - Msg("While restarting workers!") - } - } - case EventBookmark: - break - case EventError: - break - } - case err, ok := <-errorChan: - if !ok { - errorChan = nil - continue - } - - workerSyncer.handleErrorInWatchLoop(err, restartWorkersDebouncer) - continue - - case <-workerSyncer.context.Done(): - log.Debug().Msg("Watching pods, context done. Stopping \"restart workers debouncer\"") - restartWorkersDebouncer.Cancel() - // TODO: Does this also perform cleanup? - return - } - } -} - -func (workerSyncer *WorkerSyncer) handleErrorInWatchLoop(err error, restartWorkersDebouncer *debounce.Debouncer) { - log.Error().Err(err).Msg("While watching pods, got an error! Stopping \"restart workers debouncer\"") - restartWorkersDebouncer.Cancel() - workerSyncer.ErrorOut <- K8sTapManagerError{ - OriginalError: err, - TapManagerReason: TapManagerPodWatchError, - } -} - -func (workerSyncer *WorkerSyncer) updateCurrentlyTargetedPods() (err error, changesFound bool) { - if matchingPods, err := workerSyncer.kubernetesProvider.ListAllRunningPodsMatchingRegex(workerSyncer.context, &workerSyncer.config.PodFilterRegex, workerSyncer.config.TargetNamespaces); err != nil { - return err, false - } else { - podsToTarget := excludeSelfPods(matchingPods) - addedPods, removedPods := getPodArrayDiff(workerSyncer.CurrentlyTargetedPods, podsToTarget) - for _, addedPod := range addedPods { - log.Info().Msg(fmt.Sprintf("Targeted pod: %s", fmt.Sprintf(utils.Green, addedPod.Name))) - } - for _, removedPod := range removedPods { - log.Info().Msg(fmt.Sprintf("Untargeted pod: %s", fmt.Sprintf(utils.Red, removedPod.Name))) - } - if len(addedPods) > 0 || len(removedPods) > 0 { - workerSyncer.CurrentlyTargetedPods = podsToTarget - workerSyncer.nodeToTargetedPodMap = GetNodeHostToTargetedPodsMap(workerSyncer.CurrentlyTargetedPods) - workerSyncer.TapPodChangesOut <- TargetedPodChangeEvent{ - Added: addedPods, - Removed: removedPods, - } - return nil, true - } - return nil, false - } -} - -func (workerSyncer *WorkerSyncer) updateWorkers() error { - nodesToTarget := make([]string, len(workerSyncer.nodeToTargetedPodMap)) - i := 0 - for node := range workerSyncer.nodeToTargetedPodMap { - nodesToTarget[i] = node - i++ - } - - if utils.EqualStringSlices(nodesToTarget, workerSyncer.targetedNodes) { - log.Debug().Msg("Skipping apply, DaemonSet is up to date") - return nil - } - - log.Debug().Strs("nodes", nodesToTarget).Msg("Updating DaemonSet to run on nodes.") - - image := docker.GetWorkerImage() - - if len(workerSyncer.nodeToTargetedPodMap) > 0 { - var serviceAccountName string - if workerSyncer.config.SelfServiceAccountExists { - serviceAccountName = ServiceAccountName - } else { - serviceAccountName = "" - } - - nodeNames := make([]string, 0, len(workerSyncer.nodeToTargetedPodMap)) - for nodeName := range workerSyncer.nodeToTargetedPodMap { - nodeNames = append(nodeNames, nodeName) - } - - if err := workerSyncer.kubernetesProvider.ApplyWorkerDaemonSet( - workerSyncer.context, - workerSyncer.config.SelfNamespace, - WorkerDaemonSetName, - image, - WorkerPodName, - nodeNames, - serviceAccountName, - workerSyncer.config.WorkerResources, - 
workerSyncer.config.ImagePullPolicy, - workerSyncer.config.ImagePullSecrets, - workerSyncer.config.ServiceMesh, - workerSyncer.config.Tls, - workerSyncer.config.Debug); err != nil { - return err - } - - log.Debug().Int("worker-count", len(workerSyncer.nodeToTargetedPodMap)).Msg("Successfully created workers.") - } else { - if err := workerSyncer.kubernetesProvider.ResetWorkerDaemonSet( - workerSyncer.context, - workerSyncer.config.SelfNamespace, - WorkerDaemonSetName, - image, - WorkerPodName); err != nil { - return err - } - - log.Debug().Msg("Successfully resetted Worker DaemonSet") - } - - workerSyncer.targetedNodes = nodesToTarget - - return nil -} diff --git a/kubernetes/workers.go b/kubernetes/workers.go new file mode 100644 index 000000000..894d767e4 --- /dev/null +++ b/kubernetes/workers.go @@ -0,0 +1,52 @@ +package kubernetes + +import ( + "context" + + "github.com/kubeshark/kubeshark/docker" + "github.com/rs/zerolog/log" + core "k8s.io/api/core/v1" +) + +func CreateWorkers( + kubernetesProvider *Provider, + selfServiceAccountExists bool, + ctx context.Context, + namespace string, + resources Resources, + imagePullPolicy core.PullPolicy, + imagePullSecrets []core.LocalObjectReference, + serviceMesh bool, + tls bool, + debug bool, +) error { + image := docker.GetWorkerImage() + + var serviceAccountName string + if selfServiceAccountExists { + serviceAccountName = ServiceAccountName + } else { + serviceAccountName = "" + } + + if err := kubernetesProvider.ApplyWorkerDaemonSet( + ctx, + namespace, + WorkerDaemonSetName, + image, + WorkerPodName, + serviceAccountName, + resources, + imagePullPolicy, + imagePullSecrets, + serviceMesh, + tls, + debug, + ); err != nil { + return err + } + + log.Debug().Msg("Successfully created workers.") + + return nil +}
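
Reviewer note, not part of the patch: with this change the CLI stops posting a resolved pod list and instead reports the raw regex and target namespaces via POST /pods/regex, leaving pod matching to the Hub and the per-node workers. The Hub lives in a separate repository, so as a reference for the payload shape only, here is a minimal, hypothetical Go sketch of a handler that accepts the JSON body defined by postRegexRequest above. The handler name, listen port, and validation logic are illustrative assumptions, not the Hub's actual implementation.

// Hypothetical Hub-side handler for POST /pods/regex.
// Only the JSON field names ("regex", "namespaces") are taken from the patch;
// everything else here is an assumption for illustration.
package main

import (
	"encoding/json"
	"net/http"
	"regexp"
)

type podRegexRequest struct {
	Regex      string   `json:"regex"`
	Namespaces []string `json:"namespaces"`
}

func handlePodRegex(w http.ResponseWriter, r *http.Request) {
	var req podRegexRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, "invalid JSON body", http.StatusBadRequest)
		return
	}
	// Reject an uncompilable regex up front, before workers try to filter pods with it.
	if _, err := regexp.Compile(req.Regex); err != nil {
		http.Error(w, "invalid pod regex", http.StatusBadRequest)
		return
	}
	w.WriteHeader(http.StatusOK)
}

func main() {
	// Port 8898 is an arbitrary choice for this sketch.
	http.HandleFunc("/pods/regex", handlePodRegex)
	_ = http.ListenAndServe(":8898", nil)
}

Note that the CLI side (PostRegexToHub) retries once per second until the POST succeeds or returns a *url.Error, so an idempotent handler like the sketch above is sufficient.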