diff --git a/tap/api/api.go b/tap/api/api.go index 2cf6807f9..1b2062041 100644 --- a/tap/api/api.go +++ b/tap/api/api.go @@ -48,6 +48,16 @@ type Extension struct { Dissector Dissector } +type Capture string + +const ( + UndefinedCapture Capture = "" + Pcap Capture = "pcap" + Envoy Capture = "envoy" + Linkerd Capture = "linkerd" + Ebpf Capture = "ebpf" +) + type ConnectionInfo struct { ClientIP string ClientPort string @@ -84,6 +94,7 @@ type RequestResponsePair struct { // `Protocol` is modified in the later stages of data propagation. Therefore it's not a pointer. type OutputChannelItem struct { Protocol Protocol + Capture Capture Timestamp int64 ConnectionInfo *ConnectionInfo Pair *RequestResponsePair @@ -102,7 +113,7 @@ type SuperIdentifier struct { type Dissector interface { Register(*Extension) Ping() - Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter, options *TrafficFilteringOptions, reqResMatcher RequestResponseMatcher) error + Dissect(b *bufio.Reader, capture Capture, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter, options *TrafficFilteringOptions, reqResMatcher RequestResponseMatcher) error Analyze(item *OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *Entry Summarize(entry *Entry) *BaseEntry Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error) @@ -132,6 +143,7 @@ func (e *Emitting) Emit(item *OutputChannelItem) { type Entry struct { Id uint `json:"id"` Protocol Protocol `json:"proto"` + Capture Capture `json:"capture"` Source *TCP `json:"src"` Destination *TCP `json:"dst"` Namespace string `json:"namespace,omitempty"` @@ -162,6 +174,7 @@ type EntryWrapper struct { type BaseEntry struct { Id uint `json:"id"` Protocol Protocol `json:"proto,omitempty"` + Capture Capture `json:"capture"` Summary string `json:"summary,omitempty"` SummaryQuery string `json:"summaryQuery,omitempty"` Status int `json:"status"` diff --git a/tap/extensions/amqp/Makefile b/tap/extensions/amqp/Makefile index f607555f2..f3b1737fe 100644 --- a/tap/extensions/amqp/Makefile +++ b/tap/extensions/amqp/Makefile @@ -13,4 +13,4 @@ test-pull-bin: test-pull-expect: @mkdir -p expect - @[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect3/amqp/\* expect + @[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect4/amqp/\* expect diff --git a/tap/extensions/amqp/helpers.go b/tap/extensions/amqp/helpers.go index eb5fb0184..aeafe9990 100644 --- a/tap/extensions/amqp/helpers.go +++ b/tap/extensions/amqp/helpers.go @@ -94,7 +94,7 @@ type AMQPWrapper struct { Details interface{} `json:"details"` } -func emitAMQP(event interface{}, _type string, method string, connectionInfo *api.ConnectionInfo, captureTime time.Time, emitter api.Emitter) { +func emitAMQP(event interface{}, _type string, method string, connectionInfo *api.ConnectionInfo, captureTime time.Time, emitter api.Emitter, capture api.Capture) { request := &api.GenericMessage{ IsRequest: true, CaptureTime: captureTime, @@ -108,6 +108,7 @@ func emitAMQP(event interface{}, _type string, method string, connectionInfo *ap } item := &api.OutputChannelItem{ Protocol: protocol, + Capture: capture, Timestamp: captureTime.UnixNano() / int64(time.Millisecond), ConnectionInfo: connectionInfo, Pair: &api.RequestResponsePair{ diff --git a/tap/extensions/amqp/main.go b/tap/extensions/amqp/main.go index 31707fafc..f7f378f4d 100644 --- a/tap/extensions/amqp/main.go +++ b/tap/extensions/amqp/main.go @@ -39,7 +39,7 @@ func (d dissecting) Ping() { const amqpRequest string = "amqp_request" -func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error { +func (d dissecting) Dissect(b *bufio.Reader, capture api.Capture, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error { r := AmqpReader{b} var remaining int @@ -113,11 +113,11 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co case *BasicPublish: eventBasicPublish.Body = f.Body superIdentifier.Protocol = &protocol - emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, superTimer.CaptureTime, emitter) + emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, superTimer.CaptureTime, emitter, capture) case *BasicDeliver: eventBasicDeliver.Body = f.Body superIdentifier.Protocol = &protocol - emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, superTimer.CaptureTime, emitter) + emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, superTimer.CaptureTime, emitter, capture) } case *MethodFrame: @@ -138,7 +138,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co Arguments: m.Arguments, } superIdentifier.Protocol = &protocol - emitAMQP(*eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, superTimer.CaptureTime, emitter) + emitAMQP(*eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, superTimer.CaptureTime, emitter, capture) case *BasicConsume: eventBasicConsume := &BasicConsume{ @@ -151,7 +151,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co Arguments: m.Arguments, } superIdentifier.Protocol = &protocol - emitAMQP(*eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, superTimer.CaptureTime, emitter) + emitAMQP(*eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, superTimer.CaptureTime, emitter, capture) case *BasicDeliver: eventBasicDeliver.ConsumerTag = m.ConsumerTag @@ -171,7 +171,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co Arguments: m.Arguments, } superIdentifier.Protocol = &protocol - emitAMQP(*eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter) + emitAMQP(*eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter, capture) case *ExchangeDeclare: eventExchangeDeclare := &ExchangeDeclare{ @@ -185,7 +185,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co Arguments: m.Arguments, } superIdentifier.Protocol = &protocol - emitAMQP(*eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter) + emitAMQP(*eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter, capture) case *ConnectionStart: eventConnectionStart := &ConnectionStart{ @@ -196,7 +196,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co Locales: m.Locales, } superIdentifier.Protocol = &protocol - emitAMQP(*eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter) + emitAMQP(*eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter, capture) case *ConnectionClose: eventConnectionClose := &ConnectionClose{ @@ -206,7 +206,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co MethodId: m.MethodId, } superIdentifier.Protocol = &protocol - emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, superTimer.CaptureTime, emitter) + emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, superTimer.CaptureTime, emitter, capture) } default: @@ -222,6 +222,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, reqDetails["method"] = request["method"] return &api.Entry{ Protocol: protocol, + Capture: item.Capture, Source: &api.TCP{ Name: resolvedSource, IP: item.ConnectionInfo.ClientIP, @@ -283,6 +284,7 @@ func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry { return &api.BaseEntry{ Id: entry.Id, Protocol: entry.Protocol, + Capture: entry.Capture, Summary: summary, SummaryQuery: summaryQuery, Status: 0, diff --git a/tap/extensions/amqp/main_test.go b/tap/extensions/amqp/main_test.go index b96cb82a8..d80d8eadc 100644 --- a/tap/extensions/amqp/main_test.go +++ b/tap/extensions/amqp/main_test.go @@ -122,7 +122,7 @@ func TestDissect(t *testing.T) { DstPort: "2", } reqResMatcher := dissector.NewResponseRequestMatcher() - err = dissector.Dissect(bufferClient, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) + err = dissector.Dissect(bufferClient, api.Pcap, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { panic(err) } @@ -140,7 +140,7 @@ func TestDissect(t *testing.T) { SrcPort: "2", DstPort: "1", } - err = dissector.Dissect(bufferServer, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) + err = dissector.Dissect(bufferServer, api.Pcap, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { panic(err) } diff --git a/tap/extensions/http/Makefile b/tap/extensions/http/Makefile index 7ef4fbaf6..9b39f6a83 100644 --- a/tap/extensions/http/Makefile +++ b/tap/extensions/http/Makefile @@ -13,4 +13,4 @@ test-pull-bin: test-pull-expect: @mkdir -p expect - @[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect3/http/\* expect + @[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect4/http/\* expect diff --git a/tap/extensions/http/handlers.go b/tap/extensions/http/handlers.go index 7ffa5fee5..433b7970e 100644 --- a/tap/extensions/http/handlers.go +++ b/tap/extensions/http/handlers.go @@ -47,7 +47,7 @@ func replaceForwardedFor(item *api.OutputChannelItem) { item.ConnectionInfo.ClientPort = "" } -func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) error { +func handleHTTP2Stream(http2Assembler *Http2Assembler, capture api.Capture, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) error { streamID, messageHTTP1, isGrpc, err := http2Assembler.readMessage() if err != nil { return err @@ -104,13 +104,14 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTi } else { item.Protocol = http2Protocol } + item.Capture = capture filterAndEmit(item, emitter, options) } return nil } -func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, req *http.Request, err error) { +func handleHTTP1ClientStream(b *bufio.Reader, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, req *http.Request, err error) { req, err = http.ReadRequest(b) if err != nil { return @@ -147,12 +148,13 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api ServerPort: tcpID.DstPort, IsOutgoing: true, } + item.Capture = capture filterAndEmit(item, emitter, options) } return } -func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, err error) { +func handleHTTP1ServerStream(b *bufio.Reader, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, err error) { var res *http.Response res, err = http.ReadResponse(b, nil) if err != nil { @@ -190,6 +192,7 @@ func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api ServerPort: tcpID.SrcPort, IsOutgoing: false, } + item.Capture = capture filterAndEmit(item, emitter, options) } return diff --git a/tap/extensions/http/main.go b/tap/extensions/http/main.go index 07394763a..b518c31d0 100644 --- a/tap/extensions/http/main.go +++ b/tap/extensions/http/main.go @@ -86,7 +86,7 @@ func (d dissecting) Ping() { log.Printf("pong %s", http11protocol.Name) } -func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error { +func (d dissecting) Dissect(b *bufio.Reader, capture api.Capture, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error { reqResMatcher := _reqResMatcher.(*requestResponseMatcher) var err error @@ -121,7 +121,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co } if isHTTP2 { - err = handleHTTP2Stream(http2Assembler, tcpID, superTimer, emitter, options, reqResMatcher) + err = handleHTTP2Stream(http2Assembler, capture, tcpID, superTimer, emitter, options, reqResMatcher) if err == io.EOF || err == io.ErrUnexpectedEOF { break } else if err != nil { @@ -130,7 +130,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co superIdentifier.Protocol = &http11protocol } else if isClient { var req *http.Request - switchingProtocolsHTTP2, req, err = handleHTTP1ClientStream(b, tcpID, counterPair, superTimer, emitter, options, reqResMatcher) + switchingProtocolsHTTP2, req, err = handleHTTP1ClientStream(b, capture, tcpID, counterPair, superTimer, emitter, options, reqResMatcher) if err == io.EOF || err == io.ErrUnexpectedEOF { break } else if err != nil { @@ -157,11 +157,12 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co ServerPort: tcpID.DstPort, IsOutgoing: true, } + item.Capture = capture filterAndEmit(item, emitter, options) } } } else { - switchingProtocolsHTTP2, err = handleHTTP1ServerStream(b, tcpID, counterPair, superTimer, emitter, options, reqResMatcher) + switchingProtocolsHTTP2, err = handleHTTP1ServerStream(b, capture, tcpID, counterPair, superTimer, emitter, options, reqResMatcher) if err == io.EOF || err == io.ErrUnexpectedEOF { break } else if err != nil { @@ -259,6 +260,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, httpPair, _ := json.Marshal(item.Pair) return &api.Entry{ Protocol: item.Protocol, + Capture: item.Capture, Source: &api.TCP{ Name: resolvedSource, IP: item.ConnectionInfo.ClientIP, @@ -291,6 +293,7 @@ func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry { return &api.BaseEntry{ Id: entry.Id, Protocol: entry.Protocol, + Capture: entry.Capture, Summary: summary, SummaryQuery: summaryQuery, Status: status, diff --git a/tap/extensions/http/main_test.go b/tap/extensions/http/main_test.go index 2a3c4e4ec..57f97e12c 100644 --- a/tap/extensions/http/main_test.go +++ b/tap/extensions/http/main_test.go @@ -124,7 +124,7 @@ func TestDissect(t *testing.T) { DstPort: "2", } reqResMatcher := dissector.NewResponseRequestMatcher() - err = dissector.Dissect(bufferClient, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) + err = dissector.Dissect(bufferClient, api.Pcap, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { panic(err) } @@ -142,7 +142,7 @@ func TestDissect(t *testing.T) { SrcPort: "2", DstPort: "1", } - err = dissector.Dissect(bufferServer, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) + err = dissector.Dissect(bufferServer, api.Pcap, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { panic(err) } diff --git a/tap/extensions/kafka/Makefile b/tap/extensions/kafka/Makefile index 35ca65ae7..8c0cc240f 100644 --- a/tap/extensions/kafka/Makefile +++ b/tap/extensions/kafka/Makefile @@ -13,4 +13,4 @@ test-pull-bin: test-pull-expect: @mkdir -p expect - @[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect3/kafka/\* expect + @[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect4/kafka/\* expect diff --git a/tap/extensions/kafka/main.go b/tap/extensions/kafka/main.go index 405ecc65d..be43c6157 100644 --- a/tap/extensions/kafka/main.go +++ b/tap/extensions/kafka/main.go @@ -35,7 +35,7 @@ func (d dissecting) Ping() { log.Printf("pong %s", _protocol.Name) } -func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error { +func (d dissecting) Dissect(b *bufio.Reader, capture api.Capture, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error { reqResMatcher := _reqResMatcher.(*requestResponseMatcher) for { if superIdentifier.Protocol != nil && superIdentifier.Protocol != &_protocol { @@ -49,7 +49,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co } superIdentifier.Protocol = &_protocol } else { - err := ReadResponse(b, tcpID, counterPair, superTimer, emitter, reqResMatcher) + err := ReadResponse(b, capture, tcpID, counterPair, superTimer, emitter, reqResMatcher) if err != nil { return err } @@ -68,6 +68,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, } return &api.Entry{ Protocol: _protocol, + Capture: item.Capture, Source: &api.TCP{ Name: resolvedSource, IP: item.ConnectionInfo.ClientIP, @@ -190,6 +191,7 @@ func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry { return &api.BaseEntry{ Id: entry.Id, Protocol: entry.Protocol, + Capture: entry.Capture, Summary: summary, SummaryQuery: summaryQuery, Status: status, diff --git a/tap/extensions/kafka/main_test.go b/tap/extensions/kafka/main_test.go index 4546123ec..a212b1a5f 100644 --- a/tap/extensions/kafka/main_test.go +++ b/tap/extensions/kafka/main_test.go @@ -123,7 +123,7 @@ func TestDissect(t *testing.T) { } reqResMatcher := dissector.NewResponseRequestMatcher() reqResMatcher.SetMaxTry(10) - err = dissector.Dissect(bufferClient, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) + err = dissector.Dissect(bufferClient, api.Pcap, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { log.Println(err) } @@ -141,7 +141,7 @@ func TestDissect(t *testing.T) { SrcPort: "2", DstPort: "1", } - err = dissector.Dissect(bufferServer, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) + err = dissector.Dissect(bufferServer, api.Pcap, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { log.Println(err) } diff --git a/tap/extensions/kafka/response.go b/tap/extensions/kafka/response.go index 809889c39..a217de1d6 100644 --- a/tap/extensions/kafka/response.go +++ b/tap/extensions/kafka/response.go @@ -16,7 +16,7 @@ type Response struct { CaptureTime time.Time `json:"captureTime"` } -func ReadResponse(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, reqResMatcher *requestResponseMatcher) (err error) { +func ReadResponse(r io.Reader, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, reqResMatcher *requestResponseMatcher) (err error) { d := &decoder{reader: r, remain: 4} size := d.readInt32() @@ -258,6 +258,7 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, s item := &api.OutputChannelItem{ Protocol: _protocol, + Capture: capture, Timestamp: reqResPair.Request.CaptureTime.UnixNano() / int64(time.Millisecond), ConnectionInfo: connectionInfo, Pair: &api.RequestResponsePair{ diff --git a/tap/extensions/redis/Makefile b/tap/extensions/redis/Makefile index d8d30b999..02aae2cdf 100644 --- a/tap/extensions/redis/Makefile +++ b/tap/extensions/redis/Makefile @@ -13,4 +13,4 @@ test-pull-bin: test-pull-expect: @mkdir -p expect - @[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect3/redis/\* expect + @[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect4/redis/\* expect diff --git a/tap/extensions/redis/handlers.go b/tap/extensions/redis/handlers.go index c4eb7985e..47c0dac0c 100644 --- a/tap/extensions/redis/handlers.go +++ b/tap/extensions/redis/handlers.go @@ -6,7 +6,7 @@ import ( "github.com/up9inc/mizu/tap/api" ) -func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, request *RedisPacket, reqResMatcher *requestResponseMatcher) error { +func handleClientStream(capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, request *RedisPacket, reqResMatcher *requestResponseMatcher) error { counterPair.Lock() counterPair.Request++ requestCounter := counterPair.Request @@ -23,6 +23,7 @@ func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTim item := reqResMatcher.registerRequest(ident, request, superTimer.CaptureTime) if item != nil { + item.Capture = capture item.ConnectionInfo = &api.ConnectionInfo{ ClientIP: tcpID.SrcIP, ClientPort: tcpID.SrcPort, @@ -35,7 +36,7 @@ func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTim return nil } -func handleServerStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, response *RedisPacket, reqResMatcher *requestResponseMatcher) error { +func handleServerStream(capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, response *RedisPacket, reqResMatcher *requestResponseMatcher) error { counterPair.Lock() counterPair.Response++ responseCounter := counterPair.Response @@ -52,6 +53,7 @@ func handleServerStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTim item := reqResMatcher.registerResponse(ident, response, superTimer.CaptureTime) if item != nil { + item.Capture = capture item.ConnectionInfo = &api.ConnectionInfo{ ClientIP: tcpID.DstIP, ClientPort: tcpID.DstPort, diff --git a/tap/extensions/redis/main.go b/tap/extensions/redis/main.go index fcb9eade2..73d4b24af 100644 --- a/tap/extensions/redis/main.go +++ b/tap/extensions/redis/main.go @@ -34,7 +34,7 @@ func (d dissecting) Ping() { log.Printf("pong %s", protocol.Name) } -func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error { +func (d dissecting) Dissect(b *bufio.Reader, capture api.Capture, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error { reqResMatcher := _reqResMatcher.(*requestResponseMatcher) is := &RedisInputStream{ Reader: b, @@ -48,9 +48,9 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co } if isClient { - err = handleClientStream(tcpID, counterPair, superTimer, emitter, redisPacket, reqResMatcher) + err = handleClientStream(capture, tcpID, counterPair, superTimer, emitter, redisPacket, reqResMatcher) } else { - err = handleServerStream(tcpID, counterPair, superTimer, emitter, redisPacket, reqResMatcher) + err = handleServerStream(capture, tcpID, counterPair, superTimer, emitter, redisPacket, reqResMatcher) } if err != nil { @@ -71,6 +71,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, } return &api.Entry{ Protocol: protocol, + Capture: item.Capture, Source: &api.TCP{ Name: resolvedSource, IP: item.ConnectionInfo.ClientIP, @@ -113,6 +114,7 @@ func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry { return &api.BaseEntry{ Id: entry.Id, Protocol: entry.Protocol, + Capture: entry.Capture, Summary: summary, SummaryQuery: summaryQuery, Status: status, diff --git a/tap/extensions/redis/main_test.go b/tap/extensions/redis/main_test.go index 27c60bb03..27cc1855f 100644 --- a/tap/extensions/redis/main_test.go +++ b/tap/extensions/redis/main_test.go @@ -123,7 +123,7 @@ func TestDissect(t *testing.T) { DstPort: "2", } reqResMatcher := dissector.NewResponseRequestMatcher() - err = dissector.Dissect(bufferClient, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) + err = dissector.Dissect(bufferClient, api.Pcap, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) if err != nil && reflect.TypeOf(err) != reflect.TypeOf(&ConnectError{}) && err != io.EOF && err != io.ErrUnexpectedEOF { log.Println(err) } @@ -141,7 +141,7 @@ func TestDissect(t *testing.T) { SrcPort: "2", DstPort: "1", } - err = dissector.Dissect(bufferServer, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) + err = dissector.Dissect(bufferServer, api.Pcap, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) if err != nil && reflect.TypeOf(err) != reflect.TypeOf(&ConnectError{}) && err != io.EOF && err != io.ErrUnexpectedEOF { log.Println(err) } diff --git a/tap/tcp_reader.go b/tap/tcp_reader.go index ceb94e98e..7cc803437 100644 --- a/tap/tcp_reader.go +++ b/tap/tcp_reader.go @@ -95,7 +95,8 @@ func (h *tcpReader) Close() { func (h *tcpReader) run(wg *sync.WaitGroup) { defer wg.Done() b := bufio.NewReader(h) - err := h.extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.counterPair, h.superTimer, h.parent.superIdentifier, h.emitter, filteringOptions, h.reqResMatcher) + // TODO: Add api.Pcap, api.Envoy and api.Linkerd distinction by refactoring NewPacketSourceManager method + err := h.extension.Dissector.Dissect(b, api.Pcap, h.isClient, h.tcpID, h.counterPair, h.superTimer, h.parent.superIdentifier, h.emitter, filteringOptions, h.reqResMatcher) if err != nil { _, err = io.Copy(ioutil.Discard, b) if err != nil { diff --git a/tap/tlstapper/tls_poller.go b/tap/tlstapper/tls_poller.go index db162ae9d..dca435a6e 100644 --- a/tap/tlstapper/tls_poller.go +++ b/tap/tlstapper/tls_poller.go @@ -158,7 +158,7 @@ func dissect(extension *api.Extension, reader *tlsReader, isRequest bool, tcpid emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher api.RequestResponseMatcher) { b := bufio.NewReader(reader) - err := extension.Dissector.Dissect(b, isRequest, tcpid, &api.CounterPair{}, + err := extension.Dissector.Dissect(b, api.Ebpf, isRequest, tcpid, &api.CounterPair{}, &api.SuperTimer{}, &api.SuperIdentifier{}, emitter, options, reqResMatcher) if err != nil { diff --git a/ui-common/src/components/TrafficViewer/EntryListItem/EntryListItem.module.sass b/ui-common/src/components/TrafficViewer/EntryListItem/EntryListItem.module.sass index fdb4c612e..334fa0033 100644 --- a/ui-common/src/components/TrafficViewer/EntryListItem/EntryListItem.module.sass +++ b/ui-common/src/components/TrafficViewer/EntryListItem/EntryListItem.module.sass @@ -62,6 +62,13 @@ width: 185px text-align: left +.capture + margin-top: -60px + +.capture img + height: 20px + z-index: 1000 + .endpointServiceContainer display: flex flex-direction: column diff --git a/ui-common/src/components/TrafficViewer/EntryListItem/EntryListItem.tsx b/ui-common/src/components/TrafficViewer/EntryListItem/EntryListItem.tsx index df08cf6fb..ce4cb0ce9 100644 --- a/ui-common/src/components/TrafficViewer/EntryListItem/EntryListItem.tsx +++ b/ui-common/src/components/TrafficViewer/EntryListItem/EntryListItem.tsx @@ -4,6 +4,7 @@ import SwapHorizIcon from '@material-ui/icons/SwapHoriz'; import styles from './EntryListItem.module.sass'; import StatusCode, {getClassification, StatusCodeClassification} from "../../UI/StatusCode"; import Protocol, {ProtocolInterface} from "../../UI/Protocol" +import eBPFLogo from '../assets/ebpf.png'; import {Summary} from "../../UI/Summary"; import Queryable from "../../UI/Queryable"; import ingoingIconSuccess from "assets/ingoing-traffic-success.svg" @@ -24,6 +25,7 @@ interface TCPInterface { interface Entry { proto: ProtocolInterface, + capture: string, method?: string, methodQuery?: string, summary: string, @@ -52,6 +54,14 @@ interface EntryProps { headingMode: boolean; } +enum CaptureTypes { + UndefinedCapture = "", + Pcap = "pcap", + Envoy = "envoy", + Linkerd = "linkerd", + Ebpf = "ebpf", +} + export const EntryItem: React.FC = ({entry, style, headingMode}) => { const [focusedEntryId, setFocusedEntryId] = useRecoilState(focusedEntryIdAtom); @@ -154,6 +164,17 @@ export const EntryItem: React.FC = ({entry, style, headingMode}) => protocol={entry.proto} horizontal={false} /> : null} + {/* TODO: Update the code below once we have api.Pcap, api.Envoy and api.Linkerd distinction in the backend */} + {entry.capture === CaptureTypes.Ebpf ?
+ + eBPF + +
: null} {isStatusCodeEnabled &&
} diff --git a/ui-common/src/components/UI/Protocol.tsx b/ui-common/src/components/UI/Protocol.tsx index c04f7f81c..9ce8715a8 100644 --- a/ui-common/src/components/UI/Protocol.tsx +++ b/ui-common/src/components/UI/Protocol.tsx @@ -46,6 +46,7 @@ const Protocol: React.FC = ({protocol, horizontal}) => { displayIconOnMouseOver={true} flipped={false} iconStyle={{marginTop: "52px", marginRight: "10px", zIndex: 1000}} + tooltipStyle={{marginTop: "-22px", zIndex: 1001}} > = ({query, style, iconStyle, className, useTooltip= true, displayIconOnMouseOver = false, flipped = false, children}) => { +const Queryable: React.FC = ({query, style, iconStyle, className, useTooltip = true, tooltipStyle = null, displayIconOnMouseOver = false, flipped = false, children}) => { const [showAddedNotification, setAdded] = useState(false); const [showTooltip, setShowTooltip] = useState(false); const [queryState, setQuery] = useRecoilState(queryAtom); @@ -48,12 +49,12 @@ const Queryable: React.FC = ({query, style, iconStyle, className, useTool : null; return ( -
setShowTooltip(true)} onMouseLeave={ e => setShowTooltip(false)}> {flipped && addButton} {children} {!flipped && addButton} - {useTooltip && showTooltip && {query}} + {useTooltip && showTooltip && {query}}
); }; diff --git a/ui/src/components/assets/ebpf.png b/ui/src/components/assets/ebpf.png new file mode 100644 index 000000000..53424467e Binary files /dev/null and b/ui/src/components/assets/ebpf.png differ