adding the cleaner again (why we removed it?).

add TODO: on the extension loop .
This commit is contained in:
Roee Gadot 2021-08-23 11:48:00 +03:00
parent efde8ae359
commit 20a2caf3d9
4 changed files with 18 additions and 12 deletions

View File

@ -575,10 +575,10 @@ func (provider *Provider) CreateConfigMap(ctx context.Context, namespace string,
}
func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string, apiServerPodIp string, nodeToTappedPodIPMap map[string][]string, serviceAccountName string, tapOutgoing bool, resources configStructs.Resources, imagePullPolicy core.PullPolicy) error {
logger.Log.Debugf("Applying %d tapper deamonsets, ns: %s, daemonSetName: %s, podImage: %s, tapperPodName: %s", len(nodeToTappedPodIPMap), namespace, daemonSetName, podImage, tapperPodName)
logger.Log.Debugf("Applying %d tapper daemon sets, ns: %s, daemonSetName: %s, podImage: %s, tapperPodName: %s", len(nodeToTappedPodIPMap), namespace, daemonSetName, podImage, tapperPodName)
if len(nodeToTappedPodIPMap) == 0 {
return fmt.Errorf("Daemon set %s must tap at least 1 pod", daemonSetName)
return fmt.Errorf("daemon set %s must tap at least 1 pod", daemonSetName)
}
nodeToTappedPodIPMapJsonStr, err := json.Marshal(nodeToTappedPodIPMap)

View File

@ -1,6 +1,8 @@
package tap
import (
"github.com/google/gopacket/reassembly"
"github.com/romana/rlog"
"sync"
"time"
)
@ -12,6 +14,7 @@ type CleanerStats struct {
}
type Cleaner struct {
assembler *reassembly.Assembler
assemblerMutex *sync.Mutex
cleanPeriod time.Duration
connectionTimeout time.Duration
@ -20,18 +23,18 @@ type Cleaner struct {
}
func (cl *Cleaner) clean() {
// startCleanTime := time.Now()
startCleanTime := time.Now()
// cl.assemblerMutex.Lock()
// rlog.Debugf("Assembler Stats before cleaning %s", cl.assembler.Dump())
// flushed, closed := cl.assembler.FlushCloseOlderThan(startCleanTime.Add(-cl.connectionTimeout))
// cl.assemblerMutex.Unlock()
cl.assemblerMutex.Lock()
rlog.Debugf("Assembler Stats before cleaning %s", cl.assembler.Dump())
flushed, closed := cl.assembler.FlushCloseOlderThan(startCleanTime.Add(-cl.connectionTimeout))
cl.assemblerMutex.Unlock()
// cl.statsMutex.Lock()
// rlog.Debugf("Assembler Stats after cleaning %s", cl.assembler.Dump())
// cl.stats.flushed += flushed
// cl.stats.closed += closed
// cl.statsMutex.Unlock()
cl.statsMutex.Lock()
rlog.Debugf("Assembler Stats after cleaning %s", cl.assembler.Dump())
cl.stats.flushed += flushed
cl.stats.closed += closed
cl.statsMutex.Unlock()
}
func (cl *Cleaner) start() {

View File

@ -341,6 +341,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem, allExtensionPor
staleConnectionTimeout := time.Second * time.Duration(*staleTimeoutSeconds)
cleaner := Cleaner{
assembler: assembler,
assemblerMutex: &assemblerMutex,
cleanPeriod: cleanPeriod,
connectionTimeout: staleConnectionTimeout,

View File

@ -107,6 +107,8 @@ func (h *tcpReader) run(wg *sync.WaitGroup) {
port = h.tcpID.SrcPort
}
b := bufio.NewReader(h)
// TODO: maybe check for kafka and amqp and when it is not one of those pass it to the HTTP?
// because it will check for the ports that we checked in the "isTapTarget"
for _, extension := range extensions {
if containsPort(extension.Protocol.Ports, port) {
extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.Emitter)