diff --git a/tap/api/api.go b/tap/api/api.go index 59eef0a8f..885904d2d 100644 --- a/tap/api/api.go +++ b/tap/api/api.go @@ -17,5 +17,5 @@ type Extension struct { type Dissector interface { Register(*Extension) Ping() - Dissect(b *bufio.Reader) interface{} + Dissect(b *bufio.Reader, isClient bool) interface{} } diff --git a/tap/extensions/amqp/main.go b/tap/extensions/amqp/main.go index 978f15293..0c6f233b6 100644 --- a/tap/extensions/amqp/main.go +++ b/tap/extensions/amqp/main.go @@ -23,7 +23,7 @@ func (g dissecting) Ping() { log.Printf("pong AMQP\n") } -func (g dissecting) Dissect(b *bufio.Reader) interface{} { +func (g dissecting) Dissect(b *bufio.Reader, isClient bool) interface{} { // TODO: Implement return nil } diff --git a/tap/extensions/http/main.go b/tap/extensions/http/main.go index a0e51dc96..bd819eaf0 100644 --- a/tap/extensions/http/main.go +++ b/tap/extensions/http/main.go @@ -3,6 +3,7 @@ package main import ( "bufio" "io" + "io/ioutil" "log" "net/http" @@ -27,38 +28,32 @@ func (g dissecting) Ping() { log.Printf("pong HTTP\n") } -func DiscardBytesToFirstError(r io.Reader) (discarded int, err error) { +func (g dissecting) Dissect(b *bufio.Reader, isClient bool) interface{} { for { - n, e := r.Read(discardBuffer) - discarded += n - if e != nil { - return discarded, e - } - } -} - -func DiscardBytesToEOF(r io.Reader) (discarded int) { - for { - n, e := DiscardBytesToFirstError(r) - discarded += n - if e == io.EOF { - return - } - } -} - -func (g dissecting) Dissect(b *bufio.Reader) interface{} { - for { - req, err := http.ReadRequest(b) - if err == io.EOF { - // We must read until we see an EOF... very important! - return nil - } else if err != nil { - log.Println("Error reading stream:", err) + if isClient { + req, err := http.ReadRequest(b) + if err == io.EOF || err == io.ErrUnexpectedEOF { + // We must read until we see an EOF... very important! + return nil + } else if err != nil { + log.Println("Error reading stream:", err) + } else { + body, _ := ioutil.ReadAll(req.Body) + req.Body.Close() + log.Printf("Received request: %+v with body: %+v\n", req, body) + } } else { - bodyBytes := DiscardBytesToEOF(req.Body) - req.Body.Close() - log.Println("Received request from stream:", req, "with", bodyBytes, "bytes in request body") + res, err := http.ReadResponse(b, nil) + if err == io.EOF || err == io.ErrUnexpectedEOF { + // We must read until we see an EOF... very important! + return nil + } else if err != nil { + log.Println("Error reading stream:", err) + } else { + body, _ := ioutil.ReadAll(res.Body) + res.Body.Close() + log.Printf("Received response: %+v with body: %+v\n", res, body) + } } } } diff --git a/tap/extensions/kafka/main.go b/tap/extensions/kafka/main.go index 32b1f5a91..21179d183 100644 --- a/tap/extensions/kafka/main.go +++ b/tap/extensions/kafka/main.go @@ -23,7 +23,7 @@ func (g dissecting) Ping() { log.Printf("pong Kafka\n") } -func (g dissecting) Dissect(b *bufio.Reader) interface{} { +func (g dissecting) Dissect(b *bufio.Reader, isClient bool) interface{} { // TODO: Implement return nil } diff --git a/tap/passive_tapper.go b/tap/passive_tapper.go index 19d576c3e..f6a934a7b 100644 --- a/tap/passive_tapper.go +++ b/tap/passive_tapper.go @@ -91,7 +91,7 @@ var dumpToHar = flag.Bool("hardump", false, "Dump traffic to har files") var HarOutputDir = flag.String("hardir", "", "Directory in which to store output har files") var harEntriesPerFile = flag.Int("harentriesperfile", 200, "Number of max number of har entries to store in each file") -var filter = flag.String("f", "tcp and dst port 80", "BPF filter for pcap") +var filter = flag.String("f", "tcp and (src port 80 or dst port 80)", "BPF filter for pcap") var statsTracker = StatsTracker{} diff --git a/tap/tcp_stream_factory.go b/tap/tcp_stream_factory.go index 311616c00..357ca59ac 100644 --- a/tap/tcp_stream_factory.go +++ b/tap/tcp_stream_factory.go @@ -29,12 +29,22 @@ func containsPort(ports []string, port string) bool { return false } -func (h *tcpStream) run() { +func (h *tcpStream) clientRun() { b := bufio.NewReader(&h.r) for _, extension := range extensions { if containsPort(extension.OutboundPorts, h.transport.Dst().String()) { extension.Dissector.Ping() - extension.Dissector.Dissect(b) + extension.Dissector.Dissect(b, true) + } + } +} + +func (h *tcpStream) serverRun() { + b := bufio.NewReader(&h.r) + for _, extension := range extensions { + if containsPort(extension.OutboundPorts, h.transport.Src().String()) { + extension.Dissector.Ping() + extension.Dissector.Dissect(b, false) } } } @@ -47,7 +57,9 @@ func (h *tcpStreamFactory) New(net, transport gopacket.Flow) tcpassembly.Stream r: tcpreader.NewReaderStream(), } if containsPort(allOutboundPorts, transport.Dst().String()) { - go stream.run() + go stream.clientRun() + } else if containsPort(allOutboundPorts, transport.Src().String()) { + go stream.serverRun() } return &stream.r }