diff --git a/agent/main.go b/agent/main.go index bff6e8c46..1aa2ec5c3 100644 --- a/agent/main.go +++ b/agent/main.go @@ -35,8 +35,7 @@ var namespace = flag.String("namespace", "", "Resolve IPs if they belong to reso var extensions []*tapApi.Extension // global var extensionsMap map[string]*tapApi.Extension // global -var allOutboundPorts []string // global -var allInboundPorts []string // global +var allExtensionPorts []string // global func main() { flag.Parse() @@ -52,7 +51,7 @@ func main() { api.StartResolving(*namespace) filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem) - tap.StartPassiveTapper(tapOpts, filteredOutputItemsChannel, extensions) + tap.StartPassiveTapper(tapOpts, filteredOutputItemsChannel, extensions, allExtensionPorts) // go filterHarItems(harOutputChannel, filteredOutputItemsChannel, getTrafficFilteringOptions()) go api.StartReadingEntries(filteredOutputItemsChannel, nil, extensionsMap) @@ -72,7 +71,7 @@ func main() { // harOutputChannel, outboundLinkOutputChannel := tap.StartPassiveTapper(tapOpts) filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem) - tap.StartPassiveTapper(tapOpts, filteredOutputItemsChannel, extensions) + tap.StartPassiveTapper(tapOpts, filteredOutputItemsChannel, extensions, allExtensionPorts) socketConnection, err := shared.ConnectToSocketServer(*apiServerAddress, shared.DEFAULT_SOCKET_RETRIES, shared.DEFAULT_SOCKET_RETRY_SLEEP_TIME, false) if err != nil { panic(fmt.Sprintf("Error connecting to socket server at %s %v", *apiServerAddress, err)) @@ -142,12 +141,10 @@ func loadExtensions() { log.Printf("Extension Properties: %+v\n", extension) extensions[i] = extension extensionsMap[extension.Protocol.Name] = extension - allOutboundPorts = mergeUnique(allOutboundPorts, extension.Protocol.OutboundPorts) - allInboundPorts = mergeUnique(allInboundPorts, extension.Protocol.InboundPorts) + allExtensionPorts = mergeUnique(allExtensionPorts, extension.Protocol.Ports) } controllers.InitExtensionsMap(extensionsMap) - log.Printf("allOutboundPorts: %v\n", allOutboundPorts) - log.Printf("allInboundPorts: %v\n", allInboundPorts) + log.Printf("All extension ports: %v\n", allExtensionPorts) } func hostApi(socketHarOutputChannel chan<- *tapApi.OutputChannelItem) { diff --git a/tap/api/api.go b/tap/api/api.go index ec5f8acba..0c9efea07 100644 --- a/tap/api/api.go +++ b/tap/api/api.go @@ -15,8 +15,7 @@ type Protocol struct { ForegroundColor string `json:"foreground_color"` FontSize int8 `json:"font_size"` ReferenceLink string `json:"reference_link"` - OutboundPorts []string `json:"outbound_ports"` - InboundPorts []string `json:"inbound_ports"` + Ports []string `json:"outbound_ports"` } type Extension struct { diff --git a/tap/extensions/amqp/main.go b/tap/extensions/amqp/main.go index 0246b4c90..b7c4a21b4 100644 --- a/tap/extensions/amqp/main.go +++ b/tap/extensions/amqp/main.go @@ -15,8 +15,7 @@ var protocol api.Protocol = api.Protocol{ ForegroundColor: "#ffffff", FontSize: 12, ReferenceLink: "https://www.rabbitmq.com/amqp-0-9-1-reference.html", - OutboundPorts: []string{"5671", "5672"}, - InboundPorts: []string{}, + Ports: []string{"5671", "5672"}, } func init() { diff --git a/tap/extensions/http/main.go b/tap/extensions/http/main.go index 317d821b5..1574d9939 100644 --- a/tap/extensions/http/main.go +++ b/tap/extensions/http/main.go @@ -23,8 +23,7 @@ var protocol api.Protocol = api.Protocol{ ForegroundColor: "#ffffff", FontSize: 12, ReferenceLink: "https://datatracker.ietf.org/doc/html/rfc2616", - OutboundPorts: []string{"80", "8080", "443"}, - InboundPorts: []string{}, + Ports: []string{"80", "8080"}, } var http2Protocol api.Protocol = api.Protocol{ @@ -35,8 +34,7 @@ var http2Protocol api.Protocol = api.Protocol{ ForegroundColor: "#ffffff", FontSize: 12, ReferenceLink: "https://datatracker.ietf.org/doc/html/rfc7540", - OutboundPorts: []string{"80", "8080", "443"}, - InboundPorts: []string{}, + Ports: []string{"80", "8080"}, } func init() { diff --git a/tap/extensions/kafka/main.go b/tap/extensions/kafka/main.go index b0a970fa1..745b5332e 100644 --- a/tap/extensions/kafka/main.go +++ b/tap/extensions/kafka/main.go @@ -15,8 +15,7 @@ var protocol api.Protocol = api.Protocol{ ForegroundColor: "#ffffff", FontSize: 12, ReferenceLink: "https://kafka.apache.org/protocol", - OutboundPorts: []string{"9092"}, - InboundPorts: []string{}, + Ports: []string{"9092"}, } func init() { diff --git a/tap/passive_tapper.go b/tap/passive_tapper.go index 6f55ea042..113951d03 100644 --- a/tap/passive_tapper.go +++ b/tap/passive_tapper.go @@ -34,8 +34,6 @@ import ( ) const AppPortsEnvVar = "APP_PORTS" -const maxHTTP2DataLenEnvVar = "HTTP2_DATA_SIZE_LIMIT" -const maxHTTP2DataLenDefault = 1 * 1024 * 1024 // 1MB const cleanPeriod = time.Second * 10 var remoteOnlyOutboundPorts = []int{80, 443} @@ -65,13 +63,6 @@ var allowmissinginit = flag.Bool("allowmissinginit", true, "Support streams with var verbose = flag.Bool("verbose", false, "Be verbose") var debug = flag.Bool("debug", false, "Display debug information") var quiet = flag.Bool("quiet", false, "Be quiet regarding errors") - -// http -var nohttp = flag.Bool("nohttp", false, "Disable HTTP parsing") -var output = flag.String("output", "", "Path to create file for HTTP 200 OK responses") -var writeincomplete = flag.Bool("writeincomplete", false, "Write incomplete response") - -var hexdump = flag.Bool("dump", false, "Dump HTTP request/response as hex") // global var hexdumppkt = flag.Bool("dumppkt", false, "Dump packet as hex") // capture @@ -80,7 +71,7 @@ var fname = flag.String("r", "", "Filename to read from, overrides -i") var snaplen = flag.Int("s", 65536, "Snap length (number of bytes max to read per packet") var tstype = flag.String("timestamp_type", "", "Type of timestamps to use") var promisc = flag.Bool("promisc", true, "Set promiscuous mode") -var anydirection = flag.Bool("anydirection", false, "Capture http requests to other hosts") +var anydirection = flag.Bool("anydirection", false, "Capture requests to other hosts") var staleTimeoutSeconds = flag.Int("staletimout", 120, "Max time in seconds to keep connections which don't transmit data") var memprofile = flag.String("memprofile", "", "Write memory profile") @@ -186,7 +177,7 @@ func (c *Context) GetCaptureInfo() gopacket.CaptureInfo { return c.CaptureInfo } -func StartPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem, extensionsRef []*api.Extension) { +func StartPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem, extensionsRef []*api.Extension, allExtensionPorts []string) { hostMode = opts.HostMode extensions = extensionsRef @@ -194,7 +185,7 @@ func StartPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem, startMemoryProfiler() } - go startPassiveTapper(outputItems) + go startPassiveTapper(outputItems, allExtensionPorts) } func startMemoryProfiler() { @@ -228,7 +219,7 @@ func startMemoryProfiler() { }() } -func startPassiveTapper(outputItems chan *api.OutputChannelItem) { +func startPassiveTapper(outputItems chan *api.OutputChannelItem, allExtensionPorts []string) { log.SetFlags(log.LstdFlags | log.LUTC | log.Lshortfile) defer util.Run()() @@ -253,25 +244,12 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) { appPortsStr := os.Getenv(AppPortsEnvVar) var appPorts []int if appPortsStr == "" { - rlog.Info("Received empty/no APP_PORTS env var! only listening to http on port 80!") + rlog.Info("Received empty/no APP_PORTS env var! only listening to ports: %v!", allExtensionPorts) appPorts = make([]int, 0) } else { appPorts = parseAppPorts(appPortsStr) } SetFilterPorts(appPorts) - // envVal := os.Getenv(maxHTTP2DataLenEnvVar) - // if envVal == "" { - // rlog.Infof("Received empty/no HTTP2_DATA_SIZE_LIMIT env var! falling back to %v", maxHTTP2DataLenDefault) - // maxHTTP2DataLen = maxHTTP2DataLenDefault - // } else { - // if convertedInt, err := strconv.Atoi(envVal); err != nil { - // rlog.Infof("Received invalid HTTP2_DATA_SIZE_LIMIT env var! falling back to %v", maxHTTP2DataLenDefault) - // maxHTTP2DataLen = maxHTTP2DataLenDefault - // } else { - // rlog.Infof("Received HTTP2_DATA_SIZE_LIMIT env var: %v", maxHTTP2DataLenDefault) - // maxHTTP2DataLen = convertedInt - // } - // } log.Printf("App Ports: %v", gSettings.filterPorts) @@ -344,8 +322,8 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) { } streamFactory := &tcpStreamFactory{ - doHTTP: !*nohttp, - Emitter: emitter, + AllExtensionPorts: allExtensionPorts, + Emitter: emitter, } streamPool := reassembly.NewStreamPool(streamFactory) assembler := reassembly.NewAssembler(streamPool) diff --git a/tap/tcp_reader.go b/tap/tcp_reader.go index c9859a9ec..737d5e9b0 100644 --- a/tap/tcp_reader.go +++ b/tap/tcp_reader.go @@ -14,7 +14,7 @@ import ( const checkTLSPacketAmount = 100 -type httpReaderDataMsg struct { +type tcpReaderDataMsg struct { bytes []byte timestamp time.Time } @@ -38,21 +38,19 @@ func (tid *tcpID) String() string { return fmt.Sprintf("%s->%s %s->%s", tid.srcIP, tid.dstIP, tid.srcPort, tid.dstPort) } -/* httpReader gets reads from a channel of bytes of tcp payload, and parses it into HTTP/1 requests and responses. +/* tcpReader gets reads from a channel of bytes of tcp payload, and parses it into requests and responses. * The payload is written to the channel by a tcpStream object that is dedicated to one tcp connection. - * An httpReader object is unidirectional: it parses either a client stream or a server stream. + * An tcpReader object is unidirectional: it parses either a client stream or a server stream. * Implements io.Reader interface (Read) */ type tcpReader struct { ident string tcpID *api.TcpID isClient bool - isHTTP2 bool isOutgoing bool - msgQueue chan httpReaderDataMsg // Channel of captured reassembled tcp payload + msgQueue chan tcpReaderDataMsg // Channel of captured reassembled tcp payload data []byte captureTime time.Time - hexdump bool parent *tcpStream messageCount uint packetsSeen uint @@ -61,7 +59,7 @@ type tcpReader struct { } func (h *tcpReader) Read(p []byte) (int, error) { - var msg httpReaderDataMsg + var msg tcpReaderDataMsg ok := true for ok && len(h.data) == 0 { @@ -102,24 +100,16 @@ func containsPort(ports []string, port string) bool { func (h *tcpReader) run(wg *sync.WaitGroup) { defer wg.Done() - // log.Printf("Called run h.isClient: %v\n", h.isClient) b := bufio.NewReader(h) - if h.isClient { - extensions[1].Dissector.Dissect(b, h.isClient, h.tcpID, h.Emitter) - } else { - extensions[1].Dissector.Dissect(b, h.isClient, h.tcpID, h.Emitter) + for _, extension := range extensions { + var port string + if h.isClient { + port = h.tcpID.DstPort + } else { + port = h.tcpID.SrcPort + } + if containsPort(extension.Protocol.Ports, port) { + extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.Emitter) + } } - // for _, extension := range extensions { - // var subjectPorts []string - // if h.isClient { - // subjectPorts = extension.OutboundPorts - // } else { - // subjectPorts = extension.InboundPorts - // } - // if containsPort(subjectPorts, "80") { - // extension.Dissector.Ping() - // fmt.Printf("h.isClient: %v\n", h.isClient) - // extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.Emitter) - // } - // } } diff --git a/tap/tcp_stream.go b/tap/tcp_stream.go index 3cd0972a4..8f5551ee1 100644 --- a/tap/tcp_stream.go +++ b/tap/tcp_stream.go @@ -2,7 +2,6 @@ package tap import ( "encoding/binary" - "encoding/hex" "fmt" "sync" @@ -14,7 +13,7 @@ import ( /* It's a connection (bidirectional) * Implements gopacket.reassembly.Stream interface (Accept, ReassembledSG, ReassemblyComplete) * ReassembledSG gets called when new reassembled data is ready (i.e. bytes in order, no duplicates, complete) - * In our implementation, we pass information from ReassembledSG to the httpReader through a shared channel. + * In our implementation, we pass information from ReassembledSG to the tcpReader through a shared channel. */ type tcpStream struct { tcpstate *reassembly.TCPSimpleFSM @@ -22,7 +21,7 @@ type tcpStream struct { optchecker reassembly.TCPOptionCheck net, transport gopacket.Flow isDNS bool - isHTTP bool + isTapTarget bool reversed bool client tcpReader server tcpReader @@ -141,17 +140,14 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass if len(data) > 2+int(dnsSize) { sg.KeepFrom(2 + int(dnsSize)) } - } else if t.isHTTP { + } else if t.isTapTarget { if length > 0 { - if *hexdump { - Trace("Feeding http with:%s", hex.Dump(data)) - } // This is where we pass the reassembled information onwards - // This channel is read by an httpReader object + // This channel is read by an tcpReader object if dir == reassembly.TCPDirClientToServer && !t.reversed { - t.client.msgQueue <- httpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp} + t.client.msgQueue <- tcpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp} } else { - t.server.msgQueue <- httpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp} + t.server.msgQueue <- tcpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp} } } } @@ -159,7 +155,7 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass func (t *tcpStream) ReassemblyComplete(ac reassembly.AssemblerContext) bool { Trace("%s: Connection closed", t.ident) - if t.isHTTP { + if t.isTapTarget { close(t.client.msgQueue) close(t.server.msgQueue) } diff --git a/tap/tcp_stream_factory.go b/tap/tcp_stream_factory.go index d653d1003..18b6f2f19 100644 --- a/tap/tcp_stream_factory.go +++ b/tap/tcp_stream_factory.go @@ -19,8 +19,8 @@ import ( */ type tcpStreamFactory struct { wg sync.WaitGroup - doHTTP bool outbountLinkWriter *OutboundLinkWriter + AllExtensionPorts []string Emitter api.Emitter } @@ -33,33 +33,33 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T srcIp := net.Src().String() dstIp := net.Dst().String() dstPort := int(tcp.DstPort) + dstPortStr := transport.Dst().String() // if factory.shouldNotifyOnOutboundLink(dstIp, dstPort) { // factory.outbountLinkWriter.WriteOutboundLink(net.Src().String(), dstIp, dstPort, "", "") // } - props := factory.getStreamProps(srcIp, dstIp, dstPort) - isHTTP := props.isTapTarget + props := factory.getStreamProps(srcIp, dstIp, dstPort, dstPortStr, factory.AllExtensionPorts) + isTapTarget := props.isTapTarget stream := &tcpStream{ - net: net, - transport: transport, - isDNS: tcp.SrcPort == 53 || tcp.DstPort == 53, - isHTTP: isHTTP && factory.doHTTP, - reversed: tcp.SrcPort == 80, - tcpstate: reassembly.NewTCPSimpleFSM(fsmOptions), - ident: fmt.Sprintf("%s:%s", net, transport), - optchecker: reassembly.NewTCPOptionCheck(), + net: net, + transport: transport, + isDNS: tcp.SrcPort == 53 || tcp.DstPort == 53, + isTapTarget: isTapTarget, + reversed: tcp.SrcPort == 80, + tcpstate: reassembly.NewTCPSimpleFSM(fsmOptions), + ident: fmt.Sprintf("%s:%s", net, transport), + optchecker: reassembly.NewTCPOptionCheck(), } - if stream.isHTTP { + if stream.isTapTarget { stream.client = tcpReader{ - msgQueue: make(chan httpReaderDataMsg), + msgQueue: make(chan tcpReaderDataMsg), ident: fmt.Sprintf("%s %s", net, transport), tcpID: &api.TcpID{ SrcIP: net.Src().String(), DstIP: net.Dst().String(), SrcPort: transport.Src().String(), - DstPort: transport.Dst().String(), + DstPort: dstPortStr, }, - hexdump: *hexdump, parent: stream, isClient: true, isOutgoing: props.isOutgoing, @@ -67,7 +67,7 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T Emitter: factory.Emitter, } stream.server = tcpReader{ - msgQueue: make(chan httpReaderDataMsg), + msgQueue: make(chan tcpReaderDataMsg), ident: fmt.Sprintf("%s %s", net.Reverse(), transport.Reverse()), tcpID: &api.TcpID{ SrcIP: net.Dst().String(), @@ -75,7 +75,6 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T SrcPort: transport.Dst().String(), DstPort: transport.Src().String(), }, - hexdump: *hexdump, parent: stream, isOutgoing: props.isOutgoing, outboundLinkWriter: factory.outbountLinkWriter, @@ -93,7 +92,7 @@ func (factory *tcpStreamFactory) WaitGoRoutines() { factory.wg.Wait() } -func (factory *tcpStreamFactory) getStreamProps(srcIP string, dstIP string, dstPort int) *streamProps { +func (factory *tcpStreamFactory) getStreamProps(srcIP string, dstIP string, dstPort int, dstPortStr string, allExtensionPorts []string) *streamProps { if hostMode { if inArrayString(gSettings.filterAuthorities, fmt.Sprintf("%s:%d", dstIP, dstPort)) == true { rlog.Debugf("getStreamProps %s", fmt.Sprintf("+ host1 %s:%d", dstIP, dstPort)) @@ -107,7 +106,7 @@ func (factory *tcpStreamFactory) getStreamProps(srcIP string, dstIP string, dstP } return &streamProps{isTapTarget: false} } else { - isTappedPort := dstPort == 80 || (gSettings.filterPorts != nil && (inArrayInt(gSettings.filterPorts, dstPort))) + isTappedPort := containsPort(allExtensionPorts, dstPortStr) || (gSettings.filterPorts != nil && (inArrayInt(gSettings.filterPorts, dstPort))) if !isTappedPort { rlog.Debugf("getStreamProps %s", fmt.Sprintf("- notHost1 %d", dstPort)) return &streamProps{isTapTarget: false, isOutgoing: false}