diff --git a/agent/main.go b/agent/main.go index b575ee8bf..dea4b9de6 100644 --- a/agent/main.go +++ b/agent/main.go @@ -4,6 +4,7 @@ import ( "encoding/json" "flag" "fmt" + "log" "mizuserver/pkg/api" "mizuserver/pkg/models" "mizuserver/pkg/routes" @@ -201,6 +202,9 @@ func pipeTapChannelToSocket(connection *websocket.Conn, messageDataChannel <-cha continue } + log.Printf("marshaledData: %s\n", marshaledData) + // NOTE: This is where the `*tapApi.OutputChannelItem` leaves the code + // and goes into the intermediate WebSocket. err = connection.WriteMessage(websocket.TextMessage, marshaledData) if err != nil { rlog.Infof("error sending message through socket server %s, (%v,%+v)\n", err, err, err) diff --git a/agent/pkg/api/main.go b/agent/pkg/api/main.go index f4365dac3..2c3b47c9b 100644 --- a/agent/pkg/api/main.go +++ b/agent/pkg/api/main.go @@ -5,10 +5,8 @@ import ( "context" "encoding/json" "fmt" - "go.mongodb.org/mongo-driver/bson/primitive" "mizuserver/pkg/database" "mizuserver/pkg/holder" - "net/http" "net/url" "os" "path" @@ -16,6 +14,8 @@ import ( "strings" "time" + "go.mongodb.org/mongo-driver/bson/primitive" + "github.com/google/martian/har" "github.com/romana/rlog" "github.com/up9inc/mizu/tap" @@ -110,11 +110,14 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem) { } for item := range outputItems { - if harEntry, err := models.NewEntry(item.Data.Request.Orig.(*http.Request), item.Data.Request.CaptureTime, item.Data.Response.Orig.(*http.Response), item.Data.Response.CaptureTime); err == nil { - saveHarToDb(harEntry, item.ConnectionInfo) - } else { - rlog.Errorf("Error when creating HTTP entry") - } + fmt.Printf("item: %+v\n", item) + // NOTE: With this call, the incoming data is sent to the last WebSocket (that the web UI communicates). + handleItem(item) + // if harEntry, err := models.NewEntry(item.Data.Request.Orig.(*http.Request), item.Data.Request.CaptureTime, item.Data.Response.Orig.(*http.Response), item.Data.Response.CaptureTime); err == nil { + // saveHarToDb(harEntry, item.ConnectionInfo) + // } else { + // rlog.Errorf("Error when creating HTTP entry") + // } } } @@ -125,6 +128,15 @@ func StartReadingOutbound(outboundLinkChannel <-chan *tap.OutboundLink) { } } +func handleItem(item *tapApi.OutputChannelItem) { + baseEntry := models.BaseEntryDetails{ + RequestSenderIp: item.ConnectionInfo.ClientIP, + } + baseEntryBytes, _ := models.CreateBaseEntryWebSocketMessage(&baseEntry) + // NOTE: This is where it's sent to the last WebSocket + BroadcastToBrowserClients(baseEntryBytes) +} + func saveHarToDb(entry *har.Entry, connectionInfo *tapApi.ConnectionInfo) { entryBytes, _ := json.Marshal(entry) serviceName, urlPath := getServiceNameFromUrl(entry.Request.URL) diff --git a/tap/extensions/http/structs.go b/tap/extensions/http/structs.go index 681b7df68..1a3f022bc 100644 --- a/tap/extensions/http/structs.go +++ b/tap/extensions/http/structs.go @@ -14,6 +14,7 @@ type HTTPPayloader interface { } func (h HTTPPayload) MarshalJSON() ([]byte, error) { + // TODO: Implement JSON marshaling for HTTP request and response switch h.Type { case "http_request": return []byte("{\"val\": \"" + h.Type + "\"}"), nil