mirror of
https://github.com/kubeshark/kubeshark.git
synced 2026-03-10 06:32:19 +00:00
Compare commits
11 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6b52458642 | ||
|
|
858a64687d | ||
|
|
819ccf54cd | ||
|
|
7cc077c8a0 | ||
|
|
fae5f22d25 | ||
|
|
eba7a3b476 | ||
|
|
cf231538f4 | ||
|
|
073b0b72d3 | ||
|
|
c8705822b3 | ||
|
|
d4436d9f15 | ||
|
|
4e0ff74944 |
4
.github/workflows/pr_validation.yml
vendored
4
.github/workflows/pr_validation.yml
vendored
@@ -18,7 +18,7 @@ jobs:
|
||||
- name: Set up Go 1.16
|
||||
uses: actions/setup-go@v2
|
||||
with:
|
||||
go-version: '^1.16'
|
||||
go-version: '1.16'
|
||||
|
||||
- name: Check out code into the Go module directory
|
||||
uses: actions/checkout@v2
|
||||
@@ -33,7 +33,7 @@ jobs:
|
||||
- name: Set up Go 1.16
|
||||
uses: actions/setup-go@v2
|
||||
with:
|
||||
go-version: '^1.16'
|
||||
go-version: '1.16'
|
||||
|
||||
- name: Check out code into the Go module directory
|
||||
uses: actions/checkout@v2
|
||||
|
||||
3
.gitignore
vendored
3
.gitignore
vendored
@@ -26,3 +26,6 @@ build
|
||||
|
||||
# Environment variables
|
||||
.env
|
||||
|
||||
# pprof
|
||||
pprof/*
|
||||
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"os/exec"
|
||||
"strings"
|
||||
@@ -12,7 +11,7 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestTapAndFetch(t *testing.T) {
|
||||
func TestTap(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("ignored acceptance test")
|
||||
}
|
||||
@@ -93,37 +92,6 @@ func TestTapAndFetch(t *testing.T) {
|
||||
t.Errorf("%v", err)
|
||||
return
|
||||
}
|
||||
|
||||
fetchCmdArgs := getDefaultFetchCommandArgs()
|
||||
fetchCmd := exec.Command(cliPath, fetchCmdArgs...)
|
||||
t.Logf("running command: %v", fetchCmd.String())
|
||||
|
||||
if err := fetchCmd.Start(); err != nil {
|
||||
t.Errorf("failed to start fetch command, err: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
harCheckFunc := func() error {
|
||||
harBytes, readFileErr := ioutil.ReadFile("./unknown_source.har")
|
||||
if readFileErr != nil {
|
||||
return fmt.Errorf("failed to read har file, err: %v", readFileErr)
|
||||
}
|
||||
|
||||
harEntries, err := getEntriesFromHarBytes(harBytes)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get entries from har, err: %v", err)
|
||||
}
|
||||
|
||||
if len(harEntries) == 0 {
|
||||
return fmt.Errorf("unexpected har entries result - Expected more than 0 entries")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
if err := retriesExecute(shortRetriesCount, harCheckFunc); err != nil {
|
||||
t.Errorf("%v", err)
|
||||
return
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -76,13 +76,6 @@ func getDefaultTapNamespace() []string {
|
||||
return []string{"-n", "mizu-tests"}
|
||||
}
|
||||
|
||||
func getDefaultFetchCommandArgs() []string {
|
||||
fetchCommand := "fetch"
|
||||
defaultCmdArgs := getDefaultCommandArgs()
|
||||
|
||||
return append([]string{fetchCommand}, defaultCmdArgs...)
|
||||
}
|
||||
|
||||
func getDefaultConfigCommandArgs() []string {
|
||||
configCommand := "config"
|
||||
defaultCmdArgs := getDefaultCommandArgs()
|
||||
@@ -91,10 +84,10 @@ func getDefaultConfigCommandArgs() []string {
|
||||
}
|
||||
|
||||
func retriesExecute(retriesCount int, executeFunc func() error) error {
|
||||
var lastError error
|
||||
var lastError interface{}
|
||||
|
||||
for i := 0; i < retriesCount; i++ {
|
||||
if err := executeFunc(); err != nil {
|
||||
if err := tryExecuteFunc(executeFunc); err != nil {
|
||||
lastError = err
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
@@ -107,6 +100,16 @@ func retriesExecute(retriesCount int, executeFunc func() error) error {
|
||||
return fmt.Errorf("reached max retries count, retries count: %v, last err: %v", retriesCount, lastError)
|
||||
}
|
||||
|
||||
func tryExecuteFunc(executeFunc func() error) (err interface{}) {
|
||||
defer func() {
|
||||
if panicErr := recover(); panicErr != nil {
|
||||
err = panicErr
|
||||
}
|
||||
}()
|
||||
|
||||
return executeFunc()
|
||||
}
|
||||
|
||||
func waitTapPodsReady(apiServerUrl string) error {
|
||||
resolvingUrl := fmt.Sprintf("%v/status/tappersCount", apiServerUrl)
|
||||
tapPodsReadyFunc := func() error {
|
||||
@@ -179,19 +182,6 @@ func cleanupCommand(cmd *exec.Cmd) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func getEntriesFromHarBytes(harBytes []byte) ([]interface{}, error) {
|
||||
harInterface, convertErr := jsonBytesToInterface(harBytes)
|
||||
if convertErr != nil {
|
||||
return nil, convertErr
|
||||
}
|
||||
|
||||
har := harInterface.(map[string]interface{})
|
||||
harLog := har["log"].(map[string]interface{})
|
||||
harEntries := harLog["entries"].([]interface{})
|
||||
|
||||
return harEntries, nil
|
||||
}
|
||||
|
||||
func getPods(tapStatusInterface interface{}) ([]map[string]interface{}, error) {
|
||||
tapStatus := tapStatusInterface.(map[string]interface{})
|
||||
podsInterface := tapStatus["pods"].([]interface{})
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
# mizu agent
|
||||
Agent for MIZU (API server and tapper)
|
||||
Basic APIs:
|
||||
* /fetch - retrieve traffic data
|
||||
* /stats - retrieve statistics of collected data
|
||||
* /viewer - web ui
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@ package controllers
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/google/martian/har"
|
||||
"mizuserver/pkg/database"
|
||||
"mizuserver/pkg/models"
|
||||
"mizuserver/pkg/providers"
|
||||
@@ -10,11 +11,9 @@ import (
|
||||
"mizuserver/pkg/utils"
|
||||
"mizuserver/pkg/validation"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/google/martian/har"
|
||||
"github.com/romana/rlog"
|
||||
|
||||
tapApi "github.com/up9inc/mizu/tap/api"
|
||||
@@ -48,7 +47,7 @@ func GetEntries(c *gin.Context) {
|
||||
Find(&entries)
|
||||
|
||||
if len(entries) > 0 && order == database.OrderDesc {
|
||||
// the entries always order from oldest to newest so we should revers
|
||||
// the entries always order from oldest to newest - we should reverse
|
||||
utils.ReverseSlice(entries)
|
||||
}
|
||||
|
||||
@@ -64,93 +63,6 @@ func GetEntries(c *gin.Context) {
|
||||
c.JSON(http.StatusOK, baseEntries)
|
||||
}
|
||||
|
||||
func GetHARs(c *gin.Context) {
|
||||
entriesFilter := &models.HarFetchRequestQuery{}
|
||||
order := database.OrderDesc
|
||||
if err := c.BindQuery(entriesFilter); err != nil {
|
||||
c.JSON(http.StatusBadRequest, err)
|
||||
}
|
||||
err := validation.Validate(entriesFilter)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusBadRequest, err)
|
||||
}
|
||||
|
||||
var timestampFrom, timestampTo int64
|
||||
|
||||
if entriesFilter.From < 0 {
|
||||
timestampFrom = 0
|
||||
} else {
|
||||
timestampFrom = entriesFilter.From
|
||||
}
|
||||
if entriesFilter.To <= 0 {
|
||||
timestampTo = time.Now().UnixNano() / int64(time.Millisecond)
|
||||
} else {
|
||||
timestampTo = entriesFilter.To
|
||||
}
|
||||
|
||||
var entries []tapApi.MizuEntry
|
||||
database.GetEntriesTable().
|
||||
Where(fmt.Sprintf("timestamp BETWEEN %v AND %v", timestampFrom, timestampTo)).
|
||||
Order(fmt.Sprintf("timestamp %s", order)).
|
||||
Find(&entries)
|
||||
|
||||
if len(entries) > 0 {
|
||||
// the entries always order from oldest to newest so we should revers
|
||||
utils.ReverseSlice(entries)
|
||||
}
|
||||
|
||||
harsObject := map[string]*models.ExtendedHAR{}
|
||||
|
||||
for _, entryData := range entries {
|
||||
var harEntry har.Entry
|
||||
_ = json.Unmarshal([]byte(entryData.Entry), &harEntry)
|
||||
if entryData.ResolvedDestination != "" {
|
||||
harEntry.Request.URL = utils.SetHostname(harEntry.Request.URL, entryData.ResolvedDestination)
|
||||
}
|
||||
|
||||
var fileName string
|
||||
sourceOfEntry := entryData.ResolvedSource
|
||||
if sourceOfEntry != "" {
|
||||
// naively assumes the proper service source is http
|
||||
sourceOfEntry = fmt.Sprintf("http://%s", sourceOfEntry)
|
||||
//replace / from the file name cause they end up creating a corrupted folder
|
||||
fileName = fmt.Sprintf("%s.har", strings.ReplaceAll(sourceOfEntry, "/", "_"))
|
||||
} else {
|
||||
fileName = "unknown_source.har"
|
||||
}
|
||||
if harOfSource, ok := harsObject[fileName]; ok {
|
||||
harOfSource.Log.Entries = append(harOfSource.Log.Entries, &harEntry)
|
||||
} else {
|
||||
var entriesHar []*har.Entry
|
||||
entriesHar = append(entriesHar, &harEntry)
|
||||
harsObject[fileName] = &models.ExtendedHAR{
|
||||
Log: &models.ExtendedLog{
|
||||
Version: "1.2",
|
||||
Creator: &models.ExtendedCreator{
|
||||
Creator: &har.Creator{
|
||||
Name: "mizu",
|
||||
Version: "0.0.2",
|
||||
},
|
||||
},
|
||||
Entries: entriesHar,
|
||||
},
|
||||
}
|
||||
// leave undefined when no source is present, otherwise modeler assumes source is empty string ""
|
||||
if sourceOfEntry != "" {
|
||||
harsObject[fileName].Log.Creator.Source = &sourceOfEntry
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
retObj := map[string][]byte{}
|
||||
for k, v := range harsObject {
|
||||
bytesData, _ := json.Marshal(v)
|
||||
retObj[k] = bytesData
|
||||
}
|
||||
buffer := utils.ZipData(retObj)
|
||||
c.Data(http.StatusOK, "application/octet-stream", buffer.Bytes())
|
||||
}
|
||||
|
||||
func UploadEntries(c *gin.Context) {
|
||||
rlog.Infof("Upload entries - started\n")
|
||||
|
||||
@@ -202,15 +114,21 @@ func GetFullEntries(c *gin.Context) {
|
||||
timestampTo = entriesFilter.To
|
||||
}
|
||||
|
||||
entriesArray := database.GetEntriesFromDb(timestampFrom, timestampTo)
|
||||
result := make([]models.FullEntryDetails, 0)
|
||||
entriesArray := database.GetEntriesFromDb(timestampFrom, timestampTo, nil)
|
||||
|
||||
result := make([]har.Entry, 0)
|
||||
for _, data := range entriesArray {
|
||||
harEntry := models.FullEntryDetails{}
|
||||
if err := models.GetEntry(&data, &harEntry); err != nil {
|
||||
var pair tapApi.RequestResponsePair
|
||||
if err := json.Unmarshal([]byte(data.Entry), &pair); err != nil {
|
||||
continue
|
||||
}
|
||||
result = append(result, harEntry)
|
||||
harEntry, err := utils.NewEntry(&pair)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
result = append(result, *harEntry)
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, result)
|
||||
}
|
||||
|
||||
@@ -220,27 +138,12 @@ func GetEntry(c *gin.Context) {
|
||||
Where(map[string]string{"entryId": c.Param("entryId")}).
|
||||
First(&entryData)
|
||||
|
||||
fullEntry := models.FullEntryDetails{}
|
||||
if err := models.GetEntry(&entryData, &fullEntry); err != nil {
|
||||
c.JSON(http.StatusInternalServerError, map[string]interface{}{
|
||||
"error": true,
|
||||
"msg": "Can't get entry details",
|
||||
})
|
||||
}
|
||||
|
||||
// FIXME: Fix the part below
|
||||
// fullEntryWithPolicy := models.FullEntryWithPolicy{}
|
||||
// if err := models.GetEntry(&entryData, &fullEntryWithPolicy); err != nil {
|
||||
// c.JSON(http.StatusInternalServerError, map[string]interface{}{
|
||||
// "error": true,
|
||||
// "msg": "Can't get entry details",
|
||||
// })
|
||||
// }
|
||||
extension := extensionsMap[entryData.ProtocolName]
|
||||
protocol, representation, _ := extension.Dissector.Represent(&entryData)
|
||||
protocol, representation, bodySize, _ := extension.Dissector.Represent(&entryData)
|
||||
c.JSON(http.StatusOK, tapApi.MizuEntryWrapper{
|
||||
Protocol: protocol,
|
||||
Representation: string(representation),
|
||||
BodySize: bodySize,
|
||||
Data: entryData,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -57,10 +57,16 @@ func initDataBase(databasePath string) *gorm.DB {
|
||||
return temp
|
||||
}
|
||||
|
||||
func GetEntriesFromDb(timestampFrom int64, timestampTo int64) []tapApi.MizuEntry {
|
||||
func GetEntriesFromDb(timestampFrom int64, timestampTo int64, protocolName *string) []tapApi.MizuEntry {
|
||||
order := OrderDesc
|
||||
protocolNameCondition := "1 = 1"
|
||||
if protocolName != nil {
|
||||
protocolNameCondition = fmt.Sprintf("protocolKey = '%s'", *protocolName)
|
||||
}
|
||||
|
||||
var entries []tapApi.MizuEntry
|
||||
GetEntriesTable().
|
||||
Where(protocolNameCondition).
|
||||
Where(fmt.Sprintf("timestamp BETWEEN %v AND %v", timestampFrom, timestampTo)).
|
||||
Order(fmt.Sprintf("timestamp %s", order)).
|
||||
Find(&entries)
|
||||
|
||||
@@ -2,9 +2,7 @@ package models
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
|
||||
tapApi "github.com/up9inc/mizu/tap/api"
|
||||
|
||||
"mizuserver/pkg/rules"
|
||||
"mizuserver/pkg/utils"
|
||||
|
||||
@@ -17,47 +15,14 @@ func GetEntry(r *tapApi.MizuEntry, v tapApi.DataUnmarshaler) error {
|
||||
return v.UnmarshalData(r)
|
||||
}
|
||||
|
||||
func NewApplicableRules(status bool, latency int64, number int) tapApi.ApplicableRules {
|
||||
ar := tapApi.ApplicableRules{}
|
||||
ar.Status = status
|
||||
ar.Latency = latency
|
||||
ar.NumberOfRules = number
|
||||
return ar
|
||||
}
|
||||
|
||||
type FullEntryDetails struct {
|
||||
har.Entry
|
||||
}
|
||||
|
||||
type FullEntryDetailsExtra struct {
|
||||
har.Entry
|
||||
}
|
||||
|
||||
func (fed *FullEntryDetails) UnmarshalData(entry *tapApi.MizuEntry) error {
|
||||
if err := json.Unmarshal([]byte(entry.Entry), &fed.Entry); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if entry.ResolvedDestination != "" {
|
||||
fed.Entry.Request.URL = utils.SetHostname(fed.Entry.Request.URL, entry.ResolvedDestination)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fedex *FullEntryDetailsExtra) UnmarshalData(entry *tapApi.MizuEntry) error {
|
||||
if err := json.Unmarshal([]byte(entry.Entry), &fedex.Entry); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if entry.ResolvedSource != "" {
|
||||
fedex.Entry.Request.Headers = append(fedex.Request.Headers, har.Header{Name: "x-mizu-source", Value: entry.ResolvedSource})
|
||||
}
|
||||
if entry.ResolvedDestination != "" {
|
||||
fedex.Entry.Request.Headers = append(fedex.Request.Headers, har.Header{Name: "x-mizu-destination", Value: entry.ResolvedDestination})
|
||||
fedex.Entry.Request.URL = utils.SetHostname(fedex.Entry.Request.URL, entry.ResolvedDestination)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
// TODO: until we fixed the Rules feature
|
||||
//func NewApplicableRules(status bool, latency int64, number int) tapApi.ApplicableRules {
|
||||
// ar := tapApi.ApplicableRules{}
|
||||
// ar.Status = status
|
||||
// ar.Latency = latency
|
||||
// ar.NumberOfRules = number
|
||||
// return ar
|
||||
//}
|
||||
|
||||
type EntriesFilter struct {
|
||||
Limit int `form:"limit" validate:"required,min=1,max=200"`
|
||||
@@ -147,9 +112,15 @@ type FullEntryWithPolicy struct {
|
||||
}
|
||||
|
||||
func (fewp *FullEntryWithPolicy) UnmarshalData(entry *tapApi.MizuEntry) error {
|
||||
if err := json.Unmarshal([]byte(entry.Entry), &fewp.Entry); err != nil {
|
||||
var pair tapApi.RequestResponsePair
|
||||
if err := json.Unmarshal([]byte(entry.Entry), &pair); err != nil {
|
||||
return err
|
||||
}
|
||||
harEntry, err := utils.NewEntry(&pair)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
fewp.Entry = *harEntry
|
||||
|
||||
_, resultPolicyToSend := rules.MatchRequestPolicy(fewp.Entry, entry.Service)
|
||||
fewp.RulesMatched = resultPolicyToSend
|
||||
@@ -157,9 +128,10 @@ func (fewp *FullEntryWithPolicy) UnmarshalData(entry *tapApi.MizuEntry) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func RunValidationRulesState(harEntry har.Entry, service string) tapApi.ApplicableRules {
|
||||
numberOfRules, resultPolicyToSend := rules.MatchRequestPolicy(harEntry, service)
|
||||
statusPolicyToSend, latency, numberOfRules := rules.PassedValidationRules(resultPolicyToSend, numberOfRules)
|
||||
ar := NewApplicableRules(statusPolicyToSend, latency, numberOfRules)
|
||||
return ar
|
||||
}
|
||||
// TODO: until we fixed the Rules feature
|
||||
//func RunValidationRulesState(harEntry har.Entry, service string) tapApi.ApplicableRules {
|
||||
// numberOfRules, resultPolicyToSend := rules.MatchRequestPolicy(harEntry, service)
|
||||
// statusPolicyToSend, latency, numberOfRules := rules.PassedValidationRules(resultPolicyToSend, numberOfRules)
|
||||
// ar := NewApplicableRules(statusPolicyToSend, latency, numberOfRules)
|
||||
// return ar
|
||||
//}
|
||||
|
||||
@@ -15,8 +15,6 @@ func EntriesRoutes(ginApp *gin.Engine) {
|
||||
routeGroup.GET("/uploadEntries", controllers.UploadEntries)
|
||||
routeGroup.GET("/resolving", controllers.GetCurrentResolvingInformation)
|
||||
|
||||
routeGroup.GET("/har", controllers.GetHARs)
|
||||
|
||||
routeGroup.GET("/resetDB", controllers.DeleteAllEntries) // get single (full) entry
|
||||
routeGroup.GET("/generalStats", controllers.GetGeneralStats) // get general stats about entries in DB
|
||||
|
||||
|
||||
@@ -5,12 +5,14 @@ import (
|
||||
"compress/zlib"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/google/martian/har"
|
||||
"github.com/romana/rlog"
|
||||
"github.com/up9inc/mizu/shared"
|
||||
tapApi "github.com/up9inc/mizu/tap/api"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"mizuserver/pkg/database"
|
||||
"mizuserver/pkg/models"
|
||||
"mizuserver/pkg/utils"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
@@ -129,21 +131,33 @@ func UploadEntriesImpl(token string, model string, envPrefix string, sleepInterv
|
||||
for {
|
||||
timestampTo := time.Now().UnixNano() / int64(time.Millisecond)
|
||||
rlog.Infof("Getting entries from %v, to %v\n", timestampFrom, timestampTo)
|
||||
entriesArray := database.GetEntriesFromDb(timestampFrom, timestampTo)
|
||||
protocolFilter := "http"
|
||||
entriesArray := database.GetEntriesFromDb(timestampFrom, timestampTo, &protocolFilter)
|
||||
|
||||
if len(entriesArray) > 0 {
|
||||
|
||||
fullEntriesExtra := make([]models.FullEntryDetailsExtra, 0)
|
||||
result := make([]har.Entry, 0)
|
||||
for _, data := range entriesArray {
|
||||
harEntry := models.FullEntryDetailsExtra{}
|
||||
if err := models.GetEntry(&data, &harEntry); err != nil {
|
||||
var pair tapApi.RequestResponsePair
|
||||
if err := json.Unmarshal([]byte(data.Entry), &pair); err != nil {
|
||||
continue
|
||||
}
|
||||
fullEntriesExtra = append(fullEntriesExtra, harEntry)
|
||||
harEntry, err := utils.NewEntry(&pair)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if data.ResolvedSource != "" {
|
||||
harEntry.Request.Headers = append(harEntry.Request.Headers, har.Header{Name: "x-mizu-source", Value: data.ResolvedSource})
|
||||
}
|
||||
if data.ResolvedDestination != "" {
|
||||
harEntry.Request.Headers = append(harEntry.Request.Headers, har.Header{Name: "x-mizu-destination", Value: data.ResolvedDestination})
|
||||
harEntry.Request.URL = utils.SetHostname(harEntry.Request.URL, data.ResolvedDestination)
|
||||
}
|
||||
result = append(result, *harEntry)
|
||||
}
|
||||
rlog.Infof("About to upload %v entries\n", len(fullEntriesExtra))
|
||||
|
||||
body, jMarshalErr := json.Marshal(fullEntriesExtra)
|
||||
rlog.Infof("About to upload %v entries\n", len(result))
|
||||
|
||||
body, jMarshalErr := json.Marshal(result)
|
||||
if jMarshalErr != nil {
|
||||
analyzeInformation.Reset()
|
||||
rlog.Infof("Stopping analyzing")
|
||||
|
||||
259
agent/pkg/utils/har.go
Normal file
259
agent/pkg/utils/har.go
Normal file
@@ -0,0 +1,259 @@
|
||||
package utils
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/google/martian/har"
|
||||
"github.com/up9inc/mizu/tap"
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
|
||||
|
||||
// Keep it because we might want cookies in the future
|
||||
//func BuildCookies(rawCookies []interface{}) []har.Cookie {
|
||||
// cookies := make([]har.Cookie, 0, len(rawCookies))
|
||||
//
|
||||
// for _, cookie := range rawCookies {
|
||||
// c := cookie.(map[string]interface{})
|
||||
// expiresStr := ""
|
||||
// if c["expires"] != nil {
|
||||
// expiresStr = c["expires"].(string)
|
||||
// }
|
||||
// expires, _ := time.Parse(time.RFC3339, expiresStr)
|
||||
// httpOnly := false
|
||||
// if c["httponly"] != nil {
|
||||
// httpOnly, _ = strconv.ParseBool(c["httponly"].(string))
|
||||
// }
|
||||
// secure := false
|
||||
// if c["secure"] != nil {
|
||||
// secure, _ = strconv.ParseBool(c["secure"].(string))
|
||||
// }
|
||||
// path := ""
|
||||
// if c["path"] != nil {
|
||||
// path = c["path"].(string)
|
||||
// }
|
||||
// domain := ""
|
||||
// if c["domain"] != nil {
|
||||
// domain = c["domain"].(string)
|
||||
// }
|
||||
//
|
||||
// cookies = append(cookies, har.Cookie{
|
||||
// Name: c["name"].(string),
|
||||
// Value: c["value"].(string),
|
||||
// Path: path,
|
||||
// Domain: domain,
|
||||
// HTTPOnly: httpOnly,
|
||||
// Secure: secure,
|
||||
// Expires: expires,
|
||||
// Expires8601: expiresStr,
|
||||
// })
|
||||
// }
|
||||
//
|
||||
// return cookies
|
||||
//}
|
||||
|
||||
func BuildHeaders(rawHeaders []interface{}) ([]har.Header, string, string, string, string, string) {
|
||||
var host, scheme, authority, path, status string
|
||||
headers := make([]har.Header, 0, len(rawHeaders))
|
||||
|
||||
for _, header := range rawHeaders {
|
||||
h := header.(map[string]interface{})
|
||||
|
||||
headers = append(headers, har.Header{
|
||||
Name: h["name"].(string),
|
||||
Value: h["value"].(string),
|
||||
})
|
||||
|
||||
if h["name"] == "Host" {
|
||||
host = h["value"].(string)
|
||||
}
|
||||
if h["name"] == ":authority" {
|
||||
authority = h["value"].(string)
|
||||
}
|
||||
if h["name"] == ":scheme" {
|
||||
scheme = h["value"].(string)
|
||||
}
|
||||
if h["name"] == ":path" {
|
||||
path = h["value"].(string)
|
||||
}
|
||||
if h["name"] == ":status" {
|
||||
path = h["value"].(string)
|
||||
}
|
||||
}
|
||||
|
||||
return headers, host, scheme, authority, path, status
|
||||
}
|
||||
|
||||
func BuildPostParams(rawParams []interface{}) []har.Param {
|
||||
params := make([]har.Param, 0, len(rawParams))
|
||||
for _, param := range rawParams {
|
||||
p := param.(map[string]interface{})
|
||||
name := ""
|
||||
if p["name"] != nil {
|
||||
name = p["name"].(string)
|
||||
}
|
||||
value := ""
|
||||
if p["value"] != nil {
|
||||
value = p["value"].(string)
|
||||
}
|
||||
fileName := ""
|
||||
if p["fileName"] != nil {
|
||||
fileName = p["fileName"].(string)
|
||||
}
|
||||
contentType := ""
|
||||
if p["contentType"] != nil {
|
||||
contentType = p["contentType"].(string)
|
||||
}
|
||||
|
||||
params = append(params, har.Param{
|
||||
Name: name,
|
||||
Value: value,
|
||||
Filename: fileName,
|
||||
ContentType: contentType,
|
||||
})
|
||||
}
|
||||
|
||||
return params
|
||||
}
|
||||
|
||||
func NewRequest(request *api.GenericMessage) (harRequest *har.Request, err error) {
|
||||
reqDetails := request.Payload.(map[string]interface{})["details"].(map[string]interface{})
|
||||
|
||||
headers, host, scheme, authority, path, _ := BuildHeaders(reqDetails["headers"].([]interface{}))
|
||||
cookies := make([]har.Cookie, 0) // BuildCookies(reqDetails["cookies"].([]interface{}))
|
||||
|
||||
postData, _ := reqDetails["postData"].(map[string]interface{})
|
||||
mimeType, _ := postData["mimeType"]
|
||||
if mimeType == nil || len(mimeType.(string)) == 0 {
|
||||
mimeType = "text/html"
|
||||
}
|
||||
text, _ := postData["text"]
|
||||
postDataText := ""
|
||||
if text != nil {
|
||||
postDataText = text.(string)
|
||||
}
|
||||
|
||||
queryString := make([]har.QueryString, 0)
|
||||
for _, _qs := range reqDetails["queryString"].([]interface{}) {
|
||||
qs := _qs.(map[string]interface{})
|
||||
queryString = append(queryString, har.QueryString{
|
||||
Name: qs["name"].(string),
|
||||
Value: qs["value"].(string),
|
||||
})
|
||||
}
|
||||
|
||||
url := fmt.Sprintf("http://%s%s", host, reqDetails["url"].(string))
|
||||
if strings.HasPrefix(mimeType.(string), "application/grpc") {
|
||||
url = fmt.Sprintf("%s://%s%s", scheme, authority, path)
|
||||
}
|
||||
|
||||
harParams := make([]har.Param, 0)
|
||||
if postData["params"] != nil {
|
||||
harParams = BuildPostParams(postData["params"].([]interface{}))
|
||||
}
|
||||
|
||||
harRequest = &har.Request{
|
||||
Method: reqDetails["method"].(string),
|
||||
URL: url,
|
||||
HTTPVersion: reqDetails["httpVersion"].(string),
|
||||
HeadersSize: -1,
|
||||
BodySize: int64(bytes.NewBufferString(postDataText).Len()),
|
||||
QueryString: queryString,
|
||||
Headers: headers,
|
||||
Cookies: cookies,
|
||||
PostData: &har.PostData{
|
||||
MimeType: mimeType.(string),
|
||||
Params: harParams,
|
||||
Text: postDataText,
|
||||
},
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func NewResponse(response *api.GenericMessage) (harResponse *har.Response, err error) {
|
||||
resDetails := response.Payload.(map[string]interface{})["details"].(map[string]interface{})
|
||||
|
||||
headers, _, _, _, _, _status := BuildHeaders(resDetails["headers"].([]interface{}))
|
||||
cookies := make([]har.Cookie, 0) // BuildCookies(resDetails["cookies"].([]interface{}))
|
||||
|
||||
content, _ := resDetails["content"].(map[string]interface{})
|
||||
mimeType, _ := content["mimeType"]
|
||||
if mimeType == nil || len(mimeType.(string)) == 0 {
|
||||
mimeType = "text/html"
|
||||
}
|
||||
encoding, _ := content["encoding"]
|
||||
text, _ := content["text"]
|
||||
bodyText := ""
|
||||
if text != nil {
|
||||
bodyText = text.(string)
|
||||
}
|
||||
|
||||
harContent := &har.Content{
|
||||
Encoding: encoding.(string),
|
||||
MimeType: mimeType.(string),
|
||||
Text: []byte(bodyText),
|
||||
Size: int64(len(bodyText)),
|
||||
}
|
||||
|
||||
status := int(resDetails["status"].(float64))
|
||||
if strings.HasPrefix(mimeType.(string), "application/grpc") {
|
||||
status, err = strconv.Atoi(_status)
|
||||
if err != nil {
|
||||
tap.SilentError("convert-response-status-for-har", "Failed converting status to int %s (%v,%+v)", err, err, err)
|
||||
return nil, errors.New("failed converting response status to int for HAR")
|
||||
}
|
||||
}
|
||||
|
||||
harResponse = &har.Response{
|
||||
HTTPVersion: resDetails["httpVersion"].(string),
|
||||
Status: status,
|
||||
StatusText: resDetails["statusText"].(string),
|
||||
HeadersSize: -1,
|
||||
BodySize: int64(bytes.NewBufferString(bodyText).Len()),
|
||||
Headers: headers,
|
||||
Cookies: cookies,
|
||||
Content: harContent,
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func NewEntry(pair *api.RequestResponsePair) (*har.Entry, error) {
|
||||
harRequest, err := NewRequest(&pair.Request)
|
||||
if err != nil {
|
||||
tap.SilentError("convert-request-to-har", "Failed converting request to HAR %s (%v,%+v)", err, err, err)
|
||||
return nil, errors.New("failed converting request to HAR")
|
||||
}
|
||||
|
||||
harResponse, err := NewResponse(&pair.Response)
|
||||
if err != nil {
|
||||
fmt.Printf("err: %+v\n", err)
|
||||
tap.SilentError("convert-response-to-har", "Failed converting response to HAR %s (%v,%+v)", err, err, err)
|
||||
return nil, errors.New("failed converting response to HAR")
|
||||
}
|
||||
|
||||
totalTime := pair.Response.CaptureTime.Sub(pair.Request.CaptureTime).Round(time.Millisecond).Milliseconds()
|
||||
if totalTime < 1 {
|
||||
totalTime = 1
|
||||
}
|
||||
|
||||
harEntry := har.Entry{
|
||||
StartedDateTime: pair.Request.CaptureTime,
|
||||
Time: totalTime,
|
||||
Request: harRequest,
|
||||
Response: harResponse,
|
||||
Cache: &har.Cache{},
|
||||
Timings: &har.Timings{
|
||||
Send: -1,
|
||||
Wait: -1,
|
||||
Receive: totalTime,
|
||||
},
|
||||
}
|
||||
|
||||
return &harEntry, nil
|
||||
}
|
||||
@@ -1,7 +1,6 @@
|
||||
package apiserver
|
||||
|
||||
import (
|
||||
"archive/zip"
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
@@ -131,28 +130,6 @@ func (provider *apiServerProvider) GetGeneralStats() (map[string]interface{}, er
|
||||
return generalStats, nil
|
||||
}
|
||||
|
||||
func (provider *apiServerProvider) GetHars(fromTimestamp int, toTimestamp int) (*zip.Reader, error) {
|
||||
if !provider.isReady {
|
||||
return nil, fmt.Errorf("trying to reach api server when not initialized yet")
|
||||
}
|
||||
resp, err := http.Get(fmt.Sprintf("%s/api/har?from=%v&to=%v", provider.url, fromTimestamp, toTimestamp))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed getting har from api server %w", err)
|
||||
}
|
||||
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed reading hars %w", err)
|
||||
}
|
||||
|
||||
zipReader, err := zip.NewReader(bytes.NewReader(body), int64(len(body)))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed craeting zip reader %w", err)
|
||||
}
|
||||
return zipReader, nil
|
||||
}
|
||||
|
||||
func (provider *apiServerProvider) GetVersion() (string, error) {
|
||||
if !provider.isReady {
|
||||
|
||||
@@ -3,7 +3,6 @@ package cmd
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"os/exec"
|
||||
"os/signal"
|
||||
@@ -63,8 +62,8 @@ func openBrowser(url string) {
|
||||
default:
|
||||
err = fmt.Errorf("unsupported platform")
|
||||
}
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
logger.Log.Errorf("error while opening browser, %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,46 +0,0 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"github.com/creasty/defaults"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/up9inc/mizu/cli/apiserver"
|
||||
"github.com/up9inc/mizu/cli/config"
|
||||
"github.com/up9inc/mizu/cli/config/configStructs"
|
||||
"github.com/up9inc/mizu/cli/logger"
|
||||
"github.com/up9inc/mizu/cli/mizu/version"
|
||||
"github.com/up9inc/mizu/cli/telemetry"
|
||||
"github.com/up9inc/mizu/cli/uiUtils"
|
||||
)
|
||||
|
||||
var fetchCmd = &cobra.Command{
|
||||
Use: "fetch",
|
||||
Short: "Download recorded traffic to files",
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
go telemetry.ReportRun("fetch", config.Config.Fetch)
|
||||
|
||||
if err := apiserver.Provider.InitAndTestConnection(GetApiServerUrl()); err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, "Couldn't connect to API server, make sure one running")
|
||||
return nil
|
||||
}
|
||||
|
||||
if isCompatible, err := version.CheckVersionCompatibility(); err != nil {
|
||||
return err
|
||||
} else if !isCompatible {
|
||||
return nil
|
||||
}
|
||||
RunMizuFetch()
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
func init() {
|
||||
rootCmd.AddCommand(fetchCmd)
|
||||
|
||||
defaultFetchConfig := configStructs.FetchConfig{}
|
||||
defaults.Set(&defaultFetchConfig)
|
||||
|
||||
fetchCmd.Flags().StringP(configStructs.DirectoryFetchName, "d", defaultFetchConfig.Directory, "Provide a custom directory for fetched entries")
|
||||
fetchCmd.Flags().Int(configStructs.FromTimestampFetchName, defaultFetchConfig.FromTimestamp, "Custom start timestamp for fetched entries")
|
||||
fetchCmd.Flags().Int(configStructs.ToTimestampFetchName, defaultFetchConfig.ToTimestamp, "Custom end timestamp fetched entries")
|
||||
fetchCmd.Flags().Uint16P(configStructs.GuiPortFetchName, "p", defaultFetchConfig.GuiPort, "Provide a custom port for the web interface webserver")
|
||||
}
|
||||
@@ -1,25 +0,0 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"github.com/up9inc/mizu/cli/apiserver"
|
||||
"github.com/up9inc/mizu/cli/config"
|
||||
"github.com/up9inc/mizu/cli/logger"
|
||||
"github.com/up9inc/mizu/cli/mizu/fsUtils"
|
||||
"github.com/up9inc/mizu/cli/uiUtils"
|
||||
)
|
||||
|
||||
func RunMizuFetch() {
|
||||
if err := apiserver.Provider.InitAndTestConnection(GetApiServerUrl()); err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, "Couldn't connect to API server, check logs")
|
||||
}
|
||||
|
||||
zipReader, err := apiserver.Provider.GetHars(config.Config.Fetch.FromTimestamp, config.Config.Fetch.ToTimestamp)
|
||||
if err != nil {
|
||||
logger.Log.Errorf("Failed fetch data from API server %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := fsUtils.Unzip(zipReader, config.Config.Fetch.Directory); err != nil {
|
||||
logger.Log.Debugf("[ERROR] failed unzip %v", err)
|
||||
}
|
||||
}
|
||||
@@ -64,7 +64,6 @@ func init() {
|
||||
tapCmd.Flags().StringSliceP(configStructs.PlainTextFilterRegexesTapName, "r", defaultTapConfig.PlainTextFilterRegexes, "List of regex expressions that are used to filter matching values from text/plain http bodies")
|
||||
tapCmd.Flags().Bool(configStructs.DisableRedactionTapName, defaultTapConfig.DisableRedaction, "Disables redaction of potentially sensitive request/response headers and body values")
|
||||
tapCmd.Flags().String(configStructs.HumanMaxEntriesDBSizeTapName, defaultTapConfig.HumanMaxEntriesDBSize, "Override the default max entries db size")
|
||||
tapCmd.Flags().String(configStructs.DirectionTapName, defaultTapConfig.Direction, "Record traffic that goes in this direction (relative to the tapped pod): in/any")
|
||||
tapCmd.Flags().Bool(configStructs.DryRunTapName, defaultTapConfig.DryRun, "Preview of all pods matching the regex, without tapping them")
|
||||
tapCmd.Flags().String(configStructs.EnforcePolicyFile, defaultTapConfig.EnforcePolicyFile, "Yaml file with policy rules")
|
||||
}
|
||||
|
||||
@@ -18,7 +18,6 @@ const (
|
||||
|
||||
type ConfigStruct struct {
|
||||
Tap configStructs.TapConfig `yaml:"tap"`
|
||||
Fetch configStructs.FetchConfig `yaml:"fetch"`
|
||||
Version configStructs.VersionConfig `yaml:"version"`
|
||||
View configStructs.ViewConfig `yaml:"view"`
|
||||
Logs configStructs.LogsConfig `yaml:"logs"`
|
||||
|
||||
@@ -1,15 +0,0 @@
|
||||
package configStructs
|
||||
|
||||
const (
|
||||
DirectoryFetchName = "directory"
|
||||
FromTimestampFetchName = "from"
|
||||
ToTimestampFetchName = "to"
|
||||
GuiPortFetchName = "gui-port"
|
||||
)
|
||||
|
||||
type FetchConfig struct {
|
||||
Directory string `yaml:"directory" default:"."`
|
||||
FromTimestamp int `yaml:"from" default:"0"`
|
||||
ToTimestamp int `yaml:"to" default:"0"`
|
||||
GuiPort uint16 `yaml:"gui-port" default:"8899"`
|
||||
}
|
||||
@@ -3,10 +3,8 @@ package configStructs
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"regexp"
|
||||
"strings"
|
||||
|
||||
"github.com/up9inc/mizu/shared/units"
|
||||
"regexp"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -17,7 +15,6 @@ const (
|
||||
PlainTextFilterRegexesTapName = "regex-masking"
|
||||
DisableRedactionTapName = "no-redact"
|
||||
HumanMaxEntriesDBSizeTapName = "max-entries-db-size"
|
||||
DirectionTapName = "direction"
|
||||
DryRunTapName = "dry-run"
|
||||
EnforcePolicyFile = "test-rules"
|
||||
)
|
||||
@@ -34,7 +31,6 @@ type TapConfig struct {
|
||||
HealthChecksUserAgentHeaders []string `yaml:"ignored-user-agents"`
|
||||
DisableRedaction bool `yaml:"no-redact" default:"false"`
|
||||
HumanMaxEntriesDBSize string `yaml:"max-entries-db-size" default:"200MB"`
|
||||
Direction string `yaml:"direction" default:"in"`
|
||||
DryRun bool `yaml:"dry-run" default:"false"`
|
||||
EnforcePolicyFile string `yaml:"test-rules"`
|
||||
ApiServerResources Resources `yaml:"api-server-resources"`
|
||||
@@ -69,10 +65,5 @@ func (config *TapConfig) Validate() error {
|
||||
return errors.New(fmt.Sprintf("Could not parse --%s value %s", HumanMaxEntriesDBSizeTapName, config.HumanMaxEntriesDBSize))
|
||||
}
|
||||
|
||||
directionLowerCase := strings.ToLower(config.Direction)
|
||||
if directionLowerCase != "any" && directionLowerCase != "in" {
|
||||
return errors.New(fmt.Sprintf("%s is not a valid value for flag --%s. Acceptable values are in/any.", config.Direction, DirectionTapName))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -604,6 +604,7 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
|
||||
agentContainer.WithEnv(
|
||||
applyconfcore.EnvVar().WithName(shared.HostModeEnvVar).WithValue("1"),
|
||||
applyconfcore.EnvVar().WithName(shared.TappedAddressesPerNodeDictEnvVar).WithValue(string(nodeToTappedPodIPMapJsonStr)),
|
||||
applyconfcore.EnvVar().WithName(shared.GoGCEnvVar).WithValue("12800"),
|
||||
)
|
||||
agentContainer.WithEnv(
|
||||
applyconfcore.EnvVar().WithName(shared.NodeNameEnvVar).WithValueFrom(
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"github.com/up9inc/mizu/cli/mizu"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"runtime"
|
||||
"time"
|
||||
|
||||
"github.com/google/go-github/v37/github"
|
||||
@@ -76,7 +77,7 @@ func CheckNewerVersion(versionChan chan string) {
|
||||
logger.Log.Debugf("Finished version validation, github version %v, current version %v, took %v", gitHubVersion, currentSemVer, time.Since(start))
|
||||
|
||||
if gitHubVersionSemVer.GreaterThan(currentSemVer) {
|
||||
versionChan <- fmt.Sprintf("Update available! %v -> %v (%v)", mizu.SemVer, gitHubVersion, *latestRelease.HTMLURL)
|
||||
versionChan <- fmt.Sprintf("Update available! %v -> %v (curl -Lo mizu %v/mizu_%s_amd64 && chmod 755 mizu)", mizu.SemVer, gitHubVersion, *latestRelease.HTMLURL, runtime.GOOS)
|
||||
} else {
|
||||
versionChan <- ""
|
||||
}
|
||||
|
||||
@@ -8,4 +8,5 @@ const (
|
||||
MaxEntriesDBSizeBytesEnvVar = "MAX_ENTRIES_DB_BYTES"
|
||||
RulePolicyPath = "/app/enforce-policy/"
|
||||
RulePolicyFileName = "enforce-policy.yaml"
|
||||
GoGCEnvVar = "GOGC"
|
||||
)
|
||||
|
||||
@@ -21,7 +21,7 @@ type Protocol struct {
|
||||
}
|
||||
|
||||
type Extension struct {
|
||||
Protocol Protocol
|
||||
Protocol *Protocol
|
||||
Path string
|
||||
Plug *plugin.Plugin
|
||||
Dissector Dissector
|
||||
@@ -68,13 +68,22 @@ type OutputChannelItem struct {
|
||||
Pair *RequestResponsePair
|
||||
}
|
||||
|
||||
type SuperTimer struct {
|
||||
CaptureTime time.Time
|
||||
}
|
||||
|
||||
type SuperIdentifier struct {
|
||||
Protocol *Protocol
|
||||
IsClosedOthers bool
|
||||
}
|
||||
|
||||
type Dissector interface {
|
||||
Register(*Extension)
|
||||
Ping()
|
||||
Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, emitter Emitter) error
|
||||
Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter) error
|
||||
Analyze(item *OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *MizuEntry
|
||||
Summarize(entry *MizuEntry) *BaseEntryDetails
|
||||
Represent(entry *MizuEntry) (Protocol, []byte, error)
|
||||
Represent(entry *MizuEntry) (protocol Protocol, object []byte, bodySize int64, err error)
|
||||
}
|
||||
|
||||
type Emitting struct {
|
||||
@@ -103,6 +112,7 @@ type MizuEntry struct {
|
||||
RequestSenderIp string `json:"requestSenderIp" gorm:"column:requestSenderIp"`
|
||||
Service string `json:"service" gorm:"column:service"`
|
||||
Timestamp int64 `json:"timestamp" gorm:"column:timestamp"`
|
||||
ElapsedTime int64 `json:"elapsedTime" gorm:"column:elapsedTime"`
|
||||
Path string `json:"path" gorm:"column:path"`
|
||||
ResolvedSource string `json:"resolvedSource,omitempty" gorm:"column:resolvedSource"`
|
||||
ResolvedDestination string `json:"resolvedDestination,omitempty" gorm:"column:resolvedDestination"`
|
||||
@@ -117,6 +127,7 @@ type MizuEntry struct {
|
||||
type MizuEntryWrapper struct {
|
||||
Protocol Protocol `json:"protocol"`
|
||||
Representation string `json:"representation"`
|
||||
BodySize int64 `json:"bodySize"`
|
||||
Data MizuEntry `json:"data"`
|
||||
}
|
||||
|
||||
@@ -126,6 +137,7 @@ type BaseEntryDetails struct {
|
||||
Url string `json:"url,omitempty"`
|
||||
RequestSenderIp string `json:"request_sender_ip,omitempty"`
|
||||
Service string `json:"service,omitempty"`
|
||||
Path string `json:"path,omitempty"`
|
||||
Summary string `json:"summary,omitempty"`
|
||||
StatusCode int `json:"status_code"`
|
||||
Method string `json:"method,omitempty"`
|
||||
@@ -163,3 +175,8 @@ func (bed *BaseEntryDetails) UnmarshalData(entry *MizuEntry) error {
|
||||
bed.IsOutgoing = entry.IsOutgoing
|
||||
return nil
|
||||
}
|
||||
|
||||
const (
|
||||
TABLE string = "table"
|
||||
BODY string = "body"
|
||||
)
|
||||
|
||||
@@ -93,10 +93,10 @@ type AMQPWrapper struct {
|
||||
Details interface{} `json:"details"`
|
||||
}
|
||||
|
||||
func emitAMQP(event interface{}, _type string, method string, connectionInfo *api.ConnectionInfo, emitter api.Emitter) {
|
||||
func emitAMQP(event interface{}, _type string, method string, connectionInfo *api.ConnectionInfo, captureTime time.Time, emitter api.Emitter) {
|
||||
request := &api.GenericMessage{
|
||||
IsRequest: true,
|
||||
CaptureTime: time.Now(),
|
||||
CaptureTime: captureTime,
|
||||
Payload: AMQPPayload{
|
||||
Data: &AMQPWrapper{
|
||||
Method: method,
|
||||
@@ -107,7 +107,7 @@ func emitAMQP(event interface{}, _type string, method string, connectionInfo *ap
|
||||
}
|
||||
item := &api.OutputChannelItem{
|
||||
Protocol: protocol,
|
||||
Timestamp: time.Now().UnixNano() / int64(time.Millisecond),
|
||||
Timestamp: captureTime.UnixNano() / int64(time.Millisecond),
|
||||
ConnectionInfo: connectionInfo,
|
||||
Pair: &api.RequestResponsePair{
|
||||
Request: *request,
|
||||
@@ -219,7 +219,7 @@ func representProperties(properties map[string]interface{}, rep []interface{}) (
|
||||
},
|
||||
})
|
||||
rep = append(rep, map[string]string{
|
||||
"type": "table",
|
||||
"type": api.TABLE,
|
||||
"title": "Properties",
|
||||
"data": string(props),
|
||||
})
|
||||
@@ -249,7 +249,7 @@ func representBasicPublish(event map[string]interface{}) []interface{} {
|
||||
},
|
||||
})
|
||||
rep = append(rep, map[string]string{
|
||||
"type": "table",
|
||||
"type": api.TABLE,
|
||||
"title": "Details",
|
||||
"data": string(details),
|
||||
})
|
||||
@@ -267,7 +267,7 @@ func representBasicPublish(event map[string]interface{}) []interface{} {
|
||||
}
|
||||
headersMarshaled, _ := json.Marshal(headers)
|
||||
rep = append(rep, map[string]string{
|
||||
"type": "table",
|
||||
"type": api.TABLE,
|
||||
"title": "Headers",
|
||||
"data": string(headersMarshaled),
|
||||
})
|
||||
@@ -275,7 +275,7 @@ func representBasicPublish(event map[string]interface{}) []interface{} {
|
||||
|
||||
if event["Body"] != nil {
|
||||
rep = append(rep, map[string]string{
|
||||
"type": "body",
|
||||
"type": api.BODY,
|
||||
"title": "Body",
|
||||
"encoding": "base64",
|
||||
"mime_type": contentType,
|
||||
@@ -326,7 +326,7 @@ func representBasicDeliver(event map[string]interface{}) []interface{} {
|
||||
},
|
||||
})
|
||||
rep = append(rep, map[string]string{
|
||||
"type": "table",
|
||||
"type": api.TABLE,
|
||||
"title": "Details",
|
||||
"data": string(details),
|
||||
})
|
||||
@@ -344,7 +344,7 @@ func representBasicDeliver(event map[string]interface{}) []interface{} {
|
||||
}
|
||||
headersMarshaled, _ := json.Marshal(headers)
|
||||
rep = append(rep, map[string]string{
|
||||
"type": "table",
|
||||
"type": api.TABLE,
|
||||
"title": "Headers",
|
||||
"data": string(headersMarshaled),
|
||||
})
|
||||
@@ -352,7 +352,7 @@ func representBasicDeliver(event map[string]interface{}) []interface{} {
|
||||
|
||||
if event["Body"] != nil {
|
||||
rep = append(rep, map[string]string{
|
||||
"type": "body",
|
||||
"type": api.BODY,
|
||||
"title": "Body",
|
||||
"encoding": "base64",
|
||||
"mime_type": contentType,
|
||||
@@ -393,7 +393,7 @@ func representQueueDeclare(event map[string]interface{}) []interface{} {
|
||||
},
|
||||
})
|
||||
rep = append(rep, map[string]string{
|
||||
"type": "table",
|
||||
"type": api.TABLE,
|
||||
"title": "Details",
|
||||
"data": string(details),
|
||||
})
|
||||
@@ -408,7 +408,7 @@ func representQueueDeclare(event map[string]interface{}) []interface{} {
|
||||
}
|
||||
headersMarshaled, _ := json.Marshal(headers)
|
||||
rep = append(rep, map[string]string{
|
||||
"type": "table",
|
||||
"type": api.TABLE,
|
||||
"title": "Arguments",
|
||||
"data": string(headersMarshaled),
|
||||
})
|
||||
@@ -451,7 +451,7 @@ func representExchangeDeclare(event map[string]interface{}) []interface{} {
|
||||
},
|
||||
})
|
||||
rep = append(rep, map[string]string{
|
||||
"type": "table",
|
||||
"type": api.TABLE,
|
||||
"title": "Details",
|
||||
"data": string(details),
|
||||
})
|
||||
@@ -466,7 +466,7 @@ func representExchangeDeclare(event map[string]interface{}) []interface{} {
|
||||
}
|
||||
headersMarshaled, _ := json.Marshal(headers)
|
||||
rep = append(rep, map[string]string{
|
||||
"type": "table",
|
||||
"type": api.TABLE,
|
||||
"title": "Arguments",
|
||||
"data": string(headersMarshaled),
|
||||
})
|
||||
@@ -497,7 +497,7 @@ func representConnectionStart(event map[string]interface{}) []interface{} {
|
||||
},
|
||||
})
|
||||
rep = append(rep, map[string]string{
|
||||
"type": "table",
|
||||
"type": api.TABLE,
|
||||
"title": "Details",
|
||||
"data": string(details),
|
||||
})
|
||||
@@ -524,7 +524,7 @@ func representConnectionStart(event map[string]interface{}) []interface{} {
|
||||
}
|
||||
headersMarshaled, _ := json.Marshal(headers)
|
||||
rep = append(rep, map[string]string{
|
||||
"type": "table",
|
||||
"type": api.TABLE,
|
||||
"title": "Server Properties",
|
||||
"data": string(headersMarshaled),
|
||||
})
|
||||
@@ -555,7 +555,7 @@ func representConnectionClose(event map[string]interface{}) []interface{} {
|
||||
},
|
||||
})
|
||||
rep = append(rep, map[string]string{
|
||||
"type": "table",
|
||||
"type": api.TABLE,
|
||||
"title": "Details",
|
||||
"data": string(details),
|
||||
})
|
||||
@@ -585,7 +585,7 @@ func representQueueBind(event map[string]interface{}) []interface{} {
|
||||
},
|
||||
})
|
||||
rep = append(rep, map[string]string{
|
||||
"type": "table",
|
||||
"type": api.TABLE,
|
||||
"title": "Details",
|
||||
"data": string(details),
|
||||
})
|
||||
@@ -600,7 +600,7 @@ func representQueueBind(event map[string]interface{}) []interface{} {
|
||||
}
|
||||
headersMarshaled, _ := json.Marshal(headers)
|
||||
rep = append(rep, map[string]string{
|
||||
"type": "table",
|
||||
"type": api.TABLE,
|
||||
"title": "Arguments",
|
||||
"data": string(headersMarshaled),
|
||||
})
|
||||
@@ -639,7 +639,7 @@ func representBasicConsume(event map[string]interface{}) []interface{} {
|
||||
},
|
||||
})
|
||||
rep = append(rep, map[string]string{
|
||||
"type": "table",
|
||||
"type": api.TABLE,
|
||||
"title": "Details",
|
||||
"data": string(details),
|
||||
})
|
||||
@@ -654,7 +654,7 @@ func representBasicConsume(event map[string]interface{}) []interface{} {
|
||||
}
|
||||
headersMarshaled, _ := json.Marshal(headers)
|
||||
rep = append(rep, map[string]string{
|
||||
"type": "table",
|
||||
"type": api.TABLE,
|
||||
"title": "Arguments",
|
||||
"data": string(headersMarshaled),
|
||||
})
|
||||
|
||||
@@ -32,7 +32,7 @@ func init() {
|
||||
type dissecting string
|
||||
|
||||
func (d dissecting) Register(extension *api.Extension) {
|
||||
extension.Protocol = protocol
|
||||
extension.Protocol = &protocol
|
||||
}
|
||||
|
||||
func (d dissecting) Ping() {
|
||||
@@ -41,7 +41,7 @@ func (d dissecting) Ping() {
|
||||
|
||||
const amqpRequest string = "amqp_request"
|
||||
|
||||
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, emitter api.Emitter) error {
|
||||
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter) error {
|
||||
r := AmqpReader{b}
|
||||
|
||||
var remaining int
|
||||
@@ -78,13 +78,14 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
|
||||
var lastMethodFrameMessage Message
|
||||
|
||||
for {
|
||||
if superIdentifier.Protocol != nil && superIdentifier.Protocol != &protocol {
|
||||
return errors.New("Identified by another protocol")
|
||||
}
|
||||
|
||||
frame, err := r.ReadFrame()
|
||||
if err == io.EOF {
|
||||
// We must read until we see an EOF... very important!
|
||||
return errors.New("AMQP EOF")
|
||||
} else if err != nil {
|
||||
// TODO: Causes ignoring some methods. Return only in case of a certain error. But what?
|
||||
return err
|
||||
}
|
||||
|
||||
switch f := frame.(type) {
|
||||
@@ -101,6 +102,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
|
||||
case *BasicDeliver:
|
||||
eventBasicDeliver.Properties = header.Properties
|
||||
default:
|
||||
frame = nil
|
||||
}
|
||||
|
||||
case *BodyFrame:
|
||||
@@ -110,11 +112,15 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
|
||||
switch lastMethodFrameMessage.(type) {
|
||||
case *BasicPublish:
|
||||
eventBasicPublish.Body = f.Body
|
||||
emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, emitter)
|
||||
superIdentifier.Protocol = &protocol
|
||||
emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, superTimer.CaptureTime, emitter)
|
||||
case *BasicDeliver:
|
||||
eventBasicDeliver.Body = f.Body
|
||||
emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, emitter)
|
||||
superIdentifier.Protocol = &protocol
|
||||
emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, superTimer.CaptureTime, emitter)
|
||||
default:
|
||||
body = nil
|
||||
frame = nil
|
||||
}
|
||||
|
||||
case *MethodFrame:
|
||||
@@ -134,7 +140,8 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
|
||||
NoWait: m.NoWait,
|
||||
Arguments: m.Arguments,
|
||||
}
|
||||
emitAMQP(*eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, emitter)
|
||||
superIdentifier.Protocol = &protocol
|
||||
emitAMQP(*eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, superTimer.CaptureTime, emitter)
|
||||
|
||||
case *BasicConsume:
|
||||
eventBasicConsume := &BasicConsume{
|
||||
@@ -146,7 +153,8 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
|
||||
NoWait: m.NoWait,
|
||||
Arguments: m.Arguments,
|
||||
}
|
||||
emitAMQP(*eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, emitter)
|
||||
superIdentifier.Protocol = &protocol
|
||||
emitAMQP(*eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, superTimer.CaptureTime, emitter)
|
||||
|
||||
case *BasicDeliver:
|
||||
eventBasicDeliver.ConsumerTag = m.ConsumerTag
|
||||
@@ -165,7 +173,8 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
|
||||
NoWait: m.NoWait,
|
||||
Arguments: m.Arguments,
|
||||
}
|
||||
emitAMQP(*eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, emitter)
|
||||
superIdentifier.Protocol = &protocol
|
||||
emitAMQP(*eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter)
|
||||
|
||||
case *ExchangeDeclare:
|
||||
eventExchangeDeclare := &ExchangeDeclare{
|
||||
@@ -178,7 +187,8 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
|
||||
NoWait: m.NoWait,
|
||||
Arguments: m.Arguments,
|
||||
}
|
||||
emitAMQP(*eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, emitter)
|
||||
superIdentifier.Protocol = &protocol
|
||||
emitAMQP(*eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter)
|
||||
|
||||
case *ConnectionStart:
|
||||
eventConnectionStart := &ConnectionStart{
|
||||
@@ -188,7 +198,8 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
|
||||
Mechanisms: m.Mechanisms,
|
||||
Locales: m.Locales,
|
||||
}
|
||||
emitAMQP(*eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, emitter)
|
||||
superIdentifier.Protocol = &protocol
|
||||
emitAMQP(*eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter)
|
||||
|
||||
case *ConnectionClose:
|
||||
eventConnectionClose := &ConnectionClose{
|
||||
@@ -197,9 +208,11 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
|
||||
ClassId: m.ClassId,
|
||||
MethodId: m.MethodId,
|
||||
}
|
||||
emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, emitter)
|
||||
superIdentifier.Protocol = &protocol
|
||||
emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, superTimer.CaptureTime, emitter)
|
||||
|
||||
default:
|
||||
frame = nil
|
||||
|
||||
}
|
||||
|
||||
@@ -264,6 +277,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve
|
||||
RequestSenderIp: item.ConnectionInfo.ClientIP,
|
||||
Service: service,
|
||||
Timestamp: item.Timestamp,
|
||||
ElapsedTime: 0,
|
||||
Path: summary,
|
||||
ResolvedSource: resolvedSource,
|
||||
ResolvedDestination: resolvedDestination,
|
||||
@@ -300,7 +314,9 @@ func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails {
|
||||
}
|
||||
}
|
||||
|
||||
func (d dissecting) Represent(entry *api.MizuEntry) (api.Protocol, []byte, error) {
|
||||
func (d dissecting) Represent(entry *api.MizuEntry) (p api.Protocol, object []byte, bodySize int64, err error) {
|
||||
p = protocol
|
||||
bodySize = 0
|
||||
var root map[string]interface{}
|
||||
json.Unmarshal([]byte(entry.Entry), &root)
|
||||
representation := make(map[string]interface{}, 0)
|
||||
@@ -334,8 +350,8 @@ func (d dissecting) Represent(entry *api.MizuEntry) (api.Protocol, []byte, error
|
||||
break
|
||||
}
|
||||
representation["request"] = repRequest
|
||||
object, err := json.Marshal(representation)
|
||||
return protocol, object, err
|
||||
object, err = json.Marshal(representation)
|
||||
return
|
||||
}
|
||||
|
||||
var Dissector dissecting
|
||||
|
||||
@@ -54,7 +54,7 @@ func (r *AmqpReader) ReadFrame() (frame frame, err error) {
|
||||
channel := binary.BigEndian.Uint16(scratch[1:3])
|
||||
size := binary.BigEndian.Uint32(scratch[3:7])
|
||||
|
||||
if size > 1000000 {
|
||||
if size > 1000000*16 {
|
||||
return nil, ErrMaxSize
|
||||
}
|
||||
|
||||
@@ -352,6 +352,10 @@ func (r *AmqpReader) parseHeaderFrame(channel uint16, size uint32) (frame frame,
|
||||
return
|
||||
}
|
||||
|
||||
if hf.Size > 512 {
|
||||
return nil, ErrMaxHeaderFrameSize
|
||||
}
|
||||
|
||||
var flags uint16
|
||||
|
||||
if err = binary.Read(r.R, binary.BigEndian, &flags); err != nil {
|
||||
@@ -439,6 +443,7 @@ func (r *AmqpReader) parseBodyFrame(channel uint16, size uint32) (frame frame, e
|
||||
}
|
||||
|
||||
if _, err = io.ReadFull(r.R, bf.Body); err != nil {
|
||||
bf.Body = nil
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
||||
@@ -10,7 +10,6 @@ package main
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"io"
|
||||
)
|
||||
|
||||
@@ -19,32 +18,35 @@ import (
|
||||
// ErrCredentials. The text of the error is likely more interesting than
|
||||
// these constants.
|
||||
const (
|
||||
frameMethod = 1
|
||||
frameHeader = 2
|
||||
frameBody = 3
|
||||
frameHeartbeat = 8
|
||||
frameMinSize = 4096
|
||||
frameEnd = 206
|
||||
replySuccess = 200
|
||||
ContentTooLarge = 311
|
||||
NoRoute = 312
|
||||
NoConsumers = 313
|
||||
ConnectionForced = 320
|
||||
InvalidPath = 402
|
||||
AccessRefused = 403
|
||||
NotFound = 404
|
||||
ResourceLocked = 405
|
||||
PreconditionFailed = 406
|
||||
FrameError = 501
|
||||
SyntaxError = 502
|
||||
CommandInvalid = 503
|
||||
ChannelError = 504
|
||||
UnexpectedFrame = 505
|
||||
ResourceError = 506
|
||||
NotAllowed = 530
|
||||
NotImplemented = 540
|
||||
InternalError = 541
|
||||
MaxSizeError = 551
|
||||
frameMethod = 1
|
||||
frameHeader = 2
|
||||
frameBody = 3
|
||||
frameHeartbeat = 8
|
||||
frameMinSize = 4096
|
||||
frameEnd = 206
|
||||
replySuccess = 200
|
||||
ContentTooLarge = 311
|
||||
NoRoute = 312
|
||||
NoConsumers = 313
|
||||
ConnectionForced = 320
|
||||
InvalidPath = 402
|
||||
AccessRefused = 403
|
||||
NotFound = 404
|
||||
ResourceLocked = 405
|
||||
PreconditionFailed = 406
|
||||
FrameError = 501
|
||||
SyntaxError = 502
|
||||
CommandInvalid = 503
|
||||
ChannelError = 504
|
||||
UnexpectedFrame = 505
|
||||
ResourceError = 506
|
||||
NotAllowed = 530
|
||||
NotImplemented = 540
|
||||
InternalError = 541
|
||||
MaxSizeError = 551
|
||||
MaxHeaderFrameSizeError = 552
|
||||
BadMethodFrameUnknownMethod = 601
|
||||
BadMethodFrameUnknownClass = 602
|
||||
)
|
||||
|
||||
func isSoftExceptionCode(code int) bool {
|
||||
@@ -2854,7 +2856,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
|
||||
mf.Method = method
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId)
|
||||
return nil, ErrBadMethodFrameUnknownMethod
|
||||
}
|
||||
|
||||
case 20: // channel
|
||||
@@ -2909,7 +2911,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
|
||||
mf.Method = method
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId)
|
||||
return nil, ErrBadMethodFrameUnknownMethod
|
||||
}
|
||||
|
||||
case 40: // exchange
|
||||
@@ -2980,7 +2982,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
|
||||
mf.Method = method
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId)
|
||||
return nil, ErrBadMethodFrameUnknownMethod
|
||||
}
|
||||
|
||||
case 50: // queue
|
||||
@@ -3067,7 +3069,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
|
||||
mf.Method = method
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId)
|
||||
return nil, ErrBadMethodFrameUnknownMethod
|
||||
}
|
||||
|
||||
case 60: // basic
|
||||
@@ -3218,7 +3220,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
|
||||
mf.Method = method
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId)
|
||||
return nil, ErrBadMethodFrameUnknownMethod
|
||||
}
|
||||
|
||||
case 90: // tx
|
||||
@@ -3273,7 +3275,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
|
||||
mf.Method = method
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId)
|
||||
return nil, ErrBadMethodFrameUnknownMethod
|
||||
}
|
||||
|
||||
case 85: // confirm
|
||||
@@ -3296,11 +3298,11 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
|
||||
mf.Method = method
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId)
|
||||
return nil, ErrBadMethodFrameUnknownMethod
|
||||
}
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("Bad method frame, unknown class %d", mf.ClassId)
|
||||
return nil, ErrBadMethodFrameUnknownClass
|
||||
}
|
||||
|
||||
return mf, nil
|
||||
|
||||
@@ -60,8 +60,13 @@ var (
|
||||
// ErrFieldType is returned when writing a message containing a Go type unsupported by AMQP.
|
||||
ErrFieldType = &Error{Code: SyntaxError, Reason: "unsupported table field type"}
|
||||
|
||||
// ErrClosed is returned when the channel or connection is not open
|
||||
ErrMaxSize = &Error{Code: MaxSizeError, Reason: "an AMQP message cannot be bigger than 1MB"}
|
||||
ErrMaxSize = &Error{Code: MaxSizeError, Reason: "an AMQP message cannot be bigger than 16MB"}
|
||||
|
||||
ErrMaxHeaderFrameSize = &Error{Code: MaxHeaderFrameSizeError, Reason: "an AMQP header cannot be bigger than 512 bytes"}
|
||||
|
||||
ErrBadMethodFrameUnknownMethod = &Error{Code: BadMethodFrameUnknownMethod, Reason: "Bad method frame, unknown method"}
|
||||
|
||||
ErrBadMethodFrameUnknownClass = &Error{Code: BadMethodFrameUnknownClass, Reason: "Bad method frame, unknown class"}
|
||||
)
|
||||
|
||||
// Error captures the code and reason a channel or connection has been closed
|
||||
|
||||
@@ -7,14 +7,13 @@ import (
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/romana/rlog"
|
||||
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
|
||||
func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, emitter api.Emitter) error {
|
||||
func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter) error {
|
||||
streamID, messageHTTP1, err := grpcAssembler.readMessage()
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -32,7 +31,7 @@ func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, emitter a
|
||||
tcpID.DstPort,
|
||||
streamID,
|
||||
)
|
||||
item = reqResMatcher.registerRequest(ident, &messageHTTP1, time.Now())
|
||||
item = reqResMatcher.registerRequest(ident, &messageHTTP1, superTimer.CaptureTime)
|
||||
if item != nil {
|
||||
item.ConnectionInfo = &api.ConnectionInfo{
|
||||
ClientIP: tcpID.SrcIP,
|
||||
@@ -51,7 +50,7 @@ func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, emitter a
|
||||
tcpID.SrcPort,
|
||||
streamID,
|
||||
)
|
||||
item = reqResMatcher.registerResponse(ident, &messageHTTP1, time.Now())
|
||||
item = reqResMatcher.registerResponse(ident, &messageHTTP1, superTimer.CaptureTime)
|
||||
if item != nil {
|
||||
item.ConnectionInfo = &api.ConnectionInfo{
|
||||
ClientIP: tcpID.DstIP,
|
||||
@@ -71,7 +70,7 @@ func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, emitter a
|
||||
return nil
|
||||
}
|
||||
|
||||
func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, emitter api.Emitter) error {
|
||||
func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter) error {
|
||||
req, err := http.ReadRequest(b)
|
||||
if err != nil {
|
||||
// log.Println("Error reading stream:", err)
|
||||
@@ -99,7 +98,7 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
|
||||
tcpID.DstPort,
|
||||
counterPair.Request,
|
||||
)
|
||||
item := reqResMatcher.registerRequest(ident, req, time.Now())
|
||||
item := reqResMatcher.registerRequest(ident, req, superTimer.CaptureTime)
|
||||
if item != nil {
|
||||
item.ConnectionInfo = &api.ConnectionInfo{
|
||||
ClientIP: tcpID.SrcIP,
|
||||
@@ -113,7 +112,7 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
|
||||
return nil
|
||||
}
|
||||
|
||||
func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, emitter api.Emitter) error {
|
||||
func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter) error {
|
||||
res, err := http.ReadResponse(b, nil)
|
||||
if err != nil {
|
||||
// log.Println("Error reading stream:", err)
|
||||
@@ -149,7 +148,7 @@ func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
|
||||
tcpID.SrcPort,
|
||||
counterPair.Response,
|
||||
)
|
||||
item := reqResMatcher.registerResponse(ident, res, time.Now())
|
||||
item := reqResMatcher.registerResponse(ident, res, superTimer.CaptureTime)
|
||||
if item != nil {
|
||||
item.ConnectionInfo = &api.ConnectionInfo{
|
||||
ClientIP: tcpID.DstIP,
|
||||
|
||||
@@ -3,10 +3,12 @@ package main
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
"github.com/romana/rlog"
|
||||
|
||||
@@ -51,7 +53,7 @@ func init() {
|
||||
type dissecting string
|
||||
|
||||
func (d dissecting) Register(extension *api.Extension) {
|
||||
extension.Protocol = protocol
|
||||
extension.Protocol = &protocol
|
||||
extension.MatcherMap = reqResMatcher.openMessagesMap
|
||||
}
|
||||
|
||||
@@ -59,7 +61,7 @@ func (d dissecting) Ping() {
|
||||
log.Printf("pong %s\n", protocol.Name)
|
||||
}
|
||||
|
||||
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, emitter api.Emitter) error {
|
||||
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter) error {
|
||||
ident := fmt.Sprintf("%s->%s:%s->%s", tcpID.SrcIP, tcpID.DstIP, tcpID.SrcPort, tcpID.DstPort)
|
||||
isHTTP2, err := checkIsHTTP2Connection(b, isClient)
|
||||
if err != nil {
|
||||
@@ -76,41 +78,46 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
|
||||
grpcAssembler = createGrpcAssembler(b)
|
||||
}
|
||||
|
||||
success := false
|
||||
dissected := false
|
||||
for {
|
||||
if superIdentifier.Protocol != nil && superIdentifier.Protocol != &protocol {
|
||||
return errors.New("Identified by another protocol")
|
||||
}
|
||||
|
||||
if isHTTP2 {
|
||||
err = handleHTTP2Stream(grpcAssembler, tcpID, emitter)
|
||||
err = handleHTTP2Stream(grpcAssembler, tcpID, superTimer, emitter)
|
||||
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
rlog.Debugf("[HTTP/2] stream %s error: %s (%v,%+v)", ident, err, err, err)
|
||||
continue
|
||||
}
|
||||
success = true
|
||||
dissected = true
|
||||
} else if isClient {
|
||||
err = handleHTTP1ClientStream(b, tcpID, counterPair, emitter)
|
||||
err = handleHTTP1ClientStream(b, tcpID, counterPair, superTimer, emitter)
|
||||
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
rlog.Debugf("[HTTP-request] stream %s Request error: %s (%v,%+v)", ident, err, err, err)
|
||||
continue
|
||||
}
|
||||
success = true
|
||||
dissected = true
|
||||
} else {
|
||||
err = handleHTTP1ServerStream(b, tcpID, counterPair, emitter)
|
||||
err = handleHTTP1ServerStream(b, tcpID, counterPair, superTimer, emitter)
|
||||
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
rlog.Debugf("[HTTP-response], stream %s Response error: %s (%v,%+v)", ident, err, err, err)
|
||||
continue
|
||||
}
|
||||
success = true
|
||||
dissected = true
|
||||
}
|
||||
}
|
||||
|
||||
if !success {
|
||||
if !dissected {
|
||||
return err
|
||||
}
|
||||
superIdentifier.Protocol = &protocol
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -161,6 +168,8 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve
|
||||
} else if resolvedSource != "" {
|
||||
service = SetHostname(service, resolvedSource)
|
||||
}
|
||||
|
||||
elapsedTime := item.Pair.Response.CaptureTime.Sub(item.Pair.Request.CaptureTime).Round(time.Millisecond).Milliseconds()
|
||||
entryBytes, _ := json.Marshal(item.Pair)
|
||||
return &api.MizuEntry{
|
||||
ProtocolName: protocol.Name,
|
||||
@@ -173,6 +182,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve
|
||||
RequestSenderIp: item.ConnectionInfo.ClientIP,
|
||||
Service: service,
|
||||
Timestamp: item.Timestamp,
|
||||
ElapsedTime: elapsedTime,
|
||||
Path: path,
|
||||
ResolvedSource: resolvedSource,
|
||||
ResolvedDestination: resolvedDestination,
|
||||
@@ -197,6 +207,7 @@ func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails {
|
||||
Url: entry.Url,
|
||||
RequestSenderIp: entry.RequestSenderIp,
|
||||
Service: entry.Service,
|
||||
Path: entry.Path,
|
||||
Summary: entry.Path,
|
||||
StatusCode: entry.Status,
|
||||
Method: entry.Method,
|
||||
@@ -214,9 +225,7 @@ func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails {
|
||||
}
|
||||
}
|
||||
|
||||
func representRequest(request map[string]interface{}) []interface{} {
|
||||
repRequest := make([]interface{}, 0)
|
||||
|
||||
func representRequest(request map[string]interface{}) (repRequest []interface{}) {
|
||||
details, _ := json.Marshal([]map[string]string{
|
||||
{
|
||||
"name": "Method",
|
||||
@@ -232,28 +241,28 @@ func representRequest(request map[string]interface{}) []interface{} {
|
||||
},
|
||||
})
|
||||
repRequest = append(repRequest, map[string]string{
|
||||
"type": "table",
|
||||
"type": api.TABLE,
|
||||
"title": "Details",
|
||||
"data": string(details),
|
||||
})
|
||||
|
||||
headers, _ := json.Marshal(request["headers"].([]interface{}))
|
||||
repRequest = append(repRequest, map[string]string{
|
||||
"type": "table",
|
||||
"type": api.TABLE,
|
||||
"title": "Headers",
|
||||
"data": string(headers),
|
||||
})
|
||||
|
||||
cookies, _ := json.Marshal(request["cookies"].([]interface{}))
|
||||
repRequest = append(repRequest, map[string]string{
|
||||
"type": "table",
|
||||
"type": api.TABLE,
|
||||
"title": "Cookies",
|
||||
"data": string(cookies),
|
||||
})
|
||||
|
||||
queryString, _ := json.Marshal(request["queryString"].([]interface{}))
|
||||
repRequest = append(repRequest, map[string]string{
|
||||
"type": "table",
|
||||
"type": api.TABLE,
|
||||
"title": "Query String",
|
||||
"data": string(queryString),
|
||||
})
|
||||
@@ -266,7 +275,7 @@ func representRequest(request map[string]interface{}) []interface{} {
|
||||
text, _ := postData["text"]
|
||||
if text != nil {
|
||||
repRequest = append(repRequest, map[string]string{
|
||||
"type": "body",
|
||||
"type": api.BODY,
|
||||
"title": "POST Data (text/plain)",
|
||||
"encoding": "",
|
||||
"mime_type": mimeType.(string),
|
||||
@@ -285,13 +294,13 @@ func representRequest(request map[string]interface{}) []interface{} {
|
||||
},
|
||||
})
|
||||
repRequest = append(repRequest, map[string]string{
|
||||
"type": "table",
|
||||
"type": api.TABLE,
|
||||
"title": "POST Data (multipart/form-data)",
|
||||
"data": string(multipart),
|
||||
})
|
||||
} else {
|
||||
repRequest = append(repRequest, map[string]string{
|
||||
"type": "table",
|
||||
"type": api.TABLE,
|
||||
"title": "POST Data (application/x-www-form-urlencoded)",
|
||||
"data": string(params),
|
||||
})
|
||||
@@ -299,11 +308,13 @@ func representRequest(request map[string]interface{}) []interface{} {
|
||||
}
|
||||
}
|
||||
|
||||
return repRequest
|
||||
return
|
||||
}
|
||||
|
||||
func representResponse(response map[string]interface{}) []interface{} {
|
||||
repResponse := make([]interface{}, 0)
|
||||
func representResponse(response map[string]interface{}) (repResponse []interface{}, bodySize int64) {
|
||||
repResponse = make([]interface{}, 0)
|
||||
|
||||
bodySize = int64(response["bodySize"].(float64))
|
||||
|
||||
details, _ := json.Marshal([]map[string]string{
|
||||
{
|
||||
@@ -316,25 +327,25 @@ func representResponse(response map[string]interface{}) []interface{} {
|
||||
},
|
||||
{
|
||||
"name": "Body Size",
|
||||
"value": fmt.Sprintf("%g bytes", response["bodySize"].(float64)),
|
||||
"value": fmt.Sprintf("%d bytes", bodySize),
|
||||
},
|
||||
})
|
||||
repResponse = append(repResponse, map[string]string{
|
||||
"type": "table",
|
||||
"type": api.TABLE,
|
||||
"title": "Details",
|
||||
"data": string(details),
|
||||
})
|
||||
|
||||
headers, _ := json.Marshal(response["headers"].([]interface{}))
|
||||
repResponse = append(repResponse, map[string]string{
|
||||
"type": "table",
|
||||
"type": api.TABLE,
|
||||
"title": "Headers",
|
||||
"data": string(headers),
|
||||
})
|
||||
|
||||
cookies, _ := json.Marshal(response["cookies"].([]interface{}))
|
||||
repResponse = append(repResponse, map[string]string{
|
||||
"type": "table",
|
||||
"type": api.TABLE,
|
||||
"title": "Cookies",
|
||||
"data": string(cookies),
|
||||
})
|
||||
@@ -348,7 +359,7 @@ func representResponse(response map[string]interface{}) []interface{} {
|
||||
text, _ := content["text"]
|
||||
if text != nil {
|
||||
repResponse = append(repResponse, map[string]string{
|
||||
"type": "body",
|
||||
"type": api.BODY,
|
||||
"title": "Body",
|
||||
"encoding": encoding.(string),
|
||||
"mime_type": mimeType.(string),
|
||||
@@ -356,11 +367,10 @@ func representResponse(response map[string]interface{}) []interface{} {
|
||||
})
|
||||
}
|
||||
|
||||
return repResponse
|
||||
return
|
||||
}
|
||||
|
||||
func (d dissecting) Represent(entry *api.MizuEntry) (api.Protocol, []byte, error) {
|
||||
var p api.Protocol
|
||||
func (d dissecting) Represent(entry *api.MizuEntry) (p api.Protocol, object []byte, bodySize int64, err error) {
|
||||
if entry.ProtocolVersion == "2.0" {
|
||||
p = http2Protocol
|
||||
} else {
|
||||
@@ -374,11 +384,11 @@ func (d dissecting) Represent(entry *api.MizuEntry) (api.Protocol, []byte, error
|
||||
reqDetails := request["details"].(map[string]interface{})
|
||||
resDetails := response["details"].(map[string]interface{})
|
||||
repRequest := representRequest(reqDetails)
|
||||
repResponse := representResponse(resDetails)
|
||||
repResponse, bodySize := representResponse(resDetails)
|
||||
representation["request"] = repRequest
|
||||
representation["response"] = repResponse
|
||||
object, err := json.Marshal(representation)
|
||||
return p, object, err
|
||||
object, err = json.Marshal(representation)
|
||||
return
|
||||
}
|
||||
|
||||
var Dissector dissecting
|
||||
|
||||
@@ -85,7 +85,7 @@ func (matcher *requestResponseMatcher) registerResponse(ident string, response *
|
||||
func (matcher *requestResponseMatcher) preparePair(requestHTTPMessage *api.GenericMessage, responseHTTPMessage *api.GenericMessage) *api.OutputChannelItem {
|
||||
return &api.OutputChannelItem{
|
||||
Protocol: protocol,
|
||||
Timestamp: time.Now().UnixNano() / int64(time.Millisecond),
|
||||
Timestamp: requestHTTPMessage.CaptureTime.UnixNano() / int64(time.Millisecond),
|
||||
ConnectionInfo: nil,
|
||||
Pair: &api.RequestResponsePair{
|
||||
Request: *requestHTTPMessage,
|
||||
|
||||
@@ -4,6 +4,8 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strconv"
|
||||
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
|
||||
type KafkaPayload struct {
|
||||
@@ -48,7 +50,7 @@ func representRequestHeader(data map[string]interface{}, rep []interface{}) []in
|
||||
},
|
||||
})
|
||||
rep = append(rep, map[string]string{
|
||||
"type": "table",
|
||||
"type": api.TABLE,
|
||||
"title": "Request Header",
|
||||
"data": string(requestHeader),
|
||||
})
|
||||
@@ -64,7 +66,7 @@ func representResponseHeader(data map[string]interface{}, rep []interface{}) []i
|
||||
},
|
||||
})
|
||||
rep = append(rep, map[string]string{
|
||||
"type": "table",
|
||||
"type": api.TABLE,
|
||||
"title": "Response Header",
|
||||
"data": string(requestHeader),
|
||||
})
|
||||
@@ -114,7 +116,7 @@ func representMetadataRequest(data map[string]interface{}) []interface{} {
|
||||
},
|
||||
})
|
||||
rep = append(rep, map[string]string{
|
||||
"type": "table",
|
||||
"type": api.TABLE,
|
||||
"title": "Payload",
|
||||
"data": string(repPayload),
|
||||
})
|
||||
@@ -173,7 +175,7 @@ func representMetadataResponse(data map[string]interface{}) []interface{} {
|
||||
},
|
||||
})
|
||||
rep = append(rep, map[string]string{
|
||||
"type": "table",
|
||||
"type": api.TABLE,
|
||||
"title": "Payload",
|
||||
"data": string(repPayload),
|
||||
})
|
||||
@@ -206,7 +208,7 @@ func representApiVersionsRequest(data map[string]interface{}) []interface{} {
|
||||
},
|
||||
})
|
||||
rep = append(rep, map[string]string{
|
||||
"type": "table",
|
||||
"type": api.TABLE,
|
||||
"title": "Payload",
|
||||
"data": string(repPayload),
|
||||
})
|
||||
@@ -244,7 +246,7 @@ func representApiVersionsResponse(data map[string]interface{}) []interface{} {
|
||||
},
|
||||
})
|
||||
rep = append(rep, map[string]string{
|
||||
"type": "table",
|
||||
"type": api.TABLE,
|
||||
"title": "Payload",
|
||||
"data": string(repPayload),
|
||||
})
|
||||
@@ -287,7 +289,7 @@ func representProduceRequest(data map[string]interface{}) []interface{} {
|
||||
},
|
||||
})
|
||||
rep = append(rep, map[string]string{
|
||||
"type": "table",
|
||||
"type": api.TABLE,
|
||||
"title": "Payload",
|
||||
"data": string(repPayload),
|
||||
})
|
||||
@@ -317,7 +319,7 @@ func representProduceResponse(data map[string]interface{}) []interface{} {
|
||||
},
|
||||
})
|
||||
rep = append(rep, map[string]string{
|
||||
"type": "table",
|
||||
"type": api.TABLE,
|
||||
"title": "Payload",
|
||||
"data": string(repPayload),
|
||||
})
|
||||
@@ -404,7 +406,7 @@ func representFetchRequest(data map[string]interface{}) []interface{} {
|
||||
},
|
||||
})
|
||||
rep = append(rep, map[string]string{
|
||||
"type": "table",
|
||||
"type": api.TABLE,
|
||||
"title": "Payload",
|
||||
"data": string(repPayload),
|
||||
})
|
||||
@@ -450,7 +452,7 @@ func representFetchResponse(data map[string]interface{}) []interface{} {
|
||||
},
|
||||
})
|
||||
rep = append(rep, map[string]string{
|
||||
"type": "table",
|
||||
"type": api.TABLE,
|
||||
"title": "Payload",
|
||||
"data": string(repPayload),
|
||||
})
|
||||
@@ -476,7 +478,7 @@ func representListOffsetsRequest(data map[string]interface{}) []interface{} {
|
||||
},
|
||||
})
|
||||
rep = append(rep, map[string]string{
|
||||
"type": "table",
|
||||
"type": api.TABLE,
|
||||
"title": "Payload",
|
||||
"data": string(repPayload),
|
||||
})
|
||||
@@ -506,7 +508,7 @@ func representListOffsetsResponse(data map[string]interface{}) []interface{} {
|
||||
},
|
||||
})
|
||||
rep = append(rep, map[string]string{
|
||||
"type": "table",
|
||||
"type": api.TABLE,
|
||||
"title": "Payload",
|
||||
"data": string(repPayload),
|
||||
})
|
||||
@@ -540,7 +542,7 @@ func representCreateTopicsRequest(data map[string]interface{}) []interface{} {
|
||||
},
|
||||
})
|
||||
rep = append(rep, map[string]string{
|
||||
"type": "table",
|
||||
"type": api.TABLE,
|
||||
"title": "Payload",
|
||||
"data": string(repPayload),
|
||||
})
|
||||
@@ -570,7 +572,7 @@ func representCreateTopicsResponse(data map[string]interface{}) []interface{} {
|
||||
},
|
||||
})
|
||||
rep = append(rep, map[string]string{
|
||||
"type": "table",
|
||||
"type": api.TABLE,
|
||||
"title": "Payload",
|
||||
"data": string(repPayload),
|
||||
})
|
||||
@@ -609,7 +611,7 @@ func representDeleteTopicsRequest(data map[string]interface{}) []interface{} {
|
||||
},
|
||||
})
|
||||
rep = append(rep, map[string]string{
|
||||
"type": "table",
|
||||
"type": api.TABLE,
|
||||
"title": "Payload",
|
||||
"data": string(repPayload),
|
||||
})
|
||||
@@ -639,7 +641,7 @@ func representDeleteTopicsResponse(data map[string]interface{}) []interface{} {
|
||||
},
|
||||
})
|
||||
rep = append(rep, map[string]string{
|
||||
"type": "table",
|
||||
"type": api.TABLE,
|
||||
"title": "Payload",
|
||||
"data": string(repPayload),
|
||||
})
|
||||
|
||||
@@ -3,8 +3,10 @@ package main
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
@@ -29,7 +31,7 @@ func init() {
|
||||
type dissecting string
|
||||
|
||||
func (d dissecting) Register(extension *api.Extension) {
|
||||
extension.Protocol = _protocol
|
||||
extension.Protocol = &_protocol
|
||||
extension.MatcherMap = reqResMatcher.openMessagesMap
|
||||
}
|
||||
|
||||
@@ -37,18 +39,24 @@ func (d dissecting) Ping() {
|
||||
log.Printf("pong %s\n", _protocol.Name)
|
||||
}
|
||||
|
||||
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, emitter api.Emitter) error {
|
||||
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter) error {
|
||||
for {
|
||||
if superIdentifier.Protocol != nil && superIdentifier.Protocol != &_protocol {
|
||||
return errors.New("Identified by another protocol")
|
||||
}
|
||||
|
||||
if isClient {
|
||||
_, _, err := ReadRequest(b, tcpID)
|
||||
_, _, err := ReadRequest(b, tcpID, superTimer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
superIdentifier.Protocol = &_protocol
|
||||
} else {
|
||||
err := ReadResponse(b, tcpID, emitter)
|
||||
err := ReadResponse(b, tcpID, superTimer, emitter)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
superIdentifier.Protocol = &_protocol
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -131,6 +139,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve
|
||||
}
|
||||
|
||||
request["url"] = summary
|
||||
elapsedTime := item.Pair.Response.CaptureTime.Sub(item.Pair.Request.CaptureTime).Round(time.Millisecond).Milliseconds()
|
||||
entryBytes, _ := json.Marshal(item.Pair)
|
||||
return &api.MizuEntry{
|
||||
ProtocolName: _protocol.Name,
|
||||
@@ -143,6 +152,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve
|
||||
RequestSenderIp: item.ConnectionInfo.ClientIP,
|
||||
Service: service,
|
||||
Timestamp: item.Timestamp,
|
||||
ElapsedTime: elapsedTime,
|
||||
Path: summary,
|
||||
ResolvedSource: resolvedSource,
|
||||
ResolvedDestination: resolvedDestination,
|
||||
@@ -178,7 +188,9 @@ func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails {
|
||||
}
|
||||
}
|
||||
|
||||
func (d dissecting) Represent(entry *api.MizuEntry) (api.Protocol, []byte, error) {
|
||||
func (d dissecting) Represent(entry *api.MizuEntry) (p api.Protocol, object []byte, bodySize int64, err error) {
|
||||
p = _protocol
|
||||
bodySize = 0
|
||||
var root map[string]interface{}
|
||||
json.Unmarshal([]byte(entry.Entry), &root)
|
||||
representation := make(map[string]interface{}, 0)
|
||||
@@ -224,8 +236,8 @@ func (d dissecting) Represent(entry *api.MizuEntry) (api.Protocol, []byte, error
|
||||
|
||||
representation["request"] = repRequest
|
||||
representation["response"] = repResponse
|
||||
object, err := json.Marshal(representation)
|
||||
return _protocol, object, err
|
||||
object, err = json.Marshal(representation)
|
||||
return
|
||||
}
|
||||
|
||||
var Dissector dissecting
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
@@ -15,9 +16,10 @@ type Request struct {
|
||||
CorrelationID int32
|
||||
ClientID string
|
||||
Payload interface{}
|
||||
CaptureTime time.Time
|
||||
}
|
||||
|
||||
func ReadRequest(r io.Reader, tcpID *api.TcpID) (apiKey ApiKey, apiVersion int16, err error) {
|
||||
func ReadRequest(r io.Reader, tcpID *api.TcpID, superTimer *api.SuperTimer) (apiKey ApiKey, apiVersion int16, err error) {
|
||||
d := &decoder{reader: r, remain: 4}
|
||||
size := d.readInt32()
|
||||
|
||||
@@ -213,6 +215,7 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID) (apiKey ApiKey, apiVersion int16
|
||||
ApiVersion: apiVersion,
|
||||
CorrelationID: correlationID,
|
||||
ClientID: clientID,
|
||||
CaptureTime: superTimer.CaptureTime,
|
||||
Payload: payload,
|
||||
}
|
||||
|
||||
|
||||
@@ -13,9 +13,10 @@ type Response struct {
|
||||
Size int32
|
||||
CorrelationID int32
|
||||
Payload interface{}
|
||||
CaptureTime time.Time
|
||||
}
|
||||
|
||||
func ReadResponse(r io.Reader, tcpID *api.TcpID, emitter api.Emitter) (err error) {
|
||||
func ReadResponse(r io.Reader, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter) (err error) {
|
||||
d := &decoder{reader: r, remain: 4}
|
||||
size := d.readInt32()
|
||||
|
||||
@@ -39,6 +40,7 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, emitter api.Emitter) (err error
|
||||
Size: size,
|
||||
CorrelationID: correlationID,
|
||||
Payload: payload,
|
||||
CaptureTime: superTimer.CaptureTime,
|
||||
}
|
||||
|
||||
key := fmt.Sprintf(
|
||||
@@ -258,12 +260,12 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, emitter api.Emitter) (err error
|
||||
|
||||
item := &api.OutputChannelItem{
|
||||
Protocol: _protocol,
|
||||
Timestamp: time.Now().UnixNano() / int64(time.Millisecond),
|
||||
Timestamp: reqResPair.Request.CaptureTime.UnixNano() / int64(time.Millisecond),
|
||||
ConnectionInfo: connectionInfo,
|
||||
Pair: &api.RequestResponsePair{
|
||||
Request: api.GenericMessage{
|
||||
IsRequest: true,
|
||||
CaptureTime: time.Now(),
|
||||
CaptureTime: reqResPair.Request.CaptureTime,
|
||||
Payload: KafkaPayload{
|
||||
Data: &KafkaWrapper{
|
||||
Method: apiNames[apiKey],
|
||||
@@ -274,7 +276,7 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, emitter api.Emitter) (err error
|
||||
},
|
||||
Response: api.GenericMessage{
|
||||
IsRequest: false,
|
||||
CaptureTime: time.Now(),
|
||||
CaptureTime: reqResPair.Response.CaptureTime,
|
||||
Payload: KafkaPayload{
|
||||
Data: &KafkaWrapper{
|
||||
Method: apiNames[apiKey],
|
||||
|
||||
@@ -13,11 +13,13 @@ import (
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"runtime"
|
||||
"runtime/pprof"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -94,6 +96,8 @@ var ownIps []string // global
|
||||
var hostMode bool // global
|
||||
var extensions []*api.Extension // global
|
||||
|
||||
const baseStreamChannelTimeoutMs int = 5000 * 100
|
||||
|
||||
/* minOutputLevel: Error will be printed only if outputLevel is above this value
|
||||
* t: key for errorsMap (counting errors)
|
||||
* s, a: arguments log.Printf
|
||||
@@ -168,11 +172,23 @@ func StartPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem,
|
||||
}
|
||||
|
||||
func startMemoryProfiler() {
|
||||
dirname := "/app/pprof"
|
||||
rlog.Info("Profiling is on, results will be written to %s", dirname)
|
||||
dumpPath := "/app/pprof"
|
||||
envDumpPath := os.Getenv(MemoryProfilingDumpPath)
|
||||
if envDumpPath != "" {
|
||||
dumpPath = envDumpPath
|
||||
}
|
||||
timeInterval := 60
|
||||
envTimeInterval := os.Getenv(MemoryProfilingTimeIntervalSeconds)
|
||||
if envTimeInterval != "" {
|
||||
if i, err := strconv.Atoi(envTimeInterval); err == nil {
|
||||
timeInterval = i
|
||||
}
|
||||
}
|
||||
|
||||
rlog.Info("Profiling is on, results will be written to %s", dumpPath)
|
||||
go func() {
|
||||
if _, err := os.Stat(dirname); os.IsNotExist(err) {
|
||||
if err := os.Mkdir(dirname, 0777); err != nil {
|
||||
if _, err := os.Stat(dumpPath); os.IsNotExist(err) {
|
||||
if err := os.Mkdir(dumpPath, 0777); err != nil {
|
||||
log.Fatal("could not create directory for profile: ", err)
|
||||
}
|
||||
}
|
||||
@@ -180,9 +196,9 @@ func startMemoryProfiler() {
|
||||
for true {
|
||||
t := time.Now()
|
||||
|
||||
filename := fmt.Sprintf("%s/%s__mem.prof", dirname, t.Format("15_04_05"))
|
||||
filename := fmt.Sprintf("%s/%s__mem.prof", dumpPath, t.Format("15_04_05"))
|
||||
|
||||
rlog.Info("Writing memory profile to %s\n", filename)
|
||||
rlog.Infof("Writing memory profile to %s\n", filename)
|
||||
|
||||
f, err := os.Create(filename)
|
||||
if err != nil {
|
||||
@@ -193,13 +209,50 @@ func startMemoryProfiler() {
|
||||
log.Fatal("could not write memory profile: ", err)
|
||||
}
|
||||
_ = f.Close()
|
||||
time.Sleep(time.Minute)
|
||||
time.Sleep(time.Second * time.Duration(timeInterval))
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func closeTimedoutTcpStreamChannels() {
|
||||
maxNumberOfGoroutines = GetMaxNumberOfGoroutines()
|
||||
TcpStreamChannelTimeoutMs := GetTcpChannelTimeoutMs()
|
||||
for {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
streams.Range(func(key interface{}, value interface{}) bool {
|
||||
streamWrapper := value.(*tcpStreamWrapper)
|
||||
stream := streamWrapper.stream
|
||||
if stream.superIdentifier.Protocol == nil {
|
||||
if !stream.isClosed && time.Now().After(streamWrapper.createdAt.Add(TcpStreamChannelTimeoutMs)) {
|
||||
stream.Close()
|
||||
statsTracker.incDroppedTcpStreams()
|
||||
rlog.Debugf("Dropped an unidentified TCP stream because of timeout. Total dropped: %d Total Goroutines: %d Timeout (ms): %d\n", statsTracker.appStats.DroppedTcpStreams, runtime.NumGoroutine(), TcpStreamChannelTimeoutMs/1000000)
|
||||
}
|
||||
} else {
|
||||
if !stream.superIdentifier.IsClosedOthers {
|
||||
for i := range stream.clients {
|
||||
reader := &stream.clients[i]
|
||||
if reader.extension.Protocol != stream.superIdentifier.Protocol {
|
||||
reader.Close()
|
||||
}
|
||||
}
|
||||
for i := range stream.servers {
|
||||
reader := &stream.servers[i]
|
||||
if reader.extension.Protocol != stream.superIdentifier.Protocol {
|
||||
reader.Close()
|
||||
}
|
||||
}
|
||||
stream.superIdentifier.IsClosedOthers = true
|
||||
}
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
|
||||
log.SetFlags(log.LstdFlags | log.LUTC | log.Lshortfile)
|
||||
go closeTimedoutTcpStreamChannels()
|
||||
|
||||
defer util.Run()()
|
||||
if *debug {
|
||||
@@ -354,7 +407,14 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
|
||||
startMemoryProfiler()
|
||||
}
|
||||
|
||||
for packet := range source.Packets() {
|
||||
for {
|
||||
packet, err := source.NextPacket()
|
||||
if err == io.EOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
rlog.Debugf("Error:", err)
|
||||
continue
|
||||
}
|
||||
packetsCount := statsTracker.incPacketsCount()
|
||||
rlog.Debugf("PACKET #%d", packetsCount)
|
||||
data := packet.Data()
|
||||
|
||||
@@ -3,14 +3,21 @@ package tap
|
||||
import (
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
MemoryProfilingEnabledEnvVarName = "MEMORY_PROFILING_ENABLED"
|
||||
MemoryProfilingDumpPath = "MEMORY_PROFILING_DUMP_PATH"
|
||||
MemoryProfilingTimeIntervalSeconds = "MEMORY_PROFILING_TIME_INTERVAL"
|
||||
MaxBufferedPagesTotalEnvVarName = "MAX_BUFFERED_PAGES_TOTAL"
|
||||
MaxBufferedPagesPerConnectionEnvVarName = "MAX_BUFFERED_PAGES_PER_CONNECTION"
|
||||
TcpStreamChannelTimeoutMsEnvVarName = "TCP_STREAM_CHANNEL_TIMEOUT_MS"
|
||||
MaxNumberOfGoroutinesEnvVarName = "MAX_NUMBER_OF_GOROUTINES"
|
||||
MaxBufferedPagesTotalDefaultValue = 5000
|
||||
MaxBufferedPagesPerConnectionDefaultValue = 5000
|
||||
TcpStreamChannelTimeoutMsDefaultValue = 5000
|
||||
MaxNumberOfGoroutinesDefaultValue = 4000
|
||||
)
|
||||
|
||||
type globalSettings struct {
|
||||
@@ -47,6 +54,22 @@ func GetMaxBufferedPagesPerConnection() int {
|
||||
return valueFromEnv
|
||||
}
|
||||
|
||||
func GetTcpChannelTimeoutMs() time.Duration {
|
||||
valueFromEnv, err := strconv.Atoi(os.Getenv(TcpStreamChannelTimeoutMsEnvVarName))
|
||||
if err != nil {
|
||||
return TcpStreamChannelTimeoutMsDefaultValue * time.Millisecond
|
||||
}
|
||||
return time.Duration(valueFromEnv) * time.Millisecond
|
||||
}
|
||||
|
||||
func GetMaxNumberOfGoroutines() int {
|
||||
valueFromEnv, err := strconv.Atoi(os.Getenv(MaxNumberOfGoroutinesEnvVarName))
|
||||
if err != nil {
|
||||
return MaxNumberOfGoroutinesDefaultValue
|
||||
}
|
||||
return valueFromEnv
|
||||
}
|
||||
|
||||
func GetMemoryProfilingEnabled() bool {
|
||||
return os.Getenv(MemoryProfilingEnabledEnvVarName) == "1"
|
||||
}
|
||||
|
||||
@@ -13,6 +13,7 @@ type AppStats struct {
|
||||
ReassembledTcpPayloadsCount int64 `json:"reassembledTcpPayloadsCount"`
|
||||
TlsConnectionsCount int64 `json:"tlsConnectionsCount"`
|
||||
MatchedPairs int64 `json:"matchedPairs"`
|
||||
DroppedTcpStreams int64 `json:"droppedTcpStreams"`
|
||||
}
|
||||
|
||||
type StatsTracker struct {
|
||||
@@ -23,6 +24,7 @@ type StatsTracker struct {
|
||||
reassembledTcpPayloadsCountMutex sync.Mutex
|
||||
tlsConnectionsCountMutex sync.Mutex
|
||||
matchedPairsMutex sync.Mutex
|
||||
droppedTcpStreamsMutex sync.Mutex
|
||||
}
|
||||
|
||||
func (st *StatsTracker) incMatchedPairs() {
|
||||
@@ -31,6 +33,12 @@ func (st *StatsTracker) incMatchedPairs() {
|
||||
st.matchedPairsMutex.Unlock()
|
||||
}
|
||||
|
||||
func (st *StatsTracker) incDroppedTcpStreams() {
|
||||
st.droppedTcpStreamsMutex.Lock()
|
||||
st.appStats.DroppedTcpStreams++
|
||||
st.droppedTcpStreamsMutex.Unlock()
|
||||
}
|
||||
|
||||
func (st *StatsTracker) incPacketsCount() int64 {
|
||||
st.packetsCountMutex.Lock()
|
||||
st.appStats.PacketsCount++
|
||||
@@ -100,5 +108,10 @@ func (st *StatsTracker) dumpStats() *AppStats {
|
||||
st.appStats.MatchedPairs = 0
|
||||
st.matchedPairsMutex.Unlock()
|
||||
|
||||
st.droppedTcpStreamsMutex.Lock()
|
||||
currentAppStats.DroppedTcpStreams = st.appStats.DroppedTcpStreams
|
||||
st.appStats.DroppedTcpStreams = 0
|
||||
st.droppedTcpStreamsMutex.Unlock()
|
||||
|
||||
return currentAppStats
|
||||
}
|
||||
|
||||
@@ -47,11 +47,12 @@ func (tid *tcpID) String() string {
|
||||
type tcpReader struct {
|
||||
ident string
|
||||
tcpID *api.TcpID
|
||||
isClosed bool
|
||||
isClient bool
|
||||
isOutgoing bool
|
||||
msgQueue chan tcpReaderDataMsg // Channel of captured reassembled tcp payload
|
||||
data []byte
|
||||
captureTime time.Time
|
||||
superTimer *api.SuperTimer
|
||||
parent *tcpStream
|
||||
messageCount uint
|
||||
packetsSeen uint
|
||||
@@ -59,6 +60,7 @@ type tcpReader struct {
|
||||
extension *api.Extension
|
||||
emitter api.Emitter
|
||||
counterPair *api.CounterPair
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
func (h *tcpReader) Read(p []byte) (int, error) {
|
||||
@@ -69,7 +71,7 @@ func (h *tcpReader) Read(p []byte) (int, error) {
|
||||
msg, ok = <-h.msgQueue
|
||||
h.data = msg.bytes
|
||||
|
||||
h.captureTime = msg.timestamp
|
||||
h.superTimer.CaptureTime = msg.timestamp
|
||||
if len(h.data) > 0 {
|
||||
h.packetsSeen += 1
|
||||
}
|
||||
@@ -93,10 +95,19 @@ func (h *tcpReader) Read(p []byte) (int, error) {
|
||||
return l, nil
|
||||
}
|
||||
|
||||
func (h *tcpReader) Close() {
|
||||
h.Lock()
|
||||
if !h.isClosed {
|
||||
h.isClosed = true
|
||||
close(h.msgQueue)
|
||||
}
|
||||
h.Unlock()
|
||||
}
|
||||
|
||||
func (h *tcpReader) run(wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
b := bufio.NewReader(h)
|
||||
err := h.extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.counterPair, h.emitter)
|
||||
err := h.extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.counterPair, h.superTimer, h.parent.superIdentifier, h.emitter)
|
||||
if err != nil {
|
||||
io.Copy(ioutil.Discard, b)
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"github.com/google/gopacket"
|
||||
"github.com/google/gopacket/layers" // pulls in all layers decoders
|
||||
"github.com/google/gopacket/reassembly"
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
|
||||
/* It's a connection (bidirectional)
|
||||
@@ -16,16 +17,19 @@ import (
|
||||
* In our implementation, we pass information from ReassembledSG to the tcpReader through a shared channel.
|
||||
*/
|
||||
type tcpStream struct {
|
||||
tcpstate *reassembly.TCPSimpleFSM
|
||||
fsmerr bool
|
||||
optchecker reassembly.TCPOptionCheck
|
||||
net, transport gopacket.Flow
|
||||
isDNS bool
|
||||
isTapTarget bool
|
||||
clients []tcpReader
|
||||
servers []tcpReader
|
||||
urls []string
|
||||
ident string
|
||||
id int64
|
||||
isClosed bool
|
||||
superIdentifier *api.SuperIdentifier
|
||||
tcpstate *reassembly.TCPSimpleFSM
|
||||
fsmerr bool
|
||||
optchecker reassembly.TCPOptionCheck
|
||||
net, transport gopacket.Flow
|
||||
isDNS bool
|
||||
isTapTarget bool
|
||||
clients []tcpReader
|
||||
servers []tcpReader
|
||||
urls []string
|
||||
ident string
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
@@ -144,13 +148,24 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
|
||||
// This is where we pass the reassembled information onwards
|
||||
// This channel is read by an tcpReader object
|
||||
statsTracker.incReassembledTcpPayloadsCount()
|
||||
timestamp := ac.GetCaptureInfo().Timestamp
|
||||
if dir == reassembly.TCPDirClientToServer {
|
||||
for _, reader := range t.clients {
|
||||
reader.msgQueue <- tcpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp}
|
||||
for i := range t.clients {
|
||||
reader := &t.clients[i]
|
||||
reader.Lock()
|
||||
if !reader.isClosed {
|
||||
reader.msgQueue <- tcpReaderDataMsg{data, timestamp}
|
||||
}
|
||||
reader.Unlock()
|
||||
}
|
||||
} else {
|
||||
for _, reader := range t.servers {
|
||||
reader.msgQueue <- tcpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp}
|
||||
for i := range t.servers {
|
||||
reader := &t.servers[i]
|
||||
reader.Lock()
|
||||
if !reader.isClosed {
|
||||
reader.msgQueue <- tcpReaderDataMsg{data, timestamp}
|
||||
}
|
||||
reader.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -159,14 +174,33 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
|
||||
|
||||
func (t *tcpStream) ReassemblyComplete(ac reassembly.AssemblerContext) bool {
|
||||
Trace("%s: Connection closed", t.ident)
|
||||
if t.isTapTarget {
|
||||
for _, reader := range t.clients {
|
||||
close(reader.msgQueue)
|
||||
}
|
||||
for _, reader := range t.servers {
|
||||
close(reader.msgQueue)
|
||||
}
|
||||
if t.isTapTarget && !t.isClosed {
|
||||
t.Close()
|
||||
}
|
||||
// do not remove the connection to allow last ACK
|
||||
return false
|
||||
}
|
||||
|
||||
func (t *tcpStream) Close() {
|
||||
shouldReturn := false
|
||||
t.Lock()
|
||||
if t.isClosed {
|
||||
shouldReturn = true
|
||||
} else {
|
||||
t.isClosed = true
|
||||
}
|
||||
t.Unlock()
|
||||
if shouldReturn {
|
||||
return
|
||||
}
|
||||
streams.Delete(t.id)
|
||||
|
||||
for i := range t.clients {
|
||||
reader := &t.clients[i]
|
||||
reader.Close()
|
||||
}
|
||||
for i := range t.servers {
|
||||
reader := &t.servers[i]
|
||||
reader.Close()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,7 +2,9 @@ package tap
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"runtime"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/romana/rlog"
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
@@ -23,6 +25,16 @@ type tcpStreamFactory struct {
|
||||
Emitter api.Emitter
|
||||
}
|
||||
|
||||
type tcpStreamWrapper struct {
|
||||
stream *tcpStream
|
||||
createdAt time.Time
|
||||
}
|
||||
|
||||
var streams *sync.Map = &sync.Map{} // global
|
||||
var streamId int64 = 0
|
||||
|
||||
var maxNumberOfGoroutines int
|
||||
|
||||
func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.TCP, ac reassembly.AssemblerContext) reassembly.Stream {
|
||||
rlog.Debugf("* NEW: %s %s", net, transport)
|
||||
fsmOptions := reassembly.TCPSimpleFSMOptions{
|
||||
@@ -39,23 +51,32 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
|
||||
props := factory.getStreamProps(srcIp, srcPort, dstIp, dstPort)
|
||||
isTapTarget := props.isTapTarget
|
||||
stream := &tcpStream{
|
||||
net: net,
|
||||
transport: transport,
|
||||
isDNS: tcp.SrcPort == 53 || tcp.DstPort == 53,
|
||||
isTapTarget: isTapTarget,
|
||||
tcpstate: reassembly.NewTCPSimpleFSM(fsmOptions),
|
||||
ident: fmt.Sprintf("%s:%s", net, transport),
|
||||
optchecker: reassembly.NewTCPOptionCheck(),
|
||||
net: net,
|
||||
transport: transport,
|
||||
isDNS: tcp.SrcPort == 53 || tcp.DstPort == 53,
|
||||
isTapTarget: isTapTarget,
|
||||
tcpstate: reassembly.NewTCPSimpleFSM(fsmOptions),
|
||||
ident: fmt.Sprintf("%s:%s", net, transport),
|
||||
optchecker: reassembly.NewTCPOptionCheck(),
|
||||
superIdentifier: &api.SuperIdentifier{},
|
||||
}
|
||||
if stream.isTapTarget {
|
||||
if runtime.NumGoroutine() > maxNumberOfGoroutines {
|
||||
statsTracker.incDroppedTcpStreams()
|
||||
rlog.Debugf("Dropped a TCP stream because of load. Total dropped: %d Total Goroutines: %d\n", statsTracker.appStats.DroppedTcpStreams, runtime.NumGoroutine())
|
||||
return stream
|
||||
}
|
||||
streamId++
|
||||
stream.id = streamId
|
||||
for i, extension := range extensions {
|
||||
counterPair := &api.CounterPair{
|
||||
Request: 0,
|
||||
Response: 0,
|
||||
}
|
||||
stream.clients = append(stream.clients, tcpReader{
|
||||
msgQueue: make(chan tcpReaderDataMsg),
|
||||
ident: fmt.Sprintf("%s %s", net, transport),
|
||||
msgQueue: make(chan tcpReaderDataMsg),
|
||||
superTimer: &api.SuperTimer{},
|
||||
ident: fmt.Sprintf("%s %s", net, transport),
|
||||
tcpID: &api.TcpID{
|
||||
SrcIP: srcIp,
|
||||
DstIP: dstIp,
|
||||
@@ -71,8 +92,9 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
|
||||
counterPair: counterPair,
|
||||
})
|
||||
stream.servers = append(stream.servers, tcpReader{
|
||||
msgQueue: make(chan tcpReaderDataMsg),
|
||||
ident: fmt.Sprintf("%s %s", net, transport),
|
||||
msgQueue: make(chan tcpReaderDataMsg),
|
||||
superTimer: &api.SuperTimer{},
|
||||
ident: fmt.Sprintf("%s %s", net, transport),
|
||||
tcpID: &api.TcpID{
|
||||
SrcIP: net.Dst().String(),
|
||||
DstIP: net.Src().String(),
|
||||
@@ -87,6 +109,12 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
|
||||
emitter: factory.Emitter,
|
||||
counterPair: counterPair,
|
||||
})
|
||||
|
||||
streams.Store(stream.id, &tcpStreamWrapper{
|
||||
stream: stream,
|
||||
createdAt: time.Now(),
|
||||
})
|
||||
|
||||
factory.wg.Add(2)
|
||||
// Start reading from channel stream.reader.bytes
|
||||
go stream.clients[i].run(&factory.wg)
|
||||
@@ -117,7 +145,7 @@ func (factory *tcpStreamFactory) getStreamProps(srcIP string, srcPort string, ds
|
||||
}
|
||||
return &streamProps{isTapTarget: false, isOutgoing: false}
|
||||
} else {
|
||||
rlog.Debugf("getStreamProps %s", fmt.Sprintf("+ notHost3 %s -> %s:%s", srcIP, dstIP, dstPort))
|
||||
rlog.Debugf("getStreamProps %s", fmt.Sprintf("+ notHost3 %s:%s -> %s:%s", srcIP, srcPort, dstIP, dstPort))
|
||||
return &streamProps{isTapTarget: true}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -54,8 +54,8 @@ export const EntriesList: React.FC<EntriesListProps> = ({entries, setEntries, fo
|
||||
const filterEntries = useCallback((entry) => {
|
||||
if(methodsFilter.length > 0 && !methodsFilter.includes(entry.method.toLowerCase())) return;
|
||||
if(pathFilter && entry.path?.toLowerCase()?.indexOf(pathFilter) === -1) return;
|
||||
if(statusFilter.includes(StatusType.SUCCESS) && entry.statusCode >= 400) return;
|
||||
if(statusFilter.includes(StatusType.ERROR) && entry.statusCode < 400) return;
|
||||
if(statusFilter.includes(StatusType.SUCCESS) && entry.status_code >= 400) return;
|
||||
if(statusFilter.includes(StatusType.ERROR) && entry.status_code < 400) return;
|
||||
return entry;
|
||||
},[methodsFilter, pathFilter, statusFilter])
|
||||
|
||||
|
||||
@@ -32,7 +32,7 @@ interface EntryDetailedProps {
|
||||
|
||||
export const formatSize = (n: number) => n > 1000 ? `${Math.round(n / 1000)}KB` : `${n} B`;
|
||||
|
||||
const EntryTitle: React.FC<any> = ({protocol, data}) => {
|
||||
const EntryTitle: React.FC<any> = ({protocol, data, bodySize, elapsedTime}) => {
|
||||
const classes = useStyles();
|
||||
const {response} = JSON.parse(data.entry);
|
||||
|
||||
@@ -40,7 +40,8 @@ const EntryTitle: React.FC<any> = ({protocol, data}) => {
|
||||
return <div className={classes.entryTitle}>
|
||||
<Protocol protocol={protocol} horizontal={true}/>
|
||||
<div style={{right: "30px", position: "absolute", display: "flex"}}>
|
||||
{response.payload && <div style={{margin: "0 18px", opacity: 0.5}}>{formatSize(response.payload.bodySize)}</div>}
|
||||
{response.payload && <div style={{margin: "0 18px", opacity: 0.5}}>{formatSize(bodySize)}</div>}
|
||||
<div style={{marginRight: 18, opacity: 0.5}}>{Math.round(elapsedTime)}ms</div>
|
||||
<div style={{opacity: 0.5}}>{'rulesMatched' in data ? data.rulesMatched?.length : '0'} Rules Applied</div>
|
||||
</div>
|
||||
</div>;
|
||||
@@ -63,7 +64,12 @@ const EntrySummary: React.FC<any> = ({data}) => {
|
||||
|
||||
export const EntryDetailed: React.FC<EntryDetailedProps> = ({entryData}) => {
|
||||
return <>
|
||||
<EntryTitle protocol={entryData.protocol} data={entryData.data}/>
|
||||
<EntryTitle
|
||||
protocol={entryData.protocol}
|
||||
data={entryData.data}
|
||||
bodySize={entryData.bodySize}
|
||||
elapsedTime={entryData.data.elapsedTime}
|
||||
/>
|
||||
{entryData.data && <EntrySummary data={entryData.data}/>}
|
||||
<>
|
||||
{entryData.data && <EntryViewer representation={entryData.representation} color={entryData.protocol.background_color}/>}
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
.Entry
|
||||
font-family: "Source Sans Pro", Lucida Grande, Tahoma, sans-serif
|
||||
height: 100%
|
||||
height: calc(100% - 70px)
|
||||
width: 100%
|
||||
|
||||
h3,
|
||||
@@ -44,6 +44,8 @@
|
||||
border-top-right-radius: 0
|
||||
|
||||
.body
|
||||
height: 100%
|
||||
overflow-y: auto
|
||||
background: $main-background-color
|
||||
color: $blue-gray
|
||||
border-radius: 4px
|
||||
|
||||
Reference in New Issue
Block a user