Merge pull request #87 from up9inc/mizu_anonymous

Mizu tap analyze
This commit is contained in:
Igor Gov 2021-06-30 09:56:35 +03:00 committed by GitHub
commit ed4a818a53
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 135 additions and 16 deletions

View File

@ -11,16 +11,12 @@ require (
github.com/go-playground/universal-translator v0.17.0 github.com/go-playground/universal-translator v0.17.0
github.com/go-playground/validator/v10 v10.5.0 github.com/go-playground/validator/v10 v10.5.0
github.com/gofiber/fiber/v2 v2.8.0 github.com/gofiber/fiber/v2 v2.8.0
github.com/google/gopacket v1.1.19
github.com/google/martian v2.1.0+incompatible github.com/google/martian v2.1.0+incompatible
github.com/gorilla/websocket v1.4.2 github.com/gorilla/websocket v1.4.2
github.com/leodido/go-urn v1.2.1 // indirect github.com/leodido/go-urn v1.2.1 // indirect
github.com/orcaman/concurrent-map v0.0.0-20210106121528-16402b402231
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/up9inc/mizu/shared v0.0.0 github.com/up9inc/mizu/shared v0.0.0
github.com/up9inc/mizu/tap v0.0.0 github.com/up9inc/mizu/tap v0.0.0
go.mongodb.org/mongo-driver v1.5.1 go.mongodb.org/mongo-driver v1.5.1
golang.org/x/net v0.0.0-20210421230115-4e50805a0758
gorm.io/driver/sqlite v1.1.4 gorm.io/driver/sqlite v1.1.4
gorm.io/gorm v1.21.8 gorm.io/gorm v1.21.8
k8s.io/api v0.21.0 k8s.io/api v0.21.0
@ -29,4 +25,5 @@ require (
) )
replace github.com/up9inc/mizu/shared v0.0.0 => ../shared replace github.com/up9inc/mizu/shared v0.0.0 => ../shared
replace github.com/up9inc/mizu/tap v0.0.0 => ../tap replace github.com/up9inc/mizu/tap v0.0.0 => ../tap

View File

@ -251,7 +251,6 @@ github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGV
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/orcaman/concurrent-map v0.0.0-20210106121528-16402b402231 h1:fa50YL1pzKW+1SsBnJDOHppJN9stOEwS+CRWyUtyYGU= github.com/orcaman/concurrent-map v0.0.0-20210106121528-16402b402231 h1:fa50YL1pzKW+1SsBnJDOHppJN9stOEwS+CRWyUtyYGU=
github.com/orcaman/concurrent-map v0.0.0-20210106121528-16402b402231/go.mod h1:Lu3tH6HLW3feq74c2GC+jIMS/K2CFcDWnWD9XkenwhI= github.com/orcaman/concurrent-map v0.0.0-20210106121528-16402b402231/go.mod h1:Lu3tH6HLW3feq74c2GC+jIMS/K2CFcDWnWD9XkenwhI=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE= github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE=
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=

View File

@ -1,14 +1,20 @@
package controllers package controllers
import ( import (
"bytes"
"compress/zlib"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2"
"github.com/google/martian/har" "github.com/google/martian/har"
"io/ioutil"
"log"
"mizuserver/pkg/database" "mizuserver/pkg/database"
"mizuserver/pkg/models" "mizuserver/pkg/models"
"mizuserver/pkg/utils" "mizuserver/pkg/utils"
"mizuserver/pkg/validation" "mizuserver/pkg/validation"
"net/http"
"net/url"
"time" "time"
) )
@ -138,9 +144,72 @@ func GetHARs(c *fiber.Ctx) error {
return c.Status(fiber.StatusOK).SendStream(buffer) return c.Status(fiber.StatusOK).SendStream(buffer)
} }
func uploadEntriesImpl(token string, model string, envPrefix string) {
sleepTime := time.Second * 10
var timestampFrom int64 = 0
for {
timestampTo := time.Now().UnixNano() / int64(time.Millisecond)
fmt.Printf("Getting entries from %v, to %v\n", timestampFrom, timestampTo)
entriesArray := getEntriesFromDb(timestampFrom, timestampTo)
if len(entriesArray) > 0 {
fmt.Printf("About to upload %v entries\n", len(entriesArray))
body, jMarshalErr := json.Marshal(entriesArray)
if jMarshalErr != nil {
log.Fatal(jMarshalErr)
}
var in bytes.Buffer
w := zlib.NewWriter(&in)
_, _ = w.Write(body)
_ = w.Close()
reqBody := ioutil.NopCloser(bytes.NewReader(in.Bytes()))
postUrl, _ := url.Parse(fmt.Sprintf("https://traffic.%s/dumpTrafficBulk/%s", envPrefix, model))
fmt.Println(postUrl)
req := &http.Request{
Method: http.MethodPost,
URL: postUrl,
Header: map[string][]string{
"Content-Encoding": {"deflate"},
"Content-Type": {"application/octet-stream"},
"Guest-Auth": {token},
},
Body: reqBody,
}
_, postErr := http.DefaultClient.Do(req)
if postErr != nil {
log.Fatal(postErr)
}
fmt.Printf("Finish uploading %v entries to %s\n", len(entriesArray), postUrl)
} else {
fmt.Println("Nothing to upload")
}
fmt.Printf("Sleeping for %v...\n", sleepTime)
time.Sleep(sleepTime)
timestampFrom = timestampTo
}
}
func UploadEntries(c *fiber.Ctx) error {
entriesFilter := &models.UploadEntriesRequestBody{}
if err := c.QueryParser(entriesFilter); err != nil {
return c.Status(fiber.StatusBadRequest).JSON(err)
}
if err := validation.Validate(entriesFilter); err != nil {
return c.Status(fiber.StatusBadRequest).JSON(err)
}
go uploadEntriesImpl(entriesFilter.Token, entriesFilter.Model, entriesFilter.Dest)
return c.Status(fiber.StatusOK).SendString("OK")
}
func GetFullEntries(c *fiber.Ctx) error { func GetFullEntries(c *fiber.Ctx) error {
entriesFilter := &models.HarFetchRequestBody{} entriesFilter := &models.HarFetchRequestBody{}
order := OrderDesc
if err := c.QueryParser(entriesFilter); err != nil { if err := c.QueryParser(entriesFilter); err != nil {
return c.Status(fiber.StatusBadRequest).JSON(err) return c.Status(fiber.StatusBadRequest).JSON(err)
} }
@ -162,6 +231,12 @@ func GetFullEntries(c *fiber.Ctx) error {
timestampTo = entriesFilter.To timestampTo = entriesFilter.To
} }
entriesArray := getEntriesFromDb(timestampFrom, timestampTo)
return c.Status(fiber.StatusOK).JSON(entriesArray)
}
func getEntriesFromDb(timestampFrom int64, timestampTo int64) []har.Entry {
order := OrderDesc
var entries []models.MizuEntry var entries []models.MizuEntry
database.GetEntriesTable(). database.GetEntriesTable().
Where(fmt.Sprintf("timestamp BETWEEN %v AND %v", timestampFrom, timestampTo)). Where(fmt.Sprintf("timestamp BETWEEN %v AND %v", timestampFrom, timestampTo)).
@ -179,7 +254,7 @@ func GetFullEntries(c *fiber.Ctx) error {
_ = json.Unmarshal([]byte(entryData.Entry), &harEntry) _ = json.Unmarshal([]byte(entryData.Entry), &harEntry)
entriesArray = append(entriesArray, harEntry) entriesArray = append(entriesArray, harEntry)
} }
return c.Status(fiber.StatusOK).JSON(entriesArray) return entriesArray
} }
func GetEntry(c *fiber.Ctx) error { func GetEntry(c *fiber.Ctx) error {

View File

@ -49,6 +49,12 @@ type EntriesFilter struct {
Timestamp int64 `query:"timestamp" validate:"required,min=1"` Timestamp int64 `query:"timestamp" validate:"required,min=1"`
} }
type UploadEntriesRequestBody struct {
Token string `query:"token"`
Model string `query:"model"`
Dest string `query:"dest"`
}
type HarFetchRequestBody struct { type HarFetchRequestBody struct {
From int64 `query:"from"` From int64 `query:"from"`
To int64 `query:"to"` To int64 `query:"to"`

View File

@ -12,7 +12,7 @@ func EntriesRoutes(fiberApp *fiber.App) {
routeGroup.Get("/entries", controllers.GetEntries) // get entries (base/thin entries) routeGroup.Get("/entries", controllers.GetEntries) // get entries (base/thin entries)
routeGroup.Get("/entries/:entryId", controllers.GetEntry) // get single (full) entry routeGroup.Get("/entries/:entryId", controllers.GetEntry) // get single (full) entry
routeGroup.Get("/exportEntries", controllers.GetFullEntries) routeGroup.Get("/exportEntries", controllers.GetFullEntries)
routeGroup.Get("/uploadEntries", controllers.UploadEntries)
routeGroup.Get("/har", controllers.GetHARs) routeGroup.Get("/har", controllers.GetHARs)

View File

@ -15,6 +15,8 @@ type MizuTapOptions struct {
GuiPort uint16 GuiPort uint16
Namespace string Namespace string
AllNamespaces bool AllNamespaces bool
Analyze bool
AnalyzeDestination string
KubeConfigPath string KubeConfigPath string
MizuImage string MizuImage string
MizuPodPort uint16 MizuPodPort uint16
@ -22,7 +24,6 @@ type MizuTapOptions struct {
TapOutgoing bool TapOutgoing bool
} }
var mizuTapOptions = &MizuTapOptions{} var mizuTapOptions = &MizuTapOptions{}
var direction string var direction string
@ -30,7 +31,7 @@ var tapCmd = &cobra.Command{
Use: "tap [POD REGEX]", Use: "tap [POD REGEX]",
Short: "Record ingoing traffic of a kubernetes pod", Short: "Record ingoing traffic of a kubernetes pod",
Long: `Record the ingoing traffic of a kubernetes pod. Long: `Record the ingoing traffic of a kubernetes pod.
Supported protocols are HTTP and gRPC.`, Supported protocols are HTTP and gRPC.`,
RunE: func(cmd *cobra.Command, args []string) error { RunE: func(cmd *cobra.Command, args []string) error {
if len(args) == 0 { if len(args) == 0 {
return errors.New("POD REGEX argument is required") return errors.New("POD REGEX argument is required")
@ -62,6 +63,8 @@ func init() {
tapCmd.Flags().Uint16VarP(&mizuTapOptions.GuiPort, "gui-port", "p", 8899, "Provide a custom port for the web interface webserver") tapCmd.Flags().Uint16VarP(&mizuTapOptions.GuiPort, "gui-port", "p", 8899, "Provide a custom port for the web interface webserver")
tapCmd.Flags().StringVarP(&mizuTapOptions.Namespace, "namespace", "n", "", "Namespace selector") tapCmd.Flags().StringVarP(&mizuTapOptions.Namespace, "namespace", "n", "", "Namespace selector")
tapCmd.Flags().BoolVar(&mizuTapOptions.Analyze, "analyze", false, "Analyze traffic")
tapCmd.Flags().StringVar(&mizuTapOptions.AnalyzeDestination, "dest", "up9.app", "Destination environment")
tapCmd.Flags().BoolVarP(&mizuTapOptions.AllNamespaces, "all-namespaces", "A", false, "Tap all namespaces") tapCmd.Flags().BoolVarP(&mizuTapOptions.AllNamespaces, "all-namespaces", "A", false, "Tap all namespaces")
tapCmd.Flags().StringVarP(&mizuTapOptions.KubeConfigPath, "kube-config", "k", "", "Path to kube-config file") tapCmd.Flags().StringVarP(&mizuTapOptions.KubeConfigPath, "kube-config", "k", "", "Path to kube-config file")
tapCmd.Flags().StringVarP(&mizuTapOptions.MizuImage, "mizu-image", "", fmt.Sprintf("gcr.io/up9-docker-hub/mizu/%s:latest", mizu.Branch), "Custom image for mizu collector") tapCmd.Flags().StringVarP(&mizuTapOptions.MizuImage, "mizu-image", "", fmt.Sprintf("gcr.io/up9-docker-hub/mizu/%s:latest", mizu.Branch), "Custom image for mizu collector")

View File

@ -2,7 +2,9 @@ package cmd
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"net/http"
"os" "os"
"os/signal" "os/signal"
"regexp" "regexp"
@ -71,7 +73,7 @@ func RunMizuTap(podRegexQuery *regexp.Regexp, tappingOptions *MizuTapOptions) {
return return
} }
go portForwardApiPod(ctx, kubernetesProvider, cancel, tappingOptions) // TODO convert this to job for built in pod ttl or have the running app handle this go portForwardApiPod(ctx, kubernetesProvider, cancel, tappingOptions, ) // TODO convert this to job for built in pod ttl or have the running app handle this
go watchPodsForTapping(ctx, kubernetesProvider, cancel, podRegexQuery, tappingOptions) go watchPodsForTapping(ctx, kubernetesProvider, cancel, podRegexQuery, tappingOptions)
go syncApiStatus(ctx, cancel, tappingOptions) go syncApiStatus(ctx, cancel, tappingOptions)
@ -79,6 +81,34 @@ func RunMizuTap(podRegexQuery *regexp.Regexp, tappingOptions *MizuTapOptions) {
waitForFinish(ctx, cancel) waitForFinish(ctx, cancel)
} }
type GuestToken struct {
Token string `json:"token"`
Model string `json:"model"`
}
func getGuestToken(url string, target *GuestToken) error {
resp, err := http.Get(url)
if err != nil {
return err
}
defer resp.Body.Close()
return json.NewDecoder(resp.Body).Decode(target)
}
func CreateAnonymousToken(envPrefix string) (*GuestToken, error) {
tokenUrl := fmt.Sprintf("https://trcc.%v/anonymous/token", envPrefix)
token := &GuestToken{}
if err := getGuestToken(tokenUrl, token); err != nil {
fmt.Println(err)
return nil, err
}
fmt.Println("Token:", token.Token, "model:", token.Model)
return token, nil
}
func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, tappingOptions *MizuTapOptions, mizuApiFilteringOptions *shared.TrafficFilteringOptions) error { func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, tappingOptions *MizuTapOptions, mizuApiFilteringOptions *shared.TrafficFilteringOptions) error {
if err := createMizuAggregator(ctx, kubernetesProvider, tappingOptions, mizuApiFilteringOptions); err != nil { if err := createMizuAggregator(ctx, kubernetesProvider, tappingOptions, mizuApiFilteringOptions); err != nil {
return err return err
@ -241,12 +271,21 @@ func portForwardApiPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
case modifiedPod := <-modified: case modifiedPod := <-modified:
if modifiedPod.Status.Phase == "Running" && !isPodReady { if modifiedPod.Status.Phase == "Running" && !isPodReady {
isPodReady = true isPodReady = true
var err error var portForwardCreateError error
portForward, err = kubernetes.NewPortForward(kubernetesProvider, mizu.ResourcesNamespace, mizu.AggregatorPodName, tappingOptions.GuiPort, tappingOptions.MizuPodPort, cancel) if portForward, portForwardCreateError = kubernetes.NewPortForward(kubernetesProvider, mizu.ResourcesNamespace, mizu.AggregatorPodName, tappingOptions.GuiPort, tappingOptions.MizuPodPort, cancel); portForwardCreateError != nil {
fmt.Printf("Web interface is now available at http://localhost:%d\n", tappingOptions.GuiPort) fmt.Printf("error forwarding port to pod %s\n", portForwardCreateError)
if err != nil {
fmt.Printf("error forwarding port to pod %s\n", err)
cancel() cancel()
} else {
fmt.Printf("Web interface is now available at http://localhost:%d\n", tappingOptions.GuiPort)
if tappingOptions.Analyze {
token, _ := CreateAnonymousToken(tappingOptions.AnalyzeDestination)
if _, err := http.Get(fmt.Sprintf("http://localhost:%d/api/uploadEntries?token=%s&model=%s&dest=%s", tappingOptions.GuiPort, token.Token, token.Model, tappingOptions.AnalyzeDestination)); err != nil {
fmt.Println(err)
} else {
fmt.Println("https://" + tappingOptions.AnalyzeDestination + "/share/" + token.Token)
}
}
} }
} }