mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-06-27 08:39:49 +00:00
Add HTTP2 Over Cleartext (H2C) support (#510)
* Add HTTP2 Over Cleartext (H2C) support * Remove a parameter which is a remnant of debugging
This commit is contained in:
parent
e667597e6e
commit
a62842ac9f
@ -7,6 +7,7 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"strings"
|
||||||
|
|
||||||
"github.com/up9inc/mizu/tap/api"
|
"github.com/up9inc/mizu/tap/api"
|
||||||
)
|
)
|
||||||
@ -34,12 +35,13 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTi
|
|||||||
switch messageHTTP1 := messageHTTP1.(type) {
|
switch messageHTTP1 := messageHTTP1.(type) {
|
||||||
case http.Request:
|
case http.Request:
|
||||||
ident := fmt.Sprintf(
|
ident := fmt.Sprintf(
|
||||||
"%s->%s %s->%s %d",
|
"%s->%s %s->%s %d %s",
|
||||||
tcpID.SrcIP,
|
tcpID.SrcIP,
|
||||||
tcpID.DstIP,
|
tcpID.DstIP,
|
||||||
tcpID.SrcPort,
|
tcpID.SrcPort,
|
||||||
tcpID.DstPort,
|
tcpID.DstPort,
|
||||||
streamID,
|
streamID,
|
||||||
|
"HTTP2",
|
||||||
)
|
)
|
||||||
item = reqResMatcher.registerRequest(ident, &messageHTTP1, superTimer.CaptureTime)
|
item = reqResMatcher.registerRequest(ident, &messageHTTP1, superTimer.CaptureTime)
|
||||||
if item != nil {
|
if item != nil {
|
||||||
@ -53,12 +55,13 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTi
|
|||||||
}
|
}
|
||||||
case http.Response:
|
case http.Response:
|
||||||
ident := fmt.Sprintf(
|
ident := fmt.Sprintf(
|
||||||
"%s->%s %s->%s %d",
|
"%s->%s %s->%s %d %s",
|
||||||
tcpID.DstIP,
|
tcpID.DstIP,
|
||||||
tcpID.SrcIP,
|
tcpID.SrcIP,
|
||||||
tcpID.DstPort,
|
tcpID.DstPort,
|
||||||
tcpID.SrcPort,
|
tcpID.SrcPort,
|
||||||
streamID,
|
streamID,
|
||||||
|
"HTTP2",
|
||||||
)
|
)
|
||||||
item = reqResMatcher.registerResponse(ident, &messageHTTP1, superTimer.CaptureTime)
|
item = reqResMatcher.registerResponse(ident, &messageHTTP1, superTimer.CaptureTime)
|
||||||
if item != nil {
|
if item != nil {
|
||||||
@ -84,23 +87,30 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTi
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
|
func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) (switchingProtocolsHTTP2 bool, req *http.Request, err error) {
|
||||||
req, err := http.ReadRequest(b)
|
req, err = http.ReadRequest(b)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return
|
||||||
}
|
}
|
||||||
counterPair.Request++
|
counterPair.Request++
|
||||||
|
|
||||||
body, err := ioutil.ReadAll(req.Body)
|
// Check HTTP2 upgrade - HTTP2 Over Cleartext (H2C)
|
||||||
|
if strings.Contains(strings.ToLower(req.Header.Get("Connection")), "upgrade") && strings.ToLower(req.Header.Get("Upgrade")) == "h2c" {
|
||||||
|
switchingProtocolsHTTP2 = true
|
||||||
|
}
|
||||||
|
|
||||||
|
var body []byte
|
||||||
|
body, err = ioutil.ReadAll(req.Body)
|
||||||
req.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind
|
req.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind
|
||||||
|
|
||||||
ident := fmt.Sprintf(
|
ident := fmt.Sprintf(
|
||||||
"%s->%s %s->%s %d",
|
"%s->%s %s->%s %d %s",
|
||||||
tcpID.SrcIP,
|
tcpID.SrcIP,
|
||||||
tcpID.DstIP,
|
tcpID.DstIP,
|
||||||
tcpID.SrcPort,
|
tcpID.SrcPort,
|
||||||
tcpID.DstPort,
|
tcpID.DstPort,
|
||||||
counterPair.Request,
|
counterPair.Request,
|
||||||
|
"HTTP1",
|
||||||
)
|
)
|
||||||
item := reqResMatcher.registerRequest(ident, req, superTimer.CaptureTime)
|
item := reqResMatcher.registerRequest(ident, req, superTimer.CaptureTime)
|
||||||
if item != nil {
|
if item != nil {
|
||||||
@ -113,26 +123,34 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
|
|||||||
}
|
}
|
||||||
filterAndEmit(item, emitter, options)
|
filterAndEmit(item, emitter, options)
|
||||||
}
|
}
|
||||||
return nil
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
|
func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions) (switchingProtocolsHTTP2 bool, err error) {
|
||||||
res, err := http.ReadResponse(b, nil)
|
var res *http.Response
|
||||||
|
res, err = http.ReadResponse(b, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return
|
||||||
}
|
}
|
||||||
counterPair.Response++
|
counterPair.Response++
|
||||||
|
|
||||||
body, err := ioutil.ReadAll(res.Body)
|
// Check HTTP2 upgrade - HTTP2 Over Cleartext (H2C)
|
||||||
|
if res.StatusCode == 101 && strings.Contains(strings.ToLower(res.Header.Get("Connection")), "upgrade") && strings.ToLower(res.Header.Get("Upgrade")) == "h2c" {
|
||||||
|
switchingProtocolsHTTP2 = true
|
||||||
|
}
|
||||||
|
|
||||||
|
var body []byte
|
||||||
|
body, err = ioutil.ReadAll(res.Body)
|
||||||
res.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind
|
res.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind
|
||||||
|
|
||||||
ident := fmt.Sprintf(
|
ident := fmt.Sprintf(
|
||||||
"%s->%s %s->%s %d",
|
"%s->%s %s->%s %d %s",
|
||||||
tcpID.DstIP,
|
tcpID.DstIP,
|
||||||
tcpID.SrcIP,
|
tcpID.SrcIP,
|
||||||
tcpID.DstPort,
|
tcpID.DstPort,
|
||||||
tcpID.SrcPort,
|
tcpID.SrcPort,
|
||||||
counterPair.Response,
|
counterPair.Response,
|
||||||
|
"HTTP1",
|
||||||
)
|
)
|
||||||
item := reqResMatcher.registerResponse(ident, res, superTimer.CaptureTime)
|
item := reqResMatcher.registerResponse(ident, res, superTimer.CaptureTime)
|
||||||
if item != nil {
|
if item != nil {
|
||||||
@ -145,5 +163,5 @@ func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
|
|||||||
}
|
}
|
||||||
filterAndEmit(item, emitter, options)
|
filterAndEmit(item, emitter, options)
|
||||||
}
|
}
|
||||||
return nil
|
return
|
||||||
}
|
}
|
||||||
|
@ -7,6 +7,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"log"
|
"log"
|
||||||
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -85,7 +86,15 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
|
|||||||
}
|
}
|
||||||
|
|
||||||
dissected := false
|
dissected := false
|
||||||
|
switchingProtocolsHTTP2 := false
|
||||||
for {
|
for {
|
||||||
|
if switchingProtocolsHTTP2 {
|
||||||
|
switchingProtocolsHTTP2 = false
|
||||||
|
isHTTP2, err = checkIsHTTP2Connection(b, isClient)
|
||||||
|
prepareHTTP2Connection(b, isClient)
|
||||||
|
http2Assembler = createHTTP2Assembler(b)
|
||||||
|
}
|
||||||
|
|
||||||
if superIdentifier.Protocol != nil && superIdentifier.Protocol != &protocol {
|
if superIdentifier.Protocol != nil && superIdentifier.Protocol != &protocol {
|
||||||
return errors.New("Identified by another protocol")
|
return errors.New("Identified by another protocol")
|
||||||
}
|
}
|
||||||
@ -99,15 +108,39 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
|
|||||||
}
|
}
|
||||||
dissected = true
|
dissected = true
|
||||||
} else if isClient {
|
} else if isClient {
|
||||||
err = handleHTTP1ClientStream(b, tcpID, counterPair, superTimer, emitter, options)
|
var req *http.Request
|
||||||
|
switchingProtocolsHTTP2, req, err = handleHTTP1ClientStream(b, tcpID, counterPair, superTimer, emitter, options)
|
||||||
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||||
break
|
break
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
dissected = true
|
dissected = true
|
||||||
|
|
||||||
|
// In case of an HTTP2 upgrade, duplicate the HTTP1 request into HTTP2 with stream ID 1
|
||||||
|
if switchingProtocolsHTTP2 {
|
||||||
|
ident := fmt.Sprintf(
|
||||||
|
"%s->%s %s->%s 1 %s",
|
||||||
|
tcpID.SrcIP,
|
||||||
|
tcpID.DstIP,
|
||||||
|
tcpID.SrcPort,
|
||||||
|
tcpID.DstPort,
|
||||||
|
"HTTP2",
|
||||||
|
)
|
||||||
|
item := reqResMatcher.registerRequest(ident, req, superTimer.CaptureTime)
|
||||||
|
if item != nil {
|
||||||
|
item.ConnectionInfo = &api.ConnectionInfo{
|
||||||
|
ClientIP: tcpID.SrcIP,
|
||||||
|
ClientPort: tcpID.SrcPort,
|
||||||
|
ServerIP: tcpID.DstIP,
|
||||||
|
ServerPort: tcpID.DstPort,
|
||||||
|
IsOutgoing: true,
|
||||||
|
}
|
||||||
|
filterAndEmit(item, emitter, options)
|
||||||
|
}
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
err = handleHTTP1ServerStream(b, tcpID, counterPair, superTimer, emitter, options)
|
switchingProtocolsHTTP2, err = handleHTTP1ServerStream(b, tcpID, counterPair, superTimer, emitter, options)
|
||||||
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||||
break
|
break
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
@ -132,6 +165,8 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
|
|||||||
reqDetails := request["details"].(map[string]interface{})
|
reqDetails := request["details"].(map[string]interface{})
|
||||||
resDetails := response["details"].(map[string]interface{})
|
resDetails := response["details"].(map[string]interface{})
|
||||||
|
|
||||||
|
isRequestUpgradedH2C := false
|
||||||
|
|
||||||
for _, header := range reqDetails["headers"].([]interface{}) {
|
for _, header := range reqDetails["headers"].([]interface{}) {
|
||||||
h := header.(map[string]interface{})
|
h := header.(map[string]interface{})
|
||||||
if h["name"] == "Host" {
|
if h["name"] == "Host" {
|
||||||
@ -143,13 +178,19 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
|
|||||||
if h["name"] == ":path" {
|
if h["name"] == ":path" {
|
||||||
path = h["value"].(string)
|
path = h["value"].(string)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if h["name"] == "Upgrade" {
|
||||||
|
if h["value"].(string) == "h2c" {
|
||||||
|
isRequestUpgradedH2C = true
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if resDetails["bodySize"].(float64) < 0 {
|
if resDetails["bodySize"].(float64) < 0 {
|
||||||
resDetails["bodySize"] = 0
|
resDetails["bodySize"] = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
if item.Protocol.Version == "2.0" {
|
if item.Protocol.Version == "2.0" && !isRequestUpgradedH2C {
|
||||||
service = authority
|
service = authority
|
||||||
} else {
|
} else {
|
||||||
service = host
|
service = host
|
||||||
@ -192,7 +233,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
|
|||||||
resDetails["statusText"] = grpcStatusCodes[statusCode]
|
resDetails["statusText"] = grpcStatusCodes[statusCode]
|
||||||
}
|
}
|
||||||
|
|
||||||
if item.Protocol.Version == "2.0" {
|
if item.Protocol.Version == "2.0" && !isRequestUpgradedH2C {
|
||||||
reqDetails["url"] = path
|
reqDetails["url"] = path
|
||||||
request["url"] = path
|
request["url"] = path
|
||||||
}
|
}
|
||||||
|
@ -92,6 +92,6 @@ func splitIdent(ident string) []string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func genKey(split []string) string {
|
func genKey(split []string) string {
|
||||||
key := fmt.Sprintf("%s:%s->%s:%s,%s", split[0], split[2], split[1], split[3], split[4])
|
key := fmt.Sprintf("%s:%s->%s:%s,%s%s", split[0], split[2], split[1], split[3], split[4], split[5])
|
||||||
return key
|
return key
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user