Add an indicator for the eBPF sourced entries (#886)

* Define `Capture` type and expect it as an argument in `Dissect` method

* Add an indicator for the  eBPF sourced entries

* Fix the Go lint error

* Fix the logic in the UI

* Update the expected JSONs

* Add TODOs

* Add `UndefinedCapture` constant

* Define `CaptureTypes` enum
This commit is contained in:
M. Mert Yıldıran 2022-03-16 23:32:09 -07:00 committed by GitHub
parent 237002ef29
commit 5455220a3a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 103 additions and 43 deletions

View File

@ -48,6 +48,16 @@ type Extension struct {
Dissector Dissector
}
type Capture string
const (
UndefinedCapture Capture = ""
Pcap Capture = "pcap"
Envoy Capture = "envoy"
Linkerd Capture = "linkerd"
Ebpf Capture = "ebpf"
)
type ConnectionInfo struct {
ClientIP string
ClientPort string
@ -84,6 +94,7 @@ type RequestResponsePair struct {
// `Protocol` is modified in the later stages of data propagation. Therefore it's not a pointer.
type OutputChannelItem struct {
Protocol Protocol
Capture Capture
Timestamp int64
ConnectionInfo *ConnectionInfo
Pair *RequestResponsePair
@ -102,7 +113,7 @@ type SuperIdentifier struct {
type Dissector interface {
Register(*Extension)
Ping()
Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter, options *TrafficFilteringOptions, reqResMatcher RequestResponseMatcher) error
Dissect(b *bufio.Reader, capture Capture, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter, options *TrafficFilteringOptions, reqResMatcher RequestResponseMatcher) error
Analyze(item *OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *Entry
Summarize(entry *Entry) *BaseEntry
Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error)
@ -132,6 +143,7 @@ func (e *Emitting) Emit(item *OutputChannelItem) {
type Entry struct {
Id uint `json:"id"`
Protocol Protocol `json:"proto"`
Capture Capture `json:"capture"`
Source *TCP `json:"src"`
Destination *TCP `json:"dst"`
Namespace string `json:"namespace,omitempty"`
@ -162,6 +174,7 @@ type EntryWrapper struct {
type BaseEntry struct {
Id uint `json:"id"`
Protocol Protocol `json:"proto,omitempty"`
Capture Capture `json:"capture"`
Summary string `json:"summary,omitempty"`
SummaryQuery string `json:"summaryQuery,omitempty"`
Status int `json:"status"`

View File

@ -13,4 +13,4 @@ test-pull-bin:
test-pull-expect:
@mkdir -p expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect3/amqp/\* expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect4/amqp/\* expect

View File

@ -94,7 +94,7 @@ type AMQPWrapper struct {
Details interface{} `json:"details"`
}
func emitAMQP(event interface{}, _type string, method string, connectionInfo *api.ConnectionInfo, captureTime time.Time, emitter api.Emitter) {
func emitAMQP(event interface{}, _type string, method string, connectionInfo *api.ConnectionInfo, captureTime time.Time, emitter api.Emitter, capture api.Capture) {
request := &api.GenericMessage{
IsRequest: true,
CaptureTime: captureTime,
@ -108,6 +108,7 @@ func emitAMQP(event interface{}, _type string, method string, connectionInfo *ap
}
item := &api.OutputChannelItem{
Protocol: protocol,
Capture: capture,
Timestamp: captureTime.UnixNano() / int64(time.Millisecond),
ConnectionInfo: connectionInfo,
Pair: &api.RequestResponsePair{

View File

@ -39,7 +39,7 @@ func (d dissecting) Ping() {
const amqpRequest string = "amqp_request"
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
func (d dissecting) Dissect(b *bufio.Reader, capture api.Capture, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
r := AmqpReader{b}
var remaining int
@ -113,11 +113,11 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
case *BasicPublish:
eventBasicPublish.Body = f.Body
superIdentifier.Protocol = &protocol
emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, superTimer.CaptureTime, emitter)
emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, superTimer.CaptureTime, emitter, capture)
case *BasicDeliver:
eventBasicDeliver.Body = f.Body
superIdentifier.Protocol = &protocol
emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, superTimer.CaptureTime, emitter)
emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, superTimer.CaptureTime, emitter, capture)
}
case *MethodFrame:
@ -138,7 +138,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
Arguments: m.Arguments,
}
superIdentifier.Protocol = &protocol
emitAMQP(*eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, superTimer.CaptureTime, emitter)
emitAMQP(*eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, superTimer.CaptureTime, emitter, capture)
case *BasicConsume:
eventBasicConsume := &BasicConsume{
@ -151,7 +151,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
Arguments: m.Arguments,
}
superIdentifier.Protocol = &protocol
emitAMQP(*eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, superTimer.CaptureTime, emitter)
emitAMQP(*eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, superTimer.CaptureTime, emitter, capture)
case *BasicDeliver:
eventBasicDeliver.ConsumerTag = m.ConsumerTag
@ -171,7 +171,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
Arguments: m.Arguments,
}
superIdentifier.Protocol = &protocol
emitAMQP(*eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter)
emitAMQP(*eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter, capture)
case *ExchangeDeclare:
eventExchangeDeclare := &ExchangeDeclare{
@ -185,7 +185,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
Arguments: m.Arguments,
}
superIdentifier.Protocol = &protocol
emitAMQP(*eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter)
emitAMQP(*eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter, capture)
case *ConnectionStart:
eventConnectionStart := &ConnectionStart{
@ -196,7 +196,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
Locales: m.Locales,
}
superIdentifier.Protocol = &protocol
emitAMQP(*eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter)
emitAMQP(*eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter, capture)
case *ConnectionClose:
eventConnectionClose := &ConnectionClose{
@ -206,7 +206,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
MethodId: m.MethodId,
}
superIdentifier.Protocol = &protocol
emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, superTimer.CaptureTime, emitter)
emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, superTimer.CaptureTime, emitter, capture)
}
default:
@ -222,6 +222,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
reqDetails["method"] = request["method"]
return &api.Entry{
Protocol: protocol,
Capture: item.Capture,
Source: &api.TCP{
Name: resolvedSource,
IP: item.ConnectionInfo.ClientIP,
@ -283,6 +284,7 @@ func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry {
return &api.BaseEntry{
Id: entry.Id,
Protocol: entry.Protocol,
Capture: entry.Capture,
Summary: summary,
SummaryQuery: summaryQuery,
Status: 0,

View File

@ -122,7 +122,7 @@ func TestDissect(t *testing.T) {
DstPort: "2",
}
reqResMatcher := dissector.NewResponseRequestMatcher()
err = dissector.Dissect(bufferClient, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
err = dissector.Dissect(bufferClient, api.Pcap, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
panic(err)
}
@ -140,7 +140,7 @@ func TestDissect(t *testing.T) {
SrcPort: "2",
DstPort: "1",
}
err = dissector.Dissect(bufferServer, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
err = dissector.Dissect(bufferServer, api.Pcap, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
panic(err)
}

View File

@ -13,4 +13,4 @@ test-pull-bin:
test-pull-expect:
@mkdir -p expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect3/http/\* expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect4/http/\* expect

View File

@ -47,7 +47,7 @@ func replaceForwardedFor(item *api.OutputChannelItem) {
item.ConnectionInfo.ClientPort = ""
}
func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) error {
func handleHTTP2Stream(http2Assembler *Http2Assembler, capture api.Capture, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) error {
streamID, messageHTTP1, isGrpc, err := http2Assembler.readMessage()
if err != nil {
return err
@ -104,13 +104,14 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTi
} else {
item.Protocol = http2Protocol
}
item.Capture = capture
filterAndEmit(item, emitter, options)
}
return nil
}
func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, req *http.Request, err error) {
func handleHTTP1ClientStream(b *bufio.Reader, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, req *http.Request, err error) {
req, err = http.ReadRequest(b)
if err != nil {
return
@ -147,12 +148,13 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
ServerPort: tcpID.DstPort,
IsOutgoing: true,
}
item.Capture = capture
filterAndEmit(item, emitter, options)
}
return
}
func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, err error) {
func handleHTTP1ServerStream(b *bufio.Reader, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, err error) {
var res *http.Response
res, err = http.ReadResponse(b, nil)
if err != nil {
@ -190,6 +192,7 @@ func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
ServerPort: tcpID.SrcPort,
IsOutgoing: false,
}
item.Capture = capture
filterAndEmit(item, emitter, options)
}
return

View File

@ -86,7 +86,7 @@ func (d dissecting) Ping() {
log.Printf("pong %s", http11protocol.Name)
}
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
func (d dissecting) Dissect(b *bufio.Reader, capture api.Capture, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
reqResMatcher := _reqResMatcher.(*requestResponseMatcher)
var err error
@ -121,7 +121,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
if isHTTP2 {
err = handleHTTP2Stream(http2Assembler, tcpID, superTimer, emitter, options, reqResMatcher)
err = handleHTTP2Stream(http2Assembler, capture, tcpID, superTimer, emitter, options, reqResMatcher)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
@ -130,7 +130,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
superIdentifier.Protocol = &http11protocol
} else if isClient {
var req *http.Request
switchingProtocolsHTTP2, req, err = handleHTTP1ClientStream(b, tcpID, counterPair, superTimer, emitter, options, reqResMatcher)
switchingProtocolsHTTP2, req, err = handleHTTP1ClientStream(b, capture, tcpID, counterPair, superTimer, emitter, options, reqResMatcher)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
@ -157,11 +157,12 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
ServerPort: tcpID.DstPort,
IsOutgoing: true,
}
item.Capture = capture
filterAndEmit(item, emitter, options)
}
}
} else {
switchingProtocolsHTTP2, err = handleHTTP1ServerStream(b, tcpID, counterPair, superTimer, emitter, options, reqResMatcher)
switchingProtocolsHTTP2, err = handleHTTP1ServerStream(b, capture, tcpID, counterPair, superTimer, emitter, options, reqResMatcher)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
@ -259,6 +260,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
httpPair, _ := json.Marshal(item.Pair)
return &api.Entry{
Protocol: item.Protocol,
Capture: item.Capture,
Source: &api.TCP{
Name: resolvedSource,
IP: item.ConnectionInfo.ClientIP,
@ -291,6 +293,7 @@ func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry {
return &api.BaseEntry{
Id: entry.Id,
Protocol: entry.Protocol,
Capture: entry.Capture,
Summary: summary,
SummaryQuery: summaryQuery,
Status: status,

View File

@ -124,7 +124,7 @@ func TestDissect(t *testing.T) {
DstPort: "2",
}
reqResMatcher := dissector.NewResponseRequestMatcher()
err = dissector.Dissect(bufferClient, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
err = dissector.Dissect(bufferClient, api.Pcap, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
panic(err)
}
@ -142,7 +142,7 @@ func TestDissect(t *testing.T) {
SrcPort: "2",
DstPort: "1",
}
err = dissector.Dissect(bufferServer, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
err = dissector.Dissect(bufferServer, api.Pcap, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
panic(err)
}

View File

@ -13,4 +13,4 @@ test-pull-bin:
test-pull-expect:
@mkdir -p expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect3/kafka/\* expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect4/kafka/\* expect

View File

@ -35,7 +35,7 @@ func (d dissecting) Ping() {
log.Printf("pong %s", _protocol.Name)
}
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
func (d dissecting) Dissect(b *bufio.Reader, capture api.Capture, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
reqResMatcher := _reqResMatcher.(*requestResponseMatcher)
for {
if superIdentifier.Protocol != nil && superIdentifier.Protocol != &_protocol {
@ -49,7 +49,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
superIdentifier.Protocol = &_protocol
} else {
err := ReadResponse(b, tcpID, counterPair, superTimer, emitter, reqResMatcher)
err := ReadResponse(b, capture, tcpID, counterPair, superTimer, emitter, reqResMatcher)
if err != nil {
return err
}
@ -68,6 +68,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
}
return &api.Entry{
Protocol: _protocol,
Capture: item.Capture,
Source: &api.TCP{
Name: resolvedSource,
IP: item.ConnectionInfo.ClientIP,
@ -190,6 +191,7 @@ func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry {
return &api.BaseEntry{
Id: entry.Id,
Protocol: entry.Protocol,
Capture: entry.Capture,
Summary: summary,
SummaryQuery: summaryQuery,
Status: status,

View File

@ -123,7 +123,7 @@ func TestDissect(t *testing.T) {
}
reqResMatcher := dissector.NewResponseRequestMatcher()
reqResMatcher.SetMaxTry(10)
err = dissector.Dissect(bufferClient, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
err = dissector.Dissect(bufferClient, api.Pcap, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
log.Println(err)
}
@ -141,7 +141,7 @@ func TestDissect(t *testing.T) {
SrcPort: "2",
DstPort: "1",
}
err = dissector.Dissect(bufferServer, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
err = dissector.Dissect(bufferServer, api.Pcap, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
log.Println(err)
}

View File

@ -16,7 +16,7 @@ type Response struct {
CaptureTime time.Time `json:"captureTime"`
}
func ReadResponse(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, reqResMatcher *requestResponseMatcher) (err error) {
func ReadResponse(r io.Reader, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, reqResMatcher *requestResponseMatcher) (err error) {
d := &decoder{reader: r, remain: 4}
size := d.readInt32()
@ -258,6 +258,7 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, s
item := &api.OutputChannelItem{
Protocol: _protocol,
Capture: capture,
Timestamp: reqResPair.Request.CaptureTime.UnixNano() / int64(time.Millisecond),
ConnectionInfo: connectionInfo,
Pair: &api.RequestResponsePair{

View File

@ -13,4 +13,4 @@ test-pull-bin:
test-pull-expect:
@mkdir -p expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect3/redis/\* expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect4/redis/\* expect

View File

@ -6,7 +6,7 @@ import (
"github.com/up9inc/mizu/tap/api"
)
func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, request *RedisPacket, reqResMatcher *requestResponseMatcher) error {
func handleClientStream(capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, request *RedisPacket, reqResMatcher *requestResponseMatcher) error {
counterPair.Lock()
counterPair.Request++
requestCounter := counterPair.Request
@ -23,6 +23,7 @@ func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTim
item := reqResMatcher.registerRequest(ident, request, superTimer.CaptureTime)
if item != nil {
item.Capture = capture
item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.SrcIP,
ClientPort: tcpID.SrcPort,
@ -35,7 +36,7 @@ func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTim
return nil
}
func handleServerStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, response *RedisPacket, reqResMatcher *requestResponseMatcher) error {
func handleServerStream(capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, response *RedisPacket, reqResMatcher *requestResponseMatcher) error {
counterPair.Lock()
counterPair.Response++
responseCounter := counterPair.Response
@ -52,6 +53,7 @@ func handleServerStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTim
item := reqResMatcher.registerResponse(ident, response, superTimer.CaptureTime)
if item != nil {
item.Capture = capture
item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.DstIP,
ClientPort: tcpID.DstPort,

View File

@ -34,7 +34,7 @@ func (d dissecting) Ping() {
log.Printf("pong %s", protocol.Name)
}
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
func (d dissecting) Dissect(b *bufio.Reader, capture api.Capture, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
reqResMatcher := _reqResMatcher.(*requestResponseMatcher)
is := &RedisInputStream{
Reader: b,
@ -48,9 +48,9 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
if isClient {
err = handleClientStream(tcpID, counterPair, superTimer, emitter, redisPacket, reqResMatcher)
err = handleClientStream(capture, tcpID, counterPair, superTimer, emitter, redisPacket, reqResMatcher)
} else {
err = handleServerStream(tcpID, counterPair, superTimer, emitter, redisPacket, reqResMatcher)
err = handleServerStream(capture, tcpID, counterPair, superTimer, emitter, redisPacket, reqResMatcher)
}
if err != nil {
@ -71,6 +71,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
}
return &api.Entry{
Protocol: protocol,
Capture: item.Capture,
Source: &api.TCP{
Name: resolvedSource,
IP: item.ConnectionInfo.ClientIP,
@ -113,6 +114,7 @@ func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry {
return &api.BaseEntry{
Id: entry.Id,
Protocol: entry.Protocol,
Capture: entry.Capture,
Summary: summary,
SummaryQuery: summaryQuery,
Status: status,

View File

@ -123,7 +123,7 @@ func TestDissect(t *testing.T) {
DstPort: "2",
}
reqResMatcher := dissector.NewResponseRequestMatcher()
err = dissector.Dissect(bufferClient, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
err = dissector.Dissect(bufferClient, api.Pcap, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && reflect.TypeOf(err) != reflect.TypeOf(&ConnectError{}) && err != io.EOF && err != io.ErrUnexpectedEOF {
log.Println(err)
}
@ -141,7 +141,7 @@ func TestDissect(t *testing.T) {
SrcPort: "2",
DstPort: "1",
}
err = dissector.Dissect(bufferServer, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
err = dissector.Dissect(bufferServer, api.Pcap, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && reflect.TypeOf(err) != reflect.TypeOf(&ConnectError{}) && err != io.EOF && err != io.ErrUnexpectedEOF {
log.Println(err)
}

View File

@ -95,7 +95,8 @@ func (h *tcpReader) Close() {
func (h *tcpReader) run(wg *sync.WaitGroup) {
defer wg.Done()
b := bufio.NewReader(h)
err := h.extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.counterPair, h.superTimer, h.parent.superIdentifier, h.emitter, filteringOptions, h.reqResMatcher)
// TODO: Add api.Pcap, api.Envoy and api.Linkerd distinction by refactoring NewPacketSourceManager method
err := h.extension.Dissector.Dissect(b, api.Pcap, h.isClient, h.tcpID, h.counterPair, h.superTimer, h.parent.superIdentifier, h.emitter, filteringOptions, h.reqResMatcher)
if err != nil {
_, err = io.Copy(ioutil.Discard, b)
if err != nil {

View File

@ -158,7 +158,7 @@ func dissect(extension *api.Extension, reader *tlsReader, isRequest bool, tcpid
emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher api.RequestResponseMatcher) {
b := bufio.NewReader(reader)
err := extension.Dissector.Dissect(b, isRequest, tcpid, &api.CounterPair{},
err := extension.Dissector.Dissect(b, api.Ebpf, isRequest, tcpid, &api.CounterPair{},
&api.SuperTimer{}, &api.SuperIdentifier{}, emitter, options, reqResMatcher)
if err != nil {

View File

@ -62,6 +62,13 @@
width: 185px
text-align: left
.capture
margin-top: -60px
.capture img
height: 20px
z-index: 1000
.endpointServiceContainer
display: flex
flex-direction: column

View File

@ -4,6 +4,7 @@ import SwapHorizIcon from '@material-ui/icons/SwapHoriz';
import styles from './EntryListItem.module.sass';
import StatusCode, {getClassification, StatusCodeClassification} from "../../UI/StatusCode";
import Protocol, {ProtocolInterface} from "../../UI/Protocol"
import eBPFLogo from '../assets/ebpf.png';
import {Summary} from "../../UI/Summary";
import Queryable from "../../UI/Queryable";
import ingoingIconSuccess from "assets/ingoing-traffic-success.svg"
@ -24,6 +25,7 @@ interface TCPInterface {
interface Entry {
proto: ProtocolInterface,
capture: string,
method?: string,
methodQuery?: string,
summary: string,
@ -52,6 +54,14 @@ interface EntryProps {
headingMode: boolean;
}
enum CaptureTypes {
UndefinedCapture = "",
Pcap = "pcap",
Envoy = "envoy",
Linkerd = "linkerd",
Ebpf = "ebpf",
}
export const EntryItem: React.FC<EntryProps> = ({entry, style, headingMode}) => {
const [focusedEntryId, setFocusedEntryId] = useRecoilState(focusedEntryIdAtom);
@ -154,6 +164,17 @@ export const EntryItem: React.FC<EntryProps> = ({entry, style, headingMode}) =>
protocol={entry.proto}
horizontal={false}
/> : null}
{/* TODO: Update the code below once we have api.Pcap, api.Envoy and api.Linkerd distinction in the backend */}
{entry.capture === CaptureTypes.Ebpf ? <div className={styles.capture}>
<Queryable
query={`capture == "${entry.capture}"`}
displayIconOnMouseOver={true}
flipped={false}
style={{position: "absolute"}}
>
<img src={eBPFLogo} alt="eBPF"/>
</Queryable>
</div> : null}
{isStatusCodeEnabled && <div>
<StatusCode statusCode={entry.status} statusQuery={entry.statusQuery}/>
</div>}

View File

@ -46,6 +46,7 @@ const Protocol: React.FC<ProtocolProps> = ({protocol, horizontal}) => {
displayIconOnMouseOver={true}
flipped={false}
iconStyle={{marginTop: "52px", marginRight: "10px", zIndex: 1000}}
tooltipStyle={{marginTop: "-22px", zIndex: 1001}}
>
<span
className={`${styles.base} ${styles.vertical}`}

View File

@ -11,11 +11,12 @@ interface Props {
iconStyle?: object,
className?: string,
useTooltip?: boolean,
tooltipStyle?: object,
displayIconOnMouseOver?: boolean,
flipped?: boolean,
}
const Queryable: React.FC<Props> = ({query, style, iconStyle, className, useTooltip= true, displayIconOnMouseOver = false, flipped = false, children}) => {
const Queryable: React.FC<Props> = ({query, style, iconStyle, className, useTooltip = true, tooltipStyle = null, displayIconOnMouseOver = false, flipped = false, children}) => {
const [showAddedNotification, setAdded] = useState(false);
const [showTooltip, setShowTooltip] = useState(false);
const [queryState, setQuery] = useRecoilState(queryAtom);
@ -48,12 +49,12 @@ const Queryable: React.FC<Props> = ({query, style, iconStyle, className, useTool
</CopyToClipboard> : null;
return (
<div className={`${QueryableStyle.QueryableContainer} ${QueryableStyle.displayIconOnMouseOver} ${className ? className : ''} ${displayIconOnMouseOver ? QueryableStyle.displayIconOnMouseOver : ''}`}
<div className={`${QueryableStyle.QueryableContainer} ${QueryableStyle.displayIconOnMouseOver} ${className ? className : ''} ${displayIconOnMouseOver ? QueryableStyle.displayIconOnMouseOver : ''}`}
style={style} onMouseOver={ e => setShowTooltip(true)} onMouseLeave={ e => setShowTooltip(false)}>
{flipped && addButton}
{children}
{!flipped && addButton}
{useTooltip && showTooltip && <span className={QueryableStyle.QueryableTooltip}>{query}</span>}
{useTooltip && showTooltip && <span className={QueryableStyle.QueryableTooltip} style={tooltipStyle}>{query}</span>}
</div>
);
};

Binary file not shown.

After

Width:  |  Height:  |  Size: 21 KiB