mirror of
https://github.com/kubeshark/kubeshark.git
synced 2026-03-09 22:22:16 +00:00
Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
eef58496b5 | ||
|
|
1137f9386b | ||
|
|
93714ab902 | ||
|
|
fc03ba2eda | ||
|
|
3662fbcdf6 |
@@ -9,6 +9,7 @@ import (
|
||||
"mizuserver/pkg/controllers"
|
||||
"mizuserver/pkg/models"
|
||||
"mizuserver/pkg/routes"
|
||||
"mizuserver/pkg/up9"
|
||||
)
|
||||
|
||||
var browserClientSocketUUIDs = make([]string, 0)
|
||||
@@ -18,6 +19,9 @@ type RoutesEventHandlers struct {
|
||||
SocketHarOutChannel chan<- *tap.OutputChannelItem
|
||||
}
|
||||
|
||||
func init() {
|
||||
go up9.UpdateAnalyzeStatus(broadcastToBrowserClients)
|
||||
}
|
||||
|
||||
func (h *RoutesEventHandlers) WebSocketConnect(ep *ikisocket.EventPayload) {
|
||||
if ep.Kws.GetAttribute("is_tapper") == true {
|
||||
@@ -84,7 +88,6 @@ func (h *RoutesEventHandlers) WebSocketMessage(ep *ikisocket.EventPayload) {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
func removeSocketUUIDFromBrowserSlice(uuidToRemove string) {
|
||||
newUUIDSlice := make([]string, 0, len(browserClientSocketUUIDs))
|
||||
for _, uuid := range browserClientSocketUUIDs {
|
||||
|
||||
@@ -1,41 +1,19 @@
|
||||
package controllers
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"compress/zlib"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/gofiber/fiber/v2"
|
||||
"github.com/google/martian/har"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"mizuserver/pkg/database"
|
||||
"mizuserver/pkg/models"
|
||||
"mizuserver/pkg/up9"
|
||||
"mizuserver/pkg/utils"
|
||||
"mizuserver/pkg/validation"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
OrderDesc = "desc"
|
||||
OrderAsc = "asc"
|
||||
LT = "lt"
|
||||
GT = "gt"
|
||||
)
|
||||
|
||||
var (
|
||||
operatorToSymbolMapping = map[string]string{
|
||||
LT: "<",
|
||||
GT: ">",
|
||||
}
|
||||
operatorToOrderMapping = map[string]string{
|
||||
LT: OrderDesc,
|
||||
GT: OrderAsc,
|
||||
}
|
||||
)
|
||||
|
||||
func GetEntries(c *fiber.Ctx) error {
|
||||
entriesFilter := &models.EntriesFilter{}
|
||||
|
||||
@@ -47,8 +25,8 @@ func GetEntries(c *fiber.Ctx) error {
|
||||
return c.Status(fiber.StatusBadRequest).JSON(err)
|
||||
}
|
||||
|
||||
order := operatorToOrderMapping[entriesFilter.Operator]
|
||||
operatorSymbol := operatorToSymbolMapping[entriesFilter.Operator]
|
||||
order := database.OperatorToOrderMapping[entriesFilter.Operator]
|
||||
operatorSymbol := database.OperatorToSymbolMapping[entriesFilter.Operator]
|
||||
var entries []models.MizuEntry
|
||||
database.GetEntriesTable().
|
||||
Order(fmt.Sprintf("timestamp %s", order)).
|
||||
@@ -57,7 +35,7 @@ func GetEntries(c *fiber.Ctx) error {
|
||||
Limit(entriesFilter.Limit).
|
||||
Find(&entries)
|
||||
|
||||
if len(entries) > 0 && order == OrderDesc {
|
||||
if len(entries) > 0 && order == database.OrderDesc {
|
||||
// the entries always order from oldest to newest so we should revers
|
||||
utils.ReverseSlice(entries)
|
||||
}
|
||||
@@ -73,7 +51,7 @@ func GetEntries(c *fiber.Ctx) error {
|
||||
|
||||
func GetHARs(c *fiber.Ctx) error {
|
||||
entriesFilter := &models.HarFetchRequestBody{}
|
||||
order := OrderDesc
|
||||
order := database.OrderDesc
|
||||
if err := c.QueryParser(entriesFilter); err != nil {
|
||||
return c.Status(fiber.StatusBadRequest).JSON(err)
|
||||
}
|
||||
@@ -111,9 +89,17 @@ func GetHARs(c *fiber.Ctx) error {
|
||||
for _, entryData := range entries {
|
||||
var harEntry har.Entry
|
||||
_ = json.Unmarshal([]byte(entryData.Entry), &harEntry)
|
||||
if entryData.ResolvedDestination != "" {
|
||||
harEntry.Request.URL = utils.SetHostname(harEntry.Request.URL, entryData.ResolvedDestination)
|
||||
}
|
||||
|
||||
sourceOfEntry := entryData.ResolvedSource
|
||||
fileName := fmt.Sprintf("%s.har", sourceOfEntry)
|
||||
if sourceOfEntry != "" {
|
||||
// naively assumes the proper service source is http
|
||||
sourceOfEntry = fmt.Sprintf("http://%s", sourceOfEntry)
|
||||
}
|
||||
//replace / from the file name cause they end up creating a corrupted folder
|
||||
fileName := fmt.Sprintf("%s.har", strings.ReplaceAll(sourceOfEntry, "/", "_"))
|
||||
if harOfSource, ok := harsObject[fileName]; ok {
|
||||
harOfSource.Log.Entries = append(harOfSource.Log.Entries, &harEntry)
|
||||
} else {
|
||||
@@ -127,11 +113,14 @@ func GetHARs(c *fiber.Ctx) error {
|
||||
Name: "mizu",
|
||||
Version: "0.0.2",
|
||||
},
|
||||
Source: sourceOfEntry,
|
||||
},
|
||||
Entries: entriesHar,
|
||||
},
|
||||
}
|
||||
// leave undefined when no source is present, otherwise modeler assumes source is empty string ""
|
||||
if sourceOfEntry != "" {
|
||||
harsObject[fileName].Log.Creator.Source = sourceOfEntry
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -144,67 +133,20 @@ func GetHARs(c *fiber.Ctx) error {
|
||||
return c.Status(fiber.StatusOK).SendStream(buffer)
|
||||
}
|
||||
|
||||
func uploadEntriesImpl(token string, model string, envPrefix string) {
|
||||
sleepTime := time.Second * 10
|
||||
|
||||
var timestampFrom int64 = 0
|
||||
|
||||
for {
|
||||
timestampTo := time.Now().UnixNano() / int64(time.Millisecond)
|
||||
fmt.Printf("Getting entries from %v, to %v\n", timestampFrom, timestampTo)
|
||||
entriesArray := getEntriesFromDb(timestampFrom, timestampTo)
|
||||
|
||||
if len(entriesArray) > 0 {
|
||||
fmt.Printf("About to upload %v entries\n", len(entriesArray))
|
||||
|
||||
body, jMarshalErr := json.Marshal(entriesArray)
|
||||
if jMarshalErr != nil {
|
||||
log.Fatal(jMarshalErr)
|
||||
}
|
||||
|
||||
var in bytes.Buffer
|
||||
w := zlib.NewWriter(&in)
|
||||
_, _ = w.Write(body)
|
||||
_ = w.Close()
|
||||
reqBody := ioutil.NopCloser(bytes.NewReader(in.Bytes()))
|
||||
|
||||
postUrl, _ := url.Parse(fmt.Sprintf("https://traffic.%s/dumpTrafficBulk/%s", envPrefix, model))
|
||||
fmt.Println(postUrl)
|
||||
req := &http.Request{
|
||||
Method: http.MethodPost,
|
||||
URL: postUrl,
|
||||
Header: map[string][]string{
|
||||
"Content-Encoding": {"deflate"},
|
||||
"Content-Type": {"application/octet-stream"},
|
||||
"Guest-Auth": {token},
|
||||
},
|
||||
Body: reqBody,
|
||||
}
|
||||
_, postErr := http.DefaultClient.Do(req)
|
||||
if postErr != nil {
|
||||
log.Fatal(postErr)
|
||||
}
|
||||
fmt.Printf("Finish uploading %v entries to %s\n", len(entriesArray), postUrl)
|
||||
|
||||
} else {
|
||||
fmt.Println("Nothing to upload")
|
||||
}
|
||||
|
||||
fmt.Printf("Sleeping for %v...\n", sleepTime)
|
||||
time.Sleep(sleepTime)
|
||||
timestampFrom = timestampTo
|
||||
}
|
||||
}
|
||||
|
||||
func UploadEntries(c *fiber.Ctx) error {
|
||||
entriesFilter := &models.UploadEntriesRequestBody{}
|
||||
if err := c.QueryParser(entriesFilter); err != nil {
|
||||
uploadRequestBody := &models.UploadEntriesRequestBody{}
|
||||
if err := c.QueryParser(uploadRequestBody); err != nil {
|
||||
return c.Status(fiber.StatusBadRequest).JSON(err)
|
||||
}
|
||||
if err := validation.Validate(entriesFilter); err != nil {
|
||||
if err := validation.Validate(uploadRequestBody); err != nil {
|
||||
return c.Status(fiber.StatusBadRequest).JSON(err)
|
||||
}
|
||||
go uploadEntriesImpl(entriesFilter.Token, entriesFilter.Model, entriesFilter.Dest)
|
||||
if up9.GetAnalyzeInfo().IsAnalyzing {
|
||||
return c.Status(fiber.StatusBadRequest).SendString("Cannot analyze, mizu is already analyzing")
|
||||
}
|
||||
|
||||
token, _ := up9.CreateAnonymousToken(uploadRequestBody.Dest)
|
||||
go up9.UploadEntriesImpl(token.Token, token.Model, uploadRequestBody.Dest)
|
||||
return c.Status(fiber.StatusOK).SendString("OK")
|
||||
}
|
||||
|
||||
@@ -231,32 +173,10 @@ func GetFullEntries(c *fiber.Ctx) error {
|
||||
timestampTo = entriesFilter.To
|
||||
}
|
||||
|
||||
entriesArray := getEntriesFromDb(timestampFrom, timestampTo)
|
||||
entriesArray := database.GetEntriesFromDb(timestampFrom, timestampTo)
|
||||
return c.Status(fiber.StatusOK).JSON(entriesArray)
|
||||
}
|
||||
|
||||
func getEntriesFromDb(timestampFrom int64, timestampTo int64) []har.Entry {
|
||||
order := OrderDesc
|
||||
var entries []models.MizuEntry
|
||||
database.GetEntriesTable().
|
||||
Where(fmt.Sprintf("timestamp BETWEEN %v AND %v", timestampFrom, timestampTo)).
|
||||
Order(fmt.Sprintf("timestamp %s", order)).
|
||||
Find(&entries)
|
||||
|
||||
if len(entries) > 0 {
|
||||
// the entries always order from oldest to newest so we should revers
|
||||
utils.ReverseSlice(entries)
|
||||
}
|
||||
|
||||
entriesArray := make([]har.Entry, 0)
|
||||
for _, entryData := range entries {
|
||||
var harEntry har.Entry
|
||||
_ = json.Unmarshal([]byte(entryData.Entry), &harEntry)
|
||||
entriesArray = append(entriesArray, harEntry)
|
||||
}
|
||||
return entriesArray
|
||||
}
|
||||
|
||||
func GetEntry(c *fiber.Ctx) error {
|
||||
var entryData models.EntryData
|
||||
database.GetEntriesTable().
|
||||
|
||||
@@ -3,6 +3,7 @@ package controllers
|
||||
import (
|
||||
"github.com/gofiber/fiber/v2"
|
||||
"github.com/up9inc/mizu/shared"
|
||||
"mizuserver/pkg/up9"
|
||||
)
|
||||
|
||||
var TapStatus shared.TapStatus
|
||||
@@ -10,3 +11,7 @@ var TapStatus shared.TapStatus
|
||||
func GetTappingStatus(c *fiber.Ctx) error {
|
||||
return c.Status(fiber.StatusOK).JSON(TapStatus)
|
||||
}
|
||||
|
||||
func AnalyzeInformation(c *fiber.Ctx) error {
|
||||
return c.Status(fiber.StatusOK).JSON(up9.GetAnalyzeInfo())
|
||||
}
|
||||
|
||||
@@ -1,9 +1,13 @@
|
||||
package database
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/google/martian/har"
|
||||
"gorm.io/driver/sqlite"
|
||||
"gorm.io/gorm"
|
||||
"mizuserver/pkg/models"
|
||||
"mizuserver/pkg/utils"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -14,6 +18,24 @@ var (
|
||||
DB = initDataBase(DBPath)
|
||||
)
|
||||
|
||||
const (
|
||||
OrderDesc = "desc"
|
||||
OrderAsc = "asc"
|
||||
LT = "lt"
|
||||
GT = "gt"
|
||||
)
|
||||
|
||||
var (
|
||||
OperatorToSymbolMapping = map[string]string{
|
||||
LT: "<",
|
||||
GT: ">",
|
||||
}
|
||||
OperatorToOrderMapping = map[string]string{
|
||||
LT: OrderDesc,
|
||||
GT: OrderAsc,
|
||||
}
|
||||
)
|
||||
|
||||
func GetEntriesTable() *gorm.DB {
|
||||
return DB.Table("mizu_entries")
|
||||
}
|
||||
@@ -23,3 +45,34 @@ func initDataBase(databasePath string) *gorm.DB {
|
||||
_ = temp.AutoMigrate(&models.MizuEntry{}) // this will ensure table is created
|
||||
return temp
|
||||
}
|
||||
|
||||
func GetEntriesFromDb(timestampFrom int64, timestampTo int64) []har.Entry {
|
||||
order := OrderDesc
|
||||
var entries []models.MizuEntry
|
||||
GetEntriesTable().
|
||||
Where(fmt.Sprintf("timestamp BETWEEN %v AND %v", timestampFrom, timestampTo)).
|
||||
Order(fmt.Sprintf("timestamp %s", order)).
|
||||
Find(&entries)
|
||||
|
||||
if len(entries) > 0 {
|
||||
// the entries always order from oldest to newest so we should revers
|
||||
utils.ReverseSlice(entries)
|
||||
}
|
||||
|
||||
entriesArray := make([]har.Entry, 0)
|
||||
for _, entryData := range entries {
|
||||
var harEntry har.Entry
|
||||
_ = json.Unmarshal([]byte(entryData.Entry), &harEntry)
|
||||
|
||||
if entryData.ResolvedSource != "" {
|
||||
harEntry.Request.Headers = append(harEntry.Request.Headers, har.Header{Name: "x-mizu-source", Value: entryData.ResolvedSource})
|
||||
}
|
||||
if entryData.ResolvedDestination != "" {
|
||||
harEntry.Request.Headers = append(harEntry.Request.Headers, har.Header{Name: "x-mizu-destination", Value: entryData.ResolvedDestination})
|
||||
}
|
||||
|
||||
entriesArray = append(entriesArray, harEntry)
|
||||
}
|
||||
return entriesArray
|
||||
}
|
||||
|
||||
|
||||
@@ -50,8 +50,6 @@ type EntriesFilter struct {
|
||||
}
|
||||
|
||||
type UploadEntriesRequestBody struct {
|
||||
Token string `query:"token"`
|
||||
Model string `query:"model"`
|
||||
Dest string `query:"dest"`
|
||||
}
|
||||
|
||||
|
||||
@@ -20,4 +20,5 @@ func EntriesRoutes(fiberApp *fiber.App) {
|
||||
routeGroup.Get("/generalStats", controllers.GetGeneralStats) // get general stats about entries in DB
|
||||
|
||||
routeGroup.Get("/tapStatus", controllers.GetTappingStatus) // get tapping status
|
||||
routeGroup.Get("/analyzeStatus", controllers.AnalyzeInformation)
|
||||
}
|
||||
|
||||
178
api/pkg/up9/main.go
Normal file
178
api/pkg/up9/main.go
Normal file
@@ -0,0 +1,178 @@
|
||||
package up9
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"compress/zlib"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/up9inc/mizu/shared"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"mizuserver/pkg/database"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"time"
|
||||
)
|
||||
|
||||
|
||||
const (
|
||||
AnalyzeCheckSleepTime = 5 * time.Second
|
||||
)
|
||||
|
||||
type GuestToken struct {
|
||||
Token string `json:"token"`
|
||||
Model string `json:"model"`
|
||||
}
|
||||
|
||||
type ModelStatus struct {
|
||||
LastMajorGeneration float64 `json:"lastMajorGeneration"`
|
||||
}
|
||||
|
||||
func getGuestToken(url string, target *GuestToken) error {
|
||||
resp, err := http.Get(url)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
return json.NewDecoder(resp.Body).Decode(target)
|
||||
}
|
||||
|
||||
func CreateAnonymousToken(envPrefix string) (*GuestToken, error) {
|
||||
tokenUrl := fmt.Sprintf("https://trcc.%v/anonymous/token", envPrefix)
|
||||
token := &GuestToken{}
|
||||
if err := getGuestToken(tokenUrl, token); err != nil {
|
||||
fmt.Println(err)
|
||||
return nil, err
|
||||
}
|
||||
return token, nil
|
||||
}
|
||||
|
||||
func GetRemoteUrl(analyzeDestination string, analyzeToken string) string {
|
||||
return fmt.Sprintf("https://%s/share/%s", analyzeDestination, analyzeToken)
|
||||
}
|
||||
|
||||
func CheckIfModelReady(analyzeDestination string, analyzeModel string, analyzeToken string) bool {
|
||||
statusUrl, _ := url.Parse(fmt.Sprintf("https://trcc.%s/models/%s/status", analyzeDestination, analyzeModel))
|
||||
req := &http.Request{
|
||||
Method: http.MethodGet,
|
||||
URL: statusUrl,
|
||||
Header: map[string][]string{
|
||||
"Content-Type": {"application/json"},
|
||||
"Guest-Auth": {analyzeToken},
|
||||
},
|
||||
}
|
||||
statusResp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
defer statusResp.Body.Close()
|
||||
|
||||
target := &ModelStatus{}
|
||||
_ = json.NewDecoder(statusResp.Body).Decode(&target)
|
||||
|
||||
return target.LastMajorGeneration > 0
|
||||
}
|
||||
|
||||
func GetTrafficDumpUrl(analyzeDestination string, analyzeModel string) *url.URL {
|
||||
postUrl, _ := url.Parse(fmt.Sprintf("https://traffic.%s/dumpTrafficBulk/%s", analyzeDestination, analyzeModel))
|
||||
return postUrl
|
||||
}
|
||||
|
||||
type AnalyzeInformation struct {
|
||||
IsAnalyzing bool
|
||||
AnalyzedModel string
|
||||
AnalyzeToken string
|
||||
AnalyzeDestination string
|
||||
}
|
||||
|
||||
func (info *AnalyzeInformation) Reset() {
|
||||
info.IsAnalyzing = false
|
||||
info.AnalyzedModel = ""
|
||||
info.AnalyzeToken = ""
|
||||
info.AnalyzeDestination = ""
|
||||
}
|
||||
|
||||
var analyzeInformation = &AnalyzeInformation{}
|
||||
|
||||
func GetAnalyzeInfo() *shared.AnalyzeStatus {
|
||||
return &shared.AnalyzeStatus{
|
||||
IsAnalyzing: analyzeInformation.IsAnalyzing,
|
||||
RemoteUrl: GetRemoteUrl(analyzeInformation.AnalyzeDestination, analyzeInformation.AnalyzeToken),
|
||||
IsRemoteReady: CheckIfModelReady(analyzeInformation.AnalyzeDestination, analyzeInformation.AnalyzedModel, analyzeInformation.AnalyzeToken),
|
||||
}
|
||||
}
|
||||
|
||||
func UploadEntriesImpl(token string, model string, envPrefix string) {
|
||||
analyzeInformation.IsAnalyzing = true
|
||||
analyzeInformation.AnalyzedModel = model
|
||||
analyzeInformation.AnalyzeToken = token
|
||||
analyzeInformation.AnalyzeDestination = envPrefix
|
||||
|
||||
sleepTime := time.Second * 10
|
||||
|
||||
var timestampFrom int64 = 0
|
||||
|
||||
for {
|
||||
timestampTo := time.Now().UnixNano() / int64(time.Millisecond)
|
||||
fmt.Printf("Getting entries from %v, to %v\n", timestampFrom, timestampTo)
|
||||
entriesArray := database.GetEntriesFromDb(timestampFrom, timestampTo)
|
||||
|
||||
if len(entriesArray) > 0 {
|
||||
fmt.Printf("About to upload %v entries\n", len(entriesArray))
|
||||
|
||||
body, jMarshalErr := json.Marshal(entriesArray)
|
||||
if jMarshalErr != nil {
|
||||
analyzeInformation.Reset()
|
||||
fmt.Println("Stopping analyzing")
|
||||
log.Fatal(jMarshalErr)
|
||||
}
|
||||
|
||||
var in bytes.Buffer
|
||||
w := zlib.NewWriter(&in)
|
||||
_, _ = w.Write(body)
|
||||
_ = w.Close()
|
||||
reqBody := ioutil.NopCloser(bytes.NewReader(in.Bytes()))
|
||||
|
||||
req := &http.Request{
|
||||
Method: http.MethodPost,
|
||||
URL: GetTrafficDumpUrl(envPrefix, model),
|
||||
Header: map[string][]string{
|
||||
"Content-Encoding": {"deflate"},
|
||||
"Content-Type": {"application/octet-stream"},
|
||||
"Guest-Auth": {token},
|
||||
},
|
||||
Body: reqBody,
|
||||
}
|
||||
|
||||
if _, postErr := http.DefaultClient.Do(req); postErr != nil {
|
||||
analyzeInformation.Reset()
|
||||
log.Println("Stopping analyzing")
|
||||
log.Fatal(postErr)
|
||||
}
|
||||
fmt.Printf("Finish uploading %v entries to %s\n", len(entriesArray), GetTrafficDumpUrl(envPrefix, model))
|
||||
|
||||
} else {
|
||||
fmt.Println("Nothing to upload")
|
||||
}
|
||||
|
||||
fmt.Printf("Sleeping for %v...\n", sleepTime)
|
||||
time.Sleep(sleepTime)
|
||||
timestampFrom = timestampTo
|
||||
}
|
||||
}
|
||||
|
||||
func UpdateAnalyzeStatus(callback func(data []byte)) {
|
||||
for {
|
||||
if !analyzeInformation.IsAnalyzing {
|
||||
time.Sleep(AnalyzeCheckSleepTime)
|
||||
continue
|
||||
}
|
||||
analyzeStatus := GetAnalyzeInfo()
|
||||
socketMessage := shared.CreateWebSocketMessageTypeAnalyzeStatus(*analyzeStatus)
|
||||
|
||||
jsonMessage, _ := json.Marshal(socketMessage)
|
||||
callback(jsonMessage)
|
||||
time.Sleep(AnalyzeCheckSleepTime)
|
||||
}
|
||||
}
|
||||
@@ -63,7 +63,7 @@ func init() {
|
||||
|
||||
tapCmd.Flags().Uint16VarP(&mizuTapOptions.GuiPort, "gui-port", "p", 8899, "Provide a custom port for the web interface webserver")
|
||||
tapCmd.Flags().StringVarP(&mizuTapOptions.Namespace, "namespace", "n", "", "Namespace selector")
|
||||
tapCmd.Flags().BoolVar(&mizuTapOptions.Analyze, "analyze", false, "Uploads traffic to UP9 cloud for further analysis")
|
||||
tapCmd.Flags().BoolVar(&mizuTapOptions.Analyze, "analyze", false, "Uploads traffic to UP9 cloud for further analysis (Beta)")
|
||||
tapCmd.Flags().StringVar(&mizuTapOptions.AnalyzeDestination, "dest", "up9.app", "Destination environment")
|
||||
tapCmd.Flags().BoolVarP(&mizuTapOptions.AllNamespaces, "all-namespaces", "A", false, "Tap all namespaces")
|
||||
tapCmd.Flags().StringVarP(&mizuTapOptions.KubeConfigPath, "kube-config", "k", "", "Path to kube-config file")
|
||||
|
||||
@@ -2,7 +2,6 @@ package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
@@ -81,30 +80,6 @@ func RunMizuTap(podRegexQuery *regexp.Regexp, tappingOptions *MizuTapOptions) {
|
||||
waitForFinish(ctx, cancel)
|
||||
}
|
||||
|
||||
type GuestToken struct {
|
||||
Token string `json:"token"`
|
||||
Model string `json:"model"`
|
||||
}
|
||||
|
||||
func getGuestToken(url string, target *GuestToken) error {
|
||||
resp, err := http.Get(url)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
return json.NewDecoder(resp.Body).Decode(target)
|
||||
}
|
||||
|
||||
func CreateAnonymousToken(envPrefix string) (*GuestToken, error) {
|
||||
tokenUrl := fmt.Sprintf("https://trcc.%v/anonymous/token", envPrefix)
|
||||
token := &GuestToken{}
|
||||
if err := getGuestToken(tokenUrl, token); err != nil {
|
||||
fmt.Println(err)
|
||||
return nil, err
|
||||
}
|
||||
return token, nil
|
||||
}
|
||||
|
||||
func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, tappingOptions *MizuTapOptions, mizuApiFilteringOptions *shared.TrafficFilteringOptions) error {
|
||||
if err := createMizuAggregator(ctx, kubernetesProvider, tappingOptions, mizuApiFilteringOptions); err != nil {
|
||||
@@ -225,10 +200,10 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
|
||||
for {
|
||||
select {
|
||||
case newTarget := <-added:
|
||||
fmt.Printf("+%s\n", newTarget.Name)
|
||||
fmt.Printf(mizu.Green, fmt.Sprintf("+%s\n", newTarget.Name))
|
||||
|
||||
case removedTarget := <-removed:
|
||||
fmt.Printf("-%s\n", removedTarget.Name)
|
||||
fmt.Printf(mizu.Red, fmt.Sprintf("-%s\n", removedTarget.Name))
|
||||
restartTappersDebouncer.SetOn()
|
||||
|
||||
case modifiedTarget := <-modified:
|
||||
@@ -274,14 +249,13 @@ func portForwardApiPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
|
||||
cancel()
|
||||
} else {
|
||||
fmt.Printf("Web interface is now available at http://localhost:%d\n", tappingOptions.GuiPort)
|
||||
|
||||
time.Sleep(time.Second * 5) // Waiting to be sure port forwarding finished
|
||||
if tappingOptions.Analyze {
|
||||
token, _ := CreateAnonymousToken(tappingOptions.AnalyzeDestination)
|
||||
if _, err := http.Get(fmt.Sprintf("http://localhost:%d/api/uploadEntries?token=%s&model=%s&dest=%s", tappingOptions.GuiPort, token.Token, token.Model, tappingOptions.AnalyzeDestination)); err != nil {
|
||||
if _, err := http.Get(fmt.Sprintf("http://localhost:%d/api/uploadEntries?dest=%s", tappingOptions.GuiPort, tappingOptions.AnalyzeDestination)); err != nil {
|
||||
fmt.Println(err)
|
||||
} else {
|
||||
fmt.Println("Staring to upload and analyze the data, it may take a few minutes")
|
||||
fmt.Println("https://" + tappingOptions.AnalyzeDestination + "/share/" + token.Token)
|
||||
fmt.Printf(mizu.Purple, "Traffic is uploading to UP9 cloud for further analsys")
|
||||
fmt.Println()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,3 +15,14 @@ const (
|
||||
TapperPodName = "mizu-tapper"
|
||||
K8sAllNamespaces = ""
|
||||
)
|
||||
|
||||
const (
|
||||
Black = "\033[1;30m%s\033[0m"
|
||||
Red = "\033[1;31m%s\033[0m"
|
||||
Green = "\033[1;32m%s\033[0m"
|
||||
Yellow = "\033[1;33m%s\033[0m"
|
||||
Purple = "\033[1;34m%s\033[0m"
|
||||
Magenta = "\033[1;35m%s\033[0m"
|
||||
Teal = "\033[1;36m%s\033[0m"
|
||||
White = "\033[1;37m%s\033[0m"
|
||||
)
|
||||
|
||||
@@ -1,28 +1,41 @@
|
||||
package shared
|
||||
|
||||
type WebSocketMessageType string
|
||||
|
||||
const (
|
||||
WebSocketMessageTypeEntry WebSocketMessageType = "entry"
|
||||
WebSocketMessageTypeTappedEntry WebSocketMessageType = "tappedEntry"
|
||||
WebSocketMessageTypeUpdateStatus WebSocketMessageType = "status"
|
||||
WebSocketMessageTypeEntry WebSocketMessageType = "entry"
|
||||
WebSocketMessageTypeTappedEntry WebSocketMessageType = "tappedEntry"
|
||||
WebSocketMessageTypeUpdateStatus WebSocketMessageType = "status"
|
||||
WebSocketMessageTypeAnalyzeStatus WebSocketMessageType = "analyzeStatus"
|
||||
)
|
||||
|
||||
type WebSocketMessageMetadata struct {
|
||||
MessageType WebSocketMessageType `json:"messageType,omitempty"`
|
||||
}
|
||||
|
||||
type WebSocketAnalyzeStatusMessage struct {
|
||||
*WebSocketMessageMetadata
|
||||
AnalyzeStatus AnalyzeStatus `json:"analyzeStatus"`
|
||||
}
|
||||
|
||||
type AnalyzeStatus struct {
|
||||
IsAnalyzing bool `json:"isAnalyzing"`
|
||||
RemoteUrl string `json:"remoteUrl"`
|
||||
IsRemoteReady bool `json:"isRemoteReady"`
|
||||
}
|
||||
|
||||
type WebSocketStatusMessage struct {
|
||||
*WebSocketMessageMetadata
|
||||
TappingStatus TapStatus `json:"tappingStatus"`
|
||||
}
|
||||
|
||||
type TapStatus struct {
|
||||
Pods []PodInfo `json:"pods"`
|
||||
Pods []PodInfo `json:"pods"`
|
||||
}
|
||||
|
||||
type PodInfo struct {
|
||||
Namespace string `json:"namespace"`
|
||||
Name string `json:"name"`
|
||||
Name string `json:"name"`
|
||||
}
|
||||
|
||||
func CreateWebSocketStatusMessage(tappingStatus TapStatus) WebSocketStatusMessage {
|
||||
@@ -34,6 +47,15 @@ func CreateWebSocketStatusMessage(tappingStatus TapStatus) WebSocketStatusMessag
|
||||
}
|
||||
}
|
||||
|
||||
func CreateWebSocketMessageTypeAnalyzeStatus(analyzeStatus AnalyzeStatus) WebSocketAnalyzeStatusMessage {
|
||||
return WebSocketAnalyzeStatusMessage{
|
||||
WebSocketMessageMetadata: &WebSocketMessageMetadata{
|
||||
MessageType: WebSocketMessageTypeAnalyzeStatus,
|
||||
},
|
||||
AnalyzeStatus: analyzeStatus,
|
||||
}
|
||||
}
|
||||
|
||||
type TrafficFilteringOptions struct {
|
||||
PlainTextMaskingRegexes []*SerializableRegexp
|
||||
}
|
||||
|
||||
@@ -10,6 +10,8 @@
|
||||
display: flex
|
||||
align-items: center
|
||||
padding-left: 24px
|
||||
padding-right: 24px
|
||||
justify-content: space-between
|
||||
|
||||
.title
|
||||
font-size: 45px
|
||||
|
||||
@@ -1,18 +1,41 @@
|
||||
import React from 'react';
|
||||
import {HarPage} from "./components/HarPage";
|
||||
import React, {useState} from 'react';
|
||||
import './App.sass';
|
||||
import logo from './components/assets/Mizu.svg';
|
||||
import {Button} from "@material-ui/core";
|
||||
import {HarPage} from "./components/HarPage";
|
||||
|
||||
|
||||
const App = () => {
|
||||
return (
|
||||
<div className="mizuApp">
|
||||
<div className="header">
|
||||
<div className="title"><img src={logo} alt="logo"/></div>
|
||||
<div className="description">Traffic viewer for Kubernetes</div>
|
||||
|
||||
const [analyzeStatus, setAnalyzeStatus] = useState(null);
|
||||
|
||||
return (
|
||||
<div className="mizuApp">
|
||||
<div className="header">
|
||||
<div style={{display: "flex", alignItems: "center"}}>
|
||||
<div className="title"><img src={logo} alt="logo"/></div>
|
||||
<div className="description">Traffic viewer for Kubernetes</div>
|
||||
</div>
|
||||
<div>
|
||||
{analyzeStatus?.isAnalyzing &&
|
||||
<div
|
||||
title={!analyzeStatus?.isRemoteReady ? "Analysis is not ready yet" : "Go To see further analysis"}>
|
||||
<Button
|
||||
variant="contained"
|
||||
color="primary"
|
||||
disabled={!analyzeStatus?.isRemoteReady}
|
||||
onClick={() => {
|
||||
window.open(analyzeStatus?.remoteUrl)
|
||||
}}>
|
||||
Analysis
|
||||
</Button>
|
||||
</div>
|
||||
}
|
||||
</div>
|
||||
</div>
|
||||
<HarPage setAnalyzeStatus={setAnalyzeStatus}/>
|
||||
</div>
|
||||
<HarPage/>
|
||||
</div>
|
||||
);
|
||||
);
|
||||
}
|
||||
|
||||
export default App;
|
||||
|
||||
@@ -35,7 +35,11 @@ enum ConnectionStatus {
|
||||
Paused
|
||||
}
|
||||
|
||||
export const HarPage: React.FC = () => {
|
||||
interface HarPageProps {
|
||||
setAnalyzeStatus: (status: any) => void;
|
||||
}
|
||||
|
||||
export const HarPage: React.FC<HarPageProps> = ({setAnalyzeStatus}) => {
|
||||
|
||||
const classes = useLayoutStyles();
|
||||
|
||||
@@ -60,21 +64,21 @@ export const HarPage: React.FC = () => {
|
||||
ws.current.onclose = () => setConnection(ConnectionStatus.Closed);
|
||||
}
|
||||
|
||||
if(ws.current) {
|
||||
if (ws.current) {
|
||||
ws.current.onmessage = e => {
|
||||
if(!e?.data) return;
|
||||
if (!e?.data) return;
|
||||
const message = JSON.parse(e.data);
|
||||
|
||||
switch (message.messageType) {
|
||||
case "entry":
|
||||
const entry = message.data
|
||||
if(connection === ConnectionStatus.Paused) {
|
||||
if (connection === ConnectionStatus.Paused) {
|
||||
setNoMoreDataBottom(false)
|
||||
return;
|
||||
}
|
||||
if(!focusedEntryId) setFocusedEntryId(entry.id)
|
||||
if (!focusedEntryId) setFocusedEntryId(entry.id)
|
||||
let newEntries = [...entries];
|
||||
if(entries.length === 1000) {
|
||||
if (entries.length === 1000) {
|
||||
newEntries = newEntries.splice(1);
|
||||
setNoMoreDataTop(false);
|
||||
}
|
||||
@@ -83,6 +87,9 @@ export const HarPage: React.FC = () => {
|
||||
case "status":
|
||||
setTappingStatus(message.tappingStatus);
|
||||
break
|
||||
case "analyzeStatus":
|
||||
setAnalyzeStatus(message.analyzeStatus);
|
||||
break
|
||||
default:
|
||||
console.error(`unsupported websocket message type, Got: ${message.messageType}`)
|
||||
}
|
||||
@@ -94,19 +101,23 @@ export const HarPage: React.FC = () => {
|
||||
fetch(`http://localhost:8899/api/tapStatus`)
|
||||
.then(response => response.json())
|
||||
.then(data => setTappingStatus(data));
|
||||
|
||||
fetch(`http://localhost:8899/api/analyzeStatus`)
|
||||
.then(response => response.json())
|
||||
.then(data => setAnalyzeStatus(data));
|
||||
}, []);
|
||||
|
||||
|
||||
useEffect(() => {
|
||||
if(!focusedEntryId) return;
|
||||
if (!focusedEntryId) return;
|
||||
setSelectedHarEntry(null)
|
||||
fetch(`http://localhost:8899/api/entries/${focusedEntryId}`)
|
||||
.then(response => response.json())
|
||||
.then(data => setSelectedHarEntry(data));
|
||||
},[focusedEntryId])
|
||||
}, [focusedEntryId])
|
||||
|
||||
const toggleConnection = () => {
|
||||
setConnection(connection === ConnectionStatus.Connected ? ConnectionStatus.Paused : ConnectionStatus.Connected );
|
||||
setConnection(connection === ConnectionStatus.Connected ? ConnectionStatus.Paused : ConnectionStatus.Connected);
|
||||
}
|
||||
|
||||
const getConnectionStatusClass = (isContainer) => {
|
||||
@@ -135,11 +146,12 @@ export const HarPage: React.FC = () => {
|
||||
return (
|
||||
<div className="HarPage">
|
||||
<div className="harPageHeader">
|
||||
<img style={{cursor: "pointer", marginRight: 15, height: 30}} alt="pause" src={connection === ConnectionStatus.Connected ? pauseIcon : playIcon} onClick={toggleConnection}/>
|
||||
<img style={{cursor: "pointer", marginRight: 15, height: 30}} alt="pause"
|
||||
src={connection === ConnectionStatus.Connected ? pauseIcon : playIcon} onClick={toggleConnection}/>
|
||||
<div className="connectionText">
|
||||
{getConnectionTitle()}
|
||||
<div className={"indicatorContainer " + getConnectionStatusClass(true)}>
|
||||
<div className={"indicator " + getConnectionStatusClass(false)} />
|
||||
<div className={"indicator " + getConnectionStatusClass(false)}/>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
@@ -169,7 +181,8 @@ export const HarPage: React.FC = () => {
|
||||
</div>
|
||||
</div>
|
||||
<div className={classes.details}>
|
||||
{selectedHarEntry && <HAREntryDetailed harEntry={selectedHarEntry} classes={{root: classes.harViewer}}/>}
|
||||
{selectedHarEntry &&
|
||||
<HAREntryDetailed harEntry={selectedHarEntry} classes={{root: classes.harViewer}}/>}
|
||||
</div>
|
||||
</div>}
|
||||
{tappingStatus?.pods != null && <StatusBar tappingStatus={tappingStatus}/>}
|
||||
|
||||
Reference in New Issue
Block a user