diff --git a/README.md b/README.md index 1d0cd48ef..9e989232a 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ # 水 mizu -standalone web app traffic viewer for Kubernetes +A simple-yet-powerful API traffic viewer for Kubernetes to help you troubleshoot and debug your microservices. Think TCPDump and Chrome Dev Tools combined. ## Download @@ -29,7 +29,7 @@ Pick one from the [Releases](https://github.com/up9inc/mizu/releases) page. ## How to run 1. Find pod you'd like to tap to in your Kubernetes cluster -2. Run `mizu PODNAME` or `mizu REGEX` +2. Run `mizu tap PODNAME` or `mizu tap REGEX` 3. Open browser on `http://localhost:8899` as instructed .. 4. Watch the WebAPI traffic flowing .. 5. Type ^C to stop diff --git a/api/go.mod b/api/go.mod index f75ea5920..06b66bf96 100644 --- a/api/go.mod +++ b/api/go.mod @@ -11,16 +11,12 @@ require ( github.com/go-playground/universal-translator v0.17.0 github.com/go-playground/validator/v10 v10.5.0 github.com/gofiber/fiber/v2 v2.8.0 - github.com/google/gopacket v1.1.19 github.com/google/martian v2.1.0+incompatible github.com/gorilla/websocket v1.4.2 github.com/leodido/go-urn v1.2.1 // indirect - github.com/orcaman/concurrent-map v0.0.0-20210106121528-16402b402231 - github.com/patrickmn/go-cache v2.1.0+incompatible github.com/up9inc/mizu/shared v0.0.0 github.com/up9inc/mizu/tap v0.0.0 go.mongodb.org/mongo-driver v1.5.1 - golang.org/x/net v0.0.0-20210421230115-4e50805a0758 gorm.io/driver/sqlite v1.1.4 gorm.io/gorm v1.21.8 k8s.io/api v0.21.0 @@ -29,4 +25,5 @@ require ( ) replace github.com/up9inc/mizu/shared v0.0.0 => ../shared + replace github.com/up9inc/mizu/tap v0.0.0 => ../tap diff --git a/api/go.sum b/api/go.sum index efa2d4285..14a3ed94e 100644 --- a/api/go.sum +++ b/api/go.sum @@ -251,7 +251,6 @@ github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGV github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/orcaman/concurrent-map v0.0.0-20210106121528-16402b402231 h1:fa50YL1pzKW+1SsBnJDOHppJN9stOEwS+CRWyUtyYGU= github.com/orcaman/concurrent-map v0.0.0-20210106121528-16402b402231/go.mod h1:Lu3tH6HLW3feq74c2GC+jIMS/K2CFcDWnWD9XkenwhI= -github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE= github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= diff --git a/api/pkg/api/main.go b/api/pkg/api/main.go index 532344f3e..4df93f500 100644 --- a/api/pkg/api/main.go +++ b/api/pkg/api/main.go @@ -5,10 +5,6 @@ import ( "context" "encoding/json" "fmt" - "mizuserver/pkg/database" - "mizuserver/pkg/models" - "mizuserver/pkg/resolver" - "mizuserver/pkg/utils" "net/url" "os" "path" @@ -19,6 +15,11 @@ import ( "github.com/google/martian/har" "github.com/up9inc/mizu/tap" "go.mongodb.org/mongo-driver/bson/primitive" + + "mizuserver/pkg/database" + "mizuserver/pkg/models" + "mizuserver/pkg/resolver" + "mizuserver/pkg/utils" ) var k8sResolver *resolver.Resolver @@ -84,7 +85,14 @@ func startReadingFiles(workingDir string) { for _, entry := range inputHar.Log.Entries { time.Sleep(time.Millisecond * 250) - saveHarToDb(entry, fileInfo.Name(), false) + connectionInfo := &tap.ConnectionInfo{ + ClientIP: fileInfo.Name(), + ClientPort: "", + ServerIP: "", + ServerPort: "", + IsOutgoing: false, + } + saveHarToDb(entry, connectionInfo) } rmErr := os.Remove(inputFilePath) utils.CheckErr(rmErr) @@ -97,7 +105,7 @@ func startReadingChannel(outputItems <-chan *tap.OutputChannelItem) { } for item := range outputItems { - saveHarToDb(item.HarEntry, item.ConnectionInfo.ClientIP, item.ConnectionInfo.IsOutgoing) + saveHarToDb(item.HarEntry, item.ConnectionInfo) } } @@ -109,17 +117,17 @@ func StartReadingOutbound(outboundLinkChannel <-chan *tap.OutboundLink) { } -func saveHarToDb(entry *har.Entry, sender string, isOutgoing bool) { +func saveHarToDb(entry *har.Entry, connectionInfo *tap.ConnectionInfo) { entryBytes, _ := json.Marshal(entry) - serviceName, urlPath, serviceHostName := getServiceNameFromUrl(entry.Request.URL) + serviceName, urlPath := getServiceNameFromUrl(entry.Request.URL) entryId := primitive.NewObjectID().Hex() var ( resolvedSource string resolvedDestination string ) if k8sResolver != nil { - resolvedSource = k8sResolver.Resolve(sender) - resolvedDestination = k8sResolver.Resolve(serviceHostName) + resolvedSource = k8sResolver.Resolve(connectionInfo.ClientIP) + resolvedDestination = k8sResolver.Resolve(fmt.Sprintf("%s:%s", connectionInfo.ServerIP, connectionInfo.ServerPort)) } mizuEntry := models.MizuEntry{ EntryId: entryId, @@ -129,11 +137,11 @@ func saveHarToDb(entry *har.Entry, sender string, isOutgoing bool) { Path: urlPath, Method: entry.Request.Method, Status: entry.Response.Status, - RequestSenderIp: sender, + RequestSenderIp: connectionInfo.ClientIP, Timestamp: entry.StartedDateTime.UnixNano() / int64(time.Millisecond), ResolvedSource: resolvedSource, ResolvedDestination: resolvedDestination, - IsOutgoing: isOutgoing, + IsOutgoing: connectionInfo.IsOutgoing, } database.GetEntriesTable().Create(&mizuEntry) @@ -142,10 +150,10 @@ func saveHarToDb(entry *har.Entry, sender string, isOutgoing bool) { broadcastToBrowserClients(baseEntryBytes) } -func getServiceNameFromUrl(inputUrl string) (string, string, string) { +func getServiceNameFromUrl(inputUrl string) (string, string) { parsed, err := url.Parse(inputUrl) utils.CheckErr(err) - return fmt.Sprintf("%s://%s", parsed.Scheme, parsed.Host), parsed.Path, parsed.Host + return fmt.Sprintf("%s://%s", parsed.Scheme, parsed.Host), parsed.Path } func CheckIsServiceIP(address string) bool { diff --git a/api/pkg/api/socket_server_handlers.go b/api/pkg/api/socket_server_handlers.go index 8164df004..1033dc7a8 100644 --- a/api/pkg/api/socket_server_handlers.go +++ b/api/pkg/api/socket_server_handlers.go @@ -9,6 +9,7 @@ import ( "mizuserver/pkg/controllers" "mizuserver/pkg/models" "mizuserver/pkg/routes" + "mizuserver/pkg/up9" ) var browserClientSocketUUIDs = make([]string, 0) @@ -18,6 +19,9 @@ type RoutesEventHandlers struct { SocketHarOutChannel chan<- *tap.OutputChannelItem } +func init() { + go up9.UpdateAnalyzeStatus(broadcastToBrowserClients) +} func (h *RoutesEventHandlers) WebSocketConnect(ep *ikisocket.EventPayload) { if ep.Kws.GetAttribute("is_tapper") == true { @@ -84,7 +88,6 @@ func (h *RoutesEventHandlers) WebSocketMessage(ep *ikisocket.EventPayload) { } } - func removeSocketUUIDFromBrowserSlice(uuidToRemove string) { newUUIDSlice := make([]string, 0, len(browserClientSocketUUIDs)) for _, uuid := range browserClientSocketUUIDs { diff --git a/api/pkg/controllers/entries_controller.go b/api/pkg/controllers/entries_controller.go index 282271b65..a2139b297 100644 --- a/api/pkg/controllers/entries_controller.go +++ b/api/pkg/controllers/entries_controller.go @@ -7,29 +7,13 @@ import ( "github.com/google/martian/har" "mizuserver/pkg/database" "mizuserver/pkg/models" + "mizuserver/pkg/up9" "mizuserver/pkg/utils" "mizuserver/pkg/validation" + "strings" "time" ) -const ( - OrderDesc = "desc" - OrderAsc = "asc" - LT = "lt" - GT = "gt" -) - -var ( - operatorToSymbolMapping = map[string]string{ - LT: "<", - GT: ">", - } - operatorToOrderMapping = map[string]string{ - LT: OrderDesc, - GT: OrderAsc, - } -) - func GetEntries(c *fiber.Ctx) error { entriesFilter := &models.EntriesFilter{} @@ -41,8 +25,8 @@ func GetEntries(c *fiber.Ctx) error { return c.Status(fiber.StatusBadRequest).JSON(err) } - order := operatorToOrderMapping[entriesFilter.Operator] - operatorSymbol := operatorToSymbolMapping[entriesFilter.Operator] + order := database.OperatorToOrderMapping[entriesFilter.Operator] + operatorSymbol := database.OperatorToSymbolMapping[entriesFilter.Operator] var entries []models.MizuEntry database.GetEntriesTable(). Order(fmt.Sprintf("timestamp %s", order)). @@ -51,7 +35,7 @@ func GetEntries(c *fiber.Ctx) error { Limit(entriesFilter.Limit). Find(&entries) - if len(entries) > 0 && order == OrderDesc { + if len(entries) > 0 && order == database.OrderDesc { // the entries always order from oldest to newest so we should revers utils.ReverseSlice(entries) } @@ -67,7 +51,7 @@ func GetEntries(c *fiber.Ctx) error { func GetHARs(c *fiber.Ctx) error { entriesFilter := &models.HarFetchRequestBody{} - order := OrderDesc + order := database.OrderDesc if err := c.QueryParser(entriesFilter); err != nil { return c.Status(fiber.StatusBadRequest).JSON(err) } @@ -105,9 +89,20 @@ func GetHARs(c *fiber.Ctx) error { for _, entryData := range entries { var harEntry har.Entry _ = json.Unmarshal([]byte(entryData.Entry), &harEntry) + if entryData.ResolvedDestination != "" { + harEntry.Request.URL = utils.SetHostname(harEntry.Request.URL, entryData.ResolvedDestination) + } + var fileName string sourceOfEntry := entryData.ResolvedSource - fileName := fmt.Sprintf("%s.har", sourceOfEntry) + if sourceOfEntry != "" { + // naively assumes the proper service source is http + sourceOfEntry = fmt.Sprintf("http://%s", sourceOfEntry) + //replace / from the file name cause they end up creating a corrupted folder + fileName = fmt.Sprintf("%s.har", strings.ReplaceAll(sourceOfEntry, "/", "_")) + } else { + fileName = "unknown_source.har" + } if harOfSource, ok := harsObject[fileName]; ok { harOfSource.Log.Entries = append(harOfSource.Log.Entries, &harEntry) } else { @@ -121,11 +116,14 @@ func GetHARs(c *fiber.Ctx) error { Name: "mizu", Version: "0.0.2", }, - Source: sourceOfEntry, }, Entries: entriesHar, }, } + // leave undefined when no source is present, otherwise modeler assumes source is empty string "" + if sourceOfEntry != "" { + harsObject[fileName].Log.Creator.Source = &sourceOfEntry + } } } @@ -138,9 +136,25 @@ func GetHARs(c *fiber.Ctx) error { return c.Status(fiber.StatusOK).SendStream(buffer) } +func UploadEntries(c *fiber.Ctx) error { + uploadRequestBody := &models.UploadEntriesRequestBody{} + if err := c.QueryParser(uploadRequestBody); err != nil { + return c.Status(fiber.StatusBadRequest).JSON(err) + } + if err := validation.Validate(uploadRequestBody); err != nil { + return c.Status(fiber.StatusBadRequest).JSON(err) + } + if up9.GetAnalyzeInfo().IsAnalyzing { + return c.Status(fiber.StatusBadRequest).SendString("Cannot analyze, mizu is already analyzing") + } + + token, _ := up9.CreateAnonymousToken(uploadRequestBody.Dest) + go up9.UploadEntriesImpl(token.Token, token.Model, uploadRequestBody.Dest) + return c.Status(fiber.StatusOK).SendString("OK") +} + func GetFullEntries(c *fiber.Ctx) error { entriesFilter := &models.HarFetchRequestBody{} - order := OrderDesc if err := c.QueryParser(entriesFilter); err != nil { return c.Status(fiber.StatusBadRequest).JSON(err) } @@ -162,23 +176,7 @@ func GetFullEntries(c *fiber.Ctx) error { timestampTo = entriesFilter.To } - var entries []models.MizuEntry - database.GetEntriesTable(). - Where(fmt.Sprintf("timestamp BETWEEN %v AND %v", timestampFrom, timestampTo)). - Order(fmt.Sprintf("timestamp %s", order)). - Find(&entries) - - if len(entries) > 0 { - // the entries always order from oldest to newest so we should revers - utils.ReverseSlice(entries) - } - - entriesArray := make([]har.Entry, 0) - for _, entryData := range entries { - var harEntry har.Entry - _ = json.Unmarshal([]byte(entryData.Entry), &harEntry) - entriesArray = append(entriesArray, harEntry) - } + entriesArray := database.GetEntriesFromDb(timestampFrom, timestampTo) return c.Status(fiber.StatusOK).JSON(entriesArray) } diff --git a/api/pkg/controllers/status_controller.go b/api/pkg/controllers/status_controller.go index 15f30a67b..e4fed4c55 100644 --- a/api/pkg/controllers/status_controller.go +++ b/api/pkg/controllers/status_controller.go @@ -3,6 +3,7 @@ package controllers import ( "github.com/gofiber/fiber/v2" "github.com/up9inc/mizu/shared" + "mizuserver/pkg/up9" ) var TapStatus shared.TapStatus @@ -10,3 +11,7 @@ var TapStatus shared.TapStatus func GetTappingStatus(c *fiber.Ctx) error { return c.Status(fiber.StatusOK).JSON(TapStatus) } + +func AnalyzeInformation(c *fiber.Ctx) error { + return c.Status(fiber.StatusOK).JSON(up9.GetAnalyzeInfo()) +} diff --git a/api/pkg/database/main.go b/api/pkg/database/main.go index b015a0868..f14734f85 100644 --- a/api/pkg/database/main.go +++ b/api/pkg/database/main.go @@ -1,9 +1,13 @@ package database import ( + "encoding/json" + "fmt" + "github.com/google/martian/har" "gorm.io/driver/sqlite" "gorm.io/gorm" "mizuserver/pkg/models" + "mizuserver/pkg/utils" ) const ( @@ -14,6 +18,24 @@ var ( DB = initDataBase(DBPath) ) +const ( + OrderDesc = "desc" + OrderAsc = "asc" + LT = "lt" + GT = "gt" +) + +var ( + OperatorToSymbolMapping = map[string]string{ + LT: "<", + GT: ">", + } + OperatorToOrderMapping = map[string]string{ + LT: OrderDesc, + GT: OrderAsc, + } +) + func GetEntriesTable() *gorm.DB { return DB.Table("mizu_entries") } @@ -23,3 +45,34 @@ func initDataBase(databasePath string) *gorm.DB { _ = temp.AutoMigrate(&models.MizuEntry{}) // this will ensure table is created return temp } + +func GetEntriesFromDb(timestampFrom int64, timestampTo int64) []har.Entry { + order := OrderDesc + var entries []models.MizuEntry + GetEntriesTable(). + Where(fmt.Sprintf("timestamp BETWEEN %v AND %v", timestampFrom, timestampTo)). + Order(fmt.Sprintf("timestamp %s", order)). + Find(&entries) + + if len(entries) > 0 { + // the entries always order from oldest to newest so we should revers + utils.ReverseSlice(entries) + } + + entriesArray := make([]har.Entry, 0) + for _, entryData := range entries { + var harEntry har.Entry + _ = json.Unmarshal([]byte(entryData.Entry), &harEntry) + + if entryData.ResolvedSource != "" { + harEntry.Request.Headers = append(harEntry.Request.Headers, har.Header{Name: "x-mizu-source", Value: entryData.ResolvedSource}) + } + if entryData.ResolvedDestination != "" { + harEntry.Request.Headers = append(harEntry.Request.Headers, har.Header{Name: "x-mizu-destination", Value: entryData.ResolvedDestination}) + } + + entriesArray = append(entriesArray, harEntry) + } + return entriesArray +} + diff --git a/api/pkg/models/models.go b/api/pkg/models/models.go index 108801e1e..232a5c1f8 100644 --- a/api/pkg/models/models.go +++ b/api/pkg/models/models.go @@ -49,6 +49,10 @@ type EntriesFilter struct { Timestamp int64 `query:"timestamp" validate:"required,min=1"` } +type UploadEntriesRequestBody struct { + Dest string `query:"dest"` +} + type HarFetchRequestBody struct { From int64 `query:"from"` To int64 `query:"to"` @@ -101,5 +105,5 @@ type ExtendedLog struct { type ExtendedCreator struct { *har.Creator - Source string `json:"_source"` + Source *string `json:"_source"` } diff --git a/api/pkg/routes/public_routes.go b/api/pkg/routes/public_routes.go index df589a62c..3bdbe150a 100644 --- a/api/pkg/routes/public_routes.go +++ b/api/pkg/routes/public_routes.go @@ -12,7 +12,7 @@ func EntriesRoutes(fiberApp *fiber.App) { routeGroup.Get("/entries", controllers.GetEntries) // get entries (base/thin entries) routeGroup.Get("/entries/:entryId", controllers.GetEntry) // get single (full) entry routeGroup.Get("/exportEntries", controllers.GetFullEntries) - + routeGroup.Get("/uploadEntries", controllers.UploadEntries) routeGroup.Get("/har", controllers.GetHARs) @@ -20,4 +20,5 @@ func EntriesRoutes(fiberApp *fiber.App) { routeGroup.Get("/generalStats", controllers.GetGeneralStats) // get general stats about entries in DB routeGroup.Get("/tapStatus", controllers.GetTappingStatus) // get tapping status + routeGroup.Get("/analyzeStatus", controllers.AnalyzeInformation) } diff --git a/api/pkg/up9/main.go b/api/pkg/up9/main.go new file mode 100644 index 000000000..17f1ab131 --- /dev/null +++ b/api/pkg/up9/main.go @@ -0,0 +1,178 @@ +package up9 + +import ( + "bytes" + "compress/zlib" + "encoding/json" + "fmt" + "github.com/up9inc/mizu/shared" + "io/ioutil" + "log" + "mizuserver/pkg/database" + "net/http" + "net/url" + "time" +) + + +const ( + AnalyzeCheckSleepTime = 5 * time.Second +) + +type GuestToken struct { + Token string `json:"token"` + Model string `json:"model"` +} + +type ModelStatus struct { + LastMajorGeneration float64 `json:"lastMajorGeneration"` +} + +func getGuestToken(url string, target *GuestToken) error { + resp, err := http.Get(url) + if err != nil { + return err + } + defer resp.Body.Close() + + return json.NewDecoder(resp.Body).Decode(target) +} + +func CreateAnonymousToken(envPrefix string) (*GuestToken, error) { + tokenUrl := fmt.Sprintf("https://trcc.%v/anonymous/token", envPrefix) + token := &GuestToken{} + if err := getGuestToken(tokenUrl, token); err != nil { + fmt.Println(err) + return nil, err + } + return token, nil +} + +func GetRemoteUrl(analyzeDestination string, analyzeToken string) string { + return fmt.Sprintf("https://%s/share/%s", analyzeDestination, analyzeToken) +} + +func CheckIfModelReady(analyzeDestination string, analyzeModel string, analyzeToken string) bool { + statusUrl, _ := url.Parse(fmt.Sprintf("https://trcc.%s/models/%s/status", analyzeDestination, analyzeModel)) + req := &http.Request{ + Method: http.MethodGet, + URL: statusUrl, + Header: map[string][]string{ + "Content-Type": {"application/json"}, + "Guest-Auth": {analyzeToken}, + }, + } + statusResp, err := http.DefaultClient.Do(req) + if err != nil { + return false + } + defer statusResp.Body.Close() + + target := &ModelStatus{} + _ = json.NewDecoder(statusResp.Body).Decode(&target) + + return target.LastMajorGeneration > 0 +} + +func GetTrafficDumpUrl(analyzeDestination string, analyzeModel string) *url.URL { + postUrl, _ := url.Parse(fmt.Sprintf("https://traffic.%s/dumpTrafficBulk/%s", analyzeDestination, analyzeModel)) + return postUrl +} + +type AnalyzeInformation struct { + IsAnalyzing bool + AnalyzedModel string + AnalyzeToken string + AnalyzeDestination string +} + +func (info *AnalyzeInformation) Reset() { + info.IsAnalyzing = false + info.AnalyzedModel = "" + info.AnalyzeToken = "" + info.AnalyzeDestination = "" +} + +var analyzeInformation = &AnalyzeInformation{} + +func GetAnalyzeInfo() *shared.AnalyzeStatus { + return &shared.AnalyzeStatus{ + IsAnalyzing: analyzeInformation.IsAnalyzing, + RemoteUrl: GetRemoteUrl(analyzeInformation.AnalyzeDestination, analyzeInformation.AnalyzeToken), + IsRemoteReady: CheckIfModelReady(analyzeInformation.AnalyzeDestination, analyzeInformation.AnalyzedModel, analyzeInformation.AnalyzeToken), + } +} + +func UploadEntriesImpl(token string, model string, envPrefix string) { + analyzeInformation.IsAnalyzing = true + analyzeInformation.AnalyzedModel = model + analyzeInformation.AnalyzeToken = token + analyzeInformation.AnalyzeDestination = envPrefix + + sleepTime := time.Second * 10 + + var timestampFrom int64 = 0 + + for { + timestampTo := time.Now().UnixNano() / int64(time.Millisecond) + fmt.Printf("Getting entries from %v, to %v\n", timestampFrom, timestampTo) + entriesArray := database.GetEntriesFromDb(timestampFrom, timestampTo) + + if len(entriesArray) > 0 { + fmt.Printf("About to upload %v entries\n", len(entriesArray)) + + body, jMarshalErr := json.Marshal(entriesArray) + if jMarshalErr != nil { + analyzeInformation.Reset() + fmt.Println("Stopping analyzing") + log.Fatal(jMarshalErr) + } + + var in bytes.Buffer + w := zlib.NewWriter(&in) + _, _ = w.Write(body) + _ = w.Close() + reqBody := ioutil.NopCloser(bytes.NewReader(in.Bytes())) + + req := &http.Request{ + Method: http.MethodPost, + URL: GetTrafficDumpUrl(envPrefix, model), + Header: map[string][]string{ + "Content-Encoding": {"deflate"}, + "Content-Type": {"application/octet-stream"}, + "Guest-Auth": {token}, + }, + Body: reqBody, + } + + if _, postErr := http.DefaultClient.Do(req); postErr != nil { + analyzeInformation.Reset() + log.Println("Stopping analyzing") + log.Fatal(postErr) + } + fmt.Printf("Finish uploading %v entries to %s\n", len(entriesArray), GetTrafficDumpUrl(envPrefix, model)) + + } else { + fmt.Println("Nothing to upload") + } + + fmt.Printf("Sleeping for %v...\n", sleepTime) + time.Sleep(sleepTime) + timestampFrom = timestampTo + } +} + +func UpdateAnalyzeStatus(callback func(data []byte)) { + for { + if !analyzeInformation.IsAnalyzing { + time.Sleep(AnalyzeCheckSleepTime) + continue + } + analyzeStatus := GetAnalyzeInfo() + socketMessage := shared.CreateWebSocketMessageTypeAnalyzeStatus(*analyzeStatus) + + jsonMessage, _ := json.Marshal(socketMessage) + callback(jsonMessage) + time.Sleep(AnalyzeCheckSleepTime) + } +} diff --git a/cli/cmd/tap.go b/cli/cmd/tap.go index c3e6fb747..f29a59773 100644 --- a/cli/cmd/tap.go +++ b/cli/cmd/tap.go @@ -15,6 +15,8 @@ type MizuTapOptions struct { GuiPort uint16 Namespace string AllNamespaces bool + Analyze bool + AnalyzeDestination string KubeConfigPath string MizuImage string MizuPodPort uint16 @@ -22,7 +24,6 @@ type MizuTapOptions struct { TapOutgoing bool } - var mizuTapOptions = &MizuTapOptions{} var direction string @@ -30,7 +31,7 @@ var tapCmd = &cobra.Command{ Use: "tap [POD REGEX]", Short: "Record ingoing traffic of a kubernetes pod", Long: `Record the ingoing traffic of a kubernetes pod. - Supported protocols are HTTP and gRPC.`, +Supported protocols are HTTP and gRPC.`, RunE: func(cmd *cobra.Command, args []string) error { if len(args) == 0 { return errors.New("POD REGEX argument is required") @@ -62,6 +63,8 @@ func init() { tapCmd.Flags().Uint16VarP(&mizuTapOptions.GuiPort, "gui-port", "p", 8899, "Provide a custom port for the web interface webserver") tapCmd.Flags().StringVarP(&mizuTapOptions.Namespace, "namespace", "n", "", "Namespace selector") + tapCmd.Flags().BoolVar(&mizuTapOptions.Analyze, "analyze", false, "Uploads traffic to UP9 cloud for further analysis (Beta)") + tapCmd.Flags().StringVar(&mizuTapOptions.AnalyzeDestination, "dest", "up9.app", "Destination environment") tapCmd.Flags().BoolVarP(&mizuTapOptions.AllNamespaces, "all-namespaces", "A", false, "Tap all namespaces") tapCmd.Flags().StringVarP(&mizuTapOptions.KubeConfigPath, "kube-config", "k", "", "Path to kube-config file") tapCmd.Flags().StringVarP(&mizuTapOptions.MizuImage, "mizu-image", "", fmt.Sprintf("gcr.io/up9-docker-hub/mizu/%s:latest", mizu.Branch), "Custom image for mizu collector") diff --git a/cli/cmd/tapRunner.go b/cli/cmd/tapRunner.go index 76011e1dc..5a9039daf 100644 --- a/cli/cmd/tapRunner.go +++ b/cli/cmd/tapRunner.go @@ -3,6 +3,7 @@ package cmd import ( "context" "fmt" + "net/http" "os" "os/signal" "regexp" @@ -79,6 +80,7 @@ func RunMizuTap(podRegexQuery *regexp.Regexp, tappingOptions *MizuTapOptions) { waitForFinish(ctx, cancel) } + func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, tappingOptions *MizuTapOptions, mizuApiFilteringOptions *shared.TrafficFilteringOptions) error { if err := createMizuAggregator(ctx, kubernetesProvider, tappingOptions, mizuApiFilteringOptions); err != nil { return err @@ -198,10 +200,10 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro for { select { case newTarget := <-added: - fmt.Printf("+%s\n", newTarget.Name) + fmt.Printf(mizu.Green, fmt.Sprintf("+%s\n", newTarget.Name)) case removedTarget := <-removed: - fmt.Printf("-%s\n", removedTarget.Name) + fmt.Printf(mizu.Red, fmt.Sprintf("-%s\n", removedTarget.Name)) restartTappersDebouncer.SetOn() case modifiedTarget := <-modified: @@ -241,12 +243,21 @@ func portForwardApiPod(ctx context.Context, kubernetesProvider *kubernetes.Provi case modifiedPod := <-modified: if modifiedPod.Status.Phase == "Running" && !isPodReady { isPodReady = true - var err error - portForward, err = kubernetes.NewPortForward(kubernetesProvider, mizu.ResourcesNamespace, mizu.AggregatorPodName, tappingOptions.GuiPort, tappingOptions.MizuPodPort, cancel) - fmt.Printf("Web interface is now available at http://localhost:%d\n", tappingOptions.GuiPort) - if err != nil { - fmt.Printf("error forwarding port to pod %s\n", err) + var portForwardCreateError error + if portForward, portForwardCreateError = kubernetes.NewPortForward(kubernetesProvider, mizu.ResourcesNamespace, mizu.AggregatorPodName, tappingOptions.GuiPort, tappingOptions.MizuPodPort, cancel); portForwardCreateError != nil { + fmt.Printf("error forwarding port to pod %s\n", portForwardCreateError) cancel() + } else { + fmt.Printf("Web interface is now available at http://localhost:%d\n", tappingOptions.GuiPort) + time.Sleep(time.Second * 5) // Waiting to be sure port forwarding finished + if tappingOptions.Analyze { + if _, err := http.Get(fmt.Sprintf("http://localhost:%d/api/uploadEntries?dest=%s", tappingOptions.GuiPort, tappingOptions.AnalyzeDestination)); err != nil { + fmt.Println(err) + } else { + fmt.Printf(mizu.Purple, "Traffic is uploading to UP9 cloud for further analsys") + fmt.Println() + } + } } } diff --git a/cli/mizu/consts.go b/cli/mizu/consts.go index 9f72620ad..1f960ead2 100644 --- a/cli/mizu/consts.go +++ b/cli/mizu/consts.go @@ -15,3 +15,14 @@ const ( TapperPodName = "mizu-tapper" K8sAllNamespaces = "" ) + +const ( + Black = "\033[1;30m%s\033[0m" + Red = "\033[1;31m%s\033[0m" + Green = "\033[1;32m%s\033[0m" + Yellow = "\033[1;33m%s\033[0m" + Purple = "\033[1;34m%s\033[0m" + Magenta = "\033[1;35m%s\033[0m" + Teal = "\033[1;36m%s\033[0m" + White = "\033[1;37m%s\033[0m" +) diff --git a/shared/models.go b/shared/models.go index 854dbf1f6..f1d443cdf 100644 --- a/shared/models.go +++ b/shared/models.go @@ -1,28 +1,41 @@ package shared type WebSocketMessageType string + const ( - WebSocketMessageTypeEntry WebSocketMessageType = "entry" - WebSocketMessageTypeTappedEntry WebSocketMessageType = "tappedEntry" - WebSocketMessageTypeUpdateStatus WebSocketMessageType = "status" + WebSocketMessageTypeEntry WebSocketMessageType = "entry" + WebSocketMessageTypeTappedEntry WebSocketMessageType = "tappedEntry" + WebSocketMessageTypeUpdateStatus WebSocketMessageType = "status" + WebSocketMessageTypeAnalyzeStatus WebSocketMessageType = "analyzeStatus" ) type WebSocketMessageMetadata struct { MessageType WebSocketMessageType `json:"messageType,omitempty"` } +type WebSocketAnalyzeStatusMessage struct { + *WebSocketMessageMetadata + AnalyzeStatus AnalyzeStatus `json:"analyzeStatus"` +} + +type AnalyzeStatus struct { + IsAnalyzing bool `json:"isAnalyzing"` + RemoteUrl string `json:"remoteUrl"` + IsRemoteReady bool `json:"isRemoteReady"` +} + type WebSocketStatusMessage struct { *WebSocketMessageMetadata TappingStatus TapStatus `json:"tappingStatus"` } type TapStatus struct { - Pods []PodInfo `json:"pods"` + Pods []PodInfo `json:"pods"` } type PodInfo struct { Namespace string `json:"namespace"` - Name string `json:"name"` + Name string `json:"name"` } func CreateWebSocketStatusMessage(tappingStatus TapStatus) WebSocketStatusMessage { @@ -34,6 +47,15 @@ func CreateWebSocketStatusMessage(tappingStatus TapStatus) WebSocketStatusMessag } } +func CreateWebSocketMessageTypeAnalyzeStatus(analyzeStatus AnalyzeStatus) WebSocketAnalyzeStatusMessage { + return WebSocketAnalyzeStatusMessage{ + WebSocketMessageMetadata: &WebSocketMessageMetadata{ + MessageType: WebSocketMessageTypeAnalyzeStatus, + }, + AnalyzeStatus: analyzeStatus, + } +} + type TrafficFilteringOptions struct { PlainTextMaskingRegexes []*SerializableRegexp } diff --git a/ui/src/App.sass b/ui/src/App.sass index 564ee6c2b..ab781429e 100644 --- a/ui/src/App.sass +++ b/ui/src/App.sass @@ -10,6 +10,8 @@ display: flex align-items: center padding-left: 24px + padding-right: 24px + justify-content: space-between .title font-size: 45px diff --git a/ui/src/App.tsx b/ui/src/App.tsx index 4195e558d..0663cfbb6 100644 --- a/ui/src/App.tsx +++ b/ui/src/App.tsx @@ -1,18 +1,41 @@ -import React from 'react'; -import {HarPage} from "./components/HarPage"; +import React, {useState} from 'react'; import './App.sass'; import logo from './components/assets/Mizu.svg'; +import {Button} from "@material-ui/core"; +import {HarPage} from "./components/HarPage"; + const App = () => { - return ( -
-
-
logo
-
Traffic viewer for Kubernetes
+ + const [analyzeStatus, setAnalyzeStatus] = useState(null); + + return ( +
+
+
+
logo
+
Traffic viewer for Kubernetes
+
+
+ {analyzeStatus?.isAnalyzing && +
+ +
+ } +
+
+
- -
- ); + ); } export default App; diff --git a/ui/src/components/HarEntry.tsx b/ui/src/components/HarEntry.tsx index 95f3c6ffd..368ac92cb 100644 --- a/ui/src/components/HarEntry.tsx +++ b/ui/src/components/HarEntry.tsx @@ -1,9 +1,13 @@ import React from "react"; import styles from './style/HarEntry.module.sass'; -import StatusCode from "./StatusCode"; +import StatusCode, {getClassification, StatusCodeClassification} from "./StatusCode"; import {EndpointPath} from "./EndpointPath"; -import ingoingIcon from "./assets/ingoing-traffic.svg" -import outgoingIcon from "./assets/outgoing-traffic.svg" +import ingoingIconSuccess from "./assets/ingoing-traffic-success.svg" +import ingoingIconFailure from "./assets/ingoing-traffic-failure.svg" +import ingoingIconNeutral from "./assets/ingoing-traffic-neutral.svg" +import outgoingIconSuccess from "./assets/outgoing-traffic-success.svg" +import outgoingIconFailure from "./assets/outgoing-traffic-failure.svg" +import outgoingIconNeutral from "./assets/outgoing-traffic-neutral.svg" interface HAREntry { method?: string, @@ -24,6 +28,26 @@ interface HAREntryProps { } export const HarEntry: React.FC = ({entry, setFocusedEntryId, isSelected}) => { + const classification = getClassification(entry.statusCode) + let ingoingIcon; + let outgoingIcon; + switch(classification) { + case StatusCodeClassification.SUCCESS: { + ingoingIcon = ingoingIconSuccess; + outgoingIcon = outgoingIconSuccess; + break; + } + case StatusCodeClassification.FAILURE: { + ingoingIcon = ingoingIconFailure; + outgoingIcon = outgoingIconFailure; + break; + } + case StatusCodeClassification.NEUTRAL: { + ingoingIcon = ingoingIconNeutral; + outgoingIcon = outgoingIconNeutral; + break; + } + } return <>
setFocusedEntryId(entry.id)}> @@ -38,13 +62,9 @@ export const HarEntry: React.FC = ({entry, setFocusedEntryId, isS
{entry.isOutgoing ? -
- outgoing traffic -
+ outgoing traffic : -
- ingoing traffic -
+ ingoing traffic }
{new Date(+entry.timestamp)?.toLocaleString()}
diff --git a/ui/src/components/HarPage.tsx b/ui/src/components/HarPage.tsx index 11edf80fa..6005cdb1b 100644 --- a/ui/src/components/HarPage.tsx +++ b/ui/src/components/HarPage.tsx @@ -35,7 +35,11 @@ enum ConnectionStatus { Paused } -export const HarPage: React.FC = () => { +interface HarPageProps { + setAnalyzeStatus: (status: any) => void; +} + +export const HarPage: React.FC = ({setAnalyzeStatus}) => { const classes = useLayoutStyles(); @@ -60,21 +64,21 @@ export const HarPage: React.FC = () => { ws.current.onclose = () => setConnection(ConnectionStatus.Closed); } - if(ws.current) { + if (ws.current) { ws.current.onmessage = e => { - if(!e?.data) return; + if (!e?.data) return; const message = JSON.parse(e.data); switch (message.messageType) { case "entry": const entry = message.data - if(connection === ConnectionStatus.Paused) { + if (connection === ConnectionStatus.Paused) { setNoMoreDataBottom(false) return; } - if(!focusedEntryId) setFocusedEntryId(entry.id) + if (!focusedEntryId) setFocusedEntryId(entry.id) let newEntries = [...entries]; - if(entries.length === 1000) { + if (entries.length === 1000) { newEntries = newEntries.splice(1); setNoMoreDataTop(false); } @@ -83,6 +87,9 @@ export const HarPage: React.FC = () => { case "status": setTappingStatus(message.tappingStatus); break + case "analyzeStatus": + setAnalyzeStatus(message.analyzeStatus); + break default: console.error(`unsupported websocket message type, Got: ${message.messageType}`) } @@ -94,19 +101,23 @@ export const HarPage: React.FC = () => { fetch(`http://localhost:8899/api/tapStatus`) .then(response => response.json()) .then(data => setTappingStatus(data)); + + fetch(`http://localhost:8899/api/analyzeStatus`) + .then(response => response.json()) + .then(data => setAnalyzeStatus(data)); }, []); useEffect(() => { - if(!focusedEntryId) return; + if (!focusedEntryId) return; setSelectedHarEntry(null) fetch(`http://localhost:8899/api/entries/${focusedEntryId}`) .then(response => response.json()) .then(data => setSelectedHarEntry(data)); - },[focusedEntryId]) + }, [focusedEntryId]) const toggleConnection = () => { - setConnection(connection === ConnectionStatus.Connected ? ConnectionStatus.Paused : ConnectionStatus.Connected ); + setConnection(connection === ConnectionStatus.Connected ? ConnectionStatus.Paused : ConnectionStatus.Connected); } const getConnectionStatusClass = (isContainer) => { @@ -135,11 +146,12 @@ export const HarPage: React.FC = () => { return (
- pause + pause
{getConnectionTitle()}
-
+
@@ -169,7 +181,8 @@ export const HarPage: React.FC = () => {
- {selectedHarEntry && } + {selectedHarEntry && + }
} {tappingStatus?.pods != null && } diff --git a/ui/src/components/StatusCode.tsx b/ui/src/components/StatusCode.tsx index 376a277ed..2230a9b8a 100644 --- a/ui/src/components/StatusCode.tsx +++ b/ui/src/components/StatusCode.tsx @@ -1,7 +1,7 @@ import React from "react"; import styles from './style/StatusCode.module.sass'; -enum StatusCodeClassification { +export enum StatusCodeClassification { SUCCESS = "success", FAILURE = "failure", NEUTRAL = "neutral" @@ -14,6 +14,12 @@ interface HAREntryProps { const StatusCode: React.FC = ({statusCode}) => { + const classification = getClassification(statusCode) + + return {statusCode} +}; + +export function getClassification(statusCode: number): string { let classification = StatusCodeClassification.NEUTRAL; if (statusCode >= 200 && statusCode <= 399) { @@ -22,7 +28,7 @@ const StatusCode: React.FC = ({statusCode}) => { classification = StatusCodeClassification.FAILURE; } - return {statusCode} -}; + return classification +} -export default StatusCode; \ No newline at end of file +export default StatusCode; diff --git a/ui/src/components/assets/ingoing-traffic-failure.svg b/ui/src/components/assets/ingoing-traffic-failure.svg new file mode 100644 index 000000000..3bb4e26f2 --- /dev/null +++ b/ui/src/components/assets/ingoing-traffic-failure.svg @@ -0,0 +1,5 @@ + + + + + diff --git a/ui/src/components/assets/ingoing-traffic-neutral.svg b/ui/src/components/assets/ingoing-traffic-neutral.svg new file mode 100644 index 000000000..9720d65f2 --- /dev/null +++ b/ui/src/components/assets/ingoing-traffic-neutral.svg @@ -0,0 +1,5 @@ + + + + + diff --git a/ui/src/components/assets/ingoing-traffic-success.svg b/ui/src/components/assets/ingoing-traffic-success.svg new file mode 100644 index 000000000..f1c745e64 --- /dev/null +++ b/ui/src/components/assets/ingoing-traffic-success.svg @@ -0,0 +1,5 @@ + + + + + diff --git a/ui/src/components/assets/ingoing-traffic.svg b/ui/src/components/assets/ingoing-traffic.svg deleted file mode 100644 index 7aaded9a8..000000000 --- a/ui/src/components/assets/ingoing-traffic.svg +++ /dev/null @@ -1 +0,0 @@ - diff --git a/ui/src/components/assets/outgoing-traffic-failure.svg b/ui/src/components/assets/outgoing-traffic-failure.svg new file mode 100644 index 000000000..0a4f7cfb3 --- /dev/null +++ b/ui/src/components/assets/outgoing-traffic-failure.svg @@ -0,0 +1,5 @@ + + + + + diff --git a/ui/src/components/assets/outgoing-traffic-neutral.svg b/ui/src/components/assets/outgoing-traffic-neutral.svg new file mode 100644 index 000000000..8f952e353 --- /dev/null +++ b/ui/src/components/assets/outgoing-traffic-neutral.svg @@ -0,0 +1,5 @@ + + + + + diff --git a/ui/src/components/assets/outgoing-traffic-success.svg b/ui/src/components/assets/outgoing-traffic-success.svg new file mode 100644 index 000000000..477fb7350 --- /dev/null +++ b/ui/src/components/assets/outgoing-traffic-success.svg @@ -0,0 +1,5 @@ + + + + + diff --git a/ui/src/components/assets/outgoing-traffic.svg b/ui/src/components/assets/outgoing-traffic.svg deleted file mode 100644 index 1c8ec87d7..000000000 --- a/ui/src/components/assets/outgoing-traffic.svg +++ /dev/null @@ -1 +0,0 @@ - diff --git a/ui/src/components/style/HarEntry.module.sass b/ui/src/components/style/HarEntry.module.sass index 3e28bfee9..fc5442f48 100644 --- a/ui/src/components/style/HarEntry.module.sass +++ b/ui/src/components/style/HarEntry.module.sass @@ -37,9 +37,10 @@ .timestamp font-size: 12px color: $secondary-font-color - padding-left: 8px - padding-right: 8px + padding-left: 12px flex-shrink: 0 + width: 145px + text-align: left .endpointServiceContainer display: flex @@ -51,13 +52,6 @@ .directionContainer display: flex - width: 28px - flex-direction: column - -.outgoingIcon - display: flex - align-self: flex-end - -.ingoingIcon - display: flex - align-self: flex-start + border-right: 1px solid $data-background-color + padding: 4px + padding-right: 12px