Deploy the worker DaemonSet without a NodeSelector and call POST /pods/regex to set the pod regex in Hub

M. Mert Yildiran
2023-01-22 04:14:44 +03:00
parent 846f253a03
commit 4e3233ade8
6 changed files with 86 additions and 399 deletions
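For context on the endpoint named in the commit title: the CLI now posts the pod regex and target namespaces to Hub instead of pushing a resolved pod list. The Hub-side handler is not part of this commit, so the following is only a minimal sketch of how such a handler could decode the payload (the route struct name and port are assumptions; the field names match the postRegexRequest struct added below):

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// Mirrors the connector's postRegexRequest payload (field names from this commit).
type podsRegexRequest struct {
	Regex      string   `json:"regex"`
	Namespaces []string `json:"namespaces"`
}

func handlePodsRegex(w http.ResponseWriter, r *http.Request) {
	var req podsRegexRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	// Hub is expected to use this regex to decide which pods the workers target.
	fmt.Printf("pod regex: %q, namespaces: %v\n", req.Regex, req.Namespaces)
	w.WriteHeader(http.StatusOK)
}

func main() {
	http.HandleFunc("/pods/regex", handlePodsRegex)
	// Port 8898 is a placeholder, not taken from this commit.
	if err := http.ListenAndServe(":8898", nil); err != nil {
		panic(err)
	}
}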

@@ -121,56 +121,12 @@ func printTargetedPodsPreview(ctx context.Context, kubernetesProvider *kubernete
printNoPodsFoundSuggestion(namespaces)
}
for _, targetedPod := range matchingPods {
log.Info().Msg(fmt.Sprintf("New pod: %s", fmt.Sprintf(utils.Cyan, targetedPod.Name)))
log.Info().Msg(fmt.Sprintf("Targeted pod: %s", fmt.Sprintf(utils.Green, targetedPod.Name)))
}
return nil
}
}
func startWorkerSyncer(ctx context.Context, cancel context.CancelFunc, provider *kubernetes.Provider, targetNamespaces []string, startTime time.Time) error {
workerSyncer, err := kubernetes.CreateAndStartWorkerSyncer(ctx, provider, kubernetes.WorkerSyncerConfig{
TargetNamespaces: targetNamespaces,
PodFilterRegex: *config.Config.Tap.PodRegex(),
SelfNamespace: config.Config.SelfNamespace,
WorkerResources: config.Config.Tap.Resources.Worker,
ImagePullPolicy: config.Config.ImagePullPolicy(),
ImagePullSecrets: config.Config.ImagePullSecrets(),
SelfServiceAccountExists: state.selfServiceAccountExists,
ServiceMesh: config.Config.Tap.ServiceMesh,
Tls: config.Config.Tap.Tls,
Debug: config.Config.Tap.Debug,
}, startTime)
if err != nil {
return err
}
go func() {
for {
select {
case syncerErr, ok := <-workerSyncer.ErrorOut:
if !ok {
log.Debug().Msg("workerSyncer err channel closed, ending listener loop")
return
}
log.Error().Msg(getK8sTapManagerErrorText(syncerErr))
cancel()
case _, ok := <-workerSyncer.TapPodChangesOut:
if !ok {
log.Debug().Msg("workerSyncer pod changes channel closed, ending listener loop")
return
}
go connector.PostTargetedPodsToHub(workerSyncer.CurrentlyTargetedPods)
case <-ctx.Done():
log.Debug().Msg("workerSyncer event listener loop exiting due to context done")
return
}
}
}()
return nil
}
func printNoPodsFoundSuggestion(targetNamespaces []string) {
var suggestionStr string
if !utils.Contains(targetNamespaces, kubernetes.K8sAllNamespaces) {
@@ -433,11 +389,24 @@ func postHubStarted(ctx context.Context, kubernetesProvider *kubernetes.Provider
"/echo",
)
if err := startWorkerSyncer(ctx, cancel, kubernetesProvider, state.targetNamespaces, state.startTime); err != nil {
log.Error().Err(errormessage.FormatError(err)).Msg("Error starting worker syncer")
cancel()
err := kubernetes.CreateWorkers(
kubernetesProvider,
state.selfServiceAccountExists,
ctx,
config.Config.SelfNamespace,
config.Config.Tap.Resources.Worker,
config.Config.ImagePullPolicy(),
config.Config.ImagePullSecrets(),
config.Config.Tap.ServiceMesh,
config.Config.Tap.Tls,
config.Config.Tap.Debug,
)
if err != nil {
log.Error().Err(err).Send()
}
connector.PostRegexToHub(config.Config.Tap.PodRegexStr, state.targetNamespaces)
url := kubernetes.GetLocalhostOnPort(config.Config.Tap.Proxy.Hub.SrcPort)
log.Info().Str("url", url).Msg(fmt.Sprintf(utils.Green, "Hub is available at:"))
}

@@ -12,7 +12,6 @@ import (
"github.com/kubeshark/kubeshark/utils"
"github.com/rs/zerolog/log"
core "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
)
@@ -115,22 +114,32 @@ func (connector *Connector) PostStorageLimitToHub(limit int64) {
}
}
func (connector *Connector) PostTargetedPodsToHub(pods []core.Pod) {
postTargetedUrl := fmt.Sprintf("%s/pods/targeted", connector.url)
type postRegexRequest struct {
Regex string `json:"regex"`
Namespaces []string `json:"namespaces"`
}
if podsMarshalled, err := json.Marshal(pods); err != nil {
log.Error().Err(err).Msg("Failed to marshal the targeted pods:")
func (connector *Connector) PostRegexToHub(regex string, namespaces []string) {
postRegexUrl := fmt.Sprintf("%s/pods/regex", connector.url)
payload := postRegexRequest{
Regex: regex,
Namespaces: namespaces,
}
if payloadMarshalled, err := json.Marshal(payload); err != nil {
log.Error().Err(err).Msg("Failed to marshal the payload:")
} else {
ok := false
for !ok {
if _, err = utils.Post(postTargetedUrl, "application/json", bytes.NewBuffer(podsMarshalled), connector.client); err != nil {
if _, err = utils.Post(postRegexUrl, "application/json", bytes.NewBuffer(payloadMarshalled), connector.client); err != nil {
if _, ok := err.(*url.Error); ok {
break
}
log.Debug().Err(err).Msg("Failed sending the targeted pods to Hub:")
log.Debug().Err(err).Msg("Failed sending the payload to Hub:")
} else {
ok = true
log.Debug().Int("pod-count", len(pods)).Msg("Reported targeted pods to Hub:")
log.Debug().Str("regex", regex).Strs("namespaces", namespaces).Msg("Reported payload to Hub:")
}
time.Sleep(time.Second)
}
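For reference, a tiny standalone sketch of the JSON body that PostRegexToHub marshals and sends; the regex and namespace values are placeholders:

package main

import (
	"encoding/json"
	"fmt"
)

// Same shape as the postRegexRequest struct above.
type postRegexRequest struct {
	Regex      string   `json:"regex"`
	Namespaces []string `json:"namespaces"`
}

func main() {
	body, err := json.Marshal(postRegexRequest{
		Regex:      "front-end.*",                    // placeholder pod regex
		Namespaces: []string{"default", "sock-shop"}, // placeholder namespaces
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body)) // {"regex":"front-end.*","namespaces":["default","sock-shop"]}
}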

@@ -652,7 +652,6 @@ func (provider *Provider) ApplyWorkerDaemonSet(
daemonSetName string,
podImage string,
workerPodName string,
nodeNames []string,
serviceAccountName string,
resources Resources,
imagePullPolicy core.PullPolicy,
@@ -662,17 +661,12 @@ func (provider *Provider) ApplyWorkerDaemonSet(
debug bool,
) error {
log.Debug().
Int("node-count", len(nodeNames)).
Str("namespace", namespace).
Str("daemonset-name", daemonSetName).
Str("image", podImage).
Str("pod", workerPodName).
Msg("Applying worker DaemonSets.")
if len(nodeNames) == 0 {
return fmt.Errorf("DaemonSet %s must target at least 1 pod", daemonSetName)
}
command := []string{"./worker", "-i", "any", "-port", "8897"}
if debug {
@@ -752,22 +746,7 @@ func (provider *Provider) ApplyWorkerDaemonSet(
workerResources := applyconfcore.ResourceRequirements().WithRequests(workerResourceRequests).WithLimits(workerResourceLimits)
workerContainer.WithResources(workerResources)
matchFields := make([]*applyconfcore.NodeSelectorTermApplyConfiguration, 0)
for _, nodeName := range nodeNames {
nodeSelectorRequirement := applyconfcore.NodeSelectorRequirement()
nodeSelectorRequirement.WithKey("metadata.name")
nodeSelectorRequirement.WithOperator(core.NodeSelectorOpIn)
nodeSelectorRequirement.WithValues(nodeName)
nodeSelectorTerm := applyconfcore.NodeSelectorTerm()
nodeSelectorTerm.WithMatchFields(nodeSelectorRequirement)
matchFields = append(matchFields, nodeSelectorTerm)
}
nodeSelector := applyconfcore.NodeSelector()
nodeSelector.WithNodeSelectorTerms(matchFields...)
nodeAffinity := applyconfcore.NodeAffinity()
nodeAffinity.WithRequiredDuringSchedulingIgnoredDuringExecution(nodeSelector)
affinity := applyconfcore.Affinity()
affinity.WithNodeAffinity(nodeAffinity)
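With the per-node match-field affinity gone, the DaemonSet controller's default behavior takes over and schedules one worker pod on every schedulable node. A minimal sketch of the resulting spec shape, built in the same applyconfigurations style the provider uses (names, labels, and image tag are placeholders; only the worker command comes from this file):

package main

import (
	"fmt"

	applyconfapps "k8s.io/client-go/applyconfigurations/apps/v1"
	applyconfcore "k8s.io/client-go/applyconfigurations/core/v1"
	applyconfmeta "k8s.io/client-go/applyconfigurations/meta/v1"
	"sigs.k8s.io/yaml"
)

func main() {
	labels := map[string]string{"app": "kubeshark-worker"} // placeholder label

	container := applyconfcore.Container().
		WithName("worker").
		WithImage("kubeshark/worker:latest"). // placeholder image tag
		WithCommand("./worker", "-i", "any", "-port", "8897")

	// No Affinity or NodeSelector is set, so one pod is scheduled per node.
	podSpec := applyconfcore.PodSpec().WithContainers(container)

	daemonSet := applyconfapps.DaemonSet("kubeshark-worker-daemon-set", "kubeshark"). // placeholder names
		WithLabels(labels).
		WithSpec(applyconfapps.DaemonSetSpec().
			WithSelector(applyconfmeta.LabelSelector().WithMatchLabels(labels)).
			WithTemplate(applyconfcore.PodTemplateSpec().
				WithLabels(labels).
				WithSpec(podSpec)))

	manifest, err := yaml.Marshal(daemonSet)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(manifest))
}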

@@ -1,8 +1,6 @@
package kubernetes
import (
"regexp"
"github.com/kubeshark/base/pkg/models"
core "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -48,44 +46,6 @@ func getMinimizedContainerStatuses(fullPod core.Pod) []core.ContainerStatus {
return result
}
func excludeSelfPods(pods []core.Pod) []core.Pod {
selfPrefixRegex := regexp.MustCompile("^" + SelfResourcesPrefix)
nonSelfPods := make([]core.Pod, 0)
for _, pod := range pods {
if !selfPrefixRegex.MatchString(pod.Name) {
nonSelfPods = append(nonSelfPods, pod)
}
}
return nonSelfPods
}
func getPodArrayDiff(oldPods []core.Pod, newPods []core.Pod) (added []core.Pod, removed []core.Pod) {
added = getMissingPods(newPods, oldPods)
removed = getMissingPods(oldPods, newPods)
return added, removed
}
//returns pods present in pods1 array and missing in pods2 array
func getMissingPods(pods1 []core.Pod, pods2 []core.Pod) []core.Pod {
missingPods := make([]core.Pod, 0)
for _, pod1 := range pods1 {
var found = false
for _, pod2 := range pods2 {
if pod1.UID == pod2.UID {
found = true
break
}
}
if !found {
missingPods = append(missingPods, pod1)
}
}
return missingPods
}
func GetPodInfosForPods(pods []core.Pod) []*models.PodInfo {
podInfos := make([]*models.PodInfo, 0)
for _, pod := range pods {

@@ -1,282 +0,0 @@
package kubernetes
import (
"context"
"fmt"
"regexp"
"time"
"github.com/kubeshark/base/pkg/models"
"github.com/kubeshark/kubeshark/debounce"
"github.com/kubeshark/kubeshark/docker"
"github.com/kubeshark/kubeshark/utils"
"github.com/rs/zerolog/log"
v1 "k8s.io/api/core/v1"
)
const updateWorkersDelay = 5 * time.Second
type TargetedPodChangeEvent struct {
Added []v1.Pod
Removed []v1.Pod
}
// WorkerSyncer uses a k8s pod watch to update Worker daemonsets when targeted pods are removed or created
type WorkerSyncer struct {
startTime time.Time
context context.Context
CurrentlyTargetedPods []v1.Pod
config WorkerSyncerConfig
kubernetesProvider *Provider
TapPodChangesOut chan TargetedPodChangeEvent
ErrorOut chan K8sTapManagerError
nodeToTargetedPodMap models.NodeToPodsMap
targetedNodes []string
}
type WorkerSyncerConfig struct {
TargetNamespaces []string
PodFilterRegex regexp.Regexp
SelfNamespace string
WorkerResources Resources
ImagePullPolicy v1.PullPolicy
ImagePullSecrets []v1.LocalObjectReference
SelfServiceAccountExists bool
ServiceMesh bool
Tls bool
Debug bool
}
func CreateAndStartWorkerSyncer(ctx context.Context, kubernetesProvider *Provider, config WorkerSyncerConfig, startTime time.Time) (*WorkerSyncer, error) {
syncer := &WorkerSyncer{
startTime: startTime.Truncate(time.Second), // Round down because k8s CreationTimestamp is given in 1 sec resolution.
context: ctx,
CurrentlyTargetedPods: make([]v1.Pod, 0),
config: config,
kubernetesProvider: kubernetesProvider,
TapPodChangesOut: make(chan TargetedPodChangeEvent, 100),
ErrorOut: make(chan K8sTapManagerError, 100),
}
if err, _ := syncer.updateCurrentlyTargetedPods(); err != nil {
return nil, err
}
if err := syncer.updateWorkers(); err != nil {
return nil, err
}
go syncer.watchPodsForTargeting()
return syncer, nil
}
func (workerSyncer *WorkerSyncer) watchPodsForTargeting() {
podWatchHelper := NewPodWatchHelper(workerSyncer.kubernetesProvider, &workerSyncer.config.PodFilterRegex)
eventChan, errorChan := FilteredWatch(workerSyncer.context, podWatchHelper, workerSyncer.config.TargetNamespaces, podWatchHelper)
handleChangeInPods := func() {
err, changeFound := workerSyncer.updateCurrentlyTargetedPods()
if err != nil {
workerSyncer.ErrorOut <- K8sTapManagerError{
OriginalError: err,
TapManagerReason: TapManagerPodListError,
}
}
if !changeFound {
log.Debug().Msg("Nothing changed. Updating workers is not needed.")
return
}
if err := workerSyncer.updateWorkers(); err != nil {
workerSyncer.ErrorOut <- K8sTapManagerError{
OriginalError: err,
TapManagerReason: TapManagerWorkerUpdateError,
}
}
}
restartWorkersDebouncer := debounce.NewDebouncer(updateWorkersDelay, handleChangeInPods)
for {
select {
case wEvent, ok := <-eventChan:
if !ok {
eventChan = nil
continue
}
pod, err := wEvent.ToPod()
if err != nil {
workerSyncer.handleErrorInWatchLoop(err, restartWorkersDebouncer)
continue
}
switch wEvent.Type {
case EventAdded:
log.Debug().
Str("pod", pod.Name).
Str("namespace", pod.Namespace).
Msg("Added matching pod.")
if err := restartWorkersDebouncer.SetOn(); err != nil {
log.Error().
Str("pod", pod.Name).
Str("namespace", pod.Namespace).
Err(err).
Msg("While restarting workers!")
}
case EventDeleted:
log.Debug().
Str("pod", pod.Name).
Str("namespace", pod.Namespace).
Msg("Removed matching pod.")
if err := restartWorkersDebouncer.SetOn(); err != nil {
log.Error().
Str("pod", pod.Name).
Str("namespace", pod.Namespace).
Err(err).
Msg("While restarting workers!")
}
case EventModified:
log.Debug().
Str("pod", pod.Name).
Str("namespace", pod.Namespace).
Str("ip", pod.Status.PodIP).
Interface("phase", pod.Status.Phase).
Msg("Modified matching pod.")
// Act only if the modified pod has already obtained an IP address.
// After filtering for IPs, on a normal pod restart this includes the following events:
// - Pod deletion
// - Pod reaches start state
// - Pod reaches ready state
// Ready/unready transitions might also trigger this event.
if pod.Status.PodIP != "" {
if err := restartWorkersDebouncer.SetOn(); err != nil {
log.Error().
Str("pod", pod.Name).
Str("namespace", pod.Namespace).
Err(err).
Msg("While restarting workers!")
}
}
case EventBookmark:
break
case EventError:
break
}
case err, ok := <-errorChan:
if !ok {
errorChan = nil
continue
}
workerSyncer.handleErrorInWatchLoop(err, restartWorkersDebouncer)
continue
case <-workerSyncer.context.Done():
log.Debug().Msg("Watching pods, context done. Stopping \"restart workers debouncer\"")
restartWorkersDebouncer.Cancel()
// TODO: Does this also perform cleanup?
return
}
}
}
func (workerSyncer *WorkerSyncer) handleErrorInWatchLoop(err error, restartWorkersDebouncer *debounce.Debouncer) {
log.Error().Err(err).Msg("While watching pods, got an error! Stopping \"restart workers debouncer\"")
restartWorkersDebouncer.Cancel()
workerSyncer.ErrorOut <- K8sTapManagerError{
OriginalError: err,
TapManagerReason: TapManagerPodWatchError,
}
}
func (workerSyncer *WorkerSyncer) updateCurrentlyTargetedPods() (err error, changesFound bool) {
if matchingPods, err := workerSyncer.kubernetesProvider.ListAllRunningPodsMatchingRegex(workerSyncer.context, &workerSyncer.config.PodFilterRegex, workerSyncer.config.TargetNamespaces); err != nil {
return err, false
} else {
podsToTarget := excludeSelfPods(matchingPods)
addedPods, removedPods := getPodArrayDiff(workerSyncer.CurrentlyTargetedPods, podsToTarget)
for _, addedPod := range addedPods {
log.Info().Msg(fmt.Sprintf("Targeted pod: %s", fmt.Sprintf(utils.Green, addedPod.Name)))
}
for _, removedPod := range removedPods {
log.Info().Msg(fmt.Sprintf("Untargeted pod: %s", fmt.Sprintf(utils.Red, removedPod.Name)))
}
if len(addedPods) > 0 || len(removedPods) > 0 {
workerSyncer.CurrentlyTargetedPods = podsToTarget
workerSyncer.nodeToTargetedPodMap = GetNodeHostToTargetedPodsMap(workerSyncer.CurrentlyTargetedPods)
workerSyncer.TapPodChangesOut <- TargetedPodChangeEvent{
Added: addedPods,
Removed: removedPods,
}
return nil, true
}
return nil, false
}
}
func (workerSyncer *WorkerSyncer) updateWorkers() error {
nodesToTarget := make([]string, len(workerSyncer.nodeToTargetedPodMap))
i := 0
for node := range workerSyncer.nodeToTargetedPodMap {
nodesToTarget[i] = node
i++
}
if utils.EqualStringSlices(nodesToTarget, workerSyncer.targetedNodes) {
log.Debug().Msg("Skipping apply, DaemonSet is up to date")
return nil
}
log.Debug().Strs("nodes", nodesToTarget).Msg("Updating DaemonSet to run on nodes.")
image := docker.GetWorkerImage()
if len(workerSyncer.nodeToTargetedPodMap) > 0 {
var serviceAccountName string
if workerSyncer.config.SelfServiceAccountExists {
serviceAccountName = ServiceAccountName
} else {
serviceAccountName = ""
}
nodeNames := make([]string, 0, len(workerSyncer.nodeToTargetedPodMap))
for nodeName := range workerSyncer.nodeToTargetedPodMap {
nodeNames = append(nodeNames, nodeName)
}
if err := workerSyncer.kubernetesProvider.ApplyWorkerDaemonSet(
workerSyncer.context,
workerSyncer.config.SelfNamespace,
WorkerDaemonSetName,
image,
WorkerPodName,
nodeNames,
serviceAccountName,
workerSyncer.config.WorkerResources,
workerSyncer.config.ImagePullPolicy,
workerSyncer.config.ImagePullSecrets,
workerSyncer.config.ServiceMesh,
workerSyncer.config.Tls,
workerSyncer.config.Debug); err != nil {
return err
}
log.Debug().Int("worker-count", len(workerSyncer.nodeToTargetedPodMap)).Msg("Successfully created workers.")
} else {
if err := workerSyncer.kubernetesProvider.ResetWorkerDaemonSet(
workerSyncer.context,
workerSyncer.config.SelfNamespace,
WorkerDaemonSetName,
image,
WorkerPodName); err != nil {
return err
}
log.Debug().Msg("Successfully resetted Worker DaemonSet")
}
workerSyncer.targetedNodes = nodesToTarget
return nil
}

kubernetes/workers.go (new file, 52 lines)

@@ -0,0 +1,52 @@
package kubernetes
import (
"context"
"github.com/kubeshark/kubeshark/docker"
"github.com/rs/zerolog/log"
core "k8s.io/api/core/v1"
)
func CreateWorkers(
kubernetesProvider *Provider,
selfServiceAccountExists bool,
ctx context.Context,
namespace string,
resources Resources,
imagePullPolicy core.PullPolicy,
imagePullSecrets []core.LocalObjectReference,
serviceMesh bool,
tls bool,
debug bool,
) error {
image := docker.GetWorkerImage()
var serviceAccountName string
if selfServiceAccountExists {
serviceAccountName = ServiceAccountName
} else {
serviceAccountName = ""
}
if err := kubernetesProvider.ApplyWorkerDaemonSet(
ctx,
namespace,
WorkerDaemonSetName,
image,
WorkerPodName,
serviceAccountName,
resources,
imagePullPolicy,
imagePullSecrets,
serviceMesh,
tls,
debug,
); err != nil {
return err
}
log.Debug().Msg("Successfully created workers.")
return nil
}
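Since CreateWorkers now applies the DaemonSet once, with no node list, a quick way to confirm that workers landed on every node is to compare the DaemonSet's desired count against its ready count. A hedged sketch using plain client-go (the namespace and DaemonSet name are assumed, and the kubeconfig is read from the default location):

package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Build a client from the local kubeconfig (default path).
	restConfig, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(restConfig)
	if err != nil {
		panic(err)
	}

	// Namespace and DaemonSet name are placeholders, not taken from this commit.
	ds, err := clientset.AppsV1().DaemonSets("kubeshark").Get(context.Background(), "kubeshark-worker-daemon-set", metav1.GetOptions{})
	if err != nil {
		panic(err)
	}

	// Without node affinity, DesiredNumberScheduled should equal the number of schedulable nodes.
	fmt.Printf("desired: %d, ready: %d\n", ds.Status.DesiredNumberScheduled, ds.Status.NumberReady)
}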