mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-09-03 11:35:12 +00:00
Call SetProtocol
in AMQP faster and remove GetProtocol
method (#1097)
* Call `SetProtocol` in AMQP faster and remove `GetProtocol` method * #run_acceptance_tests * Remove the unused fields from the test mocks #run_acceptance_tests
This commit is contained in:
@@ -431,7 +431,6 @@ type TcpReader interface {
|
|||||||
type TcpStream interface {
|
type TcpStream interface {
|
||||||
SetProtocol(protocol *Protocol)
|
SetProtocol(protocol *Protocol)
|
||||||
GetOrigin() Capture
|
GetOrigin() Capture
|
||||||
GetProtocol() *Protocol
|
|
||||||
GetReqResMatchers() []RequestResponseMatcher
|
GetReqResMatchers() []RequestResponseMatcher
|
||||||
GetIsTapTarget() bool
|
GetIsTapTarget() bool
|
||||||
GetIsClosed() bool
|
GetIsClosed() bool
|
||||||
|
@@ -3,7 +3,6 @@ package amqp
|
|||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"log"
|
"log"
|
||||||
@@ -75,10 +74,6 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.
|
|||||||
var lastMethodFrameMessage Message
|
var lastMethodFrameMessage Message
|
||||||
|
|
||||||
for {
|
for {
|
||||||
if reader.GetParent().GetProtocol() != nil && reader.GetParent().GetProtocol() != &protocol {
|
|
||||||
return errors.New("Identified by another protocol")
|
|
||||||
}
|
|
||||||
|
|
||||||
frame, err := r.ReadFrame()
|
frame, err := r.ReadFrame()
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
// We must read until we see an EOF... very important!
|
// We must read until we see an EOF... very important!
|
||||||
@@ -90,6 +85,8 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.
|
|||||||
// drop
|
// drop
|
||||||
|
|
||||||
case *HeaderFrame:
|
case *HeaderFrame:
|
||||||
|
reader.GetParent().SetProtocol(&protocol)
|
||||||
|
|
||||||
// start content state
|
// start content state
|
||||||
header = f
|
header = f
|
||||||
remaining = int(header.Size)
|
remaining = int(header.Size)
|
||||||
@@ -107,20 +104,22 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.
|
|||||||
}
|
}
|
||||||
|
|
||||||
case *BodyFrame:
|
case *BodyFrame:
|
||||||
|
reader.GetParent().SetProtocol(&protocol)
|
||||||
|
|
||||||
// continue until terminated
|
// continue until terminated
|
||||||
remaining -= len(f.Body)
|
remaining -= len(f.Body)
|
||||||
switch lastMethodFrameMessage.(type) {
|
switch lastMethodFrameMessage.(type) {
|
||||||
case *BasicPublish:
|
case *BasicPublish:
|
||||||
eventBasicPublish.Body = f.Body
|
eventBasicPublish.Body = f.Body
|
||||||
reader.GetParent().SetProtocol(&protocol)
|
|
||||||
emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin())
|
emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin())
|
||||||
case *BasicDeliver:
|
case *BasicDeliver:
|
||||||
eventBasicDeliver.Body = f.Body
|
eventBasicDeliver.Body = f.Body
|
||||||
reader.GetParent().SetProtocol(&protocol)
|
|
||||||
emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin())
|
emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin())
|
||||||
}
|
}
|
||||||
|
|
||||||
case *MethodFrame:
|
case *MethodFrame:
|
||||||
|
reader.GetParent().SetProtocol(&protocol)
|
||||||
|
|
||||||
lastMethodFrameMessage = f.Method
|
lastMethodFrameMessage = f.Method
|
||||||
switch m := f.Method.(type) {
|
switch m := f.Method.(type) {
|
||||||
case *BasicPublish:
|
case *BasicPublish:
|
||||||
@@ -137,7 +136,6 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.
|
|||||||
NoWait: m.NoWait,
|
NoWait: m.NoWait,
|
||||||
Arguments: m.Arguments,
|
Arguments: m.Arguments,
|
||||||
}
|
}
|
||||||
reader.GetParent().SetProtocol(&protocol)
|
|
||||||
emitAMQP(*eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin())
|
emitAMQP(*eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin())
|
||||||
|
|
||||||
case *BasicConsume:
|
case *BasicConsume:
|
||||||
@@ -150,7 +148,6 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.
|
|||||||
NoWait: m.NoWait,
|
NoWait: m.NoWait,
|
||||||
Arguments: m.Arguments,
|
Arguments: m.Arguments,
|
||||||
}
|
}
|
||||||
reader.GetParent().SetProtocol(&protocol)
|
|
||||||
emitAMQP(*eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin())
|
emitAMQP(*eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin())
|
||||||
|
|
||||||
case *BasicDeliver:
|
case *BasicDeliver:
|
||||||
@@ -170,7 +167,6 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.
|
|||||||
NoWait: m.NoWait,
|
NoWait: m.NoWait,
|
||||||
Arguments: m.Arguments,
|
Arguments: m.Arguments,
|
||||||
}
|
}
|
||||||
reader.GetParent().SetProtocol(&protocol)
|
|
||||||
emitAMQP(*eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin())
|
emitAMQP(*eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin())
|
||||||
|
|
||||||
case *ExchangeDeclare:
|
case *ExchangeDeclare:
|
||||||
@@ -184,7 +180,6 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.
|
|||||||
NoWait: m.NoWait,
|
NoWait: m.NoWait,
|
||||||
Arguments: m.Arguments,
|
Arguments: m.Arguments,
|
||||||
}
|
}
|
||||||
reader.GetParent().SetProtocol(&protocol)
|
|
||||||
emitAMQP(*eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin())
|
emitAMQP(*eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin())
|
||||||
|
|
||||||
case *ConnectionStart:
|
case *ConnectionStart:
|
||||||
@@ -195,7 +190,6 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.
|
|||||||
Mechanisms: m.Mechanisms,
|
Mechanisms: m.Mechanisms,
|
||||||
Locales: m.Locales,
|
Locales: m.Locales,
|
||||||
}
|
}
|
||||||
reader.GetParent().SetProtocol(&protocol)
|
|
||||||
emitAMQP(*eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin())
|
emitAMQP(*eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin())
|
||||||
|
|
||||||
case *ConnectionClose:
|
case *ConnectionClose:
|
||||||
@@ -205,7 +199,6 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.
|
|||||||
ClassId: m.ClassId,
|
ClassId: m.ClassId,
|
||||||
MethodId: m.MethodId,
|
MethodId: m.MethodId,
|
||||||
}
|
}
|
||||||
reader.GetParent().SetProtocol(&protocol)
|
|
||||||
emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin())
|
emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -8,7 +8,6 @@ import (
|
|||||||
|
|
||||||
type tcpStream struct {
|
type tcpStream struct {
|
||||||
isClosed bool
|
isClosed bool
|
||||||
protocol *api.Protocol
|
|
||||||
isTapTarget bool
|
isTapTarget bool
|
||||||
origin api.Capture
|
origin api.Capture
|
||||||
reqResMatchers []api.RequestResponseMatcher
|
reqResMatchers []api.RequestResponseMatcher
|
||||||
@@ -27,10 +26,6 @@ func (t *tcpStream) GetOrigin() api.Capture {
|
|||||||
return t.origin
|
return t.origin
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *tcpStream) GetProtocol() *api.Protocol {
|
|
||||||
return t.protocol
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *tcpStream) GetReqResMatchers() []api.RequestResponseMatcher {
|
func (t *tcpStream) GetReqResMatchers() []api.RequestResponseMatcher {
|
||||||
return t.reqResMatchers
|
return t.reqResMatchers
|
||||||
}
|
}
|
||||||
|
@@ -3,7 +3,6 @@ package http
|
|||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"log"
|
"log"
|
||||||
@@ -144,10 +143,6 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.
|
|||||||
http2Assembler = createHTTP2Assembler(b)
|
http2Assembler = createHTTP2Assembler(b)
|
||||||
}
|
}
|
||||||
|
|
||||||
if reader.GetParent().GetProtocol() != nil && reader.GetParent().GetProtocol() != &http11protocol {
|
|
||||||
return errors.New("Identified by another protocol")
|
|
||||||
}
|
|
||||||
|
|
||||||
if isHTTP2 {
|
if isHTTP2 {
|
||||||
err = handleHTTP2Stream(http2Assembler, reader.GetReadProgress(), reader.GetParent().GetOrigin(), reader.GetTcpID(), reader.GetCaptureTime(), reader.GetEmitter(), options, reqResMatcher)
|
err = handleHTTP2Stream(http2Assembler, reader.GetReadProgress(), reader.GetParent().GetOrigin(), reader.GetTcpID(), reader.GetCaptureTime(), reader.GetEmitter(), options, reqResMatcher)
|
||||||
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||||
@@ -200,10 +195,6 @@ func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if reader.GetParent().GetProtocol() == nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -8,7 +8,6 @@ import (
|
|||||||
|
|
||||||
type tcpStream struct {
|
type tcpStream struct {
|
||||||
isClosed bool
|
isClosed bool
|
||||||
protocol *api.Protocol
|
|
||||||
isTapTarget bool
|
isTapTarget bool
|
||||||
origin api.Capture
|
origin api.Capture
|
||||||
reqResMatchers []api.RequestResponseMatcher
|
reqResMatchers []api.RequestResponseMatcher
|
||||||
@@ -27,10 +26,6 @@ func (t *tcpStream) GetOrigin() api.Capture {
|
|||||||
return t.origin
|
return t.origin
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *tcpStream) GetProtocol() *api.Protocol {
|
|
||||||
return t.protocol
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *tcpStream) GetReqResMatchers() []api.RequestResponseMatcher {
|
func (t *tcpStream) GetReqResMatchers() []api.RequestResponseMatcher {
|
||||||
return t.reqResMatchers
|
return t.reqResMatchers
|
||||||
}
|
}
|
||||||
|
@@ -3,7 +3,6 @@ package kafka
|
|||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"time"
|
"time"
|
||||||
@@ -38,10 +37,6 @@ func (d dissecting) Ping() {
|
|||||||
func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.TrafficFilteringOptions) error {
|
func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.TrafficFilteringOptions) error {
|
||||||
reqResMatcher := reader.GetReqResMatcher().(*requestResponseMatcher)
|
reqResMatcher := reader.GetReqResMatcher().(*requestResponseMatcher)
|
||||||
for {
|
for {
|
||||||
if reader.GetParent().GetProtocol() != nil && reader.GetParent().GetProtocol() != &_protocol {
|
|
||||||
return errors.New("Identified by another protocol")
|
|
||||||
}
|
|
||||||
|
|
||||||
if reader.GetIsClient() {
|
if reader.GetIsClient() {
|
||||||
_, _, err := ReadRequest(b, reader.GetTcpID(), reader.GetCounterPair(), reader.GetCaptureTime(), reqResMatcher)
|
_, _, err := ReadRequest(b, reader.GetTcpID(), reader.GetCounterPair(), reader.GetCaptureTime(), reqResMatcher)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@@ -8,7 +8,6 @@ import (
|
|||||||
|
|
||||||
type tcpStream struct {
|
type tcpStream struct {
|
||||||
isClosed bool
|
isClosed bool
|
||||||
protocol *api.Protocol
|
|
||||||
isTapTarget bool
|
isTapTarget bool
|
||||||
origin api.Capture
|
origin api.Capture
|
||||||
reqResMatchers []api.RequestResponseMatcher
|
reqResMatchers []api.RequestResponseMatcher
|
||||||
@@ -27,10 +26,6 @@ func (t *tcpStream) GetOrigin() api.Capture {
|
|||||||
return t.origin
|
return t.origin
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *tcpStream) GetProtocol() *api.Protocol {
|
|
||||||
return t.protocol
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *tcpStream) GetReqResMatchers() []api.RequestResponseMatcher {
|
func (t *tcpStream) GetReqResMatchers() []api.RequestResponseMatcher {
|
||||||
return t.reqResMatchers
|
return t.reqResMatchers
|
||||||
}
|
}
|
||||||
|
@@ -8,7 +8,6 @@ import (
|
|||||||
|
|
||||||
type tcpStream struct {
|
type tcpStream struct {
|
||||||
isClosed bool
|
isClosed bool
|
||||||
protocol *api.Protocol
|
|
||||||
isTapTarget bool
|
isTapTarget bool
|
||||||
origin api.Capture
|
origin api.Capture
|
||||||
reqResMatchers []api.RequestResponseMatcher
|
reqResMatchers []api.RequestResponseMatcher
|
||||||
@@ -27,10 +26,6 @@ func (t *tcpStream) GetOrigin() api.Capture {
|
|||||||
return t.origin
|
return t.origin
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *tcpStream) GetProtocol() *api.Protocol {
|
|
||||||
return t.protocol
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *tcpStream) GetReqResMatchers() []api.RequestResponseMatcher {
|
func (t *tcpStream) GetReqResMatchers() []api.RequestResponseMatcher {
|
||||||
return t.reqResMatchers
|
return t.reqResMatchers
|
||||||
}
|
}
|
||||||
|
@@ -83,10 +83,6 @@ func (t *tcpStream) GetOrigin() api.Capture {
|
|||||||
return t.origin
|
return t.origin
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *tcpStream) GetProtocol() *api.Protocol {
|
|
||||||
return t.protocol
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *tcpStream) GetReqResMatchers() []api.RequestResponseMatcher {
|
func (t *tcpStream) GetReqResMatchers() []api.RequestResponseMatcher {
|
||||||
return t.reqResMatchers
|
return t.reqResMatchers
|
||||||
}
|
}
|
||||||
|
@@ -11,10 +11,6 @@ func (t *tlsStream) GetOrigin() api.Capture {
|
|||||||
return api.Ebpf
|
return api.Ebpf
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *tlsStream) GetProtocol() *api.Protocol {
|
|
||||||
return t.protocol
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *tlsStream) SetProtocol(protocol *api.Protocol) {
|
func (t *tlsStream) SetProtocol(protocol *api.Protocol) {
|
||||||
t.protocol = protocol
|
t.protocol = protocol
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user