diff --git a/Makefile b/Makefile index 79e977fe9..8f4a2c004 100644 --- a/Makefile +++ b/Makefile @@ -103,6 +103,7 @@ test-shared: ## Run shared tests test-extensions: ## Run extensions tests @echo "running http tests"; cd tap/extensions/http && $(MAKE) test + @echo "running redis tests"; cd tap/extensions/redis && $(MAKE) test @echo "running kafka tests"; cd tap/extensions/kafka && $(MAKE) test @echo "running amqp tests"; cd tap/extensions/amqp && $(MAKE) test diff --git a/tap/extensions/redis/Makefile b/tap/extensions/redis/Makefile new file mode 100644 index 000000000..a67b4a1f6 --- /dev/null +++ b/tap/extensions/redis/Makefile @@ -0,0 +1,16 @@ +skipbin := $$(find bin -mindepth 1 -maxdepth 1) +skipexpect := $$(find expect -mindepth 1 -maxdepth 1) + +test: test-pull-bin test-pull-expect + @MIZU_TEST=1 go test -v ./... -coverpkg=./... -race -coverprofile=coverage.out -covermode=atomic + +test-update: test-pull-bin + @MIZU_TEST=1 TEST_UPDATE=1 go test -v ./... -coverpkg=./... -coverprofile=coverage.out -covermode=atomic + +test-pull-bin: + @mkdir -p bin + @[ "${skipbin}" ] && echo "Skipping downloading BINs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp gs://static.up9.io/mizu/test-pcap/bin/redis/\*.bin bin + +test-pull-expect: + @mkdir -p expect + @[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect/redis/\* expect diff --git a/tap/extensions/redis/go.mod b/tap/extensions/redis/go.mod index 7c5d9ba24..82bd16d51 100644 --- a/tap/extensions/redis/go.mod +++ b/tap/extensions/redis/go.mod @@ -2,8 +2,16 @@ module github.com/up9inc/mizu/tap/extensions/redis go 1.17 -require github.com/up9inc/mizu/tap/api v0.0.0 +require ( + github.com/stretchr/testify v1.7.0 + github.com/up9inc/mizu/tap/api v0.0.0 +) -require github.com/google/martian v2.1.0+incompatible // indirect +require ( + github.com/davecgh/go-spew v1.1.0 // indirect + github.com/google/martian v2.1.0+incompatible // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect +) replace github.com/up9inc/mizu/tap/api v0.0.0 => ../../api diff --git a/tap/extensions/redis/go.sum b/tap/extensions/redis/go.sum index bbcb7d053..53414f82f 100644 --- a/tap/extensions/redis/go.sum +++ b/tap/extensions/redis/go.sum @@ -1,2 +1,13 @@ +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/tap/extensions/redis/main_test.go b/tap/extensions/redis/main_test.go new file mode 100644 index 000000000..5aa0a5cd4 --- /dev/null +++ b/tap/extensions/redis/main_test.go @@ -0,0 +1,290 @@ +package redis + +import ( + "bufio" + "encoding/json" + "errors" + "fmt" + "io" + "io/ioutil" + "log" + "os" + "path" + "path/filepath" + "reflect" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/up9inc/mizu/tap/api" +) + +const ( + binDir = "bin" + patternBin = "*_req.bin" + patternDissect = "*.json" + msgDissecting = "Dissecting:" + msgAnalyzing = "Analyzing:" + msgRepresenting = "Representing:" + respSuffix = "_res.bin" + expectDir = "expect" + dissectDir = "dissect" + analyzeDir = "analyze" + representDir = "represent" + testUpdate = "TEST_UPDATE" +) + +func TestRegister(t *testing.T) { + dissector := NewDissector() + extension := &api.Extension{} + dissector.Register(extension) + assert.Equal(t, "redis", extension.Protocol.Name) +} + +func TestMacros(t *testing.T) { + expectedMacros := map[string]string{ + "redis": `proto.name == "redis"`, + } + dissector := NewDissector() + macros := dissector.Macros() + assert.Equal(t, expectedMacros, macros) +} + +func TestPing(t *testing.T) { + dissector := NewDissector() + dissector.Ping() +} + +func TestDissect(t *testing.T) { + _, testUpdateEnabled := os.LookupEnv(testUpdate) + + expectDirDissect := path.Join(expectDir, dissectDir) + + if testUpdateEnabled { + os.RemoveAll(expectDirDissect) + err := os.MkdirAll(expectDirDissect, 0775) + assert.Nil(t, err) + } + + dissector := NewDissector() + paths, err := filepath.Glob(path.Join(binDir, patternBin)) + if err != nil { + log.Fatal(err) + } + + options := &api.TrafficFilteringOptions{ + IgnoredUserAgents: []string{}, + } + + for _, _path := range paths { + basePath := _path[:len(_path)-8] + + // Channel to verify the output + itemChannel := make(chan *api.OutputChannelItem) + var emitter api.Emitter = &api.Emitting{ + AppStats: &api.AppStats{}, + OutputChannel: itemChannel, + } + + var items []*api.OutputChannelItem + stop := make(chan bool) + + go func() { + for { + select { + case <-stop: + return + case item := <-itemChannel: + items = append(items, item) + } + } + }() + + // Stream level + counterPair := &api.CounterPair{ + Request: 0, + Response: 0, + } + superIdentifier := &api.SuperIdentifier{} + + // Request + pathClient := _path + fmt.Printf("%s %s\n", msgDissecting, pathClient) + fileClient, err := os.Open(pathClient) + assert.Nil(t, err) + + bufferClient := bufio.NewReader(fileClient) + tcpIDClient := &api.TcpID{ + SrcIP: "1", + DstIP: "2", + SrcPort: "1", + DstPort: "2", + } + reqResMatcher := dissector.NewResponseRequestMatcher() + err = dissector.Dissect(bufferClient, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) + if err != nil && reflect.TypeOf(err) != reflect.TypeOf(&ConnectError{}) && err != io.EOF && err != io.ErrUnexpectedEOF { + log.Println(err) + } + + // Response + pathServer := basePath + respSuffix + fmt.Printf("%s %s\n", msgDissecting, pathServer) + fileServer, err := os.Open(pathServer) + assert.Nil(t, err) + + bufferServer := bufio.NewReader(fileServer) + tcpIDServer := &api.TcpID{ + SrcIP: "2", + DstIP: "1", + SrcPort: "2", + DstPort: "1", + } + err = dissector.Dissect(bufferServer, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) + if err != nil && reflect.TypeOf(err) != reflect.TypeOf(&ConnectError{}) && err != io.EOF && err != io.ErrUnexpectedEOF { + log.Println(err) + } + + fileClient.Close() + fileServer.Close() + + pathExpect := path.Join(expectDirDissect, fmt.Sprintf("%s.json", basePath[4:])) + + time.Sleep(10 * time.Millisecond) + + stop <- true + + marshaled, err := json.Marshal(items) + assert.Nil(t, err) + + if testUpdateEnabled { + if len(items) > 0 { + err = os.WriteFile(pathExpect, marshaled, 0644) + assert.Nil(t, err) + } + } else { + if _, err := os.Stat(pathExpect); errors.Is(err, os.ErrNotExist) { + assert.Len(t, items, 0) + } else { + expectedBytes, err := ioutil.ReadFile(pathExpect) + assert.Nil(t, err) + + assert.JSONEq(t, string(expectedBytes), string(marshaled)) + } + } + } +} + +func TestAnalyze(t *testing.T) { + _, testUpdateEnabled := os.LookupEnv(testUpdate) + + expectDirDissect := path.Join(expectDir, dissectDir) + expectDirAnalyze := path.Join(expectDir, analyzeDir) + + if testUpdateEnabled { + os.RemoveAll(expectDirAnalyze) + err := os.MkdirAll(expectDirAnalyze, 0775) + assert.Nil(t, err) + } + + dissector := NewDissector() + paths, err := filepath.Glob(path.Join(expectDirDissect, patternDissect)) + if err != nil { + log.Fatal(err) + } + + for _, _path := range paths { + fmt.Printf("%s %s\n", msgAnalyzing, _path) + + bytes, err := ioutil.ReadFile(_path) + assert.Nil(t, err) + + var items []*api.OutputChannelItem + err = json.Unmarshal(bytes, &items) + assert.Nil(t, err) + + var entries []*api.Entry + for _, item := range items { + entry := dissector.Analyze(item, "", "", "") + entries = append(entries, entry) + } + + pathExpect := path.Join(expectDirAnalyze, filepath.Base(_path)) + + marshaled, err := json.Marshal(entries) + assert.Nil(t, err) + + if testUpdateEnabled { + if len(entries) > 0 { + err = os.WriteFile(pathExpect, marshaled, 0644) + assert.Nil(t, err) + } + } else { + if _, err := os.Stat(pathExpect); errors.Is(err, os.ErrNotExist) { + assert.Len(t, items, 0) + } else { + expectedBytes, err := ioutil.ReadFile(pathExpect) + assert.Nil(t, err) + + assert.JSONEq(t, string(expectedBytes), string(marshaled)) + } + } + } +} + +func TestRepresent(t *testing.T) { + _, testUpdateEnabled := os.LookupEnv(testUpdate) + + expectDirAnalyze := path.Join(expectDir, analyzeDir) + expectDirRepresent := path.Join(expectDir, representDir) + + if testUpdateEnabled { + os.RemoveAll(expectDirRepresent) + err := os.MkdirAll(expectDirRepresent, 0775) + assert.Nil(t, err) + } + + dissector := NewDissector() + paths, err := filepath.Glob(path.Join(expectDirAnalyze, patternDissect)) + if err != nil { + log.Fatal(err) + } + + for _, _path := range paths { + fmt.Printf("%s %s\n", msgRepresenting, _path) + + bytes, err := ioutil.ReadFile(_path) + assert.Nil(t, err) + + var entries []*api.Entry + err = json.Unmarshal(bytes, &entries) + assert.Nil(t, err) + + var objects []string + for _, entry := range entries { + object, _, err := dissector.Represent(entry.Request, entry.Response) + assert.Nil(t, err) + objects = append(objects, string(object)) + } + + pathExpect := path.Join(expectDirRepresent, filepath.Base(_path)) + + marshaled, err := json.Marshal(objects) + assert.Nil(t, err) + + if testUpdateEnabled { + if len(objects) > 0 { + err = os.WriteFile(pathExpect, marshaled, 0644) + assert.Nil(t, err) + } + } else { + if _, err := os.Stat(pathExpect); errors.Is(err, os.ErrNotExist) { + assert.Len(t, objects, 0) + } else { + expectedBytes, err := ioutil.ReadFile(pathExpect) + assert.Nil(t, err) + + assert.JSONEq(t, string(expectedBytes), string(marshaled)) + } + } + } +}