From de38ef259e4f674c79f3c1029fbb944bd5348be2 Mon Sep 17 00:00:00 2001 From: "M. Mert Yildiran" Date: Thu, 29 Dec 2022 03:25:22 +0300 Subject: [PATCH] :zap: Post the storage limit to Hub after posting the worker --- cmd/tap.go | 4 +- cmd/tapPcapRunner.go | 1 + cmd/tapRunner.go | 5 ++- config/configStructs/tapConfig.go | 66 ++++++++++++++++--------------- internal/connect/hub.go | 31 +++++++++++++++ kubernetes/provider.go | 16 ++++---- resources/createResources.go | 32 +++++++-------- 7 files changed, 95 insertions(+), 60 deletions(-) diff --git a/cmd/tap.go b/cmd/tap.go index 3b20ed2a4..7cef6ba0f 100644 --- a/cmd/tap.go +++ b/cmd/tap.go @@ -31,7 +31,7 @@ var tapCmd = &cobra.Command{ } log.Info(). - Str("limit", config.Config.Tap.HumanMaxEntriesDBSize). + Str("limit", config.Config.Tap.StorageLimit). Msg("Kubeshark will store the traffic up to a limit. Oldest entries will be removed once the limit is reached.") return nil @@ -53,7 +53,7 @@ func init() { tapCmd.Flags().String(configStructs.ProxyHostLabel, defaultTapConfig.Proxy.Host, "Provide a custom host for the proxy/port-forward.") tapCmd.Flags().StringSliceP(configStructs.NamespacesLabel, "n", defaultTapConfig.Namespaces, "Namespaces selector.") tapCmd.Flags().BoolP(configStructs.AllNamespacesLabel, "A", defaultTapConfig.AllNamespaces, "Tap all namespaces.") - tapCmd.Flags().String(configStructs.HumanMaxEntriesDBSizeLabel, defaultTapConfig.HumanMaxEntriesDBSize, "Override the default max entries db size.") + tapCmd.Flags().String(configStructs.StorageLimitLabel, defaultTapConfig.StorageLimit, "Override the default max entries db size.") tapCmd.Flags().Bool(configStructs.DryRunLabel, defaultTapConfig.DryRun, "Preview of all pods matching the regex, without tapping them.") tapCmd.Flags().StringP(configStructs.PcapLabel, "p", defaultTapConfig.Pcap, "Capture from a PCAP snapshot of Kubeshark (.tar.gz) using your Docker Daemon instead of Kubernetes.") tapCmd.Flags().Bool(configStructs.ServiceMeshLabel, defaultTapConfig.ServiceMesh, "Capture the encrypted traffic if the cluster is configured with a service mesh and with mTLS.") diff --git a/cmd/tapPcapRunner.go b/cmd/tapPcapRunner.go index 2da6b30f0..df2c1d959 100644 --- a/cmd/tapPcapRunner.go +++ b/cmd/tapPcapRunner.go @@ -149,6 +149,7 @@ func createAndStartContainers( }, } + // TODO: Get host and port from ProxyConfig respFront, err = cli.ContainerCreate(ctx, &container.Config{ Image: imageFront, Tty: false, diff --git a/cmd/tapRunner.go b/cmd/tapRunner.go index 59e345c64..15862a5d2 100644 --- a/cmd/tapRunner.go +++ b/cmd/tapRunner.go @@ -85,7 +85,7 @@ func tap() { } log.Info().Msg("Waiting for the creation of Kubeshark resources...") - if state.kubesharkServiceAccountExists, err = resources.CreateHubResources(ctx, kubernetesProvider, serializedKubesharkConfig, config.Config.IsNsRestrictedMode(), config.Config.SelfNamespace, config.Config.Tap.MaxEntriesDBSizeBytes(), config.Config.Tap.Resources.Hub, config.Config.ImagePullPolicy(), config.Config.Tap.Debug); err != nil { + if state.kubesharkServiceAccountExists, err = resources.CreateHubResources(ctx, kubernetesProvider, serializedKubesharkConfig, config.Config.IsNsRestrictedMode(), config.Config.SelfNamespace, config.Config.Tap.Resources.Hub, config.Config.ImagePullPolicy(), config.Config.Tap.Debug); err != nil { var statusError *k8serrors.StatusError if errors.As(err, &statusError) && (statusError.ErrStatus.Reason == metav1.StatusReasonAlreadyExists) { log.Warn().Msg("Kubeshark is already running in this namespace, change the `kubeshark-resources-namespace` configuration or run `kubeshark clean` to remove the currently running Kubeshark instance") @@ -112,8 +112,9 @@ func finishTapExecution(kubernetesProvider *kubernetes.Provider) { } func getTapConfig() *models.Config { + // TODO: Remove models.Config conf := models.Config{ - MaxDBSizeBytes: config.Config.Tap.MaxEntriesDBSizeBytes(), + MaxDBSizeBytes: config.Config.Tap.StorageLimitBytes(), PullPolicy: config.Config.Tap.Docker.ImagePullPolicy, WorkerResources: config.Config.Tap.Resources.Worker, ResourcesNamespace: config.Config.SelfNamespace, diff --git a/config/configStructs/tapConfig.go b/config/configStructs/tapConfig.go index b22c508f8..ef79b5acc 100644 --- a/config/configStructs/tapConfig.go +++ b/config/configStructs/tapConfig.go @@ -6,22 +6,23 @@ import ( "github.com/kubeshark/base/pkg/models" "github.com/kubeshark/kubeshark/utils" + "github.com/rs/zerolog/log" ) const ( - DockerRegistryLabel = "docker-registry" - DockerTagLabel = "docker-tag" - ProxyFrontPortLabel = "proxy-front-port" - ProxyHubPortLabel = "proxy-hub-port" - ProxyHostLabel = "proxy-host" - NamespacesLabel = "namespaces" - AllNamespacesLabel = "allnamespaces" - HumanMaxEntriesDBSizeLabel = "max-entries-db-size" - DryRunLabel = "dryrun" - PcapLabel = "pcap" - ServiceMeshLabel = "servicemesh" - TlsLabel = "tls" - DebugLabel = "debug" + DockerRegistryLabel = "docker-registry" + DockerTagLabel = "docker-tag" + ProxyFrontPortLabel = "proxy-front-port" + ProxyHubPortLabel = "proxy-hub-port" + ProxyHostLabel = "proxy-host" + NamespacesLabel = "namespaces" + AllNamespacesLabel = "allnamespaces" + StorageLimitLabel = "storagelimit" + DryRunLabel = "dryrun" + PcapLabel = "pcap" + ServiceMeshLabel = "servicemesh" + TlsLabel = "tls" + DebugLabel = "debug" ) type WorkerConfig struct { @@ -58,19 +59,19 @@ type ResourcesConfig struct { } type TapConfig struct { - Docker DockerConfig `yaml:"docker"` - Proxy ProxyConfig `yaml:"proxy"` - PodRegexStr string `yaml:"regex" default:".*"` - Namespaces []string `yaml:"namespaces"` - AllNamespaces bool `yaml:"allnamespaces" default:"false"` - HumanMaxEntriesDBSize string `yaml:"max-entries-db-size" default:"200MB"` - DryRun bool `yaml:"dryrun" default:"false"` - Pcap string `yaml:"pcap" default:""` - Resources ResourcesConfig `yaml:"resources"` - ServiceMesh bool `yaml:"servicemesh" default:"true"` - Tls bool `yaml:"tls" default:"true"` - PacketCapture string `yaml:"packetcapture" default:"libpcap"` - Debug bool `yaml:"debug" default:"false"` + Docker DockerConfig `yaml:"docker"` + Proxy ProxyConfig `yaml:"proxy"` + PodRegexStr string `yaml:"regex" default:".*"` + Namespaces []string `yaml:"namespaces"` + AllNamespaces bool `yaml:"allnamespaces" default:"false"` + StorageLimit string `yaml:"storagelimit" default:"200MB"` + DryRun bool `yaml:"dryrun" default:"false"` + Pcap string `yaml:"pcap" default:""` + Resources ResourcesConfig `yaml:"resources"` + ServiceMesh bool `yaml:"servicemesh" default:"true"` + Tls bool `yaml:"tls" default:"true"` + PacketCapture string `yaml:"packetcapture" default:"libpcap"` + Debug bool `yaml:"debug" default:"false"` } func (config *TapConfig) PodRegex() *regexp.Regexp { @@ -78,9 +79,12 @@ func (config *TapConfig) PodRegex() *regexp.Regexp { return podRegex } -func (config *TapConfig) MaxEntriesDBSizeBytes() int64 { - maxEntriesDBSizeBytes, _ := utils.HumanReadableToBytes(config.HumanMaxEntriesDBSize) - return maxEntriesDBSizeBytes +func (config *TapConfig) StorageLimitBytes() int64 { + storageLimitBytes, err := utils.HumanReadableToBytes(config.StorageLimit) + if err != nil { + log.Fatal().Err(err).Send() + } + return storageLimitBytes } func (config *TapConfig) Validate() error { @@ -89,9 +93,9 @@ func (config *TapConfig) Validate() error { return fmt.Errorf("%s is not a valid regex %s", config.PodRegexStr, compileErr) } - _, parseHumanDataSizeErr := utils.HumanReadableToBytes(config.HumanMaxEntriesDBSize) + _, parseHumanDataSizeErr := utils.HumanReadableToBytes(config.StorageLimit) if parseHumanDataSizeErr != nil { - return fmt.Errorf("Could not parse --%s value %s", HumanMaxEntriesDBSizeLabel, config.HumanMaxEntriesDBSize) + return fmt.Errorf("Could not parse --%s value %s", StorageLimitLabel, config.StorageLimit) } return nil diff --git a/internal/connect/hub.go b/internal/connect/hub.go index 7738a0511..e281cf08f 100644 --- a/internal/connect/hub.go +++ b/internal/connect/hub.go @@ -8,6 +8,7 @@ import ( "net/url" "time" + "github.com/kubeshark/kubeshark/config" "github.com/kubeshark/kubeshark/utils" "github.com/rs/zerolog/log" @@ -79,6 +80,36 @@ func (connector *Connector) PostWorkerPodToHub(pod *v1.Pod) { } else { ok = true log.Debug().Interface("worker-pod", pod).Msg("Reported worker pod to Hub:") + connector.PostStorageLimitToHub(config.Config.Tap.StorageLimitBytes()) + } + time.Sleep(time.Second) + } + } +} + +type postStorageLimit struct { + Limit int64 `json:"limit"` +} + +func (connector *Connector) PostStorageLimitToHub(limit int64) { + payload := &postStorageLimit{ + Limit: limit, + } + postStorageLimitUrl := fmt.Sprintf("%s/pcaps/set-storage-limit", connector.url) + + if payloadMarshalled, err := json.Marshal(payload); err != nil { + log.Error().Err(err).Msg("Failed to marshal the storage limit:") + } else { + ok := false + for !ok { + if _, err = utils.Post(postStorageLimitUrl, "application/json", bytes.NewBuffer(payloadMarshalled), connector.client); err != nil { + if _, ok := err.(*url.Error); ok { + break + } + log.Debug().Err(err).Msg("Failed sending the storage limit to Hub:") + } else { + ok = true + log.Debug().Int("limit", int(limit)).Msg("Reported storage limit to Hub:") } time.Sleep(time.Second) } diff --git a/kubernetes/provider.go b/kubernetes/provider.go index e74e05dea..9cd1e3eaa 100644 --- a/kubernetes/provider.go +++ b/kubernetes/provider.go @@ -171,14 +171,13 @@ func (provider *Provider) CreateNamespace(ctx context.Context, name string) (*co } type PodOptions struct { - Namespace string - PodName string - PodImage string - ServiceAccountName string - MaxEntriesDBSizeBytes int64 - Resources models.Resources - ImagePullPolicy core.PullPolicy - Debug bool + Namespace string + PodName string + PodImage string + ServiceAccountName string + Resources models.Resources + ImagePullPolicy core.PullPolicy + Debug bool } func (provider *Provider) BuildHubPod(opts *PodOptions, mountVolumeClaim bool, volumeClaimName string) (*core.Pod, error) { @@ -318,6 +317,7 @@ func (provider *Provider) BuildFrontPod(opts *PodOptions, mountVolumeClaim bool, volumeMounts := []core.VolumeMount{} volumes := []core.Volume{} + // TODO: Get host and port from ProxyConfig containers := []core.Container{ { Name: opts.PodName, diff --git a/resources/createResources.go b/resources/createResources.go index 85c26ba46..f6cfe3a27 100644 --- a/resources/createResources.go +++ b/resources/createResources.go @@ -13,7 +13,7 @@ import ( core "k8s.io/api/core/v1" ) -func CreateHubResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, serializedKubesharkConfig string, isNsRestrictedMode bool, kubesharkResourcesNamespace string, maxEntriesDBSizeBytes int64, hubResources models.Resources, imagePullPolicy core.PullPolicy, debug bool) (bool, error) { +func CreateHubResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, serializedKubesharkConfig string, isNsRestrictedMode bool, kubesharkResourcesNamespace string, hubResources models.Resources, imagePullPolicy core.PullPolicy, debug bool) (bool, error) { if !isNsRestrictedMode { if err := createKubesharkNamespace(ctx, kubernetesProvider, kubesharkResourcesNamespace); err != nil { return false, err @@ -37,25 +37,23 @@ func CreateHubResources(ctx context.Context, kubernetesProvider *kubernetes.Prov } opts := &kubernetes.PodOptions{ - Namespace: kubesharkResourcesNamespace, - PodName: kubernetes.HubPodName, - PodImage: docker.GetHubImage(), - ServiceAccountName: serviceAccountName, - MaxEntriesDBSizeBytes: maxEntriesDBSizeBytes, - Resources: hubResources, - ImagePullPolicy: imagePullPolicy, - Debug: debug, + Namespace: kubesharkResourcesNamespace, + PodName: kubernetes.HubPodName, + PodImage: docker.GetHubImage(), + ServiceAccountName: serviceAccountName, + Resources: hubResources, + ImagePullPolicy: imagePullPolicy, + Debug: debug, } frontOpts := &kubernetes.PodOptions{ - Namespace: kubesharkResourcesNamespace, - PodName: kubernetes.FrontPodName, - PodImage: docker.GetWorkerImage(), - ServiceAccountName: serviceAccountName, - MaxEntriesDBSizeBytes: maxEntriesDBSizeBytes, - Resources: hubResources, - ImagePullPolicy: imagePullPolicy, - Debug: debug, + Namespace: kubesharkResourcesNamespace, + PodName: kubernetes.FrontPodName, + PodImage: docker.GetWorkerImage(), + ServiceAccountName: serviceAccountName, + Resources: hubResources, + ImagePullPolicy: imagePullPolicy, + Debug: debug, } if err := createKubesharkHubPod(ctx, kubernetesProvider, opts); err != nil {