diff --git a/Dockerfile b/Dockerfile index e85745689..157588ec9 100644 --- a/Dockerfile +++ b/Dockerfile @@ -18,12 +18,14 @@ WORKDIR /app/api-build COPY api/go.mod api/go.sum ./ COPY shared/go.mod shared/go.mod ../shared/ +COPY tap/go.mod tap/go.mod ../tap/ RUN go mod download # cheap trick to make the build faster (As long as go.mod wasn't changes) RUN go list -f '{{.Path}}@{{.Version}}' -m all | sed 1d | grep -e 'go-cache' -e 'sqlite' | xargs go get # Copy and build api code COPY shared ../shared +COPY tap ../tap COPY api . RUN go build -ldflags="-s -w" -o mizuagent . diff --git a/api/go.mod b/api/go.mod index dcb1edf86..f75ea5920 100644 --- a/api/go.mod +++ b/api/go.mod @@ -18,6 +18,7 @@ require ( github.com/orcaman/concurrent-map v0.0.0-20210106121528-16402b402231 github.com/patrickmn/go-cache v2.1.0+incompatible github.com/up9inc/mizu/shared v0.0.0 + github.com/up9inc/mizu/tap v0.0.0 go.mongodb.org/mongo-driver v1.5.1 golang.org/x/net v0.0.0-20210421230115-4e50805a0758 gorm.io/driver/sqlite v1.1.4 @@ -28,3 +29,4 @@ require ( ) replace github.com/up9inc/mizu/shared v0.0.0 => ../shared +replace github.com/up9inc/mizu/tap v0.0.0 => ../tap diff --git a/api/main.go b/api/main.go index b1bd17a6c..b7bbaa14d 100644 --- a/api/main.go +++ b/api/main.go @@ -7,12 +7,12 @@ import ( "github.com/gofiber/fiber/v2" "github.com/gorilla/websocket" "github.com/up9inc/mizu/shared" + "github.com/up9inc/mizu/tap" "mizuserver/pkg/api" "mizuserver/pkg/middleware" "mizuserver/pkg/models" "mizuserver/pkg/routes" "mizuserver/pkg/sensitiveDataFiltering" - "mizuserver/pkg/tap" "mizuserver/pkg/utils" "os" "os/signal" @@ -26,16 +26,21 @@ var aggregatorAddress = flag.String("aggregator-address", "", "Address of mizu c func main() { flag.Parse() + hostMode := os.Getenv(shared.HostModeEnvVar) == "1" + tapOpts := &tap.TapOpts{HostMode: hostMode} if !*shouldTap && !*aggregator && !*standalone{ panic("One of the flags --tap, --api or --standalone must be provided") } if *standalone { - harOutputChannel := tap.StartPassiveTapper() + harOutputChannel, outboundLinkOutputChannel := tap.StartPassiveTapper(tapOpts) filteredHarChannel := make(chan *tap.OutputChannelItem) + go filterHarHeaders(harOutputChannel, filteredHarChannel, getTrafficFilteringOptions()) go api.StartReadingEntries(filteredHarChannel, nil) + go api.StartReadingOutbound(outboundLinkOutputChannel) + hostApi(nil) } else if *shouldTap { if *aggregatorAddress == "" { @@ -44,21 +49,26 @@ func main() { tapTargets := getTapTargets() if tapTargets != nil { - tap.HostAppAddresses = tapTargets - fmt.Println("Filtering for the following addresses:", tap.HostAppAddresses) + tap.SetFilterAuthorities(tapTargets) + fmt.Println("Filtering for the following authorities:", tap.GetFilterIPs()) } - harOutputChannel := tap.StartPassiveTapper() + harOutputChannel, outboundLinkOutputChannel := tap.StartPassiveTapper(tapOpts) + socketConnection, err := shared.ConnectToSocketServer(*aggregatorAddress, shared.DEFAULT_SOCKET_RETRIES, shared.DEFAULT_SOCKET_RETRY_SLEEP_TIME, false) if err != nil { panic(fmt.Sprintf("Error connecting to socket server at %s %v", *aggregatorAddress, err)) } + go pipeChannelToSocket(socketConnection, harOutputChannel) + go api.StartReadingOutbound(outboundLinkOutputChannel) } else if *aggregator { socketHarOutChannel := make(chan *tap.OutputChannelItem, 1000) filteredHarChannel := make(chan *tap.OutputChannelItem) + go api.StartReadingEntries(filteredHarChannel, nil) go filterHarHeaders(socketHarOutChannel, filteredHarChannel, getTrafficFilteringOptions()) + hostApi(socketHarOutChannel) } diff --git a/api/pkg/api/main.go b/api/pkg/api/main.go index e5eb0b5c2..2a1e550d4 100644 --- a/api/pkg/api/main.go +++ b/api/pkg/api/main.go @@ -6,11 +6,11 @@ import ( "encoding/json" "fmt" "github.com/google/martian/har" + "github.com/up9inc/mizu/tap" "go.mongodb.org/mongo-driver/bson/primitive" "mizuserver/pkg/database" "mizuserver/pkg/models" "mizuserver/pkg/resolver" - "mizuserver/pkg/tap" "mizuserver/pkg/utils" "net/url" "os" @@ -88,10 +88,18 @@ func startReadingChannel(outputItems <-chan *tap.OutputChannelItem) { } for item := range outputItems { - saveHarToDb(item.HarEntry, item.RequestSenderIp) + saveHarToDb(item.HarEntry, item.ConnectionInfo.ClientIP) } } +func StartReadingOutbound(outboundLinkChannel <-chan *tap.OutboundLink) { + // tcpStreamFactory will block on write to channel. Empty channel to unblock. + // TODO: Make write to channel optional. + for range outboundLinkChannel { + } +} + + func saveHarToDb(entry *har.Entry, sender string) { entryBytes, _ := json.Marshal(entry) serviceName, urlPath, serviceHostName := getServiceNameFromUrl(entry.Request.URL) diff --git a/api/pkg/api/socket_server_handlers.go b/api/pkg/api/socket_server_handlers.go index 12bf42adf..8164df004 100644 --- a/api/pkg/api/socket_server_handlers.go +++ b/api/pkg/api/socket_server_handlers.go @@ -5,10 +5,10 @@ import ( "fmt" "github.com/antoniodipinto/ikisocket" "github.com/up9inc/mizu/shared" + "github.com/up9inc/mizu/tap" "mizuserver/pkg/controllers" "mizuserver/pkg/models" "mizuserver/pkg/routes" - "mizuserver/pkg/tap" ) var browserClientSocketUUIDs = make([]string, 0) diff --git a/api/pkg/models/models.go b/api/pkg/models/models.go index 3ed03ebed..d8dc90184 100644 --- a/api/pkg/models/models.go +++ b/api/pkg/models/models.go @@ -4,7 +4,7 @@ import ( "encoding/json" "github.com/google/martian/har" "github.com/up9inc/mizu/shared" - "mizuserver/pkg/tap" + "github.com/up9inc/mizu/tap" "time" ) @@ -101,4 +101,4 @@ type ExtendedLog struct { type ExtendedCreator struct { *har.Creator Source string `json:"_source"` -} \ No newline at end of file +} diff --git a/api/pkg/sensitiveDataFiltering/messageSensitiveDataCleaner.go b/api/pkg/sensitiveDataFiltering/messageSensitiveDataCleaner.go index f302660f2..15a8e5d8b 100644 --- a/api/pkg/sensitiveDataFiltering/messageSensitiveDataCleaner.go +++ b/api/pkg/sensitiveDataFiltering/messageSensitiveDataCleaner.go @@ -5,7 +5,7 @@ import ( "encoding/xml" "errors" "fmt" - "mizuserver/pkg/tap" + "github.com/up9inc/mizu/tap" "net/url" "strings" diff --git a/api/pkg/tap/http_matcher.go b/api/pkg/tap/http_matcher.go deleted file mode 100644 index c0a98fb1f..000000000 --- a/api/pkg/tap/http_matcher.go +++ /dev/null @@ -1,209 +0,0 @@ -package tap - -import ( - "fmt" - "net/http" - "strconv" - "strings" - "time" - - "github.com/orcaman/concurrent-map" -) - -type requestResponsePair struct { - Request httpMessage `json:"request"` - Response httpMessage `json:"response"` -} - -type envoyMessageWrapper struct { - HttpBufferedTrace requestResponsePair `json:"http_buffered_trace"` -} - -type headerKeyVal struct { - Key string `json:"key"` - Value string `json:"value"` -} - -type messageBody struct { - Truncated bool `json:"truncated"` - AsBytes string `json:"as_bytes"` -} - -type httpMessage struct { - IsRequest bool - Headers []headerKeyVal `json:"headers"` - HTTPVersion string `json:"httpVersion"` - Body messageBody `json:"body"` - captureTime time.Time - orig interface {} - requestSenderIp string -} - - -// Key is {client_addr}:{client_port}->{dest_addr}:{dest_port} -type requestResponseMatcher struct { - openMessagesMap cmap.ConcurrentMap - -} - -func createResponseRequestMatcher() requestResponseMatcher { - newMatcher := &requestResponseMatcher{openMessagesMap: cmap.New()} - return *newMatcher -} - -func (matcher *requestResponseMatcher) registerRequest(ident string, request *http.Request, captureTime time.Time, body string, isHTTP2 bool) *envoyMessageWrapper { - split := splitIdent(ident) - key := genKey(split) - - messageExtraHeaders := []headerKeyVal{ - {Key: "x-up9-source", Value: split[0]}, - {Key: "x-up9-destination", Value: split[1] + ":" + split[3]}, - } - - requestHTTPMessage := requestToMessage(request, captureTime, body, &messageExtraHeaders, isHTTP2, split[0]) - - if response, found := matcher.openMessagesMap.Pop(key); found { - // Type assertion always succeeds because all of the map's values are of httpMessage type - responseHTTPMessage := response.(*httpMessage) - if responseHTTPMessage.IsRequest { - SilentError("Request-Duplicate", "Got duplicate request with same identifier\n") - return nil - } - Debug("Matched open Response for %s\n", key) - return matcher.preparePair(&requestHTTPMessage, responseHTTPMessage) - } - - matcher.openMessagesMap.Set(key, &requestHTTPMessage) - Debug("Registered open Request for %s\n", key) - return nil -} - -func (matcher *requestResponseMatcher) registerResponse(ident string, response *http.Response, captureTime time.Time, body string, isHTTP2 bool) *envoyMessageWrapper { - split := splitIdent(ident) - key := genKey(split) - - responseHTTPMessage := responseToMessage(response, captureTime, body, isHTTP2) - - if request, found := matcher.openMessagesMap.Pop(key); found { - // Type assertion always succeeds because all of the map's values are of httpMessage type - requestHTTPMessage := request.(*httpMessage) - if !requestHTTPMessage.IsRequest { - SilentError("Response-Duplicate", "Got duplicate response with same identifier\n") - return nil - } - Debug("Matched open Request for %s\n", key) - return matcher.preparePair(requestHTTPMessage, &responseHTTPMessage) - } - - matcher.openMessagesMap.Set(key, &responseHTTPMessage) - Debug("Registered open Response for %s\n", key) - return nil -} - -func (matcher *requestResponseMatcher) preparePair(requestHTTPMessage *httpMessage, responseHTTPMessage *httpMessage) *envoyMessageWrapper { - matcher.addDuration(requestHTTPMessage, responseHTTPMessage) - - return &envoyMessageWrapper{ - HttpBufferedTrace: requestResponsePair{ - Request: *requestHTTPMessage, - Response: *responseHTTPMessage, - }, - } -} - -func requestToMessage(request *http.Request, captureTime time.Time, body string, messageExtraHeaders *[]headerKeyVal, isHTTP2 bool, requestSenderIp string) httpMessage { - messageHeaders := make([]headerKeyVal, 0) - - for key, value := range request.Header { - messageHeaders = append(messageHeaders, headerKeyVal{Key: key, Value: value[0]}) - } - - if !isHTTP2 { - messageHeaders = append(messageHeaders, headerKeyVal{Key: ":method", Value: request.Method}) - messageHeaders = append(messageHeaders, headerKeyVal{Key: ":path", Value: request.RequestURI}) - messageHeaders = append(messageHeaders, headerKeyVal{Key: ":authority", Value: request.Host}) - messageHeaders = append(messageHeaders, headerKeyVal{Key: ":scheme", Value: "http"}) - } - - messageHeaders = append(messageHeaders, headerKeyVal{Key: "x-request-start", Value: fmt.Sprintf("%.3f", float64(captureTime.UnixNano()) / float64(1000000000))}) - - messageHeaders = append(messageHeaders, *messageExtraHeaders...) - - httpVersion := request.Proto - - requestBody := messageBody{Truncated: false, AsBytes: body} - - return httpMessage{ - IsRequest: true, - Headers: messageHeaders, - HTTPVersion: httpVersion, - Body: requestBody, - captureTime: captureTime, - orig: request, - requestSenderIp: requestSenderIp, - } -} - -func responseToMessage(response *http.Response, captureTime time.Time, body string, isHTTP2 bool) httpMessage { - messageHeaders := make([]headerKeyVal, 0) - - for key, value := range response.Header { - messageHeaders = append(messageHeaders, headerKeyVal{Key: key, Value: value[0]}) - } - - if !isHTTP2 { - messageHeaders = append(messageHeaders, headerKeyVal{Key: ":status", Value: strconv.Itoa(response.StatusCode)}) - } - - httpVersion := response.Proto - - requestBody := messageBody{Truncated: false, AsBytes: body} - - return httpMessage{ - IsRequest: false, - Headers: messageHeaders, - HTTPVersion: httpVersion, - Body: requestBody, - captureTime: captureTime, - orig: response, - } -} - -func (matcher *requestResponseMatcher) addDuration(requestHTTPMessage *httpMessage, responseHTTPMessage *httpMessage) { - durationMs := float64(responseHTTPMessage.captureTime.UnixNano() / 1000000) - float64(requestHTTPMessage.captureTime.UnixNano() / 1000000) - if durationMs < 1 { - durationMs = 1 - } - - responseHTTPMessage.Headers = append(responseHTTPMessage.Headers, headerKeyVal{Key: "x-up9-duration-ms", Value: fmt.Sprintf("%.0f", durationMs)}) -} - -func splitIdent(ident string) []string { - ident = strings.Replace(ident, "->", " ", -1) - return strings.Split(ident, " ") -} - -func genKey(split []string) string { - key := fmt.Sprintf("%s:%s->%s:%s,%s", split[0], split[2], split[1], split[3], split[4]) - return key -} - -func (matcher *requestResponseMatcher) deleteOlderThan(t time.Time) int { - keysToPop := make([]string, 0) - for item := range matcher.openMessagesMap.IterBuffered() { - // Map only contains values of type httpMessage - message, _ := item.Val.(*httpMessage) - - if message.captureTime.Before(t) { - keysToPop = append(keysToPop, item.Key) - } - } - - numDeleted := len(keysToPop) - - for _, key := range keysToPop { - _, _ = matcher.openMessagesMap.Pop(key) - } - - return numDeleted -} diff --git a/api/pkg/tap/tap_output.go b/api/pkg/tap/tap_output.go deleted file mode 100644 index 561c445c0..000000000 --- a/api/pkg/tap/tap_output.go +++ /dev/null @@ -1,239 +0,0 @@ -package tap - -import ( - "bytes" - "encoding/json" - "fmt" - "log" - "net/http" - "time" - - "github.com/gorilla/websocket" - "github.com/patrickmn/go-cache" -) - - -const ( - // Time allowed to write a message to the peer. - writeWait = 10 * time.Second - - // Time allowed to read the next pong message from the peer. - pongWait = 60 * time.Second - - // Send pings to peer with this period. Must be less than pongWait. - pingPeriod = (pongWait * 9) / 10 - - // Maximum message size allowed from peer. - maxMessageSize = 512 -) - -var ( - newline = []byte{'\n'} - space = []byte{' '} - hub *Hub - outboundSocketNotifyExpiringCache = cache.New(outboundThrottleCacheExpiryPeriod, outboundThrottleCacheExpiryPeriod) -) - -var upgrader = websocket.Upgrader{ - ReadBufferSize: 1024, - WriteBufferSize: 1024, - CheckOrigin: func (_ *http.Request) bool { return true }, -} - -// Client is a middleman between the websocket connection and the hub. -type Client struct { - hub *Hub - - // The websocket connection. - conn *websocket.Conn - - // Buffered channel of outbound messages. - send chan []byte -} - -type OutBoundLinkMessage struct { - SourceIP string `json:"sourceIP"` - IP string `json:"ip"` - Port int `json:"port"` - Type string `json:"type"` -} - - -// readPump pumps messages from the websocket connection to the hub. -// -// The application runs readPump in a per-connection goroutine. The application -// ensures that there is at most one reader on a connection by executing all -// reads from this goroutine. -func (c *Client) readPump() { - defer func() { - c.hub.unregister <- c - c.conn.Close() - }() - c.conn.SetReadLimit(maxMessageSize) - c.conn.SetReadDeadline(time.Now().Add(pongWait)) - c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil }) - for { - _, message, err := c.conn.ReadMessage() - if err != nil { - if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { - log.Printf("error: %v", err) - } - break - } - message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1)) - c.hub.onMessageCallback(message) - } -} - -// writePump pumps messages from the hub to the websocket connection. -// -// A goroutine running writePump is started for each connection. The -// application ensures that there is at most one writer to a connection by -// executing all writes from this goroutine. -func (c *Client) writePump() { - ticker := time.NewTicker(pingPeriod) - defer func() { - ticker.Stop() - c.conn.Close() - }() - for { - select { - case message, ok := <-c.send: - c.conn.SetWriteDeadline(time.Now().Add(writeWait)) - if !ok { - // The hub closed the channel. - c.conn.WriteMessage(websocket.CloseMessage, []byte{}) - return - } - - w, err := c.conn.NextWriter(websocket.TextMessage) - if err != nil { - return - } - w.Write(message) - - - if err := w.Close(); err != nil { - return - } - case <-ticker.C: - c.conn.SetWriteDeadline(time.Now().Add(writeWait)) - if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil { - return - } - } - } -} - -type Hub struct { - // Registered clients. - clients map[*Client]bool - - // Inbound messages from the clients. - broadcast chan []byte - - // Register requests from the clients. - register chan *Client - - // Unregister requests from clients. - unregister chan *Client - - // Handle messages from client - onMessageCallback func([]byte) - - -} - -func newHub(onMessageCallback func([]byte)) *Hub { - return &Hub{ - broadcast: make(chan []byte), - register: make(chan *Client), - unregister: make(chan *Client), - clients: make(map[*Client]bool), - onMessageCallback: onMessageCallback, - } -} - -func (h *Hub) run() { - for { - select { - case client := <-h.register: - h.clients[client] = true - case client := <-h.unregister: - if _, ok := h.clients[client]; ok { - delete(h.clients, client) - close(client.send) - } - case message := <-h.broadcast: - // matched messages counter is incremented in this thread instead of in multiple http reader - // threads in order to reduce contention. - statsTracker.incMatchedMessages() - - for client := range h.clients { - select { - case client.send <- message: - default: - close(client.send) - delete(h.clients, client) - } - } - } - } -} - - -// serveWs handles websocket requests from the peer. -func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) { - conn, err := upgrader.Upgrade(w, r, nil) - if err != nil { - log.Println(err) - return - } - client := &Client{hub: hub, conn: conn, send: make(chan []byte, 256)} - client.hub.register <- client - - // Allow collection of memory referenced by the caller by doing all work in - // new goroutines. - go client.writePump() - go client.readPump() -} - -func startOutputServer(port string, messageCallback func([]byte)) { - hub = newHub(messageCallback) - go hub.run() - http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - serveWs(hub, w, r) - }) - err := http.ListenAndServe("0.0.0.0:" + port, nil) - if err != nil { - log.Fatal("Output server error: ", err) - } -} - -func broadcastReqResPair(reqResJson []byte) { - hub.broadcast <- reqResJson -} - -func broadcastOutboundLink(srcIP string, dstIP string, dstPort int) { - cacheKey := fmt.Sprintf("%s -> %s:%d", srcIP, dstIP, dstPort) - _, isInCache := outboundSocketNotifyExpiringCache.Get(cacheKey) - if isInCache { - return - } else { - outboundSocketNotifyExpiringCache.SetDefault(cacheKey, true) - } - - socketMessage := OutBoundLinkMessage{ - SourceIP: srcIP, - IP: dstIP, - Port: dstPort, - Type: "outboundSocketDetected", - } - - jsonStr, err := json.Marshal(socketMessage) - if err != nil { - log.Printf("error marshalling outbound socket detection object: %v", err) - } else { - hub.broadcast <- jsonStr - } -} diff --git a/api/pkg/tap/cleaner.go b/tap/cleaner.go similarity index 100% rename from api/pkg/tap/cleaner.go rename to tap/cleaner.go diff --git a/tap/go.mod b/tap/go.mod new file mode 100644 index 000000000..d8c38a22c --- /dev/null +++ b/tap/go.mod @@ -0,0 +1,12 @@ +module github.com/up9inc/mizu/tap + +go 1.16 + +require ( + github.com/google/gopacket v1.1.19 + github.com/google/martian v2.1.0+incompatible + github.com/gorilla/websocket v1.4.2 + github.com/orcaman/concurrent-map v0.0.0-20210106121528-16402b402231 + github.com/patrickmn/go-cache v2.1.0+incompatible + golang.org/x/net v0.0.0-20210421230115-4e50805a0758 +) diff --git a/tap/go.sum b/tap/go.sum new file mode 100644 index 000000000..a110e49b4 --- /dev/null +++ b/tap/go.sum @@ -0,0 +1,31 @@ +github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8= +github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo= +github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no= +github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= +github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= +github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/orcaman/concurrent-map v0.0.0-20210501183033-44dafcb38ecc h1:Ak86L+yDSOzKFa7WM5bf5itSOo1e3Xh8bm5YCMUXIjQ= +github.com/orcaman/concurrent-map v0.0.0-20210501183033-44dafcb38ecc/go.mod h1:Lu3tH6HLW3feq74c2GC+jIMS/K2CFcDWnWD9XkenwhI= +github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= +github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= +golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210525063256-abc453219eb5 h1:wjuX4b5yYQnEQHzd+CBcrcC6OVR2J1CN6mUy0oSxIPo= +golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da h1:b3NXsE2LusjYGGjL5bxEVZZORm/YEFFrWFjR8eFrw/c= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/api/pkg/tap/grpc_assembler.go b/tap/grpc_assembler.go similarity index 96% rename from api/pkg/tap/grpc_assembler.go rename to tap/grpc_assembler.go index 076faf7cd..72b5665f1 100644 --- a/api/pkg/tap/grpc_assembler.go +++ b/tap/grpc_assembler.go @@ -84,14 +84,14 @@ type GrpcAssembler struct { framer *http2.Framer } -func (ga *GrpcAssembler) readMessage() (uint32, interface{}, string, error) { +func (ga *GrpcAssembler) readMessage() (uint32, interface{}, error) { // Exactly one Framer is used for each half connection. // (Instead of creating a new Framer for each ReadFrame operation) // This is needed in order to decompress the headers, // because the compression context is updated with each requests/response. frame, err := ga.framer.ReadFrame() if err != nil { - return 0, nil, "", err + return 0, nil, err } streamID := frame.Header().StreamID @@ -99,7 +99,7 @@ func (ga *GrpcAssembler) readMessage() (uint32, interface{}, string, error) { ga.fragmentsByStream.appendFrame(streamID, frame) if !(ga.isStreamEnd(frame)) { - return 0, nil, "", nil + return 0, nil, nil } headers, data := ga.fragmentsByStream.pop(streamID) @@ -137,10 +137,10 @@ func (ga *GrpcAssembler) readMessage() (uint32, interface{}, string, error) { ContentLength: int64(len(dataString)), } } else { - return 0, nil, "", errors.New("Failed to assemble stream: neither a request nor a message") + return 0, nil, errors.New("Failed to assemble stream: neither a request nor a message") } - return streamID, messageHTTP1, dataString, nil + return streamID, messageHTTP1, nil } func (ga *GrpcAssembler) isStreamEnd(frame http2.Frame) bool { diff --git a/api/pkg/tap/har_writer.go b/tap/har_writer.go similarity index 82% rename from api/pkg/tap/har_writer.go rename to tap/har_writer.go index 1b1eb1096..1bd2b5ee5 100644 --- a/api/pkg/tap/har_writer.go +++ b/tap/har_writer.go @@ -4,6 +4,7 @@ import ( "encoding/json" "errors" "fmt" + "log" "net/http" "os" "path/filepath" @@ -23,12 +24,13 @@ type PairChanItem struct { Response *http.Response ResponseTime time.Time RequestSenderIp string + ConnectionInfo *ConnectionInfo } func openNewHarFile(filename string) *HarFile { file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, readPermission) if err != nil { - panic(fmt.Sprintf("Failed to open output file: %s (%v,%+v)", err, err, err)) + log.Panicf("Failed to open output file: %s (%v,%+v)", err, err, err) } harFile := HarFile{file: file, entryCount: 0} @@ -45,13 +47,13 @@ type HarFile struct { func NewEntry(request *http.Request, requestTime time.Time, response *http.Response, responseTime time.Time) (*har.Entry, error) { harRequest, err := har.NewRequest(request, true) if err != nil { - SilentError("convert-request-to-har", "Failed converting request to HAR %s (%v,%+v)\n", err, err, err) + SilentError("convert-request-to-har", "Failed converting request to HAR %s (%v,%+v)", err, err, err) return nil, errors.New("Failed converting request to HAR") } harResponse, err := har.NewResponse(response, true) if err != nil { - SilentError("convert-response-to-har", "Failed converting response to HAR %s (%v,%+v)\n", err, err, err) + SilentError("convert-response-to-har", "Failed converting response to HAR %s (%v,%+v)", err, err, err) return nil, errors.New("Failed converting response to HAR") } @@ -62,7 +64,7 @@ func NewEntry(request *http.Request, requestTime time.Time, response *http.Respo status, err := strconv.Atoi(response.Header.Get(":status")) if err != nil { - SilentError("convert-response-status-for-har", "Failed converting status to int %s (%v,%+v)\n", err, err, err) + SilentError("convert-response-status-for-har", "Failed converting status to int %s (%v,%+v)", err, err, err) return nil, errors.New("Failed converting response status to int for HAR") } harResponse.Status = status @@ -102,7 +104,7 @@ func NewEntry(request *http.Request, requestTime time.Time, response *http.Respo func (f *HarFile) WriteEntry(harEntry *har.Entry) { harEntryJson, err := json.Marshal(harEntry) if err != nil { - SilentError("har-entry-marshal", "Failed converting har entry object to JSON%s (%v,%+v)\n", err, err, err) + SilentError("har-entry-marshal", "Failed converting har entry object to JSON%s (%v,%+v)", err, err, err) return } @@ -116,7 +118,7 @@ func (f *HarFile) WriteEntry(harEntry *har.Entry) { harEntryString := append([]byte(separator), harEntryJson...) if _, err := f.file.Write(harEntryString); err != nil { - panic(fmt.Sprintf("Failed to write to output file: %s (%v,%+v)", err, err, err)) + log.Panicf("Failed to write to output file: %s (%v,%+v)", err, err, err) } f.entryCount++ @@ -131,21 +133,21 @@ func (f *HarFile) Close() { err := f.file.Close() if err != nil { - panic(fmt.Sprintf("Failed to close output file: %s (%v,%+v)", err, err, err)) + log.Panicf("Failed to close output file: %s (%v,%+v)", err, err, err) } } func (f*HarFile) writeHeader() { header := []byte(`{"log": {"version": "1.2", "creator": {"name": "Mizu", "version": "0.0.1"}, "entries": [`) if _, err := f.file.Write(header); err != nil { - panic(fmt.Sprintf("Failed to write header to output file: %s (%v,%+v)", err, err, err)) + log.Panicf("Failed to write header to output file: %s (%v,%+v)", err, err, err) } } func (f*HarFile) writeTrailer() { trailer := []byte("]}}") if _, err := f.file.Write(trailer); err != nil { - panic(fmt.Sprintf("Failed to write trailer to output file: %s (%v,%+v)", err, err, err)) + log.Panicf("Failed to write trailer to output file: %s (%v,%+v)", err, err, err) } } @@ -161,8 +163,8 @@ func NewHarWriter(outputDir string, maxEntries int) *HarWriter { } type OutputChannelItem struct { - HarEntry *har.Entry - RequestSenderIp string + HarEntry *har.Entry + ConnectionInfo *ConnectionInfo } type HarWriter struct { @@ -174,20 +176,20 @@ type HarWriter struct { done chan bool } -func (hw *HarWriter) WritePair(request *http.Request, requestTime time.Time, response *http.Response, responseTime time.Time, requestSenderIp string) { +func (hw *HarWriter) WritePair(request *http.Request, requestTime time.Time, response *http.Response, responseTime time.Time, connectionInfo *ConnectionInfo) { hw.PairChan <- &PairChanItem{ - Request: request, - RequestTime: requestTime, - Response: response, - ResponseTime: responseTime, - RequestSenderIp: requestSenderIp, + Request: request, + RequestTime: requestTime, + Response: response, + ResponseTime: responseTime, + ConnectionInfo: connectionInfo, } } func (hw *HarWriter) Start() { if hw.OutputDirPath != "" { if err := os.MkdirAll(hw.OutputDirPath, os.ModePerm); err != nil { - panic(fmt.Sprintf("Failed to create output directory: %s (%v,%+v)", err, err, err)) + log.Panicf("Failed to create output directory: %s (%v,%+v)", err, err, err) } } @@ -210,8 +212,8 @@ func (hw *HarWriter) Start() { } } else { hw.OutChan <- &OutputChannelItem{ - HarEntry: harEntry, - RequestSenderIp: pair.RequestSenderIp, + HarEntry: harEntry, + ConnectionInfo: pair.ConnectionInfo, } } } @@ -226,6 +228,7 @@ func (hw *HarWriter) Start() { func (hw *HarWriter) Stop() { close(hw.PairChan) <-hw.done + close(hw.OutChan) } func (hw *HarWriter) openNewFile() { @@ -241,7 +244,7 @@ func (hw *HarWriter) closeFile() { filename := buildFilename(hw.OutputDirPath, time.Now()) err := os.Rename(tmpFilename, filename) if err != nil { - SilentError("Rename-file", "cannot rename file: %s (%v,%+v)\n", err, err, err) + SilentError("Rename-file", "cannot rename file: %s (%v,%+v)", err, err, err) } } diff --git a/tap/http_matcher.go b/tap/http_matcher.go new file mode 100644 index 000000000..a0377cd46 --- /dev/null +++ b/tap/http_matcher.go @@ -0,0 +1,138 @@ +package tap + +import ( + "fmt" + "net/http" + "strings" + "time" + + "github.com/orcaman/concurrent-map" +) + +type requestResponsePair struct { + Request httpMessage `json:"request"` + Response httpMessage `json:"response"` +} + +type ConnectionInfo struct { + ClientIP string + ClientPort string + ServerIP string + ServerPort string +} + +type httpMessage struct { + isRequest bool + captureTime time.Time + orig interface{} + connectionInfo ConnectionInfo +} + + +// Key is {client_addr}:{client_port}->{dest_addr}:{dest_port} +type requestResponseMatcher struct { + openMessagesMap cmap.ConcurrentMap + +} + +func createResponseRequestMatcher() requestResponseMatcher { + newMatcher := &requestResponseMatcher{openMessagesMap: cmap.New()} + return *newMatcher +} + +func (matcher *requestResponseMatcher) registerRequest(ident string, request *http.Request, captureTime time.Time) *requestResponsePair { + split := splitIdent(ident) + key := genKey(split) + + connectionInfo := &ConnectionInfo{ + ClientIP: split[0], + ClientPort: split[2], + ServerIP: split[1], + ServerPort: split[3], + } + + requestHTTPMessage := httpMessage{ + isRequest: true, + captureTime: captureTime, + orig: request, + connectionInfo: *connectionInfo, + } + + if response, found := matcher.openMessagesMap.Pop(key); found { + // Type assertion always succeeds because all of the map's values are of httpMessage type + responseHTTPMessage := response.(*httpMessage) + if responseHTTPMessage.isRequest { + SilentError("Request-Duplicate", "Got duplicate request with same identifier") + return nil + } + Debug("Matched open Response for %s", key) + return matcher.preparePair(&requestHTTPMessage, responseHTTPMessage) + } + + matcher.openMessagesMap.Set(key, &requestHTTPMessage) + Debug("Registered open Request for %s", key) + return nil +} + +func (matcher *requestResponseMatcher) registerResponse(ident string, response *http.Response, captureTime time.Time) *requestResponsePair { + split := splitIdent(ident) + key := genKey(split) + + responseHTTPMessage := httpMessage{ + isRequest: false, + captureTime: captureTime, + orig: response, + } + + if request, found := matcher.openMessagesMap.Pop(key); found { + // Type assertion always succeeds because all of the map's values are of httpMessage type + requestHTTPMessage := request.(*httpMessage) + if !requestHTTPMessage.isRequest { + SilentError("Response-Duplicate", "Got duplicate response with same identifier") + return nil + } + Debug("Matched open Request for %s", key) + return matcher.preparePair(requestHTTPMessage, &responseHTTPMessage) + } + + matcher.openMessagesMap.Set(key, &responseHTTPMessage) + Debug("Registered open Response for %s", key) + return nil +} + +func (matcher *requestResponseMatcher) preparePair(requestHTTPMessage *httpMessage, responseHTTPMessage *httpMessage) *requestResponsePair { + return &requestResponsePair{ + Request: *requestHTTPMessage, + Response: *responseHTTPMessage, + } +} + +func splitIdent(ident string) []string { + ident = strings.Replace(ident, "->", " ", -1) + return strings.Split(ident, " ") +} + +func genKey(split []string) string { + key := fmt.Sprintf("%s:%s->%s:%s,%s", split[0], split[2], split[1], split[3], split[4]) + return key +} + +func (matcher *requestResponseMatcher) deleteOlderThan(t time.Time) int { + keysToPop := make([]string, 0) + for item := range matcher.openMessagesMap.IterBuffered() { + // Map only contains values of type httpMessage + message, _ := item.Val.(*httpMessage) + + if message.captureTime.Before(t) { + keysToPop = append(keysToPop, item.Key) + } + } + + numDeleted := len(keysToPop) + + for _, key := range keysToPop { + _, _ = matcher.openMessagesMap.Pop(key) + } + + return numDeleted +} diff --git a/api/pkg/tap/http_reader.go b/tap/http_reader.go similarity index 60% rename from api/pkg/tap/http_reader.go rename to tap/http_reader.go index 21ce9c788..00d20f425 100644 --- a/api/pkg/tap/http_reader.go +++ b/tap/http_reader.go @@ -3,10 +3,7 @@ package tap import ( "bufio" "bytes" - "compress/gzip" - b64 "encoding/base64" "encoding/hex" - "encoding/json" "fmt" "io" "io/ioutil" @@ -73,7 +70,7 @@ func (h *httpReader) run(wg *sync.WaitGroup) { b := bufio.NewReader(h) if isHTTP2, err := checkIsHTTP2Connection(b, h.isClient); err != nil { - SilentError("HTTP/2-Prepare-Connection", "stream %s Failed to check if client is HTTP/2: %s (%v,%+v)\n", h.ident, err, err, err) + SilentError("HTTP/2-Prepare-Connection", "stream %s Failed to check if client is HTTP/2: %s (%v,%+v)", h.ident, err, err, err) // Do something? } else { h.isHTTP2 = isHTTP2 @@ -82,7 +79,7 @@ func (h *httpReader) run(wg *sync.WaitGroup) { if h.isHTTP2 { err := prepareHTTP2Connection(b, h.isClient) if err != nil { - SilentError("HTTP/2-Prepare-Connection-After-Check", "stream %s error: %s (%v,%+v)\n", h.ident, err, err, err) + SilentError("HTTP/2-Prepare-Connection-After-Check", "stream %s error: %s (%v,%+v)", h.ident, err, err, err) } h.grpcAssembler = createGrpcAssembler(b) } @@ -93,7 +90,7 @@ func (h *httpReader) run(wg *sync.WaitGroup) { if err == io.EOF || err == io.ErrUnexpectedEOF { break } else if err != nil { - SilentError("HTTP/2", "stream %s error: %s (%v,%+v)\n", h.ident, err, err, err) + SilentError("HTTP/2", "stream %s error: %s (%v,%+v)", h.ident, err, err, err) continue } } else if h.isClient { @@ -101,7 +98,7 @@ func (h *httpReader) run(wg *sync.WaitGroup) { if err == io.EOF || err == io.ErrUnexpectedEOF { break } else if err != nil { - SilentError("HTTP-request", "stream %s Request error: %s (%v,%+v)\n", h.ident, err, err, err) + SilentError("HTTP-request", "stream %s Request error: %s (%v,%+v)", h.ident, err, err, err) continue } } else { @@ -109,7 +106,7 @@ func (h *httpReader) run(wg *sync.WaitGroup) { if err == io.EOF || err == io.ErrUnexpectedEOF { break } else if err != nil { - SilentError("HTTP-response", "stream %s Response error: %s (%v,%+v)\n", h.ident, err, err, err) + SilentError("HTTP-response", "stream %s Response error: %s (%v,%+v)", h.ident, err, err, err) continue } } @@ -117,38 +114,34 @@ func (h *httpReader) run(wg *sync.WaitGroup) { } func (h *httpReader) handleHTTP2Stream() error { - streamID, messageHTTP1, body, err := h.grpcAssembler.readMessage() + streamID, messageHTTP1, err := h.grpcAssembler.readMessage() h.messageCount++ if err != nil { return err } - var reqResPair *envoyMessageWrapper + var reqResPair *requestResponsePair switch messageHTTP1 := messageHTTP1.(type) { case http.Request: ident := fmt.Sprintf("%s->%s %s->%s %d", h.tcpID.srcIP, h.tcpID.dstIP, h.tcpID.srcPort, h.tcpID.dstPort, streamID) - reqResPair = reqResMatcher.registerRequest(ident, &messageHTTP1, h.captureTime, body, true) + reqResPair = reqResMatcher.registerRequest(ident, &messageHTTP1, h.captureTime) case http.Response: ident := fmt.Sprintf("%s->%s %s->%s %d", h.tcpID.dstIP, h.tcpID.srcIP, h.tcpID.dstPort, h.tcpID.srcPort, streamID) - reqResPair = reqResMatcher.registerResponse(ident, &messageHTTP1, h.captureTime, body, true) + reqResPair = reqResMatcher.registerResponse(ident, &messageHTTP1, h.captureTime) } if reqResPair != nil { + statsTracker.incMatchedMessages() + if h.harWriter != nil { h.harWriter.WritePair( - reqResPair.HttpBufferedTrace.Request.orig.(*http.Request), - reqResPair.HttpBufferedTrace.Request.captureTime, - reqResPair.HttpBufferedTrace.Response.orig.(*http.Response), - reqResPair.HttpBufferedTrace.Response.captureTime, - reqResPair.HttpBufferedTrace.Request.requestSenderIp, + reqResPair.Request.orig.(*http.Request), + reqResPair.Request.captureTime, + reqResPair.Response.orig.(*http.Response), + reqResPair.Response.captureTime, + &reqResPair.Request.connectionInfo, ) - } else { - jsonStr, err := json.Marshal(reqResPair) - if err != nil { - return err - } - broadcastReqResPair(jsonStr) } } @@ -165,37 +158,29 @@ func (h *httpReader) handleHTTP1ClientStream(b *bufio.Reader) error { req.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind s := len(body) if err != nil { - SilentError("HTTP-request-body", "stream %s Got body err: %s\n", h.ident, err) + SilentError("HTTP-request-body", "stream %s Got body err: %s", h.ident, err) } else if h.hexdump { - Info("Body(%d/0x%x)\n%s\n", len(body), len(body), hex.Dump(body)) + Info("Body(%d/0x%x) - %s", len(body), len(body), hex.Dump(body)) } if err := req.Body.Close(); err != nil { - SilentError("HTTP-request-body-close", "stream %s Failed to close request body: %s\n", h.ident, err) + SilentError("HTTP-request-body-close", "stream %s Failed to close request body: %s", h.ident, err) } encoding := req.Header["Content-Encoding"] - bodyStr, err := readBody(body, encoding) - if err != nil { - SilentError("HTTP-request-body-decode", "stream %s Failed to decode body: %s\n", h.ident, err) - } - Info("HTTP/%s Request: %s %s (Body:%d)\n", h.ident, req.Method, req.URL, s) + Info("HTTP/1 Request: %s %s %s (Body:%d) -> %s", h.ident, req.Method, req.URL, s, encoding) ident := fmt.Sprintf("%s->%s %s->%s %d", h.tcpID.srcIP, h.tcpID.dstIP, h.tcpID.srcPort, h.tcpID.dstPort, h.messageCount) - reqResPair := reqResMatcher.registerRequest(ident, req, h.captureTime, bodyStr, false) + reqResPair := reqResMatcher.registerRequest(ident, req, h.captureTime) if reqResPair != nil { + statsTracker.incMatchedMessages() + if h.harWriter != nil { h.harWriter.WritePair( - reqResPair.HttpBufferedTrace.Request.orig.(*http.Request), - reqResPair.HttpBufferedTrace.Request.captureTime, - reqResPair.HttpBufferedTrace.Response.orig.(*http.Response), - reqResPair.HttpBufferedTrace.Response.captureTime, - reqResPair.HttpBufferedTrace.Request.requestSenderIp, + reqResPair.Request.orig.(*http.Request), + reqResPair.Request.captureTime, + reqResPair.Response.orig.(*http.Response), + reqResPair.Response.captureTime, + &reqResPair.Request.connectionInfo, ) - } else { - jsonStr, err := json.Marshal(reqResPair) - if err != nil { - SilentError("HTTP-marshal", "stream %s Error convert request response to json: %s\n", h.ident, err) - } - broadcastReqResPair(jsonStr) } } @@ -224,13 +209,13 @@ func (h *httpReader) handleHTTP1ServerStream(b *bufio.Reader) error { res.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind s := len(body) if err != nil { - SilentError("HTTP-response-body", "HTTP/%s: failed to get body(parsed len:%d): %s\n", h.ident, s, err) + SilentError("HTTP-response-body", "HTTP/%s: failed to get body(parsed len:%d): %s", h.ident, s, err) } if h.hexdump { - Info("Body(%d/0x%x)\n%s\n", len(body), len(body), hex.Dump(body)) + Info("Body(%d/0x%x) - %s", len(body), len(body), hex.Dump(body)) } if err := res.Body.Close(); err != nil { - SilentError("HTTP-response-body-close", "HTTP/%s: failed to close body(parsed len:%d): %s\n", h.ident, s, err) + SilentError("HTTP-response-body-close", "HTTP/%s: failed to close body(parsed len:%d): %s", h.ident, s, err) } sym := "," if res.ContentLength > 0 && res.ContentLength != int64(s) { @@ -241,54 +226,23 @@ func (h *httpReader) handleHTTP1ServerStream(b *bufio.Reader) error { contentType = []string{http.DetectContentType(body)} } encoding := res.Header["Content-Encoding"] - Info("HTTP/%s Response: %s URL:%s (%d%s%d%s) -> %s\n", h.ident, res.Status, req, res.ContentLength, sym, s, contentType, encoding) - bodyStr, err := readBody(body, encoding) - if err != nil { - SilentError("HTTP-response-body-decode", "stream %s Failed to decode body: %s\n", h.ident, err) - } + Info("HTTP/1 Response: %s %s URL:%s (%d%s%d%s) -> %s", h.ident, res.Status, req, res.ContentLength, sym, s, contentType, encoding) ident := fmt.Sprintf("%s->%s %s->%s %d", h.tcpID.dstIP, h.tcpID.srcIP, h.tcpID.dstPort, h.tcpID.srcPort, h.messageCount) - reqResPair := reqResMatcher.registerResponse(ident, res, h.captureTime, bodyStr, false) + reqResPair := reqResMatcher.registerResponse(ident, res, h.captureTime) if reqResPair != nil { + statsTracker.incMatchedMessages() + if h.harWriter != nil { h.harWriter.WritePair( - reqResPair.HttpBufferedTrace.Request.orig.(*http.Request), - reqResPair.HttpBufferedTrace.Request.captureTime, - reqResPair.HttpBufferedTrace.Response.orig.(*http.Response), - reqResPair.HttpBufferedTrace.Response.captureTime, - reqResPair.HttpBufferedTrace.Request.requestSenderIp, + reqResPair.Request.orig.(*http.Request), + reqResPair.Request.captureTime, + reqResPair.Response.orig.(*http.Response), + reqResPair.Response.captureTime, + &reqResPair.Request.connectionInfo, ) - } else { - jsonStr, err := json.Marshal(reqResPair) - if err != nil { - SilentError("HTTP-marshal", "stream %s Error convert request response to json: %s\n", h.ident, err) - } - broadcastReqResPair(jsonStr) } } return nil } - -func readBody(bodyBytes []byte, encoding []string) (string, error) { - var bodyBuffer io.Reader - bodyBuffer = bytes.NewBuffer(bodyBytes) - var err error - if len(encoding) > 0 && (encoding[0] == "gzip" || encoding[0] == "deflate") { - bodyBuffer, err = gzip.NewReader(bodyBuffer) - if err != nil { - SilentError("HTTP-gunzip", "Failed to gzip decode: %s\n", err) - return "", err - } - } - if _, ok := bodyBuffer.(*gzip.Reader); ok { - err = bodyBuffer.(*gzip.Reader).Close() - if err != nil { - return "", err - } - } - - buf := new(bytes.Buffer) - _, err = buf.ReadFrom(bodyBuffer) - return b64.StdEncoding.EncodeToString(buf.Bytes()), err -} diff --git a/api/pkg/tap/net_utils.go b/tap/net_utils.go similarity index 100% rename from api/pkg/tap/net_utils.go rename to tap/net_utils.go diff --git a/tap/outboundlinks.go b/tap/outboundlinks.go new file mode 100644 index 000000000..0cb60bbd9 --- /dev/null +++ b/tap/outboundlinks.go @@ -0,0 +1,29 @@ +package tap + +type OutboundLink struct { + Src string + DstIP string + DstPort int +} + +func NewOutboundLinkWriter() *OutboundLinkWriter { + return &OutboundLinkWriter{ + OutChan: make(chan *OutboundLink), + } +} + +type OutboundLinkWriter struct { + OutChan chan *OutboundLink +} + +func (olw *OutboundLinkWriter) WriteOutboundLink(src string, DstIP string, DstPort int) { + olw.OutChan <- &OutboundLink{ + Src: src, + DstIP: DstIP, + DstPort: DstPort, + } +} + +func (olw *OutboundLinkWriter) Stop() { + close(olw.OutChan) +} diff --git a/api/pkg/tap/passive_tapper.go b/tap/passive_tapper.go similarity index 70% rename from api/pkg/tap/passive_tapper.go rename to tap/passive_tapper.go index 72f5c1294..3b53e524a 100644 --- a/api/pkg/tap/passive_tapper.go +++ b/tap/passive_tapper.go @@ -10,10 +10,8 @@ package tap import ( "encoding/hex" - "encoding/json" "flag" "fmt" - "github.com/up9inc/mizu/shared" "log" "os" "os/signal" @@ -33,12 +31,10 @@ import ( ) const AppPortsEnvVar = "APP_PORTS" -const OutPortEnvVar = "WEB_SOCKET_PORT" const maxHTTP2DataLenEnvVar = "HTTP2_DATA_SIZE_LIMIT" // default is 1MB, more than the max size accepted by collector and traffic-dumper const maxHTTP2DataLenDefault = 1 * 1024 * 1024 const cleanPeriod = time.Second * 10 -const outboundThrottleCacheExpiryPeriod = time.Minute * 15 var remoteOnlyOutboundPorts = []int { 80, 443 } func parseAppPorts(appPortsList string) []int { @@ -46,7 +42,7 @@ func parseAppPorts(appPortsList string) []int { for _, portStr := range strings.Split(appPortsList, ",") { parsedInt, parseError := strconv.Atoi(portStr) if parseError != nil { - fmt.Println("Provided app port ", portStr, " is not a valid number!") + log.Printf("Provided app port %v is not a valid number!", portStr) } else { ports = append(ports, parsedInt) } @@ -54,13 +50,6 @@ func parseAppPorts(appPortsList string) []int { return ports } -func parseHostAppAddresses(hostAppAddressesString string) []string { - if len(hostAppAddressesString) == 0 { - return []string{} - } - return strings.Split(hostAppAddressesString, ",") -} - var maxcount = flag.Int("c", -1, "Only grab this many packets, then exit") var decoder = flag.String("decoder", "", "Name of the decoder to use (default: guess from capture)") var statsevery = flag.Int("stats", 60, "Output statistics every N seconds") @@ -90,7 +79,6 @@ var tstype = flag.String("timestamp_type", "", "Type of timestamps to use") var promisc = flag.Bool("promisc", true, "Set promiscuous mode") var anydirection = flag.Bool("anydirection", false, "Capture http requests to other hosts") var staleTimeoutSeconds = flag.Int("staletimout", 120, "Max time in seconds to keep connections which don't transmit data") -var hostAppAddressesString = flag.String("targets", "", "Comma separated list of ip:ports to tap") var memprofile = flag.String("memprofile", "", "Write memory profile") @@ -121,24 +109,20 @@ var stats struct { overlapPackets int } -type CollectorMessage struct { - MessageType string - Ports *[]int `json:"ports,omitempty"` - Addresses *[]string `json:"addresses,omitempty"` +type TapOpts struct { + HostMode bool } var outputLevel int var errorsMap map[string]uint var errorsMapMutex sync.Mutex var nErrors uint -var appPorts []int // global -var ownIps []string //global -var hostMode bool //global -var HostAppAddresses []string //global +var ownIps []string // global +var hostMode bool // global /* minOutputLevel: Error will be printed only if outputLevel is above this value * t: key for errorsMap (counting errors) - * s, a: arguments fmt.Printf + * s, a: arguments log.Printf * Note: Too bad for perf that a... is evaluated */ func logError(minOutputLevel int, t string, s string, a ...interface{}) { @@ -149,7 +133,7 @@ func logError(minOutputLevel int, t string, s string, a ...interface{}) { errorsMapMutex.Unlock() if outputLevel >= minOutputLevel { formatStr := fmt.Sprintf("%s: %s", t, s) - fmt.Printf(formatStr, a...) + log.Printf(formatStr, a...) } } func Error(t string, s string, a ...interface{}) { @@ -160,12 +144,12 @@ func SilentError(t string, s string, a ...interface{}) { } func Info(s string, a ...interface{}) { if outputLevel >= 1 { - fmt.Printf(s, a...) + log.Printf(s, a...) } } func Debug(s string, a ...interface{}) { if outputLevel >= 2 { - fmt.Printf(s, a...) + log.Printf(s, a...) } } @@ -187,9 +171,8 @@ func inArrayString(arr []string, valueToCheck string) bool { return false } -/* - * The assembler context - */ +// Context +// The assembler context type Context struct { CaptureInfo gopacket.CaptureInfo } @@ -198,22 +181,27 @@ func (c *Context) GetCaptureInfo() gopacket.CaptureInfo { return c.CaptureInfo } -func StartPassiveTapper() <-chan *OutputChannelItem { +func StartPassiveTapper(opts *TapOpts) (<-chan *OutputChannelItem, <-chan *OutboundLink) { + hostMode = opts.HostMode + var harWriter *HarWriter if *dumpToHar { harWriter = NewHarWriter(*HarOutputDir, *harEntriesPerFile) } + outboundLinkWriter := NewOutboundLinkWriter() - go startPassiveTapper(harWriter) + go startPassiveTapper(harWriter, outboundLinkWriter) if harWriter != nil { - return harWriter.OutChan + return harWriter.OutChan, outboundLinkWriter.OutChan } - return nil + return nil, outboundLinkWriter.OutChan } -func startPassiveTapper(harWriter *HarWriter) { +func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWriter) { + log.SetFlags(log.LstdFlags | log.LUTC | log.Lshortfile) + defer util.Run()() if *debug { outputLevel = 2 @@ -226,68 +214,43 @@ func startPassiveTapper(harWriter *HarWriter) { if localhostIPs, err := getLocalhostIPs(); err != nil { // TODO: think this over - fmt.Println("Failed to get self IP addresses") - Error("Getting-Self-Address", "Error getting self ip address: %s (%v,%+v)\n", err, err, err) + log.Println("Failed to get self IP addresses") + Error("Getting-Self-Address", "Error getting self ip address: %s (%v,%+v)", err, err, err) ownIps = make([]string, 0) } else { ownIps = localhostIPs } appPortsStr := os.Getenv(AppPortsEnvVar) + var appPorts []int if appPortsStr == "" { - fmt.Println("Received empty/no APP_PORTS env var! only listening to http on port 80!") + log.Println("Received empty/no APP_PORTS env var! only listening to http on port 80!") appPorts = make([]int, 0) } else { appPorts = parseAppPorts(appPortsStr) } - tapOutputPort := os.Getenv(OutPortEnvVar) - if tapOutputPort == "" { - fmt.Println("Received empty/no WEB_SOCKET_PORT env var! falling back to port 8080") - tapOutputPort = "8080" - } + SetFilterPorts(appPorts) envVal := os.Getenv(maxHTTP2DataLenEnvVar) if envVal == "" { - fmt.Println("Received empty/no HTTP2_DATA_SIZE_LIMIT env var! falling back to", maxHTTP2DataLenDefault) + log.Println("Received empty/no HTTP2_DATA_SIZE_LIMIT env var! falling back to", maxHTTP2DataLenDefault) maxHTTP2DataLen = maxHTTP2DataLenDefault } else { if convertedInt, err := strconv.Atoi(envVal); err != nil { - fmt.Println("Received invalid HTTP2_DATA_SIZE_LIMIT env var! falling back to", maxHTTP2DataLenDefault) + log.Println("Received invalid HTTP2_DATA_SIZE_LIMIT env var! falling back to", maxHTTP2DataLenDefault) maxHTTP2DataLen = maxHTTP2DataLenDefault } else { - fmt.Println("Received HTTP2_DATA_SIZE_LIMIT env var:", maxHTTP2DataLenDefault) + log.Println("Received HTTP2_DATA_SIZE_LIMIT env var:", maxHTTP2DataLenDefault) maxHTTP2DataLen = convertedInt } } - hostMode = os.Getenv(shared.HostModeEnvVar) == "1" - fmt.Printf("App Ports: %v\n", appPorts) - fmt.Printf("Tap output websocket port: %s\n", tapOutputPort) - - var onCollectorMessage = func(message []byte) { - var parsedMessage CollectorMessage - err := json.Unmarshal(message, &parsedMessage) - if err == nil { - - if parsedMessage.MessageType == "setPorts" { - Debug("Got message from collector. Type: %s, Ports: %v\n", parsedMessage.MessageType, parsedMessage.Ports) - appPorts = *parsedMessage.Ports - } else if parsedMessage.MessageType == "setAddresses" { - Debug("Got message from collector. Type: %s, IPs: %v\n", parsedMessage.MessageType, parsedMessage.Addresses) - HostAppAddresses = *parsedMessage.Addresses - Info("Filtering for the following addresses: %s\n", HostAppAddresses) - } - } else { - Error("Collector-Message-Parsing", "Error parsing message from collector: %s (%v,%+v)\n", err, err, err) - } - } - - go startOutputServer(tapOutputPort, onCollectorMessage) + log.Printf("App Ports: %v", gSettings.filterPorts) var handle *pcap.Handle var err error if *fname != "" { if handle, err = pcap.OpenOffline(*fname); err != nil { - log.Fatal("PCAP OpenOffline error:", err) + log.Fatalf("PCAP OpenOffline error: %v", err) } } else { // This is a little complicated because we want to allow all possible options @@ -313,15 +276,15 @@ func startPassiveTapper(harWriter *HarWriter) { } } if handle, err = inactive.Activate(); err != nil { - log.Fatal("PCAP Activate error:", err) + log.Fatalf("PCAP Activate error: %v", err) } defer handle.Close() } if len(flag.Args()) > 0 { bpffilter := strings.Join(flag.Args(), " ") - Info("Using BPF filter %q\n", bpffilter) + Info("Using BPF filter %q", bpffilter) if err = handle.SetBPFFilter(bpffilter); err != nil { - log.Fatal("BPF filter error:", err) + log.Fatalf("BPF filter error: %v", err) } } @@ -329,6 +292,7 @@ func startPassiveTapper(harWriter *HarWriter) { harWriter.Start() defer harWriter.Stop() } + defer outboundLinkWriter.Stop() var dec gopacket.Decoder var ok bool @@ -342,13 +306,18 @@ func startPassiveTapper(harWriter *HarWriter) { source := gopacket.NewPacketSource(handle, dec) source.Lazy = *lazy source.NoCopy = true - Info("Starting to read packets\n") + Info("Starting to read packets") count := 0 bytes := int64(0) start := time.Now() defragger := ip4defrag.NewIPv4Defragmenter() - streamFactory := &tcpStreamFactory{doHTTP: !*nohttp, harWriter: harWriter} + streamFactory := &tcpStreamFactory{ + doHTTP: !*nohttp, + harWriter: harWriter, + outbountLinkWriter: outboundLinkWriter, + + } streamPool := reassembly.NewStreamPool(streamFactory) assembler := reassembly.NewAssembler(streamPool) var assemblerMutex sync.Mutex @@ -378,7 +347,7 @@ func startPassiveTapper(harWriter *HarWriter) { errorMapLen := len(errorsMap) errorsSummery := fmt.Sprintf("%v", errorsMap) errorsMapMutex.Unlock() - fmt.Printf("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v)\nErrors Summary: %s\n", + log.Printf("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v) - Errors Summary: %s", count, bytes, time.Since(start), @@ -390,8 +359,8 @@ func startPassiveTapper(harWriter *HarWriter) { // At this moment memStats := runtime.MemStats{} runtime.ReadMemStats(&memStats) - fmt.Printf( - "mem: %d, goroutines: %d, unmatched messages: %d\n", + log.Printf( + "mem: %d, goroutines: %d, unmatched messages: %d", memStats.HeapAlloc, runtime.NumGoroutine(), reqResMatcher.openMessagesMap.Count(), @@ -400,8 +369,8 @@ func startPassiveTapper(harWriter *HarWriter) { // Since the last print cleanStats := cleaner.dumpStats() appStats := statsTracker.dumpStats() - fmt.Printf( - "flushed connections %d, closed connections: %d, deleted messages: %d, matched messages: %d\n", + log.Printf( + "flushed connections %d, closed connections: %d, deleted messages: %d, matched messages: %d", cleanStats.flushed, cleanStats.closed, cleanStats.deleted, @@ -412,11 +381,11 @@ func startPassiveTapper(harWriter *HarWriter) { for packet := range source.Packets() { count++ - Debug("PACKET #%d\n", count) + Debug("PACKET #%d", count) data := packet.Data() bytes += int64(len(data)) if *hexdumppkt { - Debug("Packet content (%d/0x%x)\n%s\n", len(data), len(data), hex.Dump(data)) + Debug("Packet content (%d/0x%x) - %s", len(data), len(data), hex.Dump(data)) } // defrag the IPv4 packet if required @@ -431,18 +400,18 @@ func startPassiveTapper(harWriter *HarWriter) { if err != nil { log.Fatalln("Error while de-fragmenting", err) } else if newip4 == nil { - Debug("Fragment...\n") + Debug("Fragment...") continue // packet fragment, we don't have whole packet yet. } if newip4.Length != l { stats.ipdefrag++ - Debug("Decoding re-assembled packet: %s\n", newip4.NextLayerType()) + Debug("Decoding re-assembled packet: %s", newip4.NextLayerType()) pb, ok := packet.(gopacket.PacketBuilder) if !ok { - panic("Not a PacketBuilder") + log.Panic("Not a PacketBuilder") } nextDecoder := newip4.NextLayerType() - nextDecoder.Decode(newip4.Payload, pb) + _ = nextDecoder.Decode(newip4.Payload, pb) } } @@ -459,7 +428,7 @@ func startPassiveTapper(harWriter *HarWriter) { CaptureInfo: packet.Metadata().CaptureInfo, } stats.totalsz += len(tcp.Payload) - //fmt.Println(packet.NetworkLayer().NetworkFlow().Src(), ":", tcp.SrcPort, " -> ", packet.NetworkLayer().NetworkFlow().Dst(), ":", tcp.DstPort) + // log.Println(packet.NetworkLayer().NetworkFlow().Src(), ":", tcp.SrcPort, " -> ", packet.NetworkLayer().NetworkFlow().Dst(), ":", tcp.DstPort) assemblerMutex.Lock() assembler.AssembleWithContext(packet.NetworkLayer().NetworkFlow(), tcp, &c) assemblerMutex.Unlock() @@ -470,11 +439,11 @@ func startPassiveTapper(harWriter *HarWriter) { errorsMapMutex.Lock() errorMapLen := len(errorsMap) errorsMapMutex.Unlock() - fmt.Fprintf(os.Stderr, "Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v)\n", count, bytes, time.Since(start), nErrors, errorMapLen) + log.Printf("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v)", count, bytes, time.Since(start), nErrors, errorMapLen) } select { case <-signalChan: - fmt.Fprintf(os.Stderr, "\nCaught SIGINT: aborting\n") + log.Printf("Caught SIGINT: aborting") done = true default: // NOP: continue @@ -497,34 +466,34 @@ func startPassiveTapper(harWriter *HarWriter) { if err != nil { log.Fatal(err) } - pprof.WriteHeapProfile(f) - f.Close() + _ = pprof.WriteHeapProfile(f) + _ = f.Close() } streamFactory.WaitGoRoutines() assemblerMutex.Lock() - Debug("%s\n", assembler.Dump()) + Debug("%s", assembler.Dump()) assemblerMutex.Unlock() if !*nodefrag { - fmt.Printf("IPdefrag:\t\t%d\n", stats.ipdefrag) + log.Printf("IPdefrag:\t\t%d", stats.ipdefrag) } - fmt.Printf("TCP stats:\n") - fmt.Printf(" missed bytes:\t\t%d\n", stats.missedBytes) - fmt.Printf(" total packets:\t\t%d\n", stats.pkt) - fmt.Printf(" rejected FSM:\t\t%d\n", stats.rejectFsm) - fmt.Printf(" rejected Options:\t%d\n", stats.rejectOpt) - fmt.Printf(" reassembled bytes:\t%d\n", stats.sz) - fmt.Printf(" total TCP bytes:\t%d\n", stats.totalsz) - fmt.Printf(" conn rejected FSM:\t%d\n", stats.rejectConnFsm) - fmt.Printf(" reassembled chunks:\t%d\n", stats.reassembled) - fmt.Printf(" out-of-order packets:\t%d\n", stats.outOfOrderPackets) - fmt.Printf(" out-of-order bytes:\t%d\n", stats.outOfOrderBytes) - fmt.Printf(" biggest-chunk packets:\t%d\n", stats.biggestChunkPackets) - fmt.Printf(" biggest-chunk bytes:\t%d\n", stats.biggestChunkBytes) - fmt.Printf(" overlap packets:\t%d\n", stats.overlapPackets) - fmt.Printf(" overlap bytes:\t\t%d\n", stats.overlapBytes) - fmt.Printf("Errors: %d\n", nErrors) + log.Printf("TCP stats:") + log.Printf(" missed bytes:\t\t%d", stats.missedBytes) + log.Printf(" total packets:\t\t%d", stats.pkt) + log.Printf(" rejected FSM:\t\t%d", stats.rejectFsm) + log.Printf(" rejected Options:\t%d", stats.rejectOpt) + log.Printf(" reassembled bytes:\t%d", stats.sz) + log.Printf(" total TCP bytes:\t%d", stats.totalsz) + log.Printf(" conn rejected FSM:\t%d", stats.rejectConnFsm) + log.Printf(" reassembled chunks:\t%d", stats.reassembled) + log.Printf(" out-of-order packets:\t%d", stats.outOfOrderPackets) + log.Printf(" out-of-order bytes:\t%d", stats.outOfOrderBytes) + log.Printf(" biggest-chunk packets:\t%d", stats.biggestChunkPackets) + log.Printf(" biggest-chunk bytes:\t%d", stats.biggestChunkBytes) + log.Printf(" overlap packets:\t%d", stats.overlapPackets) + log.Printf(" overlap bytes:\t\t%d", stats.overlapBytes) + log.Printf("Errors: %d", nErrors) for e := range errorsMap { - fmt.Printf(" %s:\t\t%d\n", e, errorsMap[e]) + log.Printf(" %s:\t\t%d", e, errorsMap[e]) } } diff --git a/tap/settings.go b/tap/settings.go new file mode 100644 index 000000000..cf89dd345 --- /dev/null +++ b/tap/settings.go @@ -0,0 +1,31 @@ +package tap + +type globalSettings struct { + filterPorts []int + filterAuthorities []string +} + +var gSettings = &globalSettings{ + filterPorts: []int{}, + filterAuthorities: []string{}, +} + +func SetFilterPorts(ports []int) { + gSettings.filterPorts = ports +} + +func GetFilterPorts() []int { + ports := make([]int, len(gSettings.filterPorts)) + copy(ports, gSettings.filterPorts) + return ports +} + +func SetFilterAuthorities(ipAddresses []string) { + gSettings.filterAuthorities = ipAddresses +} + +func GetFilterIPs() []string { + addresses := make([]string, len(gSettings.filterAuthorities)) + copy(addresses, gSettings.filterAuthorities) + return addresses +} diff --git a/api/pkg/tap/stats_tracker.go b/tap/stats_tracker.go similarity index 100% rename from api/pkg/tap/stats_tracker.go rename to tap/stats_tracker.go diff --git a/api/pkg/tap/tcp_stream.go b/tap/tcp_stream.go similarity index 83% rename from api/pkg/tap/tcp_stream.go rename to tap/tcp_stream.go index 7e96e9ed5..db5fb59ee 100644 --- a/api/pkg/tap/tcp_stream.go +++ b/tap/tcp_stream.go @@ -34,7 +34,7 @@ type tcpStream struct { func (t *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir reassembly.TCPFlowDirection, nextSeq reassembly.Sequence, start *bool, ac reassembly.AssemblerContext) bool { // FSM if !t.tcpstate.CheckState(tcp, dir) { - //SilentError("FSM", "%s: Packet rejected by FSM (state:%s)\n", t.ident, t.tcpstate.String()) + SilentError("FSM-rejection", "%s: Packet rejected by FSM (state:%s)", t.ident, t.tcpstate.String()) stats.rejectFsm++ if !t.fsmerr { t.fsmerr = true @@ -47,7 +47,7 @@ func (t *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir reassem // Options err := t.optchecker.Accept(tcp, ci, dir, nextSeq, start) if err != nil { - //SilentError("OptionChecker", "%s: Packet rejected by OptionChecker: %s\n", t.ident, err) + SilentError("OptionChecker-rejection", "%s: Packet rejected by OptionChecker: %s", t.ident, err) stats.rejectOpt++ if !*nooptcheck { return false @@ -58,10 +58,10 @@ func (t *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir reassem if *checksum { c, err := tcp.ComputeChecksum() if err != nil { - SilentError("ChecksumCompute", "%s: Got error computing checksum: %s\n", t.ident, err) + SilentError("ChecksumCompute", "%s: Got error computing checksum: %s", t.ident, err) accept = false } else if c != 0x0 { - SilentError("Checksum", "%s: Invalid checksum: 0x%x\n", t.ident, c) + SilentError("Checksum", "%s: Invalid checksum: 0x%x", t.ident, c) accept = false } } @@ -95,7 +95,7 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass if sgStats.OverlapBytes != 0 && sgStats.OverlapPackets == 0 { // In the original example this was handled with panic(). // I don't know what this error means or how to handle it properly. - SilentError("Invalid-Overlap", "bytes:%d, pkts:%d\n", sgStats.OverlapBytes, sgStats.OverlapPackets) + SilentError("Invalid-Overlap", "bytes:%d, pkts:%d", sgStats.OverlapBytes, sgStats.OverlapPackets) } stats.overlapBytes += sgStats.OverlapBytes stats.overlapPackets += sgStats.OverlapPackets @@ -106,7 +106,7 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass } else { ident = fmt.Sprintf("%v %v(%s): ", t.net.Reverse(), t.transport.Reverse(), dir) } - Debug("%s: SG reassembled packet with %d bytes (start:%v,end:%v,skip:%d,saved:%d,nb:%d,%d,overlap:%d,%d)\n", ident, length, start, end, skip, saved, sgStats.Packets, sgStats.Chunks, sgStats.OverlapBytes, sgStats.OverlapPackets) + Debug("%s: SG reassembled packet with %d bytes (start:%v,end:%v,skip:%d,saved:%d,nb:%d,%d,overlap:%d,%d)", ident, length, start, end, skip, saved, sgStats.Packets, sgStats.Chunks, sgStats.OverlapBytes, sgStats.OverlapPackets) if skip == -1 && *allowmissinginit { // this is allowed } else if skip != 0 { @@ -125,18 +125,18 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass } dnsSize := binary.BigEndian.Uint16(data[:2]) missing := int(dnsSize) - len(data[2:]) - Debug("dnsSize: %d, missing: %d\n", dnsSize, missing) + Debug("dnsSize: %d, missing: %d", dnsSize, missing) if missing > 0 { - Info("Missing some bytes: %d\n", missing) + Info("Missing some bytes: %d", missing) sg.KeepFrom(0) return } p := gopacket.NewDecodingLayerParser(layers.LayerTypeDNS, dns) err := p.DecodeLayers(data[2:], &decoded) if err != nil { - SilentError("DNS-parser", "Failed to decode DNS: %v\n", err) + SilentError("DNS-parser", "Failed to decode DNS: %v", err) } else { - Debug("DNS: %s\n", gopacket.LayerDump(dns)) + Debug("DNS: %s", gopacket.LayerDump(dns)) } if len(data) > 2+int(dnsSize) { sg.KeepFrom(2 + int(dnsSize)) @@ -144,7 +144,7 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass } else if t.isHTTP { if length > 0 { if *hexdump { - Debug("Feeding http with:\n%s", hex.Dump(data)) + Debug("Feeding http with:%s", hex.Dump(data)) } // This is where we pass the reassembled information onwards // This channel is read by an httpReader object @@ -158,7 +158,7 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass } func (t *tcpStream) ReassemblyComplete(ac reassembly.AssemblerContext) bool { - Debug("%s: Connection closed\n", t.ident) + Debug("%s: Connection closed", t.ident) if t.isHTTP { close(t.client.msgQueue) close(t.server.msgQueue) diff --git a/api/pkg/tap/tcp_stream_factory.go b/tap/tcp_stream_factory.go similarity index 82% rename from api/pkg/tap/tcp_stream_factory.go rename to tap/tcp_stream_factory.go index 23bda51bd..6808cc13e 100644 --- a/api/pkg/tap/tcp_stream_factory.go +++ b/tap/tcp_stream_factory.go @@ -15,22 +15,23 @@ import ( * Generates a new tcp stream for each new tcp connection. Closes the stream when the connection closes. */ type tcpStreamFactory struct { - wg sync.WaitGroup - doHTTP bool - harWriter *HarWriter + wg sync.WaitGroup + doHTTP bool + harWriter *HarWriter + outbountLinkWriter *OutboundLinkWriter } func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.TCP, ac reassembly.AssemblerContext) reassembly.Stream { - Debug("* NEW: %s %s\n", net, transport) + Debug("* NEW: %s %s", net, transport) fsmOptions := reassembly.TCPSimpleFSMOptions{ SupportMissingEstablishment: *allowmissinginit, } - Debug("Current App Ports: %v\n", appPorts) + Debug("Current App Ports: %v", gSettings.filterPorts) dstIp := net.Dst().String() dstPort := int(tcp.DstPort) if factory.shouldNotifyOnOutboundLink(dstIp, dstPort) { - broadcastOutboundLink(net.Src().String(), dstIp, dstPort) + factory.outbountLinkWriter.WriteOutboundLink(net.Src().String(), dstIp, dstPort) } isHTTP := factory.shouldTap(dstIp, dstPort) stream := &tcpStream{ @@ -85,14 +86,14 @@ func (factory *tcpStreamFactory) WaitGoRoutines() { func (factory *tcpStreamFactory) shouldTap(dstIP string, dstPort int) bool { if hostMode { - if inArrayString(HostAppAddresses, fmt.Sprintf("%s:%d", dstIP, dstPort)) == true { + if inArrayString(gSettings.filterAuthorities, fmt.Sprintf("%s:%d", dstIP, dstPort)) == true { return true - } else if inArrayString(HostAppAddresses, dstIP) == true { + } else if inArrayString(gSettings.filterAuthorities, dstIP) == true { return true } return false } else { - isTappedPort := dstPort == 80 || (appPorts != nil && (inArrayInt(appPorts, dstPort))) + isTappedPort := dstPort == 80 || (gSettings.filterPorts != nil && (inArrayInt(gSettings.filterPorts, dstPort))) if !isTappedPort { return false }