Compare commits

..

4 Commits

Author SHA1 Message Date
gadotroee
073b0b72d3 Remove fetch command (and direction) (#264) 2021-09-06 10:16:04 +03:00
gadotroee
c8705822b3 TRA-3658 - Fix analysis feature (#261) 2021-09-06 09:46:49 +03:00
M. Mert Yıldıran
d4436d9f15 Turn table and body strings to constants and move them to extension API (#262) 2021-09-05 06:44:16 +03:00
M. Mert Yıldıran
4e0ff74944 Fix body size, receive (elapsed time) and timestamps (#258)
* Fix the HTTP body size (it's not applicable to AMQP and Kafka)

* Fix the elapsed time

* Change JSON fields from snake_case to camelCase
2021-09-04 17:15:39 +03:00
31 changed files with 479 additions and 461 deletions

View File

@@ -4,7 +4,6 @@ import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os/exec"
"strings"
@@ -12,7 +11,7 @@ import (
"time"
)
func TestTapAndFetch(t *testing.T) {
func TestTap(t *testing.T) {
if testing.Short() {
t.Skip("ignored acceptance test")
}
@@ -93,37 +92,6 @@ func TestTapAndFetch(t *testing.T) {
t.Errorf("%v", err)
return
}
fetchCmdArgs := getDefaultFetchCommandArgs()
fetchCmd := exec.Command(cliPath, fetchCmdArgs...)
t.Logf("running command: %v", fetchCmd.String())
if err := fetchCmd.Start(); err != nil {
t.Errorf("failed to start fetch command, err: %v", err)
return
}
harCheckFunc := func() error {
harBytes, readFileErr := ioutil.ReadFile("./unknown_source.har")
if readFileErr != nil {
return fmt.Errorf("failed to read har file, err: %v", readFileErr)
}
harEntries, err := getEntriesFromHarBytes(harBytes)
if err != nil {
return fmt.Errorf("failed to get entries from har, err: %v", err)
}
if len(harEntries) == 0 {
return fmt.Errorf("unexpected har entries result - Expected more than 0 entries")
}
return nil
}
if err := retriesExecute(shortRetriesCount, harCheckFunc); err != nil {
t.Errorf("%v", err)
return
}
})
}
}

View File

@@ -76,13 +76,6 @@ func getDefaultTapNamespace() []string {
return []string{"-n", "mizu-tests"}
}
func getDefaultFetchCommandArgs() []string {
fetchCommand := "fetch"
defaultCmdArgs := getDefaultCommandArgs()
return append([]string{fetchCommand}, defaultCmdArgs...)
}
func getDefaultConfigCommandArgs() []string {
configCommand := "config"
defaultCmdArgs := getDefaultCommandArgs()
@@ -179,19 +172,6 @@ func cleanupCommand(cmd *exec.Cmd) error {
return nil
}
func getEntriesFromHarBytes(harBytes []byte) ([]interface{}, error) {
harInterface, convertErr := jsonBytesToInterface(harBytes)
if convertErr != nil {
return nil, convertErr
}
har := harInterface.(map[string]interface{})
harLog := har["log"].(map[string]interface{})
harEntries := harLog["entries"].([]interface{})
return harEntries, nil
}
func getPods(tapStatusInterface interface{}) ([]map[string]interface{}, error) {
tapStatus := tapStatusInterface.(map[string]interface{})
podsInterface := tapStatus["pods"].([]interface{})

View File

@@ -1,7 +1,6 @@
# mizu agent
Agent for MIZU (API server and tapper)
Basic APIs:
* /fetch - retrieve traffic data
* /stats - retrieve statistics of collected data
* /viewer - web ui

View File

@@ -3,6 +3,7 @@ package controllers
import (
"encoding/json"
"fmt"
"github.com/google/martian/har"
"mizuserver/pkg/database"
"mizuserver/pkg/models"
"mizuserver/pkg/providers"
@@ -10,11 +11,9 @@ import (
"mizuserver/pkg/utils"
"mizuserver/pkg/validation"
"net/http"
"strings"
"time"
"github.com/gin-gonic/gin"
"github.com/google/martian/har"
"github.com/romana/rlog"
tapApi "github.com/up9inc/mizu/tap/api"
@@ -48,7 +47,7 @@ func GetEntries(c *gin.Context) {
Find(&entries)
if len(entries) > 0 && order == database.OrderDesc {
// the entries always order from oldest to newest so we should revers
// the entries always order from oldest to newest - we should reverse
utils.ReverseSlice(entries)
}
@@ -64,93 +63,6 @@ func GetEntries(c *gin.Context) {
c.JSON(http.StatusOK, baseEntries)
}
func GetHARs(c *gin.Context) {
entriesFilter := &models.HarFetchRequestQuery{}
order := database.OrderDesc
if err := c.BindQuery(entriesFilter); err != nil {
c.JSON(http.StatusBadRequest, err)
}
err := validation.Validate(entriesFilter)
if err != nil {
c.JSON(http.StatusBadRequest, err)
}
var timestampFrom, timestampTo int64
if entriesFilter.From < 0 {
timestampFrom = 0
} else {
timestampFrom = entriesFilter.From
}
if entriesFilter.To <= 0 {
timestampTo = time.Now().UnixNano() / int64(time.Millisecond)
} else {
timestampTo = entriesFilter.To
}
var entries []tapApi.MizuEntry
database.GetEntriesTable().
Where(fmt.Sprintf("timestamp BETWEEN %v AND %v", timestampFrom, timestampTo)).
Order(fmt.Sprintf("timestamp %s", order)).
Find(&entries)
if len(entries) > 0 {
// the entries always order from oldest to newest so we should revers
utils.ReverseSlice(entries)
}
harsObject := map[string]*models.ExtendedHAR{}
for _, entryData := range entries {
var harEntry har.Entry
_ = json.Unmarshal([]byte(entryData.Entry), &harEntry)
if entryData.ResolvedDestination != "" {
harEntry.Request.URL = utils.SetHostname(harEntry.Request.URL, entryData.ResolvedDestination)
}
var fileName string
sourceOfEntry := entryData.ResolvedSource
if sourceOfEntry != "" {
// naively assumes the proper service source is http
sourceOfEntry = fmt.Sprintf("http://%s", sourceOfEntry)
//replace / from the file name cause they end up creating a corrupted folder
fileName = fmt.Sprintf("%s.har", strings.ReplaceAll(sourceOfEntry, "/", "_"))
} else {
fileName = "unknown_source.har"
}
if harOfSource, ok := harsObject[fileName]; ok {
harOfSource.Log.Entries = append(harOfSource.Log.Entries, &harEntry)
} else {
var entriesHar []*har.Entry
entriesHar = append(entriesHar, &harEntry)
harsObject[fileName] = &models.ExtendedHAR{
Log: &models.ExtendedLog{
Version: "1.2",
Creator: &models.ExtendedCreator{
Creator: &har.Creator{
Name: "mizu",
Version: "0.0.2",
},
},
Entries: entriesHar,
},
}
// leave undefined when no source is present, otherwise modeler assumes source is empty string ""
if sourceOfEntry != "" {
harsObject[fileName].Log.Creator.Source = &sourceOfEntry
}
}
}
retObj := map[string][]byte{}
for k, v := range harsObject {
bytesData, _ := json.Marshal(v)
retObj[k] = bytesData
}
buffer := utils.ZipData(retObj)
c.Data(http.StatusOK, "application/octet-stream", buffer.Bytes())
}
func UploadEntries(c *gin.Context) {
rlog.Infof("Upload entries - started\n")
@@ -202,15 +114,21 @@ func GetFullEntries(c *gin.Context) {
timestampTo = entriesFilter.To
}
entriesArray := database.GetEntriesFromDb(timestampFrom, timestampTo)
result := make([]models.FullEntryDetails, 0)
entriesArray := database.GetEntriesFromDb(timestampFrom, timestampTo, nil)
result := make([]har.Entry, 0)
for _, data := range entriesArray {
harEntry := models.FullEntryDetails{}
if err := models.GetEntry(&data, &harEntry); err != nil {
var pair tapApi.RequestResponsePair
if err := json.Unmarshal([]byte(data.Entry), &pair); err != nil {
continue
}
result = append(result, harEntry)
harEntry, err := utils.NewEntry(&pair)
if err != nil {
continue
}
result = append(result, *harEntry)
}
c.JSON(http.StatusOK, result)
}
@@ -220,27 +138,12 @@ func GetEntry(c *gin.Context) {
Where(map[string]string{"entryId": c.Param("entryId")}).
First(&entryData)
fullEntry := models.FullEntryDetails{}
if err := models.GetEntry(&entryData, &fullEntry); err != nil {
c.JSON(http.StatusInternalServerError, map[string]interface{}{
"error": true,
"msg": "Can't get entry details",
})
}
// FIXME: Fix the part below
// fullEntryWithPolicy := models.FullEntryWithPolicy{}
// if err := models.GetEntry(&entryData, &fullEntryWithPolicy); err != nil {
// c.JSON(http.StatusInternalServerError, map[string]interface{}{
// "error": true,
// "msg": "Can't get entry details",
// })
// }
extension := extensionsMap[entryData.ProtocolName]
protocol, representation, _ := extension.Dissector.Represent(&entryData)
protocol, representation, bodySize, _ := extension.Dissector.Represent(&entryData)
c.JSON(http.StatusOK, tapApi.MizuEntryWrapper{
Protocol: protocol,
Representation: string(representation),
BodySize: bodySize,
Data: entryData,
})
}

View File

@@ -57,10 +57,16 @@ func initDataBase(databasePath string) *gorm.DB {
return temp
}
func GetEntriesFromDb(timestampFrom int64, timestampTo int64) []tapApi.MizuEntry {
func GetEntriesFromDb(timestampFrom int64, timestampTo int64, protocolName *string) []tapApi.MizuEntry {
order := OrderDesc
protocolNameCondition := "1 = 1"
if protocolName != nil {
protocolNameCondition = fmt.Sprintf("protocolKey = '%s'", *protocolName)
}
var entries []tapApi.MizuEntry
GetEntriesTable().
Where(protocolNameCondition).
Where(fmt.Sprintf("timestamp BETWEEN %v AND %v", timestampFrom, timestampTo)).
Order(fmt.Sprintf("timestamp %s", order)).
Find(&entries)

View File

@@ -2,9 +2,7 @@ package models
import (
"encoding/json"
tapApi "github.com/up9inc/mizu/tap/api"
"mizuserver/pkg/rules"
"mizuserver/pkg/utils"
@@ -17,47 +15,14 @@ func GetEntry(r *tapApi.MizuEntry, v tapApi.DataUnmarshaler) error {
return v.UnmarshalData(r)
}
func NewApplicableRules(status bool, latency int64, number int) tapApi.ApplicableRules {
ar := tapApi.ApplicableRules{}
ar.Status = status
ar.Latency = latency
ar.NumberOfRules = number
return ar
}
type FullEntryDetails struct {
har.Entry
}
type FullEntryDetailsExtra struct {
har.Entry
}
func (fed *FullEntryDetails) UnmarshalData(entry *tapApi.MizuEntry) error {
if err := json.Unmarshal([]byte(entry.Entry), &fed.Entry); err != nil {
return err
}
if entry.ResolvedDestination != "" {
fed.Entry.Request.URL = utils.SetHostname(fed.Entry.Request.URL, entry.ResolvedDestination)
}
return nil
}
func (fedex *FullEntryDetailsExtra) UnmarshalData(entry *tapApi.MizuEntry) error {
if err := json.Unmarshal([]byte(entry.Entry), &fedex.Entry); err != nil {
return err
}
if entry.ResolvedSource != "" {
fedex.Entry.Request.Headers = append(fedex.Request.Headers, har.Header{Name: "x-mizu-source", Value: entry.ResolvedSource})
}
if entry.ResolvedDestination != "" {
fedex.Entry.Request.Headers = append(fedex.Request.Headers, har.Header{Name: "x-mizu-destination", Value: entry.ResolvedDestination})
fedex.Entry.Request.URL = utils.SetHostname(fedex.Entry.Request.URL, entry.ResolvedDestination)
}
return nil
}
// TODO: until we fixed the Rules feature
//func NewApplicableRules(status bool, latency int64, number int) tapApi.ApplicableRules {
// ar := tapApi.ApplicableRules{}
// ar.Status = status
// ar.Latency = latency
// ar.NumberOfRules = number
// return ar
//}
type EntriesFilter struct {
Limit int `form:"limit" validate:"required,min=1,max=200"`
@@ -147,9 +112,15 @@ type FullEntryWithPolicy struct {
}
func (fewp *FullEntryWithPolicy) UnmarshalData(entry *tapApi.MizuEntry) error {
if err := json.Unmarshal([]byte(entry.Entry), &fewp.Entry); err != nil {
var pair tapApi.RequestResponsePair
if err := json.Unmarshal([]byte(entry.Entry), &pair); err != nil {
return err
}
harEntry, err := utils.NewEntry(&pair)
if err != nil {
return err
}
fewp.Entry = *harEntry
_, resultPolicyToSend := rules.MatchRequestPolicy(fewp.Entry, entry.Service)
fewp.RulesMatched = resultPolicyToSend
@@ -157,9 +128,10 @@ func (fewp *FullEntryWithPolicy) UnmarshalData(entry *tapApi.MizuEntry) error {
return nil
}
func RunValidationRulesState(harEntry har.Entry, service string) tapApi.ApplicableRules {
numberOfRules, resultPolicyToSend := rules.MatchRequestPolicy(harEntry, service)
statusPolicyToSend, latency, numberOfRules := rules.PassedValidationRules(resultPolicyToSend, numberOfRules)
ar := NewApplicableRules(statusPolicyToSend, latency, numberOfRules)
return ar
}
// TODO: until we fixed the Rules feature
//func RunValidationRulesState(harEntry har.Entry, service string) tapApi.ApplicableRules {
// numberOfRules, resultPolicyToSend := rules.MatchRequestPolicy(harEntry, service)
// statusPolicyToSend, latency, numberOfRules := rules.PassedValidationRules(resultPolicyToSend, numberOfRules)
// ar := NewApplicableRules(statusPolicyToSend, latency, numberOfRules)
// return ar
//}

View File

@@ -15,8 +15,6 @@ func EntriesRoutes(ginApp *gin.Engine) {
routeGroup.GET("/uploadEntries", controllers.UploadEntries)
routeGroup.GET("/resolving", controllers.GetCurrentResolvingInformation)
routeGroup.GET("/har", controllers.GetHARs)
routeGroup.GET("/resetDB", controllers.DeleteAllEntries) // get single (full) entry
routeGroup.GET("/generalStats", controllers.GetGeneralStats) // get general stats about entries in DB

View File

@@ -5,12 +5,14 @@ import (
"compress/zlib"
"encoding/json"
"fmt"
"github.com/google/martian/har"
"github.com/romana/rlog"
"github.com/up9inc/mizu/shared"
tapApi "github.com/up9inc/mizu/tap/api"
"io/ioutil"
"log"
"mizuserver/pkg/database"
"mizuserver/pkg/models"
"mizuserver/pkg/utils"
"net/http"
"net/url"
"strings"
@@ -129,21 +131,33 @@ func UploadEntriesImpl(token string, model string, envPrefix string, sleepInterv
for {
timestampTo := time.Now().UnixNano() / int64(time.Millisecond)
rlog.Infof("Getting entries from %v, to %v\n", timestampFrom, timestampTo)
entriesArray := database.GetEntriesFromDb(timestampFrom, timestampTo)
protocolFilter := "http"
entriesArray := database.GetEntriesFromDb(timestampFrom, timestampTo, &protocolFilter)
if len(entriesArray) > 0 {
fullEntriesExtra := make([]models.FullEntryDetailsExtra, 0)
result := make([]har.Entry, 0)
for _, data := range entriesArray {
harEntry := models.FullEntryDetailsExtra{}
if err := models.GetEntry(&data, &harEntry); err != nil {
var pair tapApi.RequestResponsePair
if err := json.Unmarshal([]byte(data.Entry), &pair); err != nil {
continue
}
fullEntriesExtra = append(fullEntriesExtra, harEntry)
harEntry, err := utils.NewEntry(&pair)
if err != nil {
continue
}
if data.ResolvedSource != "" {
harEntry.Request.Headers = append(harEntry.Request.Headers, har.Header{Name: "x-mizu-source", Value: data.ResolvedSource})
}
if data.ResolvedDestination != "" {
harEntry.Request.Headers = append(harEntry.Request.Headers, har.Header{Name: "x-mizu-destination", Value: data.ResolvedDestination})
harEntry.Request.URL = utils.SetHostname(harEntry.Request.URL, data.ResolvedDestination)
}
result = append(result, *harEntry)
}
rlog.Infof("About to upload %v entries\n", len(fullEntriesExtra))
body, jMarshalErr := json.Marshal(fullEntriesExtra)
rlog.Infof("About to upload %v entries\n", len(result))
body, jMarshalErr := json.Marshal(result)
if jMarshalErr != nil {
analyzeInformation.Reset()
rlog.Infof("Stopping analyzing")

259
agent/pkg/utils/har.go Normal file
View File

@@ -0,0 +1,259 @@
package utils
import (
"bytes"
"errors"
"fmt"
"strconv"
"strings"
"time"
"github.com/google/martian/har"
"github.com/up9inc/mizu/tap"
"github.com/up9inc/mizu/tap/api"
)
// Keep it because we might want cookies in the future
//func BuildCookies(rawCookies []interface{}) []har.Cookie {
// cookies := make([]har.Cookie, 0, len(rawCookies))
//
// for _, cookie := range rawCookies {
// c := cookie.(map[string]interface{})
// expiresStr := ""
// if c["expires"] != nil {
// expiresStr = c["expires"].(string)
// }
// expires, _ := time.Parse(time.RFC3339, expiresStr)
// httpOnly := false
// if c["httponly"] != nil {
// httpOnly, _ = strconv.ParseBool(c["httponly"].(string))
// }
// secure := false
// if c["secure"] != nil {
// secure, _ = strconv.ParseBool(c["secure"].(string))
// }
// path := ""
// if c["path"] != nil {
// path = c["path"].(string)
// }
// domain := ""
// if c["domain"] != nil {
// domain = c["domain"].(string)
// }
//
// cookies = append(cookies, har.Cookie{
// Name: c["name"].(string),
// Value: c["value"].(string),
// Path: path,
// Domain: domain,
// HTTPOnly: httpOnly,
// Secure: secure,
// Expires: expires,
// Expires8601: expiresStr,
// })
// }
//
// return cookies
//}
func BuildHeaders(rawHeaders []interface{}) ([]har.Header, string, string, string, string, string) {
var host, scheme, authority, path, status string
headers := make([]har.Header, 0, len(rawHeaders))
for _, header := range rawHeaders {
h := header.(map[string]interface{})
headers = append(headers, har.Header{
Name: h["name"].(string),
Value: h["value"].(string),
})
if h["name"] == "Host" {
host = h["value"].(string)
}
if h["name"] == ":authority" {
authority = h["value"].(string)
}
if h["name"] == ":scheme" {
scheme = h["value"].(string)
}
if h["name"] == ":path" {
path = h["value"].(string)
}
if h["name"] == ":status" {
path = h["value"].(string)
}
}
return headers, host, scheme, authority, path, status
}
func BuildPostParams(rawParams []interface{}) []har.Param {
params := make([]har.Param, 0, len(rawParams))
for _, param := range rawParams {
p := param.(map[string]interface{})
name := ""
if p["name"] != nil {
name = p["name"].(string)
}
value := ""
if p["value"] != nil {
value = p["value"].(string)
}
fileName := ""
if p["fileName"] != nil {
fileName = p["fileName"].(string)
}
contentType := ""
if p["contentType"] != nil {
contentType = p["contentType"].(string)
}
params = append(params, har.Param{
Name: name,
Value: value,
Filename: fileName,
ContentType: contentType,
})
}
return params
}
func NewRequest(request *api.GenericMessage) (harRequest *har.Request, err error) {
reqDetails := request.Payload.(map[string]interface{})["details"].(map[string]interface{})
headers, host, scheme, authority, path, _ := BuildHeaders(reqDetails["headers"].([]interface{}))
cookies := make([]har.Cookie, 0) // BuildCookies(reqDetails["cookies"].([]interface{}))
postData, _ := reqDetails["postData"].(map[string]interface{})
mimeType, _ := postData["mimeType"]
if mimeType == nil || len(mimeType.(string)) == 0 {
mimeType = "text/html"
}
text, _ := postData["text"]
postDataText := ""
if text != nil {
postDataText = text.(string)
}
queryString := make([]har.QueryString, 0)
for _, _qs := range reqDetails["queryString"].([]interface{}) {
qs := _qs.(map[string]interface{})
queryString = append(queryString, har.QueryString{
Name: qs["name"].(string),
Value: qs["value"].(string),
})
}
url := fmt.Sprintf("http://%s%s", host, reqDetails["url"].(string))
if strings.HasPrefix(mimeType.(string), "application/grpc") {
url = fmt.Sprintf("%s://%s%s", scheme, authority, path)
}
harParams := make([]har.Param, 0)
if postData["params"] != nil {
harParams = BuildPostParams(postData["params"].([]interface{}))
}
harRequest = &har.Request{
Method: reqDetails["method"].(string),
URL: url,
HTTPVersion: reqDetails["httpVersion"].(string),
HeadersSize: -1,
BodySize: int64(bytes.NewBufferString(postDataText).Len()),
QueryString: queryString,
Headers: headers,
Cookies: cookies,
PostData: &har.PostData{
MimeType: mimeType.(string),
Params: harParams,
Text: postDataText,
},
}
return
}
func NewResponse(response *api.GenericMessage) (harResponse *har.Response, err error) {
resDetails := response.Payload.(map[string]interface{})["details"].(map[string]interface{})
headers, _, _, _, _, _status := BuildHeaders(resDetails["headers"].([]interface{}))
cookies := make([]har.Cookie, 0) // BuildCookies(resDetails["cookies"].([]interface{}))
content, _ := resDetails["content"].(map[string]interface{})
mimeType, _ := content["mimeType"]
if mimeType == nil || len(mimeType.(string)) == 0 {
mimeType = "text/html"
}
encoding, _ := content["encoding"]
text, _ := content["text"]
bodyText := ""
if text != nil {
bodyText = text.(string)
}
harContent := &har.Content{
Encoding: encoding.(string),
MimeType: mimeType.(string),
Text: []byte(bodyText),
Size: int64(len(bodyText)),
}
status := int(resDetails["status"].(float64))
if strings.HasPrefix(mimeType.(string), "application/grpc") {
status, err = strconv.Atoi(_status)
if err != nil {
tap.SilentError("convert-response-status-for-har", "Failed converting status to int %s (%v,%+v)", err, err, err)
return nil, errors.New("failed converting response status to int for HAR")
}
}
harResponse = &har.Response{
HTTPVersion: resDetails["httpVersion"].(string),
Status: status,
StatusText: resDetails["statusText"].(string),
HeadersSize: -1,
BodySize: int64(bytes.NewBufferString(bodyText).Len()),
Headers: headers,
Cookies: cookies,
Content: harContent,
}
return
}
func NewEntry(pair *api.RequestResponsePair) (*har.Entry, error) {
harRequest, err := NewRequest(&pair.Request)
if err != nil {
tap.SilentError("convert-request-to-har", "Failed converting request to HAR %s (%v,%+v)", err, err, err)
return nil, errors.New("failed converting request to HAR")
}
harResponse, err := NewResponse(&pair.Response)
if err != nil {
fmt.Printf("err: %+v\n", err)
tap.SilentError("convert-response-to-har", "Failed converting response to HAR %s (%v,%+v)", err, err, err)
return nil, errors.New("failed converting response to HAR")
}
totalTime := pair.Response.CaptureTime.Sub(pair.Request.CaptureTime).Round(time.Millisecond).Milliseconds()
if totalTime < 1 {
totalTime = 1
}
harEntry := har.Entry{
StartedDateTime: pair.Request.CaptureTime,
Time: totalTime,
Request: harRequest,
Response: harResponse,
Cache: &har.Cache{},
Timings: &har.Timings{
Send: -1,
Wait: -1,
Receive: totalTime,
},
}
return &harEntry, nil
}

View File

@@ -1,7 +1,6 @@
package apiserver
import (
"archive/zip"
"bytes"
"encoding/json"
"fmt"
@@ -131,28 +130,6 @@ func (provider *apiServerProvider) GetGeneralStats() (map[string]interface{}, er
return generalStats, nil
}
func (provider *apiServerProvider) GetHars(fromTimestamp int, toTimestamp int) (*zip.Reader, error) {
if !provider.isReady {
return nil, fmt.Errorf("trying to reach api server when not initialized yet")
}
resp, err := http.Get(fmt.Sprintf("%s/api/har?from=%v&to=%v", provider.url, fromTimestamp, toTimestamp))
if err != nil {
return nil, fmt.Errorf("failed getting har from api server %w", err)
}
defer func() { _ = resp.Body.Close() }()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("failed reading hars %w", err)
}
zipReader, err := zip.NewReader(bytes.NewReader(body), int64(len(body)))
if err != nil {
return nil, fmt.Errorf("failed craeting zip reader %w", err)
}
return zipReader, nil
}
func (provider *apiServerProvider) GetVersion() (string, error) {
if !provider.isReady {

View File

@@ -1,46 +0,0 @@
package cmd
import (
"github.com/creasty/defaults"
"github.com/spf13/cobra"
"github.com/up9inc/mizu/cli/apiserver"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/config/configStructs"
"github.com/up9inc/mizu/cli/logger"
"github.com/up9inc/mizu/cli/mizu/version"
"github.com/up9inc/mizu/cli/telemetry"
"github.com/up9inc/mizu/cli/uiUtils"
)
var fetchCmd = &cobra.Command{
Use: "fetch",
Short: "Download recorded traffic to files",
RunE: func(cmd *cobra.Command, args []string) error {
go telemetry.ReportRun("fetch", config.Config.Fetch)
if err := apiserver.Provider.InitAndTestConnection(GetApiServerUrl()); err != nil {
logger.Log.Errorf(uiUtils.Error, "Couldn't connect to API server, make sure one running")
return nil
}
if isCompatible, err := version.CheckVersionCompatibility(); err != nil {
return err
} else if !isCompatible {
return nil
}
RunMizuFetch()
return nil
},
}
func init() {
rootCmd.AddCommand(fetchCmd)
defaultFetchConfig := configStructs.FetchConfig{}
defaults.Set(&defaultFetchConfig)
fetchCmd.Flags().StringP(configStructs.DirectoryFetchName, "d", defaultFetchConfig.Directory, "Provide a custom directory for fetched entries")
fetchCmd.Flags().Int(configStructs.FromTimestampFetchName, defaultFetchConfig.FromTimestamp, "Custom start timestamp for fetched entries")
fetchCmd.Flags().Int(configStructs.ToTimestampFetchName, defaultFetchConfig.ToTimestamp, "Custom end timestamp fetched entries")
fetchCmd.Flags().Uint16P(configStructs.GuiPortFetchName, "p", defaultFetchConfig.GuiPort, "Provide a custom port for the web interface webserver")
}

View File

@@ -1,25 +0,0 @@
package cmd
import (
"github.com/up9inc/mizu/cli/apiserver"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/logger"
"github.com/up9inc/mizu/cli/mizu/fsUtils"
"github.com/up9inc/mizu/cli/uiUtils"
)
func RunMizuFetch() {
if err := apiserver.Provider.InitAndTestConnection(GetApiServerUrl()); err != nil {
logger.Log.Errorf(uiUtils.Error, "Couldn't connect to API server, check logs")
}
zipReader, err := apiserver.Provider.GetHars(config.Config.Fetch.FromTimestamp, config.Config.Fetch.ToTimestamp)
if err != nil {
logger.Log.Errorf("Failed fetch data from API server %v", err)
return
}
if err := fsUtils.Unzip(zipReader, config.Config.Fetch.Directory); err != nil {
logger.Log.Debugf("[ERROR] failed unzip %v", err)
}
}

View File

@@ -64,7 +64,6 @@ func init() {
tapCmd.Flags().StringSliceP(configStructs.PlainTextFilterRegexesTapName, "r", defaultTapConfig.PlainTextFilterRegexes, "List of regex expressions that are used to filter matching values from text/plain http bodies")
tapCmd.Flags().Bool(configStructs.DisableRedactionTapName, defaultTapConfig.DisableRedaction, "Disables redaction of potentially sensitive request/response headers and body values")
tapCmd.Flags().String(configStructs.HumanMaxEntriesDBSizeTapName, defaultTapConfig.HumanMaxEntriesDBSize, "Override the default max entries db size")
tapCmd.Flags().String(configStructs.DirectionTapName, defaultTapConfig.Direction, "Record traffic that goes in this direction (relative to the tapped pod): in/any")
tapCmd.Flags().Bool(configStructs.DryRunTapName, defaultTapConfig.DryRun, "Preview of all pods matching the regex, without tapping them")
tapCmd.Flags().String(configStructs.EnforcePolicyFile, defaultTapConfig.EnforcePolicyFile, "Yaml file with policy rules")
}

View File

@@ -18,7 +18,6 @@ const (
type ConfigStruct struct {
Tap configStructs.TapConfig `yaml:"tap"`
Fetch configStructs.FetchConfig `yaml:"fetch"`
Version configStructs.VersionConfig `yaml:"version"`
View configStructs.ViewConfig `yaml:"view"`
Logs configStructs.LogsConfig `yaml:"logs"`

View File

@@ -1,15 +0,0 @@
package configStructs
const (
DirectoryFetchName = "directory"
FromTimestampFetchName = "from"
ToTimestampFetchName = "to"
GuiPortFetchName = "gui-port"
)
type FetchConfig struct {
Directory string `yaml:"directory" default:"."`
FromTimestamp int `yaml:"from" default:"0"`
ToTimestamp int `yaml:"to" default:"0"`
GuiPort uint16 `yaml:"gui-port" default:"8899"`
}

View File

@@ -3,10 +3,8 @@ package configStructs
import (
"errors"
"fmt"
"regexp"
"strings"
"github.com/up9inc/mizu/shared/units"
"regexp"
)
const (
@@ -17,7 +15,6 @@ const (
PlainTextFilterRegexesTapName = "regex-masking"
DisableRedactionTapName = "no-redact"
HumanMaxEntriesDBSizeTapName = "max-entries-db-size"
DirectionTapName = "direction"
DryRunTapName = "dry-run"
EnforcePolicyFile = "test-rules"
)
@@ -34,7 +31,6 @@ type TapConfig struct {
HealthChecksUserAgentHeaders []string `yaml:"ignored-user-agents"`
DisableRedaction bool `yaml:"no-redact" default:"false"`
HumanMaxEntriesDBSize string `yaml:"max-entries-db-size" default:"200MB"`
Direction string `yaml:"direction" default:"in"`
DryRun bool `yaml:"dry-run" default:"false"`
EnforcePolicyFile string `yaml:"test-rules"`
ApiServerResources Resources `yaml:"api-server-resources"`
@@ -69,10 +65,5 @@ func (config *TapConfig) Validate() error {
return errors.New(fmt.Sprintf("Could not parse --%s value %s", HumanMaxEntriesDBSizeTapName, config.HumanMaxEntriesDBSize))
}
directionLowerCase := strings.ToLower(config.Direction)
if directionLowerCase != "any" && directionLowerCase != "in" {
return errors.New(fmt.Sprintf("%s is not a valid value for flag --%s. Acceptable values are in/any.", config.Direction, DirectionTapName))
}
return nil
}

View File

@@ -68,13 +68,17 @@ type OutputChannelItem struct {
Pair *RequestResponsePair
}
type SuperTimer struct {
CaptureTime time.Time
}
type Dissector interface {
Register(*Extension)
Ping()
Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, emitter Emitter) error
Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, emitter Emitter) error
Analyze(item *OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *MizuEntry
Summarize(entry *MizuEntry) *BaseEntryDetails
Represent(entry *MizuEntry) (Protocol, []byte, error)
Represent(entry *MizuEntry) (protocol Protocol, object []byte, bodySize int64, err error)
}
type Emitting struct {
@@ -103,6 +107,7 @@ type MizuEntry struct {
RequestSenderIp string `json:"requestSenderIp" gorm:"column:requestSenderIp"`
Service string `json:"service" gorm:"column:service"`
Timestamp int64 `json:"timestamp" gorm:"column:timestamp"`
ElapsedTime int64 `json:"elapsedTime" gorm:"column:elapsedTime"`
Path string `json:"path" gorm:"column:path"`
ResolvedSource string `json:"resolvedSource,omitempty" gorm:"column:resolvedSource"`
ResolvedDestination string `json:"resolvedDestination,omitempty" gorm:"column:resolvedDestination"`
@@ -117,6 +122,7 @@ type MizuEntry struct {
type MizuEntryWrapper struct {
Protocol Protocol `json:"protocol"`
Representation string `json:"representation"`
BodySize int64 `json:"bodySize"`
Data MizuEntry `json:"data"`
}
@@ -163,3 +169,8 @@ func (bed *BaseEntryDetails) UnmarshalData(entry *MizuEntry) error {
bed.IsOutgoing = entry.IsOutgoing
return nil
}
const (
TABLE string = "table"
BODY string = "body"
)

View File

@@ -93,10 +93,10 @@ type AMQPWrapper struct {
Details interface{} `json:"details"`
}
func emitAMQP(event interface{}, _type string, method string, connectionInfo *api.ConnectionInfo, emitter api.Emitter) {
func emitAMQP(event interface{}, _type string, method string, connectionInfo *api.ConnectionInfo, captureTime time.Time, emitter api.Emitter) {
request := &api.GenericMessage{
IsRequest: true,
CaptureTime: time.Now(),
CaptureTime: captureTime,
Payload: AMQPPayload{
Data: &AMQPWrapper{
Method: method,
@@ -107,7 +107,7 @@ func emitAMQP(event interface{}, _type string, method string, connectionInfo *ap
}
item := &api.OutputChannelItem{
Protocol: protocol,
Timestamp: time.Now().UnixNano() / int64(time.Millisecond),
Timestamp: captureTime.UnixNano() / int64(time.Millisecond),
ConnectionInfo: connectionInfo,
Pair: &api.RequestResponsePair{
Request: *request,
@@ -219,7 +219,7 @@ func representProperties(properties map[string]interface{}, rep []interface{}) (
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Properties",
"data": string(props),
})
@@ -249,7 +249,7 @@ func representBasicPublish(event map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Details",
"data": string(details),
})
@@ -267,7 +267,7 @@ func representBasicPublish(event map[string]interface{}) []interface{} {
}
headersMarshaled, _ := json.Marshal(headers)
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Headers",
"data": string(headersMarshaled),
})
@@ -275,7 +275,7 @@ func representBasicPublish(event map[string]interface{}) []interface{} {
if event["Body"] != nil {
rep = append(rep, map[string]string{
"type": "body",
"type": api.BODY,
"title": "Body",
"encoding": "base64",
"mime_type": contentType,
@@ -326,7 +326,7 @@ func representBasicDeliver(event map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Details",
"data": string(details),
})
@@ -344,7 +344,7 @@ func representBasicDeliver(event map[string]interface{}) []interface{} {
}
headersMarshaled, _ := json.Marshal(headers)
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Headers",
"data": string(headersMarshaled),
})
@@ -352,7 +352,7 @@ func representBasicDeliver(event map[string]interface{}) []interface{} {
if event["Body"] != nil {
rep = append(rep, map[string]string{
"type": "body",
"type": api.BODY,
"title": "Body",
"encoding": "base64",
"mime_type": contentType,
@@ -393,7 +393,7 @@ func representQueueDeclare(event map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Details",
"data": string(details),
})
@@ -408,7 +408,7 @@ func representQueueDeclare(event map[string]interface{}) []interface{} {
}
headersMarshaled, _ := json.Marshal(headers)
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Arguments",
"data": string(headersMarshaled),
})
@@ -451,7 +451,7 @@ func representExchangeDeclare(event map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Details",
"data": string(details),
})
@@ -466,7 +466,7 @@ func representExchangeDeclare(event map[string]interface{}) []interface{} {
}
headersMarshaled, _ := json.Marshal(headers)
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Arguments",
"data": string(headersMarshaled),
})
@@ -497,7 +497,7 @@ func representConnectionStart(event map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Details",
"data": string(details),
})
@@ -524,7 +524,7 @@ func representConnectionStart(event map[string]interface{}) []interface{} {
}
headersMarshaled, _ := json.Marshal(headers)
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Server Properties",
"data": string(headersMarshaled),
})
@@ -555,7 +555,7 @@ func representConnectionClose(event map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Details",
"data": string(details),
})
@@ -585,7 +585,7 @@ func representQueueBind(event map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Details",
"data": string(details),
})
@@ -600,7 +600,7 @@ func representQueueBind(event map[string]interface{}) []interface{} {
}
headersMarshaled, _ := json.Marshal(headers)
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Arguments",
"data": string(headersMarshaled),
})
@@ -639,7 +639,7 @@ func representBasicConsume(event map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Details",
"data": string(details),
})
@@ -654,7 +654,7 @@ func representBasicConsume(event map[string]interface{}) []interface{} {
}
headersMarshaled, _ := json.Marshal(headers)
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Arguments",
"data": string(headersMarshaled),
})

View File

@@ -41,7 +41,7 @@ func (d dissecting) Ping() {
const amqpRequest string = "amqp_request"
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, emitter api.Emitter) error {
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter) error {
r := AmqpReader{b}
var remaining int
@@ -110,10 +110,10 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
switch lastMethodFrameMessage.(type) {
case *BasicPublish:
eventBasicPublish.Body = f.Body
emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, emitter)
emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, superTimer.CaptureTime, emitter)
case *BasicDeliver:
eventBasicDeliver.Body = f.Body
emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, emitter)
emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, superTimer.CaptureTime, emitter)
default:
}
@@ -134,7 +134,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
NoWait: m.NoWait,
Arguments: m.Arguments,
}
emitAMQP(*eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, emitter)
emitAMQP(*eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, superTimer.CaptureTime, emitter)
case *BasicConsume:
eventBasicConsume := &BasicConsume{
@@ -146,7 +146,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
NoWait: m.NoWait,
Arguments: m.Arguments,
}
emitAMQP(*eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, emitter)
emitAMQP(*eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, superTimer.CaptureTime, emitter)
case *BasicDeliver:
eventBasicDeliver.ConsumerTag = m.ConsumerTag
@@ -165,7 +165,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
NoWait: m.NoWait,
Arguments: m.Arguments,
}
emitAMQP(*eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, emitter)
emitAMQP(*eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter)
case *ExchangeDeclare:
eventExchangeDeclare := &ExchangeDeclare{
@@ -178,7 +178,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
NoWait: m.NoWait,
Arguments: m.Arguments,
}
emitAMQP(*eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, emitter)
emitAMQP(*eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter)
case *ConnectionStart:
eventConnectionStart := &ConnectionStart{
@@ -188,7 +188,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
Mechanisms: m.Mechanisms,
Locales: m.Locales,
}
emitAMQP(*eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, emitter)
emitAMQP(*eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter)
case *ConnectionClose:
eventConnectionClose := &ConnectionClose{
@@ -197,7 +197,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
ClassId: m.ClassId,
MethodId: m.MethodId,
}
emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, emitter)
emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, superTimer.CaptureTime, emitter)
default:
@@ -264,6 +264,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve
RequestSenderIp: item.ConnectionInfo.ClientIP,
Service: service,
Timestamp: item.Timestamp,
ElapsedTime: 0,
Path: summary,
ResolvedSource: resolvedSource,
ResolvedDestination: resolvedDestination,
@@ -300,7 +301,9 @@ func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails {
}
}
func (d dissecting) Represent(entry *api.MizuEntry) (api.Protocol, []byte, error) {
func (d dissecting) Represent(entry *api.MizuEntry) (p api.Protocol, object []byte, bodySize int64, err error) {
p = protocol
bodySize = 0
var root map[string]interface{}
json.Unmarshal([]byte(entry.Entry), &root)
representation := make(map[string]interface{}, 0)
@@ -334,8 +337,8 @@ func (d dissecting) Represent(entry *api.MizuEntry) (api.Protocol, []byte, error
break
}
representation["request"] = repRequest
object, err := json.Marshal(representation)
return protocol, object, err
object, err = json.Marshal(representation)
return
}
var Dissector dissecting

View File

@@ -7,14 +7,13 @@ import (
"io"
"io/ioutil"
"net/http"
"time"
"github.com/romana/rlog"
"github.com/up9inc/mizu/tap/api"
)
func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, emitter api.Emitter) error {
func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter) error {
streamID, messageHTTP1, err := grpcAssembler.readMessage()
if err != nil {
return err
@@ -32,7 +31,7 @@ func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, emitter a
tcpID.DstPort,
streamID,
)
item = reqResMatcher.registerRequest(ident, &messageHTTP1, time.Now())
item = reqResMatcher.registerRequest(ident, &messageHTTP1, superTimer.CaptureTime)
if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.SrcIP,
@@ -51,7 +50,7 @@ func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, emitter a
tcpID.SrcPort,
streamID,
)
item = reqResMatcher.registerResponse(ident, &messageHTTP1, time.Now())
item = reqResMatcher.registerResponse(ident, &messageHTTP1, superTimer.CaptureTime)
if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.DstIP,
@@ -71,7 +70,7 @@ func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, emitter a
return nil
}
func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, emitter api.Emitter) error {
func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter) error {
req, err := http.ReadRequest(b)
if err != nil {
// log.Println("Error reading stream:", err)
@@ -99,7 +98,7 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
tcpID.DstPort,
counterPair.Request,
)
item := reqResMatcher.registerRequest(ident, req, time.Now())
item := reqResMatcher.registerRequest(ident, req, superTimer.CaptureTime)
if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.SrcIP,
@@ -113,7 +112,7 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
return nil
}
func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, emitter api.Emitter) error {
func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter) error {
res, err := http.ReadResponse(b, nil)
if err != nil {
// log.Println("Error reading stream:", err)
@@ -149,7 +148,7 @@ func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
tcpID.SrcPort,
counterPair.Response,
)
item := reqResMatcher.registerResponse(ident, res, time.Now())
item := reqResMatcher.registerResponse(ident, res, superTimer.CaptureTime)
if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.DstIP,

View File

@@ -7,6 +7,7 @@ import (
"io"
"log"
"net/url"
"time"
"github.com/romana/rlog"
@@ -59,7 +60,7 @@ func (d dissecting) Ping() {
log.Printf("pong %s\n", protocol.Name)
}
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, emitter api.Emitter) error {
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter) error {
ident := fmt.Sprintf("%s->%s:%s->%s", tcpID.SrcIP, tcpID.DstIP, tcpID.SrcPort, tcpID.DstPort)
isHTTP2, err := checkIsHTTP2Connection(b, isClient)
if err != nil {
@@ -79,7 +80,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
success := false
for {
if isHTTP2 {
err = handleHTTP2Stream(grpcAssembler, tcpID, emitter)
err = handleHTTP2Stream(grpcAssembler, tcpID, superTimer, emitter)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
@@ -88,7 +89,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
success = true
} else if isClient {
err = handleHTTP1ClientStream(b, tcpID, counterPair, emitter)
err = handleHTTP1ClientStream(b, tcpID, counterPair, superTimer, emitter)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
@@ -97,7 +98,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
success = true
} else {
err = handleHTTP1ServerStream(b, tcpID, counterPair, emitter)
err = handleHTTP1ServerStream(b, tcpID, counterPair, superTimer, emitter)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
@@ -161,6 +162,8 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve
} else if resolvedSource != "" {
service = SetHostname(service, resolvedSource)
}
elapsedTime := item.Pair.Response.CaptureTime.Sub(item.Pair.Request.CaptureTime).Round(time.Millisecond).Milliseconds()
entryBytes, _ := json.Marshal(item.Pair)
return &api.MizuEntry{
ProtocolName: protocol.Name,
@@ -173,6 +176,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve
RequestSenderIp: item.ConnectionInfo.ClientIP,
Service: service,
Timestamp: item.Timestamp,
ElapsedTime: elapsedTime,
Path: path,
ResolvedSource: resolvedSource,
ResolvedDestination: resolvedDestination,
@@ -214,9 +218,7 @@ func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails {
}
}
func representRequest(request map[string]interface{}) []interface{} {
repRequest := make([]interface{}, 0)
func representRequest(request map[string]interface{}) (repRequest []interface{}) {
details, _ := json.Marshal([]map[string]string{
{
"name": "Method",
@@ -232,28 +234,28 @@ func representRequest(request map[string]interface{}) []interface{} {
},
})
repRequest = append(repRequest, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Details",
"data": string(details),
})
headers, _ := json.Marshal(request["headers"].([]interface{}))
repRequest = append(repRequest, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Headers",
"data": string(headers),
})
cookies, _ := json.Marshal(request["cookies"].([]interface{}))
repRequest = append(repRequest, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Cookies",
"data": string(cookies),
})
queryString, _ := json.Marshal(request["queryString"].([]interface{}))
repRequest = append(repRequest, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Query String",
"data": string(queryString),
})
@@ -266,7 +268,7 @@ func representRequest(request map[string]interface{}) []interface{} {
text, _ := postData["text"]
if text != nil {
repRequest = append(repRequest, map[string]string{
"type": "body",
"type": api.BODY,
"title": "POST Data (text/plain)",
"encoding": "",
"mime_type": mimeType.(string),
@@ -285,13 +287,13 @@ func representRequest(request map[string]interface{}) []interface{} {
},
})
repRequest = append(repRequest, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "POST Data (multipart/form-data)",
"data": string(multipart),
})
} else {
repRequest = append(repRequest, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "POST Data (application/x-www-form-urlencoded)",
"data": string(params),
})
@@ -299,11 +301,13 @@ func representRequest(request map[string]interface{}) []interface{} {
}
}
return repRequest
return
}
func representResponse(response map[string]interface{}) []interface{} {
repResponse := make([]interface{}, 0)
func representResponse(response map[string]interface{}) (repResponse []interface{}, bodySize int64) {
repResponse = make([]interface{}, 0)
bodySize = int64(response["bodySize"].(float64))
details, _ := json.Marshal([]map[string]string{
{
@@ -316,25 +320,25 @@ func representResponse(response map[string]interface{}) []interface{} {
},
{
"name": "Body Size",
"value": fmt.Sprintf("%g bytes", response["bodySize"].(float64)),
"value": fmt.Sprintf("%d bytes", bodySize),
},
})
repResponse = append(repResponse, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Details",
"data": string(details),
})
headers, _ := json.Marshal(response["headers"].([]interface{}))
repResponse = append(repResponse, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Headers",
"data": string(headers),
})
cookies, _ := json.Marshal(response["cookies"].([]interface{}))
repResponse = append(repResponse, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Cookies",
"data": string(cookies),
})
@@ -348,7 +352,7 @@ func representResponse(response map[string]interface{}) []interface{} {
text, _ := content["text"]
if text != nil {
repResponse = append(repResponse, map[string]string{
"type": "body",
"type": api.BODY,
"title": "Body",
"encoding": encoding.(string),
"mime_type": mimeType.(string),
@@ -356,11 +360,10 @@ func representResponse(response map[string]interface{}) []interface{} {
})
}
return repResponse
return
}
func (d dissecting) Represent(entry *api.MizuEntry) (api.Protocol, []byte, error) {
var p api.Protocol
func (d dissecting) Represent(entry *api.MizuEntry) (p api.Protocol, object []byte, bodySize int64, err error) {
if entry.ProtocolVersion == "2.0" {
p = http2Protocol
} else {
@@ -374,11 +377,11 @@ func (d dissecting) Represent(entry *api.MizuEntry) (api.Protocol, []byte, error
reqDetails := request["details"].(map[string]interface{})
resDetails := response["details"].(map[string]interface{})
repRequest := representRequest(reqDetails)
repResponse := representResponse(resDetails)
repResponse, bodySize := representResponse(resDetails)
representation["request"] = repRequest
representation["response"] = repResponse
object, err := json.Marshal(representation)
return p, object, err
object, err = json.Marshal(representation)
return
}
var Dissector dissecting

View File

@@ -85,7 +85,7 @@ func (matcher *requestResponseMatcher) registerResponse(ident string, response *
func (matcher *requestResponseMatcher) preparePair(requestHTTPMessage *api.GenericMessage, responseHTTPMessage *api.GenericMessage) *api.OutputChannelItem {
return &api.OutputChannelItem{
Protocol: protocol,
Timestamp: time.Now().UnixNano() / int64(time.Millisecond),
Timestamp: requestHTTPMessage.CaptureTime.UnixNano() / int64(time.Millisecond),
ConnectionInfo: nil,
Pair: &api.RequestResponsePair{
Request: *requestHTTPMessage,

View File

@@ -4,6 +4,8 @@ import (
"encoding/json"
"fmt"
"strconv"
"github.com/up9inc/mizu/tap/api"
)
type KafkaPayload struct {
@@ -48,7 +50,7 @@ func representRequestHeader(data map[string]interface{}, rep []interface{}) []in
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Request Header",
"data": string(requestHeader),
})
@@ -64,7 +66,7 @@ func representResponseHeader(data map[string]interface{}, rep []interface{}) []i
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Response Header",
"data": string(requestHeader),
})
@@ -114,7 +116,7 @@ func representMetadataRequest(data map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
})
@@ -173,7 +175,7 @@ func representMetadataResponse(data map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
})
@@ -206,7 +208,7 @@ func representApiVersionsRequest(data map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
})
@@ -244,7 +246,7 @@ func representApiVersionsResponse(data map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
})
@@ -287,7 +289,7 @@ func representProduceRequest(data map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
})
@@ -317,7 +319,7 @@ func representProduceResponse(data map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
})
@@ -404,7 +406,7 @@ func representFetchRequest(data map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
})
@@ -450,7 +452,7 @@ func representFetchResponse(data map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
})
@@ -476,7 +478,7 @@ func representListOffsetsRequest(data map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
})
@@ -506,7 +508,7 @@ func representListOffsetsResponse(data map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
})
@@ -540,7 +542,7 @@ func representCreateTopicsRequest(data map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
})
@@ -570,7 +572,7 @@ func representCreateTopicsResponse(data map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
})
@@ -609,7 +611,7 @@ func representDeleteTopicsRequest(data map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
})
@@ -639,7 +641,7 @@ func representDeleteTopicsResponse(data map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
})

View File

@@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"log"
"time"
"github.com/up9inc/mizu/tap/api"
)
@@ -37,15 +38,15 @@ func (d dissecting) Ping() {
log.Printf("pong %s\n", _protocol.Name)
}
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, emitter api.Emitter) error {
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter) error {
for {
if isClient {
_, _, err := ReadRequest(b, tcpID)
_, _, err := ReadRequest(b, tcpID, superTimer)
if err != nil {
return err
}
} else {
err := ReadResponse(b, tcpID, emitter)
err := ReadResponse(b, tcpID, superTimer, emitter)
if err != nil {
return err
}
@@ -131,6 +132,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve
}
request["url"] = summary
elapsedTime := item.Pair.Response.CaptureTime.Sub(item.Pair.Request.CaptureTime).Round(time.Millisecond).Milliseconds()
entryBytes, _ := json.Marshal(item.Pair)
return &api.MizuEntry{
ProtocolName: _protocol.Name,
@@ -143,6 +145,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve
RequestSenderIp: item.ConnectionInfo.ClientIP,
Service: service,
Timestamp: item.Timestamp,
ElapsedTime: elapsedTime,
Path: summary,
ResolvedSource: resolvedSource,
ResolvedDestination: resolvedDestination,
@@ -178,7 +181,9 @@ func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails {
}
}
func (d dissecting) Represent(entry *api.MizuEntry) (api.Protocol, []byte, error) {
func (d dissecting) Represent(entry *api.MizuEntry) (p api.Protocol, object []byte, bodySize int64, err error) {
p = _protocol
bodySize = 0
var root map[string]interface{}
json.Unmarshal([]byte(entry.Entry), &root)
representation := make(map[string]interface{}, 0)
@@ -224,8 +229,8 @@ func (d dissecting) Represent(entry *api.MizuEntry) (api.Protocol, []byte, error
representation["request"] = repRequest
representation["response"] = repResponse
object, err := json.Marshal(representation)
return _protocol, object, err
object, err = json.Marshal(representation)
return
}
var Dissector dissecting

View File

@@ -4,6 +4,7 @@ import (
"fmt"
"io"
"reflect"
"time"
"github.com/up9inc/mizu/tap/api"
)
@@ -15,9 +16,10 @@ type Request struct {
CorrelationID int32
ClientID string
Payload interface{}
CaptureTime time.Time
}
func ReadRequest(r io.Reader, tcpID *api.TcpID) (apiKey ApiKey, apiVersion int16, err error) {
func ReadRequest(r io.Reader, tcpID *api.TcpID, superTimer *api.SuperTimer) (apiKey ApiKey, apiVersion int16, err error) {
d := &decoder{reader: r, remain: 4}
size := d.readInt32()
@@ -213,6 +215,7 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID) (apiKey ApiKey, apiVersion int16
ApiVersion: apiVersion,
CorrelationID: correlationID,
ClientID: clientID,
CaptureTime: superTimer.CaptureTime,
Payload: payload,
}

View File

@@ -13,9 +13,10 @@ type Response struct {
Size int32
CorrelationID int32
Payload interface{}
CaptureTime time.Time
}
func ReadResponse(r io.Reader, tcpID *api.TcpID, emitter api.Emitter) (err error) {
func ReadResponse(r io.Reader, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter) (err error) {
d := &decoder{reader: r, remain: 4}
size := d.readInt32()
@@ -39,6 +40,7 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, emitter api.Emitter) (err error
Size: size,
CorrelationID: correlationID,
Payload: payload,
CaptureTime: superTimer.CaptureTime,
}
key := fmt.Sprintf(
@@ -258,12 +260,12 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, emitter api.Emitter) (err error
item := &api.OutputChannelItem{
Protocol: _protocol,
Timestamp: time.Now().UnixNano() / int64(time.Millisecond),
Timestamp: reqResPair.Request.CaptureTime.UnixNano() / int64(time.Millisecond),
ConnectionInfo: connectionInfo,
Pair: &api.RequestResponsePair{
Request: api.GenericMessage{
IsRequest: true,
CaptureTime: time.Now(),
CaptureTime: reqResPair.Request.CaptureTime,
Payload: KafkaPayload{
Data: &KafkaWrapper{
Method: apiNames[apiKey],
@@ -274,7 +276,7 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, emitter api.Emitter) (err error
},
Response: api.GenericMessage{
IsRequest: false,
CaptureTime: time.Now(),
CaptureTime: reqResPair.Response.CaptureTime,
Payload: KafkaPayload{
Data: &KafkaWrapper{
Method: apiNames[apiKey],

View File

@@ -51,7 +51,7 @@ type tcpReader struct {
isOutgoing bool
msgQueue chan tcpReaderDataMsg // Channel of captured reassembled tcp payload
data []byte
captureTime time.Time
superTimer *api.SuperTimer
parent *tcpStream
messageCount uint
packetsSeen uint
@@ -69,7 +69,7 @@ func (h *tcpReader) Read(p []byte) (int, error) {
msg, ok = <-h.msgQueue
h.data = msg.bytes
h.captureTime = msg.timestamp
h.superTimer.CaptureTime = msg.timestamp
if len(h.data) > 0 {
h.packetsSeen += 1
}
@@ -96,7 +96,7 @@ func (h *tcpReader) Read(p []byte) (int, error) {
func (h *tcpReader) run(wg *sync.WaitGroup) {
defer wg.Done()
b := bufio.NewReader(h)
err := h.extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.counterPair, h.emitter)
err := h.extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.counterPair, h.superTimer, h.emitter)
if err != nil {
io.Copy(ioutil.Discard, b)
}

View File

@@ -144,13 +144,14 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
// This is where we pass the reassembled information onwards
// This channel is read by an tcpReader object
statsTracker.incReassembledTcpPayloadsCount()
timestamp := ac.GetCaptureInfo().Timestamp
if dir == reassembly.TCPDirClientToServer {
for _, reader := range t.clients {
reader.msgQueue <- tcpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp}
reader.msgQueue <- tcpReaderDataMsg{data, timestamp}
}
} else {
for _, reader := range t.servers {
reader.msgQueue <- tcpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp}
reader.msgQueue <- tcpReaderDataMsg{data, timestamp}
}
}
}

View File

@@ -54,8 +54,9 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
Response: 0,
}
stream.clients = append(stream.clients, tcpReader{
msgQueue: make(chan tcpReaderDataMsg),
ident: fmt.Sprintf("%s %s", net, transport),
msgQueue: make(chan tcpReaderDataMsg),
superTimer: &api.SuperTimer{},
ident: fmt.Sprintf("%s %s", net, transport),
tcpID: &api.TcpID{
SrcIP: srcIp,
DstIP: dstIp,
@@ -71,8 +72,9 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
counterPair: counterPair,
})
stream.servers = append(stream.servers, tcpReader{
msgQueue: make(chan tcpReaderDataMsg),
ident: fmt.Sprintf("%s %s", net, transport),
msgQueue: make(chan tcpReaderDataMsg),
superTimer: &api.SuperTimer{},
ident: fmt.Sprintf("%s %s", net, transport),
tcpID: &api.TcpID{
SrcIP: net.Dst().String(),
DstIP: net.Src().String(),

View File

@@ -32,7 +32,7 @@ interface EntryDetailedProps {
export const formatSize = (n: number) => n > 1000 ? `${Math.round(n / 1000)}KB` : `${n} B`;
const EntryTitle: React.FC<any> = ({protocol, data}) => {
const EntryTitle: React.FC<any> = ({protocol, data, bodySize, elapsedTime}) => {
const classes = useStyles();
const {response} = JSON.parse(data.entry);
@@ -40,7 +40,8 @@ const EntryTitle: React.FC<any> = ({protocol, data}) => {
return <div className={classes.entryTitle}>
<Protocol protocol={protocol} horizontal={true}/>
<div style={{right: "30px", position: "absolute", display: "flex"}}>
{response.payload && <div style={{margin: "0 18px", opacity: 0.5}}>{formatSize(response.payload.bodySize)}</div>}
{response.payload && <div style={{margin: "0 18px", opacity: 0.5}}>{formatSize(bodySize)}</div>}
<div style={{marginRight: 18, opacity: 0.5}}>{Math.round(elapsedTime)}ms</div>
<div style={{opacity: 0.5}}>{'rulesMatched' in data ? data.rulesMatched?.length : '0'} Rules Applied</div>
</div>
</div>;
@@ -63,7 +64,12 @@ const EntrySummary: React.FC<any> = ({data}) => {
export const EntryDetailed: React.FC<EntryDetailedProps> = ({entryData}) => {
return <>
<EntryTitle protocol={entryData.protocol} data={entryData.data}/>
<EntryTitle
protocol={entryData.protocol}
data={entryData.data}
bodySize={entryData.bodySize}
elapsedTime={entryData.data.elapsedTime}
/>
{entryData.data && <EntrySummary data={entryData.data}/>}
<>
{entryData.data && <EntryViewer representation={entryData.representation} color={entryData.protocol.background_color}/>}

View File

@@ -2,7 +2,7 @@
.Entry
font-family: "Source Sans Pro", Lucida Grande, Tahoma, sans-serif
height: 100%
height: calc(100% - 70px)
width: 100%
h3,
@@ -44,6 +44,8 @@
border-top-right-radius: 0
.body
height: 100%
overflow-y: auto
background: $main-background-color
color: $blue-gray
border-radius: 4px