Compare commits

..

11 Commits

Author SHA1 Message Date
RoyUP9
6b52458642 removed exit if browser not supported (#270) 2021-09-12 16:09:04 +03:00
M. Mert Yıldıran
858a64687d Stop the hanging Goroutines by dropping the old, unidentified TCP streams (#260)
* Close the hanging TCP message channels after a dynamically aligned timeout (base `10000` milliseconds)

* Bring back `source.Lazy`

* Add a one more `sync.Map.Delete` call

* Improve the formula by taking base Goroutine count into account

* Reduce duplication

* Include the dropped TCP streams count into the stats tracker and print a debug log whenever it happens

* Add `superIdentifier` field to `tcpStream` to check if it has identified

Also stop the other protocol dissectors if a TCP stream identified by a protocol.

* Take one step forward in fixing the channel closing issue (WIP)

Add `sync.Mutex` to `tcpReader` and make the loops reference based.

* Fix the channel closing issue

* Improve the accuracy of the formula, log better and multiply `baseStreamChannelTimeoutMs` by 100

* Remove `fmt.Printf`

* Replace `runtime.Gosched()` with `time.Sleep(1 * time.Millisecond)`

* Close the channels of other protocols in case of an identification

* Simplify the logic

* Replace the formula with hard timeout 5000 milliseconds and 4000 maximum number of Goroutines
2021-09-12 08:26:48 +03:00
M. Mert Yıldıran
819ccf54cd Fix the build error in PR validation (#269)
* Fix the build error in PR validation (temp)

* Set Go version to 1.16

* Revert "Fix the build error in PR validation (temp)"

This reverts commit 4cb613251c.
2021-09-11 21:23:15 +03:00
M. Mert Yıldıran
7cc077c8a0 Fix the memory exhaustion by optimizing max. AMQP message size and GOGC (#257)
* Permanently resolve the memory exhaustion in AMQP

Introduce;
- `MEMORY_PROFILING_DUMP_PATH`
- `MEMORY_PROFILING_TIME_INTERVAL`
environment variables and make `startMemoryProfiler` method more parameterized.

* Fix a leak in HTTP

* Revert "Fix a leak in HTTP"

This reverts commit 9d46820ff3.

* Set maximum AMQP message size to 16MB

* Set `GOGC` to 12800

* Remove some commented out lines and an unnecessary `else if`
2021-09-09 17:45:37 +03:00
Igor Gov
fae5f22d25 Adding a download command if new mizu version is detected (#267) 2021-09-08 22:28:07 +03:00
M. Mert Yıldıran
eba7a3b476 Temporarily fix the filtering into its original state (#266) 2021-09-07 09:50:33 +03:00
RoyUP9
cf231538f4 added panic catch on tests (#265) 2021-09-06 11:42:47 +03:00
gadotroee
073b0b72d3 Remove fetch command (and direction) (#264) 2021-09-06 10:16:04 +03:00
gadotroee
c8705822b3 TRA-3658 - Fix analysis feature (#261) 2021-09-06 09:46:49 +03:00
M. Mert Yıldıran
d4436d9f15 Turn table and body strings to constants and move them to extension API (#262) 2021-09-05 06:44:16 +03:00
M. Mert Yıldıran
4e0ff74944 Fix body size, receive (elapsed time) and timestamps (#258)
* Fix the HTTP body size (it's not applicable to AMQP and Kafka)

* Fix the elapsed time

* Change JSON fields from snake_case to camelCase
2021-09-04 17:15:39 +03:00
44 changed files with 801 additions and 557 deletions

View File

@@ -18,7 +18,7 @@ jobs:
- name: Set up Go 1.16
uses: actions/setup-go@v2
with:
go-version: '^1.16'
go-version: '1.16'
- name: Check out code into the Go module directory
uses: actions/checkout@v2
@@ -33,7 +33,7 @@ jobs:
- name: Set up Go 1.16
uses: actions/setup-go@v2
with:
go-version: '^1.16'
go-version: '1.16'
- name: Check out code into the Go module directory
uses: actions/checkout@v2

3
.gitignore vendored
View File

@@ -26,3 +26,6 @@ build
# Environment variables
.env
# pprof
pprof/*

View File

@@ -4,7 +4,6 @@ import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os/exec"
"strings"
@@ -12,7 +11,7 @@ import (
"time"
)
func TestTapAndFetch(t *testing.T) {
func TestTap(t *testing.T) {
if testing.Short() {
t.Skip("ignored acceptance test")
}
@@ -93,37 +92,6 @@ func TestTapAndFetch(t *testing.T) {
t.Errorf("%v", err)
return
}
fetchCmdArgs := getDefaultFetchCommandArgs()
fetchCmd := exec.Command(cliPath, fetchCmdArgs...)
t.Logf("running command: %v", fetchCmd.String())
if err := fetchCmd.Start(); err != nil {
t.Errorf("failed to start fetch command, err: %v", err)
return
}
harCheckFunc := func() error {
harBytes, readFileErr := ioutil.ReadFile("./unknown_source.har")
if readFileErr != nil {
return fmt.Errorf("failed to read har file, err: %v", readFileErr)
}
harEntries, err := getEntriesFromHarBytes(harBytes)
if err != nil {
return fmt.Errorf("failed to get entries from har, err: %v", err)
}
if len(harEntries) == 0 {
return fmt.Errorf("unexpected har entries result - Expected more than 0 entries")
}
return nil
}
if err := retriesExecute(shortRetriesCount, harCheckFunc); err != nil {
t.Errorf("%v", err)
return
}
})
}
}

View File

@@ -76,13 +76,6 @@ func getDefaultTapNamespace() []string {
return []string{"-n", "mizu-tests"}
}
func getDefaultFetchCommandArgs() []string {
fetchCommand := "fetch"
defaultCmdArgs := getDefaultCommandArgs()
return append([]string{fetchCommand}, defaultCmdArgs...)
}
func getDefaultConfigCommandArgs() []string {
configCommand := "config"
defaultCmdArgs := getDefaultCommandArgs()
@@ -91,10 +84,10 @@ func getDefaultConfigCommandArgs() []string {
}
func retriesExecute(retriesCount int, executeFunc func() error) error {
var lastError error
var lastError interface{}
for i := 0; i < retriesCount; i++ {
if err := executeFunc(); err != nil {
if err := tryExecuteFunc(executeFunc); err != nil {
lastError = err
time.Sleep(1 * time.Second)
@@ -107,6 +100,16 @@ func retriesExecute(retriesCount int, executeFunc func() error) error {
return fmt.Errorf("reached max retries count, retries count: %v, last err: %v", retriesCount, lastError)
}
func tryExecuteFunc(executeFunc func() error) (err interface{}) {
defer func() {
if panicErr := recover(); panicErr != nil {
err = panicErr
}
}()
return executeFunc()
}
func waitTapPodsReady(apiServerUrl string) error {
resolvingUrl := fmt.Sprintf("%v/status/tappersCount", apiServerUrl)
tapPodsReadyFunc := func() error {
@@ -179,19 +182,6 @@ func cleanupCommand(cmd *exec.Cmd) error {
return nil
}
func getEntriesFromHarBytes(harBytes []byte) ([]interface{}, error) {
harInterface, convertErr := jsonBytesToInterface(harBytes)
if convertErr != nil {
return nil, convertErr
}
har := harInterface.(map[string]interface{})
harLog := har["log"].(map[string]interface{})
harEntries := harLog["entries"].([]interface{})
return harEntries, nil
}
func getPods(tapStatusInterface interface{}) ([]map[string]interface{}, error) {
tapStatus := tapStatusInterface.(map[string]interface{})
podsInterface := tapStatus["pods"].([]interface{})

View File

@@ -1,7 +1,6 @@
# mizu agent
Agent for MIZU (API server and tapper)
Basic APIs:
* /fetch - retrieve traffic data
* /stats - retrieve statistics of collected data
* /viewer - web ui

View File

@@ -3,6 +3,7 @@ package controllers
import (
"encoding/json"
"fmt"
"github.com/google/martian/har"
"mizuserver/pkg/database"
"mizuserver/pkg/models"
"mizuserver/pkg/providers"
@@ -10,11 +11,9 @@ import (
"mizuserver/pkg/utils"
"mizuserver/pkg/validation"
"net/http"
"strings"
"time"
"github.com/gin-gonic/gin"
"github.com/google/martian/har"
"github.com/romana/rlog"
tapApi "github.com/up9inc/mizu/tap/api"
@@ -48,7 +47,7 @@ func GetEntries(c *gin.Context) {
Find(&entries)
if len(entries) > 0 && order == database.OrderDesc {
// the entries always order from oldest to newest so we should revers
// the entries always order from oldest to newest - we should reverse
utils.ReverseSlice(entries)
}
@@ -64,93 +63,6 @@ func GetEntries(c *gin.Context) {
c.JSON(http.StatusOK, baseEntries)
}
func GetHARs(c *gin.Context) {
entriesFilter := &models.HarFetchRequestQuery{}
order := database.OrderDesc
if err := c.BindQuery(entriesFilter); err != nil {
c.JSON(http.StatusBadRequest, err)
}
err := validation.Validate(entriesFilter)
if err != nil {
c.JSON(http.StatusBadRequest, err)
}
var timestampFrom, timestampTo int64
if entriesFilter.From < 0 {
timestampFrom = 0
} else {
timestampFrom = entriesFilter.From
}
if entriesFilter.To <= 0 {
timestampTo = time.Now().UnixNano() / int64(time.Millisecond)
} else {
timestampTo = entriesFilter.To
}
var entries []tapApi.MizuEntry
database.GetEntriesTable().
Where(fmt.Sprintf("timestamp BETWEEN %v AND %v", timestampFrom, timestampTo)).
Order(fmt.Sprintf("timestamp %s", order)).
Find(&entries)
if len(entries) > 0 {
// the entries always order from oldest to newest so we should revers
utils.ReverseSlice(entries)
}
harsObject := map[string]*models.ExtendedHAR{}
for _, entryData := range entries {
var harEntry har.Entry
_ = json.Unmarshal([]byte(entryData.Entry), &harEntry)
if entryData.ResolvedDestination != "" {
harEntry.Request.URL = utils.SetHostname(harEntry.Request.URL, entryData.ResolvedDestination)
}
var fileName string
sourceOfEntry := entryData.ResolvedSource
if sourceOfEntry != "" {
// naively assumes the proper service source is http
sourceOfEntry = fmt.Sprintf("http://%s", sourceOfEntry)
//replace / from the file name cause they end up creating a corrupted folder
fileName = fmt.Sprintf("%s.har", strings.ReplaceAll(sourceOfEntry, "/", "_"))
} else {
fileName = "unknown_source.har"
}
if harOfSource, ok := harsObject[fileName]; ok {
harOfSource.Log.Entries = append(harOfSource.Log.Entries, &harEntry)
} else {
var entriesHar []*har.Entry
entriesHar = append(entriesHar, &harEntry)
harsObject[fileName] = &models.ExtendedHAR{
Log: &models.ExtendedLog{
Version: "1.2",
Creator: &models.ExtendedCreator{
Creator: &har.Creator{
Name: "mizu",
Version: "0.0.2",
},
},
Entries: entriesHar,
},
}
// leave undefined when no source is present, otherwise modeler assumes source is empty string ""
if sourceOfEntry != "" {
harsObject[fileName].Log.Creator.Source = &sourceOfEntry
}
}
}
retObj := map[string][]byte{}
for k, v := range harsObject {
bytesData, _ := json.Marshal(v)
retObj[k] = bytesData
}
buffer := utils.ZipData(retObj)
c.Data(http.StatusOK, "application/octet-stream", buffer.Bytes())
}
func UploadEntries(c *gin.Context) {
rlog.Infof("Upload entries - started\n")
@@ -202,15 +114,21 @@ func GetFullEntries(c *gin.Context) {
timestampTo = entriesFilter.To
}
entriesArray := database.GetEntriesFromDb(timestampFrom, timestampTo)
result := make([]models.FullEntryDetails, 0)
entriesArray := database.GetEntriesFromDb(timestampFrom, timestampTo, nil)
result := make([]har.Entry, 0)
for _, data := range entriesArray {
harEntry := models.FullEntryDetails{}
if err := models.GetEntry(&data, &harEntry); err != nil {
var pair tapApi.RequestResponsePair
if err := json.Unmarshal([]byte(data.Entry), &pair); err != nil {
continue
}
result = append(result, harEntry)
harEntry, err := utils.NewEntry(&pair)
if err != nil {
continue
}
result = append(result, *harEntry)
}
c.JSON(http.StatusOK, result)
}
@@ -220,27 +138,12 @@ func GetEntry(c *gin.Context) {
Where(map[string]string{"entryId": c.Param("entryId")}).
First(&entryData)
fullEntry := models.FullEntryDetails{}
if err := models.GetEntry(&entryData, &fullEntry); err != nil {
c.JSON(http.StatusInternalServerError, map[string]interface{}{
"error": true,
"msg": "Can't get entry details",
})
}
// FIXME: Fix the part below
// fullEntryWithPolicy := models.FullEntryWithPolicy{}
// if err := models.GetEntry(&entryData, &fullEntryWithPolicy); err != nil {
// c.JSON(http.StatusInternalServerError, map[string]interface{}{
// "error": true,
// "msg": "Can't get entry details",
// })
// }
extension := extensionsMap[entryData.ProtocolName]
protocol, representation, _ := extension.Dissector.Represent(&entryData)
protocol, representation, bodySize, _ := extension.Dissector.Represent(&entryData)
c.JSON(http.StatusOK, tapApi.MizuEntryWrapper{
Protocol: protocol,
Representation: string(representation),
BodySize: bodySize,
Data: entryData,
})
}

View File

@@ -57,10 +57,16 @@ func initDataBase(databasePath string) *gorm.DB {
return temp
}
func GetEntriesFromDb(timestampFrom int64, timestampTo int64) []tapApi.MizuEntry {
func GetEntriesFromDb(timestampFrom int64, timestampTo int64, protocolName *string) []tapApi.MizuEntry {
order := OrderDesc
protocolNameCondition := "1 = 1"
if protocolName != nil {
protocolNameCondition = fmt.Sprintf("protocolKey = '%s'", *protocolName)
}
var entries []tapApi.MizuEntry
GetEntriesTable().
Where(protocolNameCondition).
Where(fmt.Sprintf("timestamp BETWEEN %v AND %v", timestampFrom, timestampTo)).
Order(fmt.Sprintf("timestamp %s", order)).
Find(&entries)

View File

@@ -2,9 +2,7 @@ package models
import (
"encoding/json"
tapApi "github.com/up9inc/mizu/tap/api"
"mizuserver/pkg/rules"
"mizuserver/pkg/utils"
@@ -17,47 +15,14 @@ func GetEntry(r *tapApi.MizuEntry, v tapApi.DataUnmarshaler) error {
return v.UnmarshalData(r)
}
func NewApplicableRules(status bool, latency int64, number int) tapApi.ApplicableRules {
ar := tapApi.ApplicableRules{}
ar.Status = status
ar.Latency = latency
ar.NumberOfRules = number
return ar
}
type FullEntryDetails struct {
har.Entry
}
type FullEntryDetailsExtra struct {
har.Entry
}
func (fed *FullEntryDetails) UnmarshalData(entry *tapApi.MizuEntry) error {
if err := json.Unmarshal([]byte(entry.Entry), &fed.Entry); err != nil {
return err
}
if entry.ResolvedDestination != "" {
fed.Entry.Request.URL = utils.SetHostname(fed.Entry.Request.URL, entry.ResolvedDestination)
}
return nil
}
func (fedex *FullEntryDetailsExtra) UnmarshalData(entry *tapApi.MizuEntry) error {
if err := json.Unmarshal([]byte(entry.Entry), &fedex.Entry); err != nil {
return err
}
if entry.ResolvedSource != "" {
fedex.Entry.Request.Headers = append(fedex.Request.Headers, har.Header{Name: "x-mizu-source", Value: entry.ResolvedSource})
}
if entry.ResolvedDestination != "" {
fedex.Entry.Request.Headers = append(fedex.Request.Headers, har.Header{Name: "x-mizu-destination", Value: entry.ResolvedDestination})
fedex.Entry.Request.URL = utils.SetHostname(fedex.Entry.Request.URL, entry.ResolvedDestination)
}
return nil
}
// TODO: until we fixed the Rules feature
//func NewApplicableRules(status bool, latency int64, number int) tapApi.ApplicableRules {
// ar := tapApi.ApplicableRules{}
// ar.Status = status
// ar.Latency = latency
// ar.NumberOfRules = number
// return ar
//}
type EntriesFilter struct {
Limit int `form:"limit" validate:"required,min=1,max=200"`
@@ -147,9 +112,15 @@ type FullEntryWithPolicy struct {
}
func (fewp *FullEntryWithPolicy) UnmarshalData(entry *tapApi.MizuEntry) error {
if err := json.Unmarshal([]byte(entry.Entry), &fewp.Entry); err != nil {
var pair tapApi.RequestResponsePair
if err := json.Unmarshal([]byte(entry.Entry), &pair); err != nil {
return err
}
harEntry, err := utils.NewEntry(&pair)
if err != nil {
return err
}
fewp.Entry = *harEntry
_, resultPolicyToSend := rules.MatchRequestPolicy(fewp.Entry, entry.Service)
fewp.RulesMatched = resultPolicyToSend
@@ -157,9 +128,10 @@ func (fewp *FullEntryWithPolicy) UnmarshalData(entry *tapApi.MizuEntry) error {
return nil
}
func RunValidationRulesState(harEntry har.Entry, service string) tapApi.ApplicableRules {
numberOfRules, resultPolicyToSend := rules.MatchRequestPolicy(harEntry, service)
statusPolicyToSend, latency, numberOfRules := rules.PassedValidationRules(resultPolicyToSend, numberOfRules)
ar := NewApplicableRules(statusPolicyToSend, latency, numberOfRules)
return ar
}
// TODO: until we fixed the Rules feature
//func RunValidationRulesState(harEntry har.Entry, service string) tapApi.ApplicableRules {
// numberOfRules, resultPolicyToSend := rules.MatchRequestPolicy(harEntry, service)
// statusPolicyToSend, latency, numberOfRules := rules.PassedValidationRules(resultPolicyToSend, numberOfRules)
// ar := NewApplicableRules(statusPolicyToSend, latency, numberOfRules)
// return ar
//}

View File

@@ -15,8 +15,6 @@ func EntriesRoutes(ginApp *gin.Engine) {
routeGroup.GET("/uploadEntries", controllers.UploadEntries)
routeGroup.GET("/resolving", controllers.GetCurrentResolvingInformation)
routeGroup.GET("/har", controllers.GetHARs)
routeGroup.GET("/resetDB", controllers.DeleteAllEntries) // get single (full) entry
routeGroup.GET("/generalStats", controllers.GetGeneralStats) // get general stats about entries in DB

View File

@@ -5,12 +5,14 @@ import (
"compress/zlib"
"encoding/json"
"fmt"
"github.com/google/martian/har"
"github.com/romana/rlog"
"github.com/up9inc/mizu/shared"
tapApi "github.com/up9inc/mizu/tap/api"
"io/ioutil"
"log"
"mizuserver/pkg/database"
"mizuserver/pkg/models"
"mizuserver/pkg/utils"
"net/http"
"net/url"
"strings"
@@ -129,21 +131,33 @@ func UploadEntriesImpl(token string, model string, envPrefix string, sleepInterv
for {
timestampTo := time.Now().UnixNano() / int64(time.Millisecond)
rlog.Infof("Getting entries from %v, to %v\n", timestampFrom, timestampTo)
entriesArray := database.GetEntriesFromDb(timestampFrom, timestampTo)
protocolFilter := "http"
entriesArray := database.GetEntriesFromDb(timestampFrom, timestampTo, &protocolFilter)
if len(entriesArray) > 0 {
fullEntriesExtra := make([]models.FullEntryDetailsExtra, 0)
result := make([]har.Entry, 0)
for _, data := range entriesArray {
harEntry := models.FullEntryDetailsExtra{}
if err := models.GetEntry(&data, &harEntry); err != nil {
var pair tapApi.RequestResponsePair
if err := json.Unmarshal([]byte(data.Entry), &pair); err != nil {
continue
}
fullEntriesExtra = append(fullEntriesExtra, harEntry)
harEntry, err := utils.NewEntry(&pair)
if err != nil {
continue
}
if data.ResolvedSource != "" {
harEntry.Request.Headers = append(harEntry.Request.Headers, har.Header{Name: "x-mizu-source", Value: data.ResolvedSource})
}
if data.ResolvedDestination != "" {
harEntry.Request.Headers = append(harEntry.Request.Headers, har.Header{Name: "x-mizu-destination", Value: data.ResolvedDestination})
harEntry.Request.URL = utils.SetHostname(harEntry.Request.URL, data.ResolvedDestination)
}
result = append(result, *harEntry)
}
rlog.Infof("About to upload %v entries\n", len(fullEntriesExtra))
body, jMarshalErr := json.Marshal(fullEntriesExtra)
rlog.Infof("About to upload %v entries\n", len(result))
body, jMarshalErr := json.Marshal(result)
if jMarshalErr != nil {
analyzeInformation.Reset()
rlog.Infof("Stopping analyzing")

259
agent/pkg/utils/har.go Normal file
View File

@@ -0,0 +1,259 @@
package utils
import (
"bytes"
"errors"
"fmt"
"strconv"
"strings"
"time"
"github.com/google/martian/har"
"github.com/up9inc/mizu/tap"
"github.com/up9inc/mizu/tap/api"
)
// Keep it because we might want cookies in the future
//func BuildCookies(rawCookies []interface{}) []har.Cookie {
// cookies := make([]har.Cookie, 0, len(rawCookies))
//
// for _, cookie := range rawCookies {
// c := cookie.(map[string]interface{})
// expiresStr := ""
// if c["expires"] != nil {
// expiresStr = c["expires"].(string)
// }
// expires, _ := time.Parse(time.RFC3339, expiresStr)
// httpOnly := false
// if c["httponly"] != nil {
// httpOnly, _ = strconv.ParseBool(c["httponly"].(string))
// }
// secure := false
// if c["secure"] != nil {
// secure, _ = strconv.ParseBool(c["secure"].(string))
// }
// path := ""
// if c["path"] != nil {
// path = c["path"].(string)
// }
// domain := ""
// if c["domain"] != nil {
// domain = c["domain"].(string)
// }
//
// cookies = append(cookies, har.Cookie{
// Name: c["name"].(string),
// Value: c["value"].(string),
// Path: path,
// Domain: domain,
// HTTPOnly: httpOnly,
// Secure: secure,
// Expires: expires,
// Expires8601: expiresStr,
// })
// }
//
// return cookies
//}
func BuildHeaders(rawHeaders []interface{}) ([]har.Header, string, string, string, string, string) {
var host, scheme, authority, path, status string
headers := make([]har.Header, 0, len(rawHeaders))
for _, header := range rawHeaders {
h := header.(map[string]interface{})
headers = append(headers, har.Header{
Name: h["name"].(string),
Value: h["value"].(string),
})
if h["name"] == "Host" {
host = h["value"].(string)
}
if h["name"] == ":authority" {
authority = h["value"].(string)
}
if h["name"] == ":scheme" {
scheme = h["value"].(string)
}
if h["name"] == ":path" {
path = h["value"].(string)
}
if h["name"] == ":status" {
path = h["value"].(string)
}
}
return headers, host, scheme, authority, path, status
}
func BuildPostParams(rawParams []interface{}) []har.Param {
params := make([]har.Param, 0, len(rawParams))
for _, param := range rawParams {
p := param.(map[string]interface{})
name := ""
if p["name"] != nil {
name = p["name"].(string)
}
value := ""
if p["value"] != nil {
value = p["value"].(string)
}
fileName := ""
if p["fileName"] != nil {
fileName = p["fileName"].(string)
}
contentType := ""
if p["contentType"] != nil {
contentType = p["contentType"].(string)
}
params = append(params, har.Param{
Name: name,
Value: value,
Filename: fileName,
ContentType: contentType,
})
}
return params
}
func NewRequest(request *api.GenericMessage) (harRequest *har.Request, err error) {
reqDetails := request.Payload.(map[string]interface{})["details"].(map[string]interface{})
headers, host, scheme, authority, path, _ := BuildHeaders(reqDetails["headers"].([]interface{}))
cookies := make([]har.Cookie, 0) // BuildCookies(reqDetails["cookies"].([]interface{}))
postData, _ := reqDetails["postData"].(map[string]interface{})
mimeType, _ := postData["mimeType"]
if mimeType == nil || len(mimeType.(string)) == 0 {
mimeType = "text/html"
}
text, _ := postData["text"]
postDataText := ""
if text != nil {
postDataText = text.(string)
}
queryString := make([]har.QueryString, 0)
for _, _qs := range reqDetails["queryString"].([]interface{}) {
qs := _qs.(map[string]interface{})
queryString = append(queryString, har.QueryString{
Name: qs["name"].(string),
Value: qs["value"].(string),
})
}
url := fmt.Sprintf("http://%s%s", host, reqDetails["url"].(string))
if strings.HasPrefix(mimeType.(string), "application/grpc") {
url = fmt.Sprintf("%s://%s%s", scheme, authority, path)
}
harParams := make([]har.Param, 0)
if postData["params"] != nil {
harParams = BuildPostParams(postData["params"].([]interface{}))
}
harRequest = &har.Request{
Method: reqDetails["method"].(string),
URL: url,
HTTPVersion: reqDetails["httpVersion"].(string),
HeadersSize: -1,
BodySize: int64(bytes.NewBufferString(postDataText).Len()),
QueryString: queryString,
Headers: headers,
Cookies: cookies,
PostData: &har.PostData{
MimeType: mimeType.(string),
Params: harParams,
Text: postDataText,
},
}
return
}
func NewResponse(response *api.GenericMessage) (harResponse *har.Response, err error) {
resDetails := response.Payload.(map[string]interface{})["details"].(map[string]interface{})
headers, _, _, _, _, _status := BuildHeaders(resDetails["headers"].([]interface{}))
cookies := make([]har.Cookie, 0) // BuildCookies(resDetails["cookies"].([]interface{}))
content, _ := resDetails["content"].(map[string]interface{})
mimeType, _ := content["mimeType"]
if mimeType == nil || len(mimeType.(string)) == 0 {
mimeType = "text/html"
}
encoding, _ := content["encoding"]
text, _ := content["text"]
bodyText := ""
if text != nil {
bodyText = text.(string)
}
harContent := &har.Content{
Encoding: encoding.(string),
MimeType: mimeType.(string),
Text: []byte(bodyText),
Size: int64(len(bodyText)),
}
status := int(resDetails["status"].(float64))
if strings.HasPrefix(mimeType.(string), "application/grpc") {
status, err = strconv.Atoi(_status)
if err != nil {
tap.SilentError("convert-response-status-for-har", "Failed converting status to int %s (%v,%+v)", err, err, err)
return nil, errors.New("failed converting response status to int for HAR")
}
}
harResponse = &har.Response{
HTTPVersion: resDetails["httpVersion"].(string),
Status: status,
StatusText: resDetails["statusText"].(string),
HeadersSize: -1,
BodySize: int64(bytes.NewBufferString(bodyText).Len()),
Headers: headers,
Cookies: cookies,
Content: harContent,
}
return
}
func NewEntry(pair *api.RequestResponsePair) (*har.Entry, error) {
harRequest, err := NewRequest(&pair.Request)
if err != nil {
tap.SilentError("convert-request-to-har", "Failed converting request to HAR %s (%v,%+v)", err, err, err)
return nil, errors.New("failed converting request to HAR")
}
harResponse, err := NewResponse(&pair.Response)
if err != nil {
fmt.Printf("err: %+v\n", err)
tap.SilentError("convert-response-to-har", "Failed converting response to HAR %s (%v,%+v)", err, err, err)
return nil, errors.New("failed converting response to HAR")
}
totalTime := pair.Response.CaptureTime.Sub(pair.Request.CaptureTime).Round(time.Millisecond).Milliseconds()
if totalTime < 1 {
totalTime = 1
}
harEntry := har.Entry{
StartedDateTime: pair.Request.CaptureTime,
Time: totalTime,
Request: harRequest,
Response: harResponse,
Cache: &har.Cache{},
Timings: &har.Timings{
Send: -1,
Wait: -1,
Receive: totalTime,
},
}
return &harEntry, nil
}

View File

@@ -1,7 +1,6 @@
package apiserver
import (
"archive/zip"
"bytes"
"encoding/json"
"fmt"
@@ -131,28 +130,6 @@ func (provider *apiServerProvider) GetGeneralStats() (map[string]interface{}, er
return generalStats, nil
}
func (provider *apiServerProvider) GetHars(fromTimestamp int, toTimestamp int) (*zip.Reader, error) {
if !provider.isReady {
return nil, fmt.Errorf("trying to reach api server when not initialized yet")
}
resp, err := http.Get(fmt.Sprintf("%s/api/har?from=%v&to=%v", provider.url, fromTimestamp, toTimestamp))
if err != nil {
return nil, fmt.Errorf("failed getting har from api server %w", err)
}
defer func() { _ = resp.Body.Close() }()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("failed reading hars %w", err)
}
zipReader, err := zip.NewReader(bytes.NewReader(body), int64(len(body)))
if err != nil {
return nil, fmt.Errorf("failed craeting zip reader %w", err)
}
return zipReader, nil
}
func (provider *apiServerProvider) GetVersion() (string, error) {
if !provider.isReady {

View File

@@ -3,7 +3,6 @@ package cmd
import (
"context"
"fmt"
"log"
"os"
"os/exec"
"os/signal"
@@ -63,8 +62,8 @@ func openBrowser(url string) {
default:
err = fmt.Errorf("unsupported platform")
}
if err != nil {
log.Fatal(err)
}
if err != nil {
logger.Log.Errorf("error while opening browser, %v", err)
}
}

View File

@@ -1,46 +0,0 @@
package cmd
import (
"github.com/creasty/defaults"
"github.com/spf13/cobra"
"github.com/up9inc/mizu/cli/apiserver"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/config/configStructs"
"github.com/up9inc/mizu/cli/logger"
"github.com/up9inc/mizu/cli/mizu/version"
"github.com/up9inc/mizu/cli/telemetry"
"github.com/up9inc/mizu/cli/uiUtils"
)
var fetchCmd = &cobra.Command{
Use: "fetch",
Short: "Download recorded traffic to files",
RunE: func(cmd *cobra.Command, args []string) error {
go telemetry.ReportRun("fetch", config.Config.Fetch)
if err := apiserver.Provider.InitAndTestConnection(GetApiServerUrl()); err != nil {
logger.Log.Errorf(uiUtils.Error, "Couldn't connect to API server, make sure one running")
return nil
}
if isCompatible, err := version.CheckVersionCompatibility(); err != nil {
return err
} else if !isCompatible {
return nil
}
RunMizuFetch()
return nil
},
}
func init() {
rootCmd.AddCommand(fetchCmd)
defaultFetchConfig := configStructs.FetchConfig{}
defaults.Set(&defaultFetchConfig)
fetchCmd.Flags().StringP(configStructs.DirectoryFetchName, "d", defaultFetchConfig.Directory, "Provide a custom directory for fetched entries")
fetchCmd.Flags().Int(configStructs.FromTimestampFetchName, defaultFetchConfig.FromTimestamp, "Custom start timestamp for fetched entries")
fetchCmd.Flags().Int(configStructs.ToTimestampFetchName, defaultFetchConfig.ToTimestamp, "Custom end timestamp fetched entries")
fetchCmd.Flags().Uint16P(configStructs.GuiPortFetchName, "p", defaultFetchConfig.GuiPort, "Provide a custom port for the web interface webserver")
}

View File

@@ -1,25 +0,0 @@
package cmd
import (
"github.com/up9inc/mizu/cli/apiserver"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/logger"
"github.com/up9inc/mizu/cli/mizu/fsUtils"
"github.com/up9inc/mizu/cli/uiUtils"
)
func RunMizuFetch() {
if err := apiserver.Provider.InitAndTestConnection(GetApiServerUrl()); err != nil {
logger.Log.Errorf(uiUtils.Error, "Couldn't connect to API server, check logs")
}
zipReader, err := apiserver.Provider.GetHars(config.Config.Fetch.FromTimestamp, config.Config.Fetch.ToTimestamp)
if err != nil {
logger.Log.Errorf("Failed fetch data from API server %v", err)
return
}
if err := fsUtils.Unzip(zipReader, config.Config.Fetch.Directory); err != nil {
logger.Log.Debugf("[ERROR] failed unzip %v", err)
}
}

View File

@@ -64,7 +64,6 @@ func init() {
tapCmd.Flags().StringSliceP(configStructs.PlainTextFilterRegexesTapName, "r", defaultTapConfig.PlainTextFilterRegexes, "List of regex expressions that are used to filter matching values from text/plain http bodies")
tapCmd.Flags().Bool(configStructs.DisableRedactionTapName, defaultTapConfig.DisableRedaction, "Disables redaction of potentially sensitive request/response headers and body values")
tapCmd.Flags().String(configStructs.HumanMaxEntriesDBSizeTapName, defaultTapConfig.HumanMaxEntriesDBSize, "Override the default max entries db size")
tapCmd.Flags().String(configStructs.DirectionTapName, defaultTapConfig.Direction, "Record traffic that goes in this direction (relative to the tapped pod): in/any")
tapCmd.Flags().Bool(configStructs.DryRunTapName, defaultTapConfig.DryRun, "Preview of all pods matching the regex, without tapping them")
tapCmd.Flags().String(configStructs.EnforcePolicyFile, defaultTapConfig.EnforcePolicyFile, "Yaml file with policy rules")
}

View File

@@ -18,7 +18,6 @@ const (
type ConfigStruct struct {
Tap configStructs.TapConfig `yaml:"tap"`
Fetch configStructs.FetchConfig `yaml:"fetch"`
Version configStructs.VersionConfig `yaml:"version"`
View configStructs.ViewConfig `yaml:"view"`
Logs configStructs.LogsConfig `yaml:"logs"`

View File

@@ -1,15 +0,0 @@
package configStructs
const (
DirectoryFetchName = "directory"
FromTimestampFetchName = "from"
ToTimestampFetchName = "to"
GuiPortFetchName = "gui-port"
)
type FetchConfig struct {
Directory string `yaml:"directory" default:"."`
FromTimestamp int `yaml:"from" default:"0"`
ToTimestamp int `yaml:"to" default:"0"`
GuiPort uint16 `yaml:"gui-port" default:"8899"`
}

View File

@@ -3,10 +3,8 @@ package configStructs
import (
"errors"
"fmt"
"regexp"
"strings"
"github.com/up9inc/mizu/shared/units"
"regexp"
)
const (
@@ -17,7 +15,6 @@ const (
PlainTextFilterRegexesTapName = "regex-masking"
DisableRedactionTapName = "no-redact"
HumanMaxEntriesDBSizeTapName = "max-entries-db-size"
DirectionTapName = "direction"
DryRunTapName = "dry-run"
EnforcePolicyFile = "test-rules"
)
@@ -34,7 +31,6 @@ type TapConfig struct {
HealthChecksUserAgentHeaders []string `yaml:"ignored-user-agents"`
DisableRedaction bool `yaml:"no-redact" default:"false"`
HumanMaxEntriesDBSize string `yaml:"max-entries-db-size" default:"200MB"`
Direction string `yaml:"direction" default:"in"`
DryRun bool `yaml:"dry-run" default:"false"`
EnforcePolicyFile string `yaml:"test-rules"`
ApiServerResources Resources `yaml:"api-server-resources"`
@@ -69,10 +65,5 @@ func (config *TapConfig) Validate() error {
return errors.New(fmt.Sprintf("Could not parse --%s value %s", HumanMaxEntriesDBSizeTapName, config.HumanMaxEntriesDBSize))
}
directionLowerCase := strings.ToLower(config.Direction)
if directionLowerCase != "any" && directionLowerCase != "in" {
return errors.New(fmt.Sprintf("%s is not a valid value for flag --%s. Acceptable values are in/any.", config.Direction, DirectionTapName))
}
return nil
}

View File

@@ -604,6 +604,7 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
agentContainer.WithEnv(
applyconfcore.EnvVar().WithName(shared.HostModeEnvVar).WithValue("1"),
applyconfcore.EnvVar().WithName(shared.TappedAddressesPerNodeDictEnvVar).WithValue(string(nodeToTappedPodIPMapJsonStr)),
applyconfcore.EnvVar().WithName(shared.GoGCEnvVar).WithValue("12800"),
)
agentContainer.WithEnv(
applyconfcore.EnvVar().WithName(shared.NodeNameEnvVar).WithValueFrom(

View File

@@ -8,6 +8,7 @@ import (
"github.com/up9inc/mizu/cli/mizu"
"io/ioutil"
"net/http"
"runtime"
"time"
"github.com/google/go-github/v37/github"
@@ -76,7 +77,7 @@ func CheckNewerVersion(versionChan chan string) {
logger.Log.Debugf("Finished version validation, github version %v, current version %v, took %v", gitHubVersion, currentSemVer, time.Since(start))
if gitHubVersionSemVer.GreaterThan(currentSemVer) {
versionChan <- fmt.Sprintf("Update available! %v -> %v (%v)", mizu.SemVer, gitHubVersion, *latestRelease.HTMLURL)
versionChan <- fmt.Sprintf("Update available! %v -> %v (curl -Lo mizu %v/mizu_%s_amd64 && chmod 755 mizu)", mizu.SemVer, gitHubVersion, *latestRelease.HTMLURL, runtime.GOOS)
} else {
versionChan <- ""
}

View File

@@ -8,4 +8,5 @@ const (
MaxEntriesDBSizeBytesEnvVar = "MAX_ENTRIES_DB_BYTES"
RulePolicyPath = "/app/enforce-policy/"
RulePolicyFileName = "enforce-policy.yaml"
GoGCEnvVar = "GOGC"
)

View File

@@ -21,7 +21,7 @@ type Protocol struct {
}
type Extension struct {
Protocol Protocol
Protocol *Protocol
Path string
Plug *plugin.Plugin
Dissector Dissector
@@ -68,13 +68,22 @@ type OutputChannelItem struct {
Pair *RequestResponsePair
}
type SuperTimer struct {
CaptureTime time.Time
}
type SuperIdentifier struct {
Protocol *Protocol
IsClosedOthers bool
}
type Dissector interface {
Register(*Extension)
Ping()
Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, emitter Emitter) error
Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter) error
Analyze(item *OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *MizuEntry
Summarize(entry *MizuEntry) *BaseEntryDetails
Represent(entry *MizuEntry) (Protocol, []byte, error)
Represent(entry *MizuEntry) (protocol Protocol, object []byte, bodySize int64, err error)
}
type Emitting struct {
@@ -103,6 +112,7 @@ type MizuEntry struct {
RequestSenderIp string `json:"requestSenderIp" gorm:"column:requestSenderIp"`
Service string `json:"service" gorm:"column:service"`
Timestamp int64 `json:"timestamp" gorm:"column:timestamp"`
ElapsedTime int64 `json:"elapsedTime" gorm:"column:elapsedTime"`
Path string `json:"path" gorm:"column:path"`
ResolvedSource string `json:"resolvedSource,omitempty" gorm:"column:resolvedSource"`
ResolvedDestination string `json:"resolvedDestination,omitempty" gorm:"column:resolvedDestination"`
@@ -117,6 +127,7 @@ type MizuEntry struct {
type MizuEntryWrapper struct {
Protocol Protocol `json:"protocol"`
Representation string `json:"representation"`
BodySize int64 `json:"bodySize"`
Data MizuEntry `json:"data"`
}
@@ -126,6 +137,7 @@ type BaseEntryDetails struct {
Url string `json:"url,omitempty"`
RequestSenderIp string `json:"request_sender_ip,omitempty"`
Service string `json:"service,omitempty"`
Path string `json:"path,omitempty"`
Summary string `json:"summary,omitempty"`
StatusCode int `json:"status_code"`
Method string `json:"method,omitempty"`
@@ -163,3 +175,8 @@ func (bed *BaseEntryDetails) UnmarshalData(entry *MizuEntry) error {
bed.IsOutgoing = entry.IsOutgoing
return nil
}
const (
TABLE string = "table"
BODY string = "body"
)

View File

@@ -93,10 +93,10 @@ type AMQPWrapper struct {
Details interface{} `json:"details"`
}
func emitAMQP(event interface{}, _type string, method string, connectionInfo *api.ConnectionInfo, emitter api.Emitter) {
func emitAMQP(event interface{}, _type string, method string, connectionInfo *api.ConnectionInfo, captureTime time.Time, emitter api.Emitter) {
request := &api.GenericMessage{
IsRequest: true,
CaptureTime: time.Now(),
CaptureTime: captureTime,
Payload: AMQPPayload{
Data: &AMQPWrapper{
Method: method,
@@ -107,7 +107,7 @@ func emitAMQP(event interface{}, _type string, method string, connectionInfo *ap
}
item := &api.OutputChannelItem{
Protocol: protocol,
Timestamp: time.Now().UnixNano() / int64(time.Millisecond),
Timestamp: captureTime.UnixNano() / int64(time.Millisecond),
ConnectionInfo: connectionInfo,
Pair: &api.RequestResponsePair{
Request: *request,
@@ -219,7 +219,7 @@ func representProperties(properties map[string]interface{}, rep []interface{}) (
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Properties",
"data": string(props),
})
@@ -249,7 +249,7 @@ func representBasicPublish(event map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Details",
"data": string(details),
})
@@ -267,7 +267,7 @@ func representBasicPublish(event map[string]interface{}) []interface{} {
}
headersMarshaled, _ := json.Marshal(headers)
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Headers",
"data": string(headersMarshaled),
})
@@ -275,7 +275,7 @@ func representBasicPublish(event map[string]interface{}) []interface{} {
if event["Body"] != nil {
rep = append(rep, map[string]string{
"type": "body",
"type": api.BODY,
"title": "Body",
"encoding": "base64",
"mime_type": contentType,
@@ -326,7 +326,7 @@ func representBasicDeliver(event map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Details",
"data": string(details),
})
@@ -344,7 +344,7 @@ func representBasicDeliver(event map[string]interface{}) []interface{} {
}
headersMarshaled, _ := json.Marshal(headers)
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Headers",
"data": string(headersMarshaled),
})
@@ -352,7 +352,7 @@ func representBasicDeliver(event map[string]interface{}) []interface{} {
if event["Body"] != nil {
rep = append(rep, map[string]string{
"type": "body",
"type": api.BODY,
"title": "Body",
"encoding": "base64",
"mime_type": contentType,
@@ -393,7 +393,7 @@ func representQueueDeclare(event map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Details",
"data": string(details),
})
@@ -408,7 +408,7 @@ func representQueueDeclare(event map[string]interface{}) []interface{} {
}
headersMarshaled, _ := json.Marshal(headers)
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Arguments",
"data": string(headersMarshaled),
})
@@ -451,7 +451,7 @@ func representExchangeDeclare(event map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Details",
"data": string(details),
})
@@ -466,7 +466,7 @@ func representExchangeDeclare(event map[string]interface{}) []interface{} {
}
headersMarshaled, _ := json.Marshal(headers)
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Arguments",
"data": string(headersMarshaled),
})
@@ -497,7 +497,7 @@ func representConnectionStart(event map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Details",
"data": string(details),
})
@@ -524,7 +524,7 @@ func representConnectionStart(event map[string]interface{}) []interface{} {
}
headersMarshaled, _ := json.Marshal(headers)
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Server Properties",
"data": string(headersMarshaled),
})
@@ -555,7 +555,7 @@ func representConnectionClose(event map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Details",
"data": string(details),
})
@@ -585,7 +585,7 @@ func representQueueBind(event map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Details",
"data": string(details),
})
@@ -600,7 +600,7 @@ func representQueueBind(event map[string]interface{}) []interface{} {
}
headersMarshaled, _ := json.Marshal(headers)
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Arguments",
"data": string(headersMarshaled),
})
@@ -639,7 +639,7 @@ func representBasicConsume(event map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Details",
"data": string(details),
})
@@ -654,7 +654,7 @@ func representBasicConsume(event map[string]interface{}) []interface{} {
}
headersMarshaled, _ := json.Marshal(headers)
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Arguments",
"data": string(headersMarshaled),
})

View File

@@ -32,7 +32,7 @@ func init() {
type dissecting string
func (d dissecting) Register(extension *api.Extension) {
extension.Protocol = protocol
extension.Protocol = &protocol
}
func (d dissecting) Ping() {
@@ -41,7 +41,7 @@ func (d dissecting) Ping() {
const amqpRequest string = "amqp_request"
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, emitter api.Emitter) error {
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter) error {
r := AmqpReader{b}
var remaining int
@@ -78,13 +78,14 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
var lastMethodFrameMessage Message
for {
if superIdentifier.Protocol != nil && superIdentifier.Protocol != &protocol {
return errors.New("Identified by another protocol")
}
frame, err := r.ReadFrame()
if err == io.EOF {
// We must read until we see an EOF... very important!
return errors.New("AMQP EOF")
} else if err != nil {
// TODO: Causes ignoring some methods. Return only in case of a certain error. But what?
return err
}
switch f := frame.(type) {
@@ -101,6 +102,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
case *BasicDeliver:
eventBasicDeliver.Properties = header.Properties
default:
frame = nil
}
case *BodyFrame:
@@ -110,11 +112,15 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
switch lastMethodFrameMessage.(type) {
case *BasicPublish:
eventBasicPublish.Body = f.Body
emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, emitter)
superIdentifier.Protocol = &protocol
emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, superTimer.CaptureTime, emitter)
case *BasicDeliver:
eventBasicDeliver.Body = f.Body
emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, emitter)
superIdentifier.Protocol = &protocol
emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, superTimer.CaptureTime, emitter)
default:
body = nil
frame = nil
}
case *MethodFrame:
@@ -134,7 +140,8 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
NoWait: m.NoWait,
Arguments: m.Arguments,
}
emitAMQP(*eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, emitter)
superIdentifier.Protocol = &protocol
emitAMQP(*eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, superTimer.CaptureTime, emitter)
case *BasicConsume:
eventBasicConsume := &BasicConsume{
@@ -146,7 +153,8 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
NoWait: m.NoWait,
Arguments: m.Arguments,
}
emitAMQP(*eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, emitter)
superIdentifier.Protocol = &protocol
emitAMQP(*eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, superTimer.CaptureTime, emitter)
case *BasicDeliver:
eventBasicDeliver.ConsumerTag = m.ConsumerTag
@@ -165,7 +173,8 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
NoWait: m.NoWait,
Arguments: m.Arguments,
}
emitAMQP(*eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, emitter)
superIdentifier.Protocol = &protocol
emitAMQP(*eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter)
case *ExchangeDeclare:
eventExchangeDeclare := &ExchangeDeclare{
@@ -178,7 +187,8 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
NoWait: m.NoWait,
Arguments: m.Arguments,
}
emitAMQP(*eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, emitter)
superIdentifier.Protocol = &protocol
emitAMQP(*eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter)
case *ConnectionStart:
eventConnectionStart := &ConnectionStart{
@@ -188,7 +198,8 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
Mechanisms: m.Mechanisms,
Locales: m.Locales,
}
emitAMQP(*eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, emitter)
superIdentifier.Protocol = &protocol
emitAMQP(*eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter)
case *ConnectionClose:
eventConnectionClose := &ConnectionClose{
@@ -197,9 +208,11 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
ClassId: m.ClassId,
MethodId: m.MethodId,
}
emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, emitter)
superIdentifier.Protocol = &protocol
emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, superTimer.CaptureTime, emitter)
default:
frame = nil
}
@@ -264,6 +277,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve
RequestSenderIp: item.ConnectionInfo.ClientIP,
Service: service,
Timestamp: item.Timestamp,
ElapsedTime: 0,
Path: summary,
ResolvedSource: resolvedSource,
ResolvedDestination: resolvedDestination,
@@ -300,7 +314,9 @@ func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails {
}
}
func (d dissecting) Represent(entry *api.MizuEntry) (api.Protocol, []byte, error) {
func (d dissecting) Represent(entry *api.MizuEntry) (p api.Protocol, object []byte, bodySize int64, err error) {
p = protocol
bodySize = 0
var root map[string]interface{}
json.Unmarshal([]byte(entry.Entry), &root)
representation := make(map[string]interface{}, 0)
@@ -334,8 +350,8 @@ func (d dissecting) Represent(entry *api.MizuEntry) (api.Protocol, []byte, error
break
}
representation["request"] = repRequest
object, err := json.Marshal(representation)
return protocol, object, err
object, err = json.Marshal(representation)
return
}
var Dissector dissecting

View File

@@ -54,7 +54,7 @@ func (r *AmqpReader) ReadFrame() (frame frame, err error) {
channel := binary.BigEndian.Uint16(scratch[1:3])
size := binary.BigEndian.Uint32(scratch[3:7])
if size > 1000000 {
if size > 1000000*16 {
return nil, ErrMaxSize
}
@@ -352,6 +352,10 @@ func (r *AmqpReader) parseHeaderFrame(channel uint16, size uint32) (frame frame,
return
}
if hf.Size > 512 {
return nil, ErrMaxHeaderFrameSize
}
var flags uint16
if err = binary.Read(r.R, binary.BigEndian, &flags); err != nil {
@@ -439,6 +443,7 @@ func (r *AmqpReader) parseBodyFrame(channel uint16, size uint32) (frame frame, e
}
if _, err = io.ReadFull(r.R, bf.Body); err != nil {
bf.Body = nil
return nil, err
}

View File

@@ -10,7 +10,6 @@ package main
import (
"encoding/binary"
"fmt"
"io"
)
@@ -19,32 +18,35 @@ import (
// ErrCredentials. The text of the error is likely more interesting than
// these constants.
const (
frameMethod = 1
frameHeader = 2
frameBody = 3
frameHeartbeat = 8
frameMinSize = 4096
frameEnd = 206
replySuccess = 200
ContentTooLarge = 311
NoRoute = 312
NoConsumers = 313
ConnectionForced = 320
InvalidPath = 402
AccessRefused = 403
NotFound = 404
ResourceLocked = 405
PreconditionFailed = 406
FrameError = 501
SyntaxError = 502
CommandInvalid = 503
ChannelError = 504
UnexpectedFrame = 505
ResourceError = 506
NotAllowed = 530
NotImplemented = 540
InternalError = 541
MaxSizeError = 551
frameMethod = 1
frameHeader = 2
frameBody = 3
frameHeartbeat = 8
frameMinSize = 4096
frameEnd = 206
replySuccess = 200
ContentTooLarge = 311
NoRoute = 312
NoConsumers = 313
ConnectionForced = 320
InvalidPath = 402
AccessRefused = 403
NotFound = 404
ResourceLocked = 405
PreconditionFailed = 406
FrameError = 501
SyntaxError = 502
CommandInvalid = 503
ChannelError = 504
UnexpectedFrame = 505
ResourceError = 506
NotAllowed = 530
NotImplemented = 540
InternalError = 541
MaxSizeError = 551
MaxHeaderFrameSizeError = 552
BadMethodFrameUnknownMethod = 601
BadMethodFrameUnknownClass = 602
)
func isSoftExceptionCode(code int) bool {
@@ -2854,7 +2856,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
mf.Method = method
default:
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId)
return nil, ErrBadMethodFrameUnknownMethod
}
case 20: // channel
@@ -2909,7 +2911,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
mf.Method = method
default:
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId)
return nil, ErrBadMethodFrameUnknownMethod
}
case 40: // exchange
@@ -2980,7 +2982,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
mf.Method = method
default:
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId)
return nil, ErrBadMethodFrameUnknownMethod
}
case 50: // queue
@@ -3067,7 +3069,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
mf.Method = method
default:
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId)
return nil, ErrBadMethodFrameUnknownMethod
}
case 60: // basic
@@ -3218,7 +3220,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
mf.Method = method
default:
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId)
return nil, ErrBadMethodFrameUnknownMethod
}
case 90: // tx
@@ -3273,7 +3275,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
mf.Method = method
default:
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId)
return nil, ErrBadMethodFrameUnknownMethod
}
case 85: // confirm
@@ -3296,11 +3298,11 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
mf.Method = method
default:
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId)
return nil, ErrBadMethodFrameUnknownMethod
}
default:
return nil, fmt.Errorf("Bad method frame, unknown class %d", mf.ClassId)
return nil, ErrBadMethodFrameUnknownClass
}
return mf, nil

View File

@@ -60,8 +60,13 @@ var (
// ErrFieldType is returned when writing a message containing a Go type unsupported by AMQP.
ErrFieldType = &Error{Code: SyntaxError, Reason: "unsupported table field type"}
// ErrClosed is returned when the channel or connection is not open
ErrMaxSize = &Error{Code: MaxSizeError, Reason: "an AMQP message cannot be bigger than 1MB"}
ErrMaxSize = &Error{Code: MaxSizeError, Reason: "an AMQP message cannot be bigger than 16MB"}
ErrMaxHeaderFrameSize = &Error{Code: MaxHeaderFrameSizeError, Reason: "an AMQP header cannot be bigger than 512 bytes"}
ErrBadMethodFrameUnknownMethod = &Error{Code: BadMethodFrameUnknownMethod, Reason: "Bad method frame, unknown method"}
ErrBadMethodFrameUnknownClass = &Error{Code: BadMethodFrameUnknownClass, Reason: "Bad method frame, unknown class"}
)
// Error captures the code and reason a channel or connection has been closed

View File

@@ -7,14 +7,13 @@ import (
"io"
"io/ioutil"
"net/http"
"time"
"github.com/romana/rlog"
"github.com/up9inc/mizu/tap/api"
)
func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, emitter api.Emitter) error {
func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter) error {
streamID, messageHTTP1, err := grpcAssembler.readMessage()
if err != nil {
return err
@@ -32,7 +31,7 @@ func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, emitter a
tcpID.DstPort,
streamID,
)
item = reqResMatcher.registerRequest(ident, &messageHTTP1, time.Now())
item = reqResMatcher.registerRequest(ident, &messageHTTP1, superTimer.CaptureTime)
if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.SrcIP,
@@ -51,7 +50,7 @@ func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, emitter a
tcpID.SrcPort,
streamID,
)
item = reqResMatcher.registerResponse(ident, &messageHTTP1, time.Now())
item = reqResMatcher.registerResponse(ident, &messageHTTP1, superTimer.CaptureTime)
if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.DstIP,
@@ -71,7 +70,7 @@ func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, emitter a
return nil
}
func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, emitter api.Emitter) error {
func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter) error {
req, err := http.ReadRequest(b)
if err != nil {
// log.Println("Error reading stream:", err)
@@ -99,7 +98,7 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
tcpID.DstPort,
counterPair.Request,
)
item := reqResMatcher.registerRequest(ident, req, time.Now())
item := reqResMatcher.registerRequest(ident, req, superTimer.CaptureTime)
if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.SrcIP,
@@ -113,7 +112,7 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
return nil
}
func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, emitter api.Emitter) error {
func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter) error {
res, err := http.ReadResponse(b, nil)
if err != nil {
// log.Println("Error reading stream:", err)
@@ -149,7 +148,7 @@ func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
tcpID.SrcPort,
counterPair.Response,
)
item := reqResMatcher.registerResponse(ident, res, time.Now())
item := reqResMatcher.registerResponse(ident, res, superTimer.CaptureTime)
if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.DstIP,

View File

@@ -3,10 +3,12 @@ package main
import (
"bufio"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net/url"
"time"
"github.com/romana/rlog"
@@ -51,7 +53,7 @@ func init() {
type dissecting string
func (d dissecting) Register(extension *api.Extension) {
extension.Protocol = protocol
extension.Protocol = &protocol
extension.MatcherMap = reqResMatcher.openMessagesMap
}
@@ -59,7 +61,7 @@ func (d dissecting) Ping() {
log.Printf("pong %s\n", protocol.Name)
}
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, emitter api.Emitter) error {
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter) error {
ident := fmt.Sprintf("%s->%s:%s->%s", tcpID.SrcIP, tcpID.DstIP, tcpID.SrcPort, tcpID.DstPort)
isHTTP2, err := checkIsHTTP2Connection(b, isClient)
if err != nil {
@@ -76,41 +78,46 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
grpcAssembler = createGrpcAssembler(b)
}
success := false
dissected := false
for {
if superIdentifier.Protocol != nil && superIdentifier.Protocol != &protocol {
return errors.New("Identified by another protocol")
}
if isHTTP2 {
err = handleHTTP2Stream(grpcAssembler, tcpID, emitter)
err = handleHTTP2Stream(grpcAssembler, tcpID, superTimer, emitter)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
rlog.Debugf("[HTTP/2] stream %s error: %s (%v,%+v)", ident, err, err, err)
continue
}
success = true
dissected = true
} else if isClient {
err = handleHTTP1ClientStream(b, tcpID, counterPair, emitter)
err = handleHTTP1ClientStream(b, tcpID, counterPair, superTimer, emitter)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
rlog.Debugf("[HTTP-request] stream %s Request error: %s (%v,%+v)", ident, err, err, err)
continue
}
success = true
dissected = true
} else {
err = handleHTTP1ServerStream(b, tcpID, counterPair, emitter)
err = handleHTTP1ServerStream(b, tcpID, counterPair, superTimer, emitter)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
rlog.Debugf("[HTTP-response], stream %s Response error: %s (%v,%+v)", ident, err, err, err)
continue
}
success = true
dissected = true
}
}
if !success {
if !dissected {
return err
}
superIdentifier.Protocol = &protocol
return nil
}
@@ -161,6 +168,8 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve
} else if resolvedSource != "" {
service = SetHostname(service, resolvedSource)
}
elapsedTime := item.Pair.Response.CaptureTime.Sub(item.Pair.Request.CaptureTime).Round(time.Millisecond).Milliseconds()
entryBytes, _ := json.Marshal(item.Pair)
return &api.MizuEntry{
ProtocolName: protocol.Name,
@@ -173,6 +182,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve
RequestSenderIp: item.ConnectionInfo.ClientIP,
Service: service,
Timestamp: item.Timestamp,
ElapsedTime: elapsedTime,
Path: path,
ResolvedSource: resolvedSource,
ResolvedDestination: resolvedDestination,
@@ -197,6 +207,7 @@ func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails {
Url: entry.Url,
RequestSenderIp: entry.RequestSenderIp,
Service: entry.Service,
Path: entry.Path,
Summary: entry.Path,
StatusCode: entry.Status,
Method: entry.Method,
@@ -214,9 +225,7 @@ func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails {
}
}
func representRequest(request map[string]interface{}) []interface{} {
repRequest := make([]interface{}, 0)
func representRequest(request map[string]interface{}) (repRequest []interface{}) {
details, _ := json.Marshal([]map[string]string{
{
"name": "Method",
@@ -232,28 +241,28 @@ func representRequest(request map[string]interface{}) []interface{} {
},
})
repRequest = append(repRequest, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Details",
"data": string(details),
})
headers, _ := json.Marshal(request["headers"].([]interface{}))
repRequest = append(repRequest, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Headers",
"data": string(headers),
})
cookies, _ := json.Marshal(request["cookies"].([]interface{}))
repRequest = append(repRequest, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Cookies",
"data": string(cookies),
})
queryString, _ := json.Marshal(request["queryString"].([]interface{}))
repRequest = append(repRequest, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Query String",
"data": string(queryString),
})
@@ -266,7 +275,7 @@ func representRequest(request map[string]interface{}) []interface{} {
text, _ := postData["text"]
if text != nil {
repRequest = append(repRequest, map[string]string{
"type": "body",
"type": api.BODY,
"title": "POST Data (text/plain)",
"encoding": "",
"mime_type": mimeType.(string),
@@ -285,13 +294,13 @@ func representRequest(request map[string]interface{}) []interface{} {
},
})
repRequest = append(repRequest, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "POST Data (multipart/form-data)",
"data": string(multipart),
})
} else {
repRequest = append(repRequest, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "POST Data (application/x-www-form-urlencoded)",
"data": string(params),
})
@@ -299,11 +308,13 @@ func representRequest(request map[string]interface{}) []interface{} {
}
}
return repRequest
return
}
func representResponse(response map[string]interface{}) []interface{} {
repResponse := make([]interface{}, 0)
func representResponse(response map[string]interface{}) (repResponse []interface{}, bodySize int64) {
repResponse = make([]interface{}, 0)
bodySize = int64(response["bodySize"].(float64))
details, _ := json.Marshal([]map[string]string{
{
@@ -316,25 +327,25 @@ func representResponse(response map[string]interface{}) []interface{} {
},
{
"name": "Body Size",
"value": fmt.Sprintf("%g bytes", response["bodySize"].(float64)),
"value": fmt.Sprintf("%d bytes", bodySize),
},
})
repResponse = append(repResponse, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Details",
"data": string(details),
})
headers, _ := json.Marshal(response["headers"].([]interface{}))
repResponse = append(repResponse, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Headers",
"data": string(headers),
})
cookies, _ := json.Marshal(response["cookies"].([]interface{}))
repResponse = append(repResponse, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Cookies",
"data": string(cookies),
})
@@ -348,7 +359,7 @@ func representResponse(response map[string]interface{}) []interface{} {
text, _ := content["text"]
if text != nil {
repResponse = append(repResponse, map[string]string{
"type": "body",
"type": api.BODY,
"title": "Body",
"encoding": encoding.(string),
"mime_type": mimeType.(string),
@@ -356,11 +367,10 @@ func representResponse(response map[string]interface{}) []interface{} {
})
}
return repResponse
return
}
func (d dissecting) Represent(entry *api.MizuEntry) (api.Protocol, []byte, error) {
var p api.Protocol
func (d dissecting) Represent(entry *api.MizuEntry) (p api.Protocol, object []byte, bodySize int64, err error) {
if entry.ProtocolVersion == "2.0" {
p = http2Protocol
} else {
@@ -374,11 +384,11 @@ func (d dissecting) Represent(entry *api.MizuEntry) (api.Protocol, []byte, error
reqDetails := request["details"].(map[string]interface{})
resDetails := response["details"].(map[string]interface{})
repRequest := representRequest(reqDetails)
repResponse := representResponse(resDetails)
repResponse, bodySize := representResponse(resDetails)
representation["request"] = repRequest
representation["response"] = repResponse
object, err := json.Marshal(representation)
return p, object, err
object, err = json.Marshal(representation)
return
}
var Dissector dissecting

View File

@@ -85,7 +85,7 @@ func (matcher *requestResponseMatcher) registerResponse(ident string, response *
func (matcher *requestResponseMatcher) preparePair(requestHTTPMessage *api.GenericMessage, responseHTTPMessage *api.GenericMessage) *api.OutputChannelItem {
return &api.OutputChannelItem{
Protocol: protocol,
Timestamp: time.Now().UnixNano() / int64(time.Millisecond),
Timestamp: requestHTTPMessage.CaptureTime.UnixNano() / int64(time.Millisecond),
ConnectionInfo: nil,
Pair: &api.RequestResponsePair{
Request: *requestHTTPMessage,

View File

@@ -4,6 +4,8 @@ import (
"encoding/json"
"fmt"
"strconv"
"github.com/up9inc/mizu/tap/api"
)
type KafkaPayload struct {
@@ -48,7 +50,7 @@ func representRequestHeader(data map[string]interface{}, rep []interface{}) []in
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Request Header",
"data": string(requestHeader),
})
@@ -64,7 +66,7 @@ func representResponseHeader(data map[string]interface{}, rep []interface{}) []i
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Response Header",
"data": string(requestHeader),
})
@@ -114,7 +116,7 @@ func representMetadataRequest(data map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
})
@@ -173,7 +175,7 @@ func representMetadataResponse(data map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
})
@@ -206,7 +208,7 @@ func representApiVersionsRequest(data map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
})
@@ -244,7 +246,7 @@ func representApiVersionsResponse(data map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
})
@@ -287,7 +289,7 @@ func representProduceRequest(data map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
})
@@ -317,7 +319,7 @@ func representProduceResponse(data map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
})
@@ -404,7 +406,7 @@ func representFetchRequest(data map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
})
@@ -450,7 +452,7 @@ func representFetchResponse(data map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
})
@@ -476,7 +478,7 @@ func representListOffsetsRequest(data map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
})
@@ -506,7 +508,7 @@ func representListOffsetsResponse(data map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
})
@@ -540,7 +542,7 @@ func representCreateTopicsRequest(data map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
})
@@ -570,7 +572,7 @@ func representCreateTopicsResponse(data map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
})
@@ -609,7 +611,7 @@ func representDeleteTopicsRequest(data map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
})
@@ -639,7 +641,7 @@ func representDeleteTopicsResponse(data map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
})

View File

@@ -3,8 +3,10 @@ package main
import (
"bufio"
"encoding/json"
"errors"
"fmt"
"log"
"time"
"github.com/up9inc/mizu/tap/api"
)
@@ -29,7 +31,7 @@ func init() {
type dissecting string
func (d dissecting) Register(extension *api.Extension) {
extension.Protocol = _protocol
extension.Protocol = &_protocol
extension.MatcherMap = reqResMatcher.openMessagesMap
}
@@ -37,18 +39,24 @@ func (d dissecting) Ping() {
log.Printf("pong %s\n", _protocol.Name)
}
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, emitter api.Emitter) error {
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter) error {
for {
if superIdentifier.Protocol != nil && superIdentifier.Protocol != &_protocol {
return errors.New("Identified by another protocol")
}
if isClient {
_, _, err := ReadRequest(b, tcpID)
_, _, err := ReadRequest(b, tcpID, superTimer)
if err != nil {
return err
}
superIdentifier.Protocol = &_protocol
} else {
err := ReadResponse(b, tcpID, emitter)
err := ReadResponse(b, tcpID, superTimer, emitter)
if err != nil {
return err
}
superIdentifier.Protocol = &_protocol
}
}
}
@@ -131,6 +139,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve
}
request["url"] = summary
elapsedTime := item.Pair.Response.CaptureTime.Sub(item.Pair.Request.CaptureTime).Round(time.Millisecond).Milliseconds()
entryBytes, _ := json.Marshal(item.Pair)
return &api.MizuEntry{
ProtocolName: _protocol.Name,
@@ -143,6 +152,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve
RequestSenderIp: item.ConnectionInfo.ClientIP,
Service: service,
Timestamp: item.Timestamp,
ElapsedTime: elapsedTime,
Path: summary,
ResolvedSource: resolvedSource,
ResolvedDestination: resolvedDestination,
@@ -178,7 +188,9 @@ func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails {
}
}
func (d dissecting) Represent(entry *api.MizuEntry) (api.Protocol, []byte, error) {
func (d dissecting) Represent(entry *api.MizuEntry) (p api.Protocol, object []byte, bodySize int64, err error) {
p = _protocol
bodySize = 0
var root map[string]interface{}
json.Unmarshal([]byte(entry.Entry), &root)
representation := make(map[string]interface{}, 0)
@@ -224,8 +236,8 @@ func (d dissecting) Represent(entry *api.MizuEntry) (api.Protocol, []byte, error
representation["request"] = repRequest
representation["response"] = repResponse
object, err := json.Marshal(representation)
return _protocol, object, err
object, err = json.Marshal(representation)
return
}
var Dissector dissecting

View File

@@ -4,6 +4,7 @@ import (
"fmt"
"io"
"reflect"
"time"
"github.com/up9inc/mizu/tap/api"
)
@@ -15,9 +16,10 @@ type Request struct {
CorrelationID int32
ClientID string
Payload interface{}
CaptureTime time.Time
}
func ReadRequest(r io.Reader, tcpID *api.TcpID) (apiKey ApiKey, apiVersion int16, err error) {
func ReadRequest(r io.Reader, tcpID *api.TcpID, superTimer *api.SuperTimer) (apiKey ApiKey, apiVersion int16, err error) {
d := &decoder{reader: r, remain: 4}
size := d.readInt32()
@@ -213,6 +215,7 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID) (apiKey ApiKey, apiVersion int16
ApiVersion: apiVersion,
CorrelationID: correlationID,
ClientID: clientID,
CaptureTime: superTimer.CaptureTime,
Payload: payload,
}

View File

@@ -13,9 +13,10 @@ type Response struct {
Size int32
CorrelationID int32
Payload interface{}
CaptureTime time.Time
}
func ReadResponse(r io.Reader, tcpID *api.TcpID, emitter api.Emitter) (err error) {
func ReadResponse(r io.Reader, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter) (err error) {
d := &decoder{reader: r, remain: 4}
size := d.readInt32()
@@ -39,6 +40,7 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, emitter api.Emitter) (err error
Size: size,
CorrelationID: correlationID,
Payload: payload,
CaptureTime: superTimer.CaptureTime,
}
key := fmt.Sprintf(
@@ -258,12 +260,12 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, emitter api.Emitter) (err error
item := &api.OutputChannelItem{
Protocol: _protocol,
Timestamp: time.Now().UnixNano() / int64(time.Millisecond),
Timestamp: reqResPair.Request.CaptureTime.UnixNano() / int64(time.Millisecond),
ConnectionInfo: connectionInfo,
Pair: &api.RequestResponsePair{
Request: api.GenericMessage{
IsRequest: true,
CaptureTime: time.Now(),
CaptureTime: reqResPair.Request.CaptureTime,
Payload: KafkaPayload{
Data: &KafkaWrapper{
Method: apiNames[apiKey],
@@ -274,7 +276,7 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, emitter api.Emitter) (err error
},
Response: api.GenericMessage{
IsRequest: false,
CaptureTime: time.Now(),
CaptureTime: reqResPair.Response.CaptureTime,
Payload: KafkaPayload{
Data: &KafkaWrapper{
Method: apiNames[apiKey],

View File

@@ -13,11 +13,13 @@ import (
"encoding/json"
"flag"
"fmt"
"io"
"log"
"os"
"os/signal"
"runtime"
"runtime/pprof"
"strconv"
"strings"
"sync"
"time"
@@ -94,6 +96,8 @@ var ownIps []string // global
var hostMode bool // global
var extensions []*api.Extension // global
const baseStreamChannelTimeoutMs int = 5000 * 100
/* minOutputLevel: Error will be printed only if outputLevel is above this value
* t: key for errorsMap (counting errors)
* s, a: arguments log.Printf
@@ -168,11 +172,23 @@ func StartPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem,
}
func startMemoryProfiler() {
dirname := "/app/pprof"
rlog.Info("Profiling is on, results will be written to %s", dirname)
dumpPath := "/app/pprof"
envDumpPath := os.Getenv(MemoryProfilingDumpPath)
if envDumpPath != "" {
dumpPath = envDumpPath
}
timeInterval := 60
envTimeInterval := os.Getenv(MemoryProfilingTimeIntervalSeconds)
if envTimeInterval != "" {
if i, err := strconv.Atoi(envTimeInterval); err == nil {
timeInterval = i
}
}
rlog.Info("Profiling is on, results will be written to %s", dumpPath)
go func() {
if _, err := os.Stat(dirname); os.IsNotExist(err) {
if err := os.Mkdir(dirname, 0777); err != nil {
if _, err := os.Stat(dumpPath); os.IsNotExist(err) {
if err := os.Mkdir(dumpPath, 0777); err != nil {
log.Fatal("could not create directory for profile: ", err)
}
}
@@ -180,9 +196,9 @@ func startMemoryProfiler() {
for true {
t := time.Now()
filename := fmt.Sprintf("%s/%s__mem.prof", dirname, t.Format("15_04_05"))
filename := fmt.Sprintf("%s/%s__mem.prof", dumpPath, t.Format("15_04_05"))
rlog.Info("Writing memory profile to %s\n", filename)
rlog.Infof("Writing memory profile to %s\n", filename)
f, err := os.Create(filename)
if err != nil {
@@ -193,13 +209,50 @@ func startMemoryProfiler() {
log.Fatal("could not write memory profile: ", err)
}
_ = f.Close()
time.Sleep(time.Minute)
time.Sleep(time.Second * time.Duration(timeInterval))
}
}()
}
func closeTimedoutTcpStreamChannels() {
maxNumberOfGoroutines = GetMaxNumberOfGoroutines()
TcpStreamChannelTimeoutMs := GetTcpChannelTimeoutMs()
for {
time.Sleep(10 * time.Millisecond)
streams.Range(func(key interface{}, value interface{}) bool {
streamWrapper := value.(*tcpStreamWrapper)
stream := streamWrapper.stream
if stream.superIdentifier.Protocol == nil {
if !stream.isClosed && time.Now().After(streamWrapper.createdAt.Add(TcpStreamChannelTimeoutMs)) {
stream.Close()
statsTracker.incDroppedTcpStreams()
rlog.Debugf("Dropped an unidentified TCP stream because of timeout. Total dropped: %d Total Goroutines: %d Timeout (ms): %d\n", statsTracker.appStats.DroppedTcpStreams, runtime.NumGoroutine(), TcpStreamChannelTimeoutMs/1000000)
}
} else {
if !stream.superIdentifier.IsClosedOthers {
for i := range stream.clients {
reader := &stream.clients[i]
if reader.extension.Protocol != stream.superIdentifier.Protocol {
reader.Close()
}
}
for i := range stream.servers {
reader := &stream.servers[i]
if reader.extension.Protocol != stream.superIdentifier.Protocol {
reader.Close()
}
}
stream.superIdentifier.IsClosedOthers = true
}
}
return true
})
}
}
func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
log.SetFlags(log.LstdFlags | log.LUTC | log.Lshortfile)
go closeTimedoutTcpStreamChannels()
defer util.Run()()
if *debug {
@@ -354,7 +407,14 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
startMemoryProfiler()
}
for packet := range source.Packets() {
for {
packet, err := source.NextPacket()
if err == io.EOF {
break
} else if err != nil {
rlog.Debugf("Error:", err)
continue
}
packetsCount := statsTracker.incPacketsCount()
rlog.Debugf("PACKET #%d", packetsCount)
data := packet.Data()

View File

@@ -3,14 +3,21 @@ package tap
import (
"os"
"strconv"
"time"
)
const (
MemoryProfilingEnabledEnvVarName = "MEMORY_PROFILING_ENABLED"
MemoryProfilingDumpPath = "MEMORY_PROFILING_DUMP_PATH"
MemoryProfilingTimeIntervalSeconds = "MEMORY_PROFILING_TIME_INTERVAL"
MaxBufferedPagesTotalEnvVarName = "MAX_BUFFERED_PAGES_TOTAL"
MaxBufferedPagesPerConnectionEnvVarName = "MAX_BUFFERED_PAGES_PER_CONNECTION"
TcpStreamChannelTimeoutMsEnvVarName = "TCP_STREAM_CHANNEL_TIMEOUT_MS"
MaxNumberOfGoroutinesEnvVarName = "MAX_NUMBER_OF_GOROUTINES"
MaxBufferedPagesTotalDefaultValue = 5000
MaxBufferedPagesPerConnectionDefaultValue = 5000
TcpStreamChannelTimeoutMsDefaultValue = 5000
MaxNumberOfGoroutinesDefaultValue = 4000
)
type globalSettings struct {
@@ -47,6 +54,22 @@ func GetMaxBufferedPagesPerConnection() int {
return valueFromEnv
}
func GetTcpChannelTimeoutMs() time.Duration {
valueFromEnv, err := strconv.Atoi(os.Getenv(TcpStreamChannelTimeoutMsEnvVarName))
if err != nil {
return TcpStreamChannelTimeoutMsDefaultValue * time.Millisecond
}
return time.Duration(valueFromEnv) * time.Millisecond
}
func GetMaxNumberOfGoroutines() int {
valueFromEnv, err := strconv.Atoi(os.Getenv(MaxNumberOfGoroutinesEnvVarName))
if err != nil {
return MaxNumberOfGoroutinesDefaultValue
}
return valueFromEnv
}
func GetMemoryProfilingEnabled() bool {
return os.Getenv(MemoryProfilingEnabledEnvVarName) == "1"
}

View File

@@ -13,6 +13,7 @@ type AppStats struct {
ReassembledTcpPayloadsCount int64 `json:"reassembledTcpPayloadsCount"`
TlsConnectionsCount int64 `json:"tlsConnectionsCount"`
MatchedPairs int64 `json:"matchedPairs"`
DroppedTcpStreams int64 `json:"droppedTcpStreams"`
}
type StatsTracker struct {
@@ -23,6 +24,7 @@ type StatsTracker struct {
reassembledTcpPayloadsCountMutex sync.Mutex
tlsConnectionsCountMutex sync.Mutex
matchedPairsMutex sync.Mutex
droppedTcpStreamsMutex sync.Mutex
}
func (st *StatsTracker) incMatchedPairs() {
@@ -31,6 +33,12 @@ func (st *StatsTracker) incMatchedPairs() {
st.matchedPairsMutex.Unlock()
}
func (st *StatsTracker) incDroppedTcpStreams() {
st.droppedTcpStreamsMutex.Lock()
st.appStats.DroppedTcpStreams++
st.droppedTcpStreamsMutex.Unlock()
}
func (st *StatsTracker) incPacketsCount() int64 {
st.packetsCountMutex.Lock()
st.appStats.PacketsCount++
@@ -100,5 +108,10 @@ func (st *StatsTracker) dumpStats() *AppStats {
st.appStats.MatchedPairs = 0
st.matchedPairsMutex.Unlock()
st.droppedTcpStreamsMutex.Lock()
currentAppStats.DroppedTcpStreams = st.appStats.DroppedTcpStreams
st.appStats.DroppedTcpStreams = 0
st.droppedTcpStreamsMutex.Unlock()
return currentAppStats
}

View File

@@ -47,11 +47,12 @@ func (tid *tcpID) String() string {
type tcpReader struct {
ident string
tcpID *api.TcpID
isClosed bool
isClient bool
isOutgoing bool
msgQueue chan tcpReaderDataMsg // Channel of captured reassembled tcp payload
data []byte
captureTime time.Time
superTimer *api.SuperTimer
parent *tcpStream
messageCount uint
packetsSeen uint
@@ -59,6 +60,7 @@ type tcpReader struct {
extension *api.Extension
emitter api.Emitter
counterPair *api.CounterPair
sync.Mutex
}
func (h *tcpReader) Read(p []byte) (int, error) {
@@ -69,7 +71,7 @@ func (h *tcpReader) Read(p []byte) (int, error) {
msg, ok = <-h.msgQueue
h.data = msg.bytes
h.captureTime = msg.timestamp
h.superTimer.CaptureTime = msg.timestamp
if len(h.data) > 0 {
h.packetsSeen += 1
}
@@ -93,10 +95,19 @@ func (h *tcpReader) Read(p []byte) (int, error) {
return l, nil
}
func (h *tcpReader) Close() {
h.Lock()
if !h.isClosed {
h.isClosed = true
close(h.msgQueue)
}
h.Unlock()
}
func (h *tcpReader) run(wg *sync.WaitGroup) {
defer wg.Done()
b := bufio.NewReader(h)
err := h.extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.counterPair, h.emitter)
err := h.extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.counterPair, h.superTimer, h.parent.superIdentifier, h.emitter)
if err != nil {
io.Copy(ioutil.Discard, b)
}

View File

@@ -8,6 +8,7 @@ import (
"github.com/google/gopacket"
"github.com/google/gopacket/layers" // pulls in all layers decoders
"github.com/google/gopacket/reassembly"
"github.com/up9inc/mizu/tap/api"
)
/* It's a connection (bidirectional)
@@ -16,16 +17,19 @@ import (
* In our implementation, we pass information from ReassembledSG to the tcpReader through a shared channel.
*/
type tcpStream struct {
tcpstate *reassembly.TCPSimpleFSM
fsmerr bool
optchecker reassembly.TCPOptionCheck
net, transport gopacket.Flow
isDNS bool
isTapTarget bool
clients []tcpReader
servers []tcpReader
urls []string
ident string
id int64
isClosed bool
superIdentifier *api.SuperIdentifier
tcpstate *reassembly.TCPSimpleFSM
fsmerr bool
optchecker reassembly.TCPOptionCheck
net, transport gopacket.Flow
isDNS bool
isTapTarget bool
clients []tcpReader
servers []tcpReader
urls []string
ident string
sync.Mutex
}
@@ -144,13 +148,24 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
// This is where we pass the reassembled information onwards
// This channel is read by an tcpReader object
statsTracker.incReassembledTcpPayloadsCount()
timestamp := ac.GetCaptureInfo().Timestamp
if dir == reassembly.TCPDirClientToServer {
for _, reader := range t.clients {
reader.msgQueue <- tcpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp}
for i := range t.clients {
reader := &t.clients[i]
reader.Lock()
if !reader.isClosed {
reader.msgQueue <- tcpReaderDataMsg{data, timestamp}
}
reader.Unlock()
}
} else {
for _, reader := range t.servers {
reader.msgQueue <- tcpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp}
for i := range t.servers {
reader := &t.servers[i]
reader.Lock()
if !reader.isClosed {
reader.msgQueue <- tcpReaderDataMsg{data, timestamp}
}
reader.Unlock()
}
}
}
@@ -159,14 +174,33 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
func (t *tcpStream) ReassemblyComplete(ac reassembly.AssemblerContext) bool {
Trace("%s: Connection closed", t.ident)
if t.isTapTarget {
for _, reader := range t.clients {
close(reader.msgQueue)
}
for _, reader := range t.servers {
close(reader.msgQueue)
}
if t.isTapTarget && !t.isClosed {
t.Close()
}
// do not remove the connection to allow last ACK
return false
}
func (t *tcpStream) Close() {
shouldReturn := false
t.Lock()
if t.isClosed {
shouldReturn = true
} else {
t.isClosed = true
}
t.Unlock()
if shouldReturn {
return
}
streams.Delete(t.id)
for i := range t.clients {
reader := &t.clients[i]
reader.Close()
}
for i := range t.servers {
reader := &t.servers[i]
reader.Close()
}
}

View File

@@ -2,7 +2,9 @@ package tap
import (
"fmt"
"runtime"
"sync"
"time"
"github.com/romana/rlog"
"github.com/up9inc/mizu/tap/api"
@@ -23,6 +25,16 @@ type tcpStreamFactory struct {
Emitter api.Emitter
}
type tcpStreamWrapper struct {
stream *tcpStream
createdAt time.Time
}
var streams *sync.Map = &sync.Map{} // global
var streamId int64 = 0
var maxNumberOfGoroutines int
func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.TCP, ac reassembly.AssemblerContext) reassembly.Stream {
rlog.Debugf("* NEW: %s %s", net, transport)
fsmOptions := reassembly.TCPSimpleFSMOptions{
@@ -39,23 +51,32 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
props := factory.getStreamProps(srcIp, srcPort, dstIp, dstPort)
isTapTarget := props.isTapTarget
stream := &tcpStream{
net: net,
transport: transport,
isDNS: tcp.SrcPort == 53 || tcp.DstPort == 53,
isTapTarget: isTapTarget,
tcpstate: reassembly.NewTCPSimpleFSM(fsmOptions),
ident: fmt.Sprintf("%s:%s", net, transport),
optchecker: reassembly.NewTCPOptionCheck(),
net: net,
transport: transport,
isDNS: tcp.SrcPort == 53 || tcp.DstPort == 53,
isTapTarget: isTapTarget,
tcpstate: reassembly.NewTCPSimpleFSM(fsmOptions),
ident: fmt.Sprintf("%s:%s", net, transport),
optchecker: reassembly.NewTCPOptionCheck(),
superIdentifier: &api.SuperIdentifier{},
}
if stream.isTapTarget {
if runtime.NumGoroutine() > maxNumberOfGoroutines {
statsTracker.incDroppedTcpStreams()
rlog.Debugf("Dropped a TCP stream because of load. Total dropped: %d Total Goroutines: %d\n", statsTracker.appStats.DroppedTcpStreams, runtime.NumGoroutine())
return stream
}
streamId++
stream.id = streamId
for i, extension := range extensions {
counterPair := &api.CounterPair{
Request: 0,
Response: 0,
}
stream.clients = append(stream.clients, tcpReader{
msgQueue: make(chan tcpReaderDataMsg),
ident: fmt.Sprintf("%s %s", net, transport),
msgQueue: make(chan tcpReaderDataMsg),
superTimer: &api.SuperTimer{},
ident: fmt.Sprintf("%s %s", net, transport),
tcpID: &api.TcpID{
SrcIP: srcIp,
DstIP: dstIp,
@@ -71,8 +92,9 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
counterPair: counterPair,
})
stream.servers = append(stream.servers, tcpReader{
msgQueue: make(chan tcpReaderDataMsg),
ident: fmt.Sprintf("%s %s", net, transport),
msgQueue: make(chan tcpReaderDataMsg),
superTimer: &api.SuperTimer{},
ident: fmt.Sprintf("%s %s", net, transport),
tcpID: &api.TcpID{
SrcIP: net.Dst().String(),
DstIP: net.Src().String(),
@@ -87,6 +109,12 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
emitter: factory.Emitter,
counterPair: counterPair,
})
streams.Store(stream.id, &tcpStreamWrapper{
stream: stream,
createdAt: time.Now(),
})
factory.wg.Add(2)
// Start reading from channel stream.reader.bytes
go stream.clients[i].run(&factory.wg)
@@ -117,7 +145,7 @@ func (factory *tcpStreamFactory) getStreamProps(srcIP string, srcPort string, ds
}
return &streamProps{isTapTarget: false, isOutgoing: false}
} else {
rlog.Debugf("getStreamProps %s", fmt.Sprintf("+ notHost3 %s -> %s:%s", srcIP, dstIP, dstPort))
rlog.Debugf("getStreamProps %s", fmt.Sprintf("+ notHost3 %s:%s -> %s:%s", srcIP, srcPort, dstIP, dstPort))
return &streamProps{isTapTarget: true}
}
}

View File

@@ -54,8 +54,8 @@ export const EntriesList: React.FC<EntriesListProps> = ({entries, setEntries, fo
const filterEntries = useCallback((entry) => {
if(methodsFilter.length > 0 && !methodsFilter.includes(entry.method.toLowerCase())) return;
if(pathFilter && entry.path?.toLowerCase()?.indexOf(pathFilter) === -1) return;
if(statusFilter.includes(StatusType.SUCCESS) && entry.statusCode >= 400) return;
if(statusFilter.includes(StatusType.ERROR) && entry.statusCode < 400) return;
if(statusFilter.includes(StatusType.SUCCESS) && entry.status_code >= 400) return;
if(statusFilter.includes(StatusType.ERROR) && entry.status_code < 400) return;
return entry;
},[methodsFilter, pathFilter, statusFilter])

View File

@@ -32,7 +32,7 @@ interface EntryDetailedProps {
export const formatSize = (n: number) => n > 1000 ? `${Math.round(n / 1000)}KB` : `${n} B`;
const EntryTitle: React.FC<any> = ({protocol, data}) => {
const EntryTitle: React.FC<any> = ({protocol, data, bodySize, elapsedTime}) => {
const classes = useStyles();
const {response} = JSON.parse(data.entry);
@@ -40,7 +40,8 @@ const EntryTitle: React.FC<any> = ({protocol, data}) => {
return <div className={classes.entryTitle}>
<Protocol protocol={protocol} horizontal={true}/>
<div style={{right: "30px", position: "absolute", display: "flex"}}>
{response.payload && <div style={{margin: "0 18px", opacity: 0.5}}>{formatSize(response.payload.bodySize)}</div>}
{response.payload && <div style={{margin: "0 18px", opacity: 0.5}}>{formatSize(bodySize)}</div>}
<div style={{marginRight: 18, opacity: 0.5}}>{Math.round(elapsedTime)}ms</div>
<div style={{opacity: 0.5}}>{'rulesMatched' in data ? data.rulesMatched?.length : '0'} Rules Applied</div>
</div>
</div>;
@@ -63,7 +64,12 @@ const EntrySummary: React.FC<any> = ({data}) => {
export const EntryDetailed: React.FC<EntryDetailedProps> = ({entryData}) => {
return <>
<EntryTitle protocol={entryData.protocol} data={entryData.data}/>
<EntryTitle
protocol={entryData.protocol}
data={entryData.data}
bodySize={entryData.bodySize}
elapsedTime={entryData.data.elapsedTime}
/>
{entryData.data && <EntrySummary data={entryData.data}/>}
<>
{entryData.data && <EntryViewer representation={entryData.representation} color={entryData.protocol.background_color}/>}

View File

@@ -2,7 +2,7 @@
.Entry
font-family: "Source Sans Pro", Lucida Grande, Tahoma, sans-serif
height: 100%
height: calc(100% - 70px)
width: 100%
h3,
@@ -44,6 +44,8 @@
border-top-right-radius: 0
.body
height: 100%
overflow-y: auto
background: $main-background-color
color: $blue-gray
border-radius: 4px