mirror of
				https://github.com/kubeshark/kubeshark.git
				synced 2025-10-23 00:12:03 +00:00 
			
		
		
		
	TRA-3212 Passive-Tapper and Mizu share code (#70)
* Use log in tap package instead of fmt. * Moved api/pkg/tap to root. * Added go.mod and go.sum for tap. * Added replace for shared. * api uses tap module instead of tap package. * Removed dependency of tap in shared by moving env var out of tap. * Fixed compilation bugs. * Fixed: Forgot to export struct field HostMode. * Removed unused flag. * Close har output channel when done. * Moved websocket out of mizu and into passive-tapper. * Send connection details over har output channel. * Fixed compilation errors. * Removed unused info from request response cache. * Renamed connection -> connectionID. * Fixed rename bug. * Export setters and getters for filter ips and ports. * Added tap dependency to Dockerfile. * Uncomment error messages. * Renamed `filterIpAddresses` -> `filterAuthorities`. * Renamed ConnectionID -> ConnectionInfo. * Fixed: Missed one replace.
This commit is contained in:
		
							
								
								
									
										241
									
								
								tap/grpc_assembler.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										241
									
								
								tap/grpc_assembler.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,241 @@ | ||||
| package tap | ||||
|  | ||||
| import ( | ||||
| 	"bufio" | ||||
| 	"bytes" | ||||
| 	"encoding/base64" | ||||
| 	"encoding/binary" | ||||
| 	"errors" | ||||
| 	"io" | ||||
| 	"math" | ||||
| 	"net/http" | ||||
| 	"net/url" | ||||
| 	"strings" | ||||
|  | ||||
| 	"golang.org/x/net/http2" | ||||
| 	"golang.org/x/net/http2/hpack" | ||||
| ) | ||||
|  | ||||
| const frameHeaderLen = 9 | ||||
| var clientPreface = []byte(http2.ClientPreface) | ||||
| const initialHeaderTableSize = 4096 | ||||
| const protoHTTP2 = "HTTP/2.0" | ||||
| const protoMajorHTTP2 = 2 | ||||
| const protoMinorHTTP2 = 0 | ||||
|  | ||||
| var maxHTTP2DataLen int = maxHTTP2DataLenDefault // value initialized during init | ||||
|  | ||||
| type messageFragment struct { | ||||
| 	headers []hpack.HeaderField | ||||
| 	data []byte | ||||
| } | ||||
|  | ||||
| type fragmentsByStream map[uint32]*messageFragment | ||||
|  | ||||
| func (fbs *fragmentsByStream) appendFrame(streamID uint32, frame http2.Frame) { | ||||
| 	switch frame := frame.(type) { | ||||
| 	case *http2.MetaHeadersFrame: | ||||
| 		if existingFragment, ok := (*fbs)[streamID]; ok { | ||||
| 			existingFragment.headers = append(existingFragment.headers, frame.Fields...) | ||||
| 		} else { | ||||
| 			// new fragment | ||||
| 			(*fbs)[streamID] = &messageFragment{headers: frame.Fields} | ||||
| 		} | ||||
| 	case *http2.DataFrame: | ||||
| 		newDataLen := len(frame.Data()) | ||||
| 		if existingFragment, ok := (*fbs)[streamID]; ok { | ||||
| 			existingDataLen := len(existingFragment.data) | ||||
| 			// Never save more than maxHTTP2DataLen bytes | ||||
| 			numBytesToAppend := int(math.Min(float64(maxHTTP2DataLen - existingDataLen), float64(newDataLen))) | ||||
|  | ||||
| 			existingFragment.data = append(existingFragment.data, frame.Data()[:numBytesToAppend]...) | ||||
| 		} else { | ||||
| 			// new fragment | ||||
| 			// In principle, should not happen with DATA frames, because they are always preceded by HEADERS | ||||
|  | ||||
| 			// Never save more than maxHTTP2DataLen bytes | ||||
| 			numBytesToAppend := int(math.Min(float64(maxHTTP2DataLen), float64(newDataLen))) | ||||
|  | ||||
| 			(*fbs)[streamID] = &messageFragment{data: frame.Data()[:numBytesToAppend]} | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (fbs *fragmentsByStream) pop(streamID uint32) ([]hpack.HeaderField, []byte) { | ||||
| 	headers := (*fbs)[streamID].headers | ||||
| 	data := (*fbs)[streamID].data | ||||
| 	delete(*fbs, streamID) | ||||
|  | ||||
| 	return headers, data | ||||
| } | ||||
|  | ||||
| func createGrpcAssembler(b *bufio.Reader) GrpcAssembler { | ||||
| 	var framerOutput bytes.Buffer | ||||
| 	framer := http2.NewFramer(&framerOutput, b) | ||||
| 	framer.ReadMetaHeaders = hpack.NewDecoder(initialHeaderTableSize, nil) | ||||
| 	return GrpcAssembler{ | ||||
| 		fragmentsByStream: make(fragmentsByStream), | ||||
| 		framer: framer, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| type GrpcAssembler struct { | ||||
| 	fragmentsByStream fragmentsByStream | ||||
| 	framer *http2.Framer | ||||
| } | ||||
|  | ||||
| func (ga *GrpcAssembler) readMessage() (uint32, interface{}, error) { | ||||
| 	// Exactly one Framer is used for each half connection. | ||||
| 	// (Instead of creating a new Framer for each ReadFrame operation) | ||||
| 	// This is needed in order to decompress the headers, | ||||
| 	// because the compression context is updated with each requests/response. | ||||
| 	frame, err := ga.framer.ReadFrame() | ||||
| 	if err != nil { | ||||
| 		return 0, nil, err | ||||
| 	} | ||||
|  | ||||
| 	streamID := frame.Header().StreamID | ||||
|  | ||||
| 	ga.fragmentsByStream.appendFrame(streamID, frame) | ||||
|  | ||||
| 	if !(ga.isStreamEnd(frame)) { | ||||
| 		return 0, nil, nil | ||||
| 	} | ||||
|  | ||||
| 	headers, data := ga.fragmentsByStream.pop(streamID) | ||||
|  | ||||
| 	// Note: header keys are converted by http.Header.Set to canonical names, e.g. content-type -> Content-Type. | ||||
| 	// By converting the keys we violate the HTTP/2 specification, which state that all headers must be lowercase. | ||||
| 	headersHTTP1 := make(http.Header) | ||||
| 	for _, header := range headers { | ||||
| 		headersHTTP1.Add(header.Name, header.Value) | ||||
| 	} | ||||
| 	dataString := base64.StdEncoding.EncodeToString(data) | ||||
|  | ||||
| 	// Use http1 types only because they are expected in http_matcher. | ||||
| 	// TODO: Create an interface that will be used by http_matcher:registerRequest and http_matcher:registerRequest | ||||
| 	//       to accept both HTTP/1.x and HTTP/2 requests and responses | ||||
| 	var messageHTTP1 interface{} | ||||
| 	if _, ok := headersHTTP1[":method"]; ok { | ||||
| 		messageHTTP1 = http.Request{ | ||||
| 			URL: &url.URL{}, | ||||
| 			Method: "POST", | ||||
| 			Header: headersHTTP1, | ||||
| 			Proto: protoHTTP2, | ||||
| 			ProtoMajor: protoMajorHTTP2, | ||||
| 			ProtoMinor: protoMinorHTTP2, | ||||
| 			Body: io.NopCloser(strings.NewReader(dataString)), | ||||
| 			ContentLength: int64(len(dataString)), | ||||
| 		} | ||||
| 	} else if _, ok := headersHTTP1[":status"]; ok { | ||||
| 		messageHTTP1 = http.Response{ | ||||
| 			Header: headersHTTP1, | ||||
| 			Proto: protoHTTP2, | ||||
| 			ProtoMajor: protoMajorHTTP2, | ||||
| 			ProtoMinor: protoMinorHTTP2, | ||||
| 			Body: io.NopCloser(strings.NewReader(dataString)), | ||||
| 			ContentLength: int64(len(dataString)), | ||||
| 		} | ||||
| 	} else { | ||||
| 		return 0, nil, errors.New("Failed to assemble stream: neither a request nor a message") | ||||
| 	} | ||||
|  | ||||
| 	return streamID, messageHTTP1, nil | ||||
| } | ||||
|  | ||||
| func (ga *GrpcAssembler) isStreamEnd(frame http2.Frame) bool { | ||||
| 	switch frame := frame.(type) { | ||||
| 	case *http2.MetaHeadersFrame: | ||||
| 		if frame.StreamEnded() { | ||||
| 			return true | ||||
| 		} | ||||
| 	case *http2.DataFrame: | ||||
| 		if frame.StreamEnded() { | ||||
| 			return true | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return false | ||||
| } | ||||
|  | ||||
| /* Check if HTTP/2. Remove HTTP/2 client preface from start of buffer if present | ||||
|  */ | ||||
| func checkIsHTTP2Connection(b *bufio.Reader, isClient bool) (bool, error) { | ||||
| 	if isClient { | ||||
| 		return checkIsHTTP2ClientStream(b) | ||||
| 	} | ||||
|  | ||||
| 	return checkIsHTTP2ServerStream(b) | ||||
| } | ||||
|  | ||||
| func prepareHTTP2Connection(b *bufio.Reader, isClient bool) error { | ||||
| 	if !isClient { | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	return discardClientPreface(b) | ||||
| } | ||||
|  | ||||
| func checkIsHTTP2ClientStream(b *bufio.Reader) (bool, error) { | ||||
| 	return checkClientPreface(b) | ||||
| } | ||||
|  | ||||
| func checkIsHTTP2ServerStream(b *bufio.Reader) (bool, error) { | ||||
| 	buf, err := b.Peek(frameHeaderLen) | ||||
| 	if err != nil { | ||||
| 		return false, err | ||||
| 	} | ||||
|  | ||||
| 	// If response starts with this text, it is HTTP/1.x | ||||
| 	if bytes.Compare(buf, []byte("HTTP/1.0 ")) == 0 || bytes.Compare(buf, []byte("HTTP/1.1 ")) == 0 { | ||||
| 		return false, nil | ||||
| 	} | ||||
|  | ||||
| 	// Check server connection preface (a settings frame) | ||||
| 	frameHeader := http2.FrameHeader{ | ||||
| 		Length:   uint32(buf[0])<<16 | uint32(buf[1])<<8 | uint32(buf[2]), | ||||
| 		Type:     http2.FrameType(buf[3]), | ||||
| 		Flags:    http2.Flags(buf[4]), | ||||
| 		StreamID: binary.BigEndian.Uint32(buf[5:]) & (1<<31 - 1), | ||||
| 	} | ||||
|  | ||||
| 	if frameHeader.Type != http2.FrameSettings { | ||||
| 		// If HTTP/2, but not start of stream, will also fulfill this condition. | ||||
| 		return false, nil | ||||
| 	} | ||||
|  | ||||
| 	return true, nil | ||||
| } | ||||
|  | ||||
| func checkClientPreface(b *bufio.Reader) (bool, error) { | ||||
| 	bytesStart, err := b.Peek(len(clientPreface)) | ||||
| 	if err != nil { | ||||
| 		return false, err | ||||
| 	} else if len(bytesStart) != len(clientPreface) { | ||||
| 		return false, errors.New("checkClientPreface: not enough bytes read") | ||||
| 	} | ||||
|  | ||||
| 	if !bytes.Equal(bytesStart, clientPreface) { | ||||
| 		return false, nil | ||||
| 	} | ||||
|  | ||||
| 	return true, nil | ||||
| } | ||||
|  | ||||
| func discardClientPreface(b *bufio.Reader) error { | ||||
| 	if isClientPrefacePresent, err := checkClientPreface(b); err != nil { | ||||
| 		return err | ||||
| 	} else if !isClientPrefacePresent{ | ||||
| 		return errors.New("discardClientPreface: does not begin with client preface") | ||||
| 	} | ||||
|  | ||||
| 	// Remove client preface string from the buffer | ||||
| 	n, err := b.Discard(len(clientPreface)) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} else if n != len(clientPreface) { | ||||
| 		return errors.New("discardClientPreface: failed to discard client preface") | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
		Reference in New Issue
	
	Block a user