Compare commits

..

4 Commits

Author SHA1 Message Date
Igor Gov
2575ad722a Introducing API server provider (#243) 2021-08-22 11:41:38 +03:00
RoyUP9
afd5757315 added tapper count route and wait time for tappers in test (#226) 2021-08-22 11:38:19 +03:00
Alon Girmonsky
dba8b1f215 some changes in the read me (#241)
change prerequisite to permissions and kubeconfig. These are more FYIs as Mizu requires very little prerequisites. 
Change the description to match getmizu.io
2021-08-20 12:39:52 +03:00
Igor Gov
6dd0ef1268 Adding user friendly message in view command before sleeping (#239) 2021-08-19 12:22:18 +03:00
16 changed files with 511 additions and 322 deletions

View File

@@ -2,7 +2,9 @@
# The API Traffic Viewer for Kubernetes
A simple-yet-powerful API traffic viewer for Kubernetes to help you troubleshoot and debug your microservices. Think TCPDump and Chrome Dev Tools combined
A simple-yet-powerful API traffic viewer for Kubernetes enabling you to view all API communication between microservices to help your debug and troubleshoot regressions.
Think TCPDump and Chrome Dev Tools combined.
![Simple UI](assets/mizu-ui.png)
@@ -38,8 +40,10 @@ SHA256 checksums are available on the [Releases](https://github.com/up9inc/mizu/
### Development (unstable) Build
Pick one from the [Releases](https://github.com/up9inc/mizu/releases) page
## Prerequisites
1. Set `KUBECONFIG` environment variable to your Kubernetes configuration. If this is not set, Mizu assumes that configuration is at `${HOME}/.kube/config`
## Kubeconfig & Permissions
While `mizu`most often works out of the box, you can influence its behavior:
1. [OPTIONAL] Set `KUBECONFIG` environment variable to your Kubernetes configuration. If this is not set, Mizu assumes that configuration is at `${HOME}/.kube/config`
2. `mizu` assumes user running the command has permissions to create resources (such as pods, services, namespaces) on your Kubernetes cluster (no worries - `mizu` resources are cleaned up upon termination)
For detailed list of k8s permissions see [PERMISSIONS](PERMISSIONS.md) document

View File

@@ -13,22 +13,22 @@ func TestTapAndFetch(t *testing.T) {
t.Skip("ignored acceptance test")
}
tests := []int{1, 100}
tests := []int{50}
for _, entriesCount := range tests {
t.Run(fmt.Sprintf("%d", entriesCount), func(t *testing.T) {
cliPath, cliPathErr := GetCliPath()
cliPath, cliPathErr := getCliPath()
if cliPathErr != nil {
t.Errorf("failed to get cli path, err: %v", cliPathErr)
return
}
tapCmdArgs := GetDefaultTapCommandArgs()
tapCmdArgs := getDefaultTapCommandArgs()
tapCmd := exec.Command(cliPath, tapCmdArgs...)
t.Logf("running command: %v", tapCmd.String())
t.Cleanup(func() {
if err := CleanupCommand(tapCmd); err != nil {
if err := cleanupCommand(tapCmd); err != nil {
t.Logf("failed to cleanup tap command, err: %v", err)
}
})
@@ -38,86 +38,87 @@ func TestTapAndFetch(t *testing.T) {
return
}
time.Sleep(30 * time.Second)
if err := waitTapPodsReady(); err != nil {
t.Errorf("failed to start tap pods on time, err: %v", err)
return
}
proxyUrl := "http://localhost:8080/api/v1/namespaces/mizu-tests/services/httpbin/proxy/get"
for i := 0; i < entriesCount; i++ {
if _, requestErr := ExecuteHttpRequest(proxyUrl); requestErr != nil {
if _, requestErr := executeHttpRequest(proxyUrl); requestErr != nil {
t.Errorf("failed to send proxy request, err: %v", requestErr)
return
}
}
time.Sleep(5 * time.Second)
timestamp := time.Now().UnixNano() / int64(time.Millisecond)
entriesCheckFunc := func() error {
timestamp := time.Now().UnixNano() / int64(time.Millisecond)
entriesUrl := fmt.Sprintf("http://localhost:8899/mizu/api/entries?limit=%v&operator=lt&timestamp=%v", entriesCount, timestamp)
requestResult, requestErr := ExecuteHttpRequest(entriesUrl)
if requestErr != nil {
t.Errorf("failed to get entries, err: %v", requestErr)
entriesUrl := fmt.Sprintf("http://localhost:8899/mizu/api/entries?limit=%v&operator=lt&timestamp=%v", entriesCount, timestamp)
requestResult, requestErr := executeHttpRequest(entriesUrl)
if requestErr != nil {
return fmt.Errorf("failed to get entries, err: %v", requestErr)
}
entries, ok := requestResult.([]interface{})
if !ok {
return fmt.Errorf("invalid entries type")
}
if len(entries) == 0 {
return fmt.Errorf("unexpected entries result - Expected more than 0 entries")
}
entry, ok := entries[0].(map[string]interface{})
if !ok {
return fmt.Errorf("invalid entry type")
}
entryUrl := fmt.Sprintf("http://localhost:8899/mizu/api/entries/%v", entry["id"])
requestResult, requestErr = executeHttpRequest(entryUrl)
if requestErr != nil {
return fmt.Errorf("failed to get entry, err: %v", requestErr)
}
if requestResult == nil {
return fmt.Errorf("unexpected nil entry result")
}
return nil
}
if err := retriesExecute(ShortRetriesCount, entriesCheckFunc); err != nil {
t.Errorf("%v", err)
return
}
entries, ok := requestResult.([]interface{})
if !ok {
t.Errorf("invalid entries type")
return
}
if len(entries) != entriesCount {
t.Errorf("unexpected entries result - Expected: %v, actual: %v", entriesCount, len(entries))
return
}
entry, ok := entries[0].(map[string]interface{})
if !ok {
t.Errorf("invalid entry type")
return
}
entryUrl := fmt.Sprintf("http://localhost:8899/mizu/api/entries/%v", entry["id"])
requestResult, requestErr = ExecuteHttpRequest(entryUrl)
if requestErr != nil {
t.Errorf("failed to get entry, err: %v", requestErr)
return
}
if requestResult == nil {
t.Errorf("unexpected nil entry result")
return
}
fetchCmdArgs := GetDefaultFetchCommandArgs()
fetchCmdArgs := getDefaultFetchCommandArgs()
fetchCmd := exec.Command(cliPath, fetchCmdArgs...)
t.Logf("running command: %v", fetchCmd.String())
t.Cleanup(func() {
if err := CleanupCommand(fetchCmd); err != nil {
t.Logf("failed to cleanup fetch command, err: %v", err)
}
})
if err := fetchCmd.Start(); err != nil {
t.Errorf("failed to start fetch command, err: %v", err)
return
}
time.Sleep(5 * time.Second)
harCheckFunc := func() error {
harBytes, readFileErr := ioutil.ReadFile("./unknown_source.har")
if readFileErr != nil {
return fmt.Errorf("failed to read har file, err: %v", readFileErr)
}
harBytes, readFileErr := ioutil.ReadFile("./unknown_source.har")
if readFileErr != nil {
t.Errorf("failed to read har file, err: %v", readFileErr)
return
harEntries, err := getEntriesFromHarBytes(harBytes)
if err != nil {
return fmt.Errorf("failed to get entries from har, err: %v", err)
}
if len(harEntries) == 0 {
return fmt.Errorf("unexpected har entries result - Expected more than 0 entries")
}
return nil
}
harEntries, err := GetEntriesFromHarBytes(harBytes)
if err != nil {
t.Errorf("failed to get entries from har, err: %v", err)
return
}
if len(harEntries) != entriesCount {
t.Errorf("unexpected har entries result - Expected: %v, actual: %v", entriesCount, len(harEntries))
if err := retriesExecute(ShortRetriesCount, harCheckFunc); err != nil {
t.Errorf("%v", err)
return
}
})

View File

@@ -10,9 +10,15 @@ import (
"os/exec"
"path"
"syscall"
"time"
)
func GetCliPath() (string, error) {
const (
LongRetriesCount = 100
ShortRetriesCount = 10
)
func getCliPath() (string, error) {
dir, filePathErr := os.Getwd()
if filePathErr != nil {
return "", filePathErr
@@ -22,34 +28,74 @@ func GetCliPath() (string, error) {
return cliPath, nil
}
func GetDefaultCommandArgs() []string {
func getDefaultCommandArgs() []string {
setFlag := "--set"
telemetry := "telemetry=false"
return []string{setFlag, telemetry}
}
func GetDefaultTapCommandArgs() []string {
func getDefaultTapCommandArgs() []string {
tapCommand := "tap"
setFlag := "--set"
namespaces := "tap.namespaces=mizu-tests"
agentImage := "agent-image=gcr.io/up9-docker-hub/mizu/ci:0.0.0"
imagePullPolicy := "image-pull-policy=Never"
defaultCmdArgs := GetDefaultCommandArgs()
defaultCmdArgs := getDefaultCommandArgs()
return append([]string{tapCommand, setFlag, namespaces, setFlag, agentImage, setFlag, imagePullPolicy}, defaultCmdArgs...)
}
func GetDefaultFetchCommandArgs() []string {
func getDefaultFetchCommandArgs() []string {
tapCommand := "fetch"
defaultCmdArgs := GetDefaultCommandArgs()
defaultCmdArgs := getDefaultCommandArgs()
return append([]string{tapCommand}, defaultCmdArgs...)
}
func JsonBytesToInterface(jsonBytes []byte) (interface{}, error) {
func retriesExecute(retriesCount int, executeFunc func() error) error {
var lastError error
for i := 0; i < retriesCount; i++ {
if err := executeFunc(); err != nil {
lastError = err
time.Sleep(1 * time.Second)
continue
}
return nil
}
return fmt.Errorf("reached max retries count, retries count: %v, last err: %v", retriesCount, lastError)
}
func waitTapPodsReady() error {
resolvingUrl := fmt.Sprintf("http://localhost:8899/mizu/status/tappersCount")
tapPodsReadyFunc := func() error {
requestResult, requestErr := executeHttpRequest(resolvingUrl)
if requestErr != nil {
return requestErr
}
tappersCount, ok := requestResult.(float64)
if !ok {
return fmt.Errorf("invalid tappers count type")
}
if tappersCount == 0 {
return fmt.Errorf("no tappers running")
}
return nil
}
return retriesExecute(LongRetriesCount, tapPodsReadyFunc)
}
func jsonBytesToInterface(jsonBytes []byte) (interface{}, error) {
var result interface{}
if parseErr := json.Unmarshal(jsonBytes, &result); parseErr != nil {
return nil, parseErr
@@ -58,7 +104,7 @@ func JsonBytesToInterface(jsonBytes []byte) (interface{}, error) {
return result, nil
}
func ExecuteHttpRequest(url string) (interface{}, error) {
func executeHttpRequest(url string) (interface{}, error) {
response, requestErr := http.Get(url)
if requestErr != nil {
return nil, requestErr
@@ -66,15 +112,17 @@ func ExecuteHttpRequest(url string) (interface{}, error) {
return nil, fmt.Errorf("invalid status code %v", response.StatusCode)
}
defer func() { response.Body.Close() }()
data, readErr := ioutil.ReadAll(response.Body)
if readErr != nil {
return nil, readErr
}
return JsonBytesToInterface(data)
return jsonBytesToInterface(data)
}
func CleanupCommand(cmd *exec.Cmd) error {
func cleanupCommand(cmd *exec.Cmd) error {
if err := cmd.Process.Signal(syscall.SIGQUIT); err != nil {
return err
}
@@ -86,8 +134,8 @@ func CleanupCommand(cmd *exec.Cmd) error {
return nil
}
func GetEntriesFromHarBytes(harBytes []byte) ([]interface{}, error){
harInterface, convertErr := JsonBytesToInterface(harBytes)
func getEntriesFromHarBytes(harBytes []byte) ([]interface{}, error){
harInterface, convertErr := jsonBytesToInterface(harBytes)
if convertErr != nil {
return nil, convertErr
}
@@ -97,14 +145,12 @@ func GetEntriesFromHarBytes(harBytes []byte) ([]interface{}, error){
return nil, errors.New("invalid har type")
}
harLogInterface := har["log"]
harLog, ok := harLogInterface.(map[string]interface{})
harLog, ok := har["log"].(map[string]interface{})
if !ok {
return nil, errors.New("invalid har log type")
}
harEntriesInterface := harLog["entries"]
harEntries, ok := harEntriesInterface.([]interface{})
harEntries, ok := harLog["entries"].([]interface{})
if !ok {
return nil, errors.New("invalid har entries type")
}

View File

@@ -28,6 +28,7 @@ func init() {
func (h *RoutesEventHandlers) WebSocketConnect(socketId int, isTapper bool) {
if isTapper {
rlog.Infof("Websocket event - Tapper connected, socket ID: %d", socketId)
providers.TapperAdded()
} else {
rlog.Infof("Websocket event - Browser socket connected, socket ID: %d", socketId)
socketListLock.Lock()
@@ -39,6 +40,7 @@ func (h *RoutesEventHandlers) WebSocketConnect(socketId int, isTapper bool) {
func (h *RoutesEventHandlers) WebSocketDisconnect(socketId int, isTapper bool) {
if isTapper {
rlog.Infof("Websocket event - Tapper disconnected, socket ID: %d", socketId)
providers.TapperRemoved()
} else {
rlog.Infof("Websocket event - Browser socket disconnected, socket ID: %d", socketId)
socketListLock.Lock()

View File

@@ -30,3 +30,7 @@ func PostTappedPods(c *gin.Context) {
api.BroadcastToBrowserClients(jsonBytes)
}
}
func GetTappersCount(c *gin.Context) {
c.JSON(http.StatusOK, providers.TappersCount)
}

View File

@@ -4,14 +4,18 @@ import (
"github.com/patrickmn/go-cache"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/tap"
"sync"
"time"
)
const tlsLinkRetainmentTime = time.Minute * 15
var (
TapStatus shared.TapStatus
TappersCount int
TapStatus shared.TapStatus
RecentTLSLinks = cache.New(tlsLinkRetainmentTime, tlsLinkRetainmentTime)
tappersCountLock = sync.Mutex{}
)
func GetAllRecentTLSAddresses() []string {
@@ -26,3 +30,15 @@ func GetAllRecentTLSAddresses() []string {
return recentTLSLinks
}
func TapperAdded() {
tappersCountLock.Lock()
TappersCount++
tappersCountLock.Unlock()
}
func TapperRemoved() {
tappersCountLock.Lock()
TappersCount--
tappersCountLock.Unlock()
}

View File

@@ -9,4 +9,6 @@ func StatusRoutes(ginApp *gin.Engine) {
routeGroup := ginApp.Group("/status")
routeGroup.POST("/tappedPods", controllers.PostTappedPods)
routeGroup.GET("/tappersCount", controllers.GetTappersCount)
}

168
cli/apiserver/provider.go Normal file
View File

@@ -0,0 +1,168 @@
package apiserver
import (
"archive/zip"
"bytes"
"encoding/json"
"fmt"
"github.com/up9inc/mizu/cli/logger"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared"
"io/ioutil"
core "k8s.io/api/core/v1"
"net/http"
"net/url"
"time"
)
type apiServerProvider struct {
url string
isReady bool
}
var Provider = apiServerProvider{}
func (provider *apiServerProvider) InitAndTestConnection(url string, retries int) error {
healthUrl := fmt.Sprintf("%s/", url)
retriesLeft := retries
for retriesLeft > 0 {
if response, err := http.Get(healthUrl); err != nil {
logger.Log.Debugf("[ERROR] failed connecting to api server %v", err)
} else if response.StatusCode != 200 {
logger.Log.Debugf("can't connect to api server yet, response status code %v", response.StatusCode)
} else {
logger.Log.Debugf("connection test to api server passed successfully")
break
}
retriesLeft -= 1
time.Sleep(time.Second)
}
if retriesLeft == 0 {
provider.isReady = false
return fmt.Errorf("couldn't reach the api server after %v retries", retries)
}
provider.url = url
provider.isReady = true
return nil
}
func (provider *apiServerProvider) ReportTappedPods(pods []core.Pod) error {
if !provider.isReady {
return fmt.Errorf("trying to reach api server when not initialized yet")
}
tappedPodsUrl := fmt.Sprintf("%s/status/tappedPods", provider.url)
podInfos := make([]shared.PodInfo, 0)
for _, pod := range pods {
podInfos = append(podInfos, shared.PodInfo{Name: pod.Name, Namespace: pod.Namespace})
}
tapStatus := shared.TapStatus{Pods: podInfos}
if jsonValue, err := json.Marshal(tapStatus); err != nil {
return fmt.Errorf("failed Marshal the tapped pods %w", err)
} else {
if response, err := http.Post(tappedPodsUrl, "application/json", bytes.NewBuffer(jsonValue)); err != nil {
return fmt.Errorf("failed sending to API server the tapped pods %w", err)
} else if response.StatusCode != 200 {
return fmt.Errorf("failed sending to API server the tapped pods, response status code %v", response.StatusCode)
} else {
logger.Log.Debugf("Reported to server API about %d taped pods successfully", len(podInfos))
return nil
}
}
}
func (provider *apiServerProvider) RequestAnalysis(analysisDestination string, sleepIntervalSec int) error {
if !provider.isReady {
return fmt.Errorf("trying to reach api server when not initialized yet")
}
urlPath := fmt.Sprintf("http://%s/api/uploadEntries?dest=%s&interval=%v", provider.url, url.QueryEscape(analysisDestination), sleepIntervalSec)
u, parseErr := url.ParseRequestURI(urlPath)
if parseErr != nil {
logger.Log.Fatal("Failed parsing the URL (consider changing the analysis dest URL), err: %v", parseErr)
}
logger.Log.Debugf("Analysis url %v", u.String())
if response, requestErr := http.Get(u.String()); requestErr != nil {
return fmt.Errorf("failed to notify agent for analysis, err: %w", requestErr)
} else if response.StatusCode != 200 {
return fmt.Errorf("failed to notify agent for analysis, status code: %v", response.StatusCode)
} else {
logger.Log.Infof(uiUtils.Purple, "Traffic is uploading to UP9 for further analysis")
return nil
}
}
func (provider *apiServerProvider) GetGeneralStats() (map[string]interface{}, error) {
if !provider.isReady {
return nil, fmt.Errorf("trying to reach api server when not initialized yet")
}
generalStatsUrl := fmt.Sprintf("%s/api/generalStats", provider.url)
response, requestErr := http.Get(generalStatsUrl)
if requestErr != nil {
return nil, fmt.Errorf("failed to get general stats for telemetry, err: %w", requestErr)
} else if response.StatusCode != 200 {
return nil, fmt.Errorf("failed to get general stats for telemetry, status code: %v", response.StatusCode)
}
defer func() { _ = response.Body.Close() }()
data, readErr := ioutil.ReadAll(response.Body)
if readErr != nil {
return nil, fmt.Errorf("failed to read general stats for telemetry, err: %v", readErr)
}
var generalStats map[string]interface{}
if parseErr := json.Unmarshal(data, &generalStats); parseErr != nil {
return nil, fmt.Errorf("failed to parse general stats for telemetry, err: %v", parseErr)
}
return generalStats, nil
}
func (provider *apiServerProvider) GetHars(fromTimestamp int, toTimestamp int) (*zip.Reader, error) {
if !provider.isReady {
return nil, fmt.Errorf("trying to reach api server when not initialized yet")
}
resp, err := http.Get(fmt.Sprintf("%s/api/har?from=%v&to=%v", provider.url, fromTimestamp, toTimestamp))
if err != nil {
return nil, fmt.Errorf("failed getting har from api server %w", err)
}
defer func() { _ = resp.Body.Close() }()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("failed reading hars %w", err)
}
zipReader, err := zip.NewReader(bytes.NewReader(body), int64(len(body)))
if err != nil {
return nil, fmt.Errorf("failed craeting zip reader %w", err)
}
return zipReader, nil
}
func (provider *apiServerProvider) GetVersion() (string, error) {
if !provider.isReady {
return "", fmt.Errorf("trying to reach api server when not initialized yet")
}
versionUrl, _ := url.Parse(fmt.Sprintf("%s/metadata/version", provider.url))
req := &http.Request{
Method: http.MethodGet,
URL: versionUrl,
}
statusResp, err := http.DefaultClient.Do(req)
if err != nil {
return "", err
}
defer statusResp.Body.Close()
versionResponse := &shared.VersionResponse{}
if err := json.NewDecoder(statusResp.Body).Decode(&versionResponse); err != nil {
return "", err
}
return versionResponse.SemVer, nil
}

45
cli/cmd/common.go Normal file
View File

@@ -0,0 +1,45 @@
package cmd
import (
"context"
"fmt"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/config/configStructs"
"github.com/up9inc/mizu/cli/errormessage"
"github.com/up9inc/mizu/cli/kubernetes"
"github.com/up9inc/mizu/cli/logger"
"github.com/up9inc/mizu/cli/mizu"
"github.com/up9inc/mizu/cli/uiUtils"
"os"
"os/signal"
"syscall"
)
func GetApiServerUrl() string {
return fmt.Sprintf("http://%s", kubernetes.GetMizuApiServerProxiedHostAndPath(config.Config.Tap.GuiPort))
}
func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
err := kubernetes.StartProxy(kubernetesProvider, config.Config.Tap.GuiPort, config.Config.MizuResourcesNamespace, mizu.ApiServerPodName)
if err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error occured while running k8s proxy %v\n"+
"Try setting different port by using --%s", errormessage.FormatError(err), configStructs.GuiPortTapName))
cancel()
}
}
func waitForFinish(ctx context.Context, cancel context.CancelFunc) {
logger.Log.Debugf("waiting for finish...")
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
// block until ctx cancel is called or termination signal is received
select {
case <-ctx.Done():
logger.Log.Debugf("ctx done")
break
case <-sigChan:
logger.Log.Debugf("Got termination signal, canceling execution...")
cancel()
}
}

View File

@@ -3,10 +3,13 @@ package cmd
import (
"github.com/creasty/defaults"
"github.com/spf13/cobra"
"github.com/up9inc/mizu/cli/apiserver"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/config/configStructs"
"github.com/up9inc/mizu/cli/logger"
"github.com/up9inc/mizu/cli/mizu/version"
"github.com/up9inc/mizu/cli/telemetry"
"github.com/up9inc/mizu/cli/uiUtils"
)
var fetchCmd = &cobra.Command{
@@ -15,7 +18,12 @@ var fetchCmd = &cobra.Command{
RunE: func(cmd *cobra.Command, args []string) error {
go telemetry.ReportRun("fetch", config.Config.Fetch)
if isCompatible, err := version.CheckVersionCompatibility(config.Config.Fetch.GuiPort); err != nil {
if err := apiserver.Provider.InitAndTestConnection(GetApiServerUrl(), 1); err != nil {
logger.Log.Errorf(uiUtils.Error, "Couldn't connect to API server, make sure one running")
return nil
}
if isCompatible, err := version.CheckVersionCompatibility(); err != nil {
return err
} else if !isCompatible {
return nil

View File

@@ -1,96 +1,25 @@
package cmd
import (
"archive/zip"
"bytes"
"fmt"
"github.com/up9inc/mizu/cli/apiserver"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/kubernetes"
"github.com/up9inc/mizu/cli/logger"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"path/filepath"
"strings"
"github.com/up9inc/mizu/cli/mizu/fsUtils"
"github.com/up9inc/mizu/cli/uiUtils"
)
func RunMizuFetch() {
mizuProxiedUrl := kubernetes.GetMizuApiServerProxiedHostAndPath(config.Config.Fetch.GuiPort)
resp, err := http.Get(fmt.Sprintf("http://%s/api/har?from=%v&to=%v", mizuProxiedUrl, config.Config.Fetch.FromTimestamp, config.Config.Fetch.ToTimestamp))
if err != nil {
log.Fatal(err)
if err := apiserver.Provider.InitAndTestConnection(GetApiServerUrl(), 5); err != nil {
logger.Log.Errorf(uiUtils.Error, "Couldn't connect to API server, check logs")
}
defer func() { _ = resp.Body.Close() }()
body, err := ioutil.ReadAll(resp.Body)
zipReader, err := apiserver.Provider.GetHars(config.Config.Fetch.FromTimestamp, config.Config.Fetch.ToTimestamp)
if err != nil {
log.Fatal(err)
logger.Log.Errorf("Failed fetch data from API server %v", err)
return
}
zipReader, err := zip.NewReader(bytes.NewReader(body), int64(len(body)))
if err != nil {
log.Fatal(err)
if err := fsUtils.Unzip(zipReader, config.Config.Fetch.Directory); err != nil {
logger.Log.Debugf("[ERROR] failed unzip %v", err)
}
_ = Unzip(zipReader, config.Config.Fetch.Directory)
}
func Unzip(reader *zip.Reader, dest string) error {
dest, _ = filepath.Abs(dest)
_ = os.MkdirAll(dest, os.ModePerm)
// Closure to address file descriptors issue with all the deferred .Close() methods
extractAndWriteFile := func(f *zip.File) error {
rc, err := f.Open()
if err != nil {
return err
}
defer func() {
if err := rc.Close(); err != nil {
panic(err)
}
}()
path := filepath.Join(dest, f.Name)
// Check for ZipSlip (Directory traversal)
if !strings.HasPrefix(path, filepath.Clean(dest)+string(os.PathSeparator)) {
return fmt.Errorf("illegal file path: %s", path)
}
if f.FileInfo().IsDir() {
_ = os.MkdirAll(path, f.Mode())
} else {
_ = os.MkdirAll(filepath.Dir(path), f.Mode())
logger.Log.Infof("writing HAR file [ %v ]", path)
f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, f.Mode())
if err != nil {
return err
}
defer func() {
if err := f.Close(); err != nil {
panic(err)
}
logger.Log.Info(" done")
}()
_, err = io.Copy(f, rc)
if err != nil {
return err
}
}
return nil
}
for _, f := range reader.File {
err := extractAndWriteFile(f)
if err != nil {
return err
}
}
return nil
}

View File

@@ -1,35 +1,28 @@
package cmd
import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/up9inc/mizu/cli/apiserver"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/config/configStructs"
"github.com/up9inc/mizu/cli/errormessage"
"github.com/up9inc/mizu/cli/kubernetes"
"github.com/up9inc/mizu/cli/logger"
"github.com/up9inc/mizu/cli/mizu"
"github.com/up9inc/mizu/cli/mizu/fsUtils"
"github.com/up9inc/mizu/cli/mizu/goUtils"
"github.com/up9inc/mizu/cli/telemetry"
"net/http"
"net/url"
"os"
"os/signal"
"path"
"regexp"
"strings"
"syscall"
"time"
"github.com/up9inc/mizu/cli/errormessage"
"github.com/up9inc/mizu/cli/kubernetes"
"github.com/up9inc/mizu/cli/mizu"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/debounce"
yaml "gopkg.in/yaml.v3"
core "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
"path"
"regexp"
"strings"
"time"
)
const (
@@ -60,7 +53,6 @@ func RunMizuTap() {
return
}
}
kubernetesProvider, err := kubernetes.NewProvider(config.Config.KubeConfigPath())
if err != nil {
logger.Log.Error(err)
@@ -108,13 +100,13 @@ func RunMizuTap() {
nodeToTappedPodIPMap := getNodeHostToTappedPodIpsMap(state.currentlyTappedPods)
defer cleanUpMizu(kubernetesProvider)
defer finishMizuExecution(kubernetesProvider)
if err := createMizuResources(ctx, kubernetesProvider, nodeToTappedPodIPMap, mizuApiFilteringOptions, mizuValidationRules); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error creating resources: %v", errormessage.FormatError(err)))
return
}
go goUtils.HandleExcWrapper(createProxyToApiServerPod, ctx, kubernetesProvider, cancel)
go goUtils.HandleExcWrapper(watchApiServerPod, ctx, kubernetesProvider, cancel)
go goUtils.HandleExcWrapper(watchPodsForTapping, ctx, kubernetesProvider, targetNamespaces, cancel)
//block until exit signal or error
@@ -261,23 +253,26 @@ func updateMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provi
return nil
}
func cleanUpMizu(kubernetesProvider *kubernetes.Provider) {
telemetry.ReportAPICalls(config.Config.Tap.GuiPort)
cleanUpMizuResources(kubernetesProvider)
}
func cleanUpMizuResources(kubernetesProvider *kubernetes.Provider) {
func finishMizuExecution(kubernetesProvider *kubernetes.Provider) {
telemetry.ReportAPICalls()
removalCtx, cancel := context.WithTimeout(context.Background(), cleanupTimeout)
defer cancel()
dumpLogsIfNeeded(kubernetesProvider, removalCtx)
cleanUpMizuResources(kubernetesProvider, removalCtx, cancel)
}
if config.Config.DumpLogs {
mizuDir := mizu.GetMizuFolderPath()
filePath := path.Join(mizuDir, fmt.Sprintf("mizu_logs_%s.zip", time.Now().Format("2006_01_02__15_04_05")))
if err := fsUtils.DumpLogs(kubernetesProvider, removalCtx, filePath); err != nil {
logger.Log.Errorf("Failed dump logs %v", err)
}
func dumpLogsIfNeeded(kubernetesProvider *kubernetes.Provider, removalCtx context.Context) {
if !config.Config.DumpLogs {
return
}
mizuDir := mizu.GetMizuFolderPath()
filePath := path.Join(mizuDir, fmt.Sprintf("mizu_logs_%s.zip", time.Now().Format("2006_01_02__15_04_05")))
if err := fsUtils.DumpLogs(kubernetesProvider, removalCtx, filePath); err != nil {
logger.Log.Errorf("Failed dump logs %v", err)
}
}
func cleanUpMizuResources(kubernetesProvider *kubernetes.Provider, removalCtx context.Context, cancel context.CancelFunc) {
logger.Log.Infof("\nRemoving mizu resources\n")
if !config.Config.IsNsRestrictedMode() {
@@ -342,7 +337,7 @@ func waitUntilNamespaceDeleted(ctx context.Context, cancel context.CancelFunc, k
if err := kubernetesProvider.WaitUtilNamespaceDeleted(ctx, config.Config.MizuResourcesNamespace); err != nil {
switch {
case ctx.Err() == context.Canceled:
// Do nothing. User interrupted the wait.
logger.Log.Debugf("Do nothing. User interrupted the wait")
case err == wait.ErrWaitTimeout:
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Timeout while removing Namespace %s", config.Config.MizuResourcesNamespace))
default:
@@ -351,29 +346,6 @@ func waitUntilNamespaceDeleted(ctx context.Context, cancel context.CancelFunc, k
}
}
func reportTappedPods() {
mizuProxiedUrl := kubernetes.GetMizuApiServerProxiedHostAndPath(config.Config.Tap.GuiPort)
tappedPodsUrl := fmt.Sprintf("http://%s/status/tappedPods", mizuProxiedUrl)
podInfos := make([]shared.PodInfo, 0)
for _, pod := range state.currentlyTappedPods {
podInfos = append(podInfos, shared.PodInfo{Name: pod.Name, Namespace: pod.Namespace})
}
tapStatus := shared.TapStatus{Pods: podInfos}
if jsonValue, err := json.Marshal(tapStatus); err != nil {
logger.Log.Debugf("[ERROR] failed Marshal the tapped pods %v", err)
} else {
if response, err := http.Post(tappedPodsUrl, "application/json", bytes.NewBuffer(jsonValue)); err != nil {
logger.Log.Debugf("[ERROR] failed sending to API server the tapped pods %v", err)
} else if response.StatusCode != 200 {
logger.Log.Debugf("[ERROR] failed sending to API server the tapped pods, response status code %v", response.StatusCode)
} else {
logger.Log.Debugf("Reported to server API about %d taped pods successfully", len(podInfos))
}
}
}
func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Provider, targetNamespaces []string, cancel context.CancelFunc) {
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider, targetNamespaces, config.Config.Tap.PodRegex())
@@ -389,7 +361,9 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
return
}
reportTappedPods()
if err := apiserver.Provider.ReportTappedPods(state.currentlyTappedPods); err != nil {
logger.Log.Debugf("[Error] failed update tapped pods %v", err)
}
nodeToTappedPodIPMap := getNodeHostToTappedPodIpsMap(state.currentlyTappedPods)
if err != nil {
@@ -496,7 +470,7 @@ func getMissingPods(pods1 []core.Pod, pods2 []core.Pod) []core.Pod {
return missingPods
}
func createProxyToApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", mizu.ApiServerPodName))
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider, []string{config.Config.MizuResourcesNamespace}, podExactRegex)
isPodReady := false
@@ -522,10 +496,17 @@ func createProxyToApiServerPod(ctx context.Context, kubernetesProvider *kubernet
if modifiedPod.Status.Phase == core.PodRunning && !isPodReady {
isPodReady = true
go startProxyReportErrorIfAny(kubernetesProvider, cancel)
logger.Log.Infof("Mizu is available at http://%s\n", kubernetes.GetMizuApiServerProxiedHostAndPath(config.Config.Tap.GuiPort))
time.Sleep(time.Second * 5) // Waiting to be sure the proxy is ready
requestForAnalysis()
reportTappedPods()
if err := apiserver.Provider.InitAndTestConnection(GetApiServerUrl(), 20); err != nil {
logger.Log.Errorf(uiUtils.Error, "Couldn't connect to API server, check logs")
cancel()
break
}
logger.Log.Infof("Mizu is available at %s\n", GetApiServerUrl())
requestForAnalysisIfNeeded()
if err := apiserver.Provider.ReportTappedPods(state.currentlyTappedPods); err != nil {
logger.Log.Debugf("[Error] failed update tapped pods %v", err)
}
}
case <-timeAfter:
if !isPodReady {
@@ -539,34 +520,12 @@ func createProxyToApiServerPod(ctx context.Context, kubernetesProvider *kubernet
}
}
func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
err := kubernetes.StartProxy(kubernetesProvider, config.Config.Tap.GuiPort, config.Config.MizuResourcesNamespace, mizu.ApiServerPodName)
if err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error occured while running k8s proxy %v\n"+
"Try setting different port by using --%s", errormessage.FormatError(err), configStructs.GuiPortTapName))
cancel()
}
}
func requestForAnalysis() {
func requestForAnalysisIfNeeded() {
if !config.Config.Tap.Analysis {
return
}
mizuProxiedUrl := kubernetes.GetMizuApiServerProxiedHostAndPath(config.Config.Tap.GuiPort)
urlPath := fmt.Sprintf("http://%s/api/uploadEntries?dest=%s&interval=%v", mizuProxiedUrl, url.QueryEscape(config.Config.Tap.AnalysisDestination), config.Config.Tap.SleepIntervalSec)
u, parseErr := url.ParseRequestURI(urlPath)
if parseErr != nil {
logger.Log.Fatal("Failed parsing the URL (consider changing the analysis dest URL), err: %v", parseErr)
}
logger.Log.Debugf("Sending get request to %v", u.String())
if response, requestErr := http.Get(u.String()); requestErr != nil {
logger.Log.Errorf("Failed to notify agent for analysis, err: %v", requestErr)
} else if response.StatusCode != 200 {
logger.Log.Errorf("Failed to notify agent for analysis, status code: %v", response.StatusCode)
} else {
logger.Log.Infof(uiUtils.Purple, "Traffic is uploading to UP9 for further analysis")
if err := apiserver.Provider.RequestAnalysis(config.Config.Tap.AnalysisDestination, config.Config.Tap.SleepIntervalSec); err != nil {
logger.Log.Debugf("[Error] failed requesting for analysis %v", err)
}
}
@@ -598,19 +557,6 @@ func getNodeHostToTappedPodIpsMap(tappedPods []core.Pod) map[string][]string {
return nodeToTappedPodIPMap
}
func waitForFinish(ctx context.Context, cancel context.CancelFunc) {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
// block until ctx cancel is called or termination signal is received
select {
case <-ctx.Done():
break
case <-sigChan:
cancel()
}
}
func getNamespaces(kubernetesProvider *kubernetes.Provider) []string {
if config.Config.Tap.AllNamespaces {
return []string{mizu.K8sAllNamespaces}

View File

@@ -3,13 +3,14 @@ package cmd
import (
"context"
"fmt"
"github.com/up9inc/mizu/cli/apiserver"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/kubernetes"
"github.com/up9inc/mizu/cli/logger"
"github.com/up9inc/mizu/cli/mizu"
"github.com/up9inc/mizu/cli/mizu/version"
"github.com/up9inc/mizu/cli/uiUtils"
"net/http"
"time"
)
func runMizuView() {
@@ -34,20 +35,21 @@ func runMizuView() {
return
}
mizuProxiedUrl := kubernetes.GetMizuApiServerProxiedHostAndPath(config.Config.View.GuiPort)
_, err = http.Get(fmt.Sprintf("http://%s/", mizuProxiedUrl))
if err == nil {
response, err := http.Get(fmt.Sprintf("%s/", GetApiServerUrl()))
if err == nil && response.StatusCode == 200 {
logger.Log.Infof("Found a running service %s and open port %d", mizu.ApiServerPodName, config.Config.View.GuiPort)
return
}
logger.Log.Debugf("Found service %s, creating k8s proxy", mizu.ApiServerPodName)
logger.Log.Infof("Establishing connection to k8s cluster...")
go startProxyReportErrorIfAny(kubernetesProvider, cancel)
time.Sleep(time.Second * 5) // Waiting to be sure the proxy is ready
if err := apiserver.Provider.InitAndTestConnection(GetApiServerUrl(), 10); err != nil {
logger.Log.Errorf(uiUtils.Error, "Couldn't connect to API server, check logs")
return
}
logger.Log.Infof("Mizu is available at http://%s\n", kubernetes.GetMizuApiServerProxiedHostAndPath(config.Config.View.GuiPort))
if isCompatible, err := version.CheckVersionCompatibility(config.Config.View.GuiPort); err != nil {
logger.Log.Infof("Mizu is available at %s\n", GetApiServerUrl())
if isCompatible, err := version.CheckVersionCompatibility(); err != nil {
logger.Log.Errorf("Failed to check versions compatibility %v", err)
cancel()
return

View File

@@ -3,9 +3,11 @@ package fsUtils
import (
"archive/zip"
"fmt"
"github.com/up9inc/mizu/cli/logger"
"io"
"os"
"path/filepath"
"strings"
)
func AddFileToZip(zipWriter *zip.Writer, filename string) error {
@@ -53,3 +55,60 @@ func AddStrToZip(writer *zip.Writer, logs string, fileName string) error {
}
return nil
}
func Unzip(reader *zip.Reader, dest string) error {
dest, _ = filepath.Abs(dest)
_ = os.MkdirAll(dest, os.ModePerm)
// Closure to address file descriptors issue with all the deferred .Close() methods
extractAndWriteFile := func(f *zip.File) error {
rc, err := f.Open()
if err != nil {
return err
}
defer func() {
if err := rc.Close(); err != nil {
panic(err)
}
}()
path := filepath.Join(dest, f.Name)
// Check for ZipSlip (Directory traversal)
if !strings.HasPrefix(path, filepath.Clean(dest)+string(os.PathSeparator)) {
return fmt.Errorf("illegal file path: %s", path)
}
if f.FileInfo().IsDir() {
_ = os.MkdirAll(path, f.Mode())
} else {
_ = os.MkdirAll(filepath.Dir(path), f.Mode())
logger.Log.Infof("writing HAR file [ %v ]", path)
f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, f.Mode())
if err != nil {
return err
}
defer func() {
if err := f.Close(); err != nil {
panic(err)
}
logger.Log.Info(" done")
}()
_, err = io.Copy(f, rc)
if err != nil {
return err
}
}
return nil
}
for _, f := range reader.File {
err := extractAndWriteFile(f)
if err != nil {
return err
}
}
return nil
}

View File

@@ -2,43 +2,21 @@ package version
import (
"context"
"encoding/json"
"fmt"
"github.com/up9inc/mizu/cli/apiserver"
"github.com/up9inc/mizu/cli/logger"
"github.com/up9inc/mizu/cli/mizu"
"io/ioutil"
"net/http"
"net/url"
"time"
"github.com/google/go-github/v37/github"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/semver"
)
func getApiVersion(port uint16) (string, error) {
versionUrl, _ := url.Parse(fmt.Sprintf("http://localhost:%d/mizu/metadata/version", port))
req := &http.Request{
Method: http.MethodGet,
URL: versionUrl,
}
statusResp, err := http.DefaultClient.Do(req)
if err != nil {
return "", err
}
defer statusResp.Body.Close()
versionResponse := &shared.VersionResponse{}
if err := json.NewDecoder(statusResp.Body).Decode(&versionResponse); err != nil {
return "", err
}
return versionResponse.SemVer, nil
}
func CheckVersionCompatibility(port uint16) (bool, error) {
apiSemVer, err := getApiVersion(port)
func CheckVersionCompatibility() (bool, error) {
apiSemVer, err := apiserver.Provider.GetVersion()
if err != nil {
return false, err
}

View File

@@ -5,11 +5,10 @@ import (
"encoding/json"
"fmt"
"github.com/denisbrodbeck/machineid"
"github.com/up9inc/mizu/cli/apiserver"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/kubernetes"
"github.com/up9inc/mizu/cli/logger"
"github.com/up9inc/mizu/cli/mizu"
"io/ioutil"
"net/http"
)
@@ -35,35 +34,15 @@ func ReportRun(cmd string, args interface{}) {
logger.Log.Debugf("successfully reported telemetry for cmd %v", cmd)
}
func ReportAPICalls(mizuPort uint16) {
func ReportAPICalls() {
if !shouldRunTelemetry() {
logger.Log.Debugf("not reporting telemetry")
return
}
mizuProxiedUrl := kubernetes.GetMizuApiServerProxiedHostAndPath(mizuPort)
generalStatsUrl := fmt.Sprintf("http://%s/api/generalStats", mizuProxiedUrl)
response, requestErr := http.Get(generalStatsUrl)
if requestErr != nil {
logger.Log.Debugf("ERROR: failed to get general stats for telemetry, err: %v", requestErr)
return
} else if response.StatusCode != 200 {
logger.Log.Debugf("ERROR: failed to get general stats for telemetry, status code: %v", response.StatusCode)
return
}
defer func() { _ = response.Body.Close() }()
data, readErr := ioutil.ReadAll(response.Body)
if readErr != nil {
logger.Log.Debugf("ERROR: failed to read general stats for telemetry, err: %v", readErr)
return
}
var generalStats map[string]interface{}
if parseErr := json.Unmarshal(data, &generalStats); parseErr != nil {
logger.Log.Debugf("ERROR: failed to parse general stats for telemetry, err: %v", parseErr)
generalStats, err := apiserver.Provider.GetGeneralStats()
if err != nil {
logger.Log.Debugf("[ERROR] failed get general stats from api server %v", err)
return
}