diff --git a/agent/pkg/api/entry_streamer_socket_connector.go b/agent/pkg/api/entry_streamer_socket_connector.go index 1f376c23d..a8923cd34 100644 --- a/agent/pkg/api/entry_streamer_socket_connector.go +++ b/agent/pkg/api/entry_streamer_socket_connector.go @@ -22,7 +22,16 @@ func (e *DefaultEntryStreamerSocketConnector) SendEntry(socketId int, entry *tap if params.EnableFullEntries { message, _ = models.CreateFullEntryWebSocketMessage(entry) } else { - extension := extensionsMap[entry.Protocol.Name] + protocol, ok := protocolsMap[entry.ProtocolId] + if !ok { + return fmt.Errorf("protocol not found, protocol: %v", protocol) + } + + extension, ok := extensionsMap[protocol.Name] + if !ok { + return fmt.Errorf("extension not found, extension: %v", protocol.Name) + } + base := extension.Dissector.Summarize(entry) message, _ = models.CreateBaseEntryWebSocketMessage(base) } diff --git a/agent/pkg/api/main.go b/agent/pkg/api/main.go index 40b0048aa..72015113c 100644 --- a/agent/pkg/api/main.go +++ b/agent/pkg/api/main.go @@ -134,7 +134,7 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension serviceMapGenerator.NewTCPEntry(mizuEntry.Source, mizuEntry.Destination, &item.Protocol) oasGenerator := dependency.GetInstance(dependency.OasGeneratorDependency).(oas.OasGeneratorSink) - oasGenerator.HandleEntry(mizuEntry) + oasGenerator.HandleEntry(mizuEntry, &item.Protocol) } } diff --git a/agent/pkg/api/socket_routes.go b/agent/pkg/api/socket_routes.go index 7e2c1b896..fb4830563 100644 --- a/agent/pkg/api/socket_routes.go +++ b/agent/pkg/api/socket_routes.go @@ -14,10 +14,14 @@ import ( tapApi "github.com/up9inc/mizu/tap/api" ) -var extensionsMap map[string]*tapApi.Extension // global +var ( + extensionsMap map[string]*tapApi.Extension // global + protocolsMap map[string]*tapApi.Protocol //global +) -func InitExtensionsMap(ref map[string]*tapApi.Extension) { - extensionsMap = ref +func InitMaps(extensions map[string]*tapApi.Extension, protocols map[string]*tapApi.Protocol) { + extensionsMap = extensions + protocolsMap = protocols } type EventHandlers interface { diff --git a/agent/pkg/app/main.go b/agent/pkg/app/main.go index eed1ae0b6..2e22b2cb5 100644 --- a/agent/pkg/app/main.go +++ b/agent/pkg/app/main.go @@ -11,8 +11,8 @@ import ( "github.com/up9inc/mizu/agent/pkg/api" "github.com/up9inc/mizu/agent/pkg/utils" "github.com/up9inc/mizu/logger" - "github.com/up9inc/mizu/tap/dbgctl" tapApi "github.com/up9inc/mizu/tap/api" + "github.com/up9inc/mizu/tap/dbgctl" amqpExt "github.com/up9inc/mizu/tap/extensions/amqp" httpExt "github.com/up9inc/mizu/tap/extensions/http" kafkaExt "github.com/up9inc/mizu/tap/extensions/kafka" @@ -22,11 +22,13 @@ import ( var ( Extensions []*tapApi.Extension // global ExtensionsMap map[string]*tapApi.Extension // global + ProtocolsMap map[string]*tapApi.Protocol //global ) func LoadExtensions() { Extensions = make([]*tapApi.Extension, 0) ExtensionsMap = make(map[string]*tapApi.Extension) + ProtocolsMap = make(map[string]*tapApi.Protocol) extensionHttp := &tapApi.Extension{} dissectorHttp := httpExt.NewDissector() @@ -34,6 +36,10 @@ func LoadExtensions() { extensionHttp.Dissector = dissectorHttp Extensions = append(Extensions, extensionHttp) ExtensionsMap[extensionHttp.Protocol.Name] = extensionHttp + protocolsHttp := dissectorHttp.GetProtocols() + for k, v := range protocolsHttp { + ProtocolsMap[k] = v + } if !dbgctl.MizuTapperDisableNonHttpExtensions { extensionAmqp := &tapApi.Extension{} @@ -42,6 +48,10 @@ func LoadExtensions() { extensionAmqp.Dissector = dissectorAmqp Extensions = append(Extensions, extensionAmqp) ExtensionsMap[extensionAmqp.Protocol.Name] = extensionAmqp + protocolsAmqp := dissectorAmqp.GetProtocols() + for k, v := range protocolsAmqp { + ProtocolsMap[k] = v + } extensionKafka := &tapApi.Extension{} dissectorKafka := kafkaExt.NewDissector() @@ -49,6 +59,10 @@ func LoadExtensions() { extensionKafka.Dissector = dissectorKafka Extensions = append(Extensions, extensionKafka) ExtensionsMap[extensionKafka.Protocol.Name] = extensionKafka + protocolsKafka := dissectorKafka.GetProtocols() + for k, v := range protocolsKafka { + ProtocolsMap[k] = v + } extensionRedis := &tapApi.Extension{} dissectorRedis := redisExt.NewDissector() @@ -56,13 +70,17 @@ func LoadExtensions() { extensionRedis.Dissector = dissectorRedis Extensions = append(Extensions, extensionRedis) ExtensionsMap[extensionRedis.Protocol.Name] = extensionRedis + protocolsRedis := dissectorRedis.GetProtocols() + for k, v := range protocolsRedis { + ProtocolsMap[k] = v + } } sort.Slice(Extensions, func(i, j int) bool { return Extensions[i].Protocol.Priority < Extensions[j].Protocol.Priority }) - api.InitExtensionsMap(ExtensionsMap) + api.InitMaps(ExtensionsMap, ProtocolsMap) } func ConfigureBasenineServer(host string, port string, dbSize int64, logLevel logging.Level, insertionFilter string) { diff --git a/agent/pkg/entries/entries_provider.go b/agent/pkg/entries/entries_provider.go index f9b97fc17..f06948c11 100644 --- a/agent/pkg/entries/entries_provider.go +++ b/agent/pkg/entries/entries_provider.go @@ -3,6 +3,7 @@ package entries import ( "encoding/json" "errors" + "fmt" "time" basenine "github.com/up9inc/basenine/client/go" @@ -38,11 +39,20 @@ func (e *BasenineEntriesProvider) GetEntries(entriesRequest *models.EntriesReque return nil, nil, err } - extension := app.ExtensionsMap[entry.Protocol.Name] + protocol, ok := app.ProtocolsMap[entry.ProtocolId] + if !ok { + return nil, nil, fmt.Errorf("protocol not found, protocol: %v", protocol) + } + + extension, ok := app.ExtensionsMap[protocol.Name] + if !ok { + return nil, nil, fmt.Errorf("extension not found, extension: %v", protocol.Name) + } + base := extension.Dissector.Summarize(entry) dataSlice = append(dataSlice, &tapApi.EntryWrapper{ - Protocol: entry.Protocol, + Protocol: *protocol, Data: entry, Base: base, }) @@ -68,7 +78,16 @@ func (e *BasenineEntriesProvider) GetEntry(singleEntryRequest *models.SingleEntr return nil, errors.New(string(bytes)) } - extension := app.ExtensionsMap[entry.Protocol.Name] + protocol, ok := app.ProtocolsMap[entry.ProtocolId] + if !ok { + return nil, fmt.Errorf("protocol not found, protocol: %v", protocol) + } + + extension, ok := app.ExtensionsMap[protocol.Name] + if !ok { + return nil, fmt.Errorf("extension not found, extension: %v", protocol.Name) + } + base := extension.Dissector.Summarize(entry) var representation []byte representation, err = extension.Dissector.Represent(entry.Request, entry.Response) @@ -78,7 +97,7 @@ func (e *BasenineEntriesProvider) GetEntry(singleEntryRequest *models.SingleEntr var rules []map[string]interface{} var isRulesEnabled bool - if entry.Protocol.Name == "http" { + if protocol.Name == "http" { harEntry, _ := har.NewEntry(entry.Request, entry.Response, entry.StartTime, entry.ElapsedTime) _, rulesMatched, _isRulesEnabled := models.RunValidationRulesState(*harEntry, entry.Destination.Name) isRulesEnabled = _isRulesEnabled @@ -89,7 +108,7 @@ func (e *BasenineEntriesProvider) GetEntry(singleEntryRequest *models.SingleEntr } return &tapApi.EntryWrapper{ - Protocol: entry.Protocol, + Protocol: *protocol, Representation: string(representation), Data: entry, Base: base, diff --git a/agent/pkg/oas/oas_generator.go b/agent/pkg/oas/oas_generator.go index b16ab3e7a..e1d755d03 100644 --- a/agent/pkg/oas/oas_generator.go +++ b/agent/pkg/oas/oas_generator.go @@ -6,9 +6,8 @@ import ( "sync" "github.com/up9inc/mizu/agent/pkg/har" - "github.com/up9inc/mizu/tap/api" - "github.com/up9inc/mizu/logger" + "github.com/up9inc/mizu/tap/api" ) var ( @@ -17,7 +16,7 @@ var ( ) type OasGeneratorSink interface { - HandleEntry(mizuEntry *api.Entry) + HandleEntry(mizuEntry *api.Entry, protocol *api.Protocol) } type OasGenerator interface { @@ -59,12 +58,12 @@ func (g *defaultOasGenerator) IsStarted() bool { return g.started } -func (g *defaultOasGenerator) HandleEntry(mizuEntry *api.Entry) { +func (g *defaultOasGenerator) HandleEntry(mizuEntry *api.Entry, protocol *api.Protocol) { if !g.started { return } - if mizuEntry.Protocol.Name == "http" { + if protocol.Name == "http" { dest := mizuEntry.Destination.Name if dest == "" { logger.Log.Debugf("OAS: Unresolved entry %d", mizuEntry.Id) @@ -86,7 +85,7 @@ func (g *defaultOasGenerator) HandleEntry(mizuEntry *api.Entry) { g.handleHARWithSource(entryWSource) } else { - logger.Log.Debugf("OAS: Unsupported protocol in entry %d: %s", mizuEntry.Id, mizuEntry.Protocol.Name) + logger.Log.Debugf("OAS: Unsupported protocol in entry %d: %s", mizuEntry.Id, protocol.Name) } } diff --git a/tap/api/api.go b/tap/api/api.go index 1ce736c0b..ba96464ad 100644 --- a/tap/api/api.go +++ b/tap/api/api.go @@ -91,7 +91,6 @@ type OutputChannelItem struct { Timestamp int64 ConnectionInfo *ConnectionInfo Pair *RequestResponsePair - Summary *BaseEntry Namespace string } @@ -116,6 +115,7 @@ func (p *ReadProgress) Reset() { type Dissector interface { Register(*Extension) + GetProtocols() map[string]*Protocol Ping() Dissect(b *bufio.Reader, reader TcpReader, options *TrafficFilteringOptions) error Analyze(item *OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *Entry @@ -151,7 +151,7 @@ func (e *Emitting) Emit(item *OutputChannelItem) { type Entry struct { Id string `json:"id"` - Protocol Protocol `json:"proto"` + ProtocolId string `json:"protocol"` Capture Capture `json:"capture"` Source *TCP `json:"src"` Destination *TCP `json:"dst"` diff --git a/tap/extensions/amqp/Makefile b/tap/extensions/amqp/Makefile index e49dea598..c77ee4181 100644 --- a/tap/extensions/amqp/Makefile +++ b/tap/extensions/amqp/Makefile @@ -13,4 +13,4 @@ test-pull-bin: test-pull-expect: @mkdir -p expect - @[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect11/amqp/\* expect + @[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect13/amqp/\* expect diff --git a/tap/extensions/amqp/main.go b/tap/extensions/amqp/main.go index 13e625b15..213c8d715 100644 --- a/tap/extensions/amqp/main.go +++ b/tap/extensions/amqp/main.go @@ -26,12 +26,20 @@ var protocol = api.Protocol{ Priority: 1, } +var protocolsMap = map[string]*api.Protocol{ + fmt.Sprintf("%s/%s/%s", protocol.Name, protocol.Version, protocol.Abbreviation): &protocol, +} + type dissecting string func (d dissecting) Register(extension *api.Extension) { extension.Protocol = &protocol } +func (d dissecting) GetProtocols() map[string]*api.Protocol { + return protocolsMap +} + func (d dissecting) Ping() { log.Printf("pong %s", protocol.Name) } @@ -214,8 +222,8 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, reqDetails["method"] = request["method"] return &api.Entry{ - Protocol: protocol, - Capture: item.Capture, + ProtocolId: fmt.Sprintf("%s/%s/%s", protocol.Name, protocol.Version, protocol.Abbreviation), + Capture: item.Capture, Source: &api.TCP{ Name: resolvedSource, IP: item.ConnectionInfo.ClientIP, @@ -277,7 +285,7 @@ func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry { return &api.BaseEntry{ Id: entry.Id, - Protocol: entry.Protocol, + Protocol: *protocolsMap[entry.ProtocolId], Capture: entry.Capture, Summary: summary, SummaryQuery: summaryQuery, @@ -322,7 +330,7 @@ func (d dissecting) Represent(request map[string]interface{}, response map[strin func (d dissecting) Macros() map[string]string { return map[string]string{ - `amqp`: fmt.Sprintf(`proto.name == "%s"`, protocol.Name), + `amqp`: fmt.Sprintf(`protocol == "%s/%s/%s"`, protocol.Name, protocol.Version, protocol.Abbreviation), } } diff --git a/tap/extensions/amqp/main_test.go b/tap/extensions/amqp/main_test.go index 4584c092c..9b2727282 100644 --- a/tap/extensions/amqp/main_test.go +++ b/tap/extensions/amqp/main_test.go @@ -44,7 +44,7 @@ func TestRegister(t *testing.T) { func TestMacros(t *testing.T) { expectedMacros := map[string]string{ - "amqp": `proto.name == "amqp"`, + "amqp": `protocol == "amqp/0-9-1/AMQP"`, } dissector := NewDissector() macros := dissector.Macros() diff --git a/tap/extensions/http/Makefile b/tap/extensions/http/Makefile index 4edc053e1..70434e356 100644 --- a/tap/extensions/http/Makefile +++ b/tap/extensions/http/Makefile @@ -13,4 +13,4 @@ test-pull-bin: test-pull-expect: @mkdir -p expect - @[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect12/http/\* expect + @[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect13/http/\* expect diff --git a/tap/extensions/http/main.go b/tap/extensions/http/main.go index 6148c2bb5..f6dc5f1bf 100644 --- a/tap/extensions/http/main.go +++ b/tap/extensions/http/main.go @@ -98,6 +98,15 @@ var graphQL2Protocol = api.Protocol{ Priority: 0, } +var protocolsMap = map[string]*api.Protocol{ + fmt.Sprintf("%s/%s/%s", http10protocol.Name, http10protocol.Version, http10protocol.Abbreviation): &http10protocol, + fmt.Sprintf("%s/%s/%s", http11protocol.Name, http11protocol.Version, http11protocol.Abbreviation): &http11protocol, + fmt.Sprintf("%s/%s/%s", http2Protocol.Name, http2Protocol.Version, http2Protocol.Abbreviation): &http2Protocol, + fmt.Sprintf("%s/%s/%s", grpcProtocol.Name, grpcProtocol.Version, grpcProtocol.Abbreviation): &grpcProtocol, + fmt.Sprintf("%s/%s/%s", graphQL1Protocol.Name, graphQL1Protocol.Version, graphQL1Protocol.Abbreviation): &graphQL1Protocol, + fmt.Sprintf("%s/%s/%s", graphQL2Protocol.Name, graphQL2Protocol.Version, graphQL2Protocol.Abbreviation): &graphQL2Protocol, +} + const ( TypeHttpRequest = iota TypeHttpResponse @@ -109,6 +118,10 @@ func (d dissecting) Register(extension *api.Extension) { extension.Protocol = &http11protocol } +func (d dissecting) GetProtocols() map[string]*api.Protocol { + return protocolsMap +} + func (d dissecting) Ping() { log.Printf("pong %s", http11protocol.Name) } @@ -281,8 +294,8 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, } return &api.Entry{ - Protocol: item.Protocol, - Capture: item.Capture, + ProtocolId: fmt.Sprintf("%s/%s/%s", item.Protocol.Name, item.Protocol.Version, item.Protocol.Abbreviation), + Capture: item.Capture, Source: &api.TCP{ Name: resolvedSource, IP: item.ConnectionInfo.ClientIP, @@ -315,7 +328,7 @@ func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry { return &api.BaseEntry{ Id: entry.Id, - Protocol: entry.Protocol, + Protocol: *protocolsMap[entry.ProtocolId], Capture: entry.Capture, Summary: summary, SummaryQuery: summaryQuery, @@ -503,10 +516,10 @@ func (d dissecting) Represent(request map[string]interface{}, response map[strin func (d dissecting) Macros() map[string]string { return map[string]string{ - `http`: fmt.Sprintf(`proto.name == "%s" and proto.version.startsWith("%c")`, http11protocol.Name, http11protocol.Version[0]), - `http2`: fmt.Sprintf(`proto.name == "%s" and proto.version == "%s"`, http11protocol.Name, http2Protocol.Version), - `grpc`: fmt.Sprintf(`proto.name == "%s" and proto.version == "%s" and proto.macro == "%s"`, http11protocol.Name, grpcProtocol.Version, grpcProtocol.Macro), - `gql`: fmt.Sprintf(`proto.name == "%s" and proto.macro == "%s"`, graphQL1Protocol.Name, graphQL1Protocol.Macro), + `http`: fmt.Sprintf(`protocol == "%s/%s/%s" or protocol == "%s/%s/%s"`, http10protocol.Name, http10protocol.Version, http10protocol.Abbreviation, http11protocol.Name, http11protocol.Version, http11protocol.Abbreviation), + `http2`: fmt.Sprintf(`protocol == "%s/%s/%s"`, http2Protocol.Name, http2Protocol.Version, http2Protocol.Abbreviation), + `grpc`: fmt.Sprintf(`protocol == "%s/%s/%s"`, grpcProtocol.Name, grpcProtocol.Version, grpcProtocol.Abbreviation), + `gql`: fmt.Sprintf(`protocol == "%s/%s/%s" or protocol == "%s/%s/%s"`, graphQL1Protocol.Name, graphQL1Protocol.Version, graphQL1Protocol.Abbreviation, graphQL2Protocol.Name, graphQL2Protocol.Version, graphQL2Protocol.Abbreviation), } } diff --git a/tap/extensions/http/main_test.go b/tap/extensions/http/main_test.go index 8ffdf41ae..dc969ee68 100644 --- a/tap/extensions/http/main_test.go +++ b/tap/extensions/http/main_test.go @@ -44,10 +44,10 @@ func TestRegister(t *testing.T) { func TestMacros(t *testing.T) { expectedMacros := map[string]string{ - "http": `proto.name == "http" and proto.version.startsWith("1")`, - "http2": `proto.name == "http" and proto.version == "2.0"`, - "grpc": `proto.name == "http" and proto.version == "2.0" and proto.macro == "grpc"`, - "gql": `proto.name == "http" and proto.macro == "gql"`, + "http": `protocol == "http/1.0/HTTP" or protocol == "http/1.1/HTTP"`, + "http2": `protocol == "http/2.0/HTTP/2"`, + "grpc": `protocol == "http/2.0/gRPC"`, + "gql": `protocol == "http/1.1/GQL" or protocol == "http/2.0/GQL"`, } dissector := NewDissector() macros := dissector.Macros() diff --git a/tap/extensions/kafka/Makefile b/tap/extensions/kafka/Makefile index 6d7f260ef..01d91bc1c 100644 --- a/tap/extensions/kafka/Makefile +++ b/tap/extensions/kafka/Makefile @@ -13,4 +13,4 @@ test-pull-bin: test-pull-expect: @mkdir -p expect - @[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect11/kafka/\* expect + @[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect13/kafka/\* expect diff --git a/tap/extensions/kafka/main.go b/tap/extensions/kafka/main.go index 3d10e53d1..485cb316d 100644 --- a/tap/extensions/kafka/main.go +++ b/tap/extensions/kafka/main.go @@ -24,12 +24,20 @@ var _protocol = api.Protocol{ Priority: 2, } +var protocolsMap = map[string]*api.Protocol{ + fmt.Sprintf("%s/%s/%s", _protocol.Name, _protocol.Version, _protocol.Abbreviation): &_protocol, +} + type dissecting string func (d dissecting) Register(extension *api.Extension) { extension.Protocol = &_protocol } +func (d dissecting) GetProtocols() map[string]*api.Protocol { + return protocolsMap +} + func (d dissecting) Ping() { log.Printf("pong %s", _protocol.Name) } @@ -62,8 +70,8 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, elapsedTime = 0 } return &api.Entry{ - Protocol: _protocol, - Capture: item.Capture, + ProtocolId: fmt.Sprintf("%s/%s/%s", _protocol.Name, _protocol.Version, _protocol.Abbreviation), + Capture: item.Capture, Source: &api.TCP{ Name: resolvedSource, IP: item.ConnectionInfo.ClientIP, @@ -187,7 +195,7 @@ func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry { return &api.BaseEntry{ Id: entry.Id, - Protocol: entry.Protocol, + Protocol: *protocolsMap[entry.ProtocolId], Capture: entry.Capture, Summary: summary, SummaryQuery: summaryQuery, @@ -243,7 +251,7 @@ func (d dissecting) Represent(request map[string]interface{}, response map[strin func (d dissecting) Macros() map[string]string { return map[string]string{ - `kafka`: fmt.Sprintf(`proto.name == "%s"`, _protocol.Name), + `kafka`: fmt.Sprintf(`protocol == "%s/%s/%s"`, _protocol.Name, _protocol.Version, _protocol.Abbreviation), } } diff --git a/tap/extensions/kafka/main_test.go b/tap/extensions/kafka/main_test.go index c36568121..41016d008 100644 --- a/tap/extensions/kafka/main_test.go +++ b/tap/extensions/kafka/main_test.go @@ -44,7 +44,7 @@ func TestRegister(t *testing.T) { func TestMacros(t *testing.T) { expectedMacros := map[string]string{ - "kafka": `proto.name == "kafka"`, + "kafka": `protocol == "kafka/12/KAFKA"`, } dissector := NewDissector() macros := dissector.Macros() diff --git a/tap/extensions/redis/Makefile b/tap/extensions/redis/Makefile index 956463305..e46843191 100644 --- a/tap/extensions/redis/Makefile +++ b/tap/extensions/redis/Makefile @@ -13,4 +13,4 @@ test-pull-bin: test-pull-expect: @mkdir -p expect - @[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect11/redis/\* expect + @[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect13/redis/\* expect diff --git a/tap/extensions/redis/main.go b/tap/extensions/redis/main.go index 2e02e1ffd..f8caf66d5 100644 --- a/tap/extensions/redis/main.go +++ b/tap/extensions/redis/main.go @@ -24,12 +24,20 @@ var protocol = api.Protocol{ Priority: 3, } +var protocolsMap = map[string]*api.Protocol{ + fmt.Sprintf("%s/%s/%s", protocol.Name, protocol.Version, protocol.Abbreviation): &protocol, +} + type dissecting string func (d dissecting) Register(extension *api.Extension) { extension.Protocol = &protocol } +func (d dissecting) GetProtocols() map[string]*api.Protocol { + return protocolsMap +} + func (d dissecting) Ping() { log.Printf("pong %s", protocol.Name) } @@ -70,8 +78,8 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, elapsedTime = 0 } return &api.Entry{ - Protocol: protocol, - Capture: item.Capture, + ProtocolId: fmt.Sprintf("%s/%s/%s", protocol.Name, protocol.Version, protocol.Abbreviation), + Capture: item.Capture, Source: &api.TCP{ Name: resolvedSource, IP: item.ConnectionInfo.ClientIP, @@ -115,7 +123,7 @@ func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry { return &api.BaseEntry{ Id: entry.Id, - Protocol: entry.Protocol, + Protocol: *protocolsMap[entry.ProtocolId], Capture: entry.Capture, Summary: summary, SummaryQuery: summaryQuery, @@ -144,7 +152,7 @@ func (d dissecting) Represent(request map[string]interface{}, response map[strin func (d dissecting) Macros() map[string]string { return map[string]string{ - `redis`: fmt.Sprintf(`proto.name == "%s"`, protocol.Name), + `redis`: fmt.Sprintf(`protocol == "%s/%s/%s"`, protocol.Name, protocol.Version, protocol.Abbreviation), } } diff --git a/tap/extensions/redis/main_test.go b/tap/extensions/redis/main_test.go index 47befa953..474df0b69 100644 --- a/tap/extensions/redis/main_test.go +++ b/tap/extensions/redis/main_test.go @@ -45,7 +45,7 @@ func TestRegister(t *testing.T) { func TestMacros(t *testing.T) { expectedMacros := map[string]string{ - "redis": `proto.name == "redis"`, + "redis": `protocol == "redis/3.x/REDIS"`, } dissector := NewDissector() macros := dissector.Macros()