mirror of
https://github.com/kubeshark/kubeshark.git
synced 2026-02-22 22:53:04 +00:00
Compare commits
4 Commits
35.0-dev9
...
35.0-dev13
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
01af6aa19c | ||
|
|
2bfae1baae | ||
|
|
2df9fb49db | ||
|
|
c1b2cda468 |
@@ -131,6 +131,7 @@ func hostApi(socketHarOutputChannel chan<- *tapApi.OutputChannelItem) *gin.Engin
|
||||
routes.MetadataRoutes(ginApp)
|
||||
routes.StatusRoutes(ginApp)
|
||||
routes.DbRoutes(ginApp)
|
||||
routes.ReplayRoutes(ginApp)
|
||||
|
||||
return ginApp
|
||||
}
|
||||
@@ -155,7 +156,7 @@ func runInTapperMode() {
|
||||
|
||||
hostMode := os.Getenv(shared.HostModeEnvVar) == "1"
|
||||
tapOpts := &tap.TapOpts{
|
||||
HostMode: hostMode,
|
||||
HostMode: hostMode,
|
||||
}
|
||||
|
||||
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
|
||||
|
||||
@@ -22,7 +22,16 @@ func (e *DefaultEntryStreamerSocketConnector) SendEntry(socketId int, entry *tap
|
||||
if params.EnableFullEntries {
|
||||
message, _ = models.CreateFullEntryWebSocketMessage(entry)
|
||||
} else {
|
||||
extension := extensionsMap[entry.Protocol.Name]
|
||||
protocol, ok := protocolsMap[entry.ProtocolId]
|
||||
if !ok {
|
||||
return fmt.Errorf("protocol not found, protocol: %v", protocol)
|
||||
}
|
||||
|
||||
extension, ok := extensionsMap[protocol.Name]
|
||||
if !ok {
|
||||
return fmt.Errorf("extension not found, extension: %v", protocol.Name)
|
||||
}
|
||||
|
||||
base := extension.Dissector.Summarize(entry)
|
||||
message, _ = models.CreateBaseEntryWebSocketMessage(base)
|
||||
}
|
||||
|
||||
@@ -134,7 +134,7 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension
|
||||
serviceMapGenerator.NewTCPEntry(mizuEntry.Source, mizuEntry.Destination, &item.Protocol)
|
||||
|
||||
oasGenerator := dependency.GetInstance(dependency.OasGeneratorDependency).(oas.OasGeneratorSink)
|
||||
oasGenerator.HandleEntry(mizuEntry)
|
||||
oasGenerator.HandleEntry(mizuEntry, &item.Protocol)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -14,10 +14,14 @@ import (
|
||||
tapApi "github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
|
||||
var extensionsMap map[string]*tapApi.Extension // global
|
||||
var (
|
||||
extensionsMap map[string]*tapApi.Extension // global
|
||||
protocolsMap map[string]*tapApi.Protocol //global
|
||||
)
|
||||
|
||||
func InitExtensionsMap(ref map[string]*tapApi.Extension) {
|
||||
extensionsMap = ref
|
||||
func InitMaps(extensions map[string]*tapApi.Extension, protocols map[string]*tapApi.Protocol) {
|
||||
extensionsMap = extensions
|
||||
protocolsMap = protocols
|
||||
}
|
||||
|
||||
type EventHandlers interface {
|
||||
|
||||
@@ -11,8 +11,8 @@ import (
|
||||
"github.com/up9inc/mizu/agent/pkg/api"
|
||||
"github.com/up9inc/mizu/agent/pkg/utils"
|
||||
"github.com/up9inc/mizu/logger"
|
||||
"github.com/up9inc/mizu/tap/dbgctl"
|
||||
tapApi "github.com/up9inc/mizu/tap/api"
|
||||
"github.com/up9inc/mizu/tap/dbgctl"
|
||||
amqpExt "github.com/up9inc/mizu/tap/extensions/amqp"
|
||||
httpExt "github.com/up9inc/mizu/tap/extensions/http"
|
||||
kafkaExt "github.com/up9inc/mizu/tap/extensions/kafka"
|
||||
@@ -22,11 +22,13 @@ import (
|
||||
var (
|
||||
Extensions []*tapApi.Extension // global
|
||||
ExtensionsMap map[string]*tapApi.Extension // global
|
||||
ProtocolsMap map[string]*tapApi.Protocol //global
|
||||
)
|
||||
|
||||
func LoadExtensions() {
|
||||
Extensions = make([]*tapApi.Extension, 0)
|
||||
ExtensionsMap = make(map[string]*tapApi.Extension)
|
||||
ProtocolsMap = make(map[string]*tapApi.Protocol)
|
||||
|
||||
extensionHttp := &tapApi.Extension{}
|
||||
dissectorHttp := httpExt.NewDissector()
|
||||
@@ -34,6 +36,10 @@ func LoadExtensions() {
|
||||
extensionHttp.Dissector = dissectorHttp
|
||||
Extensions = append(Extensions, extensionHttp)
|
||||
ExtensionsMap[extensionHttp.Protocol.Name] = extensionHttp
|
||||
protocolsHttp := dissectorHttp.GetProtocols()
|
||||
for k, v := range protocolsHttp {
|
||||
ProtocolsMap[k] = v
|
||||
}
|
||||
|
||||
if !dbgctl.MizuTapperDisableNonHttpExtensions {
|
||||
extensionAmqp := &tapApi.Extension{}
|
||||
@@ -42,6 +48,10 @@ func LoadExtensions() {
|
||||
extensionAmqp.Dissector = dissectorAmqp
|
||||
Extensions = append(Extensions, extensionAmqp)
|
||||
ExtensionsMap[extensionAmqp.Protocol.Name] = extensionAmqp
|
||||
protocolsAmqp := dissectorAmqp.GetProtocols()
|
||||
for k, v := range protocolsAmqp {
|
||||
ProtocolsMap[k] = v
|
||||
}
|
||||
|
||||
extensionKafka := &tapApi.Extension{}
|
||||
dissectorKafka := kafkaExt.NewDissector()
|
||||
@@ -49,6 +59,10 @@ func LoadExtensions() {
|
||||
extensionKafka.Dissector = dissectorKafka
|
||||
Extensions = append(Extensions, extensionKafka)
|
||||
ExtensionsMap[extensionKafka.Protocol.Name] = extensionKafka
|
||||
protocolsKafka := dissectorKafka.GetProtocols()
|
||||
for k, v := range protocolsKafka {
|
||||
ProtocolsMap[k] = v
|
||||
}
|
||||
|
||||
extensionRedis := &tapApi.Extension{}
|
||||
dissectorRedis := redisExt.NewDissector()
|
||||
@@ -56,13 +70,17 @@ func LoadExtensions() {
|
||||
extensionRedis.Dissector = dissectorRedis
|
||||
Extensions = append(Extensions, extensionRedis)
|
||||
ExtensionsMap[extensionRedis.Protocol.Name] = extensionRedis
|
||||
protocolsRedis := dissectorRedis.GetProtocols()
|
||||
for k, v := range protocolsRedis {
|
||||
ProtocolsMap[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
sort.Slice(Extensions, func(i, j int) bool {
|
||||
return Extensions[i].Protocol.Priority < Extensions[j].Protocol.Priority
|
||||
})
|
||||
|
||||
api.InitExtensionsMap(ExtensionsMap)
|
||||
api.InitMaps(ExtensionsMap, ProtocolsMap)
|
||||
}
|
||||
|
||||
func ConfigureBasenineServer(host string, port string, dbSize int64, logLevel logging.Level, insertionFilter string) {
|
||||
|
||||
34
agent/pkg/controllers/replay_controller.go
Normal file
34
agent/pkg/controllers/replay_controller.go
Normal file
@@ -0,0 +1,34 @@
|
||||
package controllers
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/up9inc/mizu/agent/pkg/replay"
|
||||
"github.com/up9inc/mizu/agent/pkg/validation"
|
||||
"github.com/up9inc/mizu/logger"
|
||||
)
|
||||
|
||||
const (
|
||||
replayTimeout = 10 * time.Second
|
||||
)
|
||||
|
||||
func ReplayRequest(c *gin.Context) {
|
||||
logger.Log.Debug("Starting replay")
|
||||
replayDetails := &replay.Details{}
|
||||
if err := c.Bind(replayDetails); err != nil {
|
||||
c.JSON(http.StatusBadRequest, err)
|
||||
return
|
||||
}
|
||||
|
||||
logger.Log.Debugf("Validating replay, %v", replayDetails)
|
||||
if err := validation.Validate(replayDetails); err != nil {
|
||||
c.JSON(http.StatusBadRequest, err)
|
||||
return
|
||||
}
|
||||
|
||||
logger.Log.Debug("Executing replay, %v", replayDetails)
|
||||
result := replay.ExecuteRequest(replayDetails, replayTimeout)
|
||||
c.JSON(http.StatusOK, result)
|
||||
}
|
||||
@@ -3,6 +3,7 @@ package entries
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
basenine "github.com/up9inc/basenine/client/go"
|
||||
@@ -38,11 +39,20 @@ func (e *BasenineEntriesProvider) GetEntries(entriesRequest *models.EntriesReque
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
extension := app.ExtensionsMap[entry.Protocol.Name]
|
||||
protocol, ok := app.ProtocolsMap[entry.ProtocolId]
|
||||
if !ok {
|
||||
return nil, nil, fmt.Errorf("protocol not found, protocol: %v", protocol)
|
||||
}
|
||||
|
||||
extension, ok := app.ExtensionsMap[protocol.Name]
|
||||
if !ok {
|
||||
return nil, nil, fmt.Errorf("extension not found, extension: %v", protocol.Name)
|
||||
}
|
||||
|
||||
base := extension.Dissector.Summarize(entry)
|
||||
|
||||
dataSlice = append(dataSlice, &tapApi.EntryWrapper{
|
||||
Protocol: entry.Protocol,
|
||||
Protocol: *protocol,
|
||||
Data: entry,
|
||||
Base: base,
|
||||
})
|
||||
@@ -68,7 +78,16 @@ func (e *BasenineEntriesProvider) GetEntry(singleEntryRequest *models.SingleEntr
|
||||
return nil, errors.New(string(bytes))
|
||||
}
|
||||
|
||||
extension := app.ExtensionsMap[entry.Protocol.Name]
|
||||
protocol, ok := app.ProtocolsMap[entry.ProtocolId]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("protocol not found, protocol: %v", protocol)
|
||||
}
|
||||
|
||||
extension, ok := app.ExtensionsMap[protocol.Name]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("extension not found, extension: %v", protocol.Name)
|
||||
}
|
||||
|
||||
base := extension.Dissector.Summarize(entry)
|
||||
var representation []byte
|
||||
representation, err = extension.Dissector.Represent(entry.Request, entry.Response)
|
||||
@@ -78,7 +97,7 @@ func (e *BasenineEntriesProvider) GetEntry(singleEntryRequest *models.SingleEntr
|
||||
|
||||
var rules []map[string]interface{}
|
||||
var isRulesEnabled bool
|
||||
if entry.Protocol.Name == "http" {
|
||||
if protocol.Name == "http" {
|
||||
harEntry, _ := har.NewEntry(entry.Request, entry.Response, entry.StartTime, entry.ElapsedTime)
|
||||
_, rulesMatched, _isRulesEnabled := models.RunValidationRulesState(*harEntry, entry.Destination.Name)
|
||||
isRulesEnabled = _isRulesEnabled
|
||||
@@ -89,7 +108,7 @@ func (e *BasenineEntriesProvider) GetEntry(singleEntryRequest *models.SingleEntr
|
||||
}
|
||||
|
||||
return &tapApi.EntryWrapper{
|
||||
Protocol: entry.Protocol,
|
||||
Protocol: *protocol,
|
||||
Representation: string(representation),
|
||||
Data: entry,
|
||||
Base: base,
|
||||
|
||||
@@ -6,9 +6,8 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/up9inc/mizu/agent/pkg/har"
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
|
||||
"github.com/up9inc/mizu/logger"
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -17,7 +16,7 @@ var (
|
||||
)
|
||||
|
||||
type OasGeneratorSink interface {
|
||||
HandleEntry(mizuEntry *api.Entry)
|
||||
HandleEntry(mizuEntry *api.Entry, protocol *api.Protocol)
|
||||
}
|
||||
|
||||
type OasGenerator interface {
|
||||
@@ -59,12 +58,12 @@ func (g *defaultOasGenerator) IsStarted() bool {
|
||||
return g.started
|
||||
}
|
||||
|
||||
func (g *defaultOasGenerator) HandleEntry(mizuEntry *api.Entry) {
|
||||
func (g *defaultOasGenerator) HandleEntry(mizuEntry *api.Entry, protocol *api.Protocol) {
|
||||
if !g.started {
|
||||
return
|
||||
}
|
||||
|
||||
if mizuEntry.Protocol.Name == "http" {
|
||||
if protocol.Name == "http" {
|
||||
dest := mizuEntry.Destination.Name
|
||||
if dest == "" {
|
||||
logger.Log.Debugf("OAS: Unresolved entry %d", mizuEntry.Id)
|
||||
@@ -86,7 +85,7 @@ func (g *defaultOasGenerator) HandleEntry(mizuEntry *api.Entry) {
|
||||
|
||||
g.handleHARWithSource(entryWSource)
|
||||
} else {
|
||||
logger.Log.Debugf("OAS: Unsupported protocol in entry %d: %s", mizuEntry.Id, mizuEntry.Protocol.Name)
|
||||
logger.Log.Debugf("OAS: Unsupported protocol in entry %d: %s", mizuEntry.Id, protocol.Name)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -60,7 +60,7 @@ func GetTappedPodsStatus() []shared.TappedPodStatus {
|
||||
|
||||
func SetNodeToTappedPodMap(nodeToTappedPodsMap shared.NodeToPodsMap) {
|
||||
summary := nodeToTappedPodsMap.Summary()
|
||||
logger.Log.Infof("Setting node to tapped pods map to %v", summary)
|
||||
logger.Log.Debugf("Setting node to tapped pods map to %v", summary)
|
||||
|
||||
nodeHostToTappedPodsMap = nodeToTappedPodsMap
|
||||
}
|
||||
|
||||
167
agent/pkg/replay/replay.go
Normal file
167
agent/pkg/replay/replay.go
Normal file
@@ -0,0 +1,167 @@
|
||||
package replay
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/up9inc/mizu/agent/pkg/app"
|
||||
tapApi "github.com/up9inc/mizu/tap/api"
|
||||
mizuhttp "github.com/up9inc/mizu/tap/extensions/http"
|
||||
)
|
||||
|
||||
var (
|
||||
inProcessRequestsLocker = sync.Mutex{}
|
||||
inProcessRequests = 0
|
||||
)
|
||||
|
||||
const maxParallelAction = 5
|
||||
|
||||
type Details struct {
|
||||
Method string `json:"method"`
|
||||
Url string `json:"url"`
|
||||
Body string `json:"body"`
|
||||
Headers map[string]string `json:"headers"`
|
||||
}
|
||||
|
||||
type Response struct {
|
||||
Success bool `json:"status"`
|
||||
Data interface{} `json:"data"`
|
||||
ErrorMessage string `json:"errorMessage"`
|
||||
}
|
||||
|
||||
func incrementCounter() bool {
|
||||
result := false
|
||||
inProcessRequestsLocker.Lock()
|
||||
if inProcessRequests < maxParallelAction {
|
||||
inProcessRequests++
|
||||
result = true
|
||||
}
|
||||
inProcessRequestsLocker.Unlock()
|
||||
return result
|
||||
}
|
||||
|
||||
func decrementCounter() {
|
||||
inProcessRequestsLocker.Lock()
|
||||
inProcessRequests--
|
||||
inProcessRequestsLocker.Unlock()
|
||||
}
|
||||
|
||||
func getEntryFromRequestResponse(extension *tapApi.Extension, request *http.Request, response *http.Response) *tapApi.Entry {
|
||||
captureTime := time.Now()
|
||||
|
||||
itemTmp := tapApi.OutputChannelItem{
|
||||
Protocol: *extension.Protocol,
|
||||
ConnectionInfo: &tapApi.ConnectionInfo{
|
||||
ClientIP: "",
|
||||
ClientPort: "1",
|
||||
ServerIP: "",
|
||||
ServerPort: "1",
|
||||
IsOutgoing: false,
|
||||
},
|
||||
Capture: "",
|
||||
Timestamp: time.Now().UnixMilli(),
|
||||
Pair: &tapApi.RequestResponsePair{
|
||||
Request: tapApi.GenericMessage{
|
||||
IsRequest: true,
|
||||
CaptureTime: captureTime,
|
||||
CaptureSize: 0,
|
||||
Payload: &mizuhttp.HTTPPayload{
|
||||
Type: mizuhttp.TypeHttpRequest,
|
||||
Data: request,
|
||||
},
|
||||
},
|
||||
Response: tapApi.GenericMessage{
|
||||
IsRequest: false,
|
||||
CaptureTime: captureTime,
|
||||
CaptureSize: 0,
|
||||
Payload: &mizuhttp.HTTPPayload{
|
||||
Type: mizuhttp.TypeHttpResponse,
|
||||
Data: response,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
// Analyze is expecting an item that's marshalled and unmarshalled
|
||||
itemMarshalled, err := json.Marshal(itemTmp)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
var finalItem *tapApi.OutputChannelItem
|
||||
if err := json.Unmarshal(itemMarshalled, &finalItem); err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return extension.Dissector.Analyze(finalItem, "", "", "")
|
||||
}
|
||||
|
||||
func ExecuteRequest(replayData *Details, timeout time.Duration) *Response {
|
||||
if incrementCounter() {
|
||||
defer decrementCounter()
|
||||
|
||||
client := &http.Client{
|
||||
Timeout: timeout,
|
||||
}
|
||||
|
||||
request, err := http.NewRequest(strings.ToUpper(replayData.Method), replayData.Url, bytes.NewBufferString(replayData.Body))
|
||||
if err != nil {
|
||||
return &Response{
|
||||
Success: false,
|
||||
Data: nil,
|
||||
ErrorMessage: err.Error(),
|
||||
}
|
||||
}
|
||||
|
||||
for headerKey, headerValue := range replayData.Headers {
|
||||
request.Header.Add(headerKey, headerValue)
|
||||
}
|
||||
request.Header.Add("x-mizu", uuid.New().String())
|
||||
response, requestErr := client.Do(request)
|
||||
|
||||
if requestErr != nil {
|
||||
return &Response{
|
||||
Success: false,
|
||||
Data: nil,
|
||||
ErrorMessage: requestErr.Error(),
|
||||
}
|
||||
}
|
||||
|
||||
extension := app.ExtensionsMap["http"] // # TODO: maybe pass the extension to the function so it can be tested
|
||||
entry := getEntryFromRequestResponse(extension, request, response)
|
||||
base := extension.Dissector.Summarize(entry)
|
||||
var representation []byte
|
||||
representation, err = extension.Dissector.Represent(entry.Request, entry.Response)
|
||||
if err != nil {
|
||||
return &Response{
|
||||
Success: false,
|
||||
Data: nil,
|
||||
ErrorMessage: err.Error(),
|
||||
}
|
||||
}
|
||||
|
||||
return &Response{
|
||||
Success: true,
|
||||
Data: &tapApi.EntryWrapper{
|
||||
Protocol: *extension.Protocol,
|
||||
Representation: string(representation),
|
||||
Data: entry,
|
||||
Base: base,
|
||||
Rules: nil,
|
||||
IsRulesEnabled: false,
|
||||
},
|
||||
ErrorMessage: "",
|
||||
}
|
||||
} else {
|
||||
return &Response{
|
||||
Success: false,
|
||||
Data: nil,
|
||||
ErrorMessage: fmt.Sprintf("reached threshold of %d requests", maxParallelAction),
|
||||
}
|
||||
}
|
||||
}
|
||||
108
agent/pkg/replay/replay_internal_test.go
Normal file
108
agent/pkg/replay/replay_internal_test.go
Normal file
@@ -0,0 +1,108 @@
|
||||
package replay
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"encoding/json"
|
||||
|
||||
"github.com/google/uuid"
|
||||
tapApi "github.com/up9inc/mizu/tap/api"
|
||||
mizuhttp "github.com/up9inc/mizu/tap/extensions/http"
|
||||
)
|
||||
|
||||
func TestValid(t *testing.T) {
|
||||
client := &http.Client{
|
||||
Timeout: 10 * time.Second,
|
||||
}
|
||||
|
||||
tests := map[string]*Details{
|
||||
"40x": {
|
||||
Method: "GET",
|
||||
Url: "http://httpbin.org/status/404",
|
||||
Body: "",
|
||||
Headers: map[string]string{},
|
||||
},
|
||||
"20x": {
|
||||
Method: "GET",
|
||||
Url: "http://httpbin.org/status/200",
|
||||
Body: "",
|
||||
Headers: map[string]string{},
|
||||
},
|
||||
"50x": {
|
||||
Method: "GET",
|
||||
Url: "http://httpbin.org/status/500",
|
||||
Body: "",
|
||||
Headers: map[string]string{},
|
||||
},
|
||||
// TODO: this should be fixes, currently not working because of header name with ":"
|
||||
//":path-header": {
|
||||
// Method: "GET",
|
||||
// Url: "http://httpbin.org/get",
|
||||
// Body: "",
|
||||
// Headers: map[string]string{
|
||||
// ":path": "/get",
|
||||
// },
|
||||
// },
|
||||
}
|
||||
|
||||
for testCaseName, replayData := range tests {
|
||||
t.Run(fmt.Sprintf("%+v", testCaseName), func(t *testing.T) {
|
||||
request, err := http.NewRequest(strings.ToUpper(replayData.Method), replayData.Url, bytes.NewBufferString(replayData.Body))
|
||||
if err != nil {
|
||||
t.Errorf("Error executing request")
|
||||
}
|
||||
|
||||
for headerKey, headerValue := range replayData.Headers {
|
||||
request.Header.Add(headerKey, headerValue)
|
||||
}
|
||||
request.Header.Add("x-mizu", uuid.New().String())
|
||||
response, requestErr := client.Do(request)
|
||||
|
||||
if requestErr != nil {
|
||||
t.Errorf("failed: %v, ", requestErr)
|
||||
}
|
||||
|
||||
extensionHttp := &tapApi.Extension{}
|
||||
dissectorHttp := mizuhttp.NewDissector()
|
||||
dissectorHttp.Register(extensionHttp)
|
||||
extensionHttp.Dissector = dissectorHttp
|
||||
extension := extensionHttp
|
||||
|
||||
entry := getEntryFromRequestResponse(extension, request, response)
|
||||
base := extension.Dissector.Summarize(entry)
|
||||
|
||||
// Represent is expecting an entry that's marshalled and unmarshalled
|
||||
entryMarshalled, err := json.Marshal(entry)
|
||||
if err != nil {
|
||||
t.Errorf("failed marshaling entry: %v, ", err)
|
||||
}
|
||||
var entryUnmarshalled *tapApi.Entry
|
||||
if err := json.Unmarshal(entryMarshalled, &entryUnmarshalled); err != nil {
|
||||
t.Errorf("failed unmarshaling entry: %v, ", err)
|
||||
}
|
||||
|
||||
var representation []byte
|
||||
representation, err = extension.Dissector.Represent(entryUnmarshalled.Request, entryUnmarshalled.Response)
|
||||
if err != nil {
|
||||
t.Errorf("failed: %v, ", err)
|
||||
}
|
||||
|
||||
result := &tapApi.EntryWrapper{
|
||||
Protocol: *extension.Protocol,
|
||||
Representation: string(representation),
|
||||
Data: entry,
|
||||
Base: base,
|
||||
Rules: nil,
|
||||
IsRulesEnabled: false,
|
||||
}
|
||||
t.Logf("%+v", result)
|
||||
//data, _ := json.MarshalIndent(result, "", " ")
|
||||
//t.Logf("%+v", string(data))
|
||||
})
|
||||
}
|
||||
}
|
||||
13
agent/pkg/routes/replay_routes.go
Normal file
13
agent/pkg/routes/replay_routes.go
Normal file
@@ -0,0 +1,13 @@
|
||||
package routes
|
||||
|
||||
import (
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/up9inc/mizu/agent/pkg/controllers"
|
||||
)
|
||||
|
||||
// ReplayRoutes defines the group of replay routes.
|
||||
func ReplayRoutes(app *gin.Engine) {
|
||||
routeGroup := app.Group("/replay")
|
||||
|
||||
routeGroup.POST("/", controllers.ReplayRequest)
|
||||
}
|
||||
@@ -57,4 +57,5 @@ func init() {
|
||||
tapCmd.Flags().Bool(configStructs.ServiceMeshName, defaultTapConfig.ServiceMesh, "Record decrypted traffic if the cluster is configured with a service mesh and with mtls")
|
||||
tapCmd.Flags().Bool(configStructs.TlsName, defaultTapConfig.Tls, "Record tls traffic")
|
||||
tapCmd.Flags().Bool(configStructs.ProfilerName, defaultTapConfig.Profiler, "Run pprof server")
|
||||
tapCmd.Flags().Int(configStructs.MaxLiveStreamsName, defaultTapConfig.MaxLiveStreams, "Maximum live tcp streams to handle concurrently")
|
||||
}
|
||||
|
||||
@@ -176,6 +176,7 @@ func startTapperSyncer(ctx context.Context, cancel context.CancelFunc, provider
|
||||
MizuServiceAccountExists: state.mizuServiceAccountExists,
|
||||
ServiceMesh: config.Config.Tap.ServiceMesh,
|
||||
Tls: config.Config.Tap.Tls,
|
||||
MaxLiveStreams: config.Config.Tap.MaxLiveStreams,
|
||||
}, startTime)
|
||||
|
||||
if err != nil {
|
||||
|
||||
@@ -27,6 +27,7 @@ const (
|
||||
ServiceMeshName = "service-mesh"
|
||||
TlsName = "tls"
|
||||
ProfilerName = "profiler"
|
||||
MaxLiveStreamsName = "max-live-streams"
|
||||
)
|
||||
|
||||
type TapConfig struct {
|
||||
@@ -47,6 +48,7 @@ type TapConfig struct {
|
||||
ServiceMesh bool `yaml:"service-mesh" default:"false"`
|
||||
Tls bool `yaml:"tls" default:"false"`
|
||||
Profiler bool `yaml:"profiler" default:"false"`
|
||||
MaxLiveStreams int `yaml:"max-live-streams" default:"500"`
|
||||
}
|
||||
|
||||
func (config *TapConfig) PodRegex() *regexp.Regexp {
|
||||
|
||||
@@ -48,6 +48,7 @@ type TapperSyncerConfig struct {
|
||||
MizuServiceAccountExists bool
|
||||
ServiceMesh bool
|
||||
Tls bool
|
||||
MaxLiveStreams int
|
||||
}
|
||||
|
||||
func CreateAndStartMizuTapperSyncer(ctx context.Context, kubernetesProvider *Provider, config TapperSyncerConfig, startTime time.Time) (*MizuTapperSyncer, error) {
|
||||
@@ -337,7 +338,8 @@ func (tapperSyncer *MizuTapperSyncer) updateMizuTappers() error {
|
||||
tapperSyncer.config.MizuApiFilteringOptions,
|
||||
tapperSyncer.config.LogLevel,
|
||||
tapperSyncer.config.ServiceMesh,
|
||||
tapperSyncer.config.Tls); err != nil {
|
||||
tapperSyncer.config.Tls,
|
||||
tapperSyncer.config.MaxLiveStreams); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"net/url"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
"strconv"
|
||||
|
||||
"github.com/op/go-logging"
|
||||
"github.com/up9inc/mizu/logger"
|
||||
@@ -382,11 +383,11 @@ func (provider *Provider) GetMizuApiServerPodObject(opts *ApiServerOptions, moun
|
||||
Tolerations: []core.Toleration{
|
||||
{
|
||||
Operator: core.TolerationOpExists,
|
||||
Effect: core.TaintEffectNoExecute,
|
||||
Effect: core.TaintEffectNoExecute,
|
||||
},
|
||||
{
|
||||
Operator: core.TolerationOpExists,
|
||||
Effect: core.TaintEffectNoSchedule,
|
||||
Effect: core.TaintEffectNoSchedule,
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -711,7 +712,7 @@ func (provider *Provider) CreateConfigMap(ctx context.Context, namespace string,
|
||||
return nil
|
||||
}
|
||||
|
||||
func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string, apiServerPodIp string, nodeNames []string, serviceAccountName string, resources shared.Resources, imagePullPolicy core.PullPolicy, mizuApiFilteringOptions api.TrafficFilteringOptions, logLevel logging.Level, serviceMesh bool, tls bool) error {
|
||||
func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string, apiServerPodIp string, nodeNames []string, serviceAccountName string, resources shared.Resources, imagePullPolicy core.PullPolicy, mizuApiFilteringOptions api.TrafficFilteringOptions, logLevel logging.Level, serviceMesh bool, tls bool, maxLiveStreams int) error {
|
||||
logger.Log.Debugf("Applying %d tapper daemon sets, ns: %s, daemonSetName: %s, podImage: %s, tapperPodName: %s", len(nodeNames), namespace, daemonSetName, podImage, tapperPodName)
|
||||
|
||||
if len(nodeNames) == 0 {
|
||||
@@ -729,6 +730,7 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
|
||||
"--tap",
|
||||
"--api-server-address", fmt.Sprintf("ws://%s/wsTapper", apiServerPodIp),
|
||||
"--nodefrag",
|
||||
"--max-live-streams", strconv.Itoa(maxLiveStreams),
|
||||
}
|
||||
|
||||
if serviceMesh {
|
||||
|
||||
@@ -55,7 +55,6 @@ type WebSocketMessageMetadata struct {
|
||||
MessageType WebSocketMessageType `json:"messageType,omitempty"`
|
||||
}
|
||||
|
||||
|
||||
type WebSocketStatusMessage struct {
|
||||
*WebSocketMessageMetadata
|
||||
TappingStatus []TappedPodStatus `json:"tappingStatus"`
|
||||
|
||||
@@ -91,7 +91,6 @@ type OutputChannelItem struct {
|
||||
Timestamp int64
|
||||
ConnectionInfo *ConnectionInfo
|
||||
Pair *RequestResponsePair
|
||||
Summary *BaseEntry
|
||||
Namespace string
|
||||
}
|
||||
|
||||
@@ -116,6 +115,7 @@ func (p *ReadProgress) Reset() {
|
||||
|
||||
type Dissector interface {
|
||||
Register(*Extension)
|
||||
GetProtocols() map[string]*Protocol
|
||||
Ping()
|
||||
Dissect(b *bufio.Reader, reader TcpReader, options *TrafficFilteringOptions) error
|
||||
Analyze(item *OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *Entry
|
||||
@@ -151,7 +151,7 @@ func (e *Emitting) Emit(item *OutputChannelItem) {
|
||||
|
||||
type Entry struct {
|
||||
Id string `json:"id"`
|
||||
Protocol Protocol `json:"proto"`
|
||||
ProtocolId string `json:"protocol"`
|
||||
Capture Capture `json:"capture"`
|
||||
Source *TCP `json:"src"`
|
||||
Destination *TCP `json:"dst"`
|
||||
|
||||
@@ -13,4 +13,4 @@ test-pull-bin:
|
||||
|
||||
test-pull-expect:
|
||||
@mkdir -p expect
|
||||
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect11/amqp/\* expect
|
||||
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect13/amqp/\* expect
|
||||
|
||||
@@ -26,12 +26,20 @@ var protocol = api.Protocol{
|
||||
Priority: 1,
|
||||
}
|
||||
|
||||
var protocolsMap = map[string]*api.Protocol{
|
||||
fmt.Sprintf("%s/%s/%s", protocol.Name, protocol.Version, protocol.Abbreviation): &protocol,
|
||||
}
|
||||
|
||||
type dissecting string
|
||||
|
||||
func (d dissecting) Register(extension *api.Extension) {
|
||||
extension.Protocol = &protocol
|
||||
}
|
||||
|
||||
func (d dissecting) GetProtocols() map[string]*api.Protocol {
|
||||
return protocolsMap
|
||||
}
|
||||
|
||||
func (d dissecting) Ping() {
|
||||
log.Printf("pong %s", protocol.Name)
|
||||
}
|
||||
@@ -214,8 +222,8 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
|
||||
|
||||
reqDetails["method"] = request["method"]
|
||||
return &api.Entry{
|
||||
Protocol: protocol,
|
||||
Capture: item.Capture,
|
||||
ProtocolId: fmt.Sprintf("%s/%s/%s", protocol.Name, protocol.Version, protocol.Abbreviation),
|
||||
Capture: item.Capture,
|
||||
Source: &api.TCP{
|
||||
Name: resolvedSource,
|
||||
IP: item.ConnectionInfo.ClientIP,
|
||||
@@ -277,7 +285,7 @@ func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry {
|
||||
|
||||
return &api.BaseEntry{
|
||||
Id: entry.Id,
|
||||
Protocol: entry.Protocol,
|
||||
Protocol: *protocolsMap[entry.ProtocolId],
|
||||
Capture: entry.Capture,
|
||||
Summary: summary,
|
||||
SummaryQuery: summaryQuery,
|
||||
@@ -322,7 +330,7 @@ func (d dissecting) Represent(request map[string]interface{}, response map[strin
|
||||
|
||||
func (d dissecting) Macros() map[string]string {
|
||||
return map[string]string{
|
||||
`amqp`: fmt.Sprintf(`proto.name == "%s"`, protocol.Name),
|
||||
`amqp`: fmt.Sprintf(`protocol == "%s/%s/%s"`, protocol.Name, protocol.Version, protocol.Abbreviation),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -44,7 +44,7 @@ func TestRegister(t *testing.T) {
|
||||
|
||||
func TestMacros(t *testing.T) {
|
||||
expectedMacros := map[string]string{
|
||||
"amqp": `proto.name == "amqp"`,
|
||||
"amqp": `protocol == "amqp/0-9-1/AMQP"`,
|
||||
}
|
||||
dissector := NewDissector()
|
||||
macros := dissector.Macros()
|
||||
|
||||
@@ -13,4 +13,4 @@ test-pull-bin:
|
||||
|
||||
test-pull-expect:
|
||||
@mkdir -p expect
|
||||
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect12/http/\* expect
|
||||
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect13/http/\* expect
|
||||
|
||||
@@ -98,6 +98,15 @@ var graphQL2Protocol = api.Protocol{
|
||||
Priority: 0,
|
||||
}
|
||||
|
||||
var protocolsMap = map[string]*api.Protocol{
|
||||
fmt.Sprintf("%s/%s/%s", http10protocol.Name, http10protocol.Version, http10protocol.Abbreviation): &http10protocol,
|
||||
fmt.Sprintf("%s/%s/%s", http11protocol.Name, http11protocol.Version, http11protocol.Abbreviation): &http11protocol,
|
||||
fmt.Sprintf("%s/%s/%s", http2Protocol.Name, http2Protocol.Version, http2Protocol.Abbreviation): &http2Protocol,
|
||||
fmt.Sprintf("%s/%s/%s", grpcProtocol.Name, grpcProtocol.Version, grpcProtocol.Abbreviation): &grpcProtocol,
|
||||
fmt.Sprintf("%s/%s/%s", graphQL1Protocol.Name, graphQL1Protocol.Version, graphQL1Protocol.Abbreviation): &graphQL1Protocol,
|
||||
fmt.Sprintf("%s/%s/%s", graphQL2Protocol.Name, graphQL2Protocol.Version, graphQL2Protocol.Abbreviation): &graphQL2Protocol,
|
||||
}
|
||||
|
||||
const (
|
||||
TypeHttpRequest = iota
|
||||
TypeHttpResponse
|
||||
@@ -109,6 +118,10 @@ func (d dissecting) Register(extension *api.Extension) {
|
||||
extension.Protocol = &http11protocol
|
||||
}
|
||||
|
||||
func (d dissecting) GetProtocols() map[string]*api.Protocol {
|
||||
return protocolsMap
|
||||
}
|
||||
|
||||
func (d dissecting) Ping() {
|
||||
log.Printf("pong %s", http11protocol.Name)
|
||||
}
|
||||
@@ -281,8 +294,8 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
|
||||
}
|
||||
|
||||
return &api.Entry{
|
||||
Protocol: item.Protocol,
|
||||
Capture: item.Capture,
|
||||
ProtocolId: fmt.Sprintf("%s/%s/%s", item.Protocol.Name, item.Protocol.Version, item.Protocol.Abbreviation),
|
||||
Capture: item.Capture,
|
||||
Source: &api.TCP{
|
||||
Name: resolvedSource,
|
||||
IP: item.ConnectionInfo.ClientIP,
|
||||
@@ -315,7 +328,7 @@ func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry {
|
||||
|
||||
return &api.BaseEntry{
|
||||
Id: entry.Id,
|
||||
Protocol: entry.Protocol,
|
||||
Protocol: *protocolsMap[entry.ProtocolId],
|
||||
Capture: entry.Capture,
|
||||
Summary: summary,
|
||||
SummaryQuery: summaryQuery,
|
||||
@@ -503,10 +516,10 @@ func (d dissecting) Represent(request map[string]interface{}, response map[strin
|
||||
|
||||
func (d dissecting) Macros() map[string]string {
|
||||
return map[string]string{
|
||||
`http`: fmt.Sprintf(`proto.name == "%s" and proto.version.startsWith("%c")`, http11protocol.Name, http11protocol.Version[0]),
|
||||
`http2`: fmt.Sprintf(`proto.name == "%s" and proto.version == "%s"`, http11protocol.Name, http2Protocol.Version),
|
||||
`grpc`: fmt.Sprintf(`proto.name == "%s" and proto.version == "%s" and proto.macro == "%s"`, http11protocol.Name, grpcProtocol.Version, grpcProtocol.Macro),
|
||||
`gql`: fmt.Sprintf(`proto.name == "%s" and proto.macro == "%s"`, graphQL1Protocol.Name, graphQL1Protocol.Macro),
|
||||
`http`: fmt.Sprintf(`protocol == "%s/%s/%s" or protocol == "%s/%s/%s"`, http10protocol.Name, http10protocol.Version, http10protocol.Abbreviation, http11protocol.Name, http11protocol.Version, http11protocol.Abbreviation),
|
||||
`http2`: fmt.Sprintf(`protocol == "%s/%s/%s"`, http2Protocol.Name, http2Protocol.Version, http2Protocol.Abbreviation),
|
||||
`grpc`: fmt.Sprintf(`protocol == "%s/%s/%s"`, grpcProtocol.Name, grpcProtocol.Version, grpcProtocol.Abbreviation),
|
||||
`gql`: fmt.Sprintf(`protocol == "%s/%s/%s" or protocol == "%s/%s/%s"`, graphQL1Protocol.Name, graphQL1Protocol.Version, graphQL1Protocol.Abbreviation, graphQL2Protocol.Name, graphQL2Protocol.Version, graphQL2Protocol.Abbreviation),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -44,10 +44,10 @@ func TestRegister(t *testing.T) {
|
||||
|
||||
func TestMacros(t *testing.T) {
|
||||
expectedMacros := map[string]string{
|
||||
"http": `proto.name == "http" and proto.version.startsWith("1")`,
|
||||
"http2": `proto.name == "http" and proto.version == "2.0"`,
|
||||
"grpc": `proto.name == "http" and proto.version == "2.0" and proto.macro == "grpc"`,
|
||||
"gql": `proto.name == "http" and proto.macro == "gql"`,
|
||||
"http": `protocol == "http/1.0/HTTP" or protocol == "http/1.1/HTTP"`,
|
||||
"http2": `protocol == "http/2.0/HTTP/2"`,
|
||||
"grpc": `protocol == "http/2.0/gRPC"`,
|
||||
"gql": `protocol == "http/1.1/GQL" or protocol == "http/2.0/GQL"`,
|
||||
}
|
||||
dissector := NewDissector()
|
||||
macros := dissector.Macros()
|
||||
|
||||
@@ -13,4 +13,4 @@ test-pull-bin:
|
||||
|
||||
test-pull-expect:
|
||||
@mkdir -p expect
|
||||
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect11/kafka/\* expect
|
||||
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect13/kafka/\* expect
|
||||
|
||||
@@ -24,12 +24,20 @@ var _protocol = api.Protocol{
|
||||
Priority: 2,
|
||||
}
|
||||
|
||||
var protocolsMap = map[string]*api.Protocol{
|
||||
fmt.Sprintf("%s/%s/%s", _protocol.Name, _protocol.Version, _protocol.Abbreviation): &_protocol,
|
||||
}
|
||||
|
||||
type dissecting string
|
||||
|
||||
func (d dissecting) Register(extension *api.Extension) {
|
||||
extension.Protocol = &_protocol
|
||||
}
|
||||
|
||||
func (d dissecting) GetProtocols() map[string]*api.Protocol {
|
||||
return protocolsMap
|
||||
}
|
||||
|
||||
func (d dissecting) Ping() {
|
||||
log.Printf("pong %s", _protocol.Name)
|
||||
}
|
||||
@@ -62,8 +70,8 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
|
||||
elapsedTime = 0
|
||||
}
|
||||
return &api.Entry{
|
||||
Protocol: _protocol,
|
||||
Capture: item.Capture,
|
||||
ProtocolId: fmt.Sprintf("%s/%s/%s", _protocol.Name, _protocol.Version, _protocol.Abbreviation),
|
||||
Capture: item.Capture,
|
||||
Source: &api.TCP{
|
||||
Name: resolvedSource,
|
||||
IP: item.ConnectionInfo.ClientIP,
|
||||
@@ -187,7 +195,7 @@ func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry {
|
||||
|
||||
return &api.BaseEntry{
|
||||
Id: entry.Id,
|
||||
Protocol: entry.Protocol,
|
||||
Protocol: *protocolsMap[entry.ProtocolId],
|
||||
Capture: entry.Capture,
|
||||
Summary: summary,
|
||||
SummaryQuery: summaryQuery,
|
||||
@@ -243,7 +251,7 @@ func (d dissecting) Represent(request map[string]interface{}, response map[strin
|
||||
|
||||
func (d dissecting) Macros() map[string]string {
|
||||
return map[string]string{
|
||||
`kafka`: fmt.Sprintf(`proto.name == "%s"`, _protocol.Name),
|
||||
`kafka`: fmt.Sprintf(`protocol == "%s/%s/%s"`, _protocol.Name, _protocol.Version, _protocol.Abbreviation),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -44,7 +44,7 @@ func TestRegister(t *testing.T) {
|
||||
|
||||
func TestMacros(t *testing.T) {
|
||||
expectedMacros := map[string]string{
|
||||
"kafka": `proto.name == "kafka"`,
|
||||
"kafka": `protocol == "kafka/12/KAFKA"`,
|
||||
}
|
||||
dissector := NewDissector()
|
||||
macros := dissector.Macros()
|
||||
|
||||
@@ -13,4 +13,4 @@ test-pull-bin:
|
||||
|
||||
test-pull-expect:
|
||||
@mkdir -p expect
|
||||
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect11/redis/\* expect
|
||||
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect13/redis/\* expect
|
||||
|
||||
@@ -24,12 +24,20 @@ var protocol = api.Protocol{
|
||||
Priority: 3,
|
||||
}
|
||||
|
||||
var protocolsMap = map[string]*api.Protocol{
|
||||
fmt.Sprintf("%s/%s/%s", protocol.Name, protocol.Version, protocol.Abbreviation): &protocol,
|
||||
}
|
||||
|
||||
type dissecting string
|
||||
|
||||
func (d dissecting) Register(extension *api.Extension) {
|
||||
extension.Protocol = &protocol
|
||||
}
|
||||
|
||||
func (d dissecting) GetProtocols() map[string]*api.Protocol {
|
||||
return protocolsMap
|
||||
}
|
||||
|
||||
func (d dissecting) Ping() {
|
||||
log.Printf("pong %s", protocol.Name)
|
||||
}
|
||||
@@ -70,8 +78,8 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
|
||||
elapsedTime = 0
|
||||
}
|
||||
return &api.Entry{
|
||||
Protocol: protocol,
|
||||
Capture: item.Capture,
|
||||
ProtocolId: fmt.Sprintf("%s/%s/%s", protocol.Name, protocol.Version, protocol.Abbreviation),
|
||||
Capture: item.Capture,
|
||||
Source: &api.TCP{
|
||||
Name: resolvedSource,
|
||||
IP: item.ConnectionInfo.ClientIP,
|
||||
@@ -115,7 +123,7 @@ func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry {
|
||||
|
||||
return &api.BaseEntry{
|
||||
Id: entry.Id,
|
||||
Protocol: entry.Protocol,
|
||||
Protocol: *protocolsMap[entry.ProtocolId],
|
||||
Capture: entry.Capture,
|
||||
Summary: summary,
|
||||
SummaryQuery: summaryQuery,
|
||||
@@ -144,7 +152,7 @@ func (d dissecting) Represent(request map[string]interface{}, response map[strin
|
||||
|
||||
func (d dissecting) Macros() map[string]string {
|
||||
return map[string]string{
|
||||
`redis`: fmt.Sprintf(`proto.name == "%s"`, protocol.Name),
|
||||
`redis`: fmt.Sprintf(`protocol == "%s/%s/%s"`, protocol.Name, protocol.Version, protocol.Abbreviation),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -45,7 +45,7 @@ func TestRegister(t *testing.T) {
|
||||
|
||||
func TestMacros(t *testing.T) {
|
||||
expectedMacros := map[string]string{
|
||||
"redis": `proto.name == "redis"`,
|
||||
"redis": `protocol == "redis/3.x/REDIS"`,
|
||||
}
|
||||
dissector := NewDissector()
|
||||
macros := dissector.Macros()
|
||||
|
||||
@@ -85,7 +85,7 @@ func NewTcpAssembler(outputItems chan *api.OutputChannelItem, streamsMap api.Tcp
|
||||
|
||||
maxBufferedPagesTotal := GetMaxBufferedPagesPerConnection()
|
||||
maxBufferedPagesPerConnection := GetMaxBufferedPagesTotal()
|
||||
logger.Log.Infof("Assembler options: maxBufferedPagesTotal=%d, maxBufferedPagesPerConnection=%d, opts=%v",
|
||||
logger.Log.Infof("Assembler options: maxBufferedPagesTotal=%d, maxBufferedPagesPerConnection=%d, opts=%+v",
|
||||
maxBufferedPagesTotal, maxBufferedPagesPerConnection, opts)
|
||||
a.Assembler.AssemblerOptions.MaxBufferedPagesTotal = maxBufferedPagesTotal
|
||||
a.Assembler.AssemblerOptions.MaxBufferedPagesPerConnection = maxBufferedPagesPerConnection
|
||||
|
||||
@@ -193,7 +193,8 @@ func (t *TlsTapper) tapGoPid(procfs string, pid uint32, namespace string) error
|
||||
hooks := goHooks{}
|
||||
|
||||
if err := hooks.installUprobes(&t.bpfObjects, exePath); err != nil {
|
||||
return err
|
||||
logger.Log.Infof("PID skipped not a Go binary or symbol table is stripped (pid: %v) %v", pid, exePath)
|
||||
return nil // hide the error on purpose, its OK for a process to be not a Go binary or stripped Go binary
|
||||
}
|
||||
|
||||
logger.Log.Infof("Tapping TLS (pid: %v) (Go: %v)", pid, exePath)
|
||||
|
||||
Reference in New Issue
Block a user