Add unit tests for Kafka dissector (#807)

* Add unit tests for Kafka dissector

* Return `io.EOF` if request or response header size is zero

* Sort the slice in `representMapAsTable`

* Remove the dead code

* Remove more dead code

* Remove more dead code

* Fix `dissector.Analyze` call

Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>
This commit is contained in:
M. Mert Yıldıran 2022-02-16 12:18:33 +03:00 committed by GitHub
parent e8d2b7eb3c
commit c67675c138
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 357 additions and 2383 deletions

View File

@ -103,6 +103,7 @@ test-shared: ## Run shared tests
test-extensions: ## Run extensions tests
@echo "running http tests"; cd tap/extensions/http && $(MAKE) test
@echo "running kafka tests"; cd tap/extensions/kafka && $(MAKE) test
@echo "running amqp tests"; cd tap/extensions/amqp && $(MAKE) test
acceptance-test: ## Run acceptance tests

View File

@ -107,6 +107,7 @@ type Dissector interface {
type RequestResponseMatcher interface {
GetMap() *sync.Map
SetMaxTry(value int)
}
type Emitting struct {

View File

@ -21,6 +21,9 @@ func (matcher *requestResponseMatcher) GetMap() *sync.Map {
return matcher.openMessagesMap
}
func (matcher *requestResponseMatcher) SetMaxTry(value int) {
}
func (matcher *requestResponseMatcher) registerRequest(ident string, request *http.Request, captureTime time.Time, protoMinor int) *api.OutputChannelItem {
requestHTTPMessage := api.GenericMessage{
IsRequest: true,

View File

@ -0,0 +1,16 @@
skipbin := $$(find bin -mindepth 1 -maxdepth 1)
skipexpect := $$(find expect -mindepth 1 -maxdepth 1)
test: test-pull-bin test-pull-expect
@MIZU_TEST=1 go test -v ./... -coverpkg=./... -race -coverprofile=coverage.out -covermode=atomic
test-update: test-pull-bin
@MIZU_TEST=1 TEST_UPDATE=1 go test -v ./... -coverpkg=./... -coverprofile=coverage.out -covermode=atomic
test-pull-bin:
@mkdir -p bin
@[ "${skipbin}" ] && echo "Skipping downloading BINs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp gs://static.up9.io/mizu/test-pcap/bin/kafka/\*.bin bin
test-pull-expect:
@mkdir -p expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect/kafka/\* expect

View File

@ -1,584 +0,0 @@
package kafka
import (
"bytes"
"fmt"
"io"
"sync"
"sync/atomic"
)
// Bytes is an interface implemented by types that represent immutable
// sequences of bytes.
//
// Bytes values are used to abstract the location where record keys and
// values are read from (e.g. in-memory buffers, network sockets, files).
//
// The Close method should be called to release resources held by the object
// when the program is done with it.
//
// Bytes values are generally not safe to use concurrently from multiple
// goroutines.
type Bytes interface {
io.ReadCloser
// Returns the number of bytes remaining to be read from the payload.
Len() int
}
// NewBytes constructs a Bytes value from b.
//
// The returned value references b, it does not make a copy of the backing
// array.
//
// If b is nil, nil is returned to represent a null BYTES value in the kafka
// protocol.
func NewBytes(b []byte) Bytes {
if b == nil {
return nil
}
r := new(bytesReader)
r.Reset(b)
return r
}
// ReadAll is similar to ioutil.ReadAll, but it takes advantage of knowing the
// length of b to minimize the memory footprint.
//
// The function returns a nil slice if b is nil.
// func ReadAll(b Bytes) ([]byte, error) {
// if b == nil {
// return nil, nil
// }
// s := make([]byte, b.Len())
// _, err := io.ReadFull(b, s)
// return s, err
// }
type bytesReader struct{ bytes.Reader }
func (*bytesReader) Close() error { return nil }
type refCount uintptr
func (rc *refCount) ref() { atomic.AddUintptr((*uintptr)(rc), 1) }
func (rc *refCount) unref(onZero func()) {
if atomic.AddUintptr((*uintptr)(rc), ^uintptr(0)) == 0 {
onZero()
}
}
const (
// Size of the memory buffer for a single page. We use a farily
// large size here (64 KiB) because batches exchanged with kafka
// tend to be multiple kilobytes in size, sometimes hundreds.
// Using large pages amortizes the overhead of the page metadata
// and algorithms to manage the pages.
pageSize = 65536
)
type page struct {
refc refCount
offset int64
length int
buffer *[pageSize]byte
}
func newPage(offset int64) *page {
p, _ := pagePool.Get().(*page)
if p != nil {
p.offset = offset
p.length = 0
p.ref()
} else {
p = &page{
refc: 1,
offset: offset,
buffer: &[pageSize]byte{},
}
}
return p
}
func (p *page) ref() { p.refc.ref() }
func (p *page) unref() { p.refc.unref(func() { pagePool.Put(p) }) }
func (p *page) slice(begin, end int64) []byte {
i, j := begin-p.offset, end-p.offset
if i < 0 {
i = 0
} else if i > pageSize {
i = pageSize
}
if j < 0 {
j = 0
} else if j > pageSize {
j = pageSize
}
if i < j {
return p.buffer[i:j]
}
return nil
}
func (p *page) Cap() int { return pageSize }
func (p *page) Len() int { return p.length }
func (p *page) Size() int64 { return int64(p.length) }
func (p *page) Truncate(n int) {
if n < p.length {
p.length = n
}
}
func (p *page) ReadAt(b []byte, off int64) (int, error) {
if off -= p.offset; off < 0 || off > pageSize {
panic("offset out of range")
}
if off > int64(p.length) {
return 0, nil
}
return copy(b, p.buffer[off:p.length]), nil
}
func (p *page) ReadFrom(r io.Reader) (int64, error) {
n, err := io.ReadFull(r, p.buffer[p.length:])
if err == io.EOF || err == io.ErrUnexpectedEOF {
err = nil
}
p.length += n
return int64(n), err
}
func (p *page) WriteAt(b []byte, off int64) (int, error) {
if off -= p.offset; off < 0 || off > pageSize {
panic("offset out of range")
}
n := copy(p.buffer[off:], b)
if end := int(off) + n; end > p.length {
p.length = end
}
return n, nil
}
func (p *page) Write(b []byte) (int, error) {
return p.WriteAt(b, p.offset+int64(p.length))
}
var (
_ io.ReaderAt = (*page)(nil)
_ io.ReaderFrom = (*page)(nil)
_ io.Writer = (*page)(nil)
_ io.WriterAt = (*page)(nil)
)
type pageBuffer struct {
refc refCount
pages contiguousPages
length int
cursor int
}
func newPageBuffer() *pageBuffer {
b, _ := pageBufferPool.Get().(*pageBuffer)
if b != nil {
b.cursor = 0
b.refc.ref()
} else {
b = &pageBuffer{
refc: 1,
pages: make(contiguousPages, 0, 16),
}
}
return b
}
func (pb *pageBuffer) unref() {
pb.refc.unref(func() {
pb.pages.unref()
pb.pages.clear()
pb.pages = pb.pages[:0]
pb.length = 0
pageBufferPool.Put(pb)
})
}
func (pb *pageBuffer) newPage() *page {
return newPage(int64(pb.length))
}
func (pb *pageBuffer) Close() error {
return nil
}
func (pb *pageBuffer) Len() int {
return pb.length - pb.cursor
}
func (pb *pageBuffer) Size() int64 {
return int64(pb.length)
}
func (pb *pageBuffer) Discard(n int) (int, error) {
remain := pb.length - pb.cursor
if remain < n {
n = remain
}
pb.cursor += n
return n, nil
}
func (pb *pageBuffer) Truncate(n int) {
if n < pb.length {
pb.length = n
if n < pb.cursor {
pb.cursor = n
}
for i := range pb.pages {
if p := pb.pages[i]; p.length <= n {
n -= p.length
} else {
if n > 0 {
pb.pages[i].Truncate(n)
i++
}
pb.pages[i:].unref()
pb.pages[i:].clear()
pb.pages = pb.pages[:i]
break
}
}
}
}
func (pb *pageBuffer) Seek(offset int64, whence int) (int64, error) {
c, err := seek(int64(pb.cursor), int64(pb.length), offset, whence)
if err != nil {
return -1, err
}
pb.cursor = int(c)
return c, nil
}
func (pb *pageBuffer) ReadByte() (byte, error) {
b := [1]byte{}
_, err := pb.Read(b[:])
return b[0], err
}
func (pb *pageBuffer) Read(b []byte) (int, error) {
if pb.cursor >= pb.length {
return 0, io.EOF
}
n, err := pb.ReadAt(b, int64(pb.cursor))
pb.cursor += n
return n, err
}
func (pb *pageBuffer) ReadAt(b []byte, off int64) (int, error) {
return pb.pages.ReadAt(b, off)
}
func (pb *pageBuffer) ReadFrom(r io.Reader) (int64, error) {
if len(pb.pages) == 0 {
pb.pages = append(pb.pages, pb.newPage())
}
rn := int64(0)
for {
tail := pb.pages[len(pb.pages)-1]
free := tail.Cap() - tail.Len()
if free == 0 {
tail = pb.newPage()
free = pageSize
pb.pages = append(pb.pages, tail)
}
n, err := tail.ReadFrom(r)
pb.length += int(n)
rn += n
if n < int64(free) {
return rn, err
}
}
}
func (pb *pageBuffer) WriteString(s string) (int, error) {
return pb.Write([]byte(s))
}
func (pb *pageBuffer) Write(b []byte) (int, error) {
wn := len(b)
if wn == 0 {
return 0, nil
}
if len(pb.pages) == 0 {
pb.pages = append(pb.pages, pb.newPage())
}
for len(b) != 0 {
tail := pb.pages[len(pb.pages)-1]
free := tail.Cap() - tail.Len()
if len(b) <= free {
_, _ = tail.Write(b)
pb.length += len(b)
break
}
_, _ = tail.Write(b[:free])
b = b[free:]
pb.length += free
pb.pages = append(pb.pages, pb.newPage())
}
return wn, nil
}
func (pb *pageBuffer) WriteAt(b []byte, off int64) (int, error) {
n, err := pb.pages.WriteAt(b, off)
if err != nil {
return n, err
}
if n < len(b) {
_, _ = pb.Write(b[n:])
}
return len(b), nil
}
func (pb *pageBuffer) WriteTo(w io.Writer) (int64, error) {
var wn int
var err error
pb.pages.scan(int64(pb.cursor), int64(pb.length), func(b []byte) bool {
var n int
n, err = w.Write(b)
wn += n
return err == nil
})
pb.cursor += wn
return int64(wn), err
}
var (
_ io.ReaderAt = (*pageBuffer)(nil)
_ io.ReaderFrom = (*pageBuffer)(nil)
_ io.StringWriter = (*pageBuffer)(nil)
_ io.Writer = (*pageBuffer)(nil)
_ io.WriterAt = (*pageBuffer)(nil)
_ io.WriterTo = (*pageBuffer)(nil)
pagePool sync.Pool
pageBufferPool sync.Pool
)
type contiguousPages []*page
func (pages contiguousPages) unref() {
for _, p := range pages {
p.unref()
}
}
func (pages contiguousPages) clear() {
for i := range pages {
pages[i] = nil
}
}
func (pages contiguousPages) ReadAt(b []byte, off int64) (int, error) {
rn := 0
for _, p := range pages.slice(off, off+int64(len(b))) {
n, _ := p.ReadAt(b, off)
b = b[n:]
rn += n
off += int64(n)
}
return rn, nil
}
func (pages contiguousPages) WriteAt(b []byte, off int64) (int, error) {
wn := 0
for _, p := range pages.slice(off, off+int64(len(b))) {
n, _ := p.WriteAt(b, off)
b = b[n:]
wn += n
off += int64(n)
}
return wn, nil
}
func (pages contiguousPages) slice(begin, end int64) contiguousPages {
i := pages.indexOf(begin)
j := pages.indexOf(end)
if j < len(pages) {
j++
}
return pages[i:j]
}
func (pages contiguousPages) indexOf(offset int64) int {
if len(pages) == 0 {
return 0
}
return int((offset - pages[0].offset) / pageSize)
}
func (pages contiguousPages) scan(begin, end int64, f func([]byte) bool) {
for _, p := range pages.slice(begin, end) {
if !f(p.slice(begin, end)) {
break
}
}
}
var (
_ io.ReaderAt = contiguousPages{}
_ io.WriterAt = contiguousPages{}
)
type pageRef struct {
pages contiguousPages
offset int64
cursor int64
length uint32
once uint32
}
func (ref *pageRef) unref() {
if atomic.CompareAndSwapUint32(&ref.once, 0, 1) {
ref.pages.unref()
ref.pages.clear()
ref.pages = nil
ref.offset = 0
ref.cursor = 0
ref.length = 0
}
}
func (ref *pageRef) Len() int { return int(ref.Size() - ref.cursor) }
func (ref *pageRef) Size() int64 { return int64(ref.length) }
func (ref *pageRef) Close() error { ref.unref(); return nil }
func (ref *pageRef) String() string {
return fmt.Sprintf("[offset=%d cursor=%d length=%d]", ref.offset, ref.cursor, ref.length)
}
func (ref *pageRef) Seek(offset int64, whence int) (int64, error) {
c, err := seek(ref.cursor, int64(ref.length), offset, whence)
if err != nil {
return -1, err
}
ref.cursor = c
return c, nil
}
func (ref *pageRef) ReadByte() (byte, error) {
var c byte
var ok bool
ref.scan(ref.cursor, func(b []byte) bool {
c, ok = b[0], true
return false
})
if ok {
ref.cursor++
} else {
return 0, io.EOF
}
return c, nil
}
func (ref *pageRef) Read(b []byte) (int, error) {
if ref.cursor >= int64(ref.length) {
return 0, io.EOF
}
n, err := ref.ReadAt(b, ref.cursor)
ref.cursor += int64(n)
return n, err
}
func (ref *pageRef) ReadAt(b []byte, off int64) (int, error) {
limit := ref.offset + int64(ref.length)
off += ref.offset
if off >= limit {
return 0, io.EOF
}
if off+int64(len(b)) > limit {
b = b[:limit-off]
}
if len(b) == 0 {
return 0, nil
}
n, err := ref.pages.ReadAt(b, off)
if n == 0 && err == nil {
err = io.EOF
}
return n, err
}
func (ref *pageRef) WriteTo(w io.Writer) (wn int64, err error) {
ref.scan(ref.cursor, func(b []byte) bool {
var n int
n, err = w.Write(b)
wn += int64(n)
return err == nil
})
ref.cursor += wn
return
}
func (ref *pageRef) scan(off int64, f func([]byte) bool) {
begin := ref.offset + off
end := ref.offset + int64(ref.length)
ref.pages.scan(begin, end, f)
}
var (
_ io.Closer = (*pageRef)(nil)
_ io.Seeker = (*pageRef)(nil)
_ io.Reader = (*pageRef)(nil)
_ io.ReaderAt = (*pageRef)(nil)
_ io.WriterTo = (*pageRef)(nil)
)
func seek(cursor, limit, offset int64, whence int) (int64, error) {
switch whence {
case io.SeekStart:
// absolute offset
case io.SeekCurrent:
offset = cursor + offset
case io.SeekEnd:
offset = limit - offset
default:
return -1, fmt.Errorf("seek: invalid whence value: %d", whence)
}
if offset < 0 {
offset = 0
}
if offset > limit {
offset = limit
}
return offset, nil
}

View File

@ -1,143 +0,0 @@
package kafka
import (
"fmt"
"sort"
"strings"
"text/tabwriter"
)
type Cluster struct {
ClusterID string
Controller int32
Brokers map[int32]Broker
Topics map[string]Topic
}
func (c Cluster) BrokerIDs() []int32 {
brokerIDs := make([]int32, 0, len(c.Brokers))
for id := range c.Brokers {
brokerIDs = append(brokerIDs, id)
}
sort.Slice(brokerIDs, func(i, j int) bool {
return brokerIDs[i] < brokerIDs[j]
})
return brokerIDs
}
func (c Cluster) TopicNames() []string {
topicNames := make([]string, 0, len(c.Topics))
for name := range c.Topics {
topicNames = append(topicNames, name)
}
sort.Strings(topicNames)
return topicNames
}
func (c Cluster) IsZero() bool {
return c.ClusterID == "" && c.Controller == 0 && len(c.Brokers) == 0 && len(c.Topics) == 0
}
func (c Cluster) Format(w fmt.State, _ rune) {
tw := new(tabwriter.Writer)
fmt.Fprintf(w, "CLUSTER: %q\n\n", c.ClusterID)
tw.Init(w, 0, 8, 2, ' ', 0)
fmt.Fprint(tw, " BROKER\tHOST\tPORT\tRACK\tCONTROLLER\n")
for _, id := range c.BrokerIDs() {
broker := c.Brokers[id]
fmt.Fprintf(tw, " %d\t%s\t%d\t%s\t%t\n", broker.ID, broker.Host, broker.Port, broker.Rack, broker.ID == c.Controller)
}
tw.Flush()
fmt.Fprintln(w)
tw.Init(w, 0, 8, 2, ' ', 0)
fmt.Fprint(tw, " TOPIC\tPARTITIONS\tBROKERS\n")
topicNames := c.TopicNames()
brokers := make(map[int32]struct{}, len(c.Brokers))
brokerIDs := make([]int32, 0, len(c.Brokers))
for _, name := range topicNames {
topic := c.Topics[name]
for _, p := range topic.Partitions {
for _, id := range p.Replicas {
brokers[id] = struct{}{}
}
}
for id := range brokers {
brokerIDs = append(brokerIDs, id)
}
fmt.Fprintf(tw, " %s\t%d\t%s\n", topic.Name, len(topic.Partitions), formatBrokerIDs(brokerIDs, -1))
for id := range brokers {
delete(brokers, id)
}
brokerIDs = brokerIDs[:0]
}
tw.Flush()
fmt.Fprintln(w)
if w.Flag('+') {
for _, name := range topicNames {
fmt.Fprintf(w, " TOPIC: %q\n\n", name)
tw.Init(w, 0, 8, 2, ' ', 0)
fmt.Fprint(tw, " PARTITION\tREPLICAS\tISR\tOFFLINE\n")
for _, p := range c.Topics[name].Partitions {
fmt.Fprintf(tw, " %d\t%s\t%s\t%s\n", p.ID,
formatBrokerIDs(p.Replicas, -1),
formatBrokerIDs(p.ISR, p.Leader),
formatBrokerIDs(p.Offline, -1),
)
}
tw.Flush()
fmt.Fprintln(w)
}
}
}
func formatBrokerIDs(brokerIDs []int32, leader int32) string {
if len(brokerIDs) == 0 {
return ""
}
if len(brokerIDs) == 1 {
return itoa(brokerIDs[0])
}
sort.Slice(brokerIDs, func(i, j int) bool {
id1 := brokerIDs[i]
id2 := brokerIDs[j]
if id1 == leader {
return true
}
if id2 == leader {
return false
}
return id1 < id2
})
brokerNames := make([]string, len(brokerIDs))
for i, id := range brokerIDs {
brokerNames[i] = itoa(id)
}
return strings.Join(brokerNames, ",")
}
var (
_ fmt.Formatter = Cluster{}
)

View File

@ -1,7 +1,6 @@
package kafka
import (
"bytes"
"encoding/binary"
"fmt"
"hash/crc32"
@ -9,8 +8,6 @@ import (
"io/ioutil"
"reflect"
"strings"
"sync"
"sync/atomic"
)
type discarder interface {
@ -26,15 +23,6 @@ type decoder struct {
crc32 uint32
}
func (d *decoder) Reset(r io.Reader, n int) {
d.reader = r
d.remain = n
d.buffer = [8]byte{}
d.err = nil
d.table = nil
d.crc32 = 0
}
func (d *decoder) Read(b []byte) (int, error) {
if d.err != nil {
return 0, d.err
@ -483,52 +471,3 @@ func decodeReadInt32(b []byte) int32 {
func decodeReadInt64(b []byte) int64 {
return int64(binary.BigEndian.Uint64(b))
}
func Unmarshal(data []byte, version int16, value interface{}) error {
typ := elemTypeOf(value)
cache, _ := unmarshalers.Load().(map[versionedType]decodeFunc)
key := versionedType{typ: typ, version: version}
decode := cache[key]
if decode == nil {
decode = decodeFuncOf(reflect.TypeOf(value).Elem(), version, false, structTag{
MinVersion: -1,
MaxVersion: -1,
TagID: -2,
Compact: true,
Nullable: true,
})
newCache := make(map[versionedType]decodeFunc, len(cache)+1)
newCache[key] = decode
for typ, fun := range cache {
newCache[typ] = fun
}
unmarshalers.Store(newCache)
}
d, _ := decoders.Get().(*decoder)
if d == nil {
d = &decoder{reader: bytes.NewReader(nil)}
}
d.remain = len(data)
r, _ := d.reader.(*bytes.Reader)
r.Reset(data)
defer func() {
r.Reset(nil)
d.Reset(r, 0)
decoders.Put(d)
}()
decode(d, valueOf(value))
return dontExpectEOF(d.err)
}
var (
decoders sync.Pool // *decoder
unmarshalers atomic.Value // map[versionedType]decodeFunc
)

View File

@ -1,16 +0,0 @@
package kafka
import "bufio"
func discardN(r *bufio.Reader, sz int, n int) (int, error) {
var err error
if n <= sz {
n, err = r.Discard(n)
} else {
n, err = r.Discard(sz)
if err == nil {
err = errShortRead
}
}
return sz - n, err
}

View File

@ -1,537 +0,0 @@
package kafka
import (
"bytes"
"encoding/binary"
"hash/crc32"
"io"
"reflect"
"sync"
"sync/atomic"
)
type encoder struct {
writer io.Writer
err error
table *crc32.Table
crc32 uint32
buffer [32]byte
}
type encoderChecksum struct {
reader io.Reader
encoder *encoder
}
func (e *encoderChecksum) Read(b []byte) (int, error) {
n, err := e.reader.Read(b)
if n > 0 {
e.encoder.update(b[:n])
}
return n, err
}
func (e *encoder) Reset(w io.Writer) {
e.writer = w
e.err = nil
e.table = nil
e.crc32 = 0
e.buffer = [32]byte{}
}
func (e *encoder) ReadFrom(r io.Reader) (int64, error) {
if e.table != nil {
r = &encoderChecksum{
reader: r,
encoder: e,
}
}
return io.Copy(e.writer, r)
}
func (e *encoder) Write(b []byte) (int, error) {
if e.err != nil {
return 0, e.err
}
n, err := e.writer.Write(b)
if n > 0 {
e.update(b[:n])
}
if err != nil {
e.err = err
}
return n, err
}
func (e *encoder) WriteByte(b byte) error {
e.buffer[0] = b
_, err := e.Write(e.buffer[:1])
return err
}
func (e *encoder) WriteString(s string) (int, error) {
// This implementation is an optimization to avoid the heap allocation that
// would occur when converting the string to a []byte to call crc32.Update.
//
// Strings are rarely long in the kafka protocol, so the use of a 32 byte
// buffer is a good comprise between keeping the encoder value small and
// limiting the number of calls to Write.
//
// We introduced this optimization because memory profiles on the benchmarks
// showed that most heap allocations were caused by this code path.
n := 0
for len(s) != 0 {
c := copy(e.buffer[:], s)
w, err := e.Write(e.buffer[:c])
n += w
if err != nil {
return n, err
}
s = s[c:]
}
return n, nil
}
func (e *encoder) update(b []byte) {
if e.table != nil {
e.crc32 = crc32.Update(e.crc32, e.table, b)
}
}
func (e *encoder) encodeBool(v value) {
b := int8(0)
if v.bool() {
b = 1
}
e.writeInt8(b)
}
func (e *encoder) encodeInt8(v value) {
e.writeInt8(v.int8())
}
func (e *encoder) encodeInt16(v value) {
e.writeInt16(v.int16())
}
func (e *encoder) encodeInt32(v value) {
e.writeInt32(v.int32())
}
func (e *encoder) encodeInt64(v value) {
e.writeInt64(v.int64())
}
func (e *encoder) encodeString(v value) {
e.writeString(v.string())
}
func (e *encoder) encodeCompactString(v value) {
e.writeCompactString(v.string())
}
func (e *encoder) encodeNullString(v value) {
e.writeNullString(v.string())
}
func (e *encoder) encodeCompactNullString(v value) {
e.writeCompactNullString(v.string())
}
func (e *encoder) encodeBytes(v value) {
e.writeBytes(v.bytes())
}
func (e *encoder) encodeCompactBytes(v value) {
e.writeCompactBytes(v.bytes())
}
func (e *encoder) encodeNullBytes(v value) {
e.writeNullBytes(v.bytes())
}
func (e *encoder) encodeCompactNullBytes(v value) {
e.writeCompactNullBytes(v.bytes())
}
func (e *encoder) encodeArray(v value, elemType reflect.Type, encodeElem encodeFunc) {
a := v.array(elemType)
n := a.length()
e.writeInt32(int32(n))
for i := 0; i < n; i++ {
encodeElem(e, a.index(i))
}
}
func (e *encoder) encodeCompactArray(v value, elemType reflect.Type, encodeElem encodeFunc) {
a := v.array(elemType)
n := a.length()
e.writeUnsignedVarInt(uint64(n + 1))
for i := 0; i < n; i++ {
encodeElem(e, a.index(i))
}
}
func (e *encoder) encodeNullArray(v value, elemType reflect.Type, encodeElem encodeFunc) {
a := v.array(elemType)
if a.isNil() {
e.writeInt32(-1)
return
}
n := a.length()
e.writeInt32(int32(n))
for i := 0; i < n; i++ {
encodeElem(e, a.index(i))
}
}
func (e *encoder) encodeCompactNullArray(v value, elemType reflect.Type, encodeElem encodeFunc) {
a := v.array(elemType)
if a.isNil() {
e.writeUnsignedVarInt(0)
return
}
n := a.length()
e.writeUnsignedVarInt(uint64(n + 1))
for i := 0; i < n; i++ {
encodeElem(e, a.index(i))
}
}
func (e *encoder) writeInt8(i int8) {
writeInt8(e.buffer[:1], i)
_, _ = e.Write(e.buffer[:1])
}
func (e *encoder) writeInt16(i int16) {
writeInt16(e.buffer[:2], i)
_, _ = e.Write(e.buffer[:2])
}
func (e *encoder) writeInt32(i int32) {
writeInt32(e.buffer[:4], i)
_, _ = e.Write(e.buffer[:4])
}
func (e *encoder) writeInt64(i int64) {
writeInt64(e.buffer[:8], i)
_, _ = e.Write(e.buffer[:8])
}
func (e *encoder) writeString(s string) {
e.writeInt16(int16(len(s)))
_, _ = e.WriteString(s)
}
func (e *encoder) writeCompactString(s string) {
e.writeUnsignedVarInt(uint64(len(s)) + 1)
_, _ = e.WriteString(s)
}
func (e *encoder) writeNullString(s string) {
if s == "" {
e.writeInt16(-1)
} else {
e.writeInt16(int16(len(s)))
_, _ = e.WriteString(s)
}
}
func (e *encoder) writeCompactNullString(s string) {
if s == "" {
e.writeUnsignedVarInt(0)
} else {
e.writeUnsignedVarInt(uint64(len(s)) + 1)
_, _ = e.WriteString(s)
}
}
func (e *encoder) writeBytes(b []byte) {
e.writeInt32(int32(len(b)))
_, _ = e.Write(b)
}
func (e *encoder) writeCompactBytes(b []byte) {
e.writeUnsignedVarInt(uint64(len(b)) + 1)
_, _ = e.Write(b)
}
func (e *encoder) writeNullBytes(b []byte) {
if b == nil {
e.writeInt32(-1)
} else {
e.writeInt32(int32(len(b)))
_, _ = e.Write(b)
}
}
func (e *encoder) writeCompactNullBytes(b []byte) {
if b == nil {
e.writeUnsignedVarInt(0)
} else {
e.writeUnsignedVarInt(uint64(len(b)) + 1)
_, _ = e.Write(b)
}
}
func (e *encoder) writeUnsignedVarInt(i uint64) {
b := e.buffer[:]
n := 0
for i >= 0x80 && n < len(b) {
b[n] = byte(i) | 0x80
i >>= 7
n++
}
if n < len(b) {
b[n] = byte(i)
n++
}
_, _ = e.Write(b[:n])
}
type encodeFunc func(*encoder, value)
var (
_ io.ReaderFrom = (*encoder)(nil)
_ io.Writer = (*encoder)(nil)
_ io.ByteWriter = (*encoder)(nil)
_ io.StringWriter = (*encoder)(nil)
writerTo = reflect.TypeOf((*io.WriterTo)(nil)).Elem()
)
func encodeFuncOf(typ reflect.Type, version int16, flexible bool, tag structTag) encodeFunc {
if reflect.PtrTo(typ).Implements(writerTo) {
return writerEncodeFuncOf(typ)
}
switch typ.Kind() {
case reflect.Bool:
return (*encoder).encodeBool
case reflect.Int8:
return (*encoder).encodeInt8
case reflect.Int16:
return (*encoder).encodeInt16
case reflect.Int32:
return (*encoder).encodeInt32
case reflect.Int64:
return (*encoder).encodeInt64
case reflect.String:
return stringEncodeFuncOf(flexible, tag)
case reflect.Struct:
return structEncodeFuncOf(typ, version, flexible)
case reflect.Slice:
if typ.Elem().Kind() == reflect.Uint8 { // []byte
return bytesEncodeFuncOf(flexible, tag)
}
return arrayEncodeFuncOf(typ, version, flexible, tag)
default:
panic("unsupported type: " + typ.String())
}
}
func stringEncodeFuncOf(flexible bool, tag structTag) encodeFunc {
switch {
case flexible && tag.Nullable:
// In flexible messages, all strings are compact
return (*encoder).encodeCompactNullString
case flexible:
// In flexible messages, all strings are compact
return (*encoder).encodeCompactString
case tag.Nullable:
return (*encoder).encodeNullString
default:
return (*encoder).encodeString
}
}
func bytesEncodeFuncOf(flexible bool, tag structTag) encodeFunc {
switch {
case flexible && tag.Nullable:
// In flexible messages, all arrays are compact
return (*encoder).encodeCompactNullBytes
case flexible:
// In flexible messages, all arrays are compact
return (*encoder).encodeCompactBytes
case tag.Nullable:
return (*encoder).encodeNullBytes
default:
return (*encoder).encodeBytes
}
}
func structEncodeFuncOf(typ reflect.Type, version int16, flexible bool) encodeFunc {
type field struct {
encode encodeFunc
index index
tagID int
}
var fields []field
var taggedFields []field
forEachStructField(typ, func(typ reflect.Type, index index, tag string) {
if typ.Size() != 0 { // skip struct{}
forEachStructTag(tag, func(tag structTag) bool {
if tag.MinVersion <= version && version <= tag.MaxVersion {
f := field{
encode: encodeFuncOf(typ, version, flexible, tag),
index: index,
tagID: tag.TagID,
}
if tag.TagID < -1 {
// Normal required field
fields = append(fields, f)
} else {
// Optional tagged field (flexible messages only)
taggedFields = append(taggedFields, f)
}
return false
}
return true
})
}
})
return func(e *encoder, v value) {
for i := range fields {
f := &fields[i]
f.encode(e, v.fieldByIndex(f.index))
}
if flexible {
// See https://cwiki.apache.org/confluence/display/KAFKA/KIP-482%3A+The+Kafka+Protocol+should+Support+Optional+Tagged+Fields
// for details of tag buffers in "flexible" messages.
e.writeUnsignedVarInt(uint64(len(taggedFields)))
for i := range taggedFields {
f := &taggedFields[i]
e.writeUnsignedVarInt(uint64(f.tagID))
buf := &bytes.Buffer{}
se := &encoder{writer: buf}
f.encode(se, v.fieldByIndex(f.index))
e.writeUnsignedVarInt(uint64(buf.Len()))
_, _ = e.Write(buf.Bytes())
}
}
}
}
func arrayEncodeFuncOf(typ reflect.Type, version int16, flexible bool, tag structTag) encodeFunc {
elemType := typ.Elem()
elemFunc := encodeFuncOf(elemType, version, flexible, tag)
switch {
case flexible && tag.Nullable:
// In flexible messages, all arrays are compact
return func(e *encoder, v value) { e.encodeCompactNullArray(v, elemType, elemFunc) }
case flexible:
// In flexible messages, all arrays are compact
return func(e *encoder, v value) { e.encodeCompactArray(v, elemType, elemFunc) }
case tag.Nullable:
return func(e *encoder, v value) { e.encodeNullArray(v, elemType, elemFunc) }
default:
return func(e *encoder, v value) { e.encodeArray(v, elemType, elemFunc) }
}
}
func writerEncodeFuncOf(typ reflect.Type) encodeFunc {
typ = reflect.PtrTo(typ)
return func(e *encoder, v value) {
// Optimization to write directly into the buffer when the encoder
// does no need to compute a crc32 checksum.
w := io.Writer(e)
if e.table == nil {
w = e.writer
}
_, err := v.iface(typ).(io.WriterTo).WriteTo(w)
if err != nil {
e.err = err
}
}
}
func writeInt8(b []byte, i int8) {
b[0] = byte(i)
}
func writeInt16(b []byte, i int16) {
binary.BigEndian.PutUint16(b, uint16(i))
}
func writeInt32(b []byte, i int32) {
binary.BigEndian.PutUint32(b, uint32(i))
}
func writeInt64(b []byte, i int64) {
binary.BigEndian.PutUint64(b, uint64(i))
}
func Marshal(version int16, value interface{}) ([]byte, error) {
typ := typeOf(value)
cache, _ := marshalers.Load().(map[versionedType]encodeFunc)
key := versionedType{typ: typ, version: version}
encode := cache[key]
if encode == nil {
encode = encodeFuncOf(reflect.TypeOf(value), version, false, structTag{
MinVersion: -1,
MaxVersion: -1,
TagID: -2,
Compact: true,
Nullable: true,
})
newCache := make(map[versionedType]encodeFunc, len(cache)+1)
newCache[key] = encode
for typ, fun := range cache {
newCache[typ] = fun
}
marshalers.Store(newCache)
}
e, _ := encoders.Get().(*encoder)
if e == nil {
e = &encoder{writer: new(bytes.Buffer)}
}
b, _ := e.writer.(*bytes.Buffer)
defer func() {
b.Reset()
e.Reset(b)
encoders.Put(e)
}()
encode(e, nonAddressableValueOf(value))
if e.err != nil {
return nil, e.err
}
buf := b.Bytes()
out := make([]byte, len(buf))
copy(out, buf)
return out, nil
}
type versionedType struct {
typ _type
version int16
}
var (
encoders sync.Pool // *encoder
marshalers atomic.Value // map[versionedType]encodeFunc
)

View File

@ -1,91 +0,0 @@
package kafka
import (
"fmt"
)
// Error represents client-side protocol errors.
type Error string
func (e Error) Error() string { return string(e) }
func Errorf(msg string, args ...interface{}) Error {
return Error(fmt.Sprintf(msg, args...))
}
const (
// ErrNoTopic is returned when a request needs to be sent to a specific
ErrNoTopic Error = "topic not found"
// ErrNoPartition is returned when a request needs to be sent to a specific
// partition, but the client did not find it in the cluster metadata.
ErrNoPartition Error = "topic partition not found"
// ErrNoLeader is returned when a request needs to be sent to a partition
// leader, but the client could not determine what the leader was at this
// time.
ErrNoLeader Error = "topic partition has no leader"
// ErrNoRecord is returned when attempting to write a message containing an
// empty record set (which kafka forbids).
//
// We handle this case client-side because kafka will close the connection
// that it received an empty produce request on, causing all concurrent
// requests to be aborted.
ErrNoRecord Error = "record set contains no records"
// ErrNoReset is returned by ResetRecordReader when the record reader does
// not support being reset.
ErrNoReset Error = "record sequence does not support reset"
)
type TopicError struct {
Topic string
Err error
}
func NewTopicError(topic string, err error) *TopicError {
return &TopicError{Topic: topic, Err: err}
}
func NewErrNoTopic(topic string) *TopicError {
return NewTopicError(topic, ErrNoTopic)
}
func (e *TopicError) Error() string {
return fmt.Sprintf("%v (topic=%q)", e.Err, e.Topic)
}
func (e *TopicError) Unwrap() error {
return e.Err
}
type TopicPartitionError struct {
Topic string
Partition int32
Err error
}
func NewTopicPartitionError(topic string, partition int32, err error) *TopicPartitionError {
return &TopicPartitionError{
Topic: topic,
Partition: partition,
Err: err,
}
}
func NewErrNoPartition(topic string, partition int32) *TopicPartitionError {
return NewTopicPartitionError(topic, partition, ErrNoPartition)
}
func NewErrNoLeader(topic string, partition int32) *TopicPartitionError {
return NewTopicPartitionError(topic, partition, ErrNoLeader)
}
func (e *TopicPartitionError) Error() string {
return fmt.Sprintf("%v (topic=%q partition=%d)", e.Err, e.Topic, e.Partition)
}
func (e *TopicPartitionError) Unwrap() error {
return e.Err
}

View File

@ -6,14 +6,18 @@ require (
github.com/fatih/camelcase v1.0.0
github.com/ohler55/ojg v1.12.12
github.com/segmentio/kafka-go v0.4.27
github.com/stretchr/testify v1.6.1
github.com/up9inc/mizu/tap/api v0.0.0
)
require (
github.com/davecgh/go-spew v1.1.0 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/martian v2.1.0+incompatible // indirect
github.com/klauspost/compress v1.14.2 // indirect
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
)
replace github.com/up9inc/mizu/tap/api v0.0.0 => ../../api

View File

@ -1,3 +1,4 @@
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
@ -25,10 +26,12 @@ github.com/ohler55/ojg v1.12.12/go.mod h1:LBbIVRAgoFbYBXQhRhuEpaJIqq+goSO63/FQ+n
github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM=
github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/segmentio/kafka-go v0.4.27 h1:sIhEozeL/TLN2mZ5dkG462vcGEWYKS+u31sXPjKhAM4=
github.com/segmentio/kafka-go v0.4.27/go.mod h1:XzMcoMjSzDGHcIwpWUI7GB43iKZ2fTVmryPSGLf/MPg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
@ -40,5 +43,7 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"reflect"
"sort"
"strconv"
"strings"
@ -897,6 +898,10 @@ func representMapAsTable(mapData map[string]interface{}, selectorPrefix string,
})
}
sort.Slice(table, func(i, j int) bool {
return table[i].Name < table[j].Name
})
obj, _ := json.Marshal(table)
representation = string(obj)
return

View File

@ -0,0 +1,290 @@
package kafka
import (
"bufio"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"os"
"path"
"path/filepath"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/up9inc/mizu/tap/api"
)
const (
binDir = "bin"
patternBin = "*_req.bin"
patternDissect = "*.json"
msgDissecting = "Dissecting:"
msgAnalyzing = "Analyzing:"
msgRepresenting = "Representing:"
respSuffix = "_res.bin"
expectDir = "expect"
dissectDir = "dissect"
analyzeDir = "analyze"
representDir = "represent"
testUpdate = "TEST_UPDATE"
)
func TestRegister(t *testing.T) {
dissector := NewDissector()
extension := &api.Extension{}
dissector.Register(extension)
assert.Equal(t, "kafka", extension.Protocol.Name)
}
func TestMacros(t *testing.T) {
expectedMacros := map[string]string{
"kafka": `proto.name == "kafka"`,
}
dissector := NewDissector()
macros := dissector.Macros()
assert.Equal(t, expectedMacros, macros)
}
func TestPing(t *testing.T) {
dissector := NewDissector()
dissector.Ping()
}
func TestDissect(t *testing.T) {
_, testUpdateEnabled := os.LookupEnv(testUpdate)
expectDirDissect := path.Join(expectDir, dissectDir)
if testUpdateEnabled {
os.RemoveAll(expectDirDissect)
err := os.MkdirAll(expectDirDissect, 0775)
assert.Nil(t, err)
}
dissector := NewDissector()
paths, err := filepath.Glob(path.Join(binDir, patternBin))
if err != nil {
log.Fatal(err)
}
options := &api.TrafficFilteringOptions{
IgnoredUserAgents: []string{},
}
for _, _path := range paths {
basePath := _path[:len(_path)-8]
// Channel to verify the output
itemChannel := make(chan *api.OutputChannelItem)
var emitter api.Emitter = &api.Emitting{
AppStats: &api.AppStats{},
OutputChannel: itemChannel,
}
var items []*api.OutputChannelItem
stop := make(chan bool)
go func() {
for {
select {
case <-stop:
return
case item := <-itemChannel:
items = append(items, item)
}
}
}()
// Stream level
counterPair := &api.CounterPair{
Request: 0,
Response: 0,
}
superIdentifier := &api.SuperIdentifier{}
// Request
pathClient := _path
fmt.Printf("%s %s\n", msgDissecting, pathClient)
fileClient, err := os.Open(pathClient)
assert.Nil(t, err)
bufferClient := bufio.NewReader(fileClient)
tcpIDClient := &api.TcpID{
SrcIP: "1",
DstIP: "2",
SrcPort: "1",
DstPort: "2",
}
reqResMatcher := dissector.NewResponseRequestMatcher()
reqResMatcher.SetMaxTry(10)
err = dissector.Dissect(bufferClient, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
log.Println(err)
}
// Response
pathServer := basePath + respSuffix
fmt.Printf("%s %s\n", msgDissecting, pathServer)
fileServer, err := os.Open(pathServer)
assert.Nil(t, err)
bufferServer := bufio.NewReader(fileServer)
tcpIDServer := &api.TcpID{
SrcIP: "2",
DstIP: "1",
SrcPort: "2",
DstPort: "1",
}
err = dissector.Dissect(bufferServer, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
log.Println(err)
}
fileClient.Close()
fileServer.Close()
pathExpect := path.Join(expectDirDissect, fmt.Sprintf("%s.json", basePath[4:]))
time.Sleep(10 * time.Millisecond)
stop <- true
marshaled, err := json.Marshal(items)
assert.Nil(t, err)
if testUpdateEnabled {
if len(items) > 0 {
err = os.WriteFile(pathExpect, marshaled, 0644)
assert.Nil(t, err)
}
} else {
if _, err := os.Stat(pathExpect); errors.Is(err, os.ErrNotExist) {
assert.Len(t, items, 0)
} else {
expectedBytes, err := ioutil.ReadFile(pathExpect)
assert.Nil(t, err)
assert.JSONEq(t, string(expectedBytes), string(marshaled))
}
}
}
}
func TestAnalyze(t *testing.T) {
_, testUpdateEnabled := os.LookupEnv(testUpdate)
expectDirDissect := path.Join(expectDir, dissectDir)
expectDirAnalyze := path.Join(expectDir, analyzeDir)
if testUpdateEnabled {
os.RemoveAll(expectDirAnalyze)
err := os.MkdirAll(expectDirAnalyze, 0775)
assert.Nil(t, err)
}
dissector := NewDissector()
paths, err := filepath.Glob(path.Join(expectDirDissect, patternDissect))
if err != nil {
log.Fatal(err)
}
for _, _path := range paths {
fmt.Printf("%s %s\n", msgAnalyzing, _path)
bytes, err := ioutil.ReadFile(_path)
assert.Nil(t, err)
var items []*api.OutputChannelItem
err = json.Unmarshal(bytes, &items)
assert.Nil(t, err)
var entries []*api.Entry
for _, item := range items {
entry := dissector.Analyze(item, "", "", "")
entries = append(entries, entry)
}
pathExpect := path.Join(expectDirAnalyze, filepath.Base(_path))
marshaled, err := json.Marshal(entries)
assert.Nil(t, err)
if testUpdateEnabled {
if len(entries) > 0 {
err = os.WriteFile(pathExpect, marshaled, 0644)
assert.Nil(t, err)
}
} else {
if _, err := os.Stat(pathExpect); errors.Is(err, os.ErrNotExist) {
assert.Len(t, items, 0)
} else {
expectedBytes, err := ioutil.ReadFile(pathExpect)
assert.Nil(t, err)
assert.JSONEq(t, string(expectedBytes), string(marshaled))
}
}
}
}
func TestRepresent(t *testing.T) {
_, testUpdateEnabled := os.LookupEnv(testUpdate)
expectDirAnalyze := path.Join(expectDir, analyzeDir)
expectDirRepresent := path.Join(expectDir, representDir)
if testUpdateEnabled {
os.RemoveAll(expectDirRepresent)
err := os.MkdirAll(expectDirRepresent, 0775)
assert.Nil(t, err)
}
dissector := NewDissector()
paths, err := filepath.Glob(path.Join(expectDirAnalyze, patternDissect))
if err != nil {
log.Fatal(err)
}
for _, _path := range paths {
fmt.Printf("%s %s\n", msgRepresenting, _path)
bytes, err := ioutil.ReadFile(_path)
assert.Nil(t, err)
var entries []*api.Entry
err = json.Unmarshal(bytes, &entries)
assert.Nil(t, err)
var objects []string
for _, entry := range entries {
object, _, err := dissector.Represent(entry.Request, entry.Response)
assert.Nil(t, err)
objects = append(objects, string(object))
}
pathExpect := path.Join(expectDirRepresent, filepath.Base(_path))
marshaled, err := json.Marshal(objects)
assert.Nil(t, err)
if testUpdateEnabled {
if len(objects) > 0 {
err = os.WriteFile(pathExpect, marshaled, 0644)
assert.Nil(t, err)
}
} else {
if _, err := os.Stat(pathExpect); errors.Is(err, os.ErrNotExist) {
assert.Len(t, objects, 0)
} else {
expectedBytes, err := ioutil.ReadFile(pathExpect)
assert.Nil(t, err)
assert.JSONEq(t, string(expectedBytes), string(marshaled))
}
}
}
}

View File

@ -7,8 +7,6 @@ import (
"github.com/up9inc/mizu/tap/api"
)
const maxTry int = 3000
type RequestResponsePair struct {
Request Request
Response Response
@ -17,16 +15,21 @@ type RequestResponsePair struct {
// Key is {client_addr}_{client_port}_{dest_addr}_{dest_port}_{correlation_id}
type requestResponseMatcher struct {
openMessagesMap *sync.Map
maxTry int
}
func createResponseRequestMatcher() api.RequestResponseMatcher {
return &requestResponseMatcher{openMessagesMap: &sync.Map{}}
return &requestResponseMatcher{openMessagesMap: &sync.Map{}, maxTry: 3000}
}
func (matcher *requestResponseMatcher) GetMap() *sync.Map {
return matcher.openMessagesMap
}
func (matcher *requestResponseMatcher) SetMaxTry(value int) {
matcher.maxTry = value
}
func (matcher *requestResponseMatcher) registerRequest(key string, request *Request) *RequestResponsePair {
if response, found := matcher.openMessagesMap.LoadAndDelete(key); found {
// Check for a situation that only occurs when a Kafka broker is initiating
@ -44,7 +47,7 @@ func (matcher *requestResponseMatcher) registerResponse(key string, response *Re
try := 0
for {
try++
if try > maxTry {
if try > matcher.maxTry {
return nil
}
if request, found := matcher.openMessagesMap.LoadAndDelete(key); found {

View File

@ -3,7 +3,6 @@ package kafka
import (
"fmt"
"io"
"net"
"reflect"
"strconv"
"strings"
@ -27,29 +26,20 @@ func (k ApiKey) String() string {
return strconv.Itoa(int(k))
}
func (k ApiKey) MinVersion() int16 { return k.apiType().minVersion() }
func (k ApiKey) MaxVersion() int16 { return k.apiType().maxVersion() }
func (k ApiKey) SelectVersion(minVersion, maxVersion int16) int16 {
min := k.MinVersion()
max := k.MaxVersion()
switch {
case min > maxVersion:
return min
case max < maxVersion:
return max
default:
return maxVersion
}
}
func (k ApiKey) apiType() apiType {
if i := int(k); i >= 0 && i < len(apiTypes) {
return apiTypes[i]
}
return apiType{}
}
const (
// v0 = 0
v1 = 1
v2 = 2
v3 = 3
v4 = 4
v5 = 5
v6 = 6
v7 = 7
v8 = 8
v9 = 9
v10 = 10
v11 = 11
)
const (
Produce ApiKey = 0
@ -164,48 +154,6 @@ type messageType struct {
flexible bool
gotype reflect.Type
decode decodeFunc
encode encodeFunc
}
type apiType struct {
requests []messageType
responses []messageType
}
func (t apiType) minVersion() int16 {
if len(t.requests) == 0 {
return 0
}
return t.requests[0].version
}
func (t apiType) maxVersion() int16 {
if len(t.requests) == 0 {
return 0
}
return t.requests[len(t.requests)-1].version
}
var apiTypes [numApis]apiType
// Register is automatically called by sub-packages are imported to install a
// new pair of request/response message types.
func Register(req, res Message) {
k1 := req.ApiKey()
k2 := res.ApiKey()
if k1 != k2 {
panic(fmt.Sprintf("[%T/%T]: request and response API keys mismatch: %d != %d", req, res, k1, k2))
}
apiTypes[k1] = apiType{
requests: typesOf(req),
responses: typesOf(res),
}
}
func typesOf(v interface{}) []messageType {
return makeTypes(reflect.TypeOf(v).Elem())
}
func makeTypes(t reflect.Type) []messageType {
@ -241,7 +189,6 @@ func makeTypes(t reflect.Type) []messageType {
gotype: t,
flexible: flexible,
decode: decodeFuncOf(t, v, flexible, structTag{}),
encode: encodeFuncOf(t, v, flexible, structTag{}),
})
}
@ -378,31 +325,6 @@ type Broker struct {
Rack string
}
func (b Broker) String() string {
return net.JoinHostPort(b.Host, itoa(b.Port))
}
func (b Broker) Format(w fmt.State, v rune) {
switch v {
case 'd':
_, _ = io.WriteString(w, itoa(b.ID))
case 's':
_, _ = io.WriteString(w, b.String())
case 'v':
_, _ = io.WriteString(w, itoa(b.ID))
_, _ = io.WriteString(w, " ")
_, _ = io.WriteString(w, b.String())
if b.Rack != "" {
_, _ = io.WriteString(w, " ")
_, _ = io.WriteString(w, b.Rack)
}
}
}
func itoa(i int32) string {
return strconv.Itoa(int(i))
}
type Topic struct {
Name string
Error int16
@ -418,14 +340,6 @@ type Partition struct {
Offline []int32
}
// BrokerMessage is an extension of the Message interface implemented by some
// request types to customize the broker assignment logic.
type BrokerMessage interface {
// Given a representation of the kafka cluster state as argument, returns
// the broker that the message should be routed to.
Broker(Cluster) (Broker, error)
}
// GroupMessage is an extension of the Message interface implemented by some
// request types to inform the program that they should be routed to a group
// coordinator.
@ -443,16 +357,6 @@ type PreparedMessage interface {
Prepare(apiVersion int16)
}
// Splitter is an interface implemented by messages that can be split into
// multiple requests and have their results merged back by a Merger.
type Splitter interface {
// For a given cluster layout, returns the list of messages constructed
// from the receiver for each requests that should be sent to the cluster.
// The second return value is a Merger which can be used to merge back the
// results of each request into a single message (or an error).
Split(Cluster) ([]Message, Merger, error)
}
// Merger is an interface implemented by messages which can merge multiple
// results into one response.
type Merger interface {
@ -461,16 +365,3 @@ type Merger interface {
// values, other types should trigger a panic.
Merge(messages []Message, results []interface{}) (Message, error)
}
// Result converts r to a Message or and error, or panics if r could be be
// converted to these types.
func Result(r interface{}) (Message, error) {
switch v := r.(type) {
case Message:
return v, nil
case error:
return nil, v
default:
panic(fmt.Errorf("BUG: result must be a message or an error but not %T", v))
}
}

View File

@ -1,182 +0,0 @@
package kafka
import (
"encoding/binary"
"fmt"
"strconv"
)
type ApiVersion struct {
ApiKey int16
MinVersion int16
MaxVersion int16
}
func (v ApiVersion) Format(w fmt.State, r rune) {
switch r {
case 's':
fmt.Fprint(w, apiKey(v.ApiKey))
case 'd':
switch {
case w.Flag('-'):
fmt.Fprint(w, v.MinVersion)
case w.Flag('+'):
fmt.Fprint(w, v.MaxVersion)
default:
fmt.Fprint(w, v.ApiKey)
}
case 'v':
switch {
case w.Flag('-'):
fmt.Fprintf(w, "v%d", v.MinVersion)
case w.Flag('+'):
fmt.Fprintf(w, "v%d", v.MaxVersion)
case w.Flag('#'):
fmt.Fprintf(w, "kafka.ApiVersion{ApiKey:%d MinVersion:%d MaxVersion:%d}", v.ApiKey, v.MinVersion, v.MaxVersion)
default:
fmt.Fprintf(w, "%s[v%d:v%d]", apiKey(v.ApiKey), v.MinVersion, v.MaxVersion)
}
}
}
type apiKey int16
const (
produce apiKey = 0
fetch apiKey = 1
listOffsets apiKey = 2
metadata apiKey = 3
leaderAndIsr apiKey = 4
stopReplica apiKey = 5
updateMetadata apiKey = 6
controlledShutdown apiKey = 7
offsetCommit apiKey = 8
offsetFetch apiKey = 9
findCoordinator apiKey = 10
joinGroup apiKey = 11
heartbeat apiKey = 12
leaveGroup apiKey = 13
syncGroup apiKey = 14
describeGroups apiKey = 15
listGroups apiKey = 16
saslHandshake apiKey = 17
apiVersions apiKey = 18
createTopics apiKey = 19
deleteTopics apiKey = 20
deleteRecords apiKey = 21
initProducerId apiKey = 22
offsetForLeaderEpoch apiKey = 23
addPartitionsToTxn apiKey = 24
addOffsetsToTxn apiKey = 25
endTxn apiKey = 26
writeTxnMarkers apiKey = 27
txnOffsetCommit apiKey = 28
describeAcls apiKey = 29
createAcls apiKey = 30
deleteAcls apiKey = 31
describeConfigs apiKey = 32
alterConfigs apiKey = 33
alterReplicaLogDirs apiKey = 34
describeLogDirs apiKey = 35
saslAuthenticate apiKey = 36
createPartitions apiKey = 37
createDelegationToken apiKey = 38
renewDelegationToken apiKey = 39
expireDelegationToken apiKey = 40
describeDelegationToken apiKey = 41
deleteGroups apiKey = 42
electLeaders apiKey = 43
incrementalAlterConfigs apiKey = 44
alterPartitionReassignments apiKey = 45
listPartitionReassignments apiKey = 46
offsetDelete apiKey = 47
)
func (k apiKey) String() string {
if i := int(k); i >= 0 && i < len(apiKeyStrings) {
return apiKeyStrings[i]
}
return strconv.Itoa(int(k))
}
const (
// v0 = 0
v1 = 1
v2 = 2
v3 = 3
v4 = 4
v5 = 5
v6 = 6
v7 = 7
v8 = 8
v9 = 9
v10 = 10
v11 = 11
)
var apiKeyStrings = [...]string{
produce: "Produce",
fetch: "Fetch",
listOffsets: "ListOffsets",
metadata: "Metadata",
leaderAndIsr: "LeaderAndIsr",
stopReplica: "StopReplica",
updateMetadata: "UpdateMetadata",
controlledShutdown: "ControlledShutdown",
offsetCommit: "OffsetCommit",
offsetFetch: "OffsetFetch",
findCoordinator: "FindCoordinator",
joinGroup: "JoinGroup",
heartbeat: "Heartbeat",
leaveGroup: "LeaveGroup",
syncGroup: "SyncGroup",
describeGroups: "DescribeGroups",
listGroups: "ListGroups",
saslHandshake: "SaslHandshake",
apiVersions: "ApiVersions",
createTopics: "CreateTopics",
deleteTopics: "DeleteTopics",
deleteRecords: "DeleteRecords",
initProducerId: "InitProducerId",
offsetForLeaderEpoch: "OffsetForLeaderEpoch",
addPartitionsToTxn: "AddPartitionsToTxn",
addOffsetsToTxn: "AddOffsetsToTxn",
endTxn: "EndTxn",
writeTxnMarkers: "WriteTxnMarkers",
txnOffsetCommit: "TxnOffsetCommit",
describeAcls: "DescribeAcls",
createAcls: "CreateAcls",
deleteAcls: "DeleteAcls",
describeConfigs: "DescribeConfigs",
alterConfigs: "AlterConfigs",
alterReplicaLogDirs: "AlterReplicaLogDirs",
describeLogDirs: "DescribeLogDirs",
saslAuthenticate: "SaslAuthenticate",
createPartitions: "CreatePartitions",
createDelegationToken: "CreateDelegationToken",
renewDelegationToken: "RenewDelegationToken",
expireDelegationToken: "ExpireDelegationToken",
describeDelegationToken: "DescribeDelegationToken",
deleteGroups: "DeleteGroups",
electLeaders: "ElectLeaders",
incrementalAlterConfigs: "IncrementalAlfterConfigs",
alterPartitionReassignments: "AlterPartitionReassignments",
listPartitionReassignments: "ListPartitionReassignments",
offsetDelete: "OffsetDelete",
}
func makeInt8(b []byte) int8 {
return int8(b[0])
}
func makeInt16(b []byte) int16 {
return int16(binary.BigEndian.Uint16(b))
}
func makeInt32(b []byte) int32 {
return int32(binary.BigEndian.Uint32(b))
}
func makeInt64(b []byte) int64 {
return int64(binary.BigEndian.Uint64(b))
}

View File

@ -1,159 +0,0 @@
package kafka
import (
"bufio"
"errors"
"fmt"
"io"
)
type readable interface {
readFrom(*bufio.Reader, int) (int, error)
}
var errShortRead = errors.New("not enough bytes available to load the response")
func peekRead(r *bufio.Reader, sz int, n int, f func([]byte)) (int, error) {
if n > sz {
return sz, errShortRead
}
b, err := r.Peek(n)
if err != nil {
return sz, err
}
f(b)
return discardN(r, sz, n)
}
func readInt8(r *bufio.Reader, sz int, v *int8) (int, error) {
return peekRead(r, sz, 1, func(b []byte) { *v = makeInt8(b) })
}
func readInt16(r *bufio.Reader, sz int, v *int16) (int, error) {
return peekRead(r, sz, 2, func(b []byte) { *v = makeInt16(b) })
}
func readInt32(r *bufio.Reader, sz int, v *int32) (int, error) {
return peekRead(r, sz, 4, func(b []byte) { *v = makeInt32(b) })
}
func readInt64(r *bufio.Reader, sz int, v *int64) (int, error) {
return peekRead(r, sz, 8, func(b []byte) { *v = makeInt64(b) })
}
func readString(r *bufio.Reader, sz int, v *string) (int, error) {
return readStringWith(r, sz, func(r *bufio.Reader, sz int, n int) (remain int, err error) {
*v, remain, err = readNewString(r, sz, n)
return
})
}
func readStringWith(r *bufio.Reader, sz int, cb func(*bufio.Reader, int, int) (int, error)) (int, error) {
var err error
var len int16
if sz, err = readInt16(r, sz, &len); err != nil {
return sz, err
}
n := int(len)
if n > sz {
return sz, errShortRead
}
return cb(r, sz, n)
}
func readNewString(r *bufio.Reader, sz int, n int) (string, int, error) {
b, sz, err := readNewBytes(r, sz, n)
return string(b), sz, err
}
func readBytes(r *bufio.Reader, sz int, v *[]byte) (int, error) {
return readBytesWith(r, sz, func(r *bufio.Reader, sz int, n int) (remain int, err error) {
*v, remain, err = readNewBytes(r, sz, n)
return
})
}
func readBytesWith(r *bufio.Reader, sz int, cb func(*bufio.Reader, int, int) (int, error)) (int, error) {
var err error
var n int
if sz, err = readArrayLen(r, sz, &n); err != nil {
return sz, err
}
if n > sz {
return sz, errShortRead
}
return cb(r, sz, n)
}
func readNewBytes(r *bufio.Reader, sz int, n int) ([]byte, int, error) {
var err error
var b []byte
var shortRead bool
if n > 0 {
if sz < n {
n = sz
shortRead = true
}
b = make([]byte, n)
n, err = io.ReadFull(r, b)
b = b[:n]
sz -= n
if err == nil && shortRead {
err = errShortRead
}
}
return b, sz, err
}
func readArrayLen(r *bufio.Reader, sz int, n *int) (int, error) {
var err error
var len int32
if sz, err = readInt32(r, sz, &len); err != nil {
return sz, err
}
*n = int(len)
return sz, nil
}
func ReadAll(r *bufio.Reader, sz int, ptrs ...interface{}) (int, error) {
var err error
for _, ptr := range ptrs {
if sz, err = readPtr(r, sz, ptr); err != nil {
break
}
}
return sz, err
}
func readPtr(r *bufio.Reader, sz int, ptr interface{}) (int, error) {
switch v := ptr.(type) {
case *int8:
return readInt8(r, sz, v)
case *int16:
return readInt16(r, sz, v)
case *int32:
return readInt32(r, sz, v)
case *int64:
return readInt64(r, sz, v)
case *string:
return readString(r, sz, v)
case *[]byte:
return readBytes(r, sz, v)
case readable:
return v.readFrom(r, sz)
default:
panic(fmt.Sprintf("unsupported type: %T", v))
}
}

View File

@ -1,279 +0,0 @@
package kafka
import (
"encoding/binary"
"io"
"time"
"github.com/segmentio/kafka-go/compress"
)
// Attributes is a bitset representing special attributes set on records.
type Attributes int16
const (
Gzip Attributes = Attributes(compress.Gzip) // 1
Snappy Attributes = Attributes(compress.Snappy) // 2
Lz4 Attributes = Attributes(compress.Lz4) // 3
Zstd Attributes = Attributes(compress.Zstd) // 4
Transactional Attributes = 1 << 4
Control Attributes = 1 << 5
)
func (a Attributes) Compression() compress.Compression {
return compress.Compression(a & 7)
}
func (a Attributes) Transactional() bool {
return (a & Transactional) != 0
}
func (a Attributes) Control() bool {
return (a & Control) != 0
}
func (a Attributes) String() string {
s := a.Compression().String()
if a.Transactional() {
s += "+transactional"
}
if a.Control() {
s += "+control"
}
return s
}
// Header represents a single entry in a list of record headers.
type Header struct {
Key string
Value []byte
}
// Record is an interface representing a single kafka record.
//
// Record values are not safe to use concurrently from multiple goroutines.
type Record struct {
// The offset at which the record exists in a topic partition. This value
// is ignored in produce requests.
Offset int64
// Returns the time of the record. This value may be omitted in produce
// requests to let kafka set the time when it saves the record.
Time time.Time
// Returns a byte sequence containing the key of this record. The returned
// sequence may be nil to indicate that the record has no key. If the record
// is part of a RecordSet, the content of the key must remain valid at least
// until the record set is closed (or until the key is closed).
Key Bytes
// Returns a byte sequence containing the value of this record. The returned
// sequence may be nil to indicate that the record has no value. If the
// record is part of a RecordSet, the content of the value must remain valid
// at least until the record set is closed (or until the value is closed).
Value Bytes
// Returns the list of headers associated with this record. The returned
// slice may be reused across calls, the program should use it as an
// immutable value.
Headers []Header
}
// RecordSet represents a sequence of records in Produce requests and Fetch
// responses. All v0, v1, and v2 formats are supported.
type RecordSet struct {
// The message version that this record set will be represented as, valid
// values are 1, or 2.
//
// When reading, this is the value of the highest version used in the
// batches that compose the record set.
//
// When writing, this value dictates the format that the records will be
// encoded in.
Version int8
// Attributes set on the record set.
//
// When reading, the attributes are the combination of all attributes in
// the batches that compose the record set.
//
// When writing, the attributes apply to the whole sequence of records in
// the set.
Attributes Attributes
// A reader exposing the sequence of records.
//
// When reading a RecordSet from an io.Reader, the Records field will be a
// *RecordStream. If the program needs to access the details of each batch
// that compose the stream, it may use type assertions to access the
// underlying types of each batch.
Records RecordReader
}
// ReadFrom reads the representation of a record set from r into rs, returning
// the number of bytes consumed from r, and an non-nil error if the record set
// could not be read.
func (rs *RecordSet) ReadFrom(r io.Reader) (int64, error) {
// d, _ := r.(*decoder)
// if d == nil {
// d = &decoder{
// reader: r,
// remain: 4,
// }
// }
// *rs = RecordSet{}
// limit := d.remain
// size := d.readInt32()
// if d.err != nil {
// return int64(limit - d.remain), d.err
// }
// if size <= 0 {
// return 4, nil
// }
// stream := &RecordStream{
// Records: make([]RecordReader, 0, 4),
// }
// var err error
// d.remain = int(size)
// for d.remain > 0 && err == nil {
// var version byte
// if d.remain < (magicByteOffset + 1) {
// if len(stream.Records) != 0 {
// break
// }
// return 4, fmt.Errorf("impossible record set shorter than %d bytes", magicByteOffset+1)
// }
// switch r := d.reader.(type) {
// case bufferedReader:
// b, err := r.Peek(magicByteOffset + 1)
// if err != nil {
// n, _ := r.Discard(len(b))
// return 4 + int64(n), dontExpectEOF(err)
// }
// version = b[magicByteOffset]
// case bytesBuffer:
// version = r.Bytes()[magicByteOffset]
// default:
// b := make([]byte, magicByteOffset+1)
// if n, err := io.ReadFull(d.reader, b); err != nil {
// return 4 + int64(n), dontExpectEOF(err)
// }
// version = b[magicByteOffset]
// // Reconstruct the prefix that we had to read to determine the version
// // of the record set from the magic byte.
// //
// // Technically this may recurisvely stack readers when consuming all
// // items of the batch, which could hurt performance. In practice this
// // path should not be taken tho, since the decoder would read from a
// // *bufio.Reader which implements the bufferedReader interface.
// d.reader = io.MultiReader(bytes.NewReader(b), d.reader)
// }
// var tmp RecordSet
// switch version {
// case 0, 1:
// err = tmp.readFromVersion1(d)
// case 2:
// err = tmp.readFromVersion2(d)
// default:
// err = fmt.Errorf("unsupported message version %d for message of size %d", version, size)
// }
// if tmp.Version > rs.Version {
// rs.Version = tmp.Version
// }
// rs.Attributes |= tmp.Attributes
// if tmp.Records != nil {
// stream.Records = append(stream.Records, tmp.Records)
// }
// }
// if len(stream.Records) != 0 {
// rs.Records = stream
// // Ignore errors if we've successfully read records, so the
// // program can keep making progress.
// err = nil
// }
// d.discardAll()
// rn := 4 + (int(size) - d.remain)
// d.remain = limit - rn
// return int64(rn), err
return 0, nil
}
// WriteTo writes the representation of rs into w. The value of rs.Version
// dictates which format that the record set will be represented as.
//
// The error will be ErrNoRecord if rs contained no records.
//
// Note: since this package is only compatible with kafka 0.10 and above, the
// method never produces messages in version 0. If rs.Version is zero, the
// method defaults to producing messages in version 1.
func (rs *RecordSet) WriteTo(w io.Writer) (int64, error) {
// if rs.Records == nil {
// return 0, ErrNoRecord
// }
// // This optimization avoids rendering the record set in an intermediary
// // buffer when the writer is already a pageBuffer, which is a common case
// // due to the way WriteRequest and WriteResponse are implemented.
// buffer, _ := w.(*pageBuffer)
// bufferOffset := int64(0)
// if buffer != nil {
// bufferOffset = buffer.Size()
// } else {
// buffer = newPageBuffer()
// defer buffer.unref()
// }
// size := packUint32(0)
// buffer.Write(size[:]) // size placeholder
// var err error
// switch rs.Version {
// case 0, 1:
// err = rs.writeToVersion1(buffer, bufferOffset+4)
// case 2:
// err = rs.writeToVersion2(buffer, bufferOffset+4)
// default:
// err = fmt.Errorf("unsupported record set version %d", rs.Version)
// }
// if err != nil {
// return 0, err
// }
// n := buffer.Size() - bufferOffset
// if n == 0 {
// size = packUint32(^uint32(0))
// } else {
// size = packUint32(uint32(n) - 4)
// }
// buffer.WriteAt(size[:], bufferOffset)
// // This condition indicates that the output writer received by `WriteTo` was
// // not a *pageBuffer, in which case we need to flush the buffered records
// // data into it.
// if buffer != w {
// return buffer.WriteTo(w)
// }
// return n, nil
return 0, nil
}
func packUint32(u uint32) (b [4]byte) {
binary.BigEndian.PutUint32(b[:], u)
return
}

View File

@ -1,43 +0,0 @@
package kafka
import (
"github.com/segmentio/kafka-go/protocol"
)
// Header is a key/value pair type representing headers set on records.
// type Header = protocol.Header
// Bytes is an interface representing a sequence of bytes. This abstraction
// makes it possible for programs to inject data into produce requests without
// having to load in into an intermediary buffer, or read record keys and values
// from a fetch response directly from internal buffers.
//
// Bytes are not safe to use concurrently from multiple goroutines.
// type Bytes = protocol.Bytes
// NewBytes constructs a Bytes value from a byte slice.
//
// If b is nil, nil is returned.
// func NewBytes(b []byte) Bytes { return protocol.NewBytes(b) }
// ReadAll reads b into a byte slice.
// func ReadAll(b Bytes) ([]byte, error) { return protocol.ReadAll(b) }
// Record is an interface representing a single kafka record.
//
// Record values are not safe to use concurrently from multiple goroutines.
// type Record = protocol.Record
// RecordReader is an interface representing a sequence of records. Record sets
// are used in both produce and fetch requests to represent the sequence of
// records that are sent to or receive from kafka brokers.
//
// RecordReader values are not safe to use concurrently from multiple goroutines.
type RecordReader = protocol.RecordReader
// NewRecordReade rconstructs a RecordSet which exposes the sequence of records
// passed as arguments.
func NewRecordReader(records ...Record) RecordReader {
// return protocol.NewRecordReader(records...)
return nil
}

View File

@ -8,46 +8,14 @@ import (
type index []int
type _type struct{ typ reflect.Type }
func typeOf(x interface{}) _type {
return makeType(reflect.TypeOf(x))
}
func elemTypeOf(x interface{}) _type {
return makeType(reflect.TypeOf(x).Elem())
}
func makeType(t reflect.Type) _type {
return _type{typ: t}
}
type value struct {
val reflect.Value
}
func nonAddressableValueOf(x interface{}) value {
return value{val: reflect.ValueOf(x)}
}
func valueOf(x interface{}) value {
return value{val: reflect.ValueOf(x).Elem()}
}
func (v value) bool() bool { return v.val.Bool() }
func (v value) int8() int8 { return int8(v.int64()) }
func (v value) int16() int16 { return int16(v.int64()) }
func (v value) int32() int32 { return int32(v.int64()) }
func (v value) int64() int64 { return v.val.Int() }
func (v value) string() string { return v.val.String() }
func (v value) bytes() []byte { return v.val.Bytes() }
func (v value) iface(t reflect.Type) interface{} { return v.val.Addr().Interface() }
func (v value) array(t reflect.Type) array { return array{val: v.val} } //nolint
@ -88,10 +56,6 @@ func makeArray(t reflect.Type, n int) array {
func (a array) index(i int) value { return value{val: a.val.Index(i)} }
func (a array) length() int { return a.val.Len() }
func (a array) isNil() bool { return a.val.IsNil() }
func indexOf(s reflect.StructField) index { return index(s.Index) }
func bytesToString(b []byte) string { return string(b) }

View File

@ -28,6 +28,9 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, su
}
if size < 8 {
if size == 0 {
return 0, 0, io.EOF
}
return 0, 0, fmt.Errorf("A Kafka request header cannot be smaller than 8 bytes")
}
@ -42,7 +45,7 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, su
correlationID := d.readInt32()
clientID := d.readString()
if i := int(apiKey); i < 0 || i >= len(apiTypes) {
if i := int(apiKey); i < 0 || i >= numApis {
err = fmt.Errorf("unsupported api key: %d", i)
return apiKey, apiVersion, err
}
@ -52,12 +55,6 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, su
return apiKey, apiVersion, err
}
t := &apiTypes[apiKey]
if t == nil {
err = fmt.Errorf("unsupported api: %s", apiNames[apiKey])
return apiKey, apiVersion, err
}
var payload interface{}
switch apiKey {
@ -227,61 +224,3 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, su
return apiKey, apiVersion, nil
}
func WriteRequest(w io.Writer, apiVersion int16, correlationID int32, clientID string, msg Message) error {
apiKey := msg.ApiKey()
if i := int(apiKey); i < 0 || i >= len(apiTypes) {
return fmt.Errorf("unsupported api key: %d", i)
}
t := &apiTypes[apiKey]
if t == nil {
return fmt.Errorf("unsupported api: %s", apiNames[apiKey])
}
minVersion := t.minVersion()
maxVersion := t.maxVersion()
if apiVersion < minVersion || apiVersion > maxVersion {
return fmt.Errorf("unsupported %s version: v%d not in range v%d-v%d", apiKey, apiVersion, minVersion, maxVersion)
}
r := &t.requests[apiVersion-minVersion]
v := valueOf(msg)
b := newPageBuffer()
defer b.unref()
e := &encoder{writer: b}
e.writeInt32(0) // placeholder for the request size
e.writeInt16(int16(apiKey))
e.writeInt16(apiVersion)
e.writeInt32(correlationID)
if r.flexible {
// Flexible messages use a nullable string for the client ID, then extra space for a
// tag buffer, which begins with a size value. Since we're not writing any fields into the
// latter, we can just write zero for now.
//
// See
// https://cwiki.apache.org/confluence/display/KAFKA/KIP-482%3A+The+Kafka+Protocol+should+Support+Optional+Tagged+Fields
// for details.
e.writeNullString(clientID)
e.writeUnsignedVarInt(0)
} else {
// Technically, recent versions of kafka interpret this field as a nullable
// string, however kafka 0.10 expected a non-nullable string and fails with
// a NullPointerException when it receives a null client id.
e.writeString(clientID)
}
r.encode(e, v)
err := e.err
if err == nil {
size := packUint32(uint32(b.Size()) - 4)
_, _ = b.WriteAt(size[:], 0)
_, err = b.WriteTo(w)
}
return err
}

View File

@ -25,6 +25,9 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, s
}
if size < 4 {
if size == 0 {
return io.EOF
}
return fmt.Errorf("A Kafka response header cannot be smaller than 8 bytes")
}
@ -53,7 +56,7 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, s
)
reqResPair := reqResMatcher.registerResponse(key, response)
if reqResPair == nil {
return fmt.Errorf("Couldn't match a Kafka response to a Kafka request in 3 seconds!")
return fmt.Errorf("Couldn't match a Kafka response to a Kafka request in %d milliseconds!", reqResMatcher.maxTry)
}
apiKey := reqResPair.Request.ApiKey
apiVersion := reqResPair.Request.ApiVersion
@ -284,57 +287,12 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, s
}
emitter.Emit(item)
if i := int(apiKey); i < 0 || i >= len(apiTypes) {
if i := int(apiKey); i < 0 || i >= numApis {
err = fmt.Errorf("unsupported api key: %d", i)
return err
}
t := &apiTypes[apiKey]
if t == nil {
err = fmt.Errorf("unsupported api: %s", apiNames[apiKey])
return err
}
d.discardAll()
return nil
}
func WriteResponse(w io.Writer, apiVersion int16, correlationID int32, msg Message) error {
apiKey := msg.ApiKey()
if i := int(apiKey); i < 0 || i >= len(apiTypes) {
return fmt.Errorf("unsupported api key: %d", i)
}
t := &apiTypes[apiKey]
if t == nil {
return fmt.Errorf("unsupported api: %s", apiNames[apiKey])
}
minVersion := t.minVersion()
maxVersion := t.maxVersion()
if apiVersion < minVersion || apiVersion > maxVersion {
return fmt.Errorf("unsupported %s version: v%d not in range v%d-v%d", apiKey, apiVersion, minVersion, maxVersion)
}
r := &t.responses[apiVersion-minVersion]
v := valueOf(msg)
b := newPageBuffer()
defer b.unref()
e := &encoder{writer: b}
e.writeInt32(0) // placeholder for the response size
e.writeInt32(correlationID)
r.encode(e, v)
err := e.err
if err == nil {
size := packUint32(uint32(b.Size()) - 4)
_, _ = b.WriteAt(size[:], 0)
_, err = b.WriteTo(w)
}
return err
}

View File

@ -12,19 +12,6 @@ const (
RequireAll RequiredAcks = -1
)
func (acks RequiredAcks) String() string {
switch acks {
case RequireNone:
return "none"
case RequireOne:
return "one"
case RequireAll:
return "all"
default:
return "unknown"
}
}
type UUID struct {
TimeLow int32 `json:"timeLow"`
TimeMid int16 `json:"timeMid"`

View File

@ -19,6 +19,8 @@ func createResponseRequestMatcher() api.RequestResponseMatcher {
func (matcher *requestResponseMatcher) GetMap() *sync.Map {
return matcher.openMessagesMap
}
func (matcher *requestResponseMatcher) SetMaxTry(value int) {
}
func (matcher *requestResponseMatcher) registerRequest(ident string, request *RedisPacket, captureTime time.Time) *api.OutputChannelItem {
requestRedisMessage := api.GenericMessage{