mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-09-12 21:01:36 +00:00
Fix the issues in handleHTTP1ClientStream
and handleHTTP1ServerStream
This commit is contained in:
@@ -3,7 +3,6 @@ package main
|
||||
import (
|
||||
"bufio"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net/http"
|
||||
@@ -52,13 +51,12 @@ func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID) (*api.Req
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID) error {
|
||||
func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID) (*api.RequestResponsePair, error) {
|
||||
requestCounter++
|
||||
req, err := http.ReadRequest(b)
|
||||
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||
return nil
|
||||
} else if err != nil {
|
||||
if err != nil {
|
||||
log.Println("Error reading stream:", err)
|
||||
return nil, err
|
||||
} else {
|
||||
body, _ := ioutil.ReadAll(req.Body)
|
||||
req.Body.Close()
|
||||
@@ -73,17 +71,19 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID) error {
|
||||
tcpID.DstPort,
|
||||
requestCounter,
|
||||
)
|
||||
reqResMatcher.registerRequest(ident, req, time.Now())
|
||||
return err
|
||||
reqResPair := reqResMatcher.registerRequest(ident, req, time.Now())
|
||||
if reqResPair != nil {
|
||||
fmt.Printf("reqResPair: %+v\n", reqResPair)
|
||||
}
|
||||
return reqResPair, nil
|
||||
}
|
||||
|
||||
func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID) (*api.RequestResponsePair, error) {
|
||||
responseCounter++
|
||||
res, err := http.ReadResponse(b, nil)
|
||||
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||
return nil, nil
|
||||
} else if err != nil {
|
||||
if err != nil {
|
||||
log.Println("Error reading stream:", err)
|
||||
return nil, err
|
||||
} else {
|
||||
body, _ := ioutil.ReadAll(res.Body)
|
||||
res.Body.Close()
|
||||
@@ -99,7 +99,7 @@ func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID) (*api.RequestRes
|
||||
)
|
||||
reqResPair := reqResMatcher.registerResponse(ident, res, time.Now())
|
||||
if reqResPair != nil {
|
||||
return reqResPair, nil
|
||||
fmt.Printf("reqResPair: %+v\n", reqResPair)
|
||||
}
|
||||
return nil, err
|
||||
return reqResPair, nil
|
||||
}
|
||||
|
@@ -47,12 +47,11 @@ func (g dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID) *a
|
||||
grpcAssembler = createGrpcAssembler(b)
|
||||
}
|
||||
|
||||
var reqResPair *api.RequestResponsePair
|
||||
|
||||
for {
|
||||
if isHTTP2 {
|
||||
reqResPair, err := handleHTTP2Stream(grpcAssembler, tcpID)
|
||||
if reqResPair != nil {
|
||||
return reqResPair
|
||||
}
|
||||
reqResPair, err = handleHTTP2Stream(grpcAssembler, tcpID)
|
||||
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
@@ -60,7 +59,7 @@ func (g dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID) *a
|
||||
continue
|
||||
}
|
||||
} else if isClient {
|
||||
err := handleHTTP1ClientStream(b, tcpID)
|
||||
reqResPair, err = handleHTTP1ClientStream(b, tcpID)
|
||||
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
@@ -68,10 +67,7 @@ func (g dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID) *a
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
reqResPair, err := handleHTTP1ServerStream(b, tcpID)
|
||||
if reqResPair != nil {
|
||||
return reqResPair
|
||||
}
|
||||
reqResPair, err = handleHTTP1ServerStream(b, tcpID)
|
||||
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
@@ -80,7 +76,7 @@ func (g dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID) *a
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
return reqResPair
|
||||
}
|
||||
|
||||
var Dissector dissecting
|
||||
|
@@ -4,7 +4,6 @@ import (
|
||||
"bufio"
|
||||
"fmt"
|
||||
"log"
|
||||
"sync"
|
||||
|
||||
"github.com/romana/rlog"
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
@@ -15,8 +14,6 @@ import (
|
||||
)
|
||||
|
||||
type tcpStreamFactory struct {
|
||||
wg sync.WaitGroup
|
||||
doHTTP bool
|
||||
outbountLinkWriter *OutboundLinkWriter
|
||||
}
|
||||
|
||||
@@ -46,8 +43,7 @@ func (h *tcpStream) serverRun(tcpID *api.TcpID) {
|
||||
for _, extension := range extensions {
|
||||
if containsPort(extension.OutboundPorts, h.transport.Src().String()) {
|
||||
extension.Dissector.Ping()
|
||||
reqResPair := extension.Dissector.Dissect(b, false, tcpID)
|
||||
log.Printf("reqResPair: %+v\n", reqResPair)
|
||||
extension.Dissector.Dissect(b, false, tcpID)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -73,10 +69,6 @@ func (h *tcpStreamFactory) New(net, transport gopacket.Flow) tcpassembly.Stream
|
||||
return &stream.r
|
||||
}
|
||||
|
||||
func (factory *tcpStreamFactory) WaitGoRoutines() {
|
||||
factory.wg.Wait()
|
||||
}
|
||||
|
||||
func (factory *tcpStreamFactory) getStreamProps(srcIP string, dstIP string, dstPort int) *streamProps {
|
||||
if hostMode {
|
||||
if inArrayString(gSettings.filterAuthorities, fmt.Sprintf("%s:%d", dstIP, dstPort)) == true {
|
||||
|
Reference in New Issue
Block a user