Files
kubeshark/tap/extensions/kafka/read.go
2022-02-01 12:08:55 +02:00

160 lines
3.2 KiB
Go

package kafka
import (
"bufio"
"errors"
"fmt"
"io"
)
type readable interface {
readFrom(*bufio.Reader, int) (int, error)
}
var errShortRead = errors.New("not enough bytes available to load the response")
func peekRead(r *bufio.Reader, sz int, n int, f func([]byte)) (int, error) {
if n > sz {
return sz, errShortRead
}
b, err := r.Peek(n)
if err != nil {
return sz, err
}
f(b)
return discardN(r, sz, n)
}
func readInt8(r *bufio.Reader, sz int, v *int8) (int, error) {
return peekRead(r, sz, 1, func(b []byte) { *v = makeInt8(b) })
}
func readInt16(r *bufio.Reader, sz int, v *int16) (int, error) {
return peekRead(r, sz, 2, func(b []byte) { *v = makeInt16(b) })
}
func readInt32(r *bufio.Reader, sz int, v *int32) (int, error) {
return peekRead(r, sz, 4, func(b []byte) { *v = makeInt32(b) })
}
func readInt64(r *bufio.Reader, sz int, v *int64) (int, error) {
return peekRead(r, sz, 8, func(b []byte) { *v = makeInt64(b) })
}
func readString(r *bufio.Reader, sz int, v *string) (int, error) {
return readStringWith(r, sz, func(r *bufio.Reader, sz int, n int) (remain int, err error) {
*v, remain, err = readNewString(r, sz, n)
return
})
}
func readStringWith(r *bufio.Reader, sz int, cb func(*bufio.Reader, int, int) (int, error)) (int, error) {
var err error
var len int16
if sz, err = readInt16(r, sz, &len); err != nil {
return sz, err
}
n := int(len)
if n > sz {
return sz, errShortRead
}
return cb(r, sz, n)
}
func readNewString(r *bufio.Reader, sz int, n int) (string, int, error) {
b, sz, err := readNewBytes(r, sz, n)
return string(b), sz, err
}
func readBytes(r *bufio.Reader, sz int, v *[]byte) (int, error) {
return readBytesWith(r, sz, func(r *bufio.Reader, sz int, n int) (remain int, err error) {
*v, remain, err = readNewBytes(r, sz, n)
return
})
}
func readBytesWith(r *bufio.Reader, sz int, cb func(*bufio.Reader, int, int) (int, error)) (int, error) {
var err error
var n int
if sz, err = readArrayLen(r, sz, &n); err != nil {
return sz, err
}
if n > sz {
return sz, errShortRead
}
return cb(r, sz, n)
}
func readNewBytes(r *bufio.Reader, sz int, n int) ([]byte, int, error) {
var err error
var b []byte
var shortRead bool
if n > 0 {
if sz < n {
n = sz
shortRead = true
}
b = make([]byte, n)
n, err = io.ReadFull(r, b)
b = b[:n]
sz -= n
if err == nil && shortRead {
err = errShortRead
}
}
return b, sz, err
}
func readArrayLen(r *bufio.Reader, sz int, n *int) (int, error) {
var err error
var len int32
if sz, err = readInt32(r, sz, &len); err != nil {
return sz, err
}
*n = int(len)
return sz, nil
}
func ReadAll(r *bufio.Reader, sz int, ptrs ...interface{}) (int, error) {
var err error
for _, ptr := range ptrs {
if sz, err = readPtr(r, sz, ptr); err != nil {
break
}
}
return sz, err
}
func readPtr(r *bufio.Reader, sz int, ptr interface{}) (int, error) {
switch v := ptr.(type) {
case *int8:
return readInt8(r, sz, v)
case *int16:
return readInt16(r, sz, v)
case *int32:
return readInt32(r, sz, v)
case *int64:
return readInt64(r, sz, v)
case *string:
return readString(r, sz, v)
case *[]byte:
return readBytes(r, sz, v)
case readable:
return v.readFrom(r, sz)
default:
panic(fmt.Sprintf("unsupported type: %T", v))
}
}