diff --git a/cli/cmd/tap.go b/cli/cmd/tap.go index 971652b3d..b008e931c 100644 --- a/cli/cmd/tap.go +++ b/cli/cmd/tap.go @@ -57,4 +57,5 @@ func init() { tapCmd.Flags().Bool(configStructs.ServiceMeshName, defaultTapConfig.ServiceMesh, "Record decrypted traffic if the cluster is configured with a service mesh and with mtls") tapCmd.Flags().Bool(configStructs.TlsName, defaultTapConfig.Tls, "Record tls traffic") tapCmd.Flags().Bool(configStructs.ProfilerName, defaultTapConfig.Profiler, "Run pprof server") + tapCmd.Flags().Int(configStructs.MaxLiveStreamsName, defaultTapConfig.MaxLiveStreams, "Maximum live tcp streams to handle concurrently") } diff --git a/cli/cmd/tapRunner.go b/cli/cmd/tapRunner.go index 2f644a30e..66af62d2c 100644 --- a/cli/cmd/tapRunner.go +++ b/cli/cmd/tapRunner.go @@ -176,6 +176,7 @@ func startTapperSyncer(ctx context.Context, cancel context.CancelFunc, provider MizuServiceAccountExists: state.mizuServiceAccountExists, ServiceMesh: config.Config.Tap.ServiceMesh, Tls: config.Config.Tap.Tls, + MaxLiveStreams: config.Config.Tap.MaxLiveStreams, }, startTime) if err != nil { diff --git a/cli/config/configStructs/tapConfig.go b/cli/config/configStructs/tapConfig.go index 6fd37e886..ab245573a 100644 --- a/cli/config/configStructs/tapConfig.go +++ b/cli/config/configStructs/tapConfig.go @@ -27,6 +27,7 @@ const ( ServiceMeshName = "service-mesh" TlsName = "tls" ProfilerName = "profiler" + MaxLiveStreamsName = "max-live-streams" ) type TapConfig struct { @@ -47,6 +48,7 @@ type TapConfig struct { ServiceMesh bool `yaml:"service-mesh" default:"false"` Tls bool `yaml:"tls" default:"false"` Profiler bool `yaml:"profiler" default:"false"` + MaxLiveStreams int `yaml:"max-live-streams" default:"500"` } func (config *TapConfig) PodRegex() *regexp.Regexp { diff --git a/shared/kubernetes/mizuTapperSyncer.go b/shared/kubernetes/mizuTapperSyncer.go index 7e412842d..d996665a4 100644 --- a/shared/kubernetes/mizuTapperSyncer.go +++ b/shared/kubernetes/mizuTapperSyncer.go @@ -48,6 +48,7 @@ type TapperSyncerConfig struct { MizuServiceAccountExists bool ServiceMesh bool Tls bool + MaxLiveStreams int } func CreateAndStartMizuTapperSyncer(ctx context.Context, kubernetesProvider *Provider, config TapperSyncerConfig, startTime time.Time) (*MizuTapperSyncer, error) { @@ -337,7 +338,8 @@ func (tapperSyncer *MizuTapperSyncer) updateMizuTappers() error { tapperSyncer.config.MizuApiFilteringOptions, tapperSyncer.config.LogLevel, tapperSyncer.config.ServiceMesh, - tapperSyncer.config.Tls); err != nil { + tapperSyncer.config.Tls, + tapperSyncer.config.MaxLiveStreams); err != nil { return err } diff --git a/shared/kubernetes/provider.go b/shared/kubernetes/provider.go index d97f006c3..c17a9ceb0 100644 --- a/shared/kubernetes/provider.go +++ b/shared/kubernetes/provider.go @@ -10,6 +10,7 @@ import ( "net/url" "path/filepath" "regexp" + "strconv" "github.com/op/go-logging" "github.com/up9inc/mizu/logger" @@ -382,11 +383,11 @@ func (provider *Provider) GetMizuApiServerPodObject(opts *ApiServerOptions, moun Tolerations: []core.Toleration{ { Operator: core.TolerationOpExists, - Effect: core.TaintEffectNoExecute, + Effect: core.TaintEffectNoExecute, }, { Operator: core.TolerationOpExists, - Effect: core.TaintEffectNoSchedule, + Effect: core.TaintEffectNoSchedule, }, }, }, @@ -711,7 +712,7 @@ func (provider *Provider) CreateConfigMap(ctx context.Context, namespace string, return nil } -func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string, apiServerPodIp string, nodeNames []string, serviceAccountName string, resources shared.Resources, imagePullPolicy core.PullPolicy, mizuApiFilteringOptions api.TrafficFilteringOptions, logLevel logging.Level, serviceMesh bool, tls bool) error { +func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string, apiServerPodIp string, nodeNames []string, serviceAccountName string, resources shared.Resources, imagePullPolicy core.PullPolicy, mizuApiFilteringOptions api.TrafficFilteringOptions, logLevel logging.Level, serviceMesh bool, tls bool, maxLiveStreams int) error { logger.Log.Debugf("Applying %d tapper daemon sets, ns: %s, daemonSetName: %s, podImage: %s, tapperPodName: %s", len(nodeNames), namespace, daemonSetName, podImage, tapperPodName) if len(nodeNames) == 0 { @@ -729,6 +730,7 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac "--tap", "--api-server-address", fmt.Sprintf("ws://%s/wsTapper", apiServerPodIp), "--nodefrag", + "--max-live-streams", strconv.Itoa(maxLiveStreams), } if serviceMesh { diff --git a/tap/tcp_assembler.go b/tap/tcp_assembler.go index d1215783a..c7b2b4901 100644 --- a/tap/tcp_assembler.go +++ b/tap/tcp_assembler.go @@ -85,7 +85,7 @@ func NewTcpAssembler(outputItems chan *api.OutputChannelItem, streamsMap api.Tcp maxBufferedPagesTotal := GetMaxBufferedPagesPerConnection() maxBufferedPagesPerConnection := GetMaxBufferedPagesTotal() - logger.Log.Infof("Assembler options: maxBufferedPagesTotal=%d, maxBufferedPagesPerConnection=%d, opts=%v", + logger.Log.Infof("Assembler options: maxBufferedPagesTotal=%d, maxBufferedPagesPerConnection=%d, opts=%+v", maxBufferedPagesTotal, maxBufferedPagesPerConnection, opts) a.Assembler.AssemblerOptions.MaxBufferedPagesTotal = maxBufferedPagesTotal a.Assembler.AssemblerOptions.MaxBufferedPagesPerConnection = maxBufferedPagesPerConnection