Do the "Is reversed?" checked inside getStreamProps and fix an issue in Kafka Dissect method

This commit is contained in:
M. Mert Yildiran 2021-08-22 12:21:29 +03:00
parent f6a532a5b5
commit f6a260a8c9
No known key found for this signature in database
GPG Key ID: D42ADB236521BF7A
4 changed files with 45 additions and 32 deletions

View File

@ -33,10 +33,12 @@ func (d dissecting) Ping() {
} }
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, emitter api.Emitter) { func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, emitter api.Emitter) {
if isClient { for {
ReadRequest(b, tcpID) if isClient {
} else { ReadRequest(b, tcpID)
ReadResponse(b, tcpID, emitter) } else {
ReadResponse(b, tcpID, emitter)
}
} }
} }

View File

@ -45,7 +45,9 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, emitter api.Emitter) (err error
reqResPair := reqResMatcher.registerResponse(key, response) reqResPair := reqResMatcher.registerResponse(key, response)
if reqResPair == nil { if reqResPair == nil {
d.discardAll() d.discardAll()
return errors.New("Couldn't match a Kafka response to a Kafka request in 3 seconds!") msg := "Couldn't match a Kafka response to a Kafka request in 3 seconds!"
log.Printf("[WARNING] %s\n", msg)
return errors.New(msg)
} }
apiKey := reqResPair.Request.ApiKey apiKey := reqResPair.Request.ApiKey
apiVersion := reqResPair.Request.ApiVersion apiVersion := reqResPair.Request.ApiVersion

View File

@ -100,14 +100,14 @@ func containsPort(ports []string, port string) bool {
func (h *tcpReader) run(wg *sync.WaitGroup) { func (h *tcpReader) run(wg *sync.WaitGroup) {
defer wg.Done() defer wg.Done()
var port string
if h.isClient {
port = h.tcpID.DstPort
} else {
port = h.tcpID.SrcPort
}
b := bufio.NewReader(h) b := bufio.NewReader(h)
for _, extension := range extensions { for _, extension := range extensions {
var port string
if h.isClient {
port = h.tcpID.DstPort
} else {
port = h.tcpID.SrcPort
}
if containsPort(extension.Protocol.Ports, port) { if containsPort(extension.Protocol.Ports, port) {
extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.Emitter) extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.Emitter)
} }

View File

@ -32,20 +32,20 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
rlog.Debugf("Current App Ports: %v", gSettings.filterPorts) rlog.Debugf("Current App Ports: %v", gSettings.filterPorts)
srcIp := net.Src().String() srcIp := net.Src().String()
dstIp := net.Dst().String() dstIp := net.Dst().String()
dstPort := int(tcp.DstPort) srcPort := transport.Src().String()
dstPortStr := transport.Dst().String() dstPort := transport.Dst().String()
// if factory.shouldNotifyOnOutboundLink(dstIp, dstPort) { // if factory.shouldNotifyOnOutboundLink(dstIp, dstPort) {
// factory.outbountLinkWriter.WriteOutboundLink(net.Src().String(), dstIp, dstPort, "", "") // factory.outbountLinkWriter.WriteOutboundLink(net.Src().String(), dstIp, dstPort, "", "")
// } // }
props := factory.getStreamProps(srcIp, dstIp, dstPort, dstPortStr, factory.AllExtensionPorts) props := factory.getStreamProps(srcIp, dstIp, srcPort, dstPort, factory.AllExtensionPorts)
isTapTarget := props.isTapTarget isTapTarget := props.isTapTarget
stream := &tcpStream{ stream := &tcpStream{
net: net, net: net,
transport: transport, transport: transport,
isDNS: tcp.SrcPort == 53 || tcp.DstPort == 53, isDNS: tcp.SrcPort == 53 || tcp.DstPort == 53,
isTapTarget: isTapTarget, isTapTarget: isTapTarget,
reversed: tcp.SrcPort == 80, reversed: props.reversed,
tcpstate: reassembly.NewTCPSimpleFSM(fsmOptions), tcpstate: reassembly.NewTCPSimpleFSM(fsmOptions),
ident: fmt.Sprintf("%s:%s", net, transport), ident: fmt.Sprintf("%s:%s", net, transport),
optchecker: reassembly.NewTCPOptionCheck(), optchecker: reassembly.NewTCPOptionCheck(),
@ -55,10 +55,10 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
msgQueue: make(chan tcpReaderDataMsg), msgQueue: make(chan tcpReaderDataMsg),
ident: fmt.Sprintf("%s %s", net, transport), ident: fmt.Sprintf("%s %s", net, transport),
tcpID: &api.TcpID{ tcpID: &api.TcpID{
SrcIP: net.Src().String(), SrcIP: srcIp,
DstIP: net.Dst().String(), DstIP: dstIp,
SrcPort: transport.Src().String(), SrcPort: srcPort,
DstPort: dstPortStr, DstPort: dstPort,
}, },
parent: stream, parent: stream,
isClient: true, isClient: true,
@ -92,35 +92,43 @@ func (factory *tcpStreamFactory) WaitGoRoutines() {
factory.wg.Wait() factory.wg.Wait()
} }
func (factory *tcpStreamFactory) getStreamProps(srcIP string, dstIP string, dstPort int, dstPortStr string, allExtensionPorts []string) *streamProps { func (factory *tcpStreamFactory) getStreamProps(srcIP string, dstIP string, srcPort string, dstPort string, allExtensionPorts []string) *streamProps {
reversed := false
if hostMode { if hostMode {
if inArrayString(gSettings.filterAuthorities, fmt.Sprintf("%s:%d", dstIP, dstPort)) == true { // TODO: Implement reversed for the `hostMode`
rlog.Debugf("getStreamProps %s", fmt.Sprintf("+ host1 %s:%d", dstIP, dstPort)) if inArrayString(gSettings.filterAuthorities, fmt.Sprintf("%s:%s", dstIP, dstPort)) == true {
return &streamProps{isTapTarget: true, isOutgoing: false} rlog.Debugf("getStreamProps %s", fmt.Sprintf("+ host1 %s:%s", dstIP, dstPort))
return &streamProps{isTapTarget: true, isOutgoing: false, reversed: reversed}
} else if inArrayString(gSettings.filterAuthorities, dstIP) == true { } else if inArrayString(gSettings.filterAuthorities, dstIP) == true {
rlog.Debugf("getStreamProps %s", fmt.Sprintf("+ host2 %s", dstIP)) rlog.Debugf("getStreamProps %s", fmt.Sprintf("+ host2 %s", dstIP))
return &streamProps{isTapTarget: true, isOutgoing: false} return &streamProps{isTapTarget: true, isOutgoing: false, reversed: reversed}
} else if *anydirection && inArrayString(gSettings.filterAuthorities, srcIP) == true { } else if *anydirection && inArrayString(gSettings.filterAuthorities, srcIP) == true {
rlog.Debugf("getStreamProps %s", fmt.Sprintf("+ host3 %s", srcIP)) rlog.Debugf("getStreamProps %s", fmt.Sprintf("+ host3 %s", srcIP))
return &streamProps{isTapTarget: true, isOutgoing: true} return &streamProps{isTapTarget: true, isOutgoing: true, reversed: reversed}
} }
return &streamProps{isTapTarget: false} return &streamProps{isTapTarget: false, reversed: reversed}
} else { } else {
isTappedPort := containsPort(allExtensionPorts, dstPortStr) || (gSettings.filterPorts != nil && (inArrayInt(gSettings.filterPorts, dstPort))) // TODO: Bring back `filterPorts` as a string if it's really needed
// (gSettings.filterPorts != nil && (inArrayInt(gSettings.filterPorts, dstPort)))
isTappedPort := containsPort(allExtensionPorts, dstPort)
if !isTappedPort && containsPort(allExtensionPorts, srcPort) {
isTappedPort = true
reversed = true
}
if !isTappedPort { if !isTappedPort {
rlog.Debugf("getStreamProps %s", fmt.Sprintf("- notHost1 %d", dstPort)) rlog.Debugf("getStreamProps %s", fmt.Sprintf("- notHost1 %s", dstPort))
return &streamProps{isTapTarget: false, isOutgoing: false} return &streamProps{isTapTarget: false, isOutgoing: false, reversed: reversed}
} }
isOutgoing := !inArrayString(ownIps, dstIP) isOutgoing := !inArrayString(ownIps, dstIP)
if !*anydirection && isOutgoing { if !*anydirection && isOutgoing {
rlog.Debugf("getStreamProps %s", fmt.Sprintf("- notHost2")) rlog.Debugf("getStreamProps %s", fmt.Sprintf("- notHost2"))
return &streamProps{isTapTarget: false, isOutgoing: isOutgoing} return &streamProps{isTapTarget: false, isOutgoing: isOutgoing, reversed: reversed}
} }
rlog.Debugf("getStreamProps %s", fmt.Sprintf("+ notHost3 %s -> %s:%d", srcIP, dstIP, dstPort)) rlog.Debugf("getStreamProps %s", fmt.Sprintf("+ notHost3 %s -> %s:%s", srcIP, dstIP, dstPort))
return &streamProps{isTapTarget: true} return &streamProps{isTapTarget: true, reversed: reversed}
} }
} }
@ -135,4 +143,5 @@ func (factory *tcpStreamFactory) shouldNotifyOnOutboundLink(dstIP string, dstPor
type streamProps struct { type streamProps struct {
isTapTarget bool isTapTarget bool
isOutgoing bool isOutgoing bool
reversed bool
} }