From f6a260a8c9c6908577dd4eec9c3d2bf36f312c49 Mon Sep 17 00:00:00 2001 From: "M. Mert Yildiran" Date: Sun, 22 Aug 2021 12:21:29 +0300 Subject: [PATCH] Do the "Is reversed?" checked inside `getStreamProps` and fix an issue in Kafka `Dissect` method --- tap/extensions/kafka/main.go | 10 ++++--- tap/extensions/kafka/response.go | 4 ++- tap/tcp_reader.go | 12 ++++---- tap/tcp_stream_factory.go | 51 +++++++++++++++++++------------- 4 files changed, 45 insertions(+), 32 deletions(-) diff --git a/tap/extensions/kafka/main.go b/tap/extensions/kafka/main.go index 233521827..74b096d4e 100644 --- a/tap/extensions/kafka/main.go +++ b/tap/extensions/kafka/main.go @@ -33,10 +33,12 @@ func (d dissecting) Ping() { } func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, emitter api.Emitter) { - if isClient { - ReadRequest(b, tcpID) - } else { - ReadResponse(b, tcpID, emitter) + for { + if isClient { + ReadRequest(b, tcpID) + } else { + ReadResponse(b, tcpID, emitter) + } } } diff --git a/tap/extensions/kafka/response.go b/tap/extensions/kafka/response.go index 6931d822d..43ecb15a2 100644 --- a/tap/extensions/kafka/response.go +++ b/tap/extensions/kafka/response.go @@ -45,7 +45,9 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, emitter api.Emitter) (err error reqResPair := reqResMatcher.registerResponse(key, response) if reqResPair == nil { d.discardAll() - return errors.New("Couldn't match a Kafka response to a Kafka request in 3 seconds!") + msg := "Couldn't match a Kafka response to a Kafka request in 3 seconds!" + log.Printf("[WARNING] %s\n", msg) + return errors.New(msg) } apiKey := reqResPair.Request.ApiKey apiVersion := reqResPair.Request.ApiVersion diff --git a/tap/tcp_reader.go b/tap/tcp_reader.go index 737d5e9b0..e33f1eefa 100644 --- a/tap/tcp_reader.go +++ b/tap/tcp_reader.go @@ -100,14 +100,14 @@ func containsPort(ports []string, port string) bool { func (h *tcpReader) run(wg *sync.WaitGroup) { defer wg.Done() + var port string + if h.isClient { + port = h.tcpID.DstPort + } else { + port = h.tcpID.SrcPort + } b := bufio.NewReader(h) for _, extension := range extensions { - var port string - if h.isClient { - port = h.tcpID.DstPort - } else { - port = h.tcpID.SrcPort - } if containsPort(extension.Protocol.Ports, port) { extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.Emitter) } diff --git a/tap/tcp_stream_factory.go b/tap/tcp_stream_factory.go index 18b6f2f19..0b5e5ca26 100644 --- a/tap/tcp_stream_factory.go +++ b/tap/tcp_stream_factory.go @@ -32,20 +32,20 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T rlog.Debugf("Current App Ports: %v", gSettings.filterPorts) srcIp := net.Src().String() dstIp := net.Dst().String() - dstPort := int(tcp.DstPort) - dstPortStr := transport.Dst().String() + srcPort := transport.Src().String() + dstPort := transport.Dst().String() // if factory.shouldNotifyOnOutboundLink(dstIp, dstPort) { // factory.outbountLinkWriter.WriteOutboundLink(net.Src().String(), dstIp, dstPort, "", "") // } - props := factory.getStreamProps(srcIp, dstIp, dstPort, dstPortStr, factory.AllExtensionPorts) + props := factory.getStreamProps(srcIp, dstIp, srcPort, dstPort, factory.AllExtensionPorts) isTapTarget := props.isTapTarget stream := &tcpStream{ net: net, transport: transport, isDNS: tcp.SrcPort == 53 || tcp.DstPort == 53, isTapTarget: isTapTarget, - reversed: tcp.SrcPort == 80, + reversed: props.reversed, tcpstate: reassembly.NewTCPSimpleFSM(fsmOptions), ident: fmt.Sprintf("%s:%s", net, transport), optchecker: reassembly.NewTCPOptionCheck(), @@ -55,10 +55,10 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T msgQueue: make(chan tcpReaderDataMsg), ident: fmt.Sprintf("%s %s", net, transport), tcpID: &api.TcpID{ - SrcIP: net.Src().String(), - DstIP: net.Dst().String(), - SrcPort: transport.Src().String(), - DstPort: dstPortStr, + SrcIP: srcIp, + DstIP: dstIp, + SrcPort: srcPort, + DstPort: dstPort, }, parent: stream, isClient: true, @@ -92,35 +92,43 @@ func (factory *tcpStreamFactory) WaitGoRoutines() { factory.wg.Wait() } -func (factory *tcpStreamFactory) getStreamProps(srcIP string, dstIP string, dstPort int, dstPortStr string, allExtensionPorts []string) *streamProps { +func (factory *tcpStreamFactory) getStreamProps(srcIP string, dstIP string, srcPort string, dstPort string, allExtensionPorts []string) *streamProps { + reversed := false if hostMode { - if inArrayString(gSettings.filterAuthorities, fmt.Sprintf("%s:%d", dstIP, dstPort)) == true { - rlog.Debugf("getStreamProps %s", fmt.Sprintf("+ host1 %s:%d", dstIP, dstPort)) - return &streamProps{isTapTarget: true, isOutgoing: false} + // TODO: Implement reversed for the `hostMode` + if inArrayString(gSettings.filterAuthorities, fmt.Sprintf("%s:%s", dstIP, dstPort)) == true { + rlog.Debugf("getStreamProps %s", fmt.Sprintf("+ host1 %s:%s", dstIP, dstPort)) + return &streamProps{isTapTarget: true, isOutgoing: false, reversed: reversed} } else if inArrayString(gSettings.filterAuthorities, dstIP) == true { rlog.Debugf("getStreamProps %s", fmt.Sprintf("+ host2 %s", dstIP)) - return &streamProps{isTapTarget: true, isOutgoing: false} + return &streamProps{isTapTarget: true, isOutgoing: false, reversed: reversed} } else if *anydirection && inArrayString(gSettings.filterAuthorities, srcIP) == true { rlog.Debugf("getStreamProps %s", fmt.Sprintf("+ host3 %s", srcIP)) - return &streamProps{isTapTarget: true, isOutgoing: true} + return &streamProps{isTapTarget: true, isOutgoing: true, reversed: reversed} } - return &streamProps{isTapTarget: false} + return &streamProps{isTapTarget: false, reversed: reversed} } else { - isTappedPort := containsPort(allExtensionPorts, dstPortStr) || (gSettings.filterPorts != nil && (inArrayInt(gSettings.filterPorts, dstPort))) + // TODO: Bring back `filterPorts` as a string if it's really needed + // (gSettings.filterPorts != nil && (inArrayInt(gSettings.filterPorts, dstPort))) + isTappedPort := containsPort(allExtensionPorts, dstPort) + if !isTappedPort && containsPort(allExtensionPorts, srcPort) { + isTappedPort = true + reversed = true + } if !isTappedPort { - rlog.Debugf("getStreamProps %s", fmt.Sprintf("- notHost1 %d", dstPort)) - return &streamProps{isTapTarget: false, isOutgoing: false} + rlog.Debugf("getStreamProps %s", fmt.Sprintf("- notHost1 %s", dstPort)) + return &streamProps{isTapTarget: false, isOutgoing: false, reversed: reversed} } isOutgoing := !inArrayString(ownIps, dstIP) if !*anydirection && isOutgoing { rlog.Debugf("getStreamProps %s", fmt.Sprintf("- notHost2")) - return &streamProps{isTapTarget: false, isOutgoing: isOutgoing} + return &streamProps{isTapTarget: false, isOutgoing: isOutgoing, reversed: reversed} } - rlog.Debugf("getStreamProps %s", fmt.Sprintf("+ notHost3 %s -> %s:%d", srcIP, dstIP, dstPort)) - return &streamProps{isTapTarget: true} + rlog.Debugf("getStreamProps %s", fmt.Sprintf("+ notHost3 %s -> %s:%s", srcIP, dstIP, dstPort)) + return &streamProps{isTapTarget: true, reversed: reversed} } } @@ -135,4 +143,5 @@ func (factory *tcpStreamFactory) shouldNotifyOnOutboundLink(dstIP string, dstPor type streamProps struct { isTapTarget bool isOutgoing bool + reversed bool }