From 20a2caf3d91f4133f34f1ee0784889a48ee8496b Mon Sep 17 00:00:00 2001 From: Roee Gadot Date: Mon, 23 Aug 2021 11:48:00 +0300 Subject: [PATCH] adding the cleaner again (why we removed it?). add TODO: on the extension loop . --- cli/kubernetes/provider.go | 4 ++-- tap/cleaner.go | 23 +++++++++++++---------- tap/passive_tapper.go | 1 + tap/tcp_reader.go | 2 ++ 4 files changed, 18 insertions(+), 12 deletions(-) diff --git a/cli/kubernetes/provider.go b/cli/kubernetes/provider.go index cefe424ad..58ce5a6c0 100644 --- a/cli/kubernetes/provider.go +++ b/cli/kubernetes/provider.go @@ -575,10 +575,10 @@ func (provider *Provider) CreateConfigMap(ctx context.Context, namespace string, } func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string, apiServerPodIp string, nodeToTappedPodIPMap map[string][]string, serviceAccountName string, tapOutgoing bool, resources configStructs.Resources, imagePullPolicy core.PullPolicy) error { - logger.Log.Debugf("Applying %d tapper deamonsets, ns: %s, daemonSetName: %s, podImage: %s, tapperPodName: %s", len(nodeToTappedPodIPMap), namespace, daemonSetName, podImage, tapperPodName) + logger.Log.Debugf("Applying %d tapper daemon sets, ns: %s, daemonSetName: %s, podImage: %s, tapperPodName: %s", len(nodeToTappedPodIPMap), namespace, daemonSetName, podImage, tapperPodName) if len(nodeToTappedPodIPMap) == 0 { - return fmt.Errorf("Daemon set %s must tap at least 1 pod", daemonSetName) + return fmt.Errorf("daemon set %s must tap at least 1 pod", daemonSetName) } nodeToTappedPodIPMapJsonStr, err := json.Marshal(nodeToTappedPodIPMap) diff --git a/tap/cleaner.go b/tap/cleaner.go index 02a147eda..2d0585b75 100644 --- a/tap/cleaner.go +++ b/tap/cleaner.go @@ -1,6 +1,8 @@ package tap import ( + "github.com/google/gopacket/reassembly" + "github.com/romana/rlog" "sync" "time" ) @@ -12,6 +14,7 @@ type CleanerStats struct { } type Cleaner struct { + assembler *reassembly.Assembler assemblerMutex *sync.Mutex cleanPeriod time.Duration connectionTimeout time.Duration @@ -20,18 +23,18 @@ type Cleaner struct { } func (cl *Cleaner) clean() { - // startCleanTime := time.Now() + startCleanTime := time.Now() - // cl.assemblerMutex.Lock() - // rlog.Debugf("Assembler Stats before cleaning %s", cl.assembler.Dump()) - // flushed, closed := cl.assembler.FlushCloseOlderThan(startCleanTime.Add(-cl.connectionTimeout)) - // cl.assemblerMutex.Unlock() + cl.assemblerMutex.Lock() + rlog.Debugf("Assembler Stats before cleaning %s", cl.assembler.Dump()) + flushed, closed := cl.assembler.FlushCloseOlderThan(startCleanTime.Add(-cl.connectionTimeout)) + cl.assemblerMutex.Unlock() - // cl.statsMutex.Lock() - // rlog.Debugf("Assembler Stats after cleaning %s", cl.assembler.Dump()) - // cl.stats.flushed += flushed - // cl.stats.closed += closed - // cl.statsMutex.Unlock() + cl.statsMutex.Lock() + rlog.Debugf("Assembler Stats after cleaning %s", cl.assembler.Dump()) + cl.stats.flushed += flushed + cl.stats.closed += closed + cl.statsMutex.Unlock() } func (cl *Cleaner) start() { diff --git a/tap/passive_tapper.go b/tap/passive_tapper.go index f714ba13c..aef200430 100644 --- a/tap/passive_tapper.go +++ b/tap/passive_tapper.go @@ -341,6 +341,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem, allExtensionPor staleConnectionTimeout := time.Second * time.Duration(*staleTimeoutSeconds) cleaner := Cleaner{ + assembler: assembler, assemblerMutex: &assemblerMutex, cleanPeriod: cleanPeriod, connectionTimeout: staleConnectionTimeout, diff --git a/tap/tcp_reader.go b/tap/tcp_reader.go index b85552f1a..eb5d2a764 100644 --- a/tap/tcp_reader.go +++ b/tap/tcp_reader.go @@ -107,6 +107,8 @@ func (h *tcpReader) run(wg *sync.WaitGroup) { port = h.tcpID.SrcPort } b := bufio.NewReader(h) + // TODO: maybe check for kafka and amqp and when it is not one of those pass it to the HTTP? + // because it will check for the ports that we checked in the "isTapTarget" for _, extension := range extensions { if containsPort(extension.Protocol.Ports, port) { extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.Emitter)