Mizu tap analyze

This commit is contained in:
Igor Gov 2021-06-29 17:05:44 +03:00
parent c59aadb221
commit b84c698c1a
7 changed files with 125 additions and 10 deletions

View File

@ -11,16 +11,12 @@ require (
github.com/go-playground/universal-translator v0.17.0
github.com/go-playground/validator/v10 v10.5.0
github.com/gofiber/fiber/v2 v2.8.0
github.com/google/gopacket v1.1.19
github.com/google/martian v2.1.0+incompatible
github.com/gorilla/websocket v1.4.2
github.com/leodido/go-urn v1.2.1 // indirect
github.com/orcaman/concurrent-map v0.0.0-20210106121528-16402b402231
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/up9inc/mizu/shared v0.0.0
github.com/up9inc/mizu/tap v0.0.0
go.mongodb.org/mongo-driver v1.5.1
golang.org/x/net v0.0.0-20210421230115-4e50805a0758
gorm.io/driver/sqlite v1.1.4
gorm.io/gorm v1.21.8
k8s.io/api v0.21.0
@ -29,4 +25,5 @@ require (
)
replace github.com/up9inc/mizu/shared v0.0.0 => ../shared
replace github.com/up9inc/mizu/tap v0.0.0 => ../tap

View File

@ -251,7 +251,6 @@ github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGV
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/orcaman/concurrent-map v0.0.0-20210106121528-16402b402231 h1:fa50YL1pzKW+1SsBnJDOHppJN9stOEwS+CRWyUtyYGU=
github.com/orcaman/concurrent-map v0.0.0-20210106121528-16402b402231/go.mod h1:Lu3tH6HLW3feq74c2GC+jIMS/K2CFcDWnWD9XkenwhI=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE=
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=

View File

@ -1,14 +1,20 @@
package controllers
import (
"bytes"
"compress/zlib"
"encoding/json"
"fmt"
"github.com/gofiber/fiber/v2"
"github.com/google/martian/har"
"io/ioutil"
"log"
"mizuserver/pkg/database"
"mizuserver/pkg/models"
"mizuserver/pkg/utils"
"mizuserver/pkg/validation"
"net/http"
"net/url"
"time"
)
@ -138,9 +144,70 @@ func GetHARs(c *fiber.Ctx) error {
return c.Status(fiber.StatusOK).SendStream(buffer)
}
func uploadEntriesImpl(token string, model string) {
baseUrl := "igorgov-dev.dev.testr.io"
sleepTime := int64(time.Second * 10)
var timestampFrom int64 = 0
for {
timestampTo := time.Now().UnixNano() / int64(time.Millisecond)
fmt.Printf("Getting entries from %v, to %v\n", timestampFrom, timestampTo)
entriesArray := getEntriesFromDb(timestampFrom, timestampTo)
if len(entriesArray) > 0 {
fmt.Printf("About to upload %v entries\n", len(entriesArray))
body, jMarshalErr := json.Marshal(entriesArray)
if jMarshalErr != nil {
log.Fatal(jMarshalErr)
}
var in bytes.Buffer
w := zlib.NewWriter(&in)
w.Write(body)
w.Close()
reqBody := ioutil.NopCloser(bytes.NewReader(in.Bytes()))
postUrl, _ := url.Parse("https://traffic." + baseUrl + "/dumpTrafficBulk/" + model)
req := &http.Request{
Method: "POST",
URL: postUrl,
Header: map[string][]string{
"Content-Encoding": {"deflate"},
"Content-Type": {"application/octet-stream"},
"Guest-Auth": {token},
},
Body: reqBody,
}
_, postErr := http.DefaultClient.Do(req)
if postErr != nil {
log.Fatal(postErr)
}
} else {
fmt.Println("Nothing to upload")
}
fmt.Println("Sleeping for " + string(sleepTime) + "seconds...")
time.Sleep(time.Duration(sleepTime))
timestampFrom = timestampTo
}
}
func UploadEntries(c *fiber.Ctx) error {
entriesFilter := &models.UploadEntriesRequestBody{}
if err := c.QueryParser(entriesFilter); err != nil {
return c.Status(fiber.StatusBadRequest).JSON(err)
}
if err := validation.Validate(entriesFilter); err != nil {
return c.Status(fiber.StatusBadRequest).JSON(err)
}
go uploadEntriesImpl(entriesFilter.Token, entriesFilter.Model)
return c.Status(fiber.StatusOK).SendString("OK")
}
func GetFullEntries(c *fiber.Ctx) error {
entriesFilter := &models.HarFetchRequestBody{}
order := OrderDesc
if err := c.QueryParser(entriesFilter); err != nil {
return c.Status(fiber.StatusBadRequest).JSON(err)
}
@ -162,6 +229,12 @@ func GetFullEntries(c *fiber.Ctx) error {
timestampTo = entriesFilter.To
}
entriesArray := getEntriesFromDb(timestampFrom, timestampTo)
return c.Status(fiber.StatusOK).JSON(entriesArray)
}
func getEntriesFromDb(timestampFrom int64, timestampTo int64) []har.Entry {
order := OrderDesc
var entries []models.MizuEntry
database.GetEntriesTable().
Where(fmt.Sprintf("timestamp BETWEEN %v AND %v", timestampFrom, timestampTo)).
@ -179,7 +252,7 @@ func GetFullEntries(c *fiber.Ctx) error {
_ = json.Unmarshal([]byte(entryData.Entry), &harEntry)
entriesArray = append(entriesArray, harEntry)
}
return c.Status(fiber.StatusOK).JSON(entriesArray)
return entriesArray
}
func GetEntry(c *fiber.Ctx) error {

View File

@ -49,6 +49,11 @@ type EntriesFilter struct {
Timestamp int64 `query:"timestamp" validate:"required,min=1"`
}
type UploadEntriesRequestBody struct {
Token string `query:"token"`
Model string `query:"model"`
}
type HarFetchRequestBody struct {
From int64 `query:"from"`
To int64 `query:"to"`

View File

@ -12,7 +12,7 @@ func EntriesRoutes(fiberApp *fiber.App) {
routeGroup.Get("/entries", controllers.GetEntries) // get entries (base/thin entries)
routeGroup.Get("/entries/:entryId", controllers.GetEntry) // get single (full) entry
routeGroup.Get("/exportEntries", controllers.GetFullEntries)
routeGroup.Get("/uploadEntries", controllers.UploadEntries)
routeGroup.Get("/har", controllers.GetHARs)

View File

@ -15,6 +15,7 @@ type MizuTapOptions struct {
GuiPort uint16
Namespace string
AllNamespaces bool
Analyze bool
KubeConfigPath string
MizuImage string
MizuPodPort uint16
@ -22,7 +23,6 @@ type MizuTapOptions struct {
TapOutgoing bool
}
var mizuTapOptions = &MizuTapOptions{}
var direction string
@ -30,7 +30,7 @@ var tapCmd = &cobra.Command{
Use: "tap [POD REGEX]",
Short: "Record ingoing traffic of a kubernetes pod",
Long: `Record the ingoing traffic of a kubernetes pod.
Supported protocols are HTTP and gRPC.`,
Supported protocols are HTTP and gRPC.`,
RunE: func(cmd *cobra.Command, args []string) error {
if len(args) == 0 {
return errors.New("POD REGEX argument is required")
@ -62,6 +62,7 @@ func init() {
tapCmd.Flags().Uint16VarP(&mizuTapOptions.GuiPort, "gui-port", "p", 8899, "Provide a custom port for the web interface webserver")
tapCmd.Flags().StringVarP(&mizuTapOptions.Namespace, "namespace", "n", "", "Namespace selector")
tapCmd.Flags().BoolVarP(&mizuTapOptions.Analyze, "analyze", "", false, "Analyze traffic")
tapCmd.Flags().BoolVarP(&mizuTapOptions.AllNamespaces, "all-namespaces", "A", false, "Tap all namespaces")
tapCmd.Flags().StringVarP(&mizuTapOptions.KubeConfigPath, "kube-config", "k", "", "Path to kube-config file")
tapCmd.Flags().StringVarP(&mizuTapOptions.MizuImage, "mizu-image", "", fmt.Sprintf("gcr.io/up9-docker-hub/mizu/%s:latest", mizu.Branch), "Custom image for mizu collector")

View File

@ -2,7 +2,10 @@ package cmd
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os"
"os/signal"
"regexp"
@ -79,6 +82,32 @@ func RunMizuTap(podRegexQuery *regexp.Regexp, tappingOptions *MizuTapOptions) {
waitForFinish(ctx, cancel)
}
type guestToken struct {
Token string `json:"token"`
Model string `json:"model"`
}
func CreateAnonymousToken(baseUrl string) guestToken {
resp, httpErr := http.Get("https://trcc." + baseUrl + "/anonymous/token")
if httpErr != nil {
fmt.Println(httpErr)
}
body, ioErr := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if ioErr != nil {
fmt.Println(ioErr)
}
token := guestToken{}
jsonErr := json.Unmarshal(body, &token)
if jsonErr != nil {
fmt.Println(jsonErr)
}
fmt.Println("Token:", token.Token, "model:", token.Model)
return token
}
func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, tappingOptions *MizuTapOptions, mizuApiFilteringOptions *shared.TrafficFilteringOptions) error {
if err := createMizuAggregator(ctx, kubernetesProvider, tappingOptions, mizuApiFilteringOptions); err != nil {
return err
@ -244,6 +273,17 @@ func portForwardApiPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
var err error
portForward, err = kubernetes.NewPortForward(kubernetesProvider, mizu.ResourcesNamespace, mizu.AggregatorPodName, tappingOptions.GuiPort, tappingOptions.MizuPodPort, cancel)
fmt.Printf("Web interface is now available at http://localhost:%d\n", tappingOptions.GuiPort)
if tappingOptions.Analyze {
baseUrl := "igorgov-dev.dev.testr.io"
token := CreateAnonymousToken(baseUrl)
_, err := http.Get("http://localhost:8899/api/uploadEntries")
if err != nil {
fmt.Println(err)
}
fmt.Println("https://" + baseUrl + "/share/" + token.Token)
}
if err != nil {
fmt.Printf("error forwarding port to pod %s\n", err)
cancel()