mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-07-29 15:37:05 +00:00
Fix the issues that are introduced by the merge conflict
This commit is contained in:
parent
5bd8b1215a
commit
2d188c2dc9
@ -51,5 +51,5 @@ type OutputChannelItem struct {
|
||||
type Dissector interface {
|
||||
Register(*Extension)
|
||||
Ping()
|
||||
Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, callback func(reqResPair *RequestResponsePair))
|
||||
Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, callback func(item *OutputChannelItem))
|
||||
}
|
||||
|
@ -13,19 +13,18 @@ func init() {
|
||||
|
||||
type dissecting string
|
||||
|
||||
func (g dissecting) Register(extension *api.Extension) {
|
||||
func (d dissecting) Register(extension *api.Extension) {
|
||||
extension.Name = "amqp"
|
||||
extension.OutboundPorts = []string{"5671", "5672"}
|
||||
extension.InboundPorts = []string{}
|
||||
}
|
||||
|
||||
func (g dissecting) Ping() {
|
||||
func (d dissecting) Ping() {
|
||||
log.Printf("pong AMQP\n")
|
||||
}
|
||||
|
||||
func (g dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID) *api.OutputChannelItem {
|
||||
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, callback func(item *api.OutputChannelItem)) {
|
||||
// TODO: Implement
|
||||
return nil
|
||||
}
|
||||
|
||||
var Dissector dissecting
|
||||
|
@ -9,7 +9,7 @@ require (
|
||||
golang.org/x/net v0.0.0-20210224082022-3d97a244fca7
|
||||
golang.org/x/sys v0.0.0-20210225134936-a50acf3fe073
|
||||
golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d
|
||||
golang.org/x/text v0.3.4
|
||||
golang.org/x/text v0.3.5
|
||||
golang.org/x/tools v0.0.0-20210106214847-113979e3529a
|
||||
)
|
||||
|
||||
|
@ -28,6 +28,8 @@ golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
|
||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.4 h1:0YWbFKbhXG/wIiuHDSKpS0Iy7FSA+u45VtBMfQcFTTc=
|
||||
golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.5 h1:i6eZZ+zk0SOf0xgBpEpPD18qWcJda6q1sxt3S0kzyUQ=
|
||||
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/tools v0.0.0-20210106214847-113979e3529a h1:CB3a9Nez8M13wwlr/E2YtwoU+qYHKfC+JrDa45RXXoQ=
|
||||
|
@ -11,13 +11,13 @@ import (
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
|
||||
func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, Emit func(reqResPair *api.RequestResponsePair)) error {
|
||||
func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, Emit func(item *api.OutputChannelItem)) error {
|
||||
streamID, messageHTTP1, err := grpcAssembler.readMessage()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var reqResPair *api.RequestResponsePair
|
||||
var item *api.OutputChannelItem
|
||||
|
||||
switch messageHTTP1 := messageHTTP1.(type) {
|
||||
case http.Request:
|
||||
@ -30,7 +30,7 @@ func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, Emit func
|
||||
tcpID.DstPort,
|
||||
streamID,
|
||||
)
|
||||
reqResPair = reqResMatcher.registerRequest(ident, &messageHTTP1, time.Now())
|
||||
item = reqResMatcher.registerRequest(ident, &messageHTTP1, time.Now())
|
||||
case http.Response:
|
||||
responseCounter++
|
||||
ident := fmt.Sprintf(
|
||||
@ -41,17 +41,17 @@ func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, Emit func
|
||||
tcpID.SrcPort,
|
||||
streamID,
|
||||
)
|
||||
reqResPair = reqResMatcher.registerResponse(ident, &messageHTTP1, time.Now())
|
||||
item = reqResMatcher.registerResponse(ident, &messageHTTP1, time.Now())
|
||||
}
|
||||
|
||||
if reqResPair != nil {
|
||||
Emit(reqResPair)
|
||||
if item != nil {
|
||||
Emit(item)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, Emit func(reqResPair *api.RequestResponsePair)) error {
|
||||
func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, Emit func(item *api.OutputChannelItem)) error {
|
||||
requestCounter++
|
||||
req, err := http.ReadRequest(b)
|
||||
if err != nil {
|
||||
@ -71,14 +71,14 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, Emit func(reqRes
|
||||
tcpID.DstPort,
|
||||
requestCounter,
|
||||
)
|
||||
reqResPair := reqResMatcher.registerRequest(ident, req, time.Now())
|
||||
if reqResPair != nil {
|
||||
Emit(reqResPair)
|
||||
item := reqResMatcher.registerRequest(ident, req, time.Now())
|
||||
if item != nil {
|
||||
Emit(item)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, Emit func(reqResPair *api.RequestResponsePair)) error {
|
||||
func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, Emit func(item *api.OutputChannelItem)) error {
|
||||
responseCounter++
|
||||
res, err := http.ReadResponse(b, nil)
|
||||
if err != nil {
|
||||
@ -97,9 +97,9 @@ func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, Emit func(reqRes
|
||||
tcpID.SrcPort,
|
||||
responseCounter,
|
||||
)
|
||||
reqResPair := reqResMatcher.registerResponse(ident, res, time.Now())
|
||||
if reqResPair != nil {
|
||||
Emit(reqResPair)
|
||||
item := reqResMatcher.registerResponse(ident, res, time.Now())
|
||||
if item != nil {
|
||||
Emit(item)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -4,10 +4,7 @@ import (
|
||||
"bufio"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
@ -25,67 +22,60 @@ type dissecting string
|
||||
|
||||
const ExtensionName = "http"
|
||||
|
||||
func (g dissecting) Register(extension *api.Extension) {
|
||||
func (d dissecting) Register(extension *api.Extension) {
|
||||
extension.Name = ExtensionName
|
||||
extension.OutboundPorts = []string{"80", "8080", "443"}
|
||||
extension.InboundPorts = []string{}
|
||||
}
|
||||
|
||||
func (g dissecting) Ping() {
|
||||
func (d dissecting) Ping() {
|
||||
log.Printf("pong HTTP\n")
|
||||
}
|
||||
|
||||
func (g dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID) *api.OutputChannelItem {
|
||||
for {
|
||||
if isClient {
|
||||
requestCounter++
|
||||
req, err := http.ReadRequest(b)
|
||||
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||
return nil
|
||||
} else if err != nil {
|
||||
log.Println("Error reading stream:", err)
|
||||
} else {
|
||||
body, _ := ioutil.ReadAll(req.Body)
|
||||
req.Body.Close()
|
||||
log.Printf("Received request: %+v with body: %+v\n", req, body)
|
||||
}
|
||||
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, Emit func(item *api.OutputChannelItem)) {
|
||||
ident := fmt.Sprintf("%s->%s:%s->%s", tcpID.SrcIP, tcpID.DstIP, tcpID.SrcPort, tcpID.DstPort)
|
||||
isHTTP2, err := checkIsHTTP2Connection(b, isClient)
|
||||
if err != nil {
|
||||
SilentError("HTTP/2-Prepare-Connection", "stream %s Failed to check if client is HTTP/2: %s (%v,%+v)", ident, err, err, err)
|
||||
// Do something?
|
||||
}
|
||||
|
||||
ident := fmt.Sprintf(
|
||||
"%s->%s %s->%s %d",
|
||||
tcpID.SrcIP,
|
||||
tcpID.DstIP,
|
||||
tcpID.SrcPort,
|
||||
tcpID.DstPort,
|
||||
requestCounter,
|
||||
)
|
||||
reqResMatcher.registerRequest(ident, req, time.Now())
|
||||
} else {
|
||||
responseCounter++
|
||||
res, err := http.ReadResponse(b, nil)
|
||||
var grpcAssembler *GrpcAssembler
|
||||
if isHTTP2 {
|
||||
err := prepareHTTP2Connection(b, isClient)
|
||||
if err != nil {
|
||||
SilentError("HTTP/2-Prepare-Connection-After-Check", "stream %s error: %s (%v,%+v)", ident, err, err, err)
|
||||
}
|
||||
grpcAssembler = createGrpcAssembler(b)
|
||||
}
|
||||
|
||||
for {
|
||||
if isHTTP2 {
|
||||
err = handleHTTP2Stream(grpcAssembler, tcpID, Emit)
|
||||
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||
return nil
|
||||
break
|
||||
} else if err != nil {
|
||||
log.Println("Error reading stream:", err)
|
||||
} else {
|
||||
body, _ := ioutil.ReadAll(res.Body)
|
||||
res.Body.Close()
|
||||
log.Printf("Received response: %+v with body: %+v\n", res, body)
|
||||
SilentError("HTTP/2", "stream %s error: %s (%v,%+v)", ident, err, err, err)
|
||||
continue
|
||||
}
|
||||
ident := fmt.Sprintf(
|
||||
"%s->%s %s->%s %d",
|
||||
tcpID.DstIP,
|
||||
tcpID.SrcIP,
|
||||
tcpID.DstPort,
|
||||
tcpID.SrcPort,
|
||||
responseCounter,
|
||||
)
|
||||
reqResPair := reqResMatcher.registerResponse(ident, res, time.Now())
|
||||
if reqResPair != nil {
|
||||
return reqResPair
|
||||
} else if isClient {
|
||||
err = handleHTTP1ClientStream(b, tcpID, Emit)
|
||||
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
SilentError("HTTP-request", "stream %s Request error: %s (%v,%+v)", ident, err, err, err)
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
err = handleHTTP1ServerStream(b, tcpID, Emit)
|
||||
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
SilentError("HTTP-response", "stream %s Response error: %s (%v,%+v)", ident, err, err, err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
var Dissector dissecting
|
||||
|
@ -13,19 +13,18 @@ func init() {
|
||||
|
||||
type dissecting string
|
||||
|
||||
func (g dissecting) Register(extension *api.Extension) {
|
||||
func (d dissecting) Register(extension *api.Extension) {
|
||||
extension.Name = "kafka"
|
||||
extension.OutboundPorts = []string{"9092"}
|
||||
extension.InboundPorts = []string{}
|
||||
}
|
||||
|
||||
func (g dissecting) Ping() {
|
||||
func (d dissecting) Ping() {
|
||||
log.Printf("pong Kafka\n")
|
||||
}
|
||||
|
||||
func (g dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID) *api.OutputChannelItem {
|
||||
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, callback func(item *api.OutputChannelItem)) {
|
||||
// TODO: Implement
|
||||
return nil
|
||||
}
|
||||
|
||||
var Dissector dissecting
|
||||
|
@ -9,7 +9,7 @@ require (
|
||||
golang.org/x/net v0.0.0-20210224082022-3d97a244fca7
|
||||
golang.org/x/sys v0.0.0-20210225134936-a50acf3fe073
|
||||
golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d
|
||||
golang.org/x/text v0.3.4
|
||||
golang.org/x/text v0.3.5
|
||||
golang.org/x/tools v0.0.0-20210106214847-113979e3529a
|
||||
)
|
||||
|
||||
|
@ -31,6 +31,8 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.4 h1:0YWbFKbhXG/wIiuHDSKpS0Iy7FSA+u45VtBMfQcFTTc=
|
||||
golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.5 h1:i6eZZ+zk0SOf0xgBpEpPD18qWcJda6q1sxt3S0kzyUQ=
|
||||
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
|
||||
|
@ -189,7 +189,7 @@ func (c *Context) GetCaptureInfo() gopacket.CaptureInfo {
|
||||
return c.CaptureInfo
|
||||
}
|
||||
|
||||
func StartPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem) () {
|
||||
func StartPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem) {
|
||||
hostMode = opts.HostMode
|
||||
|
||||
if GetMemoryProfilingEnabled() {
|
||||
|
@ -15,7 +15,7 @@ import (
|
||||
|
||||
type tcpStreamFactory struct {
|
||||
outbountLinkWriter *OutboundLinkWriter
|
||||
OutputChannelItem chan *api.OutputChannelItem
|
||||
OutputChannelItem chan *api.OutputChannelItem
|
||||
}
|
||||
|
||||
const checkTLSPacketAmount = 100
|
||||
@ -29,10 +29,8 @@ func containsPort(ports []string, port string) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func Emit(reqResPair *api.RequestResponsePair) {
|
||||
log.Printf("Emit reqResPair: %+v\n", reqResPair)
|
||||
log.Printf("Emit reqResPair.Request.Orig: %v\n", reqResPair.Request.Orig)
|
||||
log.Printf("Emit reqResPair.Response.Orig: %v\n", reqResPair.Response.Orig)
|
||||
func Emit(item *api.OutputChannelItem) {
|
||||
log.Printf("Emit item: %+v\n", item)
|
||||
}
|
||||
|
||||
func (h *tcpStream) clientRun(tcpID *api.TcpID) {
|
||||
|
Loading…
Reference in New Issue
Block a user