From c668680f54bb7df3fc44f113d875d265b90872cd Mon Sep 17 00:00:00 2001 From: "M. Mert Yildiran" Date: Fri, 20 Aug 2021 21:34:44 +0300 Subject: [PATCH] Add `Analyze` method to the `Dissector` interface and `MizuEntry` to the extension API --- agent/pkg/api/main.go | 45 ++++------------- agent/pkg/controllers/entries_controller.go | 27 +++++----- agent/pkg/database/main.go | 24 ++++----- agent/pkg/database/size_enforcer.go | 15 +++--- agent/pkg/models/models.go | 50 ++----------------- tap/api/api.go | 47 +++++++++++++++-- tap/extensions/amqp/main.go | 7 ++- tap/extensions/http/main.go | 42 +++++++++++++--- tap/extensions/kafka/main.go | 7 ++- ui/src/components/HarEntryDetailed.tsx | 4 +- .../HarEntryViewer/HAREntryViewer.tsx | 22 ++++---- 11 files changed, 152 insertions(+), 138 deletions(-) diff --git a/agent/pkg/api/main.go b/agent/pkg/api/main.go index b0e36b792..3f09b1f55 100644 --- a/agent/pkg/api/main.go +++ b/agent/pkg/api/main.go @@ -125,8 +125,13 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension // } else { // rlog.Errorf("Error when creating HTTP entry") // } - baseEntry := extension.Dissector.Summarize(item) + resolvedSource, resolvedDestionation := resolveIP(item.ConnectionInfo) + mizuEntry := extension.Dissector.Analyze(item, primitive.NewObjectID().Hex(), resolvedSource, resolvedDestionation) + baseEntry := extension.Dissector.Summarize(mizuEntry) fmt.Printf("baseEntry: %+v\n", baseEntry) + fmt.Printf("mizuEntry: %+v\n", mizuEntry) + mizuEntry.EstimatedSizeBytes = getEstimatedEntrySizeBytes(mizuEntry) + database.CreateEntry(mizuEntry) baseEntryBytes, _ := models.CreateBaseEntryWebSocketMessage(baseEntry) BroadcastToBrowserClients(baseEntryBytes) } @@ -139,14 +144,7 @@ func StartReadingOutbound(outboundLinkChannel <-chan *tap.OutboundLink) { } } -func saveHarToDb(entry *har.Entry, connectionInfo *tapApi.ConnectionInfo) { - entryBytes, _ := json.Marshal(entry) - serviceName, urlPath := getServiceNameFromUrl(entry.Request.URL) - entryId := primitive.NewObjectID().Hex() - var ( - resolvedSource string - resolvedDestination string - ) +func resolveIP(connectionInfo *tapApi.ConnectionInfo) (resolvedSource string, resolvedDestination string) { if k8sResolver != nil { unresolvedSource := connectionInfo.ClientIP resolvedSource = k8sResolver.Resolve(unresolvedSource) @@ -165,32 +163,7 @@ func saveHarToDb(entry *har.Entry, connectionInfo *tapApi.ConnectionInfo) { } } } - - mizuEntry := models.MizuEntry{ - EntryId: entryId, - Entry: string(entryBytes), // simple way to store it and not convert to bytes - Service: serviceName, - Url: entry.Request.URL, - Path: urlPath, - Method: entry.Request.Method, - Status: entry.Response.Status, - RequestSenderIp: connectionInfo.ClientIP, - Timestamp: entry.StartedDateTime.UnixNano() / int64(time.Millisecond), - ResolvedSource: resolvedSource, - ResolvedDestination: resolvedDestination, - IsOutgoing: connectionInfo.IsOutgoing, - } - mizuEntry.EstimatedSizeBytes = getEstimatedEntrySizeBytes(mizuEntry) - database.CreateEntry(&mizuEntry) - - // baseEntry := models.BaseEntryDetails{} - // if err := models.GetEntry(&mizuEntry, &baseEntry); err != nil { - // return - // } - // baseEntry.Rules = models.RunValidationRulesState(*entry, serviceName) - // baseEntry.Latency = entry.Timings.Receive - // baseEntryBytes, _ := models.CreateBaseEntryWebSocketMessage(&baseEntry) - // BroadcastToBrowserClients(baseEntryBytes) + return resolvedSource, resolvedDestination } func getServiceNameFromUrl(inputUrl string) (string, string) { @@ -204,7 +177,7 @@ func CheckIsServiceIP(address string) bool { } // gives a rough estimate of the size this will take up in the db, good enough for maintaining db size limit accurately -func getEstimatedEntrySizeBytes(mizuEntry models.MizuEntry) int { +func getEstimatedEntrySizeBytes(mizuEntry *tapApi.MizuEntry) int { sizeBytes := len(mizuEntry.Entry) sizeBytes += len(mizuEntry.EntryId) sizeBytes += len(mizuEntry.Service) diff --git a/agent/pkg/controllers/entries_controller.go b/agent/pkg/controllers/entries_controller.go index 9c6998446..1d6568b24 100644 --- a/agent/pkg/controllers/entries_controller.go +++ b/agent/pkg/controllers/entries_controller.go @@ -16,6 +16,8 @@ import ( "github.com/gin-gonic/gin" "github.com/google/martian/har" "github.com/romana/rlog" + + tapApi "github.com/up9inc/mizu/tap/api" ) func GetEntries(c *gin.Context) { @@ -31,7 +33,7 @@ func GetEntries(c *gin.Context) { order := database.OperatorToOrderMapping[entriesFilter.Operator] operatorSymbol := database.OperatorToSymbolMapping[entriesFilter.Operator] - var entries []models.MizuEntry + var entries []tapApi.MizuEntry database.GetEntriesTable(). Order(fmt.Sprintf("timestamp %s", order)). Where(fmt.Sprintf("timestamp %s %v", operatorSymbol, entriesFilter.Timestamp)). @@ -80,7 +82,7 @@ func GetHARs(c *gin.Context) { timestampTo = entriesFilter.To } - var entries []models.MizuEntry + var entries []tapApi.MizuEntry database.GetEntriesTable(). Where(fmt.Sprintf("timestamp BETWEEN %v AND %v", timestampFrom, timestampTo)). Order(fmt.Sprintf("timestamp %s", order)). @@ -207,7 +209,7 @@ func GetFullEntries(c *gin.Context) { } func GetEntry(c *gin.Context) { - var entryData models.MizuEntry + var entryData tapApi.MizuEntry database.GetEntriesTable(). Where(map[string]string{"entryId": c.Param("entryId")}). First(&entryData) @@ -219,20 +221,21 @@ func GetEntry(c *gin.Context) { "msg": "Can't get entry details", }) } - fullEntryWithPolicy := models.FullEntryWithPolicy{} - if err := models.GetEntry(&entryData, &fullEntryWithPolicy); err != nil { - c.JSON(http.StatusInternalServerError, map[string]interface{}{ - "error": true, - "msg": "Can't get entry details", - }) - } - c.JSON(http.StatusOK, fullEntryWithPolicy) + fmt.Printf("entryData: %+v\n", entryData) + // fullEntryWithPolicy := models.FullEntryWithPolicy{} + // if err := models.GetEntry(&entryData, &fullEntryWithPolicy); err != nil { + // c.JSON(http.StatusInternalServerError, map[string]interface{}{ + // "error": true, + // "msg": "Can't get entry details", + // }) + // } + c.JSON(http.StatusOK, entryData) } func DeleteAllEntries(c *gin.Context) { database.GetEntriesTable(). Where("1 = 1"). - Delete(&models.MizuEntry{}) + Delete(&tapApi.MizuEntry{}) c.JSON(http.StatusOK, map[string]string{ "msg": "Success", diff --git a/agent/pkg/database/main.go b/agent/pkg/database/main.go index c3b1d7847..f6dfe402e 100644 --- a/agent/pkg/database/main.go +++ b/agent/pkg/database/main.go @@ -2,16 +2,18 @@ package database import ( "fmt" + "mizuserver/pkg/utils" + "time" + "gorm.io/driver/sqlite" "gorm.io/gorm" "gorm.io/gorm/logger" - "mizuserver/pkg/models" - "mizuserver/pkg/utils" - "time" + + tapApi "github.com/up9inc/mizu/tap/api" ) const ( - DBPath = "./entries.db" + DBPath = "./entries.db" OrderDesc = "desc" OrderAsc = "asc" LT = "lt" @@ -19,8 +21,8 @@ const ( ) var ( - DB *gorm.DB - IsDBLocked = false + DB *gorm.DB + IsDBLocked = false OperatorToSymbolMapping = map[string]string{ LT: "<", GT: ">", @@ -40,7 +42,7 @@ func GetEntriesTable() *gorm.DB { return DB.Table("mizu_entries") } -func CreateEntry(entry *models.MizuEntry) { +func CreateEntry(entry *tapApi.MizuEntry) { if IsDBLocked { return } @@ -51,14 +53,13 @@ func initDataBase(databasePath string) *gorm.DB { temp, _ := gorm.Open(sqlite.Open(databasePath), &gorm.Config{ Logger: &utils.TruncatingLogger{LogLevel: logger.Warn, SlowThreshold: 500 * time.Millisecond}, }) - _ = temp.AutoMigrate(&models.MizuEntry{}) // this will ensure table is created + _ = temp.AutoMigrate(&tapApi.MizuEntry{}) // this will ensure table is created return temp } - -func GetEntriesFromDb(timestampFrom int64, timestampTo int64) []models.MizuEntry { +func GetEntriesFromDb(timestampFrom int64, timestampTo int64) []tapApi.MizuEntry { order := OrderDesc - var entries []models.MizuEntry + var entries []tapApi.MizuEntry GetEntriesTable(). Where(fmt.Sprintf("timestamp BETWEEN %v AND %v", timestampFrom, timestampTo)). Order(fmt.Sprintf("timestamp %s", order)). @@ -70,4 +71,3 @@ func GetEntriesFromDb(timestampFrom int64, timestampTo int64) []models.MizuEntry } return entries } - diff --git a/agent/pkg/database/size_enforcer.go b/agent/pkg/database/size_enforcer.go index c17c53d97..b28c0ff6b 100644 --- a/agent/pkg/database/size_enforcer.go +++ b/agent/pkg/database/size_enforcer.go @@ -1,16 +1,17 @@ package database import ( + "log" + "os" + "strconv" + "time" + "github.com/fsnotify/fsnotify" "github.com/romana/rlog" "github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/shared/debounce" "github.com/up9inc/mizu/shared/units" - "log" - "mizuserver/pkg/models" - "os" - "strconv" - "time" + tapApi "github.com/up9inc/mizu/tap/api" ) const percentageOfMaxSizeBytesToPrune = 15 @@ -99,7 +100,7 @@ func pruneOldEntries(currentFileSize int64) { if bytesToBeRemoved >= amountOfBytesToTrim { break } - var entry models.MizuEntry + var entry tapApi.MizuEntry err = DB.ScanRows(rows, &entry) if err != nil { rlog.Errorf("Error scanning db row: %v", err) @@ -111,7 +112,7 @@ func pruneOldEntries(currentFileSize int64) { } if len(entryIdsToRemove) > 0 { - GetEntriesTable().Where(entryIdsToRemove).Delete(models.MizuEntry{}) + GetEntriesTable().Where(entryIdsToRemove).Delete(tapApi.MizuEntry{}) // VACUUM causes sqlite to shrink the db file after rows have been deleted, the db file will not shrink without this DB.Exec("VACUUM") rlog.Errorf("Removed %d rows and cleared %s", len(entryIdsToRemove), units.BytesToHumanReadable(bytesToBeRemoved)) diff --git a/agent/pkg/models/models.go b/agent/pkg/models/models.go index c2a13aee8..904aabd6b 100644 --- a/agent/pkg/models/models.go +++ b/agent/pkg/models/models.go @@ -21,33 +21,10 @@ import ( "github.com/up9inc/mizu/tap" ) -type DataUnmarshaler interface { - UnmarshalData(*MizuEntry) error -} - -func GetEntry(r *MizuEntry, v DataUnmarshaler) error { +func GetEntry(r *tapApi.MizuEntry, v tapApi.DataUnmarshaler) error { return v.UnmarshalData(r) } -type MizuEntry struct { - ID uint `gorm:"primarykey"` - CreatedAt time.Time - UpdatedAt time.Time - Entry string `json:"entry,omitempty" gorm:"column:entry"` - EntryId string `json:"entryId" gorm:"column:entryId"` - Url string `json:"url" gorm:"column:url"` - Method string `json:"method" gorm:"column:method"` - Status int `json:"status" gorm:"column:status"` - RequestSenderIp string `json:"requestSenderIp" gorm:"column:requestSenderIp"` - Service string `json:"service" gorm:"column:service"` - Timestamp int64 `json:"timestamp" gorm:"column:timestamp"` - Path string `json:"path" gorm:"column:path"` - ResolvedSource string `json:"resolvedSource,omitempty" gorm:"column:resolvedSource"` - ResolvedDestination string `json:"resolvedDestination,omitempty" gorm:"column:resolvedDestination"` - IsOutgoing bool `json:"isOutgoing,omitempty" gorm:"column:isOutgoing"` - EstimatedSizeBytes int `json:"-" gorm:"column:estimatedSizeBytes"` -} - func NewApplicableRules(status bool, latency int64) tapApi.ApplicableRules { ar := tapApi.ApplicableRules{} ar.Status = status @@ -63,26 +40,7 @@ type FullEntryDetailsExtra struct { har.Entry } -// func (bed *BaseEntryDetails) UnmarshalData(entry *MizuEntry) error { -// entryUrl := entry.Url -// service := entry.Service -// if entry.ResolvedDestination != "" { -// entryUrl = utils.SetHostname(entryUrl, entry.ResolvedDestination) -// service = utils.SetHostname(service, entry.ResolvedDestination) -// } -// bed.Id = entry.EntryId -// bed.Url = entryUrl -// bed.Service = service -// bed.Path = entry.Path -// bed.StatusCode = entry.Status -// bed.Method = entry.Method -// bed.Timestamp = entry.Timestamp -// bed.RequestSenderIp = entry.RequestSenderIp -// bed.IsOutgoing = entry.IsOutgoing -// return nil -// } - -func (fed *FullEntryDetails) UnmarshalData(entry *MizuEntry) error { +func (fed *FullEntryDetails) UnmarshalData(entry *tapApi.MizuEntry) error { if err := json.Unmarshal([]byte(entry.Entry), &fed.Entry); err != nil { return err } @@ -93,7 +51,7 @@ func (fed *FullEntryDetails) UnmarshalData(entry *MizuEntry) error { return nil } -func (fedex *FullEntryDetailsExtra) UnmarshalData(entry *MizuEntry) error { +func (fedex *FullEntryDetailsExtra) UnmarshalData(entry *tapApi.MizuEntry) error { if err := json.Unmarshal([]byte(entry.Entry), &fedex.Entry); err != nil { return err } @@ -195,7 +153,7 @@ type FullEntryWithPolicy struct { Service string `json:"service"` } -func (fewp *FullEntryWithPolicy) UnmarshalData(entry *MizuEntry) error { +func (fewp *FullEntryWithPolicy) UnmarshalData(entry *tapApi.MizuEntry) error { if err := json.Unmarshal([]byte(entry.Entry), &fewp.Entry); err != nil { return err } diff --git a/tap/api/api.go b/tap/api/api.go index 4d7a66cc6..9fd581ecc 100644 --- a/tap/api/api.go +++ b/tap/api/api.go @@ -33,9 +33,9 @@ type TcpID struct { } type GenericMessage struct { - IsRequest bool - CaptureTime time.Time - Payload interface{} + IsRequest bool `json:"is_request"` + CaptureTime time.Time `json:"capture_time"` + Payload interface{} `json:"payload"` } type RequestResponsePair struct { @@ -54,7 +54,8 @@ type Dissector interface { Register(*Extension) Ping() Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, emitter Emitter) - Summarize(item *OutputChannelItem) *BaseEntryDetails + Analyze(item *OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *MizuEntry + Summarize(entry *MizuEntry) *BaseEntryDetails } type Emitting struct { @@ -73,6 +74,25 @@ func (e *Emitting) Emit(item *OutputChannelItem) { e.OutputChannel <- item } +type MizuEntry struct { + ID uint `gorm:"primarykey"` + CreatedAt time.Time + UpdatedAt time.Time + Entry string `json:"entry,omitempty" gorm:"column:entry"` + EntryId string `json:"entryId" gorm:"column:entryId"` + Url string `json:"url" gorm:"column:url"` + Method string `json:"method" gorm:"column:method"` + Status int `json:"status" gorm:"column:status"` + RequestSenderIp string `json:"requestSenderIp" gorm:"column:requestSenderIp"` + Service string `json:"service" gorm:"column:service"` + Timestamp int64 `json:"timestamp" gorm:"column:timestamp"` + Path string `json:"path" gorm:"column:path"` + ResolvedSource string `json:"resolvedSource,omitempty" gorm:"column:resolvedSource"` + ResolvedDestination string `json:"resolvedDestination,omitempty" gorm:"column:resolvedDestination"` + IsOutgoing bool `json:"isOutgoing,omitempty" gorm:"column:isOutgoing"` + EstimatedSizeBytes int `json:"-" gorm:"column:estimatedSizeBytes"` +} + type BaseEntryDetails struct { Id string `json:"id,omitempty"` Url string `json:"url,omitempty"` @@ -91,3 +111,22 @@ type ApplicableRules struct { Latency int64 `json:"latency,omitempty"` Status bool `json:"status,omitempty"` } + +type DataUnmarshaler interface { + UnmarshalData(*MizuEntry) error +} + +func (bed *BaseEntryDetails) UnmarshalData(entry *MizuEntry) error { + entryUrl := entry.Url + service := entry.Service + bed.Id = entry.EntryId + bed.Url = entryUrl + bed.Service = service + bed.Path = entry.Path + bed.StatusCode = entry.Status + bed.Method = entry.Method + bed.Timestamp = entry.Timestamp + bed.RequestSenderIp = entry.RequestSenderIp + bed.IsOutgoing = entry.IsOutgoing + return nil +} diff --git a/tap/extensions/amqp/main.go b/tap/extensions/amqp/main.go index a576395b8..33b4a540f 100644 --- a/tap/extensions/amqp/main.go +++ b/tap/extensions/amqp/main.go @@ -27,7 +27,12 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em // TODO: Implement } -func (d dissecting) Summarize(item *api.OutputChannelItem) *api.BaseEntryDetails { +func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *api.MizuEntry { + // TODO: Implement + return nil +} + +func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails { // TODO: Implement return nil } diff --git a/tap/extensions/http/main.go b/tap/extensions/http/main.go index fe6082d76..e3bd4d893 100644 --- a/tap/extensions/http/main.go +++ b/tap/extensions/http/main.go @@ -2,6 +2,7 @@ package main import ( "bufio" + "encoding/json" "fmt" "io" "log" @@ -80,7 +81,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em } } -func (d dissecting) Summarize(item *api.OutputChannelItem) *api.BaseEntryDetails { +func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *api.MizuEntry { fmt.Printf("pair.Request.Payload: %+v\n", item.Pair.Request.Payload) fmt.Printf("item.Pair.Response.Payload: %+v\n", item.Pair.Response.Payload) var host string @@ -92,13 +93,40 @@ func (d dissecting) Summarize(item *api.OutputChannelItem) *api.BaseEntryDetails } request := item.Pair.Request.Payload.(map[string]interface{}) response := item.Pair.Response.Payload.(map[string]interface{}) + entryBytes, _ := json.Marshal(item.Pair) + service := fmt.Sprintf("http://%s", host) + return &api.MizuEntry{ + EntryId: entryId, + Entry: string(entryBytes), + Url: fmt.Sprintf("%s%s", service, request["url"].(string)), + Method: request["method"].(string), + Status: int(response["status"].(float64)), + RequestSenderIp: item.ConnectionInfo.ClientIP, + Service: service, + Timestamp: item.Timestamp, + Path: request["url"].(string), + ResolvedSource: resolvedSource, + ResolvedDestination: resolvedDestination, + IsOutgoing: item.ConnectionInfo.IsOutgoing, + } +} + +func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails { return &api.BaseEntryDetails{ - Url: fmt.Sprintf("http://%s%s", host, request["url"].(string)), - RequestSenderIp: item.ConnectionInfo.ClientIP, - Path: request["url"].(string), - StatusCode: int(response["status"].(float64)), - Method: request["method"].(string), - Timestamp: item.Timestamp, + Id: entry.EntryId, + Url: entry.Url, + RequestSenderIp: entry.RequestSenderIp, + Service: entry.Service, + Path: entry.Path, + StatusCode: entry.Status, + Method: entry.Method, + Timestamp: entry.Timestamp, + IsOutgoing: entry.IsOutgoing, + Latency: 0, + Rules: api.ApplicableRules{ + Latency: 0, + Status: false, + }, } } diff --git a/tap/extensions/kafka/main.go b/tap/extensions/kafka/main.go index ecf11734d..a0120ed57 100644 --- a/tap/extensions/kafka/main.go +++ b/tap/extensions/kafka/main.go @@ -27,7 +27,12 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em // TODO: Implement } -func (d dissecting) Summarize(item *api.OutputChannelItem) *api.BaseEntryDetails { +func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *api.MizuEntry { + // TODO: Implement + return nil +} + +func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails { // TODO: Implement return nil } diff --git a/ui/src/components/HarEntryDetailed.tsx b/ui/src/components/HarEntryDetailed.tsx index 8e86d140f..c0c04c0cb 100644 --- a/ui/src/components/HarEntryDetailed.tsx +++ b/ui/src/components/HarEntryDetailed.tsx @@ -29,7 +29,7 @@ const HarEntryTitle: React.FC = ({har}) => { const classes = useStyles(); const {log: {entries}} = har; - const {response, request, timings: {receive}} = entries[0].entry; + const {response, request} = JSON.parse(entries[0].entry); const {status, statusText, bodySize} = response; @@ -42,7 +42,7 @@ const HarEntryTitle: React.FC = ({har}) => {
{formatSize(bodySize)}
{status} {statusText}
-
{Math.round(receive)}ms
+ {/*
{Math.round(receive)}ms
*/}
{'rulesMatched' in entries[0] ? entries[0].rulesMatched?.length : '0'} Rules Applied
; }; diff --git a/ui/src/components/HarEntryViewer/HAREntryViewer.tsx b/ui/src/components/HarEntryViewer/HAREntryViewer.tsx index e0450e1e7..542aea5f0 100644 --- a/ui/src/components/HarEntryViewer/HAREntryViewer.tsx +++ b/ui/src/components/HarEntryViewer/HAREntryViewer.tsx @@ -6,7 +6,9 @@ import {HAREntryTableSection, HAREntryBodySection, HAREntryTablePolicySection} f const MIME_TYPE_KEY = 'mimeType'; const HAREntryDisplay: React.FC = ({har, entry, isCollapsed: initialIsCollapsed, isResponseMocked}) => { - const {request, response, timings: {receive}} = entry; + const {request, response} = JSON.parse(entry); + console.log('request:', request) + console.log('response:', response) const rulesMatched = har.log.entries[0].rulesMatched const TABS = [ {tab: 'request'}, @@ -26,28 +28,28 @@ const HAREntryDisplay: React.FC = ({har, entry, isCollapsed: initialIsColla {!initialIsCollapsed &&
- {request?.url && {request.url}} + {request?.url && {request.payload.url}}
{ currentTab === TABS[0].tab && - + - + - {request?.postData && } + {request.payload?.postData && } - + } {currentTab === TABS[1].tab && - + - + - + } {currentTab === TABS[2].tab && - + }
} ;