mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-09-08 22:10:50 +00:00
Make the HTTP request-response counter perfect
This commit is contained in:
@@ -42,6 +42,11 @@ type TcpID struct {
|
||||
Ident string
|
||||
}
|
||||
|
||||
type CounterPair struct {
|
||||
Request uint
|
||||
Response uint
|
||||
}
|
||||
|
||||
type GenericMessage struct {
|
||||
IsRequest bool `json:"is_request"`
|
||||
CaptureTime time.Time `json:"capture_time"`
|
||||
@@ -63,7 +68,7 @@ type OutputChannelItem struct {
|
||||
type Dissector interface {
|
||||
Register(*Extension)
|
||||
Ping()
|
||||
Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, emitter Emitter) error
|
||||
Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, emitter Emitter) error
|
||||
Analyze(item *OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *MizuEntry
|
||||
Summarize(entry *MizuEntry) *BaseEntryDetails
|
||||
Represent(entry *MizuEntry) (Protocol, []byte, error)
|
||||
|
@@ -41,7 +41,7 @@ func (d dissecting) Ping() {
|
||||
|
||||
const amqpRequest string = "amqp_request"
|
||||
|
||||
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, emitter api.Emitter) error {
|
||||
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, emitter api.Emitter) error {
|
||||
r := AmqpReader{b}
|
||||
|
||||
var remaining int
|
||||
|
@@ -14,27 +14,16 @@ import (
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
|
||||
func populateCounterMap(ident string) {
|
||||
if counterMap[ident] == nil {
|
||||
counterMap[ident] = &counterPair{
|
||||
request: 0,
|
||||
response: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, emitter api.Emitter) error {
|
||||
streamID, messageHTTP1, err := grpcAssembler.readMessage()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
populateCounterMap(tcpID.Ident)
|
||||
|
||||
var item *api.OutputChannelItem
|
||||
|
||||
switch messageHTTP1 := messageHTTP1.(type) {
|
||||
case http.Request:
|
||||
counterMap[tcpID.Ident].request++
|
||||
ident := fmt.Sprintf(
|
||||
"%s->%s %s->%s %d",
|
||||
tcpID.SrcIP,
|
||||
@@ -54,7 +43,6 @@ func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, emitter a
|
||||
}
|
||||
}
|
||||
case http.Response:
|
||||
counterMap[tcpID.Ident].response++
|
||||
ident := fmt.Sprintf(
|
||||
"%s->%s %s->%s %d",
|
||||
tcpID.DstIP,
|
||||
@@ -83,14 +71,13 @@ func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, emitter a
|
||||
return nil
|
||||
}
|
||||
|
||||
func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, emitter api.Emitter) error {
|
||||
func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, emitter api.Emitter) error {
|
||||
req, err := http.ReadRequest(b)
|
||||
if err != nil {
|
||||
// log.Println("Error reading stream:", err)
|
||||
return err
|
||||
}
|
||||
populateCounterMap(tcpID.Ident)
|
||||
counterMap[tcpID.Ident].request++
|
||||
counterPair.Request++
|
||||
|
||||
body, err := ioutil.ReadAll(req.Body)
|
||||
req.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind
|
||||
@@ -110,7 +97,7 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, emitter api.Emit
|
||||
tcpID.DstIP,
|
||||
tcpID.SrcPort,
|
||||
tcpID.DstPort,
|
||||
counterMap[tcpID.Ident].request,
|
||||
counterPair.Request,
|
||||
)
|
||||
item := reqResMatcher.registerRequest(ident, req, time.Now())
|
||||
if item != nil {
|
||||
@@ -126,14 +113,13 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, emitter api.Emit
|
||||
return nil
|
||||
}
|
||||
|
||||
func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, emitter api.Emitter) error {
|
||||
func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, emitter api.Emitter) error {
|
||||
res, err := http.ReadResponse(b, nil)
|
||||
if err != nil {
|
||||
// log.Println("Error reading stream:", err)
|
||||
return err
|
||||
}
|
||||
populateCounterMap(tcpID.Ident)
|
||||
counterMap[tcpID.Ident].response++
|
||||
counterPair.Response++
|
||||
var req string
|
||||
req = fmt.Sprintf("<no-request-seen>")
|
||||
|
||||
@@ -163,7 +149,7 @@ func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, emitter api.Emit
|
||||
tcpID.SrcIP,
|
||||
tcpID.DstPort,
|
||||
tcpID.SrcPort,
|
||||
counterMap[tcpID.Ident].response,
|
||||
counterPair.Response,
|
||||
)
|
||||
item := reqResMatcher.registerResponse(ident, res, time.Now())
|
||||
if item != nil {
|
||||
|
@@ -12,13 +12,6 @@ import (
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
|
||||
type counterPair struct {
|
||||
request uint
|
||||
response uint
|
||||
}
|
||||
|
||||
var counterMap map[string]*counterPair
|
||||
|
||||
var protocol api.Protocol = api.Protocol{
|
||||
Name: "http",
|
||||
LongName: "Hypertext Transfer Protocol -- HTTP/1.1",
|
||||
@@ -52,7 +45,6 @@ const (
|
||||
|
||||
func init() {
|
||||
log.Println("Initializing HTTP extension.")
|
||||
counterMap = make(map[string]*counterPair)
|
||||
}
|
||||
|
||||
type dissecting string
|
||||
@@ -65,7 +57,7 @@ func (d dissecting) Ping() {
|
||||
log.Printf("pong %s\n", protocol.Name)
|
||||
}
|
||||
|
||||
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, emitter api.Emitter) error {
|
||||
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, emitter api.Emitter) error {
|
||||
ident := fmt.Sprintf("%s->%s:%s->%s", tcpID.SrcIP, tcpID.DstIP, tcpID.SrcPort, tcpID.DstPort)
|
||||
isHTTP2, err := checkIsHTTP2Connection(b, isClient)
|
||||
if err != nil {
|
||||
@@ -94,7 +86,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em
|
||||
}
|
||||
success = true
|
||||
} else if isClient {
|
||||
err = handleHTTP1ClientStream(b, tcpID, emitter)
|
||||
err = handleHTTP1ClientStream(b, tcpID, counterPair, emitter)
|
||||
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
@@ -103,7 +95,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em
|
||||
}
|
||||
success = true
|
||||
} else {
|
||||
err = handleHTTP1ServerStream(b, tcpID, emitter)
|
||||
err = handleHTTP1ServerStream(b, tcpID, counterPair, emitter)
|
||||
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
|
@@ -36,7 +36,7 @@ func (d dissecting) Ping() {
|
||||
log.Printf("pong %s\n", _protocol.Name)
|
||||
}
|
||||
|
||||
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, emitter api.Emitter) error {
|
||||
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, emitter api.Emitter) error {
|
||||
for {
|
||||
if isClient {
|
||||
_, _, err := ReadRequest(b, tcpID)
|
||||
|
@@ -91,7 +91,7 @@ func (h *tcpReader) Read(p []byte) (int, error) {
|
||||
return l, nil
|
||||
}
|
||||
|
||||
func (h *tcpReader) run(wg *sync.WaitGroup, isClient bool) {
|
||||
func (h *tcpReader) run(wg *sync.WaitGroup, counterPair *api.CounterPair) {
|
||||
defer wg.Done()
|
||||
|
||||
data, err := io.ReadAll(h)
|
||||
@@ -103,6 +103,6 @@ func (h *tcpReader) run(wg *sync.WaitGroup, isClient bool) {
|
||||
|
||||
for _, extension := range extensions {
|
||||
r.Reset(data)
|
||||
extension.Dissector.Dissect(bufio.NewReader(r), isClient, h.tcpID, h.Emitter)
|
||||
extension.Dissector.Dissect(bufio.NewReader(r), h.isClient, h.tcpID, counterPair, h.Emitter)
|
||||
}
|
||||
}
|
||||
|
@@ -48,6 +48,10 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
|
||||
optchecker: reassembly.NewTCPOptionCheck(),
|
||||
}
|
||||
if stream.isTapTarget {
|
||||
counterPair := &api.CounterPair{
|
||||
Request: 0,
|
||||
Response: 0,
|
||||
}
|
||||
stream.client = tcpReader{
|
||||
msgQueue: make(chan tcpReaderDataMsg),
|
||||
ident: fmt.Sprintf("%s %s", net, transport),
|
||||
@@ -73,15 +77,15 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
|
||||
DstPort: transport.Src().String(),
|
||||
},
|
||||
parent: stream,
|
||||
isClient: true,
|
||||
isClient: false,
|
||||
isOutgoing: props.isOutgoing,
|
||||
outboundLinkWriter: factory.outboundLinkWriter,
|
||||
Emitter: factory.Emitter,
|
||||
}
|
||||
factory.wg.Add(2)
|
||||
// Start reading from channel stream.reader.bytes
|
||||
go stream.client.run(&factory.wg, true)
|
||||
go stream.server.run(&factory.wg, false)
|
||||
go stream.client.run(&factory.wg, counterPair)
|
||||
go stream.server.run(&factory.wg, counterPair)
|
||||
}
|
||||
return stream
|
||||
}
|
||||
|
Reference in New Issue
Block a user