From 3b0502180f0f32ff1ae3ce65fb0d08e61e8588c8 Mon Sep 17 00:00:00 2001 From: "M. Mert Yildiran" Date: Fri, 20 Aug 2021 19:19:32 +0300 Subject: [PATCH] Add `Summarize()` method to the `Dissector` interface --- agent/pkg/api/main.go | 45 +++++++------- agent/pkg/controllers/entries_controller.go | 18 +++--- agent/pkg/models/models.go | 65 ++++++++------------- tap/api/api.go | 30 ++++++++-- tap/extensions/amqp/main.go | 5 ++ tap/extensions/http/main.go | 25 +++++++- tap/extensions/http/matcher.go | 6 +- tap/extensions/kafka/main.go | 5 ++ 8 files changed, 118 insertions(+), 81 deletions(-) diff --git a/agent/pkg/api/main.go b/agent/pkg/api/main.go index 28237fd00..b0e36b792 100644 --- a/agent/pkg/api/main.go +++ b/agent/pkg/api/main.go @@ -7,7 +7,6 @@ import ( "fmt" "mizuserver/pkg/database" "mizuserver/pkg/holder" - "net/http" "net/url" "os" "path" @@ -114,18 +113,22 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension fmt.Printf("item: %+v\n", item) extension := extensionsMap[item.Protocol] fmt.Printf("extension: %+v\n", extension) - var req *http.Request - marshedReq, _ := json.Marshal(item.Data.Request.Orig) - json.Unmarshal(marshedReq, &req) - var res *http.Response - marshedRes, _ := json.Marshal(item.Data.Response.Orig) - json.Unmarshal(marshedRes, &res) - // NOTE: With this call, the incoming data is sent to the last WebSocket (that the web UI communicates). - if harEntry, err := models.NewEntry(req, item.Data.Request.CaptureTime, res, item.Data.Response.CaptureTime); err == nil { - saveHarToDb(harEntry, item.ConnectionInfo) - } else { - rlog.Errorf("Error when creating HTTP entry") - } + // var req *http.Request + // marshedReq, _ := json.Marshal(item.Data.Request.Orig) + // json.Unmarshal(marshedReq, &req) + // var res *http.Response + // marshedRes, _ := json.Marshal(item.Data.Response.Orig) + // json.Unmarshal(marshedRes, &res) + // // NOTE: With this call, the incoming data is sent to the last WebSocket (that the web UI communicates). + // if harEntry, err := models.NewEntry(req, item.Data.Request.CaptureTime, res, item.Data.Response.CaptureTime); err == nil { + // saveHarToDb(harEntry, item.ConnectionInfo) + // } else { + // rlog.Errorf("Error when creating HTTP entry") + // } + baseEntry := extension.Dissector.Summarize(item) + fmt.Printf("baseEntry: %+v\n", baseEntry) + baseEntryBytes, _ := models.CreateBaseEntryWebSocketMessage(baseEntry) + BroadcastToBrowserClients(baseEntryBytes) } } @@ -180,14 +183,14 @@ func saveHarToDb(entry *har.Entry, connectionInfo *tapApi.ConnectionInfo) { mizuEntry.EstimatedSizeBytes = getEstimatedEntrySizeBytes(mizuEntry) database.CreateEntry(&mizuEntry) - baseEntry := models.BaseEntryDetails{} - if err := models.GetEntry(&mizuEntry, &baseEntry); err != nil { - return - } - baseEntry.Rules = models.RunValidationRulesState(*entry, serviceName) - baseEntry.Latency = entry.Timings.Receive - baseEntryBytes, _ := models.CreateBaseEntryWebSocketMessage(&baseEntry) - BroadcastToBrowserClients(baseEntryBytes) + // baseEntry := models.BaseEntryDetails{} + // if err := models.GetEntry(&mizuEntry, &baseEntry); err != nil { + // return + // } + // baseEntry.Rules = models.RunValidationRulesState(*entry, serviceName) + // baseEntry.Latency = entry.Timings.Receive + // baseEntryBytes, _ := models.CreateBaseEntryWebSocketMessage(&baseEntry) + // BroadcastToBrowserClients(baseEntryBytes) } func getServiceNameFromUrl(inputUrl string) (string, string) { diff --git a/agent/pkg/controllers/entries_controller.go b/agent/pkg/controllers/entries_controller.go index 7eda00063..9c6998446 100644 --- a/agent/pkg/controllers/entries_controller.go +++ b/agent/pkg/controllers/entries_controller.go @@ -44,16 +44,16 @@ func GetEntries(c *gin.Context) { utils.ReverseSlice(entries) } - baseEntries := make([]models.BaseEntryDetails, 0) - for _, data := range entries { - harEntry := models.BaseEntryDetails{} - if err := models.GetEntry(&data, &harEntry); err != nil { - continue - } - baseEntries = append(baseEntries, harEntry) - } + // baseEntries := make([]models.BaseEntryDetails, 0) + // for _, data := range entries { + // harEntry := models.BaseEntryDetails{} + // if err := models.GetEntry(&data, &harEntry); err != nil { + // continue + // } + // baseEntries = append(baseEntries, harEntry) + // } - c.JSON(http.StatusOK, baseEntries) + // c.JSON(http.StatusOK, baseEntries) } func GetHARs(c *gin.Context) { diff --git a/agent/pkg/models/models.go b/agent/pkg/models/models.go index d443e7f5f..c2a13aee8 100644 --- a/agent/pkg/models/models.go +++ b/agent/pkg/models/models.go @@ -48,27 +48,8 @@ type MizuEntry struct { EstimatedSizeBytes int `json:"-" gorm:"column:estimatedSizeBytes"` } -type BaseEntryDetails struct { - Id string `json:"id,omitempty"` - Url string `json:"url,omitempty"` - RequestSenderIp string `json:"requestSenderIp,omitempty"` - Service string `json:"service,omitempty"` - Path string `json:"path,omitempty"` - StatusCode int `json:"statusCode,omitempty"` - Method string `json:"method,omitempty"` - Timestamp int64 `json:"timestamp,omitempty"` - IsOutgoing bool `json:"isOutgoing,omitempty"` - Latency int64 `json:"latency,omitempty"` - Rules ApplicableRules `json:"rules,omitempty"` -} - -type ApplicableRules struct { - Latency int64 `json:"latency,omitempty"` - Status bool `json:"status,omitempty"` -} - -func NewApplicableRules(status bool, latency int64) ApplicableRules { - ar := ApplicableRules{} +func NewApplicableRules(status bool, latency int64) tapApi.ApplicableRules { + ar := tapApi.ApplicableRules{} ar.Status = status ar.Latency = latency return ar @@ -82,24 +63,24 @@ type FullEntryDetailsExtra struct { har.Entry } -func (bed *BaseEntryDetails) UnmarshalData(entry *MizuEntry) error { - entryUrl := entry.Url - service := entry.Service - if entry.ResolvedDestination != "" { - entryUrl = utils.SetHostname(entryUrl, entry.ResolvedDestination) - service = utils.SetHostname(service, entry.ResolvedDestination) - } - bed.Id = entry.EntryId - bed.Url = entryUrl - bed.Service = service - bed.Path = entry.Path - bed.StatusCode = entry.Status - bed.Method = entry.Method - bed.Timestamp = entry.Timestamp - bed.RequestSenderIp = entry.RequestSenderIp - bed.IsOutgoing = entry.IsOutgoing - return nil -} +// func (bed *BaseEntryDetails) UnmarshalData(entry *MizuEntry) error { +// entryUrl := entry.Url +// service := entry.Service +// if entry.ResolvedDestination != "" { +// entryUrl = utils.SetHostname(entryUrl, entry.ResolvedDestination) +// service = utils.SetHostname(service, entry.ResolvedDestination) +// } +// bed.Id = entry.EntryId +// bed.Url = entryUrl +// bed.Service = service +// bed.Path = entry.Path +// bed.StatusCode = entry.Status +// bed.Method = entry.Method +// bed.Timestamp = entry.Timestamp +// bed.RequestSenderIp = entry.RequestSenderIp +// bed.IsOutgoing = entry.IsOutgoing +// return nil +// } func (fed *FullEntryDetails) UnmarshalData(entry *MizuEntry) error { if err := json.Unmarshal([]byte(entry.Entry), &fed.Entry); err != nil { @@ -145,7 +126,7 @@ type HarFetchRequestBody struct { type WebSocketEntryMessage struct { *shared.WebSocketMessageMetadata - Data *BaseEntryDetails `json:"data,omitempty"` + Data *tapApi.BaseEntryDetails `json:"data,omitempty"` } type WebSocketTappedEntryMessage struct { @@ -158,7 +139,7 @@ type WebsocketOutboundLinkMessage struct { Data *tap.OutboundLink } -func CreateBaseEntryWebSocketMessage(base *BaseEntryDetails) ([]byte, error) { +func CreateBaseEntryWebSocketMessage(base *tapApi.BaseEntryDetails) ([]byte, error) { message := &WebSocketEntryMessage{ WebSocketMessageMetadata: &shared.WebSocketMessageMetadata{ MessageType: shared.WebSocketMessageTypeEntry, @@ -225,7 +206,7 @@ func (fewp *FullEntryWithPolicy) UnmarshalData(entry *MizuEntry) error { return nil } -func RunValidationRulesState(harEntry har.Entry, service string) ApplicableRules { +func RunValidationRulesState(harEntry har.Entry, service string) tapApi.ApplicableRules { numberOfRules, resultPolicyToSend := rules.MatchRequestPolicy(harEntry, service) statusPolicyToSend, latency := rules.PassedValidationRules(resultPolicyToSend, numberOfRules) ar := NewApplicableRules(statusPolicyToSend, latency) diff --git a/tap/api/api.go b/tap/api/api.go index 28146041e..4d7a66cc6 100644 --- a/tap/api/api.go +++ b/tap/api/api.go @@ -35,7 +35,7 @@ type TcpID struct { type GenericMessage struct { IsRequest bool CaptureTime time.Time - Orig interface{} + Payload interface{} } type RequestResponsePair struct { @@ -47,13 +47,14 @@ type OutputChannelItem struct { Protocol string Timestamp int64 ConnectionInfo *ConnectionInfo - Data *RequestResponsePair + Pair *RequestResponsePair } type Dissector interface { Register(*Extension) Ping() Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, emitter Emitter) + Summarize(item *OutputChannelItem) *BaseEntryDetails } type Emitting struct { @@ -66,8 +67,27 @@ type Emitter interface { func (e *Emitting) Emit(item *OutputChannelItem) { log.Printf("item: %+v\n", item) - log.Printf("item.Data: %+v\n", item.Data) - log.Printf("item.Data.Request.Orig: %v\n", item.Data.Request.Orig) - log.Printf("item.Data.Response.Orig: %v\n", item.Data.Response.Orig) + log.Printf("item.Pair: %+v\n", item.Pair) + log.Printf("item.Pair.Request.Payload: %v\n", item.Pair.Request.Payload) + log.Printf("item.Pair.Response.Payload: %v\n", item.Pair.Response.Payload) e.OutputChannel <- item } + +type BaseEntryDetails struct { + Id string `json:"id,omitempty"` + Url string `json:"url,omitempty"` + RequestSenderIp string `json:"requestSenderIp,omitempty"` + Service string `json:"service,omitempty"` + Path string `json:"path,omitempty"` + StatusCode int `json:"statusCode,omitempty"` + Method string `json:"method,omitempty"` + Timestamp int64 `json:"timestamp,omitempty"` + IsOutgoing bool `json:"isOutgoing,omitempty"` + Latency int64 `json:"latency,omitempty"` + Rules ApplicableRules `json:"rules,omitempty"` +} + +type ApplicableRules struct { + Latency int64 `json:"latency,omitempty"` + Status bool `json:"status,omitempty"` +} diff --git a/tap/extensions/amqp/main.go b/tap/extensions/amqp/main.go index 18e2107ab..a576395b8 100644 --- a/tap/extensions/amqp/main.go +++ b/tap/extensions/amqp/main.go @@ -27,4 +27,9 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em // TODO: Implement } +func (d dissecting) Summarize(item *api.OutputChannelItem) *api.BaseEntryDetails { + // TODO: Implement + return nil +} + var Dissector dissecting diff --git a/tap/extensions/http/main.go b/tap/extensions/http/main.go index 72d046e3e..fe6082d76 100644 --- a/tap/extensions/http/main.go +++ b/tap/extensions/http/main.go @@ -3,10 +3,11 @@ package main import ( "bufio" "fmt" - "github.com/romana/rlog" "io" "log" + "github.com/romana/rlog" + "github.com/up9inc/mizu/tap/api" ) @@ -79,4 +80,26 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em } } +func (d dissecting) Summarize(item *api.OutputChannelItem) *api.BaseEntryDetails { + fmt.Printf("pair.Request.Payload: %+v\n", item.Pair.Request.Payload) + fmt.Printf("item.Pair.Response.Payload: %+v\n", item.Pair.Response.Payload) + var host string + for _, header := range item.Pair.Request.Payload.(map[string]interface{})["headers"].([]interface{}) { + h := header.(map[string]interface{}) + if h["name"] == "Host" { + host = h["value"].(string) + } + } + request := item.Pair.Request.Payload.(map[string]interface{}) + response := item.Pair.Response.Payload.(map[string]interface{}) + return &api.BaseEntryDetails{ + Url: fmt.Sprintf("http://%s%s", host, request["url"].(string)), + RequestSenderIp: item.ConnectionInfo.ClientIP, + Path: request["url"].(string), + StatusCode: int(response["status"].(float64)), + Method: request["method"].(string), + Timestamp: item.Timestamp, + } +} + var Dissector dissecting diff --git a/tap/extensions/http/matcher.go b/tap/extensions/http/matcher.go index 661cc21d8..65a7a7c1c 100644 --- a/tap/extensions/http/matcher.go +++ b/tap/extensions/http/matcher.go @@ -32,7 +32,7 @@ func (matcher *requestResponseMatcher) registerRequest(ident string, request *ht requestHTTPMessage := api.GenericMessage{ IsRequest: true, CaptureTime: captureTime, - Orig: HTTPPayload{ + Payload: HTTPPayload{ Type: "http_request", Data: request, }, @@ -62,7 +62,7 @@ func (matcher *requestResponseMatcher) registerResponse(ident string, response * responseHTTPMessage := api.GenericMessage{ IsRequest: false, CaptureTime: captureTime, - Orig: HTTPPayload{ + Payload: HTTPPayload{ Type: "http_response", Data: response, }, @@ -89,7 +89,7 @@ func (matcher *requestResponseMatcher) preparePair(requestHTTPMessage *api.Gener Protocol: ExtensionName, Timestamp: time.Now().UnixNano() / int64(time.Millisecond), ConnectionInfo: nil, - Data: &api.RequestResponsePair{ + Pair: &api.RequestResponsePair{ Request: *requestHTTPMessage, Response: *responseHTTPMessage, }, diff --git a/tap/extensions/kafka/main.go b/tap/extensions/kafka/main.go index 60c5b4f85..ecf11734d 100644 --- a/tap/extensions/kafka/main.go +++ b/tap/extensions/kafka/main.go @@ -27,4 +27,9 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, em // TODO: Implement } +func (d dissecting) Summarize(item *api.OutputChannelItem) *api.BaseEntryDetails { + // TODO: Implement + return nil +} + var Dissector dissecting