Remove grpc_assembler.go

This commit is contained in:
M. Mert Yildiran 2021-08-17 05:19:12 +03:00
parent 790ba30654
commit 2166801910
No known key found for this signature in database
GPG Key ID: D42ADB236521BF7A
2 changed files with 54 additions and 299 deletions

View File

@ -1,244 +0,0 @@
package lib
import (
"bufio"
"bytes"
"encoding/base64"
"encoding/binary"
"errors"
"io"
"math"
"net/http"
"net/url"
"strings"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
)
const frameHeaderLen = 9
var clientPreface = []byte(http2.ClientPreface)
const initialHeaderTableSize = 4096
const protoHTTP2 = "HTTP/2.0"
const protoMajorHTTP2 = 2
const protoMinorHTTP2 = 0
const maxHTTP2DataLenDefault = 1 * 1024 * 1024 // 1MB
var maxHTTP2DataLen int = maxHTTP2DataLenDefault // value initialized during init
type messageFragment struct {
headers []hpack.HeaderField
data []byte
}
type fragmentsByStream map[uint32]*messageFragment
func (fbs *fragmentsByStream) appendFrame(streamID uint32, frame http2.Frame) {
switch frame := frame.(type) {
case *http2.MetaHeadersFrame:
if existingFragment, ok := (*fbs)[streamID]; ok {
existingFragment.headers = append(existingFragment.headers, frame.Fields...)
} else {
// new fragment
(*fbs)[streamID] = &messageFragment{headers: frame.Fields}
}
case *http2.DataFrame:
newDataLen := len(frame.Data())
if existingFragment, ok := (*fbs)[streamID]; ok {
existingDataLen := len(existingFragment.data)
// Never save more than maxHTTP2DataLen bytes
numBytesToAppend := int(math.Min(float64(maxHTTP2DataLen-existingDataLen), float64(newDataLen)))
existingFragment.data = append(existingFragment.data, frame.Data()[:numBytesToAppend]...)
} else {
// new fragment
// In principle, should not happen with DATA frames, because they are always preceded by HEADERS
// Never save more than maxHTTP2DataLen bytes
numBytesToAppend := int(math.Min(float64(maxHTTP2DataLen), float64(newDataLen)))
(*fbs)[streamID] = &messageFragment{data: frame.Data()[:numBytesToAppend]}
}
}
}
func (fbs *fragmentsByStream) pop(streamID uint32) ([]hpack.HeaderField, []byte) {
headers := (*fbs)[streamID].headers
data := (*fbs)[streamID].data
delete(*fbs, streamID)
return headers, data
}
func createGrpcAssembler(b *bufio.Reader) GrpcAssembler {
var framerOutput bytes.Buffer
framer := http2.NewFramer(&framerOutput, b)
framer.ReadMetaHeaders = hpack.NewDecoder(initialHeaderTableSize, nil)
return GrpcAssembler{
fragmentsByStream: make(fragmentsByStream),
framer: framer,
}
}
type GrpcAssembler struct {
fragmentsByStream fragmentsByStream
framer *http2.Framer
}
func (ga *GrpcAssembler) readMessage() (uint32, interface{}, error) {
// Exactly one Framer is used for each half connection.
// (Instead of creating a new Framer for each ReadFrame operation)
// This is needed in order to decompress the headers,
// because the compression context is updated with each requests/response.
frame, err := ga.framer.ReadFrame()
if err != nil {
return 0, nil, err
}
streamID := frame.Header().StreamID
ga.fragmentsByStream.appendFrame(streamID, frame)
if !(ga.isStreamEnd(frame)) {
return 0, nil, nil
}
headers, data := ga.fragmentsByStream.pop(streamID)
// Note: header keys are converted by http.Header.Set to canonical names, e.g. content-type -> Content-Type.
// By converting the keys we violate the HTTP/2 specification, which state that all headers must be lowercase.
headersHTTP1 := make(http.Header)
for _, header := range headers {
headersHTTP1.Add(header.Name, header.Value)
}
dataString := base64.StdEncoding.EncodeToString(data)
// Use http1 types only because they are expected in http_matcher.
// TODO: Create an interface that will be used by http_matcher:registerRequest and http_matcher:registerRequest
// to accept both HTTP/1.x and HTTP/2 requests and responses
var messageHTTP1 interface{}
if _, ok := headersHTTP1[":method"]; ok {
messageHTTP1 = http.Request{
URL: &url.URL{},
Method: "POST",
Header: headersHTTP1,
Proto: protoHTTP2,
ProtoMajor: protoMajorHTTP2,
ProtoMinor: protoMinorHTTP2,
Body: io.NopCloser(strings.NewReader(dataString)),
ContentLength: int64(len(dataString)),
}
} else if _, ok := headersHTTP1[":status"]; ok {
messageHTTP1 = http.Response{
Header: headersHTTP1,
Proto: protoHTTP2,
ProtoMajor: protoMajorHTTP2,
ProtoMinor: protoMinorHTTP2,
Body: io.NopCloser(strings.NewReader(dataString)),
ContentLength: int64(len(dataString)),
}
} else {
return 0, nil, errors.New("Failed to assemble stream: neither a request nor a message")
}
return streamID, messageHTTP1, nil
}
func (ga *GrpcAssembler) isStreamEnd(frame http2.Frame) bool {
switch frame := frame.(type) {
case *http2.MetaHeadersFrame:
if frame.StreamEnded() {
return true
}
case *http2.DataFrame:
if frame.StreamEnded() {
return true
}
}
return false
}
/* Check if HTTP/2. Remove HTTP/2 client preface from start of buffer if present
*/
func checkIsHTTP2Connection(b *bufio.Reader, isClient bool) (bool, error) {
if isClient {
return checkIsHTTP2ClientStream(b)
}
return checkIsHTTP2ServerStream(b)
}
func prepareHTTP2Connection(b *bufio.Reader, isClient bool) error {
if !isClient {
return nil
}
return discardClientPreface(b)
}
func checkIsHTTP2ClientStream(b *bufio.Reader) (bool, error) {
return checkClientPreface(b)
}
func checkIsHTTP2ServerStream(b *bufio.Reader) (bool, error) {
buf, err := b.Peek(frameHeaderLen)
if err != nil {
return false, err
}
// If response starts with this text, it is HTTP/1.x
if bytes.Compare(buf, []byte("HTTP/1.0 ")) == 0 || bytes.Compare(buf, []byte("HTTP/1.1 ")) == 0 {
return false, nil
}
// Check server connection preface (a settings frame)
frameHeader := http2.FrameHeader{
Length: uint32(buf[0])<<16 | uint32(buf[1])<<8 | uint32(buf[2]),
Type: http2.FrameType(buf[3]),
Flags: http2.Flags(buf[4]),
StreamID: binary.BigEndian.Uint32(buf[5:]) & (1<<31 - 1),
}
if frameHeader.Type != http2.FrameSettings {
// If HTTP/2, but not start of stream, will also fulfill this condition.
return false, nil
}
return true, nil
}
func checkClientPreface(b *bufio.Reader) (bool, error) {
bytesStart, err := b.Peek(len(clientPreface))
if err != nil {
return false, err
} else if len(bytesStart) != len(clientPreface) {
return false, errors.New("checkClientPreface: not enough bytes read")
}
if !bytes.Equal(bytesStart, clientPreface) {
return false, nil
}
return true, nil
}
func discardClientPreface(b *bufio.Reader) error {
if isClientPrefacePresent, err := checkClientPreface(b); err != nil {
return err
} else if !isClientPrefacePresent {
return errors.New("discardClientPreface: does not begin with client preface")
}
// Remove client preface string from the buffer
n, err := b.Discard(len(clientPreface))
if err != nil {
return err
} else if n != len(clientPreface) {
return errors.New("discardClientPreface: failed to discard client preface")
}
return nil
}

View File

@ -60,7 +60,6 @@ type httpReader struct {
data []byte data []byte
captureTime time.Time captureTime time.Time
hexdump bool hexdump bool
grpcAssembler GrpcAssembler
messageCount uint messageCount uint
harWriter *HarWriter harWriter *HarWriter
packetsSeen uint packetsSeen uint
@ -102,20 +101,20 @@ func (h *httpReader) run(wg *sync.WaitGroup) {
defer wg.Done() defer wg.Done()
b := bufio.NewReader(h) b := bufio.NewReader(h)
if isHTTP2, err := checkIsHTTP2Connection(b, h.isClient); err != nil { // if isHTTP2, err := checkIsHTTP2Connection(b, h.isClient); err != nil {
SilentError("HTTP/2-Prepare-Connection", "stream %s Failed to check if client is HTTP/2: %s (%v,%+v)", h.ident, err, err, err) // SilentError("HTTP/2-Prepare-Connection", "stream %s Failed to check if client is HTTP/2: %s (%v,%+v)", h.ident, err, err, err)
// Do something? // // Do something?
} else { // } else {
h.isHTTP2 = isHTTP2 // h.isHTTP2 = isHTTP2
} // }
if h.isHTTP2 { // if h.isHTTP2 {
err := prepareHTTP2Connection(b, h.isClient) // err := prepareHTTP2Connection(b, h.isClient)
if err != nil { // if err != nil {
SilentError("HTTP/2-Prepare-Connection-After-Check", "stream %s error: %s (%v,%+v)", h.ident, err, err, err) // SilentError("HTTP/2-Prepare-Connection-After-Check", "stream %s error: %s (%v,%+v)", h.ident, err, err, err)
} // }
h.grpcAssembler = createGrpcAssembler(b) // h.grpcAssembler = createGrpcAssembler(b)
} // }
for true { for true {
if h.isHTTP2 { if h.isHTTP2 {
@ -147,51 +146,51 @@ func (h *httpReader) run(wg *sync.WaitGroup) {
} }
func (h *httpReader) handleHTTP2Stream() error { func (h *httpReader) handleHTTP2Stream() error {
streamID, messageHTTP1, err := h.grpcAssembler.readMessage() // streamID, messageHTTP1, err := h.grpcAssembler.readMessage()
h.messageCount++ // h.messageCount++
if err != nil { // if err != nil {
return err // return err
} // }
var reqResPair *requestResponsePair // var reqResPair *requestResponsePair
var connectionInfo *ConnectionInfo // var connectionInfo *ConnectionInfo
switch messageHTTP1 := messageHTTP1.(type) { // switch messageHTTP1 := messageHTTP1.(type) {
case http.Request: // case http.Request:
ident := fmt.Sprintf("%s->%s %s->%s %d", h.tcpID.srcIP, h.tcpID.dstIP, h.tcpID.srcPort, h.tcpID.dstPort, streamID) // ident := fmt.Sprintf("%s->%s %s->%s %d", h.tcpID.srcIP, h.tcpID.dstIP, h.tcpID.srcPort, h.tcpID.dstPort, streamID)
connectionInfo = &ConnectionInfo{ // connectionInfo = &ConnectionInfo{
ClientIP: h.tcpID.srcIP, // ClientIP: h.tcpID.srcIP,
ClientPort: h.tcpID.srcPort, // ClientPort: h.tcpID.srcPort,
ServerIP: h.tcpID.dstIP, // ServerIP: h.tcpID.dstIP,
ServerPort: h.tcpID.dstPort, // ServerPort: h.tcpID.dstPort,
IsOutgoing: h.isOutgoing, // IsOutgoing: h.isOutgoing,
} // }
reqResPair = reqResMatcher.registerRequest(ident, &messageHTTP1, h.captureTime) // reqResPair = reqResMatcher.registerRequest(ident, &messageHTTP1, h.captureTime)
case http.Response: // case http.Response:
ident := fmt.Sprintf("%s->%s %s->%s %d", h.tcpID.dstIP, h.tcpID.srcIP, h.tcpID.dstPort, h.tcpID.srcPort, streamID) // ident := fmt.Sprintf("%s->%s %s->%s %d", h.tcpID.dstIP, h.tcpID.srcIP, h.tcpID.dstPort, h.tcpID.srcPort, streamID)
connectionInfo = &ConnectionInfo{ // connectionInfo = &ConnectionInfo{
ClientIP: h.tcpID.dstIP, // ClientIP: h.tcpID.dstIP,
ClientPort: h.tcpID.dstPort, // ClientPort: h.tcpID.dstPort,
ServerIP: h.tcpID.srcIP, // ServerIP: h.tcpID.srcIP,
ServerPort: h.tcpID.srcPort, // ServerPort: h.tcpID.srcPort,
IsOutgoing: h.isOutgoing, // IsOutgoing: h.isOutgoing,
} // }
reqResPair = reqResMatcher.registerResponse(ident, &messageHTTP1, h.captureTime) // reqResPair = reqResMatcher.registerResponse(ident, &messageHTTP1, h.captureTime)
} // }
if reqResPair != nil { // if reqResPair != nil {
// statsTracker.incMatchedMessages() // // statsTracker.incMatchedMessages()
if h.harWriter != nil { // if h.harWriter != nil {
h.harWriter.WritePair( // h.harWriter.WritePair(
reqResPair.Request.orig.(*http.Request), // reqResPair.Request.orig.(*http.Request),
reqResPair.Request.captureTime, // reqResPair.Request.captureTime,
reqResPair.Response.orig.(*http.Response), // reqResPair.Response.orig.(*http.Response),
reqResPair.Response.captureTime, // reqResPair.Response.captureTime,
connectionInfo, // connectionInfo,
) // )
} // }
} // }
return nil return nil
} }