Remove everything HTTP related from the tap package and make the extension system fully functional

This commit is contained in:
M. Mert Yildiran 2021-08-21 10:00:02 +03:00
parent d20cc1412b
commit 494af0c8c7
No known key found for this signature in database
GPG Key ID: D42ADB236521BF7A
9 changed files with 57 additions and 102 deletions

View File

@ -35,8 +35,7 @@ var namespace = flag.String("namespace", "", "Resolve IPs if they belong to reso
var extensions []*tapApi.Extension // global
var extensionsMap map[string]*tapApi.Extension // global
var allOutboundPorts []string // global
var allInboundPorts []string // global
var allExtensionPorts []string // global
func main() {
flag.Parse()
@ -52,7 +51,7 @@ func main() {
api.StartResolving(*namespace)
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
tap.StartPassiveTapper(tapOpts, filteredOutputItemsChannel, extensions)
tap.StartPassiveTapper(tapOpts, filteredOutputItemsChannel, extensions, allExtensionPorts)
// go filterHarItems(harOutputChannel, filteredOutputItemsChannel, getTrafficFilteringOptions())
go api.StartReadingEntries(filteredOutputItemsChannel, nil, extensionsMap)
@ -72,7 +71,7 @@ func main() {
// harOutputChannel, outboundLinkOutputChannel := tap.StartPassiveTapper(tapOpts)
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
tap.StartPassiveTapper(tapOpts, filteredOutputItemsChannel, extensions)
tap.StartPassiveTapper(tapOpts, filteredOutputItemsChannel, extensions, allExtensionPorts)
socketConnection, err := shared.ConnectToSocketServer(*apiServerAddress, shared.DEFAULT_SOCKET_RETRIES, shared.DEFAULT_SOCKET_RETRY_SLEEP_TIME, false)
if err != nil {
panic(fmt.Sprintf("Error connecting to socket server at %s %v", *apiServerAddress, err))
@ -142,12 +141,10 @@ func loadExtensions() {
log.Printf("Extension Properties: %+v\n", extension)
extensions[i] = extension
extensionsMap[extension.Protocol.Name] = extension
allOutboundPorts = mergeUnique(allOutboundPorts, extension.Protocol.OutboundPorts)
allInboundPorts = mergeUnique(allInboundPorts, extension.Protocol.InboundPorts)
allExtensionPorts = mergeUnique(allExtensionPorts, extension.Protocol.Ports)
}
controllers.InitExtensionsMap(extensionsMap)
log.Printf("allOutboundPorts: %v\n", allOutboundPorts)
log.Printf("allInboundPorts: %v\n", allInboundPorts)
log.Printf("All extension ports: %v\n", allExtensionPorts)
}
func hostApi(socketHarOutputChannel chan<- *tapApi.OutputChannelItem) {

View File

@ -15,8 +15,7 @@ type Protocol struct {
ForegroundColor string `json:"foreground_color"`
FontSize int8 `json:"font_size"`
ReferenceLink string `json:"reference_link"`
OutboundPorts []string `json:"outbound_ports"`
InboundPorts []string `json:"inbound_ports"`
Ports []string `json:"outbound_ports"`
}
type Extension struct {

View File

@ -15,8 +15,7 @@ var protocol api.Protocol = api.Protocol{
ForegroundColor: "#ffffff",
FontSize: 12,
ReferenceLink: "https://www.rabbitmq.com/amqp-0-9-1-reference.html",
OutboundPorts: []string{"5671", "5672"},
InboundPorts: []string{},
Ports: []string{"5671", "5672"},
}
func init() {

View File

@ -23,8 +23,7 @@ var protocol api.Protocol = api.Protocol{
ForegroundColor: "#ffffff",
FontSize: 12,
ReferenceLink: "https://datatracker.ietf.org/doc/html/rfc2616",
OutboundPorts: []string{"80", "8080", "443"},
InboundPorts: []string{},
Ports: []string{"80", "8080"},
}
var http2Protocol api.Protocol = api.Protocol{
@ -35,8 +34,7 @@ var http2Protocol api.Protocol = api.Protocol{
ForegroundColor: "#ffffff",
FontSize: 12,
ReferenceLink: "https://datatracker.ietf.org/doc/html/rfc7540",
OutboundPorts: []string{"80", "8080", "443"},
InboundPorts: []string{},
Ports: []string{"80", "8080"},
}
func init() {

View File

@ -15,8 +15,7 @@ var protocol api.Protocol = api.Protocol{
ForegroundColor: "#ffffff",
FontSize: 12,
ReferenceLink: "https://kafka.apache.org/protocol",
OutboundPorts: []string{"9092"},
InboundPorts: []string{},
Ports: []string{"9092"},
}
func init() {

View File

@ -34,8 +34,6 @@ import (
)
const AppPortsEnvVar = "APP_PORTS"
const maxHTTP2DataLenEnvVar = "HTTP2_DATA_SIZE_LIMIT"
const maxHTTP2DataLenDefault = 1 * 1024 * 1024 // 1MB
const cleanPeriod = time.Second * 10
var remoteOnlyOutboundPorts = []int{80, 443}
@ -65,13 +63,6 @@ var allowmissinginit = flag.Bool("allowmissinginit", true, "Support streams with
var verbose = flag.Bool("verbose", false, "Be verbose")
var debug = flag.Bool("debug", false, "Display debug information")
var quiet = flag.Bool("quiet", false, "Be quiet regarding errors")
// http
var nohttp = flag.Bool("nohttp", false, "Disable HTTP parsing")
var output = flag.String("output", "", "Path to create file for HTTP 200 OK responses")
var writeincomplete = flag.Bool("writeincomplete", false, "Write incomplete response")
var hexdump = flag.Bool("dump", false, "Dump HTTP request/response as hex") // global
var hexdumppkt = flag.Bool("dumppkt", false, "Dump packet as hex")
// capture
@ -80,7 +71,7 @@ var fname = flag.String("r", "", "Filename to read from, overrides -i")
var snaplen = flag.Int("s", 65536, "Snap length (number of bytes max to read per packet")
var tstype = flag.String("timestamp_type", "", "Type of timestamps to use")
var promisc = flag.Bool("promisc", true, "Set promiscuous mode")
var anydirection = flag.Bool("anydirection", false, "Capture http requests to other hosts")
var anydirection = flag.Bool("anydirection", false, "Capture requests to other hosts")
var staleTimeoutSeconds = flag.Int("staletimout", 120, "Max time in seconds to keep connections which don't transmit data")
var memprofile = flag.String("memprofile", "", "Write memory profile")
@ -186,7 +177,7 @@ func (c *Context) GetCaptureInfo() gopacket.CaptureInfo {
return c.CaptureInfo
}
func StartPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem, extensionsRef []*api.Extension) {
func StartPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem, extensionsRef []*api.Extension, allExtensionPorts []string) {
hostMode = opts.HostMode
extensions = extensionsRef
@ -194,7 +185,7 @@ func StartPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem,
startMemoryProfiler()
}
go startPassiveTapper(outputItems)
go startPassiveTapper(outputItems, allExtensionPorts)
}
func startMemoryProfiler() {
@ -228,7 +219,7 @@ func startMemoryProfiler() {
}()
}
func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
func startPassiveTapper(outputItems chan *api.OutputChannelItem, allExtensionPorts []string) {
log.SetFlags(log.LstdFlags | log.LUTC | log.Lshortfile)
defer util.Run()()
@ -253,25 +244,12 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
appPortsStr := os.Getenv(AppPortsEnvVar)
var appPorts []int
if appPortsStr == "" {
rlog.Info("Received empty/no APP_PORTS env var! only listening to http on port 80!")
rlog.Info("Received empty/no APP_PORTS env var! only listening to ports: %v!", allExtensionPorts)
appPorts = make([]int, 0)
} else {
appPorts = parseAppPorts(appPortsStr)
}
SetFilterPorts(appPorts)
// envVal := os.Getenv(maxHTTP2DataLenEnvVar)
// if envVal == "" {
// rlog.Infof("Received empty/no HTTP2_DATA_SIZE_LIMIT env var! falling back to %v", maxHTTP2DataLenDefault)
// maxHTTP2DataLen = maxHTTP2DataLenDefault
// } else {
// if convertedInt, err := strconv.Atoi(envVal); err != nil {
// rlog.Infof("Received invalid HTTP2_DATA_SIZE_LIMIT env var! falling back to %v", maxHTTP2DataLenDefault)
// maxHTTP2DataLen = maxHTTP2DataLenDefault
// } else {
// rlog.Infof("Received HTTP2_DATA_SIZE_LIMIT env var: %v", maxHTTP2DataLenDefault)
// maxHTTP2DataLen = convertedInt
// }
// }
log.Printf("App Ports: %v", gSettings.filterPorts)
@ -344,8 +322,8 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
}
streamFactory := &tcpStreamFactory{
doHTTP: !*nohttp,
Emitter: emitter,
AllExtensionPorts: allExtensionPorts,
Emitter: emitter,
}
streamPool := reassembly.NewStreamPool(streamFactory)
assembler := reassembly.NewAssembler(streamPool)

View File

@ -14,7 +14,7 @@ import (
const checkTLSPacketAmount = 100
type httpReaderDataMsg struct {
type tcpReaderDataMsg struct {
bytes []byte
timestamp time.Time
}
@ -38,21 +38,19 @@ func (tid *tcpID) String() string {
return fmt.Sprintf("%s->%s %s->%s", tid.srcIP, tid.dstIP, tid.srcPort, tid.dstPort)
}
/* httpReader gets reads from a channel of bytes of tcp payload, and parses it into HTTP/1 requests and responses.
/* tcpReader gets reads from a channel of bytes of tcp payload, and parses it into requests and responses.
* The payload is written to the channel by a tcpStream object that is dedicated to one tcp connection.
* An httpReader object is unidirectional: it parses either a client stream or a server stream.
* An tcpReader object is unidirectional: it parses either a client stream or a server stream.
* Implements io.Reader interface (Read)
*/
type tcpReader struct {
ident string
tcpID *api.TcpID
isClient bool
isHTTP2 bool
isOutgoing bool
msgQueue chan httpReaderDataMsg // Channel of captured reassembled tcp payload
msgQueue chan tcpReaderDataMsg // Channel of captured reassembled tcp payload
data []byte
captureTime time.Time
hexdump bool
parent *tcpStream
messageCount uint
packetsSeen uint
@ -61,7 +59,7 @@ type tcpReader struct {
}
func (h *tcpReader) Read(p []byte) (int, error) {
var msg httpReaderDataMsg
var msg tcpReaderDataMsg
ok := true
for ok && len(h.data) == 0 {
@ -102,24 +100,16 @@ func containsPort(ports []string, port string) bool {
func (h *tcpReader) run(wg *sync.WaitGroup) {
defer wg.Done()
// log.Printf("Called run h.isClient: %v\n", h.isClient)
b := bufio.NewReader(h)
if h.isClient {
extensions[1].Dissector.Dissect(b, h.isClient, h.tcpID, h.Emitter)
} else {
extensions[1].Dissector.Dissect(b, h.isClient, h.tcpID, h.Emitter)
for _, extension := range extensions {
var port string
if h.isClient {
port = h.tcpID.DstPort
} else {
port = h.tcpID.SrcPort
}
if containsPort(extension.Protocol.Ports, port) {
extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.Emitter)
}
}
// for _, extension := range extensions {
// var subjectPorts []string
// if h.isClient {
// subjectPorts = extension.OutboundPorts
// } else {
// subjectPorts = extension.InboundPorts
// }
// if containsPort(subjectPorts, "80") {
// extension.Dissector.Ping()
// fmt.Printf("h.isClient: %v\n", h.isClient)
// extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.Emitter)
// }
// }
}

View File

@ -2,7 +2,6 @@ package tap
import (
"encoding/binary"
"encoding/hex"
"fmt"
"sync"
@ -14,7 +13,7 @@ import (
/* It's a connection (bidirectional)
* Implements gopacket.reassembly.Stream interface (Accept, ReassembledSG, ReassemblyComplete)
* ReassembledSG gets called when new reassembled data is ready (i.e. bytes in order, no duplicates, complete)
* In our implementation, we pass information from ReassembledSG to the httpReader through a shared channel.
* In our implementation, we pass information from ReassembledSG to the tcpReader through a shared channel.
*/
type tcpStream struct {
tcpstate *reassembly.TCPSimpleFSM
@ -22,7 +21,7 @@ type tcpStream struct {
optchecker reassembly.TCPOptionCheck
net, transport gopacket.Flow
isDNS bool
isHTTP bool
isTapTarget bool
reversed bool
client tcpReader
server tcpReader
@ -141,17 +140,14 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
if len(data) > 2+int(dnsSize) {
sg.KeepFrom(2 + int(dnsSize))
}
} else if t.isHTTP {
} else if t.isTapTarget {
if length > 0 {
if *hexdump {
Trace("Feeding http with:%s", hex.Dump(data))
}
// This is where we pass the reassembled information onwards
// This channel is read by an httpReader object
// This channel is read by an tcpReader object
if dir == reassembly.TCPDirClientToServer && !t.reversed {
t.client.msgQueue <- httpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp}
t.client.msgQueue <- tcpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp}
} else {
t.server.msgQueue <- httpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp}
t.server.msgQueue <- tcpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp}
}
}
}
@ -159,7 +155,7 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
func (t *tcpStream) ReassemblyComplete(ac reassembly.AssemblerContext) bool {
Trace("%s: Connection closed", t.ident)
if t.isHTTP {
if t.isTapTarget {
close(t.client.msgQueue)
close(t.server.msgQueue)
}

View File

@ -19,8 +19,8 @@ import (
*/
type tcpStreamFactory struct {
wg sync.WaitGroup
doHTTP bool
outbountLinkWriter *OutboundLinkWriter
AllExtensionPorts []string
Emitter api.Emitter
}
@ -33,33 +33,33 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
srcIp := net.Src().String()
dstIp := net.Dst().String()
dstPort := int(tcp.DstPort)
dstPortStr := transport.Dst().String()
// if factory.shouldNotifyOnOutboundLink(dstIp, dstPort) {
// factory.outbountLinkWriter.WriteOutboundLink(net.Src().String(), dstIp, dstPort, "", "")
// }
props := factory.getStreamProps(srcIp, dstIp, dstPort)
isHTTP := props.isTapTarget
props := factory.getStreamProps(srcIp, dstIp, dstPort, dstPortStr, factory.AllExtensionPorts)
isTapTarget := props.isTapTarget
stream := &tcpStream{
net: net,
transport: transport,
isDNS: tcp.SrcPort == 53 || tcp.DstPort == 53,
isHTTP: isHTTP && factory.doHTTP,
reversed: tcp.SrcPort == 80,
tcpstate: reassembly.NewTCPSimpleFSM(fsmOptions),
ident: fmt.Sprintf("%s:%s", net, transport),
optchecker: reassembly.NewTCPOptionCheck(),
net: net,
transport: transport,
isDNS: tcp.SrcPort == 53 || tcp.DstPort == 53,
isTapTarget: isTapTarget,
reversed: tcp.SrcPort == 80,
tcpstate: reassembly.NewTCPSimpleFSM(fsmOptions),
ident: fmt.Sprintf("%s:%s", net, transport),
optchecker: reassembly.NewTCPOptionCheck(),
}
if stream.isHTTP {
if stream.isTapTarget {
stream.client = tcpReader{
msgQueue: make(chan httpReaderDataMsg),
msgQueue: make(chan tcpReaderDataMsg),
ident: fmt.Sprintf("%s %s", net, transport),
tcpID: &api.TcpID{
SrcIP: net.Src().String(),
DstIP: net.Dst().String(),
SrcPort: transport.Src().String(),
DstPort: transport.Dst().String(),
DstPort: dstPortStr,
},
hexdump: *hexdump,
parent: stream,
isClient: true,
isOutgoing: props.isOutgoing,
@ -67,7 +67,7 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
Emitter: factory.Emitter,
}
stream.server = tcpReader{
msgQueue: make(chan httpReaderDataMsg),
msgQueue: make(chan tcpReaderDataMsg),
ident: fmt.Sprintf("%s %s", net.Reverse(), transport.Reverse()),
tcpID: &api.TcpID{
SrcIP: net.Dst().String(),
@ -75,7 +75,6 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
SrcPort: transport.Dst().String(),
DstPort: transport.Src().String(),
},
hexdump: *hexdump,
parent: stream,
isOutgoing: props.isOutgoing,
outboundLinkWriter: factory.outbountLinkWriter,
@ -93,7 +92,7 @@ func (factory *tcpStreamFactory) WaitGoRoutines() {
factory.wg.Wait()
}
func (factory *tcpStreamFactory) getStreamProps(srcIP string, dstIP string, dstPort int) *streamProps {
func (factory *tcpStreamFactory) getStreamProps(srcIP string, dstIP string, dstPort int, dstPortStr string, allExtensionPorts []string) *streamProps {
if hostMode {
if inArrayString(gSettings.filterAuthorities, fmt.Sprintf("%s:%d", dstIP, dstPort)) == true {
rlog.Debugf("getStreamProps %s", fmt.Sprintf("+ host1 %s:%d", dstIP, dstPort))
@ -107,7 +106,7 @@ func (factory *tcpStreamFactory) getStreamProps(srcIP string, dstIP string, dstP
}
return &streamProps{isTapTarget: false}
} else {
isTappedPort := dstPort == 80 || (gSettings.filterPorts != nil && (inArrayInt(gSettings.filterPorts, dstPort)))
isTappedPort := containsPort(allExtensionPorts, dstPortStr) || (gSettings.filterPorts != nil && (inArrayInt(gSettings.filterPorts, dstPort)))
if !isTappedPort {
rlog.Debugf("getStreamProps %s", fmt.Sprintf("- notHost1 %d", dstPort))
return &streamProps{isTapTarget: false, isOutgoing: false}