Files
kubeshark/tap/extensions/http/http2_assembler.go
M. Mert Yıldıran a9a61edd50 Add ARM64 and cross-compilation support to the agent image (#659)
* modified Dockerfile to work for both amd64 (Intel) and arm64 (M1)

* added changelog

* Update `Dockerfile` to have `ARCH` build argument

* Remove `docs/CHANGES.md`

* Upgrade the Basenine version from `v0.3.0` to `v0.4.6`

* Update `publish.yml` to have `ARCH` build argument

* Switch `BasenineImageRepo` to Docker Hub

* Have separate build arguments for `ARCH` and `GOARCH`

* Upgrade the Basenine version from `v0.4.6` to `v0.4.10`

* Oops forgot to update the 10th duplicated shell script

* Fix the oopsie and reduce duplications

* Fix `Dockerfile`

* Fix the incompatibility issue between Go plugins and gold linker in Alpine inside `Dockerfile`

* Fix `asm: xxhash_amd64.s:120: when dynamic linking, R15 is clobbered by a global variable access` error

* Update `Dockerfile` to have cross-compilation on an AMD64 machine

Also revert changes in the shell scripts

* Delete `debug.Dockerfile`

* Create a custom base (`debian:buster-slim` based) image for the shipped image

* Replace `mertyildiran/debian-pcap` with `up9inc/debian-pcap`

* Upgrade Basenine version to `v0.4.12`

* Use `debian:stable-slim` as the base

* Fix the indentation in the `Dockerfile`

* Update `publish.yml`

* Enable `publish.yml` for `feature/multiarch_build` branch

* Tag correctly and set `ARCH` Docker argument

* Remove the lines that are forgotten to be removed from the shell scripts

* Add `MizuAgentImageRepo` constant and use it as default `AgentImage` value

* Bring back `Set up Cloud SDK` step to `Build the CLI and publish` job

* Build ARM64 CLI for Linux as well

* Revert "Enable `publish.yml` for `feature/multiarch_build` branch"

This reverts commit d30be4c1f0.

* Revert Go 1.17 upgrade

* Remove `build_extensions_debug.sh` as well

* Make the `Dockerfile` to compile the agent statically

* Statically link the protocol extensions

* Fix `Dockerfile`

* Bring back `-s -w` flags

* Verify the signatures of the downloads in `dockcross/linux-arm64-musl`

* Revert modifications in some shell scripts

* Make the `BUILDARCH` and `TARGETARCH` separation in the `Dockerfile`

* Separate cross-compilation builder image into a separate repo named `up9inc/linux-arm64-musl-go-libpcap`

* Fill the shell script and specify the tag for `dockcross/linux-arm64-musl`

* Remove the unnecessary dependencies from `builder-native-base`

* Improve the comments in the `Dockerfile`

* Upgrade Basenine version to `v0.4.13`

* Fix `Dockerfile`

* Revert "Revert "Enable `publish.yml` for `feature/multiarch_build` branch""

This reverts commit 303e466bdc.

* Revert "Revert "Revert "Enable `publish.yml` for `feature/multiarch_build` branch"""

This reverts commit 0fe252bbdb.

* Remove `push-docker-debug` from the `Makefile`

* Rename `publish.yml` to `release.yml`

Co-authored-by: Alex Haiut <alex@up9.com>
2022-01-25 21:24:50 +03:00

291 lines
7.3 KiB
Go

package http

import (
    "bufio"
    "bytes"
    "encoding/base64"
    "encoding/binary"
    "errors"
    "io"
    "math"
    "net/http"
    "net/url"
    "strconv"
    "strings"

    "golang.org/x/net/http2"
    "golang.org/x/net/http2/hpack"
)

const frameHeaderLen = 9

var clientPreface = []byte(http2.ClientPreface)

const initialHeaderTableSize = 4096

const protoHTTP2 = "HTTP/2.0"
const protoMajorHTTP2 = 2
const protoMinorHTTP2 = 0

var maxHTTP2DataLen = 1 * 1024 * 1024 // 1MB

var grpcStatusCodes = []string{
    "OK",
    "CANCELLED",
    "UNKNOWN",
    "INVALID_ARGUMENT",
    "DEADLINE_EXCEEDED",
    "NOT_FOUND",
    "ALREADY_EXISTS",
    "PERMISSION_DENIED",
    "RESOURCE_EXHAUSTED",
    "FAILED_PRECONDITION",
    "ABORTED",
    "OUT_OF_RANGE",
    "UNIMPLEMENTED",
    "INTERNAL",
    "UNAVAILABLE",
    "DATA_LOSS",
    "UNAUTHENTICATED",
}

type messageFragment struct {
    headers []hpack.HeaderField
    data    []byte
}

type fragmentsByStream map[uint32]*messageFragment
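
// appendFrame merges a HEADERS or DATA frame into the fragment accumulated for the
// given stream, capping the stored body at maxHTTP2DataLen bytes.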
func (fbs *fragmentsByStream) appendFrame(streamID uint32, frame http2.Frame) {
    switch frame := frame.(type) {
    case *http2.MetaHeadersFrame:
        if existingFragment, ok := (*fbs)[streamID]; ok {
            existingFragment.headers = append(existingFragment.headers, frame.Fields...)
        } else {
            // new fragment
            (*fbs)[streamID] = &messageFragment{headers: frame.Fields}
        }
    case *http2.DataFrame:
        newDataLen := len(frame.Data())
        if existingFragment, ok := (*fbs)[streamID]; ok {
            existingDataLen := len(existingFragment.data)
            // Never save more than maxHTTP2DataLen bytes
            numBytesToAppend := int(math.Min(float64(maxHTTP2DataLen-existingDataLen), float64(newDataLen)))
            existingFragment.data = append(existingFragment.data, frame.Data()[:numBytesToAppend]...)
        } else {
            // new fragment
            // In principle, should not happen with DATA frames, because they are always preceded by HEADERS
            // Never save more than maxHTTP2DataLen bytes
            numBytesToAppend := int(math.Min(float64(maxHTTP2DataLen), float64(newDataLen)))
            (*fbs)[streamID] = &messageFragment{data: frame.Data()[:numBytesToAppend]}
        }
    }
}

func (fbs *fragmentsByStream) pop(streamID uint32) ([]hpack.HeaderField, []byte) {
    headers := (*fbs)[streamID].headers
    data := (*fbs)[streamID].data
    delete(*fbs, streamID)
    return headers, data
}
func createHTTP2Assembler(b *bufio.Reader) *Http2Assembler {
    var framerOutput bytes.Buffer
    framer := http2.NewFramer(&framerOutput, b)
    framer.ReadMetaHeaders = hpack.NewDecoder(initialHeaderTableSize, nil)

    return &Http2Assembler{
        fragmentsByStream: make(fragmentsByStream),
        framer:            framer,
    }
}

type Http2Assembler struct {
    fragmentsByStream fragmentsByStream
    framer            *http2.Framer
}
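
// readMessage reads the next frame and folds it into the per-stream fragments. Once a
// stream ends, it converts the reassembled headers and base64-encoded body into an
// http.Request or http.Response; a zero streamID with a nil error means no stream has
// completed yet.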
func (ga *Http2Assembler) readMessage() (streamID uint32, messageHTTP1 interface{}, isGrpc bool, err error) {
    // Exactly one Framer is used for each half connection
    // (instead of creating a new Framer for each ReadFrame operation).
    // This is needed in order to decompress the headers,
    // because the compression context is updated with each request/response.
    frame, err := ga.framer.ReadFrame()
    if err != nil {
        return
    }

    streamID = frame.Header().StreamID

    ga.fragmentsByStream.appendFrame(streamID, frame)

    if !(ga.isStreamEnd(frame)) {
        streamID = 0
        return
    }

    headers, data := ga.fragmentsByStream.pop(streamID)

    // Note: header keys are converted by http.Header.Set to canonical names, e.g. content-type -> Content-Type.
    // By converting the keys we violate the HTTP/2 specification, which states that all header names must be lowercase.
    headersHTTP1 := make(http.Header)
    for _, header := range headers {
        headersHTTP1.Add(header.Name, header.Value)
    }
    dataString := base64.StdEncoding.EncodeToString(data)

    // Use HTTP/1 types only, because they are what http_matcher expects.
    method := headersHTTP1.Get(":method")
    status := headersHTTP1.Get(":status")

    // gRPC detection
    grpcStatus := headersHTTP1.Get("Grpc-Status")
    if grpcStatus != "" {
        isGrpc = true
        status = grpcStatus
    }
    if strings.Contains(headersHTTP1.Get("Content-Type"), "application/grpc") {
        isGrpc = true
        grpcPath := headersHTTP1.Get(":path")
        pathSegments := strings.Split(grpcPath, "/")
        if len(pathSegments) > 0 {
            method = pathSegments[len(pathSegments)-1]
        }
    }

    if method != "" {
        messageHTTP1 = http.Request{
            URL:           &url.URL{},
            Method:        method,
            Header:        headersHTTP1,
            Proto:         protoHTTP2,
            ProtoMajor:    protoMajorHTTP2,
            ProtoMinor:    protoMinorHTTP2,
            Body:          io.NopCloser(strings.NewReader(dataString)),
            ContentLength: int64(len(dataString)),
        }
    } else if status != "" {
        var statusCode int
        statusCode, err = strconv.Atoi(status)
        if err != nil {
            return
        }
        messageHTTP1 = http.Response{
            StatusCode:    statusCode,
            Header:        headersHTTP1,
            Proto:         protoHTTP2,
            ProtoMajor:    protoMajorHTTP2,
            ProtoMinor:    protoMinorHTTP2,
            Body:          io.NopCloser(strings.NewReader(dataString)),
            ContentLength: int64(len(dataString)),
        }
    } else {
        err = errors.New("failed to assemble stream: neither a request nor a response")
        return
    }

    return
}
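
// isStreamEnd reports whether the frame carries the END_STREAM flag (set on HEADERS or DATA frames).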
func (ga *Http2Assembler) isStreamEnd(frame http2.Frame) bool {
    switch frame := frame.(type) {
    case *http2.MetaHeadersFrame:
        if frame.StreamEnded() {
            return true
        }
    case *http2.DataFrame:
        if frame.StreamEnded() {
            return true
        }
    }

    return false
}

/* Check if HTTP/2. Remove HTTP/2 client preface from start of buffer if present */
func checkIsHTTP2Connection(b *bufio.Reader, isClient bool) (bool, error) {
    if isClient {
        return checkIsHTTP2ClientStream(b)
    }

    return checkIsHTTP2ServerStream(b)
}
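
// prepareHTTP2Connection discards the client connection preface so the framer starts at
// the first frame; server-side streams need no preparation.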
func prepareHTTP2Connection(b *bufio.Reader, isClient bool) error {
    if !isClient {
        return nil
    }

    return discardClientPreface(b)
}

func checkIsHTTP2ClientStream(b *bufio.Reader) (bool, error) {
    return checkClientPreface(b)
}
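
// checkIsHTTP2ServerStream peeks at the first frame header and treats the stream as
// HTTP/2 only if it begins with a SETTINGS frame (the server connection preface).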
func checkIsHTTP2ServerStream(b *bufio.Reader) (bool, error) {
    buf, err := b.Peek(frameHeaderLen)
    if err != nil {
        return false, err
    }

    // If the response starts with HTTP/1. then it's not HTTP/2
    if bytes.HasPrefix(buf, []byte("HTTP/1.")) {
        return false, nil
    }

    // Check for the server connection preface (a SETTINGS frame).
    // The 9-byte frame header is: 3 bytes length, 1 byte type, 1 byte flags,
    // and a 4-byte stream ID whose most significant (reserved) bit is masked off.
    frameHeader := http2.FrameHeader{
        Length:   uint32(buf[0])<<16 | uint32(buf[1])<<8 | uint32(buf[2]),
        Type:     http2.FrameType(buf[3]),
        Flags:    http2.Flags(buf[4]),
        StreamID: binary.BigEndian.Uint32(buf[5:]) & (1<<31 - 1),
    }
    if frameHeader.Type != http2.FrameSettings {
        // An HTTP/2 connection observed mid-stream (rather than at its start) also ends up here.
        return false, nil
    }

    return true, nil
}
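
// checkClientPreface peeks at the buffer and reports whether it begins with the HTTP/2
// client connection preface, without consuming any bytes.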
func checkClientPreface(b *bufio.Reader) (bool, error) {
    bytesStart, err := b.Peek(len(clientPreface))
    if err != nil {
        return false, err
    } else if len(bytesStart) != len(clientPreface) {
        return false, errors.New("checkClientPreface: not enough bytes read")
    }

    if !bytes.Equal(bytesStart, clientPreface) {
        return false, nil
    }

    return true, nil
}
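
// discardClientPreface verifies that the stream begins with the client connection preface
// and removes it from the buffer.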
func discardClientPreface(b *bufio.Reader) error {
    if isClientPrefacePresent, err := checkClientPreface(b); err != nil {
        return err
    } else if !isClientPrefacePresent {
        return errors.New("discardClientPreface: does not begin with client preface")
    }

    // Remove client preface string from the buffer
    n, err := b.Discard(len(clientPreface))
    if err != nil {
        return err
    } else if n != len(clientPreface) {
        return errors.New("discardClientPreface: failed to discard client preface")
    }

    return nil
}
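
Taken together, these helpers form a small pipeline: detect HTTP/2 on a half connection, strip the client preface, then pull fully reassembled messages off the framer one stream at a time. The sketch below is illustrative and is not part of the file above; it assumes it would sit in the same package (the helpers are unexported), that `r` wraps the client-to-server half of a captured connection, and it relies on a hypothetical `log` import purely for output.

// sketchReadClientStream is a hypothetical driver showing how the helpers compose.
func sketchReadClientStream(r *bufio.Reader) error {
    // Detect HTTP/2 by peeking at the client connection preface.
    isHTTP2, err := checkIsHTTP2Connection(r, true)
    if err != nil || !isHTTP2 {
        return err
    }

    // Consume the preface so the framer starts at the first frame.
    if err := prepareHTTP2Connection(r, true); err != nil {
        return err
    }

    assembler := createHTTP2Assembler(r)
    for {
        streamID, message, isGrpc, err := assembler.readMessage()
        if err == io.EOF {
            return nil
        } else if err != nil {
            return err
        }
        if streamID == 0 {
            // The current stream has not ended yet; keep reading frames.
            continue
        }

        switch msg := message.(type) {
        case http.Request:
            log.Printf("stream %d: %s request %s (gRPC: %v)", streamID, msg.Proto, msg.Method, isGrpc)
        case http.Response:
            log.Printf("stream %d: response with status %d (gRPC: %v)", streamID, msg.StatusCode, isGrpc)
        }
    }
}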