Post the storage limit to Hub after posting the worker

This commit is contained in:
M. Mert Yildiran 2022-12-29 03:25:22 +03:00
parent 872e4961dd
commit de38ef259e
No known key found for this signature in database
GPG Key ID: DA5D6DCBB758A461
7 changed files with 95 additions and 60 deletions

View File

@ -31,7 +31,7 @@ var tapCmd = &cobra.Command{
} }
log.Info(). log.Info().
Str("limit", config.Config.Tap.HumanMaxEntriesDBSize). Str("limit", config.Config.Tap.StorageLimit).
Msg("Kubeshark will store the traffic up to a limit. Oldest entries will be removed once the limit is reached.") Msg("Kubeshark will store the traffic up to a limit. Oldest entries will be removed once the limit is reached.")
return nil return nil
@ -53,7 +53,7 @@ func init() {
tapCmd.Flags().String(configStructs.ProxyHostLabel, defaultTapConfig.Proxy.Host, "Provide a custom host for the proxy/port-forward.") tapCmd.Flags().String(configStructs.ProxyHostLabel, defaultTapConfig.Proxy.Host, "Provide a custom host for the proxy/port-forward.")
tapCmd.Flags().StringSliceP(configStructs.NamespacesLabel, "n", defaultTapConfig.Namespaces, "Namespaces selector.") tapCmd.Flags().StringSliceP(configStructs.NamespacesLabel, "n", defaultTapConfig.Namespaces, "Namespaces selector.")
tapCmd.Flags().BoolP(configStructs.AllNamespacesLabel, "A", defaultTapConfig.AllNamespaces, "Tap all namespaces.") tapCmd.Flags().BoolP(configStructs.AllNamespacesLabel, "A", defaultTapConfig.AllNamespaces, "Tap all namespaces.")
tapCmd.Flags().String(configStructs.HumanMaxEntriesDBSizeLabel, defaultTapConfig.HumanMaxEntriesDBSize, "Override the default max entries db size.") tapCmd.Flags().String(configStructs.StorageLimitLabel, defaultTapConfig.StorageLimit, "Override the default max entries db size.")
tapCmd.Flags().Bool(configStructs.DryRunLabel, defaultTapConfig.DryRun, "Preview of all pods matching the regex, without tapping them.") tapCmd.Flags().Bool(configStructs.DryRunLabel, defaultTapConfig.DryRun, "Preview of all pods matching the regex, without tapping them.")
tapCmd.Flags().StringP(configStructs.PcapLabel, "p", defaultTapConfig.Pcap, "Capture from a PCAP snapshot of Kubeshark (.tar.gz) using your Docker Daemon instead of Kubernetes.") tapCmd.Flags().StringP(configStructs.PcapLabel, "p", defaultTapConfig.Pcap, "Capture from a PCAP snapshot of Kubeshark (.tar.gz) using your Docker Daemon instead of Kubernetes.")
tapCmd.Flags().Bool(configStructs.ServiceMeshLabel, defaultTapConfig.ServiceMesh, "Capture the encrypted traffic if the cluster is configured with a service mesh and with mTLS.") tapCmd.Flags().Bool(configStructs.ServiceMeshLabel, defaultTapConfig.ServiceMesh, "Capture the encrypted traffic if the cluster is configured with a service mesh and with mTLS.")

View File

@ -149,6 +149,7 @@ func createAndStartContainers(
}, },
} }
// TODO: Get host and port from ProxyConfig
respFront, err = cli.ContainerCreate(ctx, &container.Config{ respFront, err = cli.ContainerCreate(ctx, &container.Config{
Image: imageFront, Image: imageFront,
Tty: false, Tty: false,

View File

@ -85,7 +85,7 @@ func tap() {
} }
log.Info().Msg("Waiting for the creation of Kubeshark resources...") log.Info().Msg("Waiting for the creation of Kubeshark resources...")
if state.kubesharkServiceAccountExists, err = resources.CreateHubResources(ctx, kubernetesProvider, serializedKubesharkConfig, config.Config.IsNsRestrictedMode(), config.Config.SelfNamespace, config.Config.Tap.MaxEntriesDBSizeBytes(), config.Config.Tap.Resources.Hub, config.Config.ImagePullPolicy(), config.Config.Tap.Debug); err != nil { if state.kubesharkServiceAccountExists, err = resources.CreateHubResources(ctx, kubernetesProvider, serializedKubesharkConfig, config.Config.IsNsRestrictedMode(), config.Config.SelfNamespace, config.Config.Tap.Resources.Hub, config.Config.ImagePullPolicy(), config.Config.Tap.Debug); err != nil {
var statusError *k8serrors.StatusError var statusError *k8serrors.StatusError
if errors.As(err, &statusError) && (statusError.ErrStatus.Reason == metav1.StatusReasonAlreadyExists) { if errors.As(err, &statusError) && (statusError.ErrStatus.Reason == metav1.StatusReasonAlreadyExists) {
log.Warn().Msg("Kubeshark is already running in this namespace, change the `kubeshark-resources-namespace` configuration or run `kubeshark clean` to remove the currently running Kubeshark instance") log.Warn().Msg("Kubeshark is already running in this namespace, change the `kubeshark-resources-namespace` configuration or run `kubeshark clean` to remove the currently running Kubeshark instance")
@ -112,8 +112,9 @@ func finishTapExecution(kubernetesProvider *kubernetes.Provider) {
} }
func getTapConfig() *models.Config { func getTapConfig() *models.Config {
// TODO: Remove models.Config
conf := models.Config{ conf := models.Config{
MaxDBSizeBytes: config.Config.Tap.MaxEntriesDBSizeBytes(), MaxDBSizeBytes: config.Config.Tap.StorageLimitBytes(),
PullPolicy: config.Config.Tap.Docker.ImagePullPolicy, PullPolicy: config.Config.Tap.Docker.ImagePullPolicy,
WorkerResources: config.Config.Tap.Resources.Worker, WorkerResources: config.Config.Tap.Resources.Worker,
ResourcesNamespace: config.Config.SelfNamespace, ResourcesNamespace: config.Config.SelfNamespace,

View File

@ -6,22 +6,23 @@ import (
"github.com/kubeshark/base/pkg/models" "github.com/kubeshark/base/pkg/models"
"github.com/kubeshark/kubeshark/utils" "github.com/kubeshark/kubeshark/utils"
"github.com/rs/zerolog/log"
) )
const ( const (
DockerRegistryLabel = "docker-registry" DockerRegistryLabel = "docker-registry"
DockerTagLabel = "docker-tag" DockerTagLabel = "docker-tag"
ProxyFrontPortLabel = "proxy-front-port" ProxyFrontPortLabel = "proxy-front-port"
ProxyHubPortLabel = "proxy-hub-port" ProxyHubPortLabel = "proxy-hub-port"
ProxyHostLabel = "proxy-host" ProxyHostLabel = "proxy-host"
NamespacesLabel = "namespaces" NamespacesLabel = "namespaces"
AllNamespacesLabel = "allnamespaces" AllNamespacesLabel = "allnamespaces"
HumanMaxEntriesDBSizeLabel = "max-entries-db-size" StorageLimitLabel = "storagelimit"
DryRunLabel = "dryrun" DryRunLabel = "dryrun"
PcapLabel = "pcap" PcapLabel = "pcap"
ServiceMeshLabel = "servicemesh" ServiceMeshLabel = "servicemesh"
TlsLabel = "tls" TlsLabel = "tls"
DebugLabel = "debug" DebugLabel = "debug"
) )
type WorkerConfig struct { type WorkerConfig struct {
@ -58,19 +59,19 @@ type ResourcesConfig struct {
} }
type TapConfig struct { type TapConfig struct {
Docker DockerConfig `yaml:"docker"` Docker DockerConfig `yaml:"docker"`
Proxy ProxyConfig `yaml:"proxy"` Proxy ProxyConfig `yaml:"proxy"`
PodRegexStr string `yaml:"regex" default:".*"` PodRegexStr string `yaml:"regex" default:".*"`
Namespaces []string `yaml:"namespaces"` Namespaces []string `yaml:"namespaces"`
AllNamespaces bool `yaml:"allnamespaces" default:"false"` AllNamespaces bool `yaml:"allnamespaces" default:"false"`
HumanMaxEntriesDBSize string `yaml:"max-entries-db-size" default:"200MB"` StorageLimit string `yaml:"storagelimit" default:"200MB"`
DryRun bool `yaml:"dryrun" default:"false"` DryRun bool `yaml:"dryrun" default:"false"`
Pcap string `yaml:"pcap" default:""` Pcap string `yaml:"pcap" default:""`
Resources ResourcesConfig `yaml:"resources"` Resources ResourcesConfig `yaml:"resources"`
ServiceMesh bool `yaml:"servicemesh" default:"true"` ServiceMesh bool `yaml:"servicemesh" default:"true"`
Tls bool `yaml:"tls" default:"true"` Tls bool `yaml:"tls" default:"true"`
PacketCapture string `yaml:"packetcapture" default:"libpcap"` PacketCapture string `yaml:"packetcapture" default:"libpcap"`
Debug bool `yaml:"debug" default:"false"` Debug bool `yaml:"debug" default:"false"`
} }
func (config *TapConfig) PodRegex() *regexp.Regexp { func (config *TapConfig) PodRegex() *regexp.Regexp {
@ -78,9 +79,12 @@ func (config *TapConfig) PodRegex() *regexp.Regexp {
return podRegex return podRegex
} }
func (config *TapConfig) MaxEntriesDBSizeBytes() int64 { func (config *TapConfig) StorageLimitBytes() int64 {
maxEntriesDBSizeBytes, _ := utils.HumanReadableToBytes(config.HumanMaxEntriesDBSize) storageLimitBytes, err := utils.HumanReadableToBytes(config.StorageLimit)
return maxEntriesDBSizeBytes if err != nil {
log.Fatal().Err(err).Send()
}
return storageLimitBytes
} }
func (config *TapConfig) Validate() error { func (config *TapConfig) Validate() error {
@ -89,9 +93,9 @@ func (config *TapConfig) Validate() error {
return fmt.Errorf("%s is not a valid regex %s", config.PodRegexStr, compileErr) return fmt.Errorf("%s is not a valid regex %s", config.PodRegexStr, compileErr)
} }
_, parseHumanDataSizeErr := utils.HumanReadableToBytes(config.HumanMaxEntriesDBSize) _, parseHumanDataSizeErr := utils.HumanReadableToBytes(config.StorageLimit)
if parseHumanDataSizeErr != nil { if parseHumanDataSizeErr != nil {
return fmt.Errorf("Could not parse --%s value %s", HumanMaxEntriesDBSizeLabel, config.HumanMaxEntriesDBSize) return fmt.Errorf("Could not parse --%s value %s", StorageLimitLabel, config.StorageLimit)
} }
return nil return nil

View File

@ -8,6 +8,7 @@ import (
"net/url" "net/url"
"time" "time"
"github.com/kubeshark/kubeshark/config"
"github.com/kubeshark/kubeshark/utils" "github.com/kubeshark/kubeshark/utils"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
@ -79,6 +80,36 @@ func (connector *Connector) PostWorkerPodToHub(pod *v1.Pod) {
} else { } else {
ok = true ok = true
log.Debug().Interface("worker-pod", pod).Msg("Reported worker pod to Hub:") log.Debug().Interface("worker-pod", pod).Msg("Reported worker pod to Hub:")
connector.PostStorageLimitToHub(config.Config.Tap.StorageLimitBytes())
}
time.Sleep(time.Second)
}
}
}
type postStorageLimit struct {
Limit int64 `json:"limit"`
}
func (connector *Connector) PostStorageLimitToHub(limit int64) {
payload := &postStorageLimit{
Limit: limit,
}
postStorageLimitUrl := fmt.Sprintf("%s/pcaps/set-storage-limit", connector.url)
if payloadMarshalled, err := json.Marshal(payload); err != nil {
log.Error().Err(err).Msg("Failed to marshal the storage limit:")
} else {
ok := false
for !ok {
if _, err = utils.Post(postStorageLimitUrl, "application/json", bytes.NewBuffer(payloadMarshalled), connector.client); err != nil {
if _, ok := err.(*url.Error); ok {
break
}
log.Debug().Err(err).Msg("Failed sending the storage limit to Hub:")
} else {
ok = true
log.Debug().Int("limit", int(limit)).Msg("Reported storage limit to Hub:")
} }
time.Sleep(time.Second) time.Sleep(time.Second)
} }

View File

@ -171,14 +171,13 @@ func (provider *Provider) CreateNamespace(ctx context.Context, name string) (*co
} }
type PodOptions struct { type PodOptions struct {
Namespace string Namespace string
PodName string PodName string
PodImage string PodImage string
ServiceAccountName string ServiceAccountName string
MaxEntriesDBSizeBytes int64 Resources models.Resources
Resources models.Resources ImagePullPolicy core.PullPolicy
ImagePullPolicy core.PullPolicy Debug bool
Debug bool
} }
func (provider *Provider) BuildHubPod(opts *PodOptions, mountVolumeClaim bool, volumeClaimName string) (*core.Pod, error) { func (provider *Provider) BuildHubPod(opts *PodOptions, mountVolumeClaim bool, volumeClaimName string) (*core.Pod, error) {
@ -318,6 +317,7 @@ func (provider *Provider) BuildFrontPod(opts *PodOptions, mountVolumeClaim bool,
volumeMounts := []core.VolumeMount{} volumeMounts := []core.VolumeMount{}
volumes := []core.Volume{} volumes := []core.Volume{}
// TODO: Get host and port from ProxyConfig
containers := []core.Container{ containers := []core.Container{
{ {
Name: opts.PodName, Name: opts.PodName,

View File

@ -13,7 +13,7 @@ import (
core "k8s.io/api/core/v1" core "k8s.io/api/core/v1"
) )
func CreateHubResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, serializedKubesharkConfig string, isNsRestrictedMode bool, kubesharkResourcesNamespace string, maxEntriesDBSizeBytes int64, hubResources models.Resources, imagePullPolicy core.PullPolicy, debug bool) (bool, error) { func CreateHubResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, serializedKubesharkConfig string, isNsRestrictedMode bool, kubesharkResourcesNamespace string, hubResources models.Resources, imagePullPolicy core.PullPolicy, debug bool) (bool, error) {
if !isNsRestrictedMode { if !isNsRestrictedMode {
if err := createKubesharkNamespace(ctx, kubernetesProvider, kubesharkResourcesNamespace); err != nil { if err := createKubesharkNamespace(ctx, kubernetesProvider, kubesharkResourcesNamespace); err != nil {
return false, err return false, err
@ -37,25 +37,23 @@ func CreateHubResources(ctx context.Context, kubernetesProvider *kubernetes.Prov
} }
opts := &kubernetes.PodOptions{ opts := &kubernetes.PodOptions{
Namespace: kubesharkResourcesNamespace, Namespace: kubesharkResourcesNamespace,
PodName: kubernetes.HubPodName, PodName: kubernetes.HubPodName,
PodImage: docker.GetHubImage(), PodImage: docker.GetHubImage(),
ServiceAccountName: serviceAccountName, ServiceAccountName: serviceAccountName,
MaxEntriesDBSizeBytes: maxEntriesDBSizeBytes, Resources: hubResources,
Resources: hubResources, ImagePullPolicy: imagePullPolicy,
ImagePullPolicy: imagePullPolicy, Debug: debug,
Debug: debug,
} }
frontOpts := &kubernetes.PodOptions{ frontOpts := &kubernetes.PodOptions{
Namespace: kubesharkResourcesNamespace, Namespace: kubesharkResourcesNamespace,
PodName: kubernetes.FrontPodName, PodName: kubernetes.FrontPodName,
PodImage: docker.GetWorkerImage(), PodImage: docker.GetWorkerImage(),
ServiceAccountName: serviceAccountName, ServiceAccountName: serviceAccountName,
MaxEntriesDBSizeBytes: maxEntriesDBSizeBytes, Resources: hubResources,
Resources: hubResources, ImagePullPolicy: imagePullPolicy,
ImagePullPolicy: imagePullPolicy, Debug: debug,
Debug: debug,
} }
if err := createKubesharkHubPod(ctx, kubernetesProvider, opts); err != nil { if err := createKubesharkHubPod(ctx, kubernetesProvider, opts); err != nil {