diff --git a/tap/extensions/kafka/buffer.go b/tap/extensions/kafka/buffer.go new file mode 100644 index 000000000..d57d2c96c --- /dev/null +++ b/tap/extensions/kafka/buffer.go @@ -0,0 +1,645 @@ +package main + +import ( + "bytes" + "fmt" + "io" + "math" + "sync" + "sync/atomic" +) + +// Bytes is an interface implemented by types that represent immutable +// sequences of bytes. +// +// Bytes values are used to abstract the location where record keys and +// values are read from (e.g. in-memory buffers, network sockets, files). +// +// The Close method should be called to release resources held by the object +// when the program is done with it. +// +// Bytes values are generally not safe to use concurrently from multiple +// goroutines. +type Bytes interface { + io.ReadCloser + // Returns the number of bytes remaining to be read from the payload. + Len() int +} + +// NewBytes constructs a Bytes value from b. +// +// The returned value references b, it does not make a copy of the backing +// array. +// +// If b is nil, nil is returned to represent a null BYTES value in the kafka +// protocol. +func NewBytes(b []byte) Bytes { + if b == nil { + return nil + } + r := new(bytesReader) + r.Reset(b) + return r +} + +// ReadAll is similar to ioutil.ReadAll, but it takes advantage of knowing the +// length of b to minimize the memory footprint. +// +// The function returns a nil slice if b is nil. +// func ReadAll(b Bytes) ([]byte, error) { +// if b == nil { +// return nil, nil +// } +// s := make([]byte, b.Len()) +// _, err := io.ReadFull(b, s) +// return s, err +// } + +type bytesReader struct{ bytes.Reader } + +func (*bytesReader) Close() error { return nil } + +type refCount uintptr + +func (rc *refCount) ref() { atomic.AddUintptr((*uintptr)(rc), 1) } + +func (rc *refCount) unref(onZero func()) { + if atomic.AddUintptr((*uintptr)(rc), ^uintptr(0)) == 0 { + onZero() + } +} + +const ( + // Size of the memory buffer for a single page. We use a farily + // large size here (64 KiB) because batches exchanged with kafka + // tend to be multiple kilobytes in size, sometimes hundreds. + // Using large pages amortizes the overhead of the page metadata + // and algorithms to manage the pages. + pageSize = 65536 +) + +type page struct { + refc refCount + offset int64 + length int + buffer *[pageSize]byte +} + +func newPage(offset int64) *page { + p, _ := pagePool.Get().(*page) + if p != nil { + p.offset = offset + p.length = 0 + p.ref() + } else { + p = &page{ + refc: 1, + offset: offset, + buffer: &[pageSize]byte{}, + } + } + return p +} + +func (p *page) ref() { p.refc.ref() } + +func (p *page) unref() { p.refc.unref(func() { pagePool.Put(p) }) } + +func (p *page) slice(begin, end int64) []byte { + i, j := begin-p.offset, end-p.offset + + if i < 0 { + i = 0 + } else if i > pageSize { + i = pageSize + } + + if j < 0 { + j = 0 + } else if j > pageSize { + j = pageSize + } + + if i < j { + return p.buffer[i:j] + } + + return nil +} + +func (p *page) Cap() int { return pageSize } + +func (p *page) Len() int { return p.length } + +func (p *page) Size() int64 { return int64(p.length) } + +func (p *page) Truncate(n int) { + if n < p.length { + p.length = n + } +} + +func (p *page) ReadAt(b []byte, off int64) (int, error) { + if off -= p.offset; off < 0 || off > pageSize { + panic("offset out of range") + } + if off > int64(p.length) { + return 0, nil + } + return copy(b, p.buffer[off:p.length]), nil +} + +func (p *page) ReadFrom(r io.Reader) (int64, error) { + n, err := io.ReadFull(r, p.buffer[p.length:]) + if err == io.EOF || err == io.ErrUnexpectedEOF { + err = nil + } + p.length += n + return int64(n), err +} + +func (p *page) WriteAt(b []byte, off int64) (int, error) { + if off -= p.offset; off < 0 || off > pageSize { + panic("offset out of range") + } + n := copy(p.buffer[off:], b) + if end := int(off) + n; end > p.length { + p.length = end + } + return n, nil +} + +func (p *page) Write(b []byte) (int, error) { + return p.WriteAt(b, p.offset+int64(p.length)) +} + +var ( + _ io.ReaderAt = (*page)(nil) + _ io.ReaderFrom = (*page)(nil) + _ io.Writer = (*page)(nil) + _ io.WriterAt = (*page)(nil) +) + +type pageBuffer struct { + refc refCount + pages contiguousPages + length int + cursor int +} + +func newPageBuffer() *pageBuffer { + b, _ := pageBufferPool.Get().(*pageBuffer) + if b != nil { + b.cursor = 0 + b.refc.ref() + } else { + b = &pageBuffer{ + refc: 1, + pages: make(contiguousPages, 0, 16), + } + } + return b +} + +func (pb *pageBuffer) refTo(ref *pageRef, begin, end int64) { + length := end - begin + + if length > math.MaxUint32 { + panic("reference to contiguous buffer pages exceeds the maximum size of 4 GB") + } + + ref.pages = append(ref.buffer[:0], pb.pages.slice(begin, end)...) + ref.pages.ref() + ref.offset = begin + ref.length = uint32(length) +} + +func (pb *pageBuffer) ref(begin, end int64) *pageRef { + ref := new(pageRef) + pb.refTo(ref, begin, end) + return ref +} + +func (pb *pageBuffer) unref() { + pb.refc.unref(func() { + pb.pages.unref() + pb.pages.clear() + pb.pages = pb.pages[:0] + pb.length = 0 + pageBufferPool.Put(pb) + }) +} + +func (pb *pageBuffer) newPage() *page { + return newPage(int64(pb.length)) +} + +func (pb *pageBuffer) Close() error { + return nil +} + +func (pb *pageBuffer) Len() int { + return pb.length - pb.cursor +} + +func (pb *pageBuffer) Size() int64 { + return int64(pb.length) +} + +func (pb *pageBuffer) Discard(n int) (int, error) { + remain := pb.length - pb.cursor + if remain < n { + n = remain + } + pb.cursor += n + return n, nil +} + +func (pb *pageBuffer) Truncate(n int) { + if n < pb.length { + pb.length = n + + if n < pb.cursor { + pb.cursor = n + } + + for i := range pb.pages { + if p := pb.pages[i]; p.length <= n { + n -= p.length + } else { + if n > 0 { + pb.pages[i].Truncate(n) + i++ + } + pb.pages[i:].unref() + pb.pages[i:].clear() + pb.pages = pb.pages[:i] + break + } + } + } +} + +func (pb *pageBuffer) Seek(offset int64, whence int) (int64, error) { + c, err := seek(int64(pb.cursor), int64(pb.length), offset, whence) + if err != nil { + return -1, err + } + pb.cursor = int(c) + return c, nil +} + +func (pb *pageBuffer) ReadByte() (byte, error) { + b := [1]byte{} + _, err := pb.Read(b[:]) + return b[0], err +} + +func (pb *pageBuffer) Read(b []byte) (int, error) { + if pb.cursor >= pb.length { + return 0, io.EOF + } + n, err := pb.ReadAt(b, int64(pb.cursor)) + pb.cursor += n + return n, err +} + +func (pb *pageBuffer) ReadAt(b []byte, off int64) (int, error) { + return pb.pages.ReadAt(b, off) +} + +func (pb *pageBuffer) ReadFrom(r io.Reader) (int64, error) { + if len(pb.pages) == 0 { + pb.pages = append(pb.pages, pb.newPage()) + } + + rn := int64(0) + + for { + tail := pb.pages[len(pb.pages)-1] + free := tail.Cap() - tail.Len() + + if free == 0 { + tail = pb.newPage() + free = pageSize + pb.pages = append(pb.pages, tail) + } + + n, err := tail.ReadFrom(r) + pb.length += int(n) + rn += n + if n < int64(free) { + return rn, err + } + } +} + +func (pb *pageBuffer) WriteString(s string) (int, error) { + return pb.Write([]byte(s)) +} + +func (pb *pageBuffer) Write(b []byte) (int, error) { + wn := len(b) + if wn == 0 { + return 0, nil + } + + if len(pb.pages) == 0 { + pb.pages = append(pb.pages, pb.newPage()) + } + + for len(b) != 0 { + tail := pb.pages[len(pb.pages)-1] + free := tail.Cap() - tail.Len() + + if len(b) <= free { + tail.Write(b) + pb.length += len(b) + break + } + + tail.Write(b[:free]) + b = b[free:] + + pb.length += free + pb.pages = append(pb.pages, pb.newPage()) + } + + return wn, nil +} + +func (pb *pageBuffer) WriteAt(b []byte, off int64) (int, error) { + n, err := pb.pages.WriteAt(b, off) + if err != nil { + return n, err + } + if n < len(b) { + pb.Write(b[n:]) + } + return len(b), nil +} + +func (pb *pageBuffer) WriteTo(w io.Writer) (int64, error) { + var wn int + var err error + pb.pages.scan(int64(pb.cursor), int64(pb.length), func(b []byte) bool { + var n int + n, err = w.Write(b) + wn += n + return err == nil + }) + pb.cursor += wn + return int64(wn), err +} + +var ( + _ io.ReaderAt = (*pageBuffer)(nil) + _ io.ReaderFrom = (*pageBuffer)(nil) + _ io.StringWriter = (*pageBuffer)(nil) + _ io.Writer = (*pageBuffer)(nil) + _ io.WriterAt = (*pageBuffer)(nil) + _ io.WriterTo = (*pageBuffer)(nil) + + pagePool sync.Pool + pageBufferPool sync.Pool +) + +type contiguousPages []*page + +func (pages contiguousPages) ref() { + for _, p := range pages { + p.ref() + } +} + +func (pages contiguousPages) unref() { + for _, p := range pages { + p.unref() + } +} + +func (pages contiguousPages) clear() { + for i := range pages { + pages[i] = nil + } +} + +func (pages contiguousPages) ReadAt(b []byte, off int64) (int, error) { + rn := 0 + + for _, p := range pages.slice(off, off+int64(len(b))) { + n, _ := p.ReadAt(b, off) + b = b[n:] + rn += n + off += int64(n) + } + + return rn, nil +} + +func (pages contiguousPages) WriteAt(b []byte, off int64) (int, error) { + wn := 0 + + for _, p := range pages.slice(off, off+int64(len(b))) { + n, _ := p.WriteAt(b, off) + b = b[n:] + wn += n + off += int64(n) + } + + return wn, nil +} + +func (pages contiguousPages) slice(begin, end int64) contiguousPages { + i := pages.indexOf(begin) + j := pages.indexOf(end) + if j < len(pages) { + j++ + } + return pages[i:j] +} + +func (pages contiguousPages) indexOf(offset int64) int { + if len(pages) == 0 { + return 0 + } + return int((offset - pages[0].offset) / pageSize) +} + +func (pages contiguousPages) scan(begin, end int64, f func([]byte) bool) { + for _, p := range pages.slice(begin, end) { + if !f(p.slice(begin, end)) { + break + } + } +} + +var ( + _ io.ReaderAt = contiguousPages{} + _ io.WriterAt = contiguousPages{} +) + +type pageRef struct { + buffer [2]*page + pages contiguousPages + offset int64 + cursor int64 + length uint32 + once uint32 +} + +func (ref *pageRef) unref() { + if atomic.CompareAndSwapUint32(&ref.once, 0, 1) { + ref.pages.unref() + ref.pages.clear() + ref.pages = nil + ref.offset = 0 + ref.cursor = 0 + ref.length = 0 + } +} + +func (ref *pageRef) Len() int { return int(ref.Size() - ref.cursor) } + +func (ref *pageRef) Size() int64 { return int64(ref.length) } + +func (ref *pageRef) Close() error { ref.unref(); return nil } + +func (ref *pageRef) String() string { + return fmt.Sprintf("[offset=%d cursor=%d length=%d]", ref.offset, ref.cursor, ref.length) +} + +func (ref *pageRef) Seek(offset int64, whence int) (int64, error) { + c, err := seek(ref.cursor, int64(ref.length), offset, whence) + if err != nil { + return -1, err + } + ref.cursor = c + return c, nil +} + +func (ref *pageRef) ReadByte() (byte, error) { + var c byte + var ok bool + ref.scan(ref.cursor, func(b []byte) bool { + c, ok = b[0], true + return false + }) + if ok { + ref.cursor++ + } else { + return 0, io.EOF + } + return c, nil +} + +func (ref *pageRef) Read(b []byte) (int, error) { + if ref.cursor >= int64(ref.length) { + return 0, io.EOF + } + n, err := ref.ReadAt(b, ref.cursor) + ref.cursor += int64(n) + return n, err +} + +func (ref *pageRef) ReadAt(b []byte, off int64) (int, error) { + limit := ref.offset + int64(ref.length) + off += ref.offset + + if off >= limit { + return 0, io.EOF + } + + if off+int64(len(b)) > limit { + b = b[:limit-off] + } + + if len(b) == 0 { + return 0, nil + } + + n, err := ref.pages.ReadAt(b, off) + if n == 0 && err == nil { + err = io.EOF + } + return n, err +} + +func (ref *pageRef) WriteTo(w io.Writer) (wn int64, err error) { + ref.scan(ref.cursor, func(b []byte) bool { + var n int + n, err = w.Write(b) + wn += int64(n) + return err == nil + }) + ref.cursor += wn + return +} + +func (ref *pageRef) scan(off int64, f func([]byte) bool) { + begin := ref.offset + off + end := ref.offset + int64(ref.length) + ref.pages.scan(begin, end, f) +} + +var ( + _ io.Closer = (*pageRef)(nil) + _ io.Seeker = (*pageRef)(nil) + _ io.Reader = (*pageRef)(nil) + _ io.ReaderAt = (*pageRef)(nil) + _ io.WriterTo = (*pageRef)(nil) +) + +type pageRefAllocator struct { + refs []pageRef + head int + size int +} + +func (a *pageRefAllocator) newPageRef() *pageRef { + if a.head == len(a.refs) { + a.refs = make([]pageRef, a.size) + a.head = 0 + } + ref := &a.refs[a.head] + a.head++ + return ref +} + +func unref(x interface{}) { + if r, _ := x.(interface{ unref() }); r != nil { + r.unref() + } +} + +func seek(cursor, limit, offset int64, whence int) (int64, error) { + switch whence { + case io.SeekStart: + // absolute offset + case io.SeekCurrent: + offset = cursor + offset + case io.SeekEnd: + offset = limit - offset + default: + return -1, fmt.Errorf("seek: invalid whence value: %d", whence) + } + if offset < 0 { + offset = 0 + } + if offset > limit { + offset = limit + } + return offset, nil +} + +func closeBytes(b Bytes) { + if b != nil { + b.Close() + } +} + +func resetBytes(b Bytes) { + if r, _ := b.(interface{ Reset() }); r != nil { + r.Reset() + } +} diff --git a/tap/extensions/kafka/cluster.go b/tap/extensions/kafka/cluster.go new file mode 100644 index 000000000..a1e693581 --- /dev/null +++ b/tap/extensions/kafka/cluster.go @@ -0,0 +1,143 @@ +package main + +import ( + "fmt" + "sort" + "strings" + "text/tabwriter" +) + +type Cluster struct { + ClusterID string + Controller int32 + Brokers map[int32]Broker + Topics map[string]Topic +} + +func (c Cluster) BrokerIDs() []int32 { + brokerIDs := make([]int32, 0, len(c.Brokers)) + for id := range c.Brokers { + brokerIDs = append(brokerIDs, id) + } + sort.Slice(brokerIDs, func(i, j int) bool { + return brokerIDs[i] < brokerIDs[j] + }) + return brokerIDs +} + +func (c Cluster) TopicNames() []string { + topicNames := make([]string, 0, len(c.Topics)) + for name := range c.Topics { + topicNames = append(topicNames, name) + } + sort.Strings(topicNames) + return topicNames +} + +func (c Cluster) IsZero() bool { + return c.ClusterID == "" && c.Controller == 0 && len(c.Brokers) == 0 && len(c.Topics) == 0 +} + +func (c Cluster) Format(w fmt.State, _ rune) { + tw := new(tabwriter.Writer) + fmt.Fprintf(w, "CLUSTER: %q\n\n", c.ClusterID) + + tw.Init(w, 0, 8, 2, ' ', 0) + fmt.Fprint(tw, " BROKER\tHOST\tPORT\tRACK\tCONTROLLER\n") + + for _, id := range c.BrokerIDs() { + broker := c.Brokers[id] + fmt.Fprintf(tw, " %d\t%s\t%d\t%s\t%t\n", broker.ID, broker.Host, broker.Port, broker.Rack, broker.ID == c.Controller) + } + + tw.Flush() + fmt.Fprintln(w) + + tw.Init(w, 0, 8, 2, ' ', 0) + fmt.Fprint(tw, " TOPIC\tPARTITIONS\tBROKERS\n") + topicNames := c.TopicNames() + brokers := make(map[int32]struct{}, len(c.Brokers)) + brokerIDs := make([]int32, 0, len(c.Brokers)) + + for _, name := range topicNames { + topic := c.Topics[name] + + for _, p := range topic.Partitions { + for _, id := range p.Replicas { + brokers[id] = struct{}{} + } + } + + for id := range brokers { + brokerIDs = append(brokerIDs, id) + } + + fmt.Fprintf(tw, " %s\t%d\t%s\n", topic.Name, len(topic.Partitions), formatBrokerIDs(brokerIDs, -1)) + + for id := range brokers { + delete(brokers, id) + } + + brokerIDs = brokerIDs[:0] + } + + tw.Flush() + fmt.Fprintln(w) + + if w.Flag('+') { + for _, name := range topicNames { + fmt.Fprintf(w, " TOPIC: %q\n\n", name) + + tw.Init(w, 0, 8, 2, ' ', 0) + fmt.Fprint(tw, " PARTITION\tREPLICAS\tISR\tOFFLINE\n") + + for _, p := range c.Topics[name].Partitions { + fmt.Fprintf(tw, " %d\t%s\t%s\t%s\n", p.ID, + formatBrokerIDs(p.Replicas, -1), + formatBrokerIDs(p.ISR, p.Leader), + formatBrokerIDs(p.Offline, -1), + ) + } + + tw.Flush() + fmt.Fprintln(w) + } + } +} + +func formatBrokerIDs(brokerIDs []int32, leader int32) string { + if len(brokerIDs) == 0 { + return "" + } + + if len(brokerIDs) == 1 { + return itoa(brokerIDs[0]) + } + + sort.Slice(brokerIDs, func(i, j int) bool { + id1 := brokerIDs[i] + id2 := brokerIDs[j] + + if id1 == leader { + return true + } + + if id2 == leader { + return false + } + + return id1 < id2 + }) + + brokerNames := make([]string, len(brokerIDs)) + + for i, id := range brokerIDs { + brokerNames[i] = itoa(id) + } + + return strings.Join(brokerNames, ",") +} + +var ( + _ fmt.Formatter = Cluster{} +) diff --git a/tap/extensions/kafka/compression.go b/tap/extensions/kafka/compression.go new file mode 100644 index 000000000..e5b0485b8 --- /dev/null +++ b/tap/extensions/kafka/compression.go @@ -0,0 +1,30 @@ +package main + +import ( + "errors" + + "github.com/segmentio/kafka-go/compress" +) + +type Compression = compress.Compression + +type CompressionCodec = compress.Codec + +// TODO: this file should probably go away once the internals of the package +// have moved to use the protocol package. +const ( + compressionCodecMask = 0x07 +) + +var ( + errUnknownCodec = errors.New("the compression code is invalid or its codec has not been imported") +) + +// resolveCodec looks up a codec by Code() +func resolveCodec(code int8) (CompressionCodec, error) { + codec := compress.Compression(code).Codec() + if codec == nil { + return nil, errUnknownCodec + } + return codec, nil +} diff --git a/tap/extensions/kafka/decode.go b/tap/extensions/kafka/decode.go new file mode 100644 index 000000000..6dd46599f --- /dev/null +++ b/tap/extensions/kafka/decode.go @@ -0,0 +1,598 @@ +package main + +import ( + "bytes" + "encoding/binary" + "fmt" + "hash/crc32" + "io" + "io/ioutil" + "reflect" + "strings" + "sync" + "sync/atomic" +) + +type discarder interface { + Discard(int) (int, error) +} + +type decoder struct { + reader io.Reader + remain int + buffer [8]byte + err error + table *crc32.Table + crc32 uint32 +} + +func (d *decoder) Reset(r io.Reader, n int) { + d.reader = r + d.remain = n + d.buffer = [8]byte{} + d.err = nil + d.table = nil + d.crc32 = 0 +} + +func (d *decoder) Read(b []byte) (int, error) { + if d.err != nil { + return 0, d.err + } + if d.remain == 0 { + return 0, io.EOF + } + if len(b) > d.remain { + b = b[:d.remain] + } + n, err := d.reader.Read(b) + if n > 0 && d.table != nil { + d.crc32 = crc32.Update(d.crc32, d.table, b[:n]) + } + d.remain -= n + return n, err +} + +func (d *decoder) ReadByte() (byte, error) { + c := d.readByte() + return c, d.err +} + +func (d *decoder) done() bool { + return d.remain == 0 || d.err != nil +} + +func (d *decoder) setCRC(table *crc32.Table) { + d.table, d.crc32 = table, 0 +} + +func (d *decoder) decodeBool(v value) { + v.setBool(d.readBool()) +} + +func (d *decoder) decodeInt8(v value) { + v.setInt8(d.readInt8()) +} + +func (d *decoder) decodeInt16(v value) { + v.setInt16(d.readInt16()) +} + +func (d *decoder) decodeInt32(v value) { + v.setInt32(d.readInt32()) +} + +func (d *decoder) decodeInt64(v value) { + v.setInt64(d.readInt64()) +} + +func (d *decoder) decodeString(v value) { + v.setString(d.readString()) +} + +func (d *decoder) decodeCompactString(v value) { + v.setString(d.readCompactString()) +} + +func (d *decoder) decodeBytes(v value) { + v.setBytes(d.readBytes()) +} + +func (d *decoder) decodeCompactBytes(v value) { + v.setBytes(d.readCompactBytes()) +} + +func (d *decoder) decodeArray(v value, elemType reflect.Type, decodeElem decodeFunc) { + if n := d.readInt32(); n < 0 { + v.setArray(array{}) + } else { + a := makeArray(elemType, int(n)) + for i := 0; i < int(n) && d.remain > 0; i++ { + decodeElem(d, a.index(i)) + } + v.setArray(a) + } +} + +func (d *decoder) decodeCompactArray(v value, elemType reflect.Type, decodeElem decodeFunc) { + if n := d.readUnsignedVarInt(); n < 1 { + v.setArray(array{}) + } else { + a := makeArray(elemType, int(n-1)) + for i := 0; i < int(n-1) && d.remain > 0; i++ { + decodeElem(d, a.index(i)) + } + v.setArray(a) + } +} + +func (d *decoder) decodeRecordV0(v value) { + x := &RecordV0{} + x.Unknown = d.readInt8() + x.Attributes = d.readInt8() + x.TimestampDelta = d.readInt8() + x.OffsetDelta = d.readInt8() + + x.KeyLength = int8(d.readVarInt()) + key := strings.Builder{} + for i := 0; i < int(x.KeyLength); i++ { + key.WriteString(fmt.Sprintf("%c", d.readInt8())) + } + x.Key = key.String() + + x.ValueLen = int8(d.readVarInt()) + value := strings.Builder{} + for i := 0; i < int(x.ValueLen); i++ { + value.WriteString(fmt.Sprintf("%c", d.readInt8())) + } + x.Value = value.String() + + headerLen := d.readInt8() / 2 + headers := make([]RecordHeader, 0) + for i := 0; i < int(headerLen); i++ { + header := &RecordHeader{} + + header.HeaderKeyLength = int8(d.readVarInt()) + headerKey := strings.Builder{} + for j := 0; j < int(header.HeaderKeyLength); j++ { + headerKey.WriteString(fmt.Sprintf("%c", d.readInt8())) + } + header.HeaderKey = headerKey.String() + + header.HeaderValueLength = int8(d.readVarInt()) + headerValue := strings.Builder{} + for j := 0; j < int(header.HeaderValueLength); j++ { + headerValue.WriteString(fmt.Sprintf("%c", d.readInt8())) + } + header.Value = headerValue.String() + + headers = append(headers, *header) + } + x.Headers = headers + + v.val.Set(valueOf(x).val) +} + +func (d *decoder) discardAll() { + d.discard(d.remain) +} + +func (d *decoder) discard(n int) { + if n > d.remain { + n = d.remain + } + var err error + if r, _ := d.reader.(discarder); r != nil { + n, err = r.Discard(n) + d.remain -= n + } else { + _, err = io.Copy(ioutil.Discard, d) + } + d.setError(err) +} + +func (d *decoder) read(n int) []byte { + b := make([]byte, n) + n, err := io.ReadFull(d, b) + b = b[:n] + d.setError(err) + return b +} + +func (d *decoder) writeTo(w io.Writer, n int) { + limit := d.remain + if n < limit { + d.remain = n + } + c, err := io.Copy(w, d) + if int(c) < n && err == nil { + err = io.ErrUnexpectedEOF + } + d.remain = limit - int(c) + d.setError(err) +} + +func (d *decoder) setError(err error) { + if d.err == nil && err != nil { + d.err = err + d.discardAll() + } +} + +func (d *decoder) readFull(b []byte) bool { + n, err := io.ReadFull(d, b) + d.setError(err) + return n == len(b) +} + +func (d *decoder) readByte() byte { + if d.readFull(d.buffer[:1]) { + return d.buffer[0] + } + return 0 +} + +func (d *decoder) readBool() bool { + return d.readByte() != 0 +} + +func (d *decoder) readInt8() int8 { + if d.readFull(d.buffer[:1]) { + return decodeReadInt8(d.buffer[:1]) + } + return 0 +} + +func (d *decoder) readInt16() int16 { + if d.readFull(d.buffer[:2]) { + return decodeReadInt16(d.buffer[:2]) + } + return 0 +} + +func (d *decoder) readInt32() int32 { + if d.readFull(d.buffer[:4]) { + return decodeReadInt32(d.buffer[:4]) + } + return 0 +} + +func (d *decoder) readInt64() int64 { + if d.readFull(d.buffer[:8]) { + return decodeReadInt64(d.buffer[:8]) + } + return 0 +} + +func (d *decoder) readString() string { + if n := d.readInt16(); n < 0 { + return "" + } else { + return bytesToString(d.read(int(n))) + } +} + +func (d *decoder) readVarString() string { + if n := d.readVarInt(); n < 0 { + return "" + } else { + return bytesToString(d.read(int(n))) + } +} + +func (d *decoder) readCompactString() string { + if n := d.readUnsignedVarInt(); n < 1 { + return "" + } else { + return bytesToString(d.read(int(n - 1))) + } +} + +func (d *decoder) readBytes() []byte { + if n := d.readInt32(); n < 0 { + return nil + } else { + return d.read(int(n)) + } +} + +func (d *decoder) readBytesTo(w io.Writer) bool { + if n := d.readInt32(); n < 0 { + return false + } else { + d.writeTo(w, int(n)) + return d.err == nil + } +} + +func (d *decoder) readVarBytes() []byte { + if n := d.readVarInt(); n < 0 { + return nil + } else { + return d.read(int(n)) + } +} + +func (d *decoder) readVarBytesTo(w io.Writer) bool { + if n := d.readVarInt(); n < 0 { + return false + } else { + d.writeTo(w, int(n)) + return d.err == nil + } +} + +func (d *decoder) readCompactBytes() []byte { + if n := d.readUnsignedVarInt(); n < 1 { + return nil + } else { + return d.read(int(n - 1)) + } +} + +func (d *decoder) readCompactBytesTo(w io.Writer) bool { + if n := d.readUnsignedVarInt(); n < 1 { + return false + } else { + d.writeTo(w, int(n-1)) + return d.err == nil + } +} + +func (d *decoder) readVarInt() int64 { + n := 11 // varints are at most 11 bytes + + if n > d.remain { + n = d.remain + } + + x := uint64(0) + s := uint(0) + + for n > 0 { + b := d.readByte() + + if (b & 0x80) == 0 { + x |= uint64(b) << s + return int64(x>>1) ^ -(int64(x) & 1) + } + + x |= uint64(b&0x7f) << s + s += 7 + n-- + } + + d.setError(fmt.Errorf("cannot decode varint from input stream")) + return 0 +} + +func (d *decoder) readUnsignedVarInt() uint64 { + n := 11 // varints are at most 11 bytes + + if n > d.remain { + n = d.remain + } + + x := uint64(0) + s := uint(0) + + for n > 0 { + b := d.readByte() + + if (b & 0x80) == 0 { + x |= uint64(b) << s + return x + } + + x |= uint64(b&0x7f) << s + s += 7 + n-- + } + + d.setError(fmt.Errorf("cannot decode unsigned varint from input stream")) + return 0 +} + +type decodeFunc func(*decoder, value) + +var ( + _ io.Reader = (*decoder)(nil) + _ io.ByteReader = (*decoder)(nil) + + readerFrom = reflect.TypeOf((*io.ReaderFrom)(nil)).Elem() +) + +func decodeFuncOf(typ reflect.Type, version int16, flexible bool, tag structTag) decodeFunc { + if reflect.PtrTo(typ).Implements(readerFrom) { + return readerDecodeFuncOf(typ) + } + switch typ.Kind() { + case reflect.Bool: + return (*decoder).decodeBool + case reflect.Int8: + return (*decoder).decodeInt8 + case reflect.Int16: + return (*decoder).decodeInt16 + case reflect.Int32: + return (*decoder).decodeInt32 + case reflect.Int64: + return (*decoder).decodeInt64 + case reflect.String: + return stringDecodeFuncOf(flexible, tag) + case reflect.Struct: + return structDecodeFuncOf(typ, version, flexible) + case reflect.Slice: + if typ.Elem().Kind() == reflect.Uint8 { // []byte + return bytesDecodeFuncOf(flexible, tag) + } + return arrayDecodeFuncOf(typ, version, flexible, tag) + default: + panic("unsupported type: " + typ.String()) + } +} + +func stringDecodeFuncOf(flexible bool, tag structTag) decodeFunc { + if flexible { + // In flexible messages, all strings are compact + return (*decoder).decodeCompactString + } + return (*decoder).decodeString +} + +func bytesDecodeFuncOf(flexible bool, tag structTag) decodeFunc { + if flexible { + // In flexible messages, all arrays are compact + return (*decoder).decodeCompactBytes + } + return (*decoder).decodeBytes +} + +func structDecodeFuncOf(typ reflect.Type, version int16, flexible bool) decodeFunc { + type field struct { + decode decodeFunc + index index + tagID int + } + + var fields []field + taggedFields := map[int]*field{} + + if typ == reflect.TypeOf(RecordV0{}) { + return (*decoder).decodeRecordV0 + } + + forEachStructField(typ, func(typ reflect.Type, index index, tag string) { + forEachStructTag(tag, func(tag structTag) bool { + if tag.MinVersion <= version && version <= tag.MaxVersion { + f := field{ + decode: decodeFuncOf(typ, version, flexible, tag), + index: index, + tagID: tag.TagID, + } + + if tag.TagID < -1 { + // Normal required field + fields = append(fields, f) + } else { + // Optional tagged field (flexible messages only) + taggedFields[tag.TagID] = &f + } + return false + } + return true + }) + }) + + return func(d *decoder, v value) { + for i := range fields { + f := &fields[i] + f.decode(d, v.fieldByIndex(f.index)) + } + + if flexible { + // See https://cwiki.apache.org/confluence/display/KAFKA/KIP-482%3A+The+Kafka+Protocol+should+Support+Optional+Tagged+Fields + // for details of tag buffers in "flexible" messages. + n := int(d.readUnsignedVarInt()) + + for i := 0; i < n; i++ { + tagID := int(d.readUnsignedVarInt()) + size := int(d.readUnsignedVarInt()) + + f, ok := taggedFields[tagID] + if ok { + f.decode(d, v.fieldByIndex(f.index)) + } else { + d.read(size) + } + } + } + } +} + +func arrayDecodeFuncOf(typ reflect.Type, version int16, flexible bool, tag structTag) decodeFunc { + elemType := typ.Elem() + elemFunc := decodeFuncOf(elemType, version, flexible, tag) + if flexible { + // In flexible messages, all arrays are compact + return func(d *decoder, v value) { d.decodeCompactArray(v, elemType, elemFunc) } + } + + return func(d *decoder, v value) { d.decodeArray(v, elemType, elemFunc) } +} + +func readerDecodeFuncOf(typ reflect.Type) decodeFunc { + typ = reflect.PtrTo(typ) + return func(d *decoder, v value) { + if d.err == nil { + _, err := v.iface(typ).(io.ReaderFrom).ReadFrom(d) + if err != nil { + d.setError(err) + } + } + } +} + +func decodeReadInt8(b []byte) int8 { + return int8(b[0]) +} + +func decodeReadInt16(b []byte) int16 { + return int16(binary.BigEndian.Uint16(b)) +} + +func decodeReadInt32(b []byte) int32 { + return int32(binary.BigEndian.Uint32(b)) +} + +func decodeReadInt64(b []byte) int64 { + return int64(binary.BigEndian.Uint64(b)) +} + +func Unmarshal(data []byte, version int16, value interface{}) error { + typ := elemTypeOf(value) + cache, _ := unmarshalers.Load().(map[versionedType]decodeFunc) + key := versionedType{typ: typ, version: version} + decode := cache[key] + + if decode == nil { + decode = decodeFuncOf(reflect.TypeOf(value).Elem(), version, false, structTag{ + MinVersion: -1, + MaxVersion: -1, + TagID: -2, + Compact: true, + Nullable: true, + }) + + newCache := make(map[versionedType]decodeFunc, len(cache)+1) + newCache[key] = decode + + for typ, fun := range cache { + newCache[typ] = fun + } + + unmarshalers.Store(newCache) + } + + d, _ := decoders.Get().(*decoder) + if d == nil { + d = &decoder{reader: bytes.NewReader(nil)} + } + + d.remain = len(data) + r, _ := d.reader.(*bytes.Reader) + r.Reset(data) + + defer func() { + r.Reset(nil) + d.Reset(r, 0) + decoders.Put(d) + }() + + decode(d, valueOf(value)) + return dontExpectEOF(d.err) +} + +var ( + decoders sync.Pool // *decoder + unmarshalers atomic.Value // map[versionedType]decodeFunc +) diff --git a/tap/extensions/kafka/discard.go b/tap/extensions/kafka/discard.go new file mode 100644 index 000000000..cff70c9b9 --- /dev/null +++ b/tap/extensions/kafka/discard.go @@ -0,0 +1,50 @@ +package main + +import "bufio" + +func discardN(r *bufio.Reader, sz int, n int) (int, error) { + var err error + if n <= sz { + n, err = r.Discard(n) + } else { + n, err = r.Discard(sz) + if err == nil { + err = errShortRead + } + } + return sz - n, err +} + +func discardInt8(r *bufio.Reader, sz int) (int, error) { + return discardN(r, sz, 1) +} + +func discardInt16(r *bufio.Reader, sz int) (int, error) { + return discardN(r, sz, 2) +} + +func discardInt32(r *bufio.Reader, sz int) (int, error) { + return discardN(r, sz, 4) +} + +func discardInt64(r *bufio.Reader, sz int) (int, error) { + return discardN(r, sz, 8) +} + +func discardString(r *bufio.Reader, sz int) (int, error) { + return readStringWith(r, sz, func(r *bufio.Reader, sz int, n int) (int, error) { + if n < 0 { + return sz, nil + } + return discardN(r, sz, n) + }) +} + +func discardBytes(r *bufio.Reader, sz int) (int, error) { + return readBytesWith(r, sz, func(r *bufio.Reader, sz int, n int) (int, error) { + if n < 0 { + return sz, nil + } + return discardN(r, sz, n) + }) +} diff --git a/tap/extensions/kafka/encode.go b/tap/extensions/kafka/encode.go new file mode 100644 index 000000000..10d126994 --- /dev/null +++ b/tap/extensions/kafka/encode.go @@ -0,0 +1,645 @@ +package main + +import ( + "bytes" + "encoding/binary" + "fmt" + "hash/crc32" + "io" + "reflect" + "sync" + "sync/atomic" +) + +type encoder struct { + writer io.Writer + err error + table *crc32.Table + crc32 uint32 + buffer [32]byte +} + +type encoderChecksum struct { + reader io.Reader + encoder *encoder +} + +func (e *encoderChecksum) Read(b []byte) (int, error) { + n, err := e.reader.Read(b) + if n > 0 { + e.encoder.update(b[:n]) + } + return n, err +} + +func (e *encoder) Reset(w io.Writer) { + e.writer = w + e.err = nil + e.table = nil + e.crc32 = 0 + e.buffer = [32]byte{} +} + +func (e *encoder) ReadFrom(r io.Reader) (int64, error) { + if e.table != nil { + r = &encoderChecksum{ + reader: r, + encoder: e, + } + } + return io.Copy(e.writer, r) +} + +func (e *encoder) Write(b []byte) (int, error) { + if e.err != nil { + return 0, e.err + } + n, err := e.writer.Write(b) + if n > 0 { + e.update(b[:n]) + } + if err != nil { + e.err = err + } + return n, err +} + +func (e *encoder) WriteByte(b byte) error { + e.buffer[0] = b + _, err := e.Write(e.buffer[:1]) + return err +} + +func (e *encoder) WriteString(s string) (int, error) { + // This implementation is an optimization to avoid the heap allocation that + // would occur when converting the string to a []byte to call crc32.Update. + // + // Strings are rarely long in the kafka protocol, so the use of a 32 byte + // buffer is a good comprise between keeping the encoder value small and + // limiting the number of calls to Write. + // + // We introduced this optimization because memory profiles on the benchmarks + // showed that most heap allocations were caused by this code path. + n := 0 + + for len(s) != 0 { + c := copy(e.buffer[:], s) + w, err := e.Write(e.buffer[:c]) + n += w + if err != nil { + return n, err + } + s = s[c:] + } + + return n, nil +} + +func (e *encoder) setCRC(table *crc32.Table) { + e.table, e.crc32 = table, 0 +} + +func (e *encoder) update(b []byte) { + if e.table != nil { + e.crc32 = crc32.Update(e.crc32, e.table, b) + } +} + +func (e *encoder) encodeBool(v value) { + b := int8(0) + if v.bool() { + b = 1 + } + e.writeInt8(b) +} + +func (e *encoder) encodeInt8(v value) { + e.writeInt8(v.int8()) +} + +func (e *encoder) encodeInt16(v value) { + e.writeInt16(v.int16()) +} + +func (e *encoder) encodeInt32(v value) { + e.writeInt32(v.int32()) +} + +func (e *encoder) encodeInt64(v value) { + e.writeInt64(v.int64()) +} + +func (e *encoder) encodeString(v value) { + e.writeString(v.string()) +} + +func (e *encoder) encodeVarString(v value) { + e.writeVarString(v.string()) +} + +func (e *encoder) encodeCompactString(v value) { + e.writeCompactString(v.string()) +} + +func (e *encoder) encodeNullString(v value) { + e.writeNullString(v.string()) +} + +func (e *encoder) encodeVarNullString(v value) { + e.writeVarNullString(v.string()) +} + +func (e *encoder) encodeCompactNullString(v value) { + e.writeCompactNullString(v.string()) +} + +func (e *encoder) encodeBytes(v value) { + e.writeBytes(v.bytes()) +} + +func (e *encoder) encodeVarBytes(v value) { + e.writeVarBytes(v.bytes()) +} + +func (e *encoder) encodeCompactBytes(v value) { + e.writeCompactBytes(v.bytes()) +} + +func (e *encoder) encodeNullBytes(v value) { + e.writeNullBytes(v.bytes()) +} + +func (e *encoder) encodeVarNullBytes(v value) { + e.writeVarNullBytes(v.bytes()) +} + +func (e *encoder) encodeCompactNullBytes(v value) { + e.writeCompactNullBytes(v.bytes()) +} + +func (e *encoder) encodeArray(v value, elemType reflect.Type, encodeElem encodeFunc) { + a := v.array(elemType) + n := a.length() + e.writeInt32(int32(n)) + + for i := 0; i < n; i++ { + encodeElem(e, a.index(i)) + } +} + +func (e *encoder) encodeCompactArray(v value, elemType reflect.Type, encodeElem encodeFunc) { + a := v.array(elemType) + n := a.length() + e.writeUnsignedVarInt(uint64(n + 1)) + + for i := 0; i < n; i++ { + encodeElem(e, a.index(i)) + } +} + +func (e *encoder) encodeNullArray(v value, elemType reflect.Type, encodeElem encodeFunc) { + a := v.array(elemType) + if a.isNil() { + e.writeInt32(-1) + return + } + + n := a.length() + e.writeInt32(int32(n)) + + for i := 0; i < n; i++ { + encodeElem(e, a.index(i)) + } +} + +func (e *encoder) encodeCompactNullArray(v value, elemType reflect.Type, encodeElem encodeFunc) { + a := v.array(elemType) + if a.isNil() { + e.writeUnsignedVarInt(0) + return + } + + n := a.length() + e.writeUnsignedVarInt(uint64(n + 1)) + for i := 0; i < n; i++ { + encodeElem(e, a.index(i)) + } +} + +func (e *encoder) writeInt8(i int8) { + writeInt8(e.buffer[:1], i) + e.Write(e.buffer[:1]) +} + +func (e *encoder) writeInt16(i int16) { + writeInt16(e.buffer[:2], i) + e.Write(e.buffer[:2]) +} + +func (e *encoder) writeInt32(i int32) { + writeInt32(e.buffer[:4], i) + e.Write(e.buffer[:4]) +} + +func (e *encoder) writeInt64(i int64) { + writeInt64(e.buffer[:8], i) + e.Write(e.buffer[:8]) +} + +func (e *encoder) writeString(s string) { + e.writeInt16(int16(len(s))) + e.WriteString(s) +} + +func (e *encoder) writeVarString(s string) { + e.writeVarInt(int64(len(s))) + e.WriteString(s) +} + +func (e *encoder) writeCompactString(s string) { + e.writeUnsignedVarInt(uint64(len(s)) + 1) + e.WriteString(s) +} + +func (e *encoder) writeNullString(s string) { + if s == "" { + e.writeInt16(-1) + } else { + e.writeInt16(int16(len(s))) + e.WriteString(s) + } +} + +func (e *encoder) writeVarNullString(s string) { + if s == "" { + e.writeVarInt(-1) + } else { + e.writeVarInt(int64(len(s))) + e.WriteString(s) + } +} + +func (e *encoder) writeCompactNullString(s string) { + if s == "" { + e.writeUnsignedVarInt(0) + } else { + e.writeUnsignedVarInt(uint64(len(s)) + 1) + e.WriteString(s) + } +} + +func (e *encoder) writeBytes(b []byte) { + e.writeInt32(int32(len(b))) + e.Write(b) +} + +func (e *encoder) writeVarBytes(b []byte) { + e.writeVarInt(int64(len(b))) + e.Write(b) +} + +func (e *encoder) writeCompactBytes(b []byte) { + e.writeUnsignedVarInt(uint64(len(b)) + 1) + e.Write(b) +} + +func (e *encoder) writeNullBytes(b []byte) { + if b == nil { + e.writeInt32(-1) + } else { + e.writeInt32(int32(len(b))) + e.Write(b) + } +} + +func (e *encoder) writeVarNullBytes(b []byte) { + if b == nil { + e.writeVarInt(-1) + } else { + e.writeVarInt(int64(len(b))) + e.Write(b) + } +} + +func (e *encoder) writeCompactNullBytes(b []byte) { + if b == nil { + e.writeUnsignedVarInt(0) + } else { + e.writeUnsignedVarInt(uint64(len(b)) + 1) + e.Write(b) + } +} + +func (e *encoder) writeBytesFrom(b Bytes) error { + size := int64(b.Len()) + e.writeInt32(int32(size)) + n, err := io.Copy(e, b) + if err == nil && n != size { + err = fmt.Errorf("size of bytes does not match the number of bytes that were written (size=%d, written=%d): %w", size, n, io.ErrUnexpectedEOF) + } + return err +} + +func (e *encoder) writeNullBytesFrom(b Bytes) error { + if b == nil { + e.writeInt32(-1) + return nil + } else { + size := int64(b.Len()) + e.writeInt32(int32(size)) + n, err := io.Copy(e, b) + if err == nil && n != size { + err = fmt.Errorf("size of nullable bytes does not match the number of bytes that were written (size=%d, written=%d): %w", size, n, io.ErrUnexpectedEOF) + } + return err + } +} + +func (e *encoder) writeVarNullBytesFrom(b Bytes) error { + if b == nil { + e.writeVarInt(-1) + return nil + } else { + size := int64(b.Len()) + e.writeVarInt(size) + n, err := io.Copy(e, b) + if err == nil && n != size { + err = fmt.Errorf("size of nullable bytes does not match the number of bytes that were written (size=%d, written=%d): %w", size, n, io.ErrUnexpectedEOF) + } + return err + } +} + +func (e *encoder) writeCompactNullBytesFrom(b Bytes) error { + if b == nil { + e.writeUnsignedVarInt(0) + return nil + } else { + size := int64(b.Len()) + e.writeUnsignedVarInt(uint64(size + 1)) + n, err := io.Copy(e, b) + if err == nil && n != size { + err = fmt.Errorf("size of compact nullable bytes does not match the number of bytes that were written (size=%d, written=%d): %w", size, n, io.ErrUnexpectedEOF) + } + return err + } +} + +func (e *encoder) writeVarInt(i int64) { + e.writeUnsignedVarInt(uint64((i << 1) ^ (i >> 63))) +} + +func (e *encoder) writeUnsignedVarInt(i uint64) { + b := e.buffer[:] + n := 0 + + for i >= 0x80 && n < len(b) { + b[n] = byte(i) | 0x80 + i >>= 7 + n++ + } + + if n < len(b) { + b[n] = byte(i) + n++ + } + + e.Write(b[:n]) +} + +type encodeFunc func(*encoder, value) + +var ( + _ io.ReaderFrom = (*encoder)(nil) + _ io.Writer = (*encoder)(nil) + _ io.ByteWriter = (*encoder)(nil) + _ io.StringWriter = (*encoder)(nil) + + writerTo = reflect.TypeOf((*io.WriterTo)(nil)).Elem() +) + +func encodeFuncOf(typ reflect.Type, version int16, flexible bool, tag structTag) encodeFunc { + if reflect.PtrTo(typ).Implements(writerTo) { + return writerEncodeFuncOf(typ) + } + switch typ.Kind() { + case reflect.Bool: + return (*encoder).encodeBool + case reflect.Int8: + return (*encoder).encodeInt8 + case reflect.Int16: + return (*encoder).encodeInt16 + case reflect.Int32: + return (*encoder).encodeInt32 + case reflect.Int64: + return (*encoder).encodeInt64 + case reflect.String: + return stringEncodeFuncOf(flexible, tag) + case reflect.Struct: + return structEncodeFuncOf(typ, version, flexible) + case reflect.Slice: + if typ.Elem().Kind() == reflect.Uint8 { // []byte + return bytesEncodeFuncOf(flexible, tag) + } + return arrayEncodeFuncOf(typ, version, flexible, tag) + default: + panic("unsupported type: " + typ.String()) + } +} + +func stringEncodeFuncOf(flexible bool, tag structTag) encodeFunc { + switch { + case flexible && tag.Nullable: + // In flexible messages, all strings are compact + return (*encoder).encodeCompactNullString + case flexible: + // In flexible messages, all strings are compact + return (*encoder).encodeCompactString + case tag.Nullable: + return (*encoder).encodeNullString + default: + return (*encoder).encodeString + } +} + +func bytesEncodeFuncOf(flexible bool, tag structTag) encodeFunc { + switch { + case flexible && tag.Nullable: + // In flexible messages, all arrays are compact + return (*encoder).encodeCompactNullBytes + case flexible: + // In flexible messages, all arrays are compact + return (*encoder).encodeCompactBytes + case tag.Nullable: + return (*encoder).encodeNullBytes + default: + return (*encoder).encodeBytes + } +} + +func structEncodeFuncOf(typ reflect.Type, version int16, flexible bool) encodeFunc { + type field struct { + encode encodeFunc + index index + tagID int + } + + var fields []field + var taggedFields []field + + forEachStructField(typ, func(typ reflect.Type, index index, tag string) { + if typ.Size() != 0 { // skip struct{} + forEachStructTag(tag, func(tag structTag) bool { + if tag.MinVersion <= version && version <= tag.MaxVersion { + f := field{ + encode: encodeFuncOf(typ, version, flexible, tag), + index: index, + tagID: tag.TagID, + } + + if tag.TagID < -1 { + // Normal required field + fields = append(fields, f) + } else { + // Optional tagged field (flexible messages only) + taggedFields = append(taggedFields, f) + } + return false + } + return true + }) + } + }) + + return func(e *encoder, v value) { + for i := range fields { + f := &fields[i] + f.encode(e, v.fieldByIndex(f.index)) + } + + if flexible { + // See https://cwiki.apache.org/confluence/display/KAFKA/KIP-482%3A+The+Kafka+Protocol+should+Support+Optional+Tagged+Fields + // for details of tag buffers in "flexible" messages. + e.writeUnsignedVarInt(uint64(len(taggedFields))) + + for i := range taggedFields { + f := &taggedFields[i] + e.writeUnsignedVarInt(uint64(f.tagID)) + + buf := &bytes.Buffer{} + se := &encoder{writer: buf} + f.encode(se, v.fieldByIndex(f.index)) + e.writeUnsignedVarInt(uint64(buf.Len())) + e.Write(buf.Bytes()) + } + } + } +} + +func arrayEncodeFuncOf(typ reflect.Type, version int16, flexible bool, tag structTag) encodeFunc { + elemType := typ.Elem() + elemFunc := encodeFuncOf(elemType, version, flexible, tag) + switch { + case flexible && tag.Nullable: + // In flexible messages, all arrays are compact + return func(e *encoder, v value) { e.encodeCompactNullArray(v, elemType, elemFunc) } + case flexible: + // In flexible messages, all arrays are compact + return func(e *encoder, v value) { e.encodeCompactArray(v, elemType, elemFunc) } + case tag.Nullable: + return func(e *encoder, v value) { e.encodeNullArray(v, elemType, elemFunc) } + default: + return func(e *encoder, v value) { e.encodeArray(v, elemType, elemFunc) } + } +} + +func writerEncodeFuncOf(typ reflect.Type) encodeFunc { + typ = reflect.PtrTo(typ) + return func(e *encoder, v value) { + // Optimization to write directly into the buffer when the encoder + // does no need to compute a crc32 checksum. + w := io.Writer(e) + if e.table == nil { + w = e.writer + } + _, err := v.iface(typ).(io.WriterTo).WriteTo(w) + if err != nil { + e.err = err + } + } +} + +func writeInt8(b []byte, i int8) { + b[0] = byte(i) +} + +func writeInt16(b []byte, i int16) { + binary.BigEndian.PutUint16(b, uint16(i)) +} + +func writeInt32(b []byte, i int32) { + binary.BigEndian.PutUint32(b, uint32(i)) +} + +func writeInt64(b []byte, i int64) { + binary.BigEndian.PutUint64(b, uint64(i)) +} + +func Marshal(version int16, value interface{}) ([]byte, error) { + typ := typeOf(value) + cache, _ := marshalers.Load().(map[versionedType]encodeFunc) + key := versionedType{typ: typ, version: version} + encode := cache[key] + + if encode == nil { + encode = encodeFuncOf(reflect.TypeOf(value), version, false, structTag{ + MinVersion: -1, + MaxVersion: -1, + TagID: -2, + Compact: true, + Nullable: true, + }) + + newCache := make(map[versionedType]encodeFunc, len(cache)+1) + newCache[key] = encode + + for typ, fun := range cache { + newCache[typ] = fun + } + + marshalers.Store(newCache) + } + + e, _ := encoders.Get().(*encoder) + if e == nil { + e = &encoder{writer: new(bytes.Buffer)} + } + + b, _ := e.writer.(*bytes.Buffer) + defer func() { + b.Reset() + e.Reset(b) + encoders.Put(e) + }() + + encode(e, nonAddressableValueOf(value)) + + if e.err != nil { + return nil, e.err + } + + buf := b.Bytes() + out := make([]byte, len(buf)) + copy(out, buf) + return out, nil +} + +type versionedType struct { + typ _type + version int16 +} + +var ( + encoders sync.Pool // *encoder + marshalers atomic.Value // map[versionedType]encodeFunc +) diff --git a/tap/extensions/kafka/error.go b/tap/extensions/kafka/error.go new file mode 100644 index 000000000..b5f53d8fb --- /dev/null +++ b/tap/extensions/kafka/error.go @@ -0,0 +1,91 @@ +package main + +import ( + "fmt" +) + +// Error represents client-side protocol errors. +type Error string + +func (e Error) Error() string { return string(e) } + +func Errorf(msg string, args ...interface{}) Error { + return Error(fmt.Sprintf(msg, args...)) +} + +const ( + // ErrNoTopic is returned when a request needs to be sent to a specific + ErrNoTopic Error = "topic not found" + + // ErrNoPartition is returned when a request needs to be sent to a specific + // partition, but the client did not find it in the cluster metadata. + ErrNoPartition Error = "topic partition not found" + + // ErrNoLeader is returned when a request needs to be sent to a partition + // leader, but the client could not determine what the leader was at this + // time. + ErrNoLeader Error = "topic partition has no leader" + + // ErrNoRecord is returned when attempting to write a message containing an + // empty record set (which kafka forbids). + // + // We handle this case client-side because kafka will close the connection + // that it received an empty produce request on, causing all concurrent + // requests to be aborted. + ErrNoRecord Error = "record set contains no records" + + // ErrNoReset is returned by ResetRecordReader when the record reader does + // not support being reset. + ErrNoReset Error = "record sequence does not support reset" +) + +type TopicError struct { + Topic string + Err error +} + +func NewTopicError(topic string, err error) *TopicError { + return &TopicError{Topic: topic, Err: err} +} + +func NewErrNoTopic(topic string) *TopicError { + return NewTopicError(topic, ErrNoTopic) +} + +func (e *TopicError) Error() string { + return fmt.Sprintf("%v (topic=%q)", e.Err, e.Topic) +} + +func (e *TopicError) Unwrap() error { + return e.Err +} + +type TopicPartitionError struct { + Topic string + Partition int32 + Err error +} + +func NewTopicPartitionError(topic string, partition int32, err error) *TopicPartitionError { + return &TopicPartitionError{ + Topic: topic, + Partition: partition, + Err: err, + } +} + +func NewErrNoPartition(topic string, partition int32) *TopicPartitionError { + return NewTopicPartitionError(topic, partition, ErrNoPartition) +} + +func NewErrNoLeader(topic string, partition int32) *TopicPartitionError { + return NewTopicPartitionError(topic, partition, ErrNoLeader) +} + +func (e *TopicPartitionError) Error() string { + return fmt.Sprintf("%v (topic=%q partition=%d)", e.Err, e.Topic, e.Partition) +} + +func (e *TopicPartitionError) Unwrap() error { + return e.Err +} diff --git a/tap/extensions/kafka/matcher.go b/tap/extensions/kafka/matcher.go new file mode 100644 index 000000000..83be3401b --- /dev/null +++ b/tap/extensions/kafka/matcher.go @@ -0,0 +1,55 @@ +package main + +import ( + "log" + + cmap "github.com/orcaman/concurrent-map" +) + +var reqResMatcher = CreateResponseRequestMatcher() // global + +type RequestResponsePair struct { + Request Request + Response Response +} + +// Key is {client_addr}:{client_port}->{dest_addr}:{dest_port}::{correlation_id} +type requestResponseMatcher struct { + openMessagesMap cmap.ConcurrentMap +} + +func CreateResponseRequestMatcher() requestResponseMatcher { + newMatcher := &requestResponseMatcher{openMessagesMap: cmap.New()} + return *newMatcher +} + +func (matcher *requestResponseMatcher) registerRequest(key string, request *Request) *RequestResponsePair { + if response, found := matcher.openMessagesMap.Pop(key); found { + return matcher.preparePair(request, response.(*Response)) + } + + matcher.openMessagesMap.Set(key, request) + return nil +} + +func (matcher *requestResponseMatcher) registerResponse(key string, response *Response) *RequestResponsePair { + if request, found := matcher.openMessagesMap.Pop(key); found { + return matcher.preparePair(request.(*Request), response) + } + + matcher.openMessagesMap.Set(key, response) + return nil +} + +func (matcher *requestResponseMatcher) preparePair(request *Request, response *Response) *RequestResponsePair { + return &RequestResponsePair{ + Request: *request, + Response: *response, + } +} + +func (reqResPair *RequestResponsePair) print() { + log.Printf("----------------\n") + reqResPair.Request.print() + reqResPair.Response.print() +} diff --git a/tap/extensions/kafka/protocol.go b/tap/extensions/kafka/protocol.go new file mode 100644 index 000000000..ec8af3298 --- /dev/null +++ b/tap/extensions/kafka/protocol.go @@ -0,0 +1,480 @@ +package main + +import ( + "fmt" + "io" + "net" + "reflect" + "strconv" + "strings" +) + +// Message is an interface implemented by all request and response types of the +// kafka protocol. +// +// This interface is used mostly as a safe-guard to provide a compile-time check +// for values passed to functions dealing kafka message types. +type Message interface { + ApiKey() ApiKey +} + +type ApiKey int16 + +func (k ApiKey) String() string { + if i := int(k); i >= 0 && i < len(apiNames) { + return apiNames[i] + } + return strconv.Itoa(int(k)) +} + +func (k ApiKey) MinVersion() int16 { return k.apiType().minVersion() } + +func (k ApiKey) MaxVersion() int16 { return k.apiType().maxVersion() } + +func (k ApiKey) SelectVersion(minVersion, maxVersion int16) int16 { + min := k.MinVersion() + max := k.MaxVersion() + switch { + case min > maxVersion: + return min + case max < maxVersion: + return max + default: + return maxVersion + } +} + +func (k ApiKey) apiType() apiType { + if i := int(k); i >= 0 && i < len(apiTypes) { + return apiTypes[i] + } + return apiType{} +} + +const ( + Produce ApiKey = 0 + Fetch ApiKey = 1 + ListOffsets ApiKey = 2 + Metadata ApiKey = 3 + LeaderAndIsr ApiKey = 4 + StopReplica ApiKey = 5 + UpdateMetadata ApiKey = 6 + ControlledShutdown ApiKey = 7 + OffsetCommit ApiKey = 8 + OffsetFetch ApiKey = 9 + FindCoordinator ApiKey = 10 + JoinGroup ApiKey = 11 + Heartbeat ApiKey = 12 + LeaveGroup ApiKey = 13 + SyncGroup ApiKey = 14 + DescribeGroups ApiKey = 15 + ListGroups ApiKey = 16 + SaslHandshake ApiKey = 17 + ApiVersions ApiKey = 18 + CreateTopics ApiKey = 19 + DeleteTopics ApiKey = 20 + DeleteRecords ApiKey = 21 + InitProducerId ApiKey = 22 + OffsetForLeaderEpoch ApiKey = 23 + AddPartitionsToTxn ApiKey = 24 + AddOffsetsToTxn ApiKey = 25 + EndTxn ApiKey = 26 + WriteTxnMarkers ApiKey = 27 + TxnOffsetCommit ApiKey = 28 + DescribeAcls ApiKey = 29 + CreateAcls ApiKey = 30 + DeleteAcls ApiKey = 31 + DescribeConfigs ApiKey = 32 + AlterConfigs ApiKey = 33 + AlterReplicaLogDirs ApiKey = 34 + DescribeLogDirs ApiKey = 35 + SaslAuthenticate ApiKey = 36 + CreatePartitions ApiKey = 37 + CreateDelegationToken ApiKey = 38 + RenewDelegationToken ApiKey = 39 + ExpireDelegationToken ApiKey = 40 + DescribeDelegationToken ApiKey = 41 + DeleteGroups ApiKey = 42 + ElectLeaders ApiKey = 43 + IncrementalAlterConfigs ApiKey = 44 + AlterPartitionReassignments ApiKey = 45 + ListPartitionReassignments ApiKey = 46 + OffsetDelete ApiKey = 47 + DescribeClientQuotas ApiKey = 48 + AlterClientQuotas ApiKey = 49 + + numApis = 50 +) + +var apiNames = [numApis]string{ + Produce: "Produce", + Fetch: "Fetch", + ListOffsets: "ListOffsets", + Metadata: "Metadata", + LeaderAndIsr: "LeaderAndIsr", + StopReplica: "StopReplica", + UpdateMetadata: "UpdateMetadata", + ControlledShutdown: "ControlledShutdown", + OffsetCommit: "OffsetCommit", + OffsetFetch: "OffsetFetch", + FindCoordinator: "FindCoordinator", + JoinGroup: "JoinGroup", + Heartbeat: "Heartbeat", + LeaveGroup: "LeaveGroup", + SyncGroup: "SyncGroup", + DescribeGroups: "DescribeGroups", + ListGroups: "ListGroups", + SaslHandshake: "SaslHandshake", + ApiVersions: "ApiVersions", + CreateTopics: "CreateTopics", + DeleteTopics: "DeleteTopics", + DeleteRecords: "DeleteRecords", + InitProducerId: "InitProducerId", + OffsetForLeaderEpoch: "OffsetForLeaderEpoch", + AddPartitionsToTxn: "AddPartitionsToTxn", + AddOffsetsToTxn: "AddOffsetsToTxn", + EndTxn: "EndTxn", + WriteTxnMarkers: "WriteTxnMarkers", + TxnOffsetCommit: "TxnOffsetCommit", + DescribeAcls: "DescribeAcls", + CreateAcls: "CreateAcls", + DeleteAcls: "DeleteAcls", + DescribeConfigs: "DescribeConfigs", + AlterConfigs: "AlterConfigs", + AlterReplicaLogDirs: "AlterReplicaLogDirs", + DescribeLogDirs: "DescribeLogDirs", + SaslAuthenticate: "SaslAuthenticate", + CreatePartitions: "CreatePartitions", + CreateDelegationToken: "CreateDelegationToken", + RenewDelegationToken: "RenewDelegationToken", + ExpireDelegationToken: "ExpireDelegationToken", + DescribeDelegationToken: "DescribeDelegationToken", + DeleteGroups: "DeleteGroups", + ElectLeaders: "ElectLeaders", + IncrementalAlterConfigs: "IncrementalAlterConfigs", + AlterPartitionReassignments: "AlterPartitionReassignments", + ListPartitionReassignments: "ListPartitionReassignments", + OffsetDelete: "OffsetDelete", + DescribeClientQuotas: "DescribeClientQuotas", + AlterClientQuotas: "AlterClientQuotas", +} + +type messageType struct { + version int16 + flexible bool + gotype reflect.Type + decode decodeFunc + encode encodeFunc +} + +func (t *messageType) new() Message { + return reflect.New(t.gotype).Interface().(Message) +} + +type apiType struct { + requests []messageType + responses []messageType +} + +func (t apiType) minVersion() int16 { + if len(t.requests) == 0 { + return 0 + } + return t.requests[0].version +} + +func (t apiType) maxVersion() int16 { + if len(t.requests) == 0 { + return 0 + } + return t.requests[len(t.requests)-1].version +} + +var apiTypes [numApis]apiType + +// Register is automatically called by sub-packages are imported to install a +// new pair of request/response message types. +func Register(req, res Message) { + k1 := req.ApiKey() + k2 := res.ApiKey() + + if k1 != k2 { + panic(fmt.Sprintf("[%T/%T]: request and response API keys mismatch: %d != %d", req, res, k1, k2)) + } + + apiTypes[k1] = apiType{ + requests: typesOf(req), + responses: typesOf(res), + } +} + +func typesOf(v interface{}) []messageType { + return makeTypes(reflect.TypeOf(v).Elem()) +} + +func makeTypes(t reflect.Type) []messageType { + minVersion := int16(-1) + maxVersion := int16(-1) + + // All future versions will be flexible (according to spec), so don't need to + // worry about maxes here. + minFlexibleVersion := int16(-1) + + forEachStructField(t, func(_ reflect.Type, _ index, tag string) { + forEachStructTag(tag, func(tag structTag) bool { + if minVersion < 0 || tag.MinVersion < minVersion { + minVersion = tag.MinVersion + } + if maxVersion < 0 || tag.MaxVersion > maxVersion { + maxVersion = tag.MaxVersion + } + if tag.TagID > -2 && (minFlexibleVersion < 0 || tag.MinVersion < minFlexibleVersion) { + minFlexibleVersion = tag.MinVersion + } + return true + }) + }) + + types := make([]messageType, 0, (maxVersion-minVersion)+1) + + for v := minVersion; v <= maxVersion; v++ { + flexible := minFlexibleVersion >= 0 && v >= minFlexibleVersion + + types = append(types, messageType{ + version: v, + gotype: t, + flexible: flexible, + decode: decodeFuncOf(t, v, flexible, structTag{}), + encode: encodeFuncOf(t, v, flexible, structTag{}), + }) + } + + return types +} + +type structTag struct { + MinVersion int16 + MaxVersion int16 + Compact bool + Nullable bool + TagID int +} + +func forEachStructTag(tag string, do func(structTag) bool) { + if tag == "-" { + return // special case to ignore the field + } + + forEach(tag, '|', func(s string) bool { + tag := structTag{ + MinVersion: -1, + MaxVersion: -1, + + // Legitimate tag IDs can start at 0. We use -1 as a placeholder to indicate + // that the message type is flexible, so that leaves -2 as the default for + // indicating that there is no tag ID and the message is not flexible. + TagID: -2, + } + + var err error + forEach(s, ',', func(s string) bool { + switch { + case strings.HasPrefix(s, "min="): + tag.MinVersion, err = parseVersion(s[4:]) + case strings.HasPrefix(s, "max="): + tag.MaxVersion, err = parseVersion(s[4:]) + case s == "tag": + tag.TagID = -1 + case strings.HasPrefix(s, "tag="): + tag.TagID, err = strconv.Atoi(s[4:]) + case s == "compact": + tag.Compact = true + case s == "nullable": + tag.Nullable = true + default: + err = fmt.Errorf("unrecognized option: %q", s) + } + return err == nil + }) + + if err != nil { + panic(fmt.Errorf("malformed struct tag: %w", err)) + } + + if tag.MinVersion < 0 && tag.MaxVersion >= 0 { + panic(fmt.Errorf("missing minimum version in struct tag: %q", s)) + } + + if tag.MaxVersion < 0 && tag.MinVersion >= 0 { + panic(fmt.Errorf("missing maximum version in struct tag: %q", s)) + } + + if tag.MinVersion > tag.MaxVersion { + panic(fmt.Errorf("invalid version range in struct tag: %q", s)) + } + + return do(tag) + }) +} + +func forEach(s string, sep byte, do func(string) bool) bool { + for len(s) != 0 { + p := "" + i := strings.IndexByte(s, sep) + if i < 0 { + p, s = s, "" + } else { + p, s = s[:i], s[i+1:] + } + if !do(p) { + return false + } + } + return true +} + +func forEachStructField(t reflect.Type, do func(reflect.Type, index, string)) { + for i, n := 0, t.NumField(); i < n; i++ { + f := t.Field(i) + + if f.PkgPath != "" && f.Name != "_" { + continue + } + + kafkaTag, ok := f.Tag.Lookup("kafka") + if !ok { + kafkaTag = "|" + } + + do(f.Type, indexOf(f), kafkaTag) + } +} + +func parseVersion(s string) (int16, error) { + if !strings.HasPrefix(s, "v") { + return 0, fmt.Errorf("invalid version number: %q", s) + } + i, err := strconv.ParseInt(s[1:], 10, 16) + if err != nil { + return 0, fmt.Errorf("invalid version number: %q: %w", s, err) + } + if i < 0 { + return 0, fmt.Errorf("invalid negative version number: %q", s) + } + return int16(i), nil +} + +func dontExpectEOF(err error) error { + switch err { + case nil: + return nil + case io.EOF: + return io.ErrUnexpectedEOF + default: + return err + } +} + +type Broker struct { + ID int32 + Host string + Port int32 + Rack string +} + +func (b Broker) String() string { + return net.JoinHostPort(b.Host, itoa(b.Port)) +} + +func (b Broker) Format(w fmt.State, v rune) { + switch v { + case 'd': + io.WriteString(w, itoa(b.ID)) + case 's': + io.WriteString(w, b.String()) + case 'v': + io.WriteString(w, itoa(b.ID)) + io.WriteString(w, " ") + io.WriteString(w, b.String()) + if b.Rack != "" { + io.WriteString(w, " ") + io.WriteString(w, b.Rack) + } + } +} + +func itoa(i int32) string { + return strconv.Itoa(int(i)) +} + +type Topic struct { + Name string + Error int16 + Partitions map[int32]Partition +} + +type Partition struct { + ID int32 + Error int16 + Leader int32 + Replicas []int32 + ISR []int32 + Offline []int32 +} + +// BrokerMessage is an extension of the Message interface implemented by some +// request types to customize the broker assignment logic. +type BrokerMessage interface { + // Given a representation of the kafka cluster state as argument, returns + // the broker that the message should be routed to. + Broker(Cluster) (Broker, error) +} + +// GroupMessage is an extension of the Message interface implemented by some +// request types to inform the program that they should be routed to a group +// coordinator. +type GroupMessage interface { + // Returns the group configured on the message. + Group() string +} + +// PreparedMessage is an extension of the Message interface implemented by some +// request types which may need to run some pre-processing on their state before +// being sent. +type PreparedMessage interface { + // Prepares the message before being sent to a kafka broker using the API + // version passed as argument. + Prepare(apiVersion int16) +} + +// Splitter is an interface implemented by messages that can be split into +// multiple requests and have their results merged back by a Merger. +type Splitter interface { + // For a given cluster layout, returns the list of messages constructed + // from the receiver for each requests that should be sent to the cluster. + // The second return value is a Merger which can be used to merge back the + // results of each request into a single message (or an error). + Split(Cluster) ([]Message, Merger, error) +} + +// Merger is an interface implemented by messages which can merge multiple +// results into one response. +type Merger interface { + // Given a list of message and associated results, merge them back into a + // response (or an error). The results must be either Message or error + // values, other types should trigger a panic. + Merge(messages []Message, results []interface{}) (Message, error) +} + +// Result converts r to a Message or and error, or panics if r could be be +// converted to these types. +func Result(r interface{}) (Message, error) { + switch v := r.(type) { + case Message: + return v, nil + case error: + return nil, v + default: + panic(fmt.Errorf("BUG: result must be a message or an error but not %T", v)) + } +} diff --git a/tap/extensions/kafka/protocol_make.go b/tap/extensions/kafka/protocol_make.go new file mode 100644 index 000000000..2fc71a254 --- /dev/null +++ b/tap/extensions/kafka/protocol_make.go @@ -0,0 +1,219 @@ +package main + +import ( + "encoding/binary" + "fmt" + "strconv" +) + +type ApiVersion struct { + ApiKey int16 + MinVersion int16 + MaxVersion int16 +} + +func (v ApiVersion) Format(w fmt.State, r rune) { + switch r { + case 's': + fmt.Fprint(w, apiKey(v.ApiKey)) + case 'd': + switch { + case w.Flag('-'): + fmt.Fprint(w, v.MinVersion) + case w.Flag('+'): + fmt.Fprint(w, v.MaxVersion) + default: + fmt.Fprint(w, v.ApiKey) + } + case 'v': + switch { + case w.Flag('-'): + fmt.Fprintf(w, "v%d", v.MinVersion) + case w.Flag('+'): + fmt.Fprintf(w, "v%d", v.MaxVersion) + case w.Flag('#'): + fmt.Fprintf(w, "kafka.ApiVersion{ApiKey:%d MinVersion:%d MaxVersion:%d}", v.ApiKey, v.MinVersion, v.MaxVersion) + default: + fmt.Fprintf(w, "%s[v%d:v%d]", apiKey(v.ApiKey), v.MinVersion, v.MaxVersion) + } + } +} + +type apiKey int16 + +const ( + produce apiKey = 0 + fetch apiKey = 1 + listOffsets apiKey = 2 + metadata apiKey = 3 + leaderAndIsr apiKey = 4 + stopReplica apiKey = 5 + updateMetadata apiKey = 6 + controlledShutdown apiKey = 7 + offsetCommit apiKey = 8 + offsetFetch apiKey = 9 + findCoordinator apiKey = 10 + joinGroup apiKey = 11 + heartbeat apiKey = 12 + leaveGroup apiKey = 13 + syncGroup apiKey = 14 + describeGroups apiKey = 15 + listGroups apiKey = 16 + saslHandshake apiKey = 17 + apiVersions apiKey = 18 + createTopics apiKey = 19 + deleteTopics apiKey = 20 + deleteRecords apiKey = 21 + initProducerId apiKey = 22 + offsetForLeaderEpoch apiKey = 23 + addPartitionsToTxn apiKey = 24 + addOffsetsToTxn apiKey = 25 + endTxn apiKey = 26 + writeTxnMarkers apiKey = 27 + txnOffsetCommit apiKey = 28 + describeAcls apiKey = 29 + createAcls apiKey = 30 + deleteAcls apiKey = 31 + describeConfigs apiKey = 32 + alterConfigs apiKey = 33 + alterReplicaLogDirs apiKey = 34 + describeLogDirs apiKey = 35 + saslAuthenticate apiKey = 36 + createPartitions apiKey = 37 + createDelegationToken apiKey = 38 + renewDelegationToken apiKey = 39 + expireDelegationToken apiKey = 40 + describeDelegationToken apiKey = 41 + deleteGroups apiKey = 42 + electLeaders apiKey = 43 + incrementalAlterConfigs apiKey = 44 + alterPartitionReassignments apiKey = 45 + listPartitionReassignments apiKey = 46 + offsetDelete apiKey = 47 +) + +func (k apiKey) String() string { + if i := int(k); i >= 0 && i < len(apiKeyStrings) { + return apiKeyStrings[i] + } + return strconv.Itoa(int(k)) +} + +type apiVersion int16 + +const ( + v0 = 0 + v1 = 1 + v2 = 2 + v3 = 3 + v4 = 4 + v5 = 5 + v6 = 6 + v7 = 7 + v8 = 8 + v9 = 9 + v10 = 10 +) + +var apiKeyStrings = [...]string{ + produce: "Produce", + fetch: "Fetch", + listOffsets: "ListOffsets", + metadata: "Metadata", + leaderAndIsr: "LeaderAndIsr", + stopReplica: "StopReplica", + updateMetadata: "UpdateMetadata", + controlledShutdown: "ControlledShutdown", + offsetCommit: "OffsetCommit", + offsetFetch: "OffsetFetch", + findCoordinator: "FindCoordinator", + joinGroup: "JoinGroup", + heartbeat: "Heartbeat", + leaveGroup: "LeaveGroup", + syncGroup: "SyncGroup", + describeGroups: "DescribeGroups", + listGroups: "ListGroups", + saslHandshake: "SaslHandshake", + apiVersions: "ApiVersions", + createTopics: "CreateTopics", + deleteTopics: "DeleteTopics", + deleteRecords: "DeleteRecords", + initProducerId: "InitProducerId", + offsetForLeaderEpoch: "OffsetForLeaderEpoch", + addPartitionsToTxn: "AddPartitionsToTxn", + addOffsetsToTxn: "AddOffsetsToTxn", + endTxn: "EndTxn", + writeTxnMarkers: "WriteTxnMarkers", + txnOffsetCommit: "TxnOffsetCommit", + describeAcls: "DescribeAcls", + createAcls: "CreateAcls", + deleteAcls: "DeleteAcls", + describeConfigs: "DescribeConfigs", + alterConfigs: "AlterConfigs", + alterReplicaLogDirs: "AlterReplicaLogDirs", + describeLogDirs: "DescribeLogDirs", + saslAuthenticate: "SaslAuthenticate", + createPartitions: "CreatePartitions", + createDelegationToken: "CreateDelegationToken", + renewDelegationToken: "RenewDelegationToken", + expireDelegationToken: "ExpireDelegationToken", + describeDelegationToken: "DescribeDelegationToken", + deleteGroups: "DeleteGroups", + electLeaders: "ElectLeaders", + incrementalAlterConfigs: "IncrementalAlfterConfigs", + alterPartitionReassignments: "AlterPartitionReassignments", + listPartitionReassignments: "ListPartitionReassignments", + offsetDelete: "OffsetDelete", +} + +type requestHeader struct { + Size int32 + ApiKey int16 + ApiVersion int16 + CorrelationID int32 + ClientID string +} + +func sizeofString(s string) int32 { + return 2 + int32(len(s)) +} + +func (h requestHeader) size() int32 { + return 4 + 2 + 2 + 4 + sizeofString(h.ClientID) +} + +// func (h requestHeader) writeTo(wb *writeBuffer) { +// wb.writeInt32(h.Size) +// wb.writeInt16(h.ApiKey) +// wb.writeInt16(h.ApiVersion) +// wb.writeInt32(h.CorrelationID) +// wb.writeString(h.ClientID) +// } + +type request interface { + size() int32 + // writable +} + +func makeInt8(b []byte) int8 { + return int8(b[0]) +} + +func makeInt16(b []byte) int16 { + return int16(binary.BigEndian.Uint16(b)) +} + +func makeInt32(b []byte) int32 { + return int32(binary.BigEndian.Uint32(b)) +} + +func makeInt64(b []byte) int64 { + return int64(binary.BigEndian.Uint64(b)) +} + +func expectZeroSize(sz int, err error) error { + if err == nil && sz != 0 { + err = fmt.Errorf("reading a response left %d unread bytes", sz) + } + return err +} diff --git a/tap/extensions/kafka/read.go b/tap/extensions/kafka/read.go new file mode 100644 index 000000000..965891c30 --- /dev/null +++ b/tap/extensions/kafka/read.go @@ -0,0 +1,639 @@ +package main + +import ( + "bufio" + "errors" + "fmt" + "io" + "reflect" +) + +type readable interface { + readFrom(*bufio.Reader, int) (int, error) +} + +var errShortRead = errors.New("not enough bytes available to load the response") + +func peekRead(r *bufio.Reader, sz int, n int, f func([]byte)) (int, error) { + if n > sz { + return sz, errShortRead + } + b, err := r.Peek(n) + if err != nil { + return sz, err + } + f(b) + return discardN(r, sz, n) +} + +func readInt8(r *bufio.Reader, sz int, v *int8) (int, error) { + return peekRead(r, sz, 1, func(b []byte) { *v = makeInt8(b) }) +} + +func readInt16(r *bufio.Reader, sz int, v *int16) (int, error) { + return peekRead(r, sz, 2, func(b []byte) { *v = makeInt16(b) }) +} + +func readInt32(r *bufio.Reader, sz int, v *int32) (int, error) { + return peekRead(r, sz, 4, func(b []byte) { *v = makeInt32(b) }) +} + +func readInt64(r *bufio.Reader, sz int, v *int64) (int, error) { + return peekRead(r, sz, 8, func(b []byte) { *v = makeInt64(b) }) +} + +func readVarInt(r *bufio.Reader, sz int, v *int64) (remain int, err error) { + // Optimistically assume that most of the time, there will be data buffered + // in the reader. If this is not the case, the buffer will be refilled after + // consuming zero bytes from the input. + input, _ := r.Peek(r.Buffered()) + x := uint64(0) + s := uint(0) + + for { + if len(input) > sz { + input = input[:sz] + } + + for i, b := range input { + if b < 0x80 { + x |= uint64(b) << s + *v = int64(x>>1) ^ -(int64(x) & 1) + n, err := r.Discard(i + 1) + return sz - n, err + } + + x |= uint64(b&0x7f) << s + s += 7 + } + + // Make room in the input buffer to load more data from the underlying + // stream. The x and s variables are left untouched, ensuring that the + // varint decoding can continue on the next loop iteration. + n, _ := r.Discard(len(input)) + sz -= n + if sz == 0 { + return 0, errShortRead + } + + // Fill the buffer: ask for one more byte, but in practice the reader + // will load way more from the underlying stream. + if _, err := r.Peek(1); err != nil { + if err == io.EOF { + err = errShortRead + } + return sz, err + } + + // Grab as many bytes as possible from the buffer, then go on to the + // next loop iteration which is going to consume it. + input, _ = r.Peek(r.Buffered()) + } +} + +func readBool(r *bufio.Reader, sz int, v *bool) (int, error) { + return peekRead(r, sz, 1, func(b []byte) { *v = b[0] != 0 }) +} + +func readString(r *bufio.Reader, sz int, v *string) (int, error) { + return readStringWith(r, sz, func(r *bufio.Reader, sz int, n int) (remain int, err error) { + *v, remain, err = readNewString(r, sz, n) + return + }) +} + +func readStringWith(r *bufio.Reader, sz int, cb func(*bufio.Reader, int, int) (int, error)) (int, error) { + var err error + var len int16 + + if sz, err = readInt16(r, sz, &len); err != nil { + return sz, err + } + + n := int(len) + if n > sz { + return sz, errShortRead + } + + return cb(r, sz, n) +} + +func readNewString(r *bufio.Reader, sz int, n int) (string, int, error) { + b, sz, err := readNewBytes(r, sz, n) + return string(b), sz, err +} + +func readBytes(r *bufio.Reader, sz int, v *[]byte) (int, error) { + return readBytesWith(r, sz, func(r *bufio.Reader, sz int, n int) (remain int, err error) { + *v, remain, err = readNewBytes(r, sz, n) + return + }) +} + +func readBytesWith(r *bufio.Reader, sz int, cb func(*bufio.Reader, int, int) (int, error)) (int, error) { + var err error + var n int + + if sz, err = readArrayLen(r, sz, &n); err != nil { + return sz, err + } + + if n > sz { + return sz, errShortRead + } + + return cb(r, sz, n) +} + +func readNewBytes(r *bufio.Reader, sz int, n int) ([]byte, int, error) { + var err error + var b []byte + var shortRead bool + + if n > 0 { + if sz < n { + n = sz + shortRead = true + } + + b = make([]byte, n) + n, err = io.ReadFull(r, b) + b = b[:n] + sz -= n + + if err == nil && shortRead { + err = errShortRead + } + } + + return b, sz, err +} + +func readArrayLen(r *bufio.Reader, sz int, n *int) (int, error) { + var err error + var len int32 + if sz, err = readInt32(r, sz, &len); err != nil { + return sz, err + } + *n = int(len) + return sz, nil +} + +func readArrayWith(r *bufio.Reader, sz int, cb func(*bufio.Reader, int) (int, error)) (int, error) { + var err error + var len int32 + + if sz, err = readInt32(r, sz, &len); err != nil { + return sz, err + } + + for n := int(len); n > 0; n-- { + if sz, err = cb(r, sz); err != nil { + break + } + } + + return sz, err +} + +func readStringArray(r *bufio.Reader, sz int, v *[]string) (remain int, err error) { + var content []string + fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) { + var value string + if fnRemain, fnErr = readString(r, size, &value); fnErr != nil { + return + } + content = append(content, value) + return + } + if remain, err = readArrayWith(r, sz, fn); err != nil { + return + } + + *v = content + return +} + +func readMapStringInt32(r *bufio.Reader, sz int, v *map[string][]int32) (remain int, err error) { + var len int32 + if remain, err = readInt32(r, sz, &len); err != nil { + return + } + + content := make(map[string][]int32, len) + for i := 0; i < int(len); i++ { + var key string + var values []int32 + + if remain, err = readString(r, remain, &key); err != nil { + return + } + + fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) { + var value int32 + if fnRemain, fnErr = readInt32(r, size, &value); fnErr != nil { + return + } + values = append(values, value) + return + } + if remain, err = readArrayWith(r, remain, fn); err != nil { + return + } + + content[key] = values + } + *v = content + + return +} + +func read(r *bufio.Reader, sz int, a interface{}) (int, error) { + switch v := a.(type) { + case *int8: + return readInt8(r, sz, v) + case *int16: + return readInt16(r, sz, v) + case *int32: + return readInt32(r, sz, v) + case *int64: + return readInt64(r, sz, v) + case *bool: + return readBool(r, sz, v) + case *string: + return readString(r, sz, v) + case *[]byte: + return readBytes(r, sz, v) + } + switch v := reflect.ValueOf(a).Elem(); v.Kind() { + case reflect.Struct: + return readStruct(r, sz, v) + case reflect.Slice: + return readSlice(r, sz, v) + default: + panic(fmt.Sprintf("unsupported type: %T", a)) + } +} + +func ReadAll(r *bufio.Reader, sz int, ptrs ...interface{}) (int, error) { + var err error + + for _, ptr := range ptrs { + if sz, err = readPtr(r, sz, ptr); err != nil { + break + } + } + + return sz, err +} + +func readPtr(r *bufio.Reader, sz int, ptr interface{}) (int, error) { + switch v := ptr.(type) { + case *int8: + return readInt8(r, sz, v) + case *int16: + return readInt16(r, sz, v) + case *int32: + return readInt32(r, sz, v) + case *int64: + return readInt64(r, sz, v) + case *string: + return readString(r, sz, v) + case *[]byte: + return readBytes(r, sz, v) + case readable: + return v.readFrom(r, sz) + default: + panic(fmt.Sprintf("unsupported type: %T", v)) + } +} + +func readStruct(r *bufio.Reader, sz int, v reflect.Value) (int, error) { + var err error + for i, n := 0, v.NumField(); i != n; i++ { + if sz, err = read(r, sz, v.Field(i).Addr().Interface()); err != nil { + return sz, err + } + } + return sz, nil +} + +func readSlice(r *bufio.Reader, sz int, v reflect.Value) (int, error) { + var err error + var len int32 + + if sz, err = readInt32(r, sz, &len); err != nil { + return sz, err + } + + if n := int(len); n < 0 { + v.Set(reflect.Zero(v.Type())) + } else { + v.Set(reflect.MakeSlice(v.Type(), n, n)) + + for i := 0; i != n; i++ { + if sz, err = read(r, sz, v.Index(i).Addr().Interface()); err != nil { + return sz, err + } + } + } + + return sz, nil +} + +func readFetchResponseHeaderV2(r *bufio.Reader, size int) (throttle int32, watermark int64, remain int, err error) { + var n int32 + var p struct { + Partition int32 + ErrorCode int16 + HighwaterMarkOffset int64 + MessageSetSize int32 + } + + if remain, err = readInt32(r, size, &throttle); err != nil { + return + } + + if remain, err = readInt32(r, remain, &n); err != nil { + return + } + + // This error should never trigger, unless there's a bug in the kafka client + // or server. + if n != 1 { + err = fmt.Errorf("1 kafka topic was expected in the fetch response but the client received %d", n) + return + } + + // We ignore the topic name because we've requests messages for a single + // topic, unless there's a bug in the kafka server we will have received + // the name of the topic that we requested. + if remain, err = discardString(r, remain); err != nil { + return + } + + if remain, err = readInt32(r, remain, &n); err != nil { + return + } + + // This error should never trigger, unless there's a bug in the kafka client + // or server. + if n != 1 { + err = fmt.Errorf("1 kafka partition was expected in the fetch response but the client received %d", n) + return + } + + if remain, err = read(r, remain, &p); err != nil { + return + } + + if p.ErrorCode != 0 { + err = Error(p.ErrorCode) + return + } + + // This error should never trigger, unless there's a bug in the kafka client + // or server. + if remain != int(p.MessageSetSize) { + err = fmt.Errorf("the size of the message set in a fetch response doesn't match the number of remaining bytes (message set size = %d, remaining bytes = %d)", p.MessageSetSize, remain) + return + } + + watermark = p.HighwaterMarkOffset + return +} + +func readFetchResponseHeaderV5(r *bufio.Reader, size int) (throttle int32, watermark int64, remain int, err error) { + var n int32 + type AbortedTransaction struct { + ProducerId int64 + FirstOffset int64 + } + var p struct { + Partition int32 + ErrorCode int16 + HighwaterMarkOffset int64 + LastStableOffset int64 + LogStartOffset int64 + } + var messageSetSize int32 + var abortedTransactions []AbortedTransaction + + if remain, err = readInt32(r, size, &throttle); err != nil { + return + } + + if remain, err = readInt32(r, remain, &n); err != nil { + return + } + + // This error should never trigger, unless there's a bug in the kafka client + // or server. + if n != 1 { + err = fmt.Errorf("1 kafka topic was expected in the fetch response but the client received %d", n) + return + } + + // We ignore the topic name because we've requests messages for a single + // topic, unless there's a bug in the kafka server we will have received + // the name of the topic that we requested. + if remain, err = discardString(r, remain); err != nil { + return + } + + if remain, err = readInt32(r, remain, &n); err != nil { + return + } + + // This error should never trigger, unless there's a bug in the kafka client + // or server. + if n != 1 { + err = fmt.Errorf("1 kafka partition was expected in the fetch response but the client received %d", n) + return + } + + if remain, err = read(r, remain, &p); err != nil { + return + } + + var abortedTransactionLen int + if remain, err = readArrayLen(r, remain, &abortedTransactionLen); err != nil { + return + } + + if abortedTransactionLen == -1 { + abortedTransactions = nil + } else { + abortedTransactions = make([]AbortedTransaction, abortedTransactionLen) + for i := 0; i < abortedTransactionLen; i++ { + if remain, err = read(r, remain, &abortedTransactions[i]); err != nil { + return + } + } + } + + if p.ErrorCode != 0 { + err = Error(p.ErrorCode) + return + } + + remain, err = readInt32(r, remain, &messageSetSize) + if err != nil { + return + } + + // This error should never trigger, unless there's a bug in the kafka client + // or server. + if remain != int(messageSetSize) { + err = fmt.Errorf("the size of the message set in a fetch response doesn't match the number of remaining bytes (message set size = %d, remaining bytes = %d)", messageSetSize, remain) + return + } + + watermark = p.HighwaterMarkOffset + return + +} + +func readFetchResponseHeaderV10(r *bufio.Reader, size int) (throttle int32, watermark int64, remain int, err error) { + var n int32 + var errorCode int16 + type AbortedTransaction struct { + ProducerId int64 + FirstOffset int64 + } + var p struct { + Partition int32 + ErrorCode int16 + HighwaterMarkOffset int64 + LastStableOffset int64 + LogStartOffset int64 + } + var messageSetSize int32 + var abortedTransactions []AbortedTransaction + + if remain, err = readInt32(r, size, &throttle); err != nil { + return + } + + if remain, err = readInt16(r, remain, &errorCode); err != nil { + return + } + if errorCode != 0 { + err = Error(errorCode) + return + } + + if remain, err = discardInt32(r, remain); err != nil { + return + } + + if remain, err = readInt32(r, remain, &n); err != nil { + return + } + + // This error should never trigger, unless there's a bug in the kafka client + // or server. + if n != 1 { + err = fmt.Errorf("1 kafka topic was expected in the fetch response but the client received %d", n) + return + } + + // We ignore the topic name because we've requests messages for a single + // topic, unless there's a bug in the kafka server we will have received + // the name of the topic that we requested. + if remain, err = discardString(r, remain); err != nil { + return + } + + if remain, err = readInt32(r, remain, &n); err != nil { + return + } + + // This error should never trigger, unless there's a bug in the kafka client + // or server. + if n != 1 { + err = fmt.Errorf("1 kafka partition was expected in the fetch response but the client received %d", n) + return + } + + if remain, err = read(r, remain, &p); err != nil { + return + } + + var abortedTransactionLen int + if remain, err = readArrayLen(r, remain, &abortedTransactionLen); err != nil { + return + } + + if abortedTransactionLen == -1 { + abortedTransactions = nil + } else { + abortedTransactions = make([]AbortedTransaction, abortedTransactionLen) + for i := 0; i < abortedTransactionLen; i++ { + if remain, err = read(r, remain, &abortedTransactions[i]); err != nil { + return + } + } + } + + if p.ErrorCode != 0 { + err = Error(p.ErrorCode) + return + } + + remain, err = readInt32(r, remain, &messageSetSize) + if err != nil { + return + } + + // This error should never trigger, unless there's a bug in the kafka client + // or server. + if remain != int(messageSetSize) { + err = fmt.Errorf("the size of the message set in a fetch response doesn't match the number of remaining bytes (message set size = %d, remaining bytes = %d)", messageSetSize, remain) + return + } + + watermark = p.HighwaterMarkOffset + return + +} + +func readMessageHeader(r *bufio.Reader, sz int) (offset int64, attributes int8, timestamp int64, remain int, err error) { + var version int8 + + if remain, err = readInt64(r, sz, &offset); err != nil { + return + } + + // On discarding the message size and CRC: + // --------------------------------------- + // + // - Not sure why kafka gives the message size here, we already have the + // number of remaining bytes in the response and kafka should only truncate + // the trailing message. + // + // - TCP is already taking care of ensuring data integrity, no need to + // waste resources doing it a second time so we just skip the message CRC. + // + if remain, err = discardN(r, remain, 8); err != nil { + return + } + + if remain, err = readInt8(r, remain, &version); err != nil { + return + } + + if remain, err = readInt8(r, remain, &attributes); err != nil { + return + } + + switch version { + case 0: + case 1: + remain, err = readInt64(r, remain, ×tamp) + default: + err = fmt.Errorf("unsupported message version %d found in fetch response", version) + } + + return +} diff --git a/tap/extensions/kafka/record.go b/tap/extensions/kafka/record.go new file mode 100644 index 000000000..55a634175 --- /dev/null +++ b/tap/extensions/kafka/record.go @@ -0,0 +1,314 @@ +package main + +import ( + "encoding/binary" + "io" + "time" + + "github.com/segmentio/kafka-go/compress" +) + +// Attributes is a bitset representing special attributes set on records. +type Attributes int16 + +const ( + Gzip Attributes = Attributes(compress.Gzip) // 1 + Snappy Attributes = Attributes(compress.Snappy) // 2 + Lz4 Attributes = Attributes(compress.Lz4) // 3 + Zstd Attributes = Attributes(compress.Zstd) // 4 + Transactional Attributes = 1 << 4 + Control Attributes = 1 << 5 +) + +func (a Attributes) Compression() compress.Compression { + return compress.Compression(a & 7) +} + +func (a Attributes) Transactional() bool { + return (a & Transactional) != 0 +} + +func (a Attributes) Control() bool { + return (a & Control) != 0 +} + +func (a Attributes) String() string { + s := a.Compression().String() + if a.Transactional() { + s += "+transactional" + } + if a.Control() { + s += "+control" + } + return s +} + +// Header represents a single entry in a list of record headers. +type Header struct { + Key string + Value []byte +} + +// Record is an interface representing a single kafka record. +// +// Record values are not safe to use concurrently from multiple goroutines. +type Record struct { + // The offset at which the record exists in a topic partition. This value + // is ignored in produce requests. + Offset int64 + + // Returns the time of the record. This value may be omitted in produce + // requests to let kafka set the time when it saves the record. + Time time.Time + + // Returns a byte sequence containing the key of this record. The returned + // sequence may be nil to indicate that the record has no key. If the record + // is part of a RecordSet, the content of the key must remain valid at least + // until the record set is closed (or until the key is closed). + Key Bytes + + // Returns a byte sequence containing the value of this record. The returned + // sequence may be nil to indicate that the record has no value. If the + // record is part of a RecordSet, the content of the value must remain valid + // at least until the record set is closed (or until the value is closed). + Value Bytes + + // Returns the list of headers associated with this record. The returned + // slice may be reused across calls, the program should use it as an + // immutable value. + Headers []Header +} + +// RecordSet represents a sequence of records in Produce requests and Fetch +// responses. All v0, v1, and v2 formats are supported. +type RecordSet struct { + // The message version that this record set will be represented as, valid + // values are 1, or 2. + // + // When reading, this is the value of the highest version used in the + // batches that compose the record set. + // + // When writing, this value dictates the format that the records will be + // encoded in. + Version int8 + + // Attributes set on the record set. + // + // When reading, the attributes are the combination of all attributes in + // the batches that compose the record set. + // + // When writing, the attributes apply to the whole sequence of records in + // the set. + Attributes Attributes + + // A reader exposing the sequence of records. + // + // When reading a RecordSet from an io.Reader, the Records field will be a + // *RecordStream. If the program needs to access the details of each batch + // that compose the stream, it may use type assertions to access the + // underlying types of each batch. + Records RecordReader +} + +// bufferedReader is an interface implemented by types like bufio.Reader, which +// we use to optimize prefix reads by accessing the internal buffer directly +// through calls to Peek. +type bufferedReader interface { + Discard(int) (int, error) + Peek(int) ([]byte, error) +} + +// bytesBuffer is an interface implemented by types like bytes.Buffer, which we +// use to optimize prefix reads by accessing the internal buffer directly +// through calls to Bytes. +type bytesBuffer interface { + Bytes() []byte +} + +// magicByteOffset is the position of the magic byte in all versions of record +// sets in the kafka protocol. +const magicByteOffset = 16 + +// ReadFrom reads the representation of a record set from r into rs, returning +// the number of bytes consumed from r, and an non-nil error if the record set +// could not be read. +func (rs *RecordSet) ReadFrom(r io.Reader) (int64, error) { + // d, _ := r.(*decoder) + // if d == nil { + // d = &decoder{ + // reader: r, + // remain: 4, + // } + // } + + // *rs = RecordSet{} + // limit := d.remain + // size := d.readInt32() + + // if d.err != nil { + // return int64(limit - d.remain), d.err + // } + + // if size <= 0 { + // return 4, nil + // } + + // stream := &RecordStream{ + // Records: make([]RecordReader, 0, 4), + // } + + // var err error + // d.remain = int(size) + + // for d.remain > 0 && err == nil { + // var version byte + + // if d.remain < (magicByteOffset + 1) { + // if len(stream.Records) != 0 { + // break + // } + // return 4, fmt.Errorf("impossible record set shorter than %d bytes", magicByteOffset+1) + // } + + // switch r := d.reader.(type) { + // case bufferedReader: + // b, err := r.Peek(magicByteOffset + 1) + // if err != nil { + // n, _ := r.Discard(len(b)) + // return 4 + int64(n), dontExpectEOF(err) + // } + // version = b[magicByteOffset] + // case bytesBuffer: + // version = r.Bytes()[magicByteOffset] + // default: + // b := make([]byte, magicByteOffset+1) + // if n, err := io.ReadFull(d.reader, b); err != nil { + // return 4 + int64(n), dontExpectEOF(err) + // } + // version = b[magicByteOffset] + // // Reconstruct the prefix that we had to read to determine the version + // // of the record set from the magic byte. + // // + // // Technically this may recurisvely stack readers when consuming all + // // items of the batch, which could hurt performance. In practice this + // // path should not be taken tho, since the decoder would read from a + // // *bufio.Reader which implements the bufferedReader interface. + // d.reader = io.MultiReader(bytes.NewReader(b), d.reader) + // } + + // var tmp RecordSet + // switch version { + // case 0, 1: + // err = tmp.readFromVersion1(d) + // case 2: + // err = tmp.readFromVersion2(d) + // default: + // err = fmt.Errorf("unsupported message version %d for message of size %d", version, size) + // } + + // if tmp.Version > rs.Version { + // rs.Version = tmp.Version + // } + + // rs.Attributes |= tmp.Attributes + + // if tmp.Records != nil { + // stream.Records = append(stream.Records, tmp.Records) + // } + // } + + // if len(stream.Records) != 0 { + // rs.Records = stream + // // Ignore errors if we've successfully read records, so the + // // program can keep making progress. + // err = nil + // } + + // d.discardAll() + // rn := 4 + (int(size) - d.remain) + // d.remain = limit - rn + // return int64(rn), err + return 0, nil +} + +// WriteTo writes the representation of rs into w. The value of rs.Version +// dictates which format that the record set will be represented as. +// +// The error will be ErrNoRecord if rs contained no records. +// +// Note: since this package is only compatible with kafka 0.10 and above, the +// method never produces messages in version 0. If rs.Version is zero, the +// method defaults to producing messages in version 1. +func (rs *RecordSet) WriteTo(w io.Writer) (int64, error) { + // if rs.Records == nil { + // return 0, ErrNoRecord + // } + + // // This optimization avoids rendering the record set in an intermediary + // // buffer when the writer is already a pageBuffer, which is a common case + // // due to the way WriteRequest and WriteResponse are implemented. + // buffer, _ := w.(*pageBuffer) + // bufferOffset := int64(0) + + // if buffer != nil { + // bufferOffset = buffer.Size() + // } else { + // buffer = newPageBuffer() + // defer buffer.unref() + // } + + // size := packUint32(0) + // buffer.Write(size[:]) // size placeholder + + // var err error + // switch rs.Version { + // case 0, 1: + // err = rs.writeToVersion1(buffer, bufferOffset+4) + // case 2: + // err = rs.writeToVersion2(buffer, bufferOffset+4) + // default: + // err = fmt.Errorf("unsupported record set version %d", rs.Version) + // } + // if err != nil { + // return 0, err + // } + + // n := buffer.Size() - bufferOffset + // if n == 0 { + // size = packUint32(^uint32(0)) + // } else { + // size = packUint32(uint32(n) - 4) + // } + // buffer.WriteAt(size[:], bufferOffset) + + // // This condition indicates that the output writer received by `WriteTo` was + // // not a *pageBuffer, in which case we need to flush the buffered records + // // data into it. + // if buffer != w { + // return buffer.WriteTo(w) + // } + + // return n, nil + return 0, nil +} + +func makeTime(t int64) time.Time { + return time.Unix(t/1000, (t%1000)*int64(time.Millisecond)) +} + +func timestamp(t time.Time) int64 { + if t.IsZero() { + return 0 + } + return t.UnixNano() / int64(time.Millisecond) +} + +func packUint32(u uint32) (b [4]byte) { + binary.BigEndian.PutUint32(b[:], u) + return +} + +func packUint64(u uint64) (b [8]byte) { + binary.BigEndian.PutUint64(b[:], u) + return +} diff --git a/tap/extensions/kafka/record_bytes.go b/tap/extensions/kafka/record_bytes.go new file mode 100644 index 000000000..cd142f405 --- /dev/null +++ b/tap/extensions/kafka/record_bytes.go @@ -0,0 +1,43 @@ +package main + +import ( + "github.com/segmentio/kafka-go/protocol" +) + +// Header is a key/value pair type representing headers set on records. +// type Header = protocol.Header + +// Bytes is an interface representing a sequence of bytes. This abstraction +// makes it possible for programs to inject data into produce requests without +// having to load in into an intermediary buffer, or read record keys and values +// from a fetch response directly from internal buffers. +// +// Bytes are not safe to use concurrently from multiple goroutines. +// type Bytes = protocol.Bytes + +// NewBytes constructs a Bytes value from a byte slice. +// +// If b is nil, nil is returned. +// func NewBytes(b []byte) Bytes { return protocol.NewBytes(b) } + +// ReadAll reads b into a byte slice. +// func ReadAll(b Bytes) ([]byte, error) { return protocol.ReadAll(b) } + +// Record is an interface representing a single kafka record. +// +// Record values are not safe to use concurrently from multiple goroutines. +// type Record = protocol.Record + +// RecordReader is an interface representing a sequence of records. Record sets +// are used in both produce and fetch requests to represent the sequence of +// records that are sent to or receive from kafka brokers. +// +// RecordReader values are not safe to use concurrently from multiple goroutines. +type RecordReader = protocol.RecordReader + +// NewRecordReade rconstructs a RecordSet which exposes the sequence of records +// passed as arguments. +func NewRecordReader(records ...Record) RecordReader { + // return protocol.NewRecordReader(records...) + return nil +} diff --git a/tap/extensions/kafka/reflect.go b/tap/extensions/kafka/reflect.go new file mode 100644 index 000000000..65b59ab13 --- /dev/null +++ b/tap/extensions/kafka/reflect.go @@ -0,0 +1,101 @@ +// +build !unsafe + +package main + +import ( + "reflect" +) + +type index []int + +type _type struct{ typ reflect.Type } + +func typeOf(x interface{}) _type { + return makeType(reflect.TypeOf(x)) +} + +func elemTypeOf(x interface{}) _type { + return makeType(reflect.TypeOf(x).Elem()) +} + +func makeType(t reflect.Type) _type { + return _type{typ: t} +} + +type value struct { + val reflect.Value +} + +func nonAddressableValueOf(x interface{}) value { + return value{val: reflect.ValueOf(x)} +} + +func valueOf(x interface{}) value { + return value{val: reflect.ValueOf(x).Elem()} +} + +func makeValue(t reflect.Type) value { + return value{val: reflect.New(t).Elem()} +} + +func (v value) bool() bool { return v.val.Bool() } + +func (v value) int8() int8 { return int8(v.int64()) } + +func (v value) int16() int16 { return int16(v.int64()) } + +func (v value) int32() int32 { return int32(v.int64()) } + +func (v value) int64() int64 { return v.val.Int() } + +func (v value) string() string { return v.val.String() } + +func (v value) bytes() []byte { return v.val.Bytes() } + +func (v value) iface(t reflect.Type) interface{} { return v.val.Addr().Interface() } + +func (v value) array(t reflect.Type) array { return array{val: v.val} } + +func (v value) setBool(b bool) { v.val.SetBool(b) } + +func (v value) setInt8(i int8) { v.setInt64(int64(i)) } + +func (v value) setInt16(i int16) { v.setInt64(int64(i)) } + +func (v value) setInt32(i int32) { v.setInt64(int64(i)) } + +func (v value) setInt64(i int64) { v.val.SetInt(i) } + +func (v value) setString(s string) { v.val.SetString(s) } + +func (v value) setBytes(b []byte) { v.val.SetBytes(b) } + +func (v value) setArray(a array) { + if a.val.IsValid() { + v.val.Set(a.val) + } else { + v.val.Set(reflect.Zero(v.val.Type())) + } +} + +func (v value) fieldByIndex(i index) value { + return value{val: v.val.FieldByIndex(i)} +} + +type array struct { + val reflect.Value +} + +func makeArray(t reflect.Type, n int) array { + return array{val: reflect.MakeSlice(reflect.SliceOf(t), n, n)} +} + +func (a array) index(i int) value { return value{val: a.val.Index(i)} } + +func (a array) length() int { return a.val.Len() } + +func (a array) isNil() bool { return a.val.IsNil() } + +func indexOf(s reflect.StructField) index { return index(s.Index) } + +func bytesToString(b []byte) string { return string(b) } diff --git a/tap/extensions/kafka/request.go b/tap/extensions/kafka/request.go new file mode 100644 index 000000000..a4f196897 --- /dev/null +++ b/tap/extensions/kafka/request.go @@ -0,0 +1,293 @@ +package main + +import ( + "fmt" + "io" + "log" + "reflect" + + "github.com/google/gopacket" +) + +type Request struct { + Size int32 + ApiKey ApiKey + ApiVersion int16 + CorrelationID int32 + ClientID string + Payload interface{} +} + +func (req *Request) print() { + log.Printf("> Request [%d]\n", req.Size) + log.Printf("ApiKey: %v\n", req.ApiKey) + log.Printf("ApiVersion: %v\n", req.ApiVersion) + log.Printf("CorrelationID: %v\n", req.CorrelationID) + log.Printf("ClientID: %v\n", req.ClientID) + log.Printf("Payload: %+v\n", req.Payload) +} + +func ReadRequest(r io.Reader, net gopacket.Flow, transport gopacket.Flow) (apiKey ApiKey, apiVersion int16, err error) { + d := &decoder{reader: r, remain: 4} + size := d.readInt32() + + if err = d.err; err != nil { + err = dontExpectEOF(err) + return + } + + d.remain = int(size) + apiKey = ApiKey(d.readInt16()) + apiVersion = d.readInt16() + correlationID := d.readInt32() + clientID := d.readString() + + if i := int(apiKey); i < 0 || i >= len(apiTypes) { + err = fmt.Errorf("unsupported api key: %d", i) + return + } + + if err = d.err; err != nil { + err = dontExpectEOF(err) + return + } + + t := &apiTypes[apiKey] + if t == nil { + err = fmt.Errorf("unsupported api: %s", apiNames[apiKey]) + return + } + + var payload interface{} + + switch apiKey { + case Metadata: + var mt interface{} + var metadataRequest interface{} + if apiVersion >= 11 { + types := makeTypes(reflect.TypeOf(&MetadataRequestV11{}).Elem()) + mt = types[0] + metadataRequest = &MetadataRequestV11{} + } else if apiVersion >= 10 { + types := makeTypes(reflect.TypeOf(&MetadataRequestV10{}).Elem()) + mt = types[0] + metadataRequest = &MetadataRequestV10{} + } else if apiVersion >= 8 { + types := makeTypes(reflect.TypeOf(&MetadataRequestV8{}).Elem()) + mt = types[0] + metadataRequest = &MetadataRequestV8{} + } else if apiVersion >= 4 { + types := makeTypes(reflect.TypeOf(&MetadataRequestV4{}).Elem()) + mt = types[0] + metadataRequest = &MetadataRequestV4{} + } else { + types := makeTypes(reflect.TypeOf(&MetadataRequestV0{}).Elem()) + mt = types[0] + metadataRequest = &MetadataRequestV0{} + } + mt.(messageType).decode(d, valueOf(metadataRequest)) + payload = metadataRequest + break + case ApiVersions: + var mt interface{} + var apiVersionsRequest interface{} + if apiVersion >= 3 { + types := makeTypes(reflect.TypeOf(&ApiVersionsRequestV3{}).Elem()) + mt = types[0] + apiVersionsRequest = &ApiVersionsRequestV3{} + } else { + types := makeTypes(reflect.TypeOf(&ApiVersionsRequestV0{}).Elem()) + mt = types[0] + apiVersionsRequest = &ApiVersionsRequestV0{} + } + mt.(messageType).decode(d, valueOf(apiVersionsRequest)) + payload = apiVersionsRequest + break + case Produce: + var mt interface{} + var produceRequest interface{} + if apiVersion >= 3 { + types := makeTypes(reflect.TypeOf(&ProduceRequestV3{}).Elem()) + mt = types[0] + produceRequest = &ProduceRequestV3{} + } else { + types := makeTypes(reflect.TypeOf(&ProduceRequestV0{}).Elem()) + mt = types[0] + produceRequest = &ProduceRequestV0{} + } + mt.(messageType).decode(d, valueOf(produceRequest)) + payload = produceRequest + break + case Fetch: + var mt interface{} + var fetchRequest interface{} + if apiVersion >= 11 { + types := makeTypes(reflect.TypeOf(&FetchRequestV11{}).Elem()) + mt = types[0] + fetchRequest = &FetchRequestV11{} + } else if apiVersion >= 9 { + types := makeTypes(reflect.TypeOf(&FetchRequestV9{}).Elem()) + mt = types[0] + fetchRequest = &FetchRequestV9{} + } else if apiVersion >= 7 { + types := makeTypes(reflect.TypeOf(&FetchRequestV7{}).Elem()) + mt = types[0] + fetchRequest = &FetchRequestV7{} + } else if apiVersion >= 5 { + types := makeTypes(reflect.TypeOf(&FetchRequestV5{}).Elem()) + mt = types[0] + fetchRequest = &FetchRequestV5{} + } else if apiVersion >= 4 { + types := makeTypes(reflect.TypeOf(&FetchRequestV4{}).Elem()) + mt = types[0] + fetchRequest = &FetchRequestV4{} + } else if apiVersion >= 3 { + types := makeTypes(reflect.TypeOf(&FetchRequestV3{}).Elem()) + mt = types[0] + fetchRequest = &FetchRequestV3{} + } else { + types := makeTypes(reflect.TypeOf(&FetchRequestV0{}).Elem()) + mt = types[0] + fetchRequest = &FetchRequestV0{} + } + mt.(messageType).decode(d, valueOf(fetchRequest)) + payload = fetchRequest + case ListOffsets: + var mt interface{} + var listOffsetsRequest interface{} + if apiVersion >= 4 { + types := makeTypes(reflect.TypeOf(&ListOffsetsRequestV4{}).Elem()) + mt = types[0] + listOffsetsRequest = &ListOffsetsRequestV4{} + } else if apiVersion >= 2 { + types := makeTypes(reflect.TypeOf(&ListOffsetsRequestV2{}).Elem()) + mt = types[0] + listOffsetsRequest = &ListOffsetsRequestV2{} + } else if apiVersion >= 1 { + types := makeTypes(reflect.TypeOf(&ListOffsetsRequestV1{}).Elem()) + mt = types[0] + listOffsetsRequest = &ListOffsetsRequestV1{} + } else { + types := makeTypes(reflect.TypeOf(&ListOffsetsRequestV0{}).Elem()) + mt = types[0] + listOffsetsRequest = &ListOffsetsRequestV0{} + } + mt.(messageType).decode(d, valueOf(listOffsetsRequest)) + payload = listOffsetsRequest + break + case CreateTopics: + var mt interface{} + var createTopicsRequest interface{} + if apiVersion >= 1 { + types := makeTypes(reflect.TypeOf(&CreateTopicsRequestV1{}).Elem()) + mt = types[0] + createTopicsRequest = &CreateTopicsRequestV1{} + } else { + types := makeTypes(reflect.TypeOf(&CreateTopicsRequestV0{}).Elem()) + mt = types[0] + createTopicsRequest = &CreateTopicsRequestV0{} + } + mt.(messageType).decode(d, valueOf(createTopicsRequest)) + payload = createTopicsRequest + break + case DeleteTopics: + var mt interface{} + var deleteTopicsRequest interface{} + if apiVersion >= 6 { + types := makeTypes(reflect.TypeOf(&DeleteTopicsRequestV6{}).Elem()) + mt = types[0] + deleteTopicsRequest = &DeleteTopicsRequestV6{} + } else { + types := makeTypes(reflect.TypeOf(&DeleteTopicsRequestV0{}).Elem()) + mt = types[0] + deleteTopicsRequest = &DeleteTopicsRequestV0{} + } + mt.(messageType).decode(d, valueOf(deleteTopicsRequest)) + payload = deleteTopicsRequest + default: + log.Printf("[WARNING] (Request) Not implemented: %s\n", apiKey) + break + } + + request := &Request{ + Size: size, + ApiKey: apiKey, + ApiVersion: apiVersion, + CorrelationID: correlationID, + ClientID: clientID, + Payload: payload, + } + + key := fmt.Sprintf( + "%s:%s->%s:%s::%d", + net.Src().String(), + transport.Src().String(), + net.Dst().String(), + transport.Dst().String(), + correlationID, + ) + // fmt.Printf("key: %v\n", key) + reqResMatcher.registerRequest(key, request) + + d.discardAll() + + return +} + +func WriteRequest(w io.Writer, apiVersion int16, correlationID int32, clientID string, msg Message) error { + apiKey := msg.ApiKey() + + if i := int(apiKey); i < 0 || i >= len(apiTypes) { + return fmt.Errorf("unsupported api key: %d", i) + } + + t := &apiTypes[apiKey] + if t == nil { + return fmt.Errorf("unsupported api: %s", apiNames[apiKey]) + } + + minVersion := t.minVersion() + maxVersion := t.maxVersion() + + if apiVersion < minVersion || apiVersion > maxVersion { + return fmt.Errorf("unsupported %s version: v%d not in range v%d-v%d", apiKey, apiVersion, minVersion, maxVersion) + } + + r := &t.requests[apiVersion-minVersion] + v := valueOf(msg) + b := newPageBuffer() + defer b.unref() + + e := &encoder{writer: b} + e.writeInt32(0) // placeholder for the request size + e.writeInt16(int16(apiKey)) + e.writeInt16(apiVersion) + e.writeInt32(correlationID) + + if r.flexible { + // Flexible messages use a nullable string for the client ID, then extra space for a + // tag buffer, which begins with a size value. Since we're not writing any fields into the + // latter, we can just write zero for now. + // + // See + // https://cwiki.apache.org/confluence/display/KAFKA/KIP-482%3A+The+Kafka+Protocol+should+Support+Optional+Tagged+Fields + // for details. + e.writeNullString(clientID) + e.writeUnsignedVarInt(0) + } else { + // Technically, recent versions of kafka interpret this field as a nullable + // string, however kafka 0.10 expected a non-nullable string and fails with + // a NullPointerException when it receives a null client id. + e.writeString(clientID) + } + r.encode(e, v) + err := e.err + + if err == nil { + size := packUint32(uint32(b.Size()) - 4) + b.WriteAt(size[:], 0) + _, err = b.WriteTo(w) + } + + return err +} diff --git a/tap/extensions/kafka/response.go b/tap/extensions/kafka/response.go new file mode 100644 index 000000000..31a1935f9 --- /dev/null +++ b/tap/extensions/kafka/response.go @@ -0,0 +1,303 @@ +package main + +import ( + "fmt" + "io" + "log" + "reflect" + + "github.com/google/gopacket" +) + +type Response struct { + Size int32 + CorrelationID int32 + Payload interface{} +} + +func (res *Response) print() { + log.Printf("> Response [%d]\n", res.Size) + log.Printf("CorrelationID: %v\n", res.CorrelationID) + log.Printf("Payload: %+v\n", res.Payload) +} + +func ReadResponse(r io.Reader, net gopacket.Flow, transport gopacket.Flow) (err error) { + d := &decoder{reader: r, remain: 4} + size := d.readInt32() + + if err = d.err; err != nil { + err = dontExpectEOF(err) + return + } + + d.remain = int(size) + correlationID := d.readInt32() + var payload interface{} + response := &Response{ + Size: size, + CorrelationID: correlationID, + Payload: payload, + } + + key := fmt.Sprintf( + "%s:%s->%s:%s::%d", + net.Src().String(), + transport.Src().String(), + net.Dst().String(), + transport.Dst().String(), + correlationID, + ) + // fmt.Printf("key: %v\n", key) + reqResPair := reqResMatcher.registerResponse(key, response) + apiKey := reqResPair.Request.ApiKey + apiVersion := reqResPair.Request.ApiVersion + + switch apiKey { + case Metadata: + var mt interface{} + var metadataResponse interface{} + if apiVersion >= 11 { + types := makeTypes(reflect.TypeOf(&MetadataResponseV11{}).Elem()) + mt = types[0] + metadataResponse = &MetadataResponseV11{} + } else if apiVersion >= 10 { + types := makeTypes(reflect.TypeOf(&MetadataResponseV10{}).Elem()) + mt = types[0] + metadataResponse = &MetadataResponseV10{} + } else if apiVersion >= 8 { + types := makeTypes(reflect.TypeOf(&MetadataResponseV8{}).Elem()) + mt = types[0] + metadataResponse = &MetadataResponseV8{} + } else if apiVersion >= 7 { + types := makeTypes(reflect.TypeOf(&MetadataResponseV7{}).Elem()) + mt = types[0] + metadataResponse = &MetadataResponseV7{} + } else if apiVersion >= 5 { + types := makeTypes(reflect.TypeOf(&MetadataResponseV5{}).Elem()) + mt = types[0] + metadataResponse = &MetadataResponseV5{} + } else if apiVersion >= 3 { + types := makeTypes(reflect.TypeOf(&MetadataResponseV3{}).Elem()) + mt = types[0] + metadataResponse = &MetadataResponseV3{} + } else if apiVersion >= 2 { + types := makeTypes(reflect.TypeOf(&MetadataResponseV2{}).Elem()) + mt = types[0] + metadataResponse = &MetadataResponseV2{} + } else if apiVersion >= 1 { + types := makeTypes(reflect.TypeOf(&MetadataResponseV1{}).Elem()) + mt = types[0] + metadataResponse = &MetadataResponseV1{} + } else { + types := makeTypes(reflect.TypeOf(&MetadataResponseV0{}).Elem()) + mt = types[0] + metadataResponse = &MetadataResponseV0{} + } + mt.(messageType).decode(d, valueOf(metadataResponse)) + reqResPair.Response.Payload = metadataResponse + break + case ApiVersions: + var mt interface{} + var apiVersionsResponse interface{} + if apiVersion >= 1 { + types := makeTypes(reflect.TypeOf(&ApiVersionsResponseV1{}).Elem()) + mt = types[0] + apiVersionsResponse = &ApiVersionsResponseV1{} + } else { + types := makeTypes(reflect.TypeOf(&ApiVersionsResponseV0{}).Elem()) + mt = types[0] + apiVersionsResponse = &ApiVersionsResponseV0{} + } + mt.(messageType).decode(d, valueOf(apiVersionsResponse)) + reqResPair.Response.Payload = apiVersionsResponse + break + case Produce: + var mt interface{} + var produceResponse interface{} + if apiVersion >= 8 { + types := makeTypes(reflect.TypeOf(&ProduceResponseV8{}).Elem()) + mt = types[0] + produceResponse = &ProduceResponseV8{} + } else if apiVersion >= 5 { + types := makeTypes(reflect.TypeOf(&ProduceResponseV5{}).Elem()) + mt = types[0] + produceResponse = &ProduceResponseV5{} + } else if apiVersion >= 2 { + types := makeTypes(reflect.TypeOf(&ProduceResponseV2{}).Elem()) + mt = types[0] + produceResponse = &ProduceResponseV2{} + } else if apiVersion >= 1 { + types := makeTypes(reflect.TypeOf(&ProduceResponseV1{}).Elem()) + mt = types[0] + produceResponse = &ProduceResponseV1{} + } else { + types := makeTypes(reflect.TypeOf(&ProduceResponseV0{}).Elem()) + mt = types[0] + produceResponse = &ProduceResponseV0{} + } + mt.(messageType).decode(d, valueOf(produceResponse)) + reqResPair.Response.Payload = produceResponse + break + case Fetch: + var mt interface{} + var fetchResponse interface{} + if apiVersion >= 11 { + types := makeTypes(reflect.TypeOf(&FetchResponseV11{}).Elem()) + mt = types[0] + fetchResponse = &FetchResponseV11{} + } else if apiVersion >= 7 { + types := makeTypes(reflect.TypeOf(&FetchResponseV7{}).Elem()) + mt = types[0] + fetchResponse = &FetchResponseV7{} + } else if apiVersion >= 5 { + types := makeTypes(reflect.TypeOf(&FetchResponseV5{}).Elem()) + mt = types[0] + fetchResponse = &FetchResponseV5{} + } else if apiVersion >= 4 { + types := makeTypes(reflect.TypeOf(&FetchResponseV4{}).Elem()) + mt = types[0] + fetchResponse = &FetchResponseV4{} + } else if apiVersion >= 1 { + types := makeTypes(reflect.TypeOf(&FetchResponseV1{}).Elem()) + mt = types[0] + fetchResponse = &FetchResponseV1{} + } else { + types := makeTypes(reflect.TypeOf(&FetchResponseV0{}).Elem()) + mt = types[0] + fetchResponse = &FetchResponseV0{} + } + mt.(messageType).decode(d, valueOf(fetchResponse)) + reqResPair.Response.Payload = fetchResponse + break + case ListOffsets: + var mt interface{} + var listOffsetsResponse interface{} + if apiVersion >= 4 { + types := makeTypes(reflect.TypeOf(&ListOffsetsResponseV4{}).Elem()) + mt = types[0] + listOffsetsResponse = &ListOffsetsResponseV4{} + } else if apiVersion >= 2 { + types := makeTypes(reflect.TypeOf(&ListOffsetsResponseV2{}).Elem()) + mt = types[0] + listOffsetsResponse = &ListOffsetsResponseV2{} + } else if apiVersion >= 1 { + types := makeTypes(reflect.TypeOf(&ListOffsetsResponseV1{}).Elem()) + mt = types[0] + listOffsetsResponse = &ListOffsetsResponseV1{} + } else { + types := makeTypes(reflect.TypeOf(&ListOffsetsResponseV0{}).Elem()) + mt = types[0] + listOffsetsResponse = &ListOffsetsResponseV0{} + } + mt.(messageType).decode(d, valueOf(listOffsetsResponse)) + reqResPair.Response.Payload = listOffsetsResponse + case CreateTopics: + var mt interface{} + var createTopicsResponse interface{} + if apiVersion >= 7 { + types := makeTypes(reflect.TypeOf(&CreateTopicsResponseV0{}).Elem()) + mt = types[0] + createTopicsResponse = &CreateTopicsResponseV0{} + } else if apiVersion >= 5 { + types := makeTypes(reflect.TypeOf(&CreateTopicsResponseV5{}).Elem()) + mt = types[0] + createTopicsResponse = &CreateTopicsResponseV5{} + } else if apiVersion >= 2 { + types := makeTypes(reflect.TypeOf(&CreateTopicsResponseV2{}).Elem()) + mt = types[0] + createTopicsResponse = &CreateTopicsResponseV2{} + } else if apiVersion >= 1 { + types := makeTypes(reflect.TypeOf(&CreateTopicsResponseV1{}).Elem()) + mt = types[0] + createTopicsResponse = &CreateTopicsResponseV1{} + } else { + types := makeTypes(reflect.TypeOf(&CreateTopicsResponseV0{}).Elem()) + mt = types[0] + createTopicsResponse = &CreateTopicsResponseV0{} + } + mt.(messageType).decode(d, valueOf(createTopicsResponse)) + reqResPair.Response.Payload = createTopicsResponse + break + case DeleteTopics: + var mt interface{} + var deleteTopicsResponse interface{} + if apiVersion >= 6 { + types := makeTypes(reflect.TypeOf(&DeleteTopicsReponseV6{}).Elem()) + mt = types[0] + deleteTopicsResponse = &DeleteTopicsReponseV6{} + } else if apiVersion >= 5 { + types := makeTypes(reflect.TypeOf(&DeleteTopicsReponseV5{}).Elem()) + mt = types[0] + deleteTopicsResponse = &DeleteTopicsReponseV5{} + } else if apiVersion >= 1 { + types := makeTypes(reflect.TypeOf(&DeleteTopicsReponseV1{}).Elem()) + mt = types[0] + deleteTopicsResponse = &DeleteTopicsReponseV1{} + } else { + types := makeTypes(reflect.TypeOf(&DeleteTopicsReponseV0{}).Elem()) + mt = types[0] + deleteTopicsResponse = &DeleteTopicsReponseV0{} + } + mt.(messageType).decode(d, valueOf(deleteTopicsResponse)) + reqResPair.Response.Payload = deleteTopicsResponse + default: + log.Printf("[WARNING] (Response) Not implemented: %s\n", apiKey) + break + } + + reqResPair.print() + + if i := int(apiKey); i < 0 || i >= len(apiTypes) { + err = fmt.Errorf("unsupported api key: %d", i) + return + } + + t := &apiTypes[apiKey] + if t == nil { + err = fmt.Errorf("unsupported api: %s", apiNames[apiKey]) + return + } + + d.discardAll() + + return +} + +func WriteResponse(w io.Writer, apiVersion int16, correlationID int32, msg Message) error { + apiKey := msg.ApiKey() + + if i := int(apiKey); i < 0 || i >= len(apiTypes) { + return fmt.Errorf("unsupported api key: %d", i) + } + + t := &apiTypes[apiKey] + if t == nil { + return fmt.Errorf("unsupported api: %s", apiNames[apiKey]) + } + + minVersion := t.minVersion() + maxVersion := t.maxVersion() + + if apiVersion < minVersion || apiVersion > maxVersion { + return fmt.Errorf("unsupported %s version: v%d not in range v%d-v%d", apiKey, apiVersion, minVersion, maxVersion) + } + + r := &t.responses[apiVersion-minVersion] + v := valueOf(msg) + b := newPageBuffer() + defer b.unref() + + e := &encoder{writer: b} + e.writeInt32(0) // placeholder for the response size + e.writeInt32(correlationID) + r.encode(e, v) + err := e.err + + if err == nil { + size := packUint32(uint32(b.Size()) - 4) + b.WriteAt(size[:], 0) + _, err = b.WriteTo(w) + } + + return err +} diff --git a/tap/extensions/kafka/structs.go b/tap/extensions/kafka/structs.go new file mode 100644 index 000000000..db3daae59 --- /dev/null +++ b/tap/extensions/kafka/structs.go @@ -0,0 +1,1000 @@ +package main + +import ( + "time" +) + +type RequiredAcks int16 + +const ( + RequireNone RequiredAcks = 0 + RequireOne RequiredAcks = 1 + RequireAll RequiredAcks = -1 +) + +func (acks RequiredAcks) String() string { + switch acks { + case RequireNone: + return "none" + case RequireOne: + return "one" + case RequireAll: + return "all" + default: + return "unknown" + } +} + +type UUID struct { + TimeLow int32 + TimeMid int16 + TimeHiAndVersion int16 + ClockSeq int16 + NodePart1 int32 + NodePart22 int16 +} + +// Metadata Request (Version: 0) + +type MetadataRequestTopicV0 struct { + Name string +} + +type MetadataRequestV0 struct { + Topics []MetadataRequestTopicV0 +} + +// Metadata Request (Version: 4) + +type MetadataRequestV4 struct { + Topics []MetadataRequestTopicV0 + AllowAutoTopicCreation bool +} + +// Metadata Request (Version: 8) + +type MetadataRequestV8 struct { + Topics []MetadataRequestTopicV0 + AllowAutoTopicCreation bool + IncludeClusterAuthorizedOperations bool + IncludeTopicAuthorizedOperations bool +} + +// Metadata Request (Version: 10) + +type MetadataRequestTopicV10 struct { + Name string + UUID UUID +} + +type MetadataRequestV10 struct { + Topics []MetadataRequestTopicV10 + AllowAutoTopicCreation bool + IncludeClusterAuthorizedOperations bool + IncludeTopicAuthorizedOperations bool +} + +// Metadata Request (Version: 11) + +type MetadataRequestV11 struct { + Topics []MetadataRequestTopicV10 + AllowAutoTopicCreation bool + IncludeTopicAuthorizedOperations bool +} + +// Metadata Response (Version: 0) + +type BrokerV0 struct { + NodeId int32 + Host string + Port int32 +} + +type PartitionsV0 struct { + ErrorCode int16 + PartitionIndex int32 + LeaderId int32 + ReplicaNodes int32 + IsrNodes int32 +} + +type TopicV0 struct { + ErrorCode int16 + Name string + Partitions []PartitionsV0 +} + +type MetadataResponseV0 struct { + Brokers []BrokerV0 + Topics []TopicV0 +} + +// Metadata Response (Version: 1) + +type BrokerV1 struct { + NodeId int32 + Host string + Port int32 + Rack string +} + +type TopicV1 struct { + ErrorCode int16 + Name string + IsInternal bool + Partitions []PartitionsV0 +} + +type MetadataResponseV1 struct { + Brokers []BrokerV1 + ControllerID int32 + Topics []TopicV1 +} + +// Metadata Response (Version: 2) + +type MetadataResponseV2 struct { + Brokers []BrokerV1 + ClusterID string + ControllerID int32 + Topics []TopicV1 +} + +// Metadata Response (Version: 3) + +type MetadataResponseV3 struct { + ThrottleTimeMs int32 + Brokers []BrokerV1 + ClusterID string + ControllerID int32 + Topics []TopicV1 +} + +// Metadata Response (Version: 5) + +type PartitionsV5 struct { + ErrorCode int16 + PartitionIndex int32 + LeaderId int32 + ReplicaNodes int32 + IsrNodes int32 + OfflineReplicas int32 +} + +type TopicV5 struct { + ErrorCode int16 + Name string + IsInternal bool + Partitions []PartitionsV5 +} + +type MetadataResponseV5 struct { + ThrottleTimeMs int32 + Brokers []BrokerV1 + ClusterID string + ControllerID int32 + Topics []TopicV5 +} + +// Metadata Response (Version: 7) + +type PartitionsV7 struct { + ErrorCode int16 + PartitionIndex int32 + LeaderId int32 + LeaderEpoch int32 + ReplicaNodes int32 + IsrNodes int32 + OfflineReplicas int32 +} + +type TopicV7 struct { + ErrorCode int16 + Name string + IsInternal bool + Partitions []PartitionsV7 +} + +type MetadataResponseV7 struct { + ThrottleTimeMs int32 + Brokers []BrokerV1 + ClusterID string + ControllerID int32 + Topics []TopicV7 +} + +// Metadata Response (Version: 8) + +type TopicV8 struct { + ErrorCode int16 + Name string + IsInternal bool + Partitions []PartitionsV7 + TopicAuthorizedOperations int32 +} + +type MetadataResponseV8 struct { + ThrottleTimeMs int32 + Brokers []BrokerV1 + ClusterID string + ControllerID int32 + Topics []TopicV8 + ClusterAuthorizedOperations int32 +} + +// Metadata Response (Version: 10) + +type TopicV10 struct { + ErrorCode int16 + Name string + TopicID UUID + IsInternal bool + Partitions []PartitionsV7 + TopicAuthorizedOperations int32 +} + +type MetadataResponseV10 struct { + ThrottleTimeMs int32 + Brokers []BrokerV1 + ClusterID string + ControllerID int32 + Topics []TopicV10 + ClusterAuthorizedOperations int32 +} + +// Metadata Response (Version: 11) + +type MetadataResponseV11 struct { + ThrottleTimeMs int32 + Brokers []BrokerV1 + ClusterID string + ControllerID int32 + Topics []TopicV10 +} + +// ApiVersions Request (Version: 0) + +type ApiVersionsRequestV0 struct{} + +// ApiVersions Request (Version: 3) + +type ApiVersionsRequestV3 struct { + ClientSoftwareName string + ClientSoftwareVersion string +} + +// ApiVersions Response (Version: 0) + +type ApiVersionsResponseApiKey struct { + ApiKey int16 + MinVersion int16 + MaxVersion int16 +} + +type ApiVersionsResponseV0 struct { + ErrorCode int16 + ApiKeys []ApiVersionsResponseApiKey +} + +// ApiVersions Response (Version: 1) + +type ApiVersionsResponseV1 struct { + ErrorCode int16 + ApiKeys []ApiVersionsResponseApiKey // FIXME: `confluent-kafka-python` causes memory leak + ThrottleTimeMs int32 +} + +// Produce Request (Version: 0) + +// Message is a kafka message type +type MessageV0 struct { + Codec int8 // codec used to compress the message contents + CompressionLevel int // compression level + LogAppendTime bool // the used timestamp is LogAppendTime + Key []byte // the message key, may be nil + Value []byte // the message contents + Set *MessageSet // the message set a message might wrap + Version int8 // v1 requires Kafka 0.10 + Timestamp time.Time // the timestamp of the message (version 1+ only) + + compressedSize int // used for computing the compression ratio metrics +} + +// MessageBlock represents a part of request with message +type MessageBlock struct { + Offset int64 + Msg *MessageV0 +} + +// MessageSet is a replacement for RecordBatch in older versions +type MessageSet struct { + PartialTrailingMessage bool // whether the set on the wire contained an incomplete trailing MessageBlock + OverflowMessage bool // whether the set on the wire contained an overflow message + Messages []*MessageBlock +} + +type RecordHeader struct { + HeaderKeyLength int8 + HeaderKey string + HeaderValueLength int8 + Value string +} + +// Record is kafka record type +type RecordV0 struct { + Unknown int8 + Attributes int8 + TimestampDelta int8 + OffsetDelta int8 + KeyLength int8 + Key string + ValueLen int8 + Value string + Headers []RecordHeader +} + +// RecordBatch are records from one kafka request +type RecordBatch struct { + BaseOffset int64 + BatchLength int32 + PartitionLeaderEpoch int32 + Magic int8 + Crc int32 + Attributes int16 + LastOffsetDelta int32 + FirstTimestamp int64 + MaxTimestamp int64 + ProducerId int64 + ProducerEpoch int16 + BaseSequence int32 + Record []RecordV0 +} + +type Records struct { + RecordBatch RecordBatch + // TODO: Implement `MessageSet` + // MessageSet MessageSet +} + +type PartitionData struct { + Index int32 + Unknown int32 + Records Records +} + +type Partitions struct { + Length int32 + PartitionData PartitionData +} + +type TopicData struct { + Topic string + Partitions Partitions +} + +type ProduceRequestV0 struct { + RequiredAcks RequiredAcks + Timeout int32 + TopicData []TopicData +} + +// Produce Request (Version: 3) + +type ProduceRequestV3 struct { + TransactionalID string + RequiredAcks RequiredAcks + Timeout int32 + TopicData []TopicData +} + +// Produce Response (Version: 0) + +type PartitionResponseV0 struct { + Index int32 + ErrorCode int16 + BaseOffset int64 +} + +type ResponseV0 struct { + Name string + PartitionResponses []PartitionResponseV0 +} + +type ProduceResponseV0 struct { + Responses []ResponseV0 +} + +// Produce Response (Version: 1) + +type ProduceResponseV1 struct { + Responses []ResponseV0 + ThrottleTimeMs int32 +} + +// Produce Response (Version: 2) + +type PartitionResponseV2 struct { + Index int32 + ErrorCode int16 + BaseOffset int64 + LogAppendTimeMs int64 +} + +type ResponseV2 struct { + Name string + PartitionResponses []PartitionResponseV2 +} + +type ProduceResponseV2 struct { + Responses []ResponseV2 + ThrottleTimeMs int32 +} + +// Produce Response (Version: 5) + +type PartitionResponseV5 struct { + Index int32 + ErrorCode int16 + BaseOffset int64 + LogAppendTimeMs int64 + LogStartOffset int64 +} + +type ResponseV5 struct { + Name string + PartitionResponses []PartitionResponseV5 +} + +type ProduceResponseV5 struct { + Responses []ResponseV5 + ThrottleTimeMs int32 +} + +// Produce Response (Version: 8) + +type RecordErrors struct { + BatchIndex int32 + BatchIndexErrorMessage string +} + +type PartitionResponseV8 struct { + Index int32 + ErrorCode int16 + BaseOffset int64 + LogAppendTimeMs int64 + LogStartOffset int64 + RecordErrors RecordErrors + ErrorMessage string +} + +type ResponseV8 struct { + Name string + PartitionResponses []PartitionResponseV8 +} + +type ProduceResponseV8 struct { + Responses []ResponseV8 + ThrottleTimeMs int32 +} + +// Fetch Request (Version: 0) + +type FetchPartitionV0 struct { + Partition int32 + FetchOffset int64 + PartitionMaxBytes int32 +} + +type FetchTopicV0 struct { + Topic string + Partitions []FetchPartitionV0 +} + +type FetchRequestV0 struct { + ReplicaId int32 + MaxWaitMs int32 + MinBytes int32 + Topics []FetchTopicV0 +} + +// Fetch Request (Version: 3) + +type FetchRequestV3 struct { + ReplicaId int32 + MaxWaitMs int32 + MinBytes int32 + MaxBytes int32 + Topics []FetchTopicV0 +} + +// Fetch Request (Version: 4) + +type FetchRequestV4 struct { + ReplicaId int32 + MaxWaitMs int32 + MinBytes int32 + MaxBytes int32 + IsolationLevel int8 + Topics []FetchTopicV0 +} + +// Fetch Request (Version: 5) + +type FetchPartitionV5 struct { + Partition int32 + FetchOffset int64 + LogStartOffset int64 + PartitionMaxBytes int32 +} + +type FetchTopicV5 struct { + Topic string + Partitions []FetchPartitionV5 +} + +type FetchRequestV5 struct { + ReplicaId int32 + MaxWaitMs int32 + MinBytes int32 + MaxBytes int32 + IsolationLevel int8 + Topics []FetchTopicV5 +} + +// Fetch Request (Version: 7) + +type ForgottenTopicsDataV7 struct { + Topic string + Partitions []int32 +} + +type FetchRequestV7 struct { + ReplicaId int32 + MaxWaitMs int32 + MinBytes int32 + MaxBytes int32 + IsolationLevel int8 + SessionId int32 + SessionEpoch int32 + Topics []FetchTopicV5 + ForgottenTopicsData ForgottenTopicsDataV7 +} + +// Fetch Request (Version: 9) + +type FetchPartitionV9 struct { + Partition int32 + CurrentLeaderEpoch int32 + FetchOffset int64 + LogStartOffset int64 + PartitionMaxBytes int32 +} + +type FetchTopicV9 struct { + Topic string + Partitions []FetchPartitionV9 +} + +type FetchRequestV9 struct { + ReplicaId int32 + MaxWaitMs int32 + MinBytes int32 + MaxBytes int32 + IsolationLevel int8 + SessionId int32 + SessionEpoch int32 + Topics []FetchTopicV9 + ForgottenTopicsData ForgottenTopicsDataV7 +} + +// Fetch Request (Version: 11) + +type FetchRequestV11 struct { + ReplicaId int32 + MaxWaitMs int32 + MinBytes int32 + MaxBytes int32 + IsolationLevel int8 + SessionId int32 + SessionEpoch int32 + Topics []FetchTopicV9 + ForgottenTopicsData ForgottenTopicsDataV7 + RackId string +} + +// Fetch Response (Version: 0) + +type PartitionResponseFetchV0 struct { + Partition int32 + ErrorCode int16 + HighWatermark int64 + RecordSet Records +} + +type ResponseFetchV0 struct { + Topic string + PartitionResponses []PartitionResponseFetchV0 +} + +type FetchResponseV0 struct { + Responses []ResponseFetchV0 +} + +// Fetch Response (Version: 1) + +type FetchResponseV1 struct { + ThrottleTimeMs int32 + Responses []ResponseFetchV0 +} + +// Fetch Response (Version: 4) + +type AbortedTransactionsV4 struct { + ProducerId int32 + FirstOffset int32 +} + +type PartitionResponseFetchV4 struct { + Partition int32 + ErrorCode int16 + HighWatermark int64 + LastStableOffset int64 + AbortedTransactions AbortedTransactionsV4 + RecordSet Records +} + +type ResponseFetchV4 struct { + Topic string + PartitionResponses []PartitionResponseFetchV4 +} + +type FetchResponseV4 struct { + ThrottleTimeMs int32 + Responses []ResponseFetchV4 +} + +// Fetch Response (Version: 5) + +type PartitionResponseFetchV5 struct { + Partition int32 + ErrorCode int16 + HighWatermark int64 + LastStableOffset int64 + LogStartOffset int64 + AbortedTransactions AbortedTransactionsV4 + RecordSet Records +} + +type ResponseFetchV5 struct { + Topic string + PartitionResponses []PartitionResponseFetchV5 +} + +type FetchResponseV5 struct { + ThrottleTimeMs int32 + Responses []ResponseFetchV5 +} + +// Fetch Response (Version: 7) + +type FetchResponseV7 struct { + ThrottleTimeMs int32 + ErrorCode int16 + SessionId int32 + Responses []ResponseFetchV5 +} + +// Fetch Response (Version: 11) + +type PartitionResponseFetchV11 struct { + Partition int32 + ErrorCode int16 + HighWatermark int64 + LastStableOffset int64 + LogStartOffset int64 + AbortedTransactions AbortedTransactionsV4 + PreferredReadReplica int32 + RecordSet Records +} + +type ResponseFetchV11 struct { + Topic string + PartitionResponses []PartitionResponseFetchV11 +} + +type FetchResponseV11 struct { + ThrottleTimeMs int32 + ErrorCode int16 + SessionId int32 + Responses []ResponseFetchV5 +} + +// ListOffsets Request (Version: 0) + +type ListOffsetsRequestPartitionV0 struct { + PartitionIndex int32 + Timestamp int64 + MaxNumOffsets int32 +} + +type ListOffsetsRequestTopicV0 struct { + Name string + Partitions []ListOffsetsRequestPartitionV0 +} + +type ListOffsetsRequestV0 struct { + ReplicaId int32 + Topics []ListOffsetsRequestTopicV0 +} + +// ListOffsets Request (Version: 1) + +type ListOffsetsRequestPartitionV1 struct { + PartitionIndex int32 + Timestamp int64 +} + +type ListOffsetsRequestTopicV1 struct { + Name string + Partitions []ListOffsetsRequestPartitionV1 +} + +type ListOffsetsRequestV1 struct { + ReplicaId int32 + Topics []ListOffsetsRequestTopicV1 +} + +// ListOffsets Request (Version: 2) + +type ListOffsetsRequestV2 struct { + ReplicaId int32 + IsolationLevel int8 + Topics []ListOffsetsRequestTopicV1 +} + +// ListOffsets Request (Version: 4) + +type ListOffsetsRequestPartitionV4 struct { + PartitionIndex int32 + CurrentLeaderEpoch int32 + Timestamp int64 +} + +type ListOffsetsRequestTopicV4 struct { + Name string + Partitions []ListOffsetsRequestPartitionV4 +} + +type ListOffsetsRequestV4 struct { + ReplicaId int32 + Topics []ListOffsetsRequestTopicV4 +} + +// ListOffsets Response (Version: 0) + +type ListOffsetsResponsePartitionV0 struct { + PartitionIndex int32 + ErrorCode int16 + OldStyleOffsets int64 +} + +type ListOffsetsResponseTopicV0 struct { + Name string + Partitions []ListOffsetsResponsePartitionV0 +} + +type ListOffsetsResponseV0 struct { + Topics []ListOffsetsResponseTopicV0 +} + +// ListOffsets Response (Version: 1) + +type ListOffsetsResponsePartitionV1 struct { + PartitionIndex int32 + ErrorCode int16 + Timestamp int64 + Offset int64 +} + +type ListOffsetsResponseTopicV1 struct { + Name string + Partitions []ListOffsetsResponsePartitionV1 +} + +type ListOffsetsResponseV1 struct { + Topics []ListOffsetsResponseTopicV1 +} + +// ListOffsets Response (Version: 2) + +type ListOffsetsResponseV2 struct { + ThrottleTimeMs int32 + Topics []ListOffsetsResponseTopicV1 +} + +// ListOffsets Response (Version: 4) + +type ListOffsetsResponsePartitionV4 struct { + PartitionIndex int32 + ErrorCode int16 + Timestamp int64 + Offset int64 + LeaderEpoch int32 +} + +type ListOffsetsResponseTopicV4 struct { + Name string + Partitions []ListOffsetsResponsePartitionV4 +} + +type ListOffsetsResponseV4 struct { + Topics []ListOffsetsResponseTopicV4 +} + +// CreateTopics Request (Version: 0) + +type AssignmentsV0 struct { + PartitionIndex int32 + BrokerIds []int32 +} + +type CreateTopicsRequestConfigsV0 struct { + Name string + Value string +} + +type CreateTopicsRequestTopicV0 struct { + Name string + NumPartitions int32 + ReplicationFactor int16 + Assignments []AssignmentsV0 + Configs []CreateTopicsRequestConfigsV0 +} + +type CreateTopicsRequestV0 struct { + Topics []CreateTopicsRequestTopicV0 + TimeoutMs int32 +} + +// CreateTopics Request (Version: 1) + +type CreateTopicsRequestV1 struct { + Topics []CreateTopicsRequestTopicV0 + TimeoutMs int32 + ValidateOnly bool +} + +// CreateTopics Response (Version: 0) + +type CreateTopicsResponseTopicV0 struct { + Name string + ErrorCode int16 +} + +type CreateTopicsResponseV0 struct { + Topics []CreateTopicsResponseTopicV0 +} + +// CreateTopics Response (Version: 1) + +type CreateTopicsResponseTopicV1 struct { + Name string + ErrorCode int16 + ErrorMessage string +} + +type CreateTopicsResponseV1 struct { + Topics []CreateTopicsResponseTopicV1 +} + +// CreateTopics Response (Version: 2) + +type CreateTopicsResponseV2 struct { + ThrottleTimeMs int32 + Topics []CreateTopicsResponseTopicV1 +} + +// CreateTopics Response (Version: 5) + +type CreateTopicsResponseConfigsV5 struct { + Name string + Value string + ReadOnly bool + ConfigSource int8 + IsSensitive bool +} + +type CreateTopicsResponseTopicV5 struct { + Name string + ErrorCode int16 + ErrorMessage string + NumPartitions int32 + ReplicationFactor int16 + Configs []CreateTopicsResponseConfigsV5 +} + +type CreateTopicsResponseV5 struct { + ThrottleTimeMs int32 + Topics []CreateTopicsResponseTopicV5 +} + +// CreateTopics Response (Version: 7) + +type CreateTopicsResponseTopicV7 struct { + Name string + TopicID UUID + ErrorCode int16 + ErrorMessage string + NumPartitions int32 + ReplicationFactor int16 + Configs []CreateTopicsResponseConfigsV5 +} + +type CreateTopicsResponseV7 struct { + ThrottleTimeMs int32 + Topics []CreateTopicsResponseTopicV7 +} + +// DeleteTopics Request (Version: 0) + +type DeleteTopicsRequestV0 struct { + TopicNames []string + TimemoutMs int32 +} + +// DeleteTopics Request (Version: 6) + +type DeleteTopicsRequestTopicV6 struct { + Name string + UUID UUID +} + +type DeleteTopicsRequestV6 struct { + Topics []DeleteTopicsRequestTopicV6 + TimemoutMs int32 +} + +// DeleteTopics Response (Version: 0) + +type DeleteTopicsReponseResponseV0 struct { + Name string + ErrorCode int16 +} + +type DeleteTopicsReponseV0 struct { + Responses []DeleteTopicsReponseResponseV0 +} + +// DeleteTopics Response (Version: 1) + +type DeleteTopicsReponseV1 struct { + ThrottleTimeMs int32 + Responses []DeleteTopicsReponseResponseV0 +} + +// DeleteTopics Response (Version: 5) + +type DeleteTopicsReponseResponseV5 struct { + Name string + ErrorCode int16 + ErrorMessage string +} + +type DeleteTopicsReponseV5 struct { + ThrottleTimeMs int32 + Responses []DeleteTopicsReponseResponseV5 +} + +// DeleteTopics Response (Version: 6) + +type DeleteTopicsReponseResponseV6 struct { + Name string + TopicID UUID + ErrorCode int16 + ErrorMessage string +} + +type DeleteTopicsReponseV6 struct { + ThrottleTimeMs int32 + Responses []DeleteTopicsReponseResponseV6 +}