From c67675c1386d5197b0ec50aad8cd1b7c58139ac1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=2E=20Mert=20Y=C4=B1ld=C4=B1ran?= Date: Wed, 16 Feb 2022 12:18:33 +0300 Subject: [PATCH] Add unit tests for Kafka dissector (#807) * Add unit tests for Kafka dissector * Return `io.EOF` if request or response header size is zero * Sort the slice in `representMapAsTable` * Remove the dead code * Remove more dead code * Remove more dead code * Fix `dissector.Analyze` call Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com> --- Makefile | 1 + tap/api/api.go | 1 + tap/extensions/http/matcher.go | 3 + tap/extensions/kafka/Makefile | 16 + tap/extensions/kafka/buffer.go | 584 -------------------------- tap/extensions/kafka/cluster.go | 143 ------- tap/extensions/kafka/decode.go | 61 --- tap/extensions/kafka/discard.go | 16 - tap/extensions/kafka/encode.go | 537 ----------------------- tap/extensions/kafka/error.go | 91 ---- tap/extensions/kafka/go.mod | 4 + tap/extensions/kafka/go.sum | 5 + tap/extensions/kafka/helpers.go | 5 + tap/extensions/kafka/main_test.go | 290 +++++++++++++ tap/extensions/kafka/matcher.go | 11 +- tap/extensions/kafka/protocol.go | 137 +----- tap/extensions/kafka/protocol_make.go | 182 -------- tap/extensions/kafka/read.go | 159 ------- tap/extensions/kafka/record.go | 279 ------------ tap/extensions/kafka/record_bytes.go | 43 -- tap/extensions/kafka/reflect.go | 36 -- tap/extensions/kafka/request.go | 69 +-- tap/extensions/kafka/response.go | 52 +-- tap/extensions/kafka/structs.go | 13 - tap/extensions/redis/matcher.go | 2 + 25 files changed, 357 insertions(+), 2383 deletions(-) create mode 100644 tap/extensions/kafka/Makefile delete mode 100644 tap/extensions/kafka/buffer.go delete mode 100644 tap/extensions/kafka/cluster.go delete mode 100644 tap/extensions/kafka/discard.go delete mode 100644 tap/extensions/kafka/encode.go delete mode 100644 tap/extensions/kafka/error.go create mode 100644 tap/extensions/kafka/main_test.go delete mode 100644 tap/extensions/kafka/protocol_make.go delete mode 100644 tap/extensions/kafka/read.go delete mode 100644 tap/extensions/kafka/record.go delete mode 100644 tap/extensions/kafka/record_bytes.go diff --git a/Makefile b/Makefile index 9dc129ab8..79e977fe9 100644 --- a/Makefile +++ b/Makefile @@ -103,6 +103,7 @@ test-shared: ## Run shared tests test-extensions: ## Run extensions tests @echo "running http tests"; cd tap/extensions/http && $(MAKE) test + @echo "running kafka tests"; cd tap/extensions/kafka && $(MAKE) test @echo "running amqp tests"; cd tap/extensions/amqp && $(MAKE) test acceptance-test: ## Run acceptance tests diff --git a/tap/api/api.go b/tap/api/api.go index d1a812e80..f061db404 100644 --- a/tap/api/api.go +++ b/tap/api/api.go @@ -107,6 +107,7 @@ type Dissector interface { type RequestResponseMatcher interface { GetMap() *sync.Map + SetMaxTry(value int) } type Emitting struct { diff --git a/tap/extensions/http/matcher.go b/tap/extensions/http/matcher.go index 67c09136a..7670bcf81 100644 --- a/tap/extensions/http/matcher.go +++ b/tap/extensions/http/matcher.go @@ -21,6 +21,9 @@ func (matcher *requestResponseMatcher) GetMap() *sync.Map { return matcher.openMessagesMap } +func (matcher *requestResponseMatcher) SetMaxTry(value int) { +} + func (matcher *requestResponseMatcher) registerRequest(ident string, request *http.Request, captureTime time.Time, protoMinor int) *api.OutputChannelItem { requestHTTPMessage := api.GenericMessage{ IsRequest: true, diff --git a/tap/extensions/kafka/Makefile 
b/tap/extensions/kafka/Makefile new file mode 100644 index 000000000..da3809666 --- /dev/null +++ b/tap/extensions/kafka/Makefile @@ -0,0 +1,16 @@ +skipbin := $$(find bin -mindepth 1 -maxdepth 1) +skipexpect := $$(find expect -mindepth 1 -maxdepth 1) + +test: test-pull-bin test-pull-expect + @MIZU_TEST=1 go test -v ./... -coverpkg=./... -race -coverprofile=coverage.out -covermode=atomic + +test-update: test-pull-bin + @MIZU_TEST=1 TEST_UPDATE=1 go test -v ./... -coverpkg=./... -coverprofile=coverage.out -covermode=atomic + +test-pull-bin: + @mkdir -p bin + @[ "${skipbin}" ] && echo "Skipping downloading BINs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp gs://static.up9.io/mizu/test-pcap/bin/kafka/\*.bin bin + +test-pull-expect: + @mkdir -p expect + @[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect/kafka/\* expect diff --git a/tap/extensions/kafka/buffer.go b/tap/extensions/kafka/buffer.go deleted file mode 100644 index 57420bff4..000000000 --- a/tap/extensions/kafka/buffer.go +++ /dev/null @@ -1,584 +0,0 @@ -package kafka - -import ( - "bytes" - "fmt" - "io" - "sync" - "sync/atomic" -) - -// Bytes is an interface implemented by types that represent immutable -// sequences of bytes. -// -// Bytes values are used to abstract the location where record keys and -// values are read from (e.g. in-memory buffers, network sockets, files). -// -// The Close method should be called to release resources held by the object -// when the program is done with it. -// -// Bytes values are generally not safe to use concurrently from multiple -// goroutines. -type Bytes interface { - io.ReadCloser - // Returns the number of bytes remaining to be read from the payload. - Len() int -} - -// NewBytes constructs a Bytes value from b. -// -// The returned value references b, it does not make a copy of the backing -// array. -// -// If b is nil, nil is returned to represent a null BYTES value in the kafka -// protocol. -func NewBytes(b []byte) Bytes { - if b == nil { - return nil - } - r := new(bytesReader) - r.Reset(b) - return r -} - -// ReadAll is similar to ioutil.ReadAll, but it takes advantage of knowing the -// length of b to minimize the memory footprint. -// -// The function returns a nil slice if b is nil. -// func ReadAll(b Bytes) ([]byte, error) { -// if b == nil { -// return nil, nil -// } -// s := make([]byte, b.Len()) -// _, err := io.ReadFull(b, s) -// return s, err -// } - -type bytesReader struct{ bytes.Reader } - -func (*bytesReader) Close() error { return nil } - -type refCount uintptr - -func (rc *refCount) ref() { atomic.AddUintptr((*uintptr)(rc), 1) } - -func (rc *refCount) unref(onZero func()) { - if atomic.AddUintptr((*uintptr)(rc), ^uintptr(0)) == 0 { - onZero() - } -} - -const ( - // Size of the memory buffer for a single page. We use a farily - // large size here (64 KiB) because batches exchanged with kafka - // tend to be multiple kilobytes in size, sometimes hundreds. - // Using large pages amortizes the overhead of the page metadata - // and algorithms to manage the pages. 
- pageSize = 65536 -) - -type page struct { - refc refCount - offset int64 - length int - buffer *[pageSize]byte -} - -func newPage(offset int64) *page { - p, _ := pagePool.Get().(*page) - if p != nil { - p.offset = offset - p.length = 0 - p.ref() - } else { - p = &page{ - refc: 1, - offset: offset, - buffer: &[pageSize]byte{}, - } - } - return p -} - -func (p *page) ref() { p.refc.ref() } - -func (p *page) unref() { p.refc.unref(func() { pagePool.Put(p) }) } - -func (p *page) slice(begin, end int64) []byte { - i, j := begin-p.offset, end-p.offset - - if i < 0 { - i = 0 - } else if i > pageSize { - i = pageSize - } - - if j < 0 { - j = 0 - } else if j > pageSize { - j = pageSize - } - - if i < j { - return p.buffer[i:j] - } - - return nil -} - -func (p *page) Cap() int { return pageSize } - -func (p *page) Len() int { return p.length } - -func (p *page) Size() int64 { return int64(p.length) } - -func (p *page) Truncate(n int) { - if n < p.length { - p.length = n - } -} - -func (p *page) ReadAt(b []byte, off int64) (int, error) { - if off -= p.offset; off < 0 || off > pageSize { - panic("offset out of range") - } - if off > int64(p.length) { - return 0, nil - } - return copy(b, p.buffer[off:p.length]), nil -} - -func (p *page) ReadFrom(r io.Reader) (int64, error) { - n, err := io.ReadFull(r, p.buffer[p.length:]) - if err == io.EOF || err == io.ErrUnexpectedEOF { - err = nil - } - p.length += n - return int64(n), err -} - -func (p *page) WriteAt(b []byte, off int64) (int, error) { - if off -= p.offset; off < 0 || off > pageSize { - panic("offset out of range") - } - n := copy(p.buffer[off:], b) - if end := int(off) + n; end > p.length { - p.length = end - } - return n, nil -} - -func (p *page) Write(b []byte) (int, error) { - return p.WriteAt(b, p.offset+int64(p.length)) -} - -var ( - _ io.ReaderAt = (*page)(nil) - _ io.ReaderFrom = (*page)(nil) - _ io.Writer = (*page)(nil) - _ io.WriterAt = (*page)(nil) -) - -type pageBuffer struct { - refc refCount - pages contiguousPages - length int - cursor int -} - -func newPageBuffer() *pageBuffer { - b, _ := pageBufferPool.Get().(*pageBuffer) - if b != nil { - b.cursor = 0 - b.refc.ref() - } else { - b = &pageBuffer{ - refc: 1, - pages: make(contiguousPages, 0, 16), - } - } - return b -} - -func (pb *pageBuffer) unref() { - pb.refc.unref(func() { - pb.pages.unref() - pb.pages.clear() - pb.pages = pb.pages[:0] - pb.length = 0 - pageBufferPool.Put(pb) - }) -} - -func (pb *pageBuffer) newPage() *page { - return newPage(int64(pb.length)) -} - -func (pb *pageBuffer) Close() error { - return nil -} - -func (pb *pageBuffer) Len() int { - return pb.length - pb.cursor -} - -func (pb *pageBuffer) Size() int64 { - return int64(pb.length) -} - -func (pb *pageBuffer) Discard(n int) (int, error) { - remain := pb.length - pb.cursor - if remain < n { - n = remain - } - pb.cursor += n - return n, nil -} - -func (pb *pageBuffer) Truncate(n int) { - if n < pb.length { - pb.length = n - - if n < pb.cursor { - pb.cursor = n - } - - for i := range pb.pages { - if p := pb.pages[i]; p.length <= n { - n -= p.length - } else { - if n > 0 { - pb.pages[i].Truncate(n) - i++ - } - pb.pages[i:].unref() - pb.pages[i:].clear() - pb.pages = pb.pages[:i] - break - } - } - } -} - -func (pb *pageBuffer) Seek(offset int64, whence int) (int64, error) { - c, err := seek(int64(pb.cursor), int64(pb.length), offset, whence) - if err != nil { - return -1, err - } - pb.cursor = int(c) - return c, nil -} - -func (pb *pageBuffer) ReadByte() (byte, error) { - b := [1]byte{} - _, err := 
pb.Read(b[:]) - return b[0], err -} - -func (pb *pageBuffer) Read(b []byte) (int, error) { - if pb.cursor >= pb.length { - return 0, io.EOF - } - n, err := pb.ReadAt(b, int64(pb.cursor)) - pb.cursor += n - return n, err -} - -func (pb *pageBuffer) ReadAt(b []byte, off int64) (int, error) { - return pb.pages.ReadAt(b, off) -} - -func (pb *pageBuffer) ReadFrom(r io.Reader) (int64, error) { - if len(pb.pages) == 0 { - pb.pages = append(pb.pages, pb.newPage()) - } - - rn := int64(0) - - for { - tail := pb.pages[len(pb.pages)-1] - free := tail.Cap() - tail.Len() - - if free == 0 { - tail = pb.newPage() - free = pageSize - pb.pages = append(pb.pages, tail) - } - - n, err := tail.ReadFrom(r) - pb.length += int(n) - rn += n - if n < int64(free) { - return rn, err - } - } -} - -func (pb *pageBuffer) WriteString(s string) (int, error) { - return pb.Write([]byte(s)) -} - -func (pb *pageBuffer) Write(b []byte) (int, error) { - wn := len(b) - if wn == 0 { - return 0, nil - } - - if len(pb.pages) == 0 { - pb.pages = append(pb.pages, pb.newPage()) - } - - for len(b) != 0 { - tail := pb.pages[len(pb.pages)-1] - free := tail.Cap() - tail.Len() - - if len(b) <= free { - _, _ = tail.Write(b) - pb.length += len(b) - break - } - - _, _ = tail.Write(b[:free]) - b = b[free:] - - pb.length += free - pb.pages = append(pb.pages, pb.newPage()) - } - - return wn, nil -} - -func (pb *pageBuffer) WriteAt(b []byte, off int64) (int, error) { - n, err := pb.pages.WriteAt(b, off) - if err != nil { - return n, err - } - if n < len(b) { - _, _ = pb.Write(b[n:]) - } - return len(b), nil -} - -func (pb *pageBuffer) WriteTo(w io.Writer) (int64, error) { - var wn int - var err error - pb.pages.scan(int64(pb.cursor), int64(pb.length), func(b []byte) bool { - var n int - n, err = w.Write(b) - wn += n - return err == nil - }) - pb.cursor += wn - return int64(wn), err -} - -var ( - _ io.ReaderAt = (*pageBuffer)(nil) - _ io.ReaderFrom = (*pageBuffer)(nil) - _ io.StringWriter = (*pageBuffer)(nil) - _ io.Writer = (*pageBuffer)(nil) - _ io.WriterAt = (*pageBuffer)(nil) - _ io.WriterTo = (*pageBuffer)(nil) - - pagePool sync.Pool - pageBufferPool sync.Pool -) - -type contiguousPages []*page - -func (pages contiguousPages) unref() { - for _, p := range pages { - p.unref() - } -} - -func (pages contiguousPages) clear() { - for i := range pages { - pages[i] = nil - } -} - -func (pages contiguousPages) ReadAt(b []byte, off int64) (int, error) { - rn := 0 - - for _, p := range pages.slice(off, off+int64(len(b))) { - n, _ := p.ReadAt(b, off) - b = b[n:] - rn += n - off += int64(n) - } - - return rn, nil -} - -func (pages contiguousPages) WriteAt(b []byte, off int64) (int, error) { - wn := 0 - - for _, p := range pages.slice(off, off+int64(len(b))) { - n, _ := p.WriteAt(b, off) - b = b[n:] - wn += n - off += int64(n) - } - - return wn, nil -} - -func (pages contiguousPages) slice(begin, end int64) contiguousPages { - i := pages.indexOf(begin) - j := pages.indexOf(end) - if j < len(pages) { - j++ - } - return pages[i:j] -} - -func (pages contiguousPages) indexOf(offset int64) int { - if len(pages) == 0 { - return 0 - } - return int((offset - pages[0].offset) / pageSize) -} - -func (pages contiguousPages) scan(begin, end int64, f func([]byte) bool) { - for _, p := range pages.slice(begin, end) { - if !f(p.slice(begin, end)) { - break - } - } -} - -var ( - _ io.ReaderAt = contiguousPages{} - _ io.WriterAt = contiguousPages{} -) - -type pageRef struct { - pages contiguousPages - offset int64 - cursor int64 - length uint32 - once uint32 -} - -func 
(ref *pageRef) unref() { - if atomic.CompareAndSwapUint32(&ref.once, 0, 1) { - ref.pages.unref() - ref.pages.clear() - ref.pages = nil - ref.offset = 0 - ref.cursor = 0 - ref.length = 0 - } -} - -func (ref *pageRef) Len() int { return int(ref.Size() - ref.cursor) } - -func (ref *pageRef) Size() int64 { return int64(ref.length) } - -func (ref *pageRef) Close() error { ref.unref(); return nil } - -func (ref *pageRef) String() string { - return fmt.Sprintf("[offset=%d cursor=%d length=%d]", ref.offset, ref.cursor, ref.length) -} - -func (ref *pageRef) Seek(offset int64, whence int) (int64, error) { - c, err := seek(ref.cursor, int64(ref.length), offset, whence) - if err != nil { - return -1, err - } - ref.cursor = c - return c, nil -} - -func (ref *pageRef) ReadByte() (byte, error) { - var c byte - var ok bool - ref.scan(ref.cursor, func(b []byte) bool { - c, ok = b[0], true - return false - }) - if ok { - ref.cursor++ - } else { - return 0, io.EOF - } - return c, nil -} - -func (ref *pageRef) Read(b []byte) (int, error) { - if ref.cursor >= int64(ref.length) { - return 0, io.EOF - } - n, err := ref.ReadAt(b, ref.cursor) - ref.cursor += int64(n) - return n, err -} - -func (ref *pageRef) ReadAt(b []byte, off int64) (int, error) { - limit := ref.offset + int64(ref.length) - off += ref.offset - - if off >= limit { - return 0, io.EOF - } - - if off+int64(len(b)) > limit { - b = b[:limit-off] - } - - if len(b) == 0 { - return 0, nil - } - - n, err := ref.pages.ReadAt(b, off) - if n == 0 && err == nil { - err = io.EOF - } - return n, err -} - -func (ref *pageRef) WriteTo(w io.Writer) (wn int64, err error) { - ref.scan(ref.cursor, func(b []byte) bool { - var n int - n, err = w.Write(b) - wn += int64(n) - return err == nil - }) - ref.cursor += wn - return -} - -func (ref *pageRef) scan(off int64, f func([]byte) bool) { - begin := ref.offset + off - end := ref.offset + int64(ref.length) - ref.pages.scan(begin, end, f) -} - -var ( - _ io.Closer = (*pageRef)(nil) - _ io.Seeker = (*pageRef)(nil) - _ io.Reader = (*pageRef)(nil) - _ io.ReaderAt = (*pageRef)(nil) - _ io.WriterTo = (*pageRef)(nil) -) - -func seek(cursor, limit, offset int64, whence int) (int64, error) { - switch whence { - case io.SeekStart: - // absolute offset - case io.SeekCurrent: - offset = cursor + offset - case io.SeekEnd: - offset = limit - offset - default: - return -1, fmt.Errorf("seek: invalid whence value: %d", whence) - } - if offset < 0 { - offset = 0 - } - if offset > limit { - offset = limit - } - return offset, nil -} diff --git a/tap/extensions/kafka/cluster.go b/tap/extensions/kafka/cluster.go deleted file mode 100644 index 4bbbb5844..000000000 --- a/tap/extensions/kafka/cluster.go +++ /dev/null @@ -1,143 +0,0 @@ -package kafka - -import ( - "fmt" - "sort" - "strings" - "text/tabwriter" -) - -type Cluster struct { - ClusterID string - Controller int32 - Brokers map[int32]Broker - Topics map[string]Topic -} - -func (c Cluster) BrokerIDs() []int32 { - brokerIDs := make([]int32, 0, len(c.Brokers)) - for id := range c.Brokers { - brokerIDs = append(brokerIDs, id) - } - sort.Slice(brokerIDs, func(i, j int) bool { - return brokerIDs[i] < brokerIDs[j] - }) - return brokerIDs -} - -func (c Cluster) TopicNames() []string { - topicNames := make([]string, 0, len(c.Topics)) - for name := range c.Topics { - topicNames = append(topicNames, name) - } - sort.Strings(topicNames) - return topicNames -} - -func (c Cluster) IsZero() bool { - return c.ClusterID == "" && c.Controller == 0 && len(c.Brokers) == 0 && len(c.Topics) == 0 -} - -func 
(c Cluster) Format(w fmt.State, _ rune) { - tw := new(tabwriter.Writer) - fmt.Fprintf(w, "CLUSTER: %q\n\n", c.ClusterID) - - tw.Init(w, 0, 8, 2, ' ', 0) - fmt.Fprint(tw, " BROKER\tHOST\tPORT\tRACK\tCONTROLLER\n") - - for _, id := range c.BrokerIDs() { - broker := c.Brokers[id] - fmt.Fprintf(tw, " %d\t%s\t%d\t%s\t%t\n", broker.ID, broker.Host, broker.Port, broker.Rack, broker.ID == c.Controller) - } - - tw.Flush() - fmt.Fprintln(w) - - tw.Init(w, 0, 8, 2, ' ', 0) - fmt.Fprint(tw, " TOPIC\tPARTITIONS\tBROKERS\n") - topicNames := c.TopicNames() - brokers := make(map[int32]struct{}, len(c.Brokers)) - brokerIDs := make([]int32, 0, len(c.Brokers)) - - for _, name := range topicNames { - topic := c.Topics[name] - - for _, p := range topic.Partitions { - for _, id := range p.Replicas { - brokers[id] = struct{}{} - } - } - - for id := range brokers { - brokerIDs = append(brokerIDs, id) - } - - fmt.Fprintf(tw, " %s\t%d\t%s\n", topic.Name, len(topic.Partitions), formatBrokerIDs(brokerIDs, -1)) - - for id := range brokers { - delete(brokers, id) - } - - brokerIDs = brokerIDs[:0] - } - - tw.Flush() - fmt.Fprintln(w) - - if w.Flag('+') { - for _, name := range topicNames { - fmt.Fprintf(w, " TOPIC: %q\n\n", name) - - tw.Init(w, 0, 8, 2, ' ', 0) - fmt.Fprint(tw, " PARTITION\tREPLICAS\tISR\tOFFLINE\n") - - for _, p := range c.Topics[name].Partitions { - fmt.Fprintf(tw, " %d\t%s\t%s\t%s\n", p.ID, - formatBrokerIDs(p.Replicas, -1), - formatBrokerIDs(p.ISR, p.Leader), - formatBrokerIDs(p.Offline, -1), - ) - } - - tw.Flush() - fmt.Fprintln(w) - } - } -} - -func formatBrokerIDs(brokerIDs []int32, leader int32) string { - if len(brokerIDs) == 0 { - return "" - } - - if len(brokerIDs) == 1 { - return itoa(brokerIDs[0]) - } - - sort.Slice(brokerIDs, func(i, j int) bool { - id1 := brokerIDs[i] - id2 := brokerIDs[j] - - if id1 == leader { - return true - } - - if id2 == leader { - return false - } - - return id1 < id2 - }) - - brokerNames := make([]string, len(brokerIDs)) - - for i, id := range brokerIDs { - brokerNames[i] = itoa(id) - } - - return strings.Join(brokerNames, ",") -} - -var ( - _ fmt.Formatter = Cluster{} -) diff --git a/tap/extensions/kafka/decode.go b/tap/extensions/kafka/decode.go index 252b31493..c27bd0f77 100644 --- a/tap/extensions/kafka/decode.go +++ b/tap/extensions/kafka/decode.go @@ -1,7 +1,6 @@ package kafka import ( - "bytes" "encoding/binary" "fmt" "hash/crc32" @@ -9,8 +8,6 @@ import ( "io/ioutil" "reflect" "strings" - "sync" - "sync/atomic" ) type discarder interface { @@ -26,15 +23,6 @@ type decoder struct { crc32 uint32 } -func (d *decoder) Reset(r io.Reader, n int) { - d.reader = r - d.remain = n - d.buffer = [8]byte{} - d.err = nil - d.table = nil - d.crc32 = 0 -} - func (d *decoder) Read(b []byte) (int, error) { if d.err != nil { return 0, d.err @@ -483,52 +471,3 @@ func decodeReadInt32(b []byte) int32 { func decodeReadInt64(b []byte) int64 { return int64(binary.BigEndian.Uint64(b)) } - -func Unmarshal(data []byte, version int16, value interface{}) error { - typ := elemTypeOf(value) - cache, _ := unmarshalers.Load().(map[versionedType]decodeFunc) - key := versionedType{typ: typ, version: version} - decode := cache[key] - - if decode == nil { - decode = decodeFuncOf(reflect.TypeOf(value).Elem(), version, false, structTag{ - MinVersion: -1, - MaxVersion: -1, - TagID: -2, - Compact: true, - Nullable: true, - }) - - newCache := make(map[versionedType]decodeFunc, len(cache)+1) - newCache[key] = decode - - for typ, fun := range cache { - newCache[typ] = fun - } - - 
unmarshalers.Store(newCache) - } - - d, _ := decoders.Get().(*decoder) - if d == nil { - d = &decoder{reader: bytes.NewReader(nil)} - } - - d.remain = len(data) - r, _ := d.reader.(*bytes.Reader) - r.Reset(data) - - defer func() { - r.Reset(nil) - d.Reset(r, 0) - decoders.Put(d) - }() - - decode(d, valueOf(value)) - return dontExpectEOF(d.err) -} - -var ( - decoders sync.Pool // *decoder - unmarshalers atomic.Value // map[versionedType]decodeFunc -) diff --git a/tap/extensions/kafka/discard.go b/tap/extensions/kafka/discard.go deleted file mode 100644 index dc131b8c2..000000000 --- a/tap/extensions/kafka/discard.go +++ /dev/null @@ -1,16 +0,0 @@ -package kafka - -import "bufio" - -func discardN(r *bufio.Reader, sz int, n int) (int, error) { - var err error - if n <= sz { - n, err = r.Discard(n) - } else { - n, err = r.Discard(sz) - if err == nil { - err = errShortRead - } - } - return sz - n, err -} diff --git a/tap/extensions/kafka/encode.go b/tap/extensions/kafka/encode.go deleted file mode 100644 index 0c2aedac8..000000000 --- a/tap/extensions/kafka/encode.go +++ /dev/null @@ -1,537 +0,0 @@ -package kafka - -import ( - "bytes" - "encoding/binary" - "hash/crc32" - "io" - "reflect" - "sync" - "sync/atomic" -) - -type encoder struct { - writer io.Writer - err error - table *crc32.Table - crc32 uint32 - buffer [32]byte -} - -type encoderChecksum struct { - reader io.Reader - encoder *encoder -} - -func (e *encoderChecksum) Read(b []byte) (int, error) { - n, err := e.reader.Read(b) - if n > 0 { - e.encoder.update(b[:n]) - } - return n, err -} - -func (e *encoder) Reset(w io.Writer) { - e.writer = w - e.err = nil - e.table = nil - e.crc32 = 0 - e.buffer = [32]byte{} -} - -func (e *encoder) ReadFrom(r io.Reader) (int64, error) { - if e.table != nil { - r = &encoderChecksum{ - reader: r, - encoder: e, - } - } - return io.Copy(e.writer, r) -} - -func (e *encoder) Write(b []byte) (int, error) { - if e.err != nil { - return 0, e.err - } - n, err := e.writer.Write(b) - if n > 0 { - e.update(b[:n]) - } - if err != nil { - e.err = err - } - return n, err -} - -func (e *encoder) WriteByte(b byte) error { - e.buffer[0] = b - _, err := e.Write(e.buffer[:1]) - return err -} - -func (e *encoder) WriteString(s string) (int, error) { - // This implementation is an optimization to avoid the heap allocation that - // would occur when converting the string to a []byte to call crc32.Update. - // - // Strings are rarely long in the kafka protocol, so the use of a 32 byte - // buffer is a good comprise between keeping the encoder value small and - // limiting the number of calls to Write. - // - // We introduced this optimization because memory profiles on the benchmarks - // showed that most heap allocations were caused by this code path. 
- n := 0 - - for len(s) != 0 { - c := copy(e.buffer[:], s) - w, err := e.Write(e.buffer[:c]) - n += w - if err != nil { - return n, err - } - s = s[c:] - } - - return n, nil -} - -func (e *encoder) update(b []byte) { - if e.table != nil { - e.crc32 = crc32.Update(e.crc32, e.table, b) - } -} - -func (e *encoder) encodeBool(v value) { - b := int8(0) - if v.bool() { - b = 1 - } - e.writeInt8(b) -} - -func (e *encoder) encodeInt8(v value) { - e.writeInt8(v.int8()) -} - -func (e *encoder) encodeInt16(v value) { - e.writeInt16(v.int16()) -} - -func (e *encoder) encodeInt32(v value) { - e.writeInt32(v.int32()) -} - -func (e *encoder) encodeInt64(v value) { - e.writeInt64(v.int64()) -} - -func (e *encoder) encodeString(v value) { - e.writeString(v.string()) -} - -func (e *encoder) encodeCompactString(v value) { - e.writeCompactString(v.string()) -} - -func (e *encoder) encodeNullString(v value) { - e.writeNullString(v.string()) -} - -func (e *encoder) encodeCompactNullString(v value) { - e.writeCompactNullString(v.string()) -} - -func (e *encoder) encodeBytes(v value) { - e.writeBytes(v.bytes()) -} - -func (e *encoder) encodeCompactBytes(v value) { - e.writeCompactBytes(v.bytes()) -} - -func (e *encoder) encodeNullBytes(v value) { - e.writeNullBytes(v.bytes()) -} - -func (e *encoder) encodeCompactNullBytes(v value) { - e.writeCompactNullBytes(v.bytes()) -} - -func (e *encoder) encodeArray(v value, elemType reflect.Type, encodeElem encodeFunc) { - a := v.array(elemType) - n := a.length() - e.writeInt32(int32(n)) - - for i := 0; i < n; i++ { - encodeElem(e, a.index(i)) - } -} - -func (e *encoder) encodeCompactArray(v value, elemType reflect.Type, encodeElem encodeFunc) { - a := v.array(elemType) - n := a.length() - e.writeUnsignedVarInt(uint64(n + 1)) - - for i := 0; i < n; i++ { - encodeElem(e, a.index(i)) - } -} - -func (e *encoder) encodeNullArray(v value, elemType reflect.Type, encodeElem encodeFunc) { - a := v.array(elemType) - if a.isNil() { - e.writeInt32(-1) - return - } - - n := a.length() - e.writeInt32(int32(n)) - - for i := 0; i < n; i++ { - encodeElem(e, a.index(i)) - } -} - -func (e *encoder) encodeCompactNullArray(v value, elemType reflect.Type, encodeElem encodeFunc) { - a := v.array(elemType) - if a.isNil() { - e.writeUnsignedVarInt(0) - return - } - - n := a.length() - e.writeUnsignedVarInt(uint64(n + 1)) - for i := 0; i < n; i++ { - encodeElem(e, a.index(i)) - } -} - -func (e *encoder) writeInt8(i int8) { - writeInt8(e.buffer[:1], i) - _, _ = e.Write(e.buffer[:1]) -} - -func (e *encoder) writeInt16(i int16) { - writeInt16(e.buffer[:2], i) - _, _ = e.Write(e.buffer[:2]) -} - -func (e *encoder) writeInt32(i int32) { - writeInt32(e.buffer[:4], i) - _, _ = e.Write(e.buffer[:4]) -} - -func (e *encoder) writeInt64(i int64) { - writeInt64(e.buffer[:8], i) - _, _ = e.Write(e.buffer[:8]) -} - -func (e *encoder) writeString(s string) { - e.writeInt16(int16(len(s))) - _, _ = e.WriteString(s) -} - -func (e *encoder) writeCompactString(s string) { - e.writeUnsignedVarInt(uint64(len(s)) + 1) - _, _ = e.WriteString(s) -} - -func (e *encoder) writeNullString(s string) { - if s == "" { - e.writeInt16(-1) - } else { - e.writeInt16(int16(len(s))) - _, _ = e.WriteString(s) - } -} - -func (e *encoder) writeCompactNullString(s string) { - if s == "" { - e.writeUnsignedVarInt(0) - } else { - e.writeUnsignedVarInt(uint64(len(s)) + 1) - _, _ = e.WriteString(s) - } -} - -func (e *encoder) writeBytes(b []byte) { - e.writeInt32(int32(len(b))) - _, _ = e.Write(b) -} - -func (e *encoder) writeCompactBytes(b 
[]byte) { - e.writeUnsignedVarInt(uint64(len(b)) + 1) - _, _ = e.Write(b) -} - -func (e *encoder) writeNullBytes(b []byte) { - if b == nil { - e.writeInt32(-1) - } else { - e.writeInt32(int32(len(b))) - _, _ = e.Write(b) - } -} - -func (e *encoder) writeCompactNullBytes(b []byte) { - if b == nil { - e.writeUnsignedVarInt(0) - } else { - e.writeUnsignedVarInt(uint64(len(b)) + 1) - _, _ = e.Write(b) - } -} - -func (e *encoder) writeUnsignedVarInt(i uint64) { - b := e.buffer[:] - n := 0 - - for i >= 0x80 && n < len(b) { - b[n] = byte(i) | 0x80 - i >>= 7 - n++ - } - - if n < len(b) { - b[n] = byte(i) - n++ - } - - _, _ = e.Write(b[:n]) -} - -type encodeFunc func(*encoder, value) - -var ( - _ io.ReaderFrom = (*encoder)(nil) - _ io.Writer = (*encoder)(nil) - _ io.ByteWriter = (*encoder)(nil) - _ io.StringWriter = (*encoder)(nil) - - writerTo = reflect.TypeOf((*io.WriterTo)(nil)).Elem() -) - -func encodeFuncOf(typ reflect.Type, version int16, flexible bool, tag structTag) encodeFunc { - if reflect.PtrTo(typ).Implements(writerTo) { - return writerEncodeFuncOf(typ) - } - switch typ.Kind() { - case reflect.Bool: - return (*encoder).encodeBool - case reflect.Int8: - return (*encoder).encodeInt8 - case reflect.Int16: - return (*encoder).encodeInt16 - case reflect.Int32: - return (*encoder).encodeInt32 - case reflect.Int64: - return (*encoder).encodeInt64 - case reflect.String: - return stringEncodeFuncOf(flexible, tag) - case reflect.Struct: - return structEncodeFuncOf(typ, version, flexible) - case reflect.Slice: - if typ.Elem().Kind() == reflect.Uint8 { // []byte - return bytesEncodeFuncOf(flexible, tag) - } - return arrayEncodeFuncOf(typ, version, flexible, tag) - default: - panic("unsupported type: " + typ.String()) - } -} - -func stringEncodeFuncOf(flexible bool, tag structTag) encodeFunc { - switch { - case flexible && tag.Nullable: - // In flexible messages, all strings are compact - return (*encoder).encodeCompactNullString - case flexible: - // In flexible messages, all strings are compact - return (*encoder).encodeCompactString - case tag.Nullable: - return (*encoder).encodeNullString - default: - return (*encoder).encodeString - } -} - -func bytesEncodeFuncOf(flexible bool, tag structTag) encodeFunc { - switch { - case flexible && tag.Nullable: - // In flexible messages, all arrays are compact - return (*encoder).encodeCompactNullBytes - case flexible: - // In flexible messages, all arrays are compact - return (*encoder).encodeCompactBytes - case tag.Nullable: - return (*encoder).encodeNullBytes - default: - return (*encoder).encodeBytes - } -} - -func structEncodeFuncOf(typ reflect.Type, version int16, flexible bool) encodeFunc { - type field struct { - encode encodeFunc - index index - tagID int - } - - var fields []field - var taggedFields []field - - forEachStructField(typ, func(typ reflect.Type, index index, tag string) { - if typ.Size() != 0 { // skip struct{} - forEachStructTag(tag, func(tag structTag) bool { - if tag.MinVersion <= version && version <= tag.MaxVersion { - f := field{ - encode: encodeFuncOf(typ, version, flexible, tag), - index: index, - tagID: tag.TagID, - } - - if tag.TagID < -1 { - // Normal required field - fields = append(fields, f) - } else { - // Optional tagged field (flexible messages only) - taggedFields = append(taggedFields, f) - } - return false - } - return true - }) - } - }) - - return func(e *encoder, v value) { - for i := range fields { - f := &fields[i] - f.encode(e, v.fieldByIndex(f.index)) - } - - if flexible { - // See 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-482%3A+The+Kafka+Protocol+should+Support+Optional+Tagged+Fields - // for details of tag buffers in "flexible" messages. - e.writeUnsignedVarInt(uint64(len(taggedFields))) - - for i := range taggedFields { - f := &taggedFields[i] - e.writeUnsignedVarInt(uint64(f.tagID)) - - buf := &bytes.Buffer{} - se := &encoder{writer: buf} - f.encode(se, v.fieldByIndex(f.index)) - e.writeUnsignedVarInt(uint64(buf.Len())) - _, _ = e.Write(buf.Bytes()) - } - } - } -} - -func arrayEncodeFuncOf(typ reflect.Type, version int16, flexible bool, tag structTag) encodeFunc { - elemType := typ.Elem() - elemFunc := encodeFuncOf(elemType, version, flexible, tag) - switch { - case flexible && tag.Nullable: - // In flexible messages, all arrays are compact - return func(e *encoder, v value) { e.encodeCompactNullArray(v, elemType, elemFunc) } - case flexible: - // In flexible messages, all arrays are compact - return func(e *encoder, v value) { e.encodeCompactArray(v, elemType, elemFunc) } - case tag.Nullable: - return func(e *encoder, v value) { e.encodeNullArray(v, elemType, elemFunc) } - default: - return func(e *encoder, v value) { e.encodeArray(v, elemType, elemFunc) } - } -} - -func writerEncodeFuncOf(typ reflect.Type) encodeFunc { - typ = reflect.PtrTo(typ) - return func(e *encoder, v value) { - // Optimization to write directly into the buffer when the encoder - // does no need to compute a crc32 checksum. - w := io.Writer(e) - if e.table == nil { - w = e.writer - } - _, err := v.iface(typ).(io.WriterTo).WriteTo(w) - if err != nil { - e.err = err - } - } -} - -func writeInt8(b []byte, i int8) { - b[0] = byte(i) -} - -func writeInt16(b []byte, i int16) { - binary.BigEndian.PutUint16(b, uint16(i)) -} - -func writeInt32(b []byte, i int32) { - binary.BigEndian.PutUint32(b, uint32(i)) -} - -func writeInt64(b []byte, i int64) { - binary.BigEndian.PutUint64(b, uint64(i)) -} - -func Marshal(version int16, value interface{}) ([]byte, error) { - typ := typeOf(value) - cache, _ := marshalers.Load().(map[versionedType]encodeFunc) - key := versionedType{typ: typ, version: version} - encode := cache[key] - - if encode == nil { - encode = encodeFuncOf(reflect.TypeOf(value), version, false, structTag{ - MinVersion: -1, - MaxVersion: -1, - TagID: -2, - Compact: true, - Nullable: true, - }) - - newCache := make(map[versionedType]encodeFunc, len(cache)+1) - newCache[key] = encode - - for typ, fun := range cache { - newCache[typ] = fun - } - - marshalers.Store(newCache) - } - - e, _ := encoders.Get().(*encoder) - if e == nil { - e = &encoder{writer: new(bytes.Buffer)} - } - - b, _ := e.writer.(*bytes.Buffer) - defer func() { - b.Reset() - e.Reset(b) - encoders.Put(e) - }() - - encode(e, nonAddressableValueOf(value)) - - if e.err != nil { - return nil, e.err - } - - buf := b.Bytes() - out := make([]byte, len(buf)) - copy(out, buf) - return out, nil -} - -type versionedType struct { - typ _type - version int16 -} - -var ( - encoders sync.Pool // *encoder - marshalers atomic.Value // map[versionedType]encodeFunc -) diff --git a/tap/extensions/kafka/error.go b/tap/extensions/kafka/error.go deleted file mode 100644 index 706b7a7f3..000000000 --- a/tap/extensions/kafka/error.go +++ /dev/null @@ -1,91 +0,0 @@ -package kafka - -import ( - "fmt" -) - -// Error represents client-side protocol errors. 
-type Error string - -func (e Error) Error() string { return string(e) } - -func Errorf(msg string, args ...interface{}) Error { - return Error(fmt.Sprintf(msg, args...)) -} - -const ( - // ErrNoTopic is returned when a request needs to be sent to a specific - ErrNoTopic Error = "topic not found" - - // ErrNoPartition is returned when a request needs to be sent to a specific - // partition, but the client did not find it in the cluster metadata. - ErrNoPartition Error = "topic partition not found" - - // ErrNoLeader is returned when a request needs to be sent to a partition - // leader, but the client could not determine what the leader was at this - // time. - ErrNoLeader Error = "topic partition has no leader" - - // ErrNoRecord is returned when attempting to write a message containing an - // empty record set (which kafka forbids). - // - // We handle this case client-side because kafka will close the connection - // that it received an empty produce request on, causing all concurrent - // requests to be aborted. - ErrNoRecord Error = "record set contains no records" - - // ErrNoReset is returned by ResetRecordReader when the record reader does - // not support being reset. - ErrNoReset Error = "record sequence does not support reset" -) - -type TopicError struct { - Topic string - Err error -} - -func NewTopicError(topic string, err error) *TopicError { - return &TopicError{Topic: topic, Err: err} -} - -func NewErrNoTopic(topic string) *TopicError { - return NewTopicError(topic, ErrNoTopic) -} - -func (e *TopicError) Error() string { - return fmt.Sprintf("%v (topic=%q)", e.Err, e.Topic) -} - -func (e *TopicError) Unwrap() error { - return e.Err -} - -type TopicPartitionError struct { - Topic string - Partition int32 - Err error -} - -func NewTopicPartitionError(topic string, partition int32, err error) *TopicPartitionError { - return &TopicPartitionError{ - Topic: topic, - Partition: partition, - Err: err, - } -} - -func NewErrNoPartition(topic string, partition int32) *TopicPartitionError { - return NewTopicPartitionError(topic, partition, ErrNoPartition) -} - -func NewErrNoLeader(topic string, partition int32) *TopicPartitionError { - return NewTopicPartitionError(topic, partition, ErrNoLeader) -} - -func (e *TopicPartitionError) Error() string { - return fmt.Sprintf("%v (topic=%q partition=%d)", e.Err, e.Topic, e.Partition) -} - -func (e *TopicPartitionError) Unwrap() error { - return e.Err -} diff --git a/tap/extensions/kafka/go.mod b/tap/extensions/kafka/go.mod index 93768c88a..d11bba41e 100644 --- a/tap/extensions/kafka/go.mod +++ b/tap/extensions/kafka/go.mod @@ -6,14 +6,18 @@ require ( github.com/fatih/camelcase v1.0.0 github.com/ohler55/ojg v1.12.12 github.com/segmentio/kafka-go v0.4.27 + github.com/stretchr/testify v1.6.1 github.com/up9inc/mizu/tap/api v0.0.0 ) require ( + github.com/davecgh/go-spew v1.1.0 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/martian v2.1.0+incompatible // indirect github.com/klauspost/compress v1.14.2 // indirect github.com/pierrec/lz4 v2.6.1+incompatible // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect ) replace github.com/up9inc/mizu/tap/api v0.0.0 => ../../api diff --git a/tap/extensions/kafka/go.sum b/tap/extensions/kafka/go.sum index c282bd650..534f46926 100644 --- a/tap/extensions/kafka/go.sum +++ b/tap/extensions/kafka/go.sum @@ -1,3 +1,4 @@ +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= 
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= @@ -25,10 +26,12 @@ github.com/ohler55/ojg v1.12.12/go.mod h1:LBbIVRAgoFbYBXQhRhuEpaJIqq+goSO63/FQ+n github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM= github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/segmentio/kafka-go v0.4.27 h1:sIhEozeL/TLN2mZ5dkG462vcGEWYKS+u31sXPjKhAM4= github.com/segmentio/kafka-go v0.4.27/go.mod h1:XzMcoMjSzDGHcIwpWUI7GB43iKZ2fTVmryPSGLf/MPg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= @@ -40,5 +43,7 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/tap/extensions/kafka/helpers.go b/tap/extensions/kafka/helpers.go index 6bba27eb2..d53c04fa4 100644 --- a/tap/extensions/kafka/helpers.go +++ b/tap/extensions/kafka/helpers.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "reflect" + "sort" "strconv" "strings" @@ -897,6 +898,10 @@ func representMapAsTable(mapData map[string]interface{}, selectorPrefix string, }) } + sort.Slice(table, func(i, j int) bool { + return table[i].Name < table[j].Name + }) + obj, _ := json.Marshal(table) representation = string(obj) return diff --git a/tap/extensions/kafka/main_test.go b/tap/extensions/kafka/main_test.go new file mode 100644 index 000000000..11dec2945 --- /dev/null +++ b/tap/extensions/kafka/main_test.go @@ -0,0 +1,290 @@ +package kafka + +import ( + "bufio" + "encoding/json" + "errors" + "fmt" + "io" + "io/ioutil" + "log" + "os" + "path" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/up9inc/mizu/tap/api" +) + +const ( + binDir = "bin" + patternBin = "*_req.bin" + patternDissect = "*.json" + msgDissecting = "Dissecting:" + msgAnalyzing = "Analyzing:" + msgRepresenting = "Representing:" + respSuffix = "_res.bin" + expectDir = 
"expect" + dissectDir = "dissect" + analyzeDir = "analyze" + representDir = "represent" + testUpdate = "TEST_UPDATE" +) + +func TestRegister(t *testing.T) { + dissector := NewDissector() + extension := &api.Extension{} + dissector.Register(extension) + assert.Equal(t, "kafka", extension.Protocol.Name) +} + +func TestMacros(t *testing.T) { + expectedMacros := map[string]string{ + "kafka": `proto.name == "kafka"`, + } + dissector := NewDissector() + macros := dissector.Macros() + assert.Equal(t, expectedMacros, macros) +} + +func TestPing(t *testing.T) { + dissector := NewDissector() + dissector.Ping() +} + +func TestDissect(t *testing.T) { + _, testUpdateEnabled := os.LookupEnv(testUpdate) + + expectDirDissect := path.Join(expectDir, dissectDir) + + if testUpdateEnabled { + os.RemoveAll(expectDirDissect) + err := os.MkdirAll(expectDirDissect, 0775) + assert.Nil(t, err) + } + + dissector := NewDissector() + paths, err := filepath.Glob(path.Join(binDir, patternBin)) + if err != nil { + log.Fatal(err) + } + + options := &api.TrafficFilteringOptions{ + IgnoredUserAgents: []string{}, + } + + for _, _path := range paths { + basePath := _path[:len(_path)-8] + + // Channel to verify the output + itemChannel := make(chan *api.OutputChannelItem) + var emitter api.Emitter = &api.Emitting{ + AppStats: &api.AppStats{}, + OutputChannel: itemChannel, + } + + var items []*api.OutputChannelItem + stop := make(chan bool) + + go func() { + for { + select { + case <-stop: + return + case item := <-itemChannel: + items = append(items, item) + } + } + }() + + // Stream level + counterPair := &api.CounterPair{ + Request: 0, + Response: 0, + } + superIdentifier := &api.SuperIdentifier{} + + // Request + pathClient := _path + fmt.Printf("%s %s\n", msgDissecting, pathClient) + fileClient, err := os.Open(pathClient) + assert.Nil(t, err) + + bufferClient := bufio.NewReader(fileClient) + tcpIDClient := &api.TcpID{ + SrcIP: "1", + DstIP: "2", + SrcPort: "1", + DstPort: "2", + } + reqResMatcher := dissector.NewResponseRequestMatcher() + reqResMatcher.SetMaxTry(10) + err = dissector.Dissect(bufferClient, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) + if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { + log.Println(err) + } + + // Response + pathServer := basePath + respSuffix + fmt.Printf("%s %s\n", msgDissecting, pathServer) + fileServer, err := os.Open(pathServer) + assert.Nil(t, err) + + bufferServer := bufio.NewReader(fileServer) + tcpIDServer := &api.TcpID{ + SrcIP: "2", + DstIP: "1", + SrcPort: "2", + DstPort: "1", + } + err = dissector.Dissect(bufferServer, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) + if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { + log.Println(err) + } + + fileClient.Close() + fileServer.Close() + + pathExpect := path.Join(expectDirDissect, fmt.Sprintf("%s.json", basePath[4:])) + + time.Sleep(10 * time.Millisecond) + + stop <- true + + marshaled, err := json.Marshal(items) + assert.Nil(t, err) + + if testUpdateEnabled { + if len(items) > 0 { + err = os.WriteFile(pathExpect, marshaled, 0644) + assert.Nil(t, err) + } + } else { + if _, err := os.Stat(pathExpect); errors.Is(err, os.ErrNotExist) { + assert.Len(t, items, 0) + } else { + expectedBytes, err := ioutil.ReadFile(pathExpect) + assert.Nil(t, err) + + assert.JSONEq(t, string(expectedBytes), string(marshaled)) + } + } + } +} + +func TestAnalyze(t *testing.T) { + _, testUpdateEnabled := 
os.LookupEnv(testUpdate) + + expectDirDissect := path.Join(expectDir, dissectDir) + expectDirAnalyze := path.Join(expectDir, analyzeDir) + + if testUpdateEnabled { + os.RemoveAll(expectDirAnalyze) + err := os.MkdirAll(expectDirAnalyze, 0775) + assert.Nil(t, err) + } + + dissector := NewDissector() + paths, err := filepath.Glob(path.Join(expectDirDissect, patternDissect)) + if err != nil { + log.Fatal(err) + } + + for _, _path := range paths { + fmt.Printf("%s %s\n", msgAnalyzing, _path) + + bytes, err := ioutil.ReadFile(_path) + assert.Nil(t, err) + + var items []*api.OutputChannelItem + err = json.Unmarshal(bytes, &items) + assert.Nil(t, err) + + var entries []*api.Entry + for _, item := range items { + entry := dissector.Analyze(item, "", "", "") + entries = append(entries, entry) + } + + pathExpect := path.Join(expectDirAnalyze, filepath.Base(_path)) + + marshaled, err := json.Marshal(entries) + assert.Nil(t, err) + + if testUpdateEnabled { + if len(entries) > 0 { + err = os.WriteFile(pathExpect, marshaled, 0644) + assert.Nil(t, err) + } + } else { + if _, err := os.Stat(pathExpect); errors.Is(err, os.ErrNotExist) { + assert.Len(t, items, 0) + } else { + expectedBytes, err := ioutil.ReadFile(pathExpect) + assert.Nil(t, err) + + assert.JSONEq(t, string(expectedBytes), string(marshaled)) + } + } + } +} + +func TestRepresent(t *testing.T) { + _, testUpdateEnabled := os.LookupEnv(testUpdate) + + expectDirAnalyze := path.Join(expectDir, analyzeDir) + expectDirRepresent := path.Join(expectDir, representDir) + + if testUpdateEnabled { + os.RemoveAll(expectDirRepresent) + err := os.MkdirAll(expectDirRepresent, 0775) + assert.Nil(t, err) + } + + dissector := NewDissector() + paths, err := filepath.Glob(path.Join(expectDirAnalyze, patternDissect)) + if err != nil { + log.Fatal(err) + } + + for _, _path := range paths { + fmt.Printf("%s %s\n", msgRepresenting, _path) + + bytes, err := ioutil.ReadFile(_path) + assert.Nil(t, err) + + var entries []*api.Entry + err = json.Unmarshal(bytes, &entries) + assert.Nil(t, err) + + var objects []string + for _, entry := range entries { + object, _, err := dissector.Represent(entry.Request, entry.Response) + assert.Nil(t, err) + objects = append(objects, string(object)) + } + + pathExpect := path.Join(expectDirRepresent, filepath.Base(_path)) + + marshaled, err := json.Marshal(objects) + assert.Nil(t, err) + + if testUpdateEnabled { + if len(objects) > 0 { + err = os.WriteFile(pathExpect, marshaled, 0644) + assert.Nil(t, err) + } + } else { + if _, err := os.Stat(pathExpect); errors.Is(err, os.ErrNotExist) { + assert.Len(t, objects, 0) + } else { + expectedBytes, err := ioutil.ReadFile(pathExpect) + assert.Nil(t, err) + + assert.JSONEq(t, string(expectedBytes), string(marshaled)) + } + } + } +} diff --git a/tap/extensions/kafka/matcher.go b/tap/extensions/kafka/matcher.go index d5c77618a..bc3f22e60 100644 --- a/tap/extensions/kafka/matcher.go +++ b/tap/extensions/kafka/matcher.go @@ -7,8 +7,6 @@ import ( "github.com/up9inc/mizu/tap/api" ) -const maxTry int = 3000 - type RequestResponsePair struct { Request Request Response Response @@ -17,16 +15,21 @@ type RequestResponsePair struct { // Key is {client_addr}_{client_port}_{dest_addr}_{dest_port}_{correlation_id} type requestResponseMatcher struct { openMessagesMap *sync.Map + maxTry int } func createResponseRequestMatcher() api.RequestResponseMatcher { - return &requestResponseMatcher{openMessagesMap: &sync.Map{}} + return &requestResponseMatcher{openMessagesMap: &sync.Map{}, maxTry: 3000} } func (matcher 
*requestResponseMatcher) GetMap() *sync.Map { return matcher.openMessagesMap } +func (matcher *requestResponseMatcher) SetMaxTry(value int) { + matcher.maxTry = value +} + func (matcher *requestResponseMatcher) registerRequest(key string, request *Request) *RequestResponsePair { if response, found := matcher.openMessagesMap.LoadAndDelete(key); found { // Check for a situation that only occurs when a Kafka broker is initiating @@ -44,7 +47,7 @@ func (matcher *requestResponseMatcher) registerResponse(key string, response *Re try := 0 for { try++ - if try > maxTry { + if try > matcher.maxTry { return nil } if request, found := matcher.openMessagesMap.LoadAndDelete(key); found { diff --git a/tap/extensions/kafka/protocol.go b/tap/extensions/kafka/protocol.go index 71457ee55..916a40461 100644 --- a/tap/extensions/kafka/protocol.go +++ b/tap/extensions/kafka/protocol.go @@ -3,7 +3,6 @@ package kafka import ( "fmt" "io" - "net" "reflect" "strconv" "strings" @@ -27,29 +26,20 @@ func (k ApiKey) String() string { return strconv.Itoa(int(k)) } -func (k ApiKey) MinVersion() int16 { return k.apiType().minVersion() } - -func (k ApiKey) MaxVersion() int16 { return k.apiType().maxVersion() } - -func (k ApiKey) SelectVersion(minVersion, maxVersion int16) int16 { - min := k.MinVersion() - max := k.MaxVersion() - switch { - case min > maxVersion: - return min - case max < maxVersion: - return max - default: - return maxVersion - } -} - -func (k ApiKey) apiType() apiType { - if i := int(k); i >= 0 && i < len(apiTypes) { - return apiTypes[i] - } - return apiType{} -} +const ( + // v0 = 0 + v1 = 1 + v2 = 2 + v3 = 3 + v4 = 4 + v5 = 5 + v6 = 6 + v7 = 7 + v8 = 8 + v9 = 9 + v10 = 10 + v11 = 11 +) const ( Produce ApiKey = 0 @@ -164,48 +154,6 @@ type messageType struct { flexible bool gotype reflect.Type decode decodeFunc - encode encodeFunc -} - -type apiType struct { - requests []messageType - responses []messageType -} - -func (t apiType) minVersion() int16 { - if len(t.requests) == 0 { - return 0 - } - return t.requests[0].version -} - -func (t apiType) maxVersion() int16 { - if len(t.requests) == 0 { - return 0 - } - return t.requests[len(t.requests)-1].version -} - -var apiTypes [numApis]apiType - -// Register is automatically called by sub-packages are imported to install a -// new pair of request/response message types. 
-func Register(req, res Message) { - k1 := req.ApiKey() - k2 := res.ApiKey() - - if k1 != k2 { - panic(fmt.Sprintf("[%T/%T]: request and response API keys mismatch: %d != %d", req, res, k1, k2)) - } - - apiTypes[k1] = apiType{ - requests: typesOf(req), - responses: typesOf(res), - } -} - -func typesOf(v interface{}) []messageType { - return makeTypes(reflect.TypeOf(v).Elem()) } func makeTypes(t reflect.Type) []messageType { @@ -241,7 +189,6 @@ func makeTypes(t reflect.Type) []messageType { gotype: t, flexible: flexible, decode: decodeFuncOf(t, v, flexible, structTag{}), - encode: encodeFuncOf(t, v, flexible, structTag{}), }) } @@ -378,31 +325,6 @@ type Broker struct { Rack string } -func (b Broker) String() string { - return net.JoinHostPort(b.Host, itoa(b.Port)) -} - -func (b Broker) Format(w fmt.State, v rune) { - switch v { - case 'd': - _, _ = io.WriteString(w, itoa(b.ID)) - case 's': - _, _ = io.WriteString(w, b.String()) - case 'v': - _, _ = io.WriteString(w, itoa(b.ID)) - _, _ = io.WriteString(w, " ") - _, _ = io.WriteString(w, b.String()) - if b.Rack != "" { - _, _ = io.WriteString(w, " ") - _, _ = io.WriteString(w, b.Rack) - } - } -} - -func itoa(i int32) string { - return strconv.Itoa(int(i)) -} - type Topic struct { Name string Error int16 @@ -418,14 +340,6 @@ type Partition struct { Offline []int32 } -// BrokerMessage is an extension of the Message interface implemented by some -// request types to customize the broker assignment logic. -type BrokerMessage interface { - // Given a representation of the kafka cluster state as argument, returns - // the broker that the message should be routed to. - Broker(Cluster) (Broker, error) -} - // GroupMessage is an extension of the Message interface implemented by some // request types to inform the program that they should be routed to a group // coordinator. @@ -443,16 +357,6 @@ type PreparedMessage interface { Prepare(apiVersion int16) } -// Splitter is an interface implemented by messages that can be split into -// multiple requests and have their results merged back by a Merger. -type Splitter interface { - // For a given cluster layout, returns the list of messages constructed - // from the receiver for each requests that should be sent to the cluster. - // The second return value is a Merger which can be used to merge back the - // results of each request into a single message (or an error). - Split(Cluster) ([]Message, Merger, error) -} - // Merger is an interface implemented by messages which can merge multiple // results into one response. type Merger interface { @@ -461,16 +365,3 @@ type Merger interface { // values, other types should trigger a panic. Merge(messages []Message, results []interface{}) (Message, error) } - -// Result converts r to a Message or and error, or panics if r could be be -// converted to these types. 
-func Result(r interface{}) (Message, error) { - switch v := r.(type) { - case Message: - return v, nil - case error: - return nil, v - default: - panic(fmt.Errorf("BUG: result must be a message or an error but not %T", v)) - } -} diff --git a/tap/extensions/kafka/protocol_make.go b/tap/extensions/kafka/protocol_make.go deleted file mode 100644 index dc476a5e9..000000000 --- a/tap/extensions/kafka/protocol_make.go +++ /dev/null @@ -1,182 +0,0 @@ -package kafka - -import ( - "encoding/binary" - "fmt" - "strconv" -) - -type ApiVersion struct { - ApiKey int16 - MinVersion int16 - MaxVersion int16 -} - -func (v ApiVersion) Format(w fmt.State, r rune) { - switch r { - case 's': - fmt.Fprint(w, apiKey(v.ApiKey)) - case 'd': - switch { - case w.Flag('-'): - fmt.Fprint(w, v.MinVersion) - case w.Flag('+'): - fmt.Fprint(w, v.MaxVersion) - default: - fmt.Fprint(w, v.ApiKey) - } - case 'v': - switch { - case w.Flag('-'): - fmt.Fprintf(w, "v%d", v.MinVersion) - case w.Flag('+'): - fmt.Fprintf(w, "v%d", v.MaxVersion) - case w.Flag('#'): - fmt.Fprintf(w, "kafka.ApiVersion{ApiKey:%d MinVersion:%d MaxVersion:%d}", v.ApiKey, v.MinVersion, v.MaxVersion) - default: - fmt.Fprintf(w, "%s[v%d:v%d]", apiKey(v.ApiKey), v.MinVersion, v.MaxVersion) - } - } -} - -type apiKey int16 - -const ( - produce apiKey = 0 - fetch apiKey = 1 - listOffsets apiKey = 2 - metadata apiKey = 3 - leaderAndIsr apiKey = 4 - stopReplica apiKey = 5 - updateMetadata apiKey = 6 - controlledShutdown apiKey = 7 - offsetCommit apiKey = 8 - offsetFetch apiKey = 9 - findCoordinator apiKey = 10 - joinGroup apiKey = 11 - heartbeat apiKey = 12 - leaveGroup apiKey = 13 - syncGroup apiKey = 14 - describeGroups apiKey = 15 - listGroups apiKey = 16 - saslHandshake apiKey = 17 - apiVersions apiKey = 18 - createTopics apiKey = 19 - deleteTopics apiKey = 20 - deleteRecords apiKey = 21 - initProducerId apiKey = 22 - offsetForLeaderEpoch apiKey = 23 - addPartitionsToTxn apiKey = 24 - addOffsetsToTxn apiKey = 25 - endTxn apiKey = 26 - writeTxnMarkers apiKey = 27 - txnOffsetCommit apiKey = 28 - describeAcls apiKey = 29 - createAcls apiKey = 30 - deleteAcls apiKey = 31 - describeConfigs apiKey = 32 - alterConfigs apiKey = 33 - alterReplicaLogDirs apiKey = 34 - describeLogDirs apiKey = 35 - saslAuthenticate apiKey = 36 - createPartitions apiKey = 37 - createDelegationToken apiKey = 38 - renewDelegationToken apiKey = 39 - expireDelegationToken apiKey = 40 - describeDelegationToken apiKey = 41 - deleteGroups apiKey = 42 - electLeaders apiKey = 43 - incrementalAlterConfigs apiKey = 44 - alterPartitionReassignments apiKey = 45 - listPartitionReassignments apiKey = 46 - offsetDelete apiKey = 47 -) - -func (k apiKey) String() string { - if i := int(k); i >= 0 && i < len(apiKeyStrings) { - return apiKeyStrings[i] - } - return strconv.Itoa(int(k)) -} - -const ( - // v0 = 0 - v1 = 1 - v2 = 2 - v3 = 3 - v4 = 4 - v5 = 5 - v6 = 6 - v7 = 7 - v8 = 8 - v9 = 9 - v10 = 10 - v11 = 11 -) - -var apiKeyStrings = [...]string{ - produce: "Produce", - fetch: "Fetch", - listOffsets: "ListOffsets", - metadata: "Metadata", - leaderAndIsr: "LeaderAndIsr", - stopReplica: "StopReplica", - updateMetadata: "UpdateMetadata", - controlledShutdown: "ControlledShutdown", - offsetCommit: "OffsetCommit", - offsetFetch: "OffsetFetch", - findCoordinator: "FindCoordinator", - joinGroup: "JoinGroup", - heartbeat: "Heartbeat", - leaveGroup: "LeaveGroup", - syncGroup: "SyncGroup", - describeGroups: "DescribeGroups", - listGroups: "ListGroups", - saslHandshake: "SaslHandshake", - apiVersions: 
"ApiVersions", - createTopics: "CreateTopics", - deleteTopics: "DeleteTopics", - deleteRecords: "DeleteRecords", - initProducerId: "InitProducerId", - offsetForLeaderEpoch: "OffsetForLeaderEpoch", - addPartitionsToTxn: "AddPartitionsToTxn", - addOffsetsToTxn: "AddOffsetsToTxn", - endTxn: "EndTxn", - writeTxnMarkers: "WriteTxnMarkers", - txnOffsetCommit: "TxnOffsetCommit", - describeAcls: "DescribeAcls", - createAcls: "CreateAcls", - deleteAcls: "DeleteAcls", - describeConfigs: "DescribeConfigs", - alterConfigs: "AlterConfigs", - alterReplicaLogDirs: "AlterReplicaLogDirs", - describeLogDirs: "DescribeLogDirs", - saslAuthenticate: "SaslAuthenticate", - createPartitions: "CreatePartitions", - createDelegationToken: "CreateDelegationToken", - renewDelegationToken: "RenewDelegationToken", - expireDelegationToken: "ExpireDelegationToken", - describeDelegationToken: "DescribeDelegationToken", - deleteGroups: "DeleteGroups", - electLeaders: "ElectLeaders", - incrementalAlterConfigs: "IncrementalAlfterConfigs", - alterPartitionReassignments: "AlterPartitionReassignments", - listPartitionReassignments: "ListPartitionReassignments", - offsetDelete: "OffsetDelete", -} - -func makeInt8(b []byte) int8 { - return int8(b[0]) -} - -func makeInt16(b []byte) int16 { - return int16(binary.BigEndian.Uint16(b)) -} - -func makeInt32(b []byte) int32 { - return int32(binary.BigEndian.Uint32(b)) -} - -func makeInt64(b []byte) int64 { - return int64(binary.BigEndian.Uint64(b)) -} diff --git a/tap/extensions/kafka/read.go b/tap/extensions/kafka/read.go deleted file mode 100644 index 6c179b1a6..000000000 --- a/tap/extensions/kafka/read.go +++ /dev/null @@ -1,159 +0,0 @@ -package kafka - -import ( - "bufio" - "errors" - "fmt" - "io" -) - -type readable interface { - readFrom(*bufio.Reader, int) (int, error) -} - -var errShortRead = errors.New("not enough bytes available to load the response") - -func peekRead(r *bufio.Reader, sz int, n int, f func([]byte)) (int, error) { - if n > sz { - return sz, errShortRead - } - b, err := r.Peek(n) - if err != nil { - return sz, err - } - f(b) - return discardN(r, sz, n) -} - -func readInt8(r *bufio.Reader, sz int, v *int8) (int, error) { - return peekRead(r, sz, 1, func(b []byte) { *v = makeInt8(b) }) -} - -func readInt16(r *bufio.Reader, sz int, v *int16) (int, error) { - return peekRead(r, sz, 2, func(b []byte) { *v = makeInt16(b) }) -} - -func readInt32(r *bufio.Reader, sz int, v *int32) (int, error) { - return peekRead(r, sz, 4, func(b []byte) { *v = makeInt32(b) }) -} - -func readInt64(r *bufio.Reader, sz int, v *int64) (int, error) { - return peekRead(r, sz, 8, func(b []byte) { *v = makeInt64(b) }) -} - -func readString(r *bufio.Reader, sz int, v *string) (int, error) { - return readStringWith(r, sz, func(r *bufio.Reader, sz int, n int) (remain int, err error) { - *v, remain, err = readNewString(r, sz, n) - return - }) -} - -func readStringWith(r *bufio.Reader, sz int, cb func(*bufio.Reader, int, int) (int, error)) (int, error) { - var err error - var len int16 - - if sz, err = readInt16(r, sz, &len); err != nil { - return sz, err - } - - n := int(len) - if n > sz { - return sz, errShortRead - } - - return cb(r, sz, n) -} - -func readNewString(r *bufio.Reader, sz int, n int) (string, int, error) { - b, sz, err := readNewBytes(r, sz, n) - return string(b), sz, err -} - -func readBytes(r *bufio.Reader, sz int, v *[]byte) (int, error) { - return readBytesWith(r, sz, func(r *bufio.Reader, sz int, n int) (remain int, err error) { - *v, remain, err = readNewBytes(r, sz, n) - 
return - }) -} - -func readBytesWith(r *bufio.Reader, sz int, cb func(*bufio.Reader, int, int) (int, error)) (int, error) { - var err error - var n int - - if sz, err = readArrayLen(r, sz, &n); err != nil { - return sz, err - } - - if n > sz { - return sz, errShortRead - } - - return cb(r, sz, n) -} - -func readNewBytes(r *bufio.Reader, sz int, n int) ([]byte, int, error) { - var err error - var b []byte - var shortRead bool - - if n > 0 { - if sz < n { - n = sz - shortRead = true - } - - b = make([]byte, n) - n, err = io.ReadFull(r, b) - b = b[:n] - sz -= n - - if err == nil && shortRead { - err = errShortRead - } - } - - return b, sz, err -} - -func readArrayLen(r *bufio.Reader, sz int, n *int) (int, error) { - var err error - var len int32 - if sz, err = readInt32(r, sz, &len); err != nil { - return sz, err - } - *n = int(len) - return sz, nil -} - -func ReadAll(r *bufio.Reader, sz int, ptrs ...interface{}) (int, error) { - var err error - - for _, ptr := range ptrs { - if sz, err = readPtr(r, sz, ptr); err != nil { - break - } - } - - return sz, err -} - -func readPtr(r *bufio.Reader, sz int, ptr interface{}) (int, error) { - switch v := ptr.(type) { - case *int8: - return readInt8(r, sz, v) - case *int16: - return readInt16(r, sz, v) - case *int32: - return readInt32(r, sz, v) - case *int64: - return readInt64(r, sz, v) - case *string: - return readString(r, sz, v) - case *[]byte: - return readBytes(r, sz, v) - case readable: - return v.readFrom(r, sz) - default: - panic(fmt.Sprintf("unsupported type: %T", v)) - } -} diff --git a/tap/extensions/kafka/record.go b/tap/extensions/kafka/record.go deleted file mode 100644 index 9e88300c2..000000000 --- a/tap/extensions/kafka/record.go +++ /dev/null @@ -1,279 +0,0 @@ -package kafka - -import ( - "encoding/binary" - "io" - "time" - - "github.com/segmentio/kafka-go/compress" -) - -// Attributes is a bitset representing special attributes set on records. -type Attributes int16 - -const ( - Gzip Attributes = Attributes(compress.Gzip) // 1 - Snappy Attributes = Attributes(compress.Snappy) // 2 - Lz4 Attributes = Attributes(compress.Lz4) // 3 - Zstd Attributes = Attributes(compress.Zstd) // 4 - Transactional Attributes = 1 << 4 - Control Attributes = 1 << 5 -) - -func (a Attributes) Compression() compress.Compression { - return compress.Compression(a & 7) -} - -func (a Attributes) Transactional() bool { - return (a & Transactional) != 0 -} - -func (a Attributes) Control() bool { - return (a & Control) != 0 -} - -func (a Attributes) String() string { - s := a.Compression().String() - if a.Transactional() { - s += "+transactional" - } - if a.Control() { - s += "+control" - } - return s -} - -// Header represents a single entry in a list of record headers. -type Header struct { - Key string - Value []byte -} - -// Record is an interface representing a single kafka record. -// -// Record values are not safe to use concurrently from multiple goroutines. -type Record struct { - // The offset at which the record exists in a topic partition. This value - // is ignored in produce requests. - Offset int64 - - // Returns the time of the record. This value may be omitted in produce - // requests to let kafka set the time when it saves the record. - Time time.Time - - // Returns a byte sequence containing the key of this record. The returned - // sequence may be nil to indicate that the record has no key. 
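
// A minimal sketch (example value only; illustrative, not part of this patch) of how
// the Attributes bitset above decomposes: the low three bits carry the compression
// codec, bit 4 marks transactional batches, and bit 5 marks control batches.
const exampleAttrs = Attributes(0x12) // 0b1_0010: Snappy (2) + Transactional (1 << 4)
// exampleAttrs.Compression() == compress.Snappy
// exampleAttrs.Transactional() == true
// exampleAttrs.Control() == false
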
If the record - // is part of a RecordSet, the content of the key must remain valid at least - // until the record set is closed (or until the key is closed). - Key Bytes - - // Returns a byte sequence containing the value of this record. The returned - // sequence may be nil to indicate that the record has no value. If the - // record is part of a RecordSet, the content of the value must remain valid - // at least until the record set is closed (or until the value is closed). - Value Bytes - - // Returns the list of headers associated with this record. The returned - // slice may be reused across calls, the program should use it as an - // immutable value. - Headers []Header -} - -// RecordSet represents a sequence of records in Produce requests and Fetch -// responses. All v0, v1, and v2 formats are supported. -type RecordSet struct { - // The message version that this record set will be represented as, valid - // values are 1, or 2. - // - // When reading, this is the value of the highest version used in the - // batches that compose the record set. - // - // When writing, this value dictates the format that the records will be - // encoded in. - Version int8 - - // Attributes set on the record set. - // - // When reading, the attributes are the combination of all attributes in - // the batches that compose the record set. - // - // When writing, the attributes apply to the whole sequence of records in - // the set. - Attributes Attributes - - // A reader exposing the sequence of records. - // - // When reading a RecordSet from an io.Reader, the Records field will be a - // *RecordStream. If the program needs to access the details of each batch - // that compose the stream, it may use type assertions to access the - // underlying types of each batch. - Records RecordReader -} - -// ReadFrom reads the representation of a record set from r into rs, returning -// the number of bytes consumed from r, and an non-nil error if the record set -// could not be read. -func (rs *RecordSet) ReadFrom(r io.Reader) (int64, error) { - // d, _ := r.(*decoder) - // if d == nil { - // d = &decoder{ - // reader: r, - // remain: 4, - // } - // } - - // *rs = RecordSet{} - // limit := d.remain - // size := d.readInt32() - - // if d.err != nil { - // return int64(limit - d.remain), d.err - // } - - // if size <= 0 { - // return 4, nil - // } - - // stream := &RecordStream{ - // Records: make([]RecordReader, 0, 4), - // } - - // var err error - // d.remain = int(size) - - // for d.remain > 0 && err == nil { - // var version byte - - // if d.remain < (magicByteOffset + 1) { - // if len(stream.Records) != 0 { - // break - // } - // return 4, fmt.Errorf("impossible record set shorter than %d bytes", magicByteOffset+1) - // } - - // switch r := d.reader.(type) { - // case bufferedReader: - // b, err := r.Peek(magicByteOffset + 1) - // if err != nil { - // n, _ := r.Discard(len(b)) - // return 4 + int64(n), dontExpectEOF(err) - // } - // version = b[magicByteOffset] - // case bytesBuffer: - // version = r.Bytes()[magicByteOffset] - // default: - // b := make([]byte, magicByteOffset+1) - // if n, err := io.ReadFull(d.reader, b); err != nil { - // return 4 + int64(n), dontExpectEOF(err) - // } - // version = b[magicByteOffset] - // // Reconstruct the prefix that we had to read to determine the version - // // of the record set from the magic byte. - // // - // // Technically this may recurisvely stack readers when consuming all - // // items of the batch, which could hurt performance. 
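
// A minimal sketch (illustrative, not part of this patch; assumes `import "fmt"` and
// the package's magicByteOffset constant referenced by the commented-out code) of the
// version probe described above: every record batch carries a "magic" byte at a fixed
// offset, and its value selects the legacy v0/v1 message-set parser or the v2
// record-batch parser.
func recordBatchVersion(batch []byte) (int8, error) {
	if len(batch) < magicByteOffset+1 {
		return 0, fmt.Errorf("record batch shorter than %d bytes", magicByteOffset+1)
	}
	return int8(batch[magicByteOffset]), nil
}
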
In practice this - // // path should not be taken tho, since the decoder would read from a - // // *bufio.Reader which implements the bufferedReader interface. - // d.reader = io.MultiReader(bytes.NewReader(b), d.reader) - // } - - // var tmp RecordSet - // switch version { - // case 0, 1: - // err = tmp.readFromVersion1(d) - // case 2: - // err = tmp.readFromVersion2(d) - // default: - // err = fmt.Errorf("unsupported message version %d for message of size %d", version, size) - // } - - // if tmp.Version > rs.Version { - // rs.Version = tmp.Version - // } - - // rs.Attributes |= tmp.Attributes - - // if tmp.Records != nil { - // stream.Records = append(stream.Records, tmp.Records) - // } - // } - - // if len(stream.Records) != 0 { - // rs.Records = stream - // // Ignore errors if we've successfully read records, so the - // // program can keep making progress. - // err = nil - // } - - // d.discardAll() - // rn := 4 + (int(size) - d.remain) - // d.remain = limit - rn - // return int64(rn), err - return 0, nil -} - -// WriteTo writes the representation of rs into w. The value of rs.Version -// dictates which format that the record set will be represented as. -// -// The error will be ErrNoRecord if rs contained no records. -// -// Note: since this package is only compatible with kafka 0.10 and above, the -// method never produces messages in version 0. If rs.Version is zero, the -// method defaults to producing messages in version 1. -func (rs *RecordSet) WriteTo(w io.Writer) (int64, error) { - // if rs.Records == nil { - // return 0, ErrNoRecord - // } - - // // This optimization avoids rendering the record set in an intermediary - // // buffer when the writer is already a pageBuffer, which is a common case - // // due to the way WriteRequest and WriteResponse are implemented. - // buffer, _ := w.(*pageBuffer) - // bufferOffset := int64(0) - - // if buffer != nil { - // bufferOffset = buffer.Size() - // } else { - // buffer = newPageBuffer() - // defer buffer.unref() - // } - - // size := packUint32(0) - // buffer.Write(size[:]) // size placeholder - - // var err error - // switch rs.Version { - // case 0, 1: - // err = rs.writeToVersion1(buffer, bufferOffset+4) - // case 2: - // err = rs.writeToVersion2(buffer, bufferOffset+4) - // default: - // err = fmt.Errorf("unsupported record set version %d", rs.Version) - // } - // if err != nil { - // return 0, err - // } - - // n := buffer.Size() - bufferOffset - // if n == 0 { - // size = packUint32(^uint32(0)) - // } else { - // size = packUint32(uint32(n) - 4) - // } - // buffer.WriteAt(size[:], bufferOffset) - - // // This condition indicates that the output writer received by `WriteTo` was - // // not a *pageBuffer, in which case we need to flush the buffered records - // // data into it. - // if buffer != w { - // return buffer.WriteTo(w) - // } - - // return n, nil - return 0, nil -} - -func packUint32(u uint32) (b [4]byte) { - binary.BigEndian.PutUint32(b[:], u) - return -} diff --git a/tap/extensions/kafka/record_bytes.go b/tap/extensions/kafka/record_bytes.go deleted file mode 100644 index acc5a76fc..000000000 --- a/tap/extensions/kafka/record_bytes.go +++ /dev/null @@ -1,43 +0,0 @@ -package kafka - -import ( - "github.com/segmentio/kafka-go/protocol" -) - -// Header is a key/value pair type representing headers set on records. -// type Header = protocol.Header - -// Bytes is an interface representing a sequence of bytes. 
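
// A minimal sketch (illustrative, not part of this patch) of the size-placeholder
// technique the removed WriteRequest/WriteResponse relied on, using packUint32 from
// the removed record.go: reserve four bytes, encode the body, then overwrite the
// placeholder with the body length (big-endian, excluding the size field itself).
func writeSizePrefixed(body []byte) []byte {
	out := make([]byte, 4, 4+len(body)) // 4-byte size placeholder
	out = append(out, body...)
	size := packUint32(uint32(len(out)) - 4)
	copy(out[:4], size[:])
	return out
}
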
This abstraction -// makes it possible for programs to inject data into produce requests without -// having to load in into an intermediary buffer, or read record keys and values -// from a fetch response directly from internal buffers. -// -// Bytes are not safe to use concurrently from multiple goroutines. -// type Bytes = protocol.Bytes - -// NewBytes constructs a Bytes value from a byte slice. -// -// If b is nil, nil is returned. -// func NewBytes(b []byte) Bytes { return protocol.NewBytes(b) } - -// ReadAll reads b into a byte slice. -// func ReadAll(b Bytes) ([]byte, error) { return protocol.ReadAll(b) } - -// Record is an interface representing a single kafka record. -// -// Record values are not safe to use concurrently from multiple goroutines. -// type Record = protocol.Record - -// RecordReader is an interface representing a sequence of records. Record sets -// are used in both produce and fetch requests to represent the sequence of -// records that are sent to or receive from kafka brokers. -// -// RecordReader values are not safe to use concurrently from multiple goroutines. -type RecordReader = protocol.RecordReader - -// NewRecordReade rconstructs a RecordSet which exposes the sequence of records -// passed as arguments. -func NewRecordReader(records ...Record) RecordReader { - // return protocol.NewRecordReader(records...) - return nil -} diff --git a/tap/extensions/kafka/reflect.go b/tap/extensions/kafka/reflect.go index 763553760..be87fc574 100644 --- a/tap/extensions/kafka/reflect.go +++ b/tap/extensions/kafka/reflect.go @@ -8,46 +8,14 @@ import ( type index []int -type _type struct{ typ reflect.Type } - -func typeOf(x interface{}) _type { - return makeType(reflect.TypeOf(x)) -} - -func elemTypeOf(x interface{}) _type { - return makeType(reflect.TypeOf(x).Elem()) -} - -func makeType(t reflect.Type) _type { - return _type{typ: t} -} - type value struct { val reflect.Value } -func nonAddressableValueOf(x interface{}) value { - return value{val: reflect.ValueOf(x)} -} - func valueOf(x interface{}) value { return value{val: reflect.ValueOf(x).Elem()} } -func (v value) bool() bool { return v.val.Bool() } - -func (v value) int8() int8 { return int8(v.int64()) } - -func (v value) int16() int16 { return int16(v.int64()) } - -func (v value) int32() int32 { return int32(v.int64()) } - -func (v value) int64() int64 { return v.val.Int() } - -func (v value) string() string { return v.val.String() } - -func (v value) bytes() []byte { return v.val.Bytes() } - func (v value) iface(t reflect.Type) interface{} { return v.val.Addr().Interface() } func (v value) array(t reflect.Type) array { return array{val: v.val} } //nolint @@ -88,10 +56,6 @@ func makeArray(t reflect.Type, n int) array { func (a array) index(i int) value { return value{val: a.val.Index(i)} } -func (a array) length() int { return a.val.Len() } - -func (a array) isNil() bool { return a.val.IsNil() } - func indexOf(s reflect.StructField) index { return index(s.Index) } func bytesToString(b []byte) string { return string(b) } diff --git a/tap/extensions/kafka/request.go b/tap/extensions/kafka/request.go index 362d9e1df..d3d286300 100644 --- a/tap/extensions/kafka/request.go +++ b/tap/extensions/kafka/request.go @@ -28,6 +28,9 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, su } if size < 8 { + if size == 0 { + return 0, 0, io.EOF + } return 0, 0, fmt.Errorf("A Kafka request header cannot be smaller than 8 bytes") } @@ -42,7 +45,7 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair 
*api.CounterPair, su correlationID := d.readInt32() clientID := d.readString() - if i := int(apiKey); i < 0 || i >= len(apiTypes) { + if i := int(apiKey); i < 0 || i >= numApis { err = fmt.Errorf("unsupported api key: %d", i) return apiKey, apiVersion, err } @@ -52,12 +55,6 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, su return apiKey, apiVersion, err } - t := &apiTypes[apiKey] - if t == nil { - err = fmt.Errorf("unsupported api: %s", apiNames[apiKey]) - return apiKey, apiVersion, err - } - var payload interface{} switch apiKey { @@ -227,61 +224,3 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, su return apiKey, apiVersion, nil } - -func WriteRequest(w io.Writer, apiVersion int16, correlationID int32, clientID string, msg Message) error { - apiKey := msg.ApiKey() - - if i := int(apiKey); i < 0 || i >= len(apiTypes) { - return fmt.Errorf("unsupported api key: %d", i) - } - - t := &apiTypes[apiKey] - if t == nil { - return fmt.Errorf("unsupported api: %s", apiNames[apiKey]) - } - - minVersion := t.minVersion() - maxVersion := t.maxVersion() - - if apiVersion < minVersion || apiVersion > maxVersion { - return fmt.Errorf("unsupported %s version: v%d not in range v%d-v%d", apiKey, apiVersion, minVersion, maxVersion) - } - - r := &t.requests[apiVersion-minVersion] - v := valueOf(msg) - b := newPageBuffer() - defer b.unref() - - e := &encoder{writer: b} - e.writeInt32(0) // placeholder for the request size - e.writeInt16(int16(apiKey)) - e.writeInt16(apiVersion) - e.writeInt32(correlationID) - - if r.flexible { - // Flexible messages use a nullable string for the client ID, then extra space for a - // tag buffer, which begins with a size value. Since we're not writing any fields into the - // latter, we can just write zero for now. - // - // See - // https://cwiki.apache.org/confluence/display/KAFKA/KIP-482%3A+The+Kafka+Protocol+should+Support+Optional+Tagged+Fields - // for details. - e.writeNullString(clientID) - e.writeUnsignedVarInt(0) - } else { - // Technically, recent versions of kafka interpret this field as a nullable - // string, however kafka 0.10 expected a non-nullable string and fails with - // a NullPointerException when it receives a null client id. 
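
// A minimal sketch (example values only; illustrative, not part of this patch) of the
// fixed request-header fields that ReadRequest decodes after the 4-byte size prefix:
// api key (int16) + api version (int16) + correlation id (int32) is exactly 8 bytes,
// which is why sizes below 8 are rejected and a zero size is now reported as io.EOF;
// the length-prefixed client id string follows.
var exampleRequestHeader = []byte{
	0x00, 0x12, // api key 18 (ApiVersions)
	0x00, 0x03, // api version 3
	0x00, 0x00, 0x00, 0x07, // correlation id 7
	0x00, 0x04, 'm', 'i', 'z', 'u', // client id "mizu"
}
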
- e.writeString(clientID) - } - r.encode(e, v) - err := e.err - - if err == nil { - size := packUint32(uint32(b.Size()) - 4) - _, _ = b.WriteAt(size[:], 0) - _, err = b.WriteTo(w) - } - - return err -} diff --git a/tap/extensions/kafka/response.go b/tap/extensions/kafka/response.go index 0eb7950c7..809889c39 100644 --- a/tap/extensions/kafka/response.go +++ b/tap/extensions/kafka/response.go @@ -25,6 +25,9 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, s } if size < 4 { + if size == 0 { + return io.EOF + } return fmt.Errorf("A Kafka response header cannot be smaller than 8 bytes") } @@ -53,7 +56,7 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, s ) reqResPair := reqResMatcher.registerResponse(key, response) if reqResPair == nil { - return fmt.Errorf("Couldn't match a Kafka response to a Kafka request in 3 seconds!") + return fmt.Errorf("Couldn't match a Kafka response to a Kafka request in %d milliseconds!", reqResMatcher.maxTry) } apiKey := reqResPair.Request.ApiKey apiVersion := reqResPair.Request.ApiVersion @@ -284,57 +287,12 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, s } emitter.Emit(item) - if i := int(apiKey); i < 0 || i >= len(apiTypes) { + if i := int(apiKey); i < 0 || i >= numApis { err = fmt.Errorf("unsupported api key: %d", i) return err } - t := &apiTypes[apiKey] - if t == nil { - err = fmt.Errorf("unsupported api: %s", apiNames[apiKey]) - return err - } - d.discardAll() return nil } - -func WriteResponse(w io.Writer, apiVersion int16, correlationID int32, msg Message) error { - apiKey := msg.ApiKey() - - if i := int(apiKey); i < 0 || i >= len(apiTypes) { - return fmt.Errorf("unsupported api key: %d", i) - } - - t := &apiTypes[apiKey] - if t == nil { - return fmt.Errorf("unsupported api: %s", apiNames[apiKey]) - } - - minVersion := t.minVersion() - maxVersion := t.maxVersion() - - if apiVersion < minVersion || apiVersion > maxVersion { - return fmt.Errorf("unsupported %s version: v%d not in range v%d-v%d", apiKey, apiVersion, minVersion, maxVersion) - } - - r := &t.responses[apiVersion-minVersion] - v := valueOf(msg) - b := newPageBuffer() - defer b.unref() - - e := &encoder{writer: b} - e.writeInt32(0) // placeholder for the response size - e.writeInt32(correlationID) - r.encode(e, v) - err := e.err - - if err == nil { - size := packUint32(uint32(b.Size()) - 4) - _, _ = b.WriteAt(size[:], 0) - _, err = b.WriteTo(w) - } - - return err -} diff --git a/tap/extensions/kafka/structs.go b/tap/extensions/kafka/structs.go index 0784a4276..2a74343f0 100644 --- a/tap/extensions/kafka/structs.go +++ b/tap/extensions/kafka/structs.go @@ -12,19 +12,6 @@ const ( RequireAll RequiredAcks = -1 ) -func (acks RequiredAcks) String() string { - switch acks { - case RequireNone: - return "none" - case RequireOne: - return "one" - case RequireAll: - return "all" - default: - return "unknown" - } -} - type UUID struct { TimeLow int32 `json:"timeLow"` TimeMid int16 `json:"timeMid"` diff --git a/tap/extensions/redis/matcher.go b/tap/extensions/redis/matcher.go index e63c2f4b2..18a1f0e7b 100644 --- a/tap/extensions/redis/matcher.go +++ b/tap/extensions/redis/matcher.go @@ -19,6 +19,8 @@ func createResponseRequestMatcher() api.RequestResponseMatcher { func (matcher *requestResponseMatcher) GetMap() *sync.Map { return matcher.openMessagesMap } +func (matcher *requestResponseMatcher) SetMaxTry(value int) { +} func (matcher *requestResponseMatcher) registerRequest(ident string, request *RedisPacket, 
captureTime time.Time) *api.OutputChannelItem { requestRedisMessage := api.GenericMessage{