Mirror of https://github.com/kubeshark/kubeshark.git, synced 2025-09-27 05:23:06 +00:00
Refactor Mizu, define an extension API and add new protocols: AMQP, Kafka (#224)
* Separate HTTP related code into `extensions/http` as a Go plugin
* Move `extensions` folder into `tap` folder
* Move HTTP files into `tap/extensions/lib` for now
* Replace `orcaman/concurrent-map` with `sync.Map`
* Remove `grpc_assembler.go`
* Remove `github.com/up9inc/mizu/tap/extensions/http/lib`
* Add a build script to automatically build extensions from a known path and load them
* Start to define the extension API
* Implement the `run()` function for the TCP stream
* Add support for defining multiple ports to the extension API
* Set the extension name inside the extension
* Declare the `Dissect` function in the extension API
* Dissect HTTP request from inside the HTTP extension
* Make the distinction of outbound and inbound ports
* Dissect HTTP response from inside the HTTP extension
* Bring back the HTTP request-response pair matcher
* Return a `*api.RequestResponsePair` from the dissection
* Bring back the gRPC-HTTP/2 parser
* Fix the issues in `handleHTTP1ClientStream` and `handleHTTP1ServerStream`
* Call a function pointer to emit dissected data back to the `tap` package
* roee changes - trying to fix agent to work with the "api" object) - ***still not working***
* small mistake in the conflicts
* Fix the issues that are introduced by the merge conflict
* Add `Emitter` interface to the API and send `OutputChannelItem`(s) to `OutputChannel`
* Fix the `HTTP1` handlers
* Set `ConnectionInfo` in HTTP handlers
* Fix the `Dockerfile` to build the extensions
* remove some unwanted code
* no message
* Re-enable `getStreamProps` function
* Migrate back from `gopacket/tcpassembly` to `gopacket/reassembly`
* Introduce `HTTPPayload` struct and `HTTPPayloader` interface to `MarshalJSON()` all the data structures that are returned by the HTTP protocol
* Read `socketHarOutChannel` instead of `filteredHarChannel`
* Connect `OutputChannelItem` to the last WebSocket, which means the web UI finally started to work again
* Add `.env.example` to React app
* Marshal and unmarshal `*http.Request`, `*http.Response` pairs
* Move `loadExtensions` into `main.go` and map extensions into `extensionsMap`
* Add `Summarize()` method to the `Dissector` interface
* Add `Analyze` method to the `Dissector` interface and `MizuEntry` to the extension API
* Add `Protocol` struct and make it affect the UI
* Refactor `BaseEntryDetails` struct and display the source and destination ports in the UI
* Display the protocol name inside the details layout
* Add `Represent` method to the `Dissector` interface and manipulate the UI through this method
* Make the protocol color affect the details layout color and write the protocol abbreviation vertically
* Remove everything HTTP related from the `tap` package and make the extension system fully functional
* Fix the TypeScript warnings
* Bring in the files related to AMQP into the `amqp` directory
* Add `--nodefrag` flag to the tapper and bring in the main AMQP code
* Implement the AMQP `BasicPublish` and fix some issues in the UI when the response payload is missing
* Implement `representBasicPublish` method
* Fix several minor issues
* Implement the AMQP `BasicDeliver`
* Implement the AMQP `QueueDeclare`
* Implement the AMQP `ExchangeDeclare`
* Implement the AMQP `ConnectionStart`
* Implement the AMQP `ConnectionClose`
* Implement the AMQP `QueueBind`
* Implement the AMQP `BasicConsume`
* Fix an issue in `ConnectionStart`
* Fix a linter error
* Bring in the files related to Kafka into the `kafka` directory
* Fix the build errors in Kafka Go files
* Implement `Dissect` method of Kafka and adapt the request-response pair matcher to the asynchronous client-server stream
* Do the "Is reversed?" check inside `getStreamProps` and fix an issue in Kafka `Dissect` method
* Implement `Analyze`, `Summarize` methods of Kafka
* Implement the representations for Kafka `Metadata`, `RequestHeader` and `ResponseHeader`
* Refactor the AMQP and Kafka implementations to create the summary string only inside the `Analyze` method
* Implement the representations for Kafka `ApiVersions`
* Implement the representations for Kafka `Produce`
* Implement the representations for Kafka `Fetch`
* Implement the representations for Kafka `ListOffsets`, `CreateTopics` and `DeleteTopics`
* Fix the encoding of AMQP `BasicPublish` and `BasicDeliver` body
* Remove the unnecessary logging
* Remove more logging
* Introduce `Version` field to `Protocol` struct for dynamically switching the HTTP protocol to HTTP/2
* Fix the issues in analysis and representation of the HTTP/2 (gRPC) protocol
* Fix the issues in the summary section of the details layout for the HTTP/2 (gRPC) protocol
* Fix the read errors that freeze the sniffer in HTTP and Kafka
* Fix the issues in HTTP POST data
* Fix one more issue in HTTP POST data
* Fix an infinite loop in Kafka
* Fix another freezing issue in Kafka
* Revert "UI Infra - Support multiple entry types + refactoring (#211)". This reverts commit f74a52d4dc.
* Fix more issues that are introduced by the merge
* Fix the status code in the summary section
* adding the cleaner again (why we removed it?). add TODO: on the extension loop.
* fix dockerfile (remove deleting .env file) - it is found in dockerignore and fails to build if the file not exists
* fix GetEntries ("/entries" endpoint) - working with "tapApi.BaseEntryDetail" (moved from shared)
* Fix an issue in the UI summary section
* Refactor the protocol payload structs
* Fix a log message in the passive tapper
* Adapt `APP_PORTS` environment variable to the new extension system and change its format to `APP_PORTS='{"http": ["8001"]}'`
* Revert "fix dockerfile (remove deleting .env file) - it is found in dockerignore and fails to build if the file not exists". This reverts commit 4f514ae1f4.
* Bring in the necessary changes from f74a52d4dc
* Open the API server URL in the web browser as soon as Mizu is ready
* Make the TCP reader consist of a single Goroutine (instead of two) and try to dissect in both client and server mode by rewinding
* Swap `TcpID` without overwriting it
* Sort extensions by priority
* Try to dissect with looping through all the extensions
* fix getStreamProps function (it should be passed from CLI as it was before)
* Turn the TCP reader back into two Goroutines (client and server)
* typo
* Learn `isClient` from the TCP stream
* Set `viewer` style `overflow: "auto"`
* Fix the memory leaks in AMQP and Kafka dissectors
* Revert some of the changes in be7c65eb6d
* Remove `allExtensionPorts` since it's no longer needed
* Remove `APP_PORTS` since it's no longer needed
* Fix all of the minor issues in the React code
* Check Kafka header size and fail fast
* Break the dissectors loop upon a successful dissection
* Don't break the dissector loop. Protocols might collide
* Improve the HTTP request-response counter (still not perfect)
* Make the HTTP request-response counter perfect
* Revert "Revert some of the changes in be7c65eb6d3fb657a059707da3ca559937e59739". This reverts commit 08e7d786d8.
* Bring back `filterItems` and `isHealthCheckByUserAgent` functions
* Remove some development artifacts
* remove unused and commented lines that are not relevant
* Fix the performance in the TCP stream factory. Make it create two `tcpReader`(s) per extension
* Change a log to debug
* Make `*api.CounterPair` a field of `tcpReader`
* Set `isTapTarget` to always `true` again since the `filterAuthorities` implementation has problems
* Remove a variable that's only used for logging even though not introduced by this branch
* Bring back the `NumberOfRules` field of `ApplicableRules` struct
* Remove the unused `NewEntry` function
* Move `k8sResolver == nil` check to a more appropriate place
* default healthChecksUserAgentHeaders should be empty array (like the default config value)
* remove spam console.log
* Rules button caused the app to crash (accessed the service via an incorrect property)
* Ignore all .env* files in docker build
* Better caching in dockerfile: only copy go.mod before go mod download
* Check for errors while loading an extension
* Add a comment about why `Protocol` is not a pointer
* Bring back the call to `deleteOlderThan`
* Remove the `nil` check
* Reduce the maximum allowed AMQP message from 128MB to 1MB
* Fix an error that only occurs when a Kafka broker is initiating
* Revert the change in b2abd7b990
* Fix the service name resolution in all protocols
* Remove the `anydirection` flag and fix the issue in `filterAuthorities`
* Pass `sync.Map` by reference to `deleteOlderThan` method
* Fix the packet capture issue in standalone mode that's introduced by the removal of `anydirection`
* Temporarily resolve the memory exhaustion in AMQP
* Fix a nil pointer dereference error
* Fix the CLI build error
* Fix a memory leak that's identified by `pprof`

Co-authored-by: Roee Gadot <roee.gadot@up9.com>
Co-authored-by: Nimrod Gilboa Markevich <nimrod@up9.com>
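The bullet log above defines the extension API that the rest of this diff plugs into: a `Dissector` interface with `Dissect`, `Analyze`, `Summarize` and `Represent` methods, an `Emitter` that sends `OutputChannelItem`(s) back to the `tap` package, a `Protocol` struct with a `Version` field, and supporting types such as `*api.RequestResponsePair`, `MizuEntry`, `TcpID` and `*api.CounterPair`. The actual definitions live in the `tap/api` package and are not part of this diff; the sketch below is a minimal, hypothetical rendering of that API for orientation only, and every signature and field in it is an assumption rather than the real code.

```go
// Hypothetical sketch of the extension API described in the commit message.
// None of these signatures are taken from tap/api; they only illustrate how
// the pieces named above could fit together.
package api

import "bufio"

// Protocol describes a dissector to the web UI. Version allows switching the
// displayed protocol at analysis time, e.g. from HTTP/1.1 to HTTP/2 (gRPC).
type Protocol struct {
	Name         string
	Abbreviation string
	Version      string
	Priority     int
}

// TcpID identifies the two endpoints of a captured TCP stream.
type TcpID struct {
	SrcIP, DstIP     string
	SrcPort, DstPort string
}

// RequestResponsePair is what a dissection yields once a request has been
// matched with its response.
type RequestResponsePair struct {
	Request  interface{}
	Response interface{}
}

// OutputChannelItem is emitted by an extension and consumed by the tap package.
type OutputChannelItem struct {
	Protocol Protocol
	Pair     *RequestResponsePair
}

// Emitter lets an extension push dissected items back to the tap package.
type Emitter interface {
	Emit(item *OutputChannelItem)
}

// MizuEntry is the stored and displayed form of a dissected item.
type MizuEntry struct {
	Protocol Protocol
	Summary  string
	Payload  interface{}
}

// Dissector is what each protocol extension (http, amqp, kafka) implements.
type Dissector interface {
	Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, emitter Emitter) error
	Analyze(item *OutputChannelItem) *MizuEntry
	Summarize(entry *MizuEntry) string
	Represent(entry *MizuEntry) interface{}
}
```

Under a shape like this, the Kafka extension's `Dissect` would consume the `bufio.Reader` with helpers such as the ones in `read.go` below and emit matched request-response pairs through the `Emitter`.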
tap/extensions/kafka/read.go: 639 additions (new file)
@@ -0,0 +1,639 @@
package main

import (
	"bufio"
	"errors"
	"fmt"
	"io"
	"reflect"
)

type readable interface {
	readFrom(*bufio.Reader, int) (int, error)
}

var errShortRead = errors.New("not enough bytes available to load the response")

func peekRead(r *bufio.Reader, sz int, n int, f func([]byte)) (int, error) {
	if n > sz {
		return sz, errShortRead
	}
	b, err := r.Peek(n)
	if err != nil {
		return sz, err
	}
	f(b)
	return discardN(r, sz, n)
}

func readInt8(r *bufio.Reader, sz int, v *int8) (int, error) {
	return peekRead(r, sz, 1, func(b []byte) { *v = makeInt8(b) })
}

func readInt16(r *bufio.Reader, sz int, v *int16) (int, error) {
	return peekRead(r, sz, 2, func(b []byte) { *v = makeInt16(b) })
}

func readInt32(r *bufio.Reader, sz int, v *int32) (int, error) {
	return peekRead(r, sz, 4, func(b []byte) { *v = makeInt32(b) })
}

func readInt64(r *bufio.Reader, sz int, v *int64) (int, error) {
	return peekRead(r, sz, 8, func(b []byte) { *v = makeInt64(b) })
}

func readVarInt(r *bufio.Reader, sz int, v *int64) (remain int, err error) {
	// Optimistically assume that most of the time, there will be data buffered
	// in the reader. If this is not the case, the buffer will be refilled after
	// consuming zero bytes from the input.
	input, _ := r.Peek(r.Buffered())
	x := uint64(0)
	s := uint(0)

	for {
		if len(input) > sz {
			input = input[:sz]
		}

		for i, b := range input {
			if b < 0x80 {
				x |= uint64(b) << s
				*v = int64(x>>1) ^ -(int64(x) & 1)
				n, err := r.Discard(i + 1)
				return sz - n, err
			}

			x |= uint64(b&0x7f) << s
			s += 7
		}

		// Make room in the input buffer to load more data from the underlying
		// stream. The x and s variables are left untouched, ensuring that the
		// varint decoding can continue on the next loop iteration.
		n, _ := r.Discard(len(input))
		sz -= n
		if sz == 0 {
			return 0, errShortRead
		}

		// Fill the buffer: ask for one more byte, but in practice the reader
		// will load way more from the underlying stream.
		if _, err := r.Peek(1); err != nil {
			if err == io.EOF {
				err = errShortRead
			}
			return sz, err
		}

		// Grab as many bytes as possible from the buffer, then go on to the
		// next loop iteration which is going to consume it.
		input, _ = r.Peek(r.Buffered())
	}
}
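
// readVarInt example: the byte sequence 0x96 0x01 carries the unsigned varint
// 150 (0x16 | 0x01<<7); the zigzag step then yields (150>>1) ^ -(150&1) = 75,
// while a single byte 0x01 decodes to (1>>1) ^ -(1&1) = -1.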

func readBool(r *bufio.Reader, sz int, v *bool) (int, error) {
	return peekRead(r, sz, 1, func(b []byte) { *v = b[0] != 0 })
}

func readString(r *bufio.Reader, sz int, v *string) (int, error) {
	return readStringWith(r, sz, func(r *bufio.Reader, sz int, n int) (remain int, err error) {
		*v, remain, err = readNewString(r, sz, n)
		return
	})
}

func readStringWith(r *bufio.Reader, sz int, cb func(*bufio.Reader, int, int) (int, error)) (int, error) {
	var err error
	var len int16

	if sz, err = readInt16(r, sz, &len); err != nil {
		return sz, err
	}

	n := int(len)
	if n > sz {
		return sz, errShortRead
	}

	return cb(r, sz, n)
}

func readNewString(r *bufio.Reader, sz int, n int) (string, int, error) {
	b, sz, err := readNewBytes(r, sz, n)
	return string(b), sz, err
}

func readBytes(r *bufio.Reader, sz int, v *[]byte) (int, error) {
	return readBytesWith(r, sz, func(r *bufio.Reader, sz int, n int) (remain int, err error) {
		*v, remain, err = readNewBytes(r, sz, n)
		return
	})
}

func readBytesWith(r *bufio.Reader, sz int, cb func(*bufio.Reader, int, int) (int, error)) (int, error) {
	var err error
	var n int

	if sz, err = readArrayLen(r, sz, &n); err != nil {
		return sz, err
	}

	if n > sz {
		return sz, errShortRead
	}

	return cb(r, sz, n)
}

func readNewBytes(r *bufio.Reader, sz int, n int) ([]byte, int, error) {
	var err error
	var b []byte
	var shortRead bool

	if n > 0 {
		if sz < n {
			n = sz
			shortRead = true
		}

		b = make([]byte, n)
		n, err = io.ReadFull(r, b)
		b = b[:n]
		sz -= n

		if err == nil && shortRead {
			err = errShortRead
		}
	}

	return b, sz, err
}

func readArrayLen(r *bufio.Reader, sz int, n *int) (int, error) {
	var err error
	var len int32
	if sz, err = readInt32(r, sz, &len); err != nil {
		return sz, err
	}
	*n = int(len)
	return sz, nil
}

func readArrayWith(r *bufio.Reader, sz int, cb func(*bufio.Reader, int) (int, error)) (int, error) {
	var err error
	var len int32

	if sz, err = readInt32(r, sz, &len); err != nil {
		return sz, err
	}

	for n := int(len); n > 0; n-- {
		if sz, err = cb(r, sz); err != nil {
			break
		}
	}

	return sz, err
}

func readStringArray(r *bufio.Reader, sz int, v *[]string) (remain int, err error) {
	var content []string
	fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
		var value string
		if fnRemain, fnErr = readString(r, size, &value); fnErr != nil {
			return
		}
		content = append(content, value)
		return
	}
	if remain, err = readArrayWith(r, sz, fn); err != nil {
		return
	}

	*v = content
	return
}

func readMapStringInt32(r *bufio.Reader, sz int, v *map[string][]int32) (remain int, err error) {
	var len int32
	if remain, err = readInt32(r, sz, &len); err != nil {
		return
	}

	content := make(map[string][]int32, len)
	for i := 0; i < int(len); i++ {
		var key string
		var values []int32

		if remain, err = readString(r, remain, &key); err != nil {
			return
		}

		fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
			var value int32
			if fnRemain, fnErr = readInt32(r, size, &value); fnErr != nil {
				return
			}
			values = append(values, value)
			return
		}
		if remain, err = readArrayWith(r, remain, fn); err != nil {
			return
		}

		content[key] = values
	}
	*v = content

	return
}

func read(r *bufio.Reader, sz int, a interface{}) (int, error) {
	switch v := a.(type) {
	case *int8:
		return readInt8(r, sz, v)
	case *int16:
		return readInt16(r, sz, v)
	case *int32:
		return readInt32(r, sz, v)
	case *int64:
		return readInt64(r, sz, v)
	case *bool:
		return readBool(r, sz, v)
	case *string:
		return readString(r, sz, v)
	case *[]byte:
		return readBytes(r, sz, v)
	}
	switch v := reflect.ValueOf(a).Elem(); v.Kind() {
	case reflect.Struct:
		return readStruct(r, sz, v)
	case reflect.Slice:
		return readSlice(r, sz, v)
	default:
		panic(fmt.Sprintf("unsupported type: %T", a))
	}
}

func ReadAll(r *bufio.Reader, sz int, ptrs ...interface{}) (int, error) {
	var err error

	for _, ptr := range ptrs {
		if sz, err = readPtr(r, sz, ptr); err != nil {
			break
		}
	}

	return sz, err
}

func readPtr(r *bufio.Reader, sz int, ptr interface{}) (int, error) {
	switch v := ptr.(type) {
	case *int8:
		return readInt8(r, sz, v)
	case *int16:
		return readInt16(r, sz, v)
	case *int32:
		return readInt32(r, sz, v)
	case *int64:
		return readInt64(r, sz, v)
	case *string:
		return readString(r, sz, v)
	case *[]byte:
		return readBytes(r, sz, v)
	case readable:
		return v.readFrom(r, sz)
	default:
		panic(fmt.Sprintf("unsupported type: %T", v))
	}
}

func readStruct(r *bufio.Reader, sz int, v reflect.Value) (int, error) {
	var err error
	for i, n := 0, v.NumField(); i != n; i++ {
		if sz, err = read(r, sz, v.Field(i).Addr().Interface()); err != nil {
			return sz, err
		}
	}
	return sz, nil
}

func readSlice(r *bufio.Reader, sz int, v reflect.Value) (int, error) {
	var err error
	var len int32

	if sz, err = readInt32(r, sz, &len); err != nil {
		return sz, err
	}

	if n := int(len); n < 0 {
		v.Set(reflect.Zero(v.Type()))
	} else {
		v.Set(reflect.MakeSlice(v.Type(), n, n))

		for i := 0; i != n; i++ {
			if sz, err = read(r, sz, v.Index(i).Addr().Interface()); err != nil {
				return sz, err
			}
		}
	}

	return sz, nil
}

func readFetchResponseHeaderV2(r *bufio.Reader, size int) (throttle int32, watermark int64, remain int, err error) {
	var n int32
	var p struct {
		Partition           int32
		ErrorCode           int16
		HighwaterMarkOffset int64
		MessageSetSize      int32
	}

	if remain, err = readInt32(r, size, &throttle); err != nil {
		return
	}

	if remain, err = readInt32(r, remain, &n); err != nil {
		return
	}

	// This error should never trigger, unless there's a bug in the kafka client
	// or server.
	if n != 1 {
		err = fmt.Errorf("1 kafka topic was expected in the fetch response but the client received %d", n)
		return
	}

	// We ignore the topic name because we've requested messages for a single
	// topic; unless there's a bug in the kafka server, we will have received
	// the name of the topic that we requested.
	if remain, err = discardString(r, remain); err != nil {
		return
	}

	if remain, err = readInt32(r, remain, &n); err != nil {
		return
	}

	// This error should never trigger, unless there's a bug in the kafka client
	// or server.
	if n != 1 {
		err = fmt.Errorf("1 kafka partition was expected in the fetch response but the client received %d", n)
		return
	}

	if remain, err = read(r, remain, &p); err != nil {
		return
	}

	if p.ErrorCode != 0 {
		err = Error(p.ErrorCode)
		return
	}

	// This error should never trigger, unless there's a bug in the kafka client
	// or server.
	if remain != int(p.MessageSetSize) {
		err = fmt.Errorf("the size of the message set in a fetch response doesn't match the number of remaining bytes (message set size = %d, remaining bytes = %d)", p.MessageSetSize, remain)
		return
	}

	watermark = p.HighwaterMarkOffset
	return
}

func readFetchResponseHeaderV5(r *bufio.Reader, size int) (throttle int32, watermark int64, remain int, err error) {
	var n int32
	type AbortedTransaction struct {
		ProducerId  int64
		FirstOffset int64
	}
	var p struct {
		Partition           int32
		ErrorCode           int16
		HighwaterMarkOffset int64
		LastStableOffset    int64
		LogStartOffset      int64
	}
	var messageSetSize int32
	var abortedTransactions []AbortedTransaction

	if remain, err = readInt32(r, size, &throttle); err != nil {
		return
	}

	if remain, err = readInt32(r, remain, &n); err != nil {
		return
	}

	// This error should never trigger, unless there's a bug in the kafka client
	// or server.
	if n != 1 {
		err = fmt.Errorf("1 kafka topic was expected in the fetch response but the client received %d", n)
		return
	}

	// We ignore the topic name because we've requested messages for a single
	// topic; unless there's a bug in the kafka server, we will have received
	// the name of the topic that we requested.
	if remain, err = discardString(r, remain); err != nil {
		return
	}

	if remain, err = readInt32(r, remain, &n); err != nil {
		return
	}

	// This error should never trigger, unless there's a bug in the kafka client
	// or server.
	if n != 1 {
		err = fmt.Errorf("1 kafka partition was expected in the fetch response but the client received %d", n)
		return
	}

	if remain, err = read(r, remain, &p); err != nil {
		return
	}

	var abortedTransactionLen int
	if remain, err = readArrayLen(r, remain, &abortedTransactionLen); err != nil {
		return
	}

	if abortedTransactionLen == -1 {
		abortedTransactions = nil
	} else {
		abortedTransactions = make([]AbortedTransaction, abortedTransactionLen)
		for i := 0; i < abortedTransactionLen; i++ {
			if remain, err = read(r, remain, &abortedTransactions[i]); err != nil {
				return
			}
		}
	}

	if p.ErrorCode != 0 {
		err = Error(p.ErrorCode)
		return
	}

	remain, err = readInt32(r, remain, &messageSetSize)
	if err != nil {
		return
	}

	// This error should never trigger, unless there's a bug in the kafka client
	// or server.
	if remain != int(messageSetSize) {
		err = fmt.Errorf("the size of the message set in a fetch response doesn't match the number of remaining bytes (message set size = %d, remaining bytes = %d)", messageSetSize, remain)
		return
	}

	watermark = p.HighwaterMarkOffset
	return
}

func readFetchResponseHeaderV10(r *bufio.Reader, size int) (throttle int32, watermark int64, remain int, err error) {
	var n int32
	var errorCode int16
	type AbortedTransaction struct {
		ProducerId  int64
		FirstOffset int64
	}
	var p struct {
		Partition           int32
		ErrorCode           int16
		HighwaterMarkOffset int64
		LastStableOffset    int64
		LogStartOffset      int64
	}
	var messageSetSize int32
	var abortedTransactions []AbortedTransaction

	if remain, err = readInt32(r, size, &throttle); err != nil {
		return
	}

	if remain, err = readInt16(r, remain, &errorCode); err != nil {
		return
	}
	if errorCode != 0 {
		err = Error(errorCode)
		return
	}

	if remain, err = discardInt32(r, remain); err != nil {
		return
	}

	if remain, err = readInt32(r, remain, &n); err != nil {
		return
	}

	// This error should never trigger, unless there's a bug in the kafka client
	// or server.
	if n != 1 {
		err = fmt.Errorf("1 kafka topic was expected in the fetch response but the client received %d", n)
		return
	}

	// We ignore the topic name because we've requested messages for a single
	// topic; unless there's a bug in the kafka server, we will have received
	// the name of the topic that we requested.
	if remain, err = discardString(r, remain); err != nil {
		return
	}

	if remain, err = readInt32(r, remain, &n); err != nil {
		return
	}

	// This error should never trigger, unless there's a bug in the kafka client
	// or server.
	if n != 1 {
		err = fmt.Errorf("1 kafka partition was expected in the fetch response but the client received %d", n)
		return
	}

	if remain, err = read(r, remain, &p); err != nil {
		return
	}

	var abortedTransactionLen int
	if remain, err = readArrayLen(r, remain, &abortedTransactionLen); err != nil {
		return
	}

	if abortedTransactionLen == -1 {
		abortedTransactions = nil
	} else {
		abortedTransactions = make([]AbortedTransaction, abortedTransactionLen)
		for i := 0; i < abortedTransactionLen; i++ {
			if remain, err = read(r, remain, &abortedTransactions[i]); err != nil {
				return
			}
		}
	}

	if p.ErrorCode != 0 {
		err = Error(p.ErrorCode)
		return
	}

	remain, err = readInt32(r, remain, &messageSetSize)
	if err != nil {
		return
	}

	// This error should never trigger, unless there's a bug in the kafka client
	// or server.
	if remain != int(messageSetSize) {
		err = fmt.Errorf("the size of the message set in a fetch response doesn't match the number of remaining bytes (message set size = %d, remaining bytes = %d)", messageSetSize, remain)
		return
	}

	watermark = p.HighwaterMarkOffset
	return
}

func readMessageHeader(r *bufio.Reader, sz int) (offset int64, attributes int8, timestamp int64, remain int, err error) {
	var version int8

	if remain, err = readInt64(r, sz, &offset); err != nil {
		return
	}

	// On discarding the message size and CRC:
	// ---------------------------------------
	//
	// - Not sure why kafka gives the message size here, we already have the
	// number of remaining bytes in the response and kafka should only truncate
	// the trailing message.
	//
	// - TCP is already taking care of ensuring data integrity, no need to
	// waste resources doing it a second time so we just skip the message CRC.
	//
	if remain, err = discardN(r, remain, 8); err != nil {
		return
	}

	if remain, err = readInt8(r, remain, &version); err != nil {
		return
	}

	if remain, err = readInt8(r, remain, &attributes); err != nil {
		return
	}

	switch version {
	case 0:
	case 1:
		remain, err = readInt64(r, remain, &timestamp)
	default:
		err = fmt.Errorf("unsupported message version %d found in fetch response", version)
	}

	return
}
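Every helper above threads a byte budget (`sz`, returned as `remain`) through each call, which is how the dissector detects truncated frames. As a quick illustration that is not part of the commit, here is a hypothetical snippet, assumed to compile in the same package as `read.go`, that uses `ReadAll` to decode a Kafka v0/v1 request header (api key, api version, correlation id, client id) from a captured payload:

```go
// Hypothetical usage sketch. It assumes it sits in the same package as
// read.go (so ReadAll and the other helpers are in scope) and additionally
// needs "bytes" from the standard library.
func decodeRequestHeader(payload []byte) (apiKey int16, apiVersion int16, correlationID int32, clientID string, err error) {
	r := bufio.NewReader(bytes.NewReader(payload))
	sz := len(payload) // total byte budget; every helper returns what is left of it

	var remain int
	remain, err = ReadAll(r, sz,
		&apiKey,        // INT16
		&apiVersion,    // INT16
		&correlationID, // INT32
		&clientID,      // STRING: INT16 length followed by that many bytes
	)
	if err != nil {
		return
	}
	if remain != 0 {
		err = fmt.Errorf("unexpected %d byte(s) left after the request header", remain)
	}
	return
}
```

If the declared budget runs out before all fields are read, the helpers return `errShortRead` rather than reading past the frame, which is the same guard the fetch-response readers above rely on.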