kubeshark/tap/extensions/http/handlers.go
M. Mert Yıldıran d3e6a69d82
Refactor tap module to achieve synchronously closing other protocol dissectors upon identification (#1026)
* Remove `tcpStreamWrapper` struct

* Refactor `tap` module and move some of the code to `tap/api` module

* Move `TrafficFilteringOptions` struct to `shared` module

* Change the `Dissect` method signature to have `*TcpReader` as an argument

* Add `CloseOtherProtocolDissectors` method and use it to synchronously close the other protocol dissectors

* Run `go mod tidy` in `cli` module

* Rename `SuperIdentifier` struct to `ProtoIdentifier`

* Remove `SuperTimer` struct

* Bring back `CloseTimedoutTcpStreamChannels` method

* Run `go mod tidy` everywhere

* Remove `GOGC` environment variable from tapper

* Fix the tests

* Bring back `debug.FreeOSMemory()` call

* Make `CloseOtherProtocolDissectors` method mutexed

* Revert "Remove `GOGC` environment variable from tapper"

This reverts commit cfc2484bbb.

* Bring back the removed `checksum`, `nooptcheck` and `ignorefsmerr` flags

* Define a bunch of interfaces and don't export any new structs from `tap/api`

* Keep the interfaces in `tap/api` but move the structs to `tap/tcp`

* Fix the unit tests by depending on `github.com/up9inc/mizu/tap`

* Use the modified `tlsEmitter`

* Define `TlsChunk` interface and make `tlsReader` implement `TcpReader`

* Remove unused fields in `tlsReader`

* Define `ReassemblyStream` interface and separate `gopacket` specififc fields to `tcpReassemblyStream` struct

Such that make `tap/api` don't depend on `gopacket`

* Remove the unused fields

* Make `tlsPoller` implement `TcpStream` interface and remove the call to `NewTcpStreamDummy` method

* Remove unused fields from `tlsPoller`

* Remove almost all of the setter methods in `TcpReader` and `TcpStream` interface and remove `TlsChunk` interface

* Revert "Revert "Remove `GOGC` environment variable from tapper""

This reverts commit ab2b9a803b.

* Revert "Bring back `debug.FreeOSMemory()` call"

This reverts commit 1cce863bbb.

* Remove excess comment

* Fix acceptance tests (`logger` module) #run_acceptance_tests

* Bring back `github.com/patrickmn/go-cache`

* Fix `NewTcpStream` method signature

* Put `tcpReader` and `tcpStream` mocks into protocol dissectors to remove `github.com/up9inc/mizu/tap` dependency

* Fix AMQP tests

* Revert 960ba644cd

* Revert `go.mod` and `go.sum` files in protocol dissectors

* Fix the comment position

* Revert `AppStatsInst` change

* Fix indent

* Fix CLI build

* Fix linter error

* Fix error msg

* Revert some of the changes in `chunk.go`
2022-04-28 17:19:14 +03:00

201 lines
5.3 KiB
Go

package http
import (
"bufio"
"bytes"
"fmt"
"io"
"io/ioutil"
"net/http"
"strings"
"time"
"github.com/up9inc/mizu/tap/api"
)
func filterAndEmit(item *api.OutputChannelItem, emitter api.Emitter, options *api.TrafficFilteringOptions) {
if IsIgnoredUserAgent(item, options) {
return
}
if !options.DisableRedaction {
FilterSensitiveData(item, options)
}
replaceForwardedFor(item)
emitter.Emit(item)
}
func replaceForwardedFor(item *api.OutputChannelItem) {
if item.Protocol.Name != "http" {
return
}
request := item.Pair.Request.Payload.(api.HTTPPayload).Data.(*http.Request)
forwardedFor := request.Header.Get("X-Forwarded-For")
if forwardedFor == "" {
return
}
ips := strings.Split(forwardedFor, ",")
lastIP := strings.TrimSpace(ips[0])
item.ConnectionInfo.ClientIP = lastIP
// Erase the port field. Because the proxy terminates the connection from the client, the port that we see here
// is not the source port on the client side.
item.ConnectionInfo.ClientPort = ""
}
func handleHTTP2Stream(http2Assembler *Http2Assembler, progress *api.ReadProgress, capture api.Capture, tcpID *api.TcpID, captureTime time.Time, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) error {
streamID, messageHTTP1, isGrpc, err := http2Assembler.readMessage()
if err != nil {
return err
}
var item *api.OutputChannelItem
switch messageHTTP1 := messageHTTP1.(type) {
case http.Request:
ident := fmt.Sprintf(
"%s_%s_%s_%s_%d_%s",
tcpID.SrcIP,
tcpID.DstIP,
tcpID.SrcPort,
tcpID.DstPort,
streamID,
"HTTP2",
)
item = reqResMatcher.registerRequest(ident, &messageHTTP1, captureTime, progress.Current(), messageHTTP1.ProtoMinor)
if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.SrcIP,
ClientPort: tcpID.SrcPort,
ServerIP: tcpID.DstIP,
ServerPort: tcpID.DstPort,
IsOutgoing: true,
}
}
case http.Response:
ident := fmt.Sprintf(
"%s_%s_%s_%s_%d_%s",
tcpID.DstIP,
tcpID.SrcIP,
tcpID.DstPort,
tcpID.SrcPort,
streamID,
"HTTP2",
)
item = reqResMatcher.registerResponse(ident, &messageHTTP1, captureTime, progress.Current(), messageHTTP1.ProtoMinor)
if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.DstIP,
ClientPort: tcpID.DstPort,
ServerIP: tcpID.SrcIP,
ServerPort: tcpID.SrcPort,
IsOutgoing: false,
}
}
}
if item != nil {
if isGrpc {
item.Protocol = grpcProtocol
} else {
item.Protocol = http2Protocol
}
item.Capture = capture
filterAndEmit(item, emitter, options)
}
return nil
}
func handleHTTP1ClientStream(b *bufio.Reader, progress *api.ReadProgress, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, captureTime time.Time, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, req *http.Request, err error) {
req, err = http.ReadRequest(b)
if err != nil {
return
}
counterPair.Lock()
counterPair.Request++
requestCounter := counterPair.Request
counterPair.Unlock()
// Check HTTP2 upgrade - HTTP2 Over Cleartext (H2C)
if strings.Contains(strings.ToLower(req.Header.Get("Connection")), "upgrade") && strings.ToLower(req.Header.Get("Upgrade")) == "h2c" {
switchingProtocolsHTTP2 = true
}
var body []byte
body, err = ioutil.ReadAll(req.Body)
req.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind
ident := fmt.Sprintf(
"%s_%s_%s_%s_%d_%s",
tcpID.SrcIP,
tcpID.DstIP,
tcpID.SrcPort,
tcpID.DstPort,
requestCounter,
"HTTP1",
)
item := reqResMatcher.registerRequest(ident, req, captureTime, progress.Current(), req.ProtoMinor)
if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.SrcIP,
ClientPort: tcpID.SrcPort,
ServerIP: tcpID.DstIP,
ServerPort: tcpID.DstPort,
IsOutgoing: true,
}
item.Capture = capture
filterAndEmit(item, emitter, options)
}
return
}
func handleHTTP1ServerStream(b *bufio.Reader, progress *api.ReadProgress, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, captureTime time.Time, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, err error) {
var res *http.Response
res, err = http.ReadResponse(b, nil)
if err != nil {
return
}
counterPair.Lock()
counterPair.Response++
responseCounter := counterPair.Response
counterPair.Unlock()
// Check HTTP2 upgrade - HTTP2 Over Cleartext (H2C)
if res.StatusCode == 101 && strings.Contains(strings.ToLower(res.Header.Get("Connection")), "upgrade") && strings.ToLower(res.Header.Get("Upgrade")) == "h2c" {
switchingProtocolsHTTP2 = true
}
var body []byte
body, err = ioutil.ReadAll(res.Body)
res.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind
ident := fmt.Sprintf(
"%s_%s_%s_%s_%d_%s",
tcpID.DstIP,
tcpID.SrcIP,
tcpID.DstPort,
tcpID.SrcPort,
responseCounter,
"HTTP1",
)
item := reqResMatcher.registerResponse(ident, res, captureTime, progress.Current(), res.ProtoMinor)
if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.DstIP,
ClientPort: tcpID.DstPort,
ServerIP: tcpID.SrcIP,
ServerPort: tcpID.SrcPort,
IsOutgoing: false,
}
item.Capture = capture
filterAndEmit(item, emitter, options)
}
return
}