mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-08-01 16:57:28 +00:00
Introducing API server provider (#243)
This commit is contained in:
parent
afd5757315
commit
2575ad722a
168
cli/apiserver/provider.go
Normal file
168
cli/apiserver/provider.go
Normal file
@ -0,0 +1,168 @@
|
||||
package apiserver
|
||||
|
||||
import (
|
||||
"archive/zip"
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/up9inc/mizu/cli/logger"
|
||||
"github.com/up9inc/mizu/cli/uiUtils"
|
||||
"github.com/up9inc/mizu/shared"
|
||||
"io/ioutil"
|
||||
core "k8s.io/api/core/v1"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"time"
|
||||
)
|
||||
|
||||
type apiServerProvider struct {
|
||||
url string
|
||||
isReady bool
|
||||
}
|
||||
|
||||
var Provider = apiServerProvider{}
|
||||
|
||||
func (provider *apiServerProvider) InitAndTestConnection(url string, retries int) error {
|
||||
healthUrl := fmt.Sprintf("%s/", url)
|
||||
retriesLeft := retries
|
||||
for retriesLeft > 0 {
|
||||
if response, err := http.Get(healthUrl); err != nil {
|
||||
logger.Log.Debugf("[ERROR] failed connecting to api server %v", err)
|
||||
} else if response.StatusCode != 200 {
|
||||
logger.Log.Debugf("can't connect to api server yet, response status code %v", response.StatusCode)
|
||||
} else {
|
||||
logger.Log.Debugf("connection test to api server passed successfully")
|
||||
break
|
||||
}
|
||||
retriesLeft -= 1
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
|
||||
if retriesLeft == 0 {
|
||||
provider.isReady = false
|
||||
return fmt.Errorf("couldn't reach the api server after %v retries", retries)
|
||||
}
|
||||
provider.url = url
|
||||
provider.isReady = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func (provider *apiServerProvider) ReportTappedPods(pods []core.Pod) error {
|
||||
if !provider.isReady {
|
||||
return fmt.Errorf("trying to reach api server when not initialized yet")
|
||||
}
|
||||
tappedPodsUrl := fmt.Sprintf("%s/status/tappedPods", provider.url)
|
||||
|
||||
podInfos := make([]shared.PodInfo, 0)
|
||||
for _, pod := range pods {
|
||||
podInfos = append(podInfos, shared.PodInfo{Name: pod.Name, Namespace: pod.Namespace})
|
||||
}
|
||||
tapStatus := shared.TapStatus{Pods: podInfos}
|
||||
|
||||
if jsonValue, err := json.Marshal(tapStatus); err != nil {
|
||||
return fmt.Errorf("failed Marshal the tapped pods %w", err)
|
||||
} else {
|
||||
if response, err := http.Post(tappedPodsUrl, "application/json", bytes.NewBuffer(jsonValue)); err != nil {
|
||||
return fmt.Errorf("failed sending to API server the tapped pods %w", err)
|
||||
} else if response.StatusCode != 200 {
|
||||
return fmt.Errorf("failed sending to API server the tapped pods, response status code %v", response.StatusCode)
|
||||
} else {
|
||||
logger.Log.Debugf("Reported to server API about %d taped pods successfully", len(podInfos))
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (provider *apiServerProvider) RequestAnalysis(analysisDestination string, sleepIntervalSec int) error {
|
||||
if !provider.isReady {
|
||||
return fmt.Errorf("trying to reach api server when not initialized yet")
|
||||
}
|
||||
urlPath := fmt.Sprintf("http://%s/api/uploadEntries?dest=%s&interval=%v", provider.url, url.QueryEscape(analysisDestination), sleepIntervalSec)
|
||||
u, parseErr := url.ParseRequestURI(urlPath)
|
||||
if parseErr != nil {
|
||||
logger.Log.Fatal("Failed parsing the URL (consider changing the analysis dest URL), err: %v", parseErr)
|
||||
}
|
||||
|
||||
logger.Log.Debugf("Analysis url %v", u.String())
|
||||
if response, requestErr := http.Get(u.String()); requestErr != nil {
|
||||
return fmt.Errorf("failed to notify agent for analysis, err: %w", requestErr)
|
||||
} else if response.StatusCode != 200 {
|
||||
return fmt.Errorf("failed to notify agent for analysis, status code: %v", response.StatusCode)
|
||||
} else {
|
||||
logger.Log.Infof(uiUtils.Purple, "Traffic is uploading to UP9 for further analysis")
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (provider *apiServerProvider) GetGeneralStats() (map[string]interface{}, error) {
|
||||
if !provider.isReady {
|
||||
return nil, fmt.Errorf("trying to reach api server when not initialized yet")
|
||||
}
|
||||
generalStatsUrl := fmt.Sprintf("%s/api/generalStats", provider.url)
|
||||
|
||||
response, requestErr := http.Get(generalStatsUrl)
|
||||
if requestErr != nil {
|
||||
return nil, fmt.Errorf("failed to get general stats for telemetry, err: %w", requestErr)
|
||||
} else if response.StatusCode != 200 {
|
||||
return nil, fmt.Errorf("failed to get general stats for telemetry, status code: %v", response.StatusCode)
|
||||
}
|
||||
|
||||
defer func() { _ = response.Body.Close() }()
|
||||
|
||||
data, readErr := ioutil.ReadAll(response.Body)
|
||||
if readErr != nil {
|
||||
return nil, fmt.Errorf("failed to read general stats for telemetry, err: %v", readErr)
|
||||
}
|
||||
|
||||
var generalStats map[string]interface{}
|
||||
if parseErr := json.Unmarshal(data, &generalStats); parseErr != nil {
|
||||
return nil, fmt.Errorf("failed to parse general stats for telemetry, err: %v", parseErr)
|
||||
}
|
||||
return generalStats, nil
|
||||
}
|
||||
|
||||
func (provider *apiServerProvider) GetHars(fromTimestamp int, toTimestamp int) (*zip.Reader, error) {
|
||||
if !provider.isReady {
|
||||
return nil, fmt.Errorf("trying to reach api server when not initialized yet")
|
||||
}
|
||||
resp, err := http.Get(fmt.Sprintf("%s/api/har?from=%v&to=%v", provider.url, fromTimestamp, toTimestamp))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed getting har from api server %w", err)
|
||||
}
|
||||
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed reading hars %w", err)
|
||||
}
|
||||
|
||||
zipReader, err := zip.NewReader(bytes.NewReader(body), int64(len(body)))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed craeting zip reader %w", err)
|
||||
}
|
||||
return zipReader, nil
|
||||
}
|
||||
|
||||
func (provider *apiServerProvider) GetVersion() (string, error) {
|
||||
if !provider.isReady {
|
||||
return "", fmt.Errorf("trying to reach api server when not initialized yet")
|
||||
}
|
||||
versionUrl, _ := url.Parse(fmt.Sprintf("%s/metadata/version", provider.url))
|
||||
req := &http.Request{
|
||||
Method: http.MethodGet,
|
||||
URL: versionUrl,
|
||||
}
|
||||
statusResp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer statusResp.Body.Close()
|
||||
|
||||
versionResponse := &shared.VersionResponse{}
|
||||
if err := json.NewDecoder(statusResp.Body).Decode(&versionResponse); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return versionResponse.SemVer, nil
|
||||
}
|
45
cli/cmd/common.go
Normal file
45
cli/cmd/common.go
Normal file
@ -0,0 +1,45 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/up9inc/mizu/cli/config"
|
||||
"github.com/up9inc/mizu/cli/config/configStructs"
|
||||
"github.com/up9inc/mizu/cli/errormessage"
|
||||
"github.com/up9inc/mizu/cli/kubernetes"
|
||||
"github.com/up9inc/mizu/cli/logger"
|
||||
"github.com/up9inc/mizu/cli/mizu"
|
||||
"github.com/up9inc/mizu/cli/uiUtils"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
func GetApiServerUrl() string {
|
||||
return fmt.Sprintf("http://%s", kubernetes.GetMizuApiServerProxiedHostAndPath(config.Config.Tap.GuiPort))
|
||||
}
|
||||
|
||||
func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
|
||||
err := kubernetes.StartProxy(kubernetesProvider, config.Config.Tap.GuiPort, config.Config.MizuResourcesNamespace, mizu.ApiServerPodName)
|
||||
if err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error occured while running k8s proxy %v\n"+
|
||||
"Try setting different port by using --%s", errormessage.FormatError(err), configStructs.GuiPortTapName))
|
||||
cancel()
|
||||
}
|
||||
}
|
||||
|
||||
func waitForFinish(ctx context.Context, cancel context.CancelFunc) {
|
||||
logger.Log.Debugf("waiting for finish...")
|
||||
sigChan := make(chan os.Signal, 1)
|
||||
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
|
||||
|
||||
// block until ctx cancel is called or termination signal is received
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
logger.Log.Debugf("ctx done")
|
||||
break
|
||||
case <-sigChan:
|
||||
logger.Log.Debugf("Got termination signal, canceling execution...")
|
||||
cancel()
|
||||
}
|
||||
}
|
@ -3,10 +3,13 @@ package cmd
|
||||
import (
|
||||
"github.com/creasty/defaults"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/up9inc/mizu/cli/apiserver"
|
||||
"github.com/up9inc/mizu/cli/config"
|
||||
"github.com/up9inc/mizu/cli/config/configStructs"
|
||||
"github.com/up9inc/mizu/cli/logger"
|
||||
"github.com/up9inc/mizu/cli/mizu/version"
|
||||
"github.com/up9inc/mizu/cli/telemetry"
|
||||
"github.com/up9inc/mizu/cli/uiUtils"
|
||||
)
|
||||
|
||||
var fetchCmd = &cobra.Command{
|
||||
@ -15,7 +18,12 @@ var fetchCmd = &cobra.Command{
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
go telemetry.ReportRun("fetch", config.Config.Fetch)
|
||||
|
||||
if isCompatible, err := version.CheckVersionCompatibility(config.Config.Fetch.GuiPort); err != nil {
|
||||
if err := apiserver.Provider.InitAndTestConnection(GetApiServerUrl(), 1); err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, "Couldn't connect to API server, make sure one running")
|
||||
return nil
|
||||
}
|
||||
|
||||
if isCompatible, err := version.CheckVersionCompatibility(); err != nil {
|
||||
return err
|
||||
} else if !isCompatible {
|
||||
return nil
|
||||
|
@ -1,96 +1,25 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"archive/zip"
|
||||
"bytes"
|
||||
"fmt"
|
||||
"github.com/up9inc/mizu/cli/apiserver"
|
||||
"github.com/up9inc/mizu/cli/config"
|
||||
"github.com/up9inc/mizu/cli/kubernetes"
|
||||
"github.com/up9inc/mizu/cli/logger"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"github.com/up9inc/mizu/cli/mizu/fsUtils"
|
||||
"github.com/up9inc/mizu/cli/uiUtils"
|
||||
)
|
||||
|
||||
func RunMizuFetch() {
|
||||
mizuProxiedUrl := kubernetes.GetMizuApiServerProxiedHostAndPath(config.Config.Fetch.GuiPort)
|
||||
resp, err := http.Get(fmt.Sprintf("http://%s/api/har?from=%v&to=%v", mizuProxiedUrl, config.Config.Fetch.FromTimestamp, config.Config.Fetch.ToTimestamp))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
if err := apiserver.Provider.InitAndTestConnection(GetApiServerUrl(), 5); err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, "Couldn't connect to API server, check logs")
|
||||
}
|
||||
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
zipReader, err := apiserver.Provider.GetHars(config.Config.Fetch.FromTimestamp, config.Config.Fetch.ToTimestamp)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
logger.Log.Errorf("Failed fetch data from API server %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
zipReader, err := zip.NewReader(bytes.NewReader(body), int64(len(body)))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
if err := fsUtils.Unzip(zipReader, config.Config.Fetch.Directory); err != nil {
|
||||
logger.Log.Debugf("[ERROR] failed unzip %v", err)
|
||||
}
|
||||
|
||||
_ = Unzip(zipReader, config.Config.Fetch.Directory)
|
||||
}
|
||||
|
||||
func Unzip(reader *zip.Reader, dest string) error {
|
||||
dest, _ = filepath.Abs(dest)
|
||||
_ = os.MkdirAll(dest, os.ModePerm)
|
||||
|
||||
// Closure to address file descriptors issue with all the deferred .Close() methods
|
||||
extractAndWriteFile := func(f *zip.File) error {
|
||||
rc, err := f.Open()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
if err := rc.Close(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}()
|
||||
|
||||
path := filepath.Join(dest, f.Name)
|
||||
|
||||
// Check for ZipSlip (Directory traversal)
|
||||
if !strings.HasPrefix(path, filepath.Clean(dest)+string(os.PathSeparator)) {
|
||||
return fmt.Errorf("illegal file path: %s", path)
|
||||
}
|
||||
|
||||
if f.FileInfo().IsDir() {
|
||||
_ = os.MkdirAll(path, f.Mode())
|
||||
} else {
|
||||
_ = os.MkdirAll(filepath.Dir(path), f.Mode())
|
||||
logger.Log.Infof("writing HAR file [ %v ]", path)
|
||||
f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, f.Mode())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
if err := f.Close(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
logger.Log.Info(" done")
|
||||
}()
|
||||
|
||||
_, err = io.Copy(f, rc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, f := range reader.File {
|
||||
err := extractAndWriteFile(f)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -1,35 +1,28 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/up9inc/mizu/cli/apiserver"
|
||||
"github.com/up9inc/mizu/cli/config"
|
||||
"github.com/up9inc/mizu/cli/config/configStructs"
|
||||
"github.com/up9inc/mizu/cli/errormessage"
|
||||
"github.com/up9inc/mizu/cli/kubernetes"
|
||||
"github.com/up9inc/mizu/cli/logger"
|
||||
"github.com/up9inc/mizu/cli/mizu"
|
||||
"github.com/up9inc/mizu/cli/mizu/fsUtils"
|
||||
"github.com/up9inc/mizu/cli/mizu/goUtils"
|
||||
"github.com/up9inc/mizu/cli/telemetry"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"os/signal"
|
||||
"path"
|
||||
"regexp"
|
||||
"strings"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/up9inc/mizu/cli/errormessage"
|
||||
"github.com/up9inc/mizu/cli/kubernetes"
|
||||
"github.com/up9inc/mizu/cli/mizu"
|
||||
"github.com/up9inc/mizu/cli/uiUtils"
|
||||
"github.com/up9inc/mizu/shared"
|
||||
"github.com/up9inc/mizu/shared/debounce"
|
||||
yaml "gopkg.in/yaml.v3"
|
||||
core "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"path"
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -60,7 +53,6 @@ func RunMizuTap() {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
kubernetesProvider, err := kubernetes.NewProvider(config.Config.KubeConfigPath())
|
||||
if err != nil {
|
||||
logger.Log.Error(err)
|
||||
@ -108,13 +100,13 @@ func RunMizuTap() {
|
||||
|
||||
nodeToTappedPodIPMap := getNodeHostToTappedPodIpsMap(state.currentlyTappedPods)
|
||||
|
||||
defer cleanUpMizu(kubernetesProvider)
|
||||
defer finishMizuExecution(kubernetesProvider)
|
||||
if err := createMizuResources(ctx, kubernetesProvider, nodeToTappedPodIPMap, mizuApiFilteringOptions, mizuValidationRules); err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error creating resources: %v", errormessage.FormatError(err)))
|
||||
return
|
||||
}
|
||||
|
||||
go goUtils.HandleExcWrapper(createProxyToApiServerPod, ctx, kubernetesProvider, cancel)
|
||||
go goUtils.HandleExcWrapper(watchApiServerPod, ctx, kubernetesProvider, cancel)
|
||||
go goUtils.HandleExcWrapper(watchPodsForTapping, ctx, kubernetesProvider, targetNamespaces, cancel)
|
||||
|
||||
//block until exit signal or error
|
||||
@ -261,23 +253,26 @@ func updateMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provi
|
||||
return nil
|
||||
}
|
||||
|
||||
func cleanUpMizu(kubernetesProvider *kubernetes.Provider) {
|
||||
telemetry.ReportAPICalls(config.Config.Tap.GuiPort)
|
||||
cleanUpMizuResources(kubernetesProvider)
|
||||
}
|
||||
|
||||
func cleanUpMizuResources(kubernetesProvider *kubernetes.Provider) {
|
||||
func finishMizuExecution(kubernetesProvider *kubernetes.Provider) {
|
||||
telemetry.ReportAPICalls()
|
||||
removalCtx, cancel := context.WithTimeout(context.Background(), cleanupTimeout)
|
||||
defer cancel()
|
||||
dumpLogsIfNeeded(kubernetesProvider, removalCtx)
|
||||
cleanUpMizuResources(kubernetesProvider, removalCtx, cancel)
|
||||
}
|
||||
|
||||
if config.Config.DumpLogs {
|
||||
func dumpLogsIfNeeded(kubernetesProvider *kubernetes.Provider, removalCtx context.Context) {
|
||||
if !config.Config.DumpLogs {
|
||||
return
|
||||
}
|
||||
mizuDir := mizu.GetMizuFolderPath()
|
||||
filePath := path.Join(mizuDir, fmt.Sprintf("mizu_logs_%s.zip", time.Now().Format("2006_01_02__15_04_05")))
|
||||
if err := fsUtils.DumpLogs(kubernetesProvider, removalCtx, filePath); err != nil {
|
||||
logger.Log.Errorf("Failed dump logs %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func cleanUpMizuResources(kubernetesProvider *kubernetes.Provider, removalCtx context.Context, cancel context.CancelFunc) {
|
||||
logger.Log.Infof("\nRemoving mizu resources\n")
|
||||
|
||||
if !config.Config.IsNsRestrictedMode() {
|
||||
@ -342,7 +337,7 @@ func waitUntilNamespaceDeleted(ctx context.Context, cancel context.CancelFunc, k
|
||||
if err := kubernetesProvider.WaitUtilNamespaceDeleted(ctx, config.Config.MizuResourcesNamespace); err != nil {
|
||||
switch {
|
||||
case ctx.Err() == context.Canceled:
|
||||
// Do nothing. User interrupted the wait.
|
||||
logger.Log.Debugf("Do nothing. User interrupted the wait")
|
||||
case err == wait.ErrWaitTimeout:
|
||||
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Timeout while removing Namespace %s", config.Config.MizuResourcesNamespace))
|
||||
default:
|
||||
@ -351,29 +346,6 @@ func waitUntilNamespaceDeleted(ctx context.Context, cancel context.CancelFunc, k
|
||||
}
|
||||
}
|
||||
|
||||
func reportTappedPods() {
|
||||
mizuProxiedUrl := kubernetes.GetMizuApiServerProxiedHostAndPath(config.Config.Tap.GuiPort)
|
||||
tappedPodsUrl := fmt.Sprintf("http://%s/status/tappedPods", mizuProxiedUrl)
|
||||
|
||||
podInfos := make([]shared.PodInfo, 0)
|
||||
for _, pod := range state.currentlyTappedPods {
|
||||
podInfos = append(podInfos, shared.PodInfo{Name: pod.Name, Namespace: pod.Namespace})
|
||||
}
|
||||
tapStatus := shared.TapStatus{Pods: podInfos}
|
||||
|
||||
if jsonValue, err := json.Marshal(tapStatus); err != nil {
|
||||
logger.Log.Debugf("[ERROR] failed Marshal the tapped pods %v", err)
|
||||
} else {
|
||||
if response, err := http.Post(tappedPodsUrl, "application/json", bytes.NewBuffer(jsonValue)); err != nil {
|
||||
logger.Log.Debugf("[ERROR] failed sending to API server the tapped pods %v", err)
|
||||
} else if response.StatusCode != 200 {
|
||||
logger.Log.Debugf("[ERROR] failed sending to API server the tapped pods, response status code %v", response.StatusCode)
|
||||
} else {
|
||||
logger.Log.Debugf("Reported to server API about %d taped pods successfully", len(podInfos))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Provider, targetNamespaces []string, cancel context.CancelFunc) {
|
||||
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider, targetNamespaces, config.Config.Tap.PodRegex())
|
||||
|
||||
@ -389,7 +361,9 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
|
||||
return
|
||||
}
|
||||
|
||||
reportTappedPods()
|
||||
if err := apiserver.Provider.ReportTappedPods(state.currentlyTappedPods); err != nil {
|
||||
logger.Log.Debugf("[Error] failed update tapped pods %v", err)
|
||||
}
|
||||
|
||||
nodeToTappedPodIPMap := getNodeHostToTappedPodIpsMap(state.currentlyTappedPods)
|
||||
if err != nil {
|
||||
@ -496,7 +470,7 @@ func getMissingPods(pods1 []core.Pod, pods2 []core.Pod) []core.Pod {
|
||||
return missingPods
|
||||
}
|
||||
|
||||
func createProxyToApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
|
||||
func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
|
||||
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", mizu.ApiServerPodName))
|
||||
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider, []string{config.Config.MizuResourcesNamespace}, podExactRegex)
|
||||
isPodReady := false
|
||||
@ -522,10 +496,17 @@ func createProxyToApiServerPod(ctx context.Context, kubernetesProvider *kubernet
|
||||
if modifiedPod.Status.Phase == core.PodRunning && !isPodReady {
|
||||
isPodReady = true
|
||||
go startProxyReportErrorIfAny(kubernetesProvider, cancel)
|
||||
logger.Log.Infof("Mizu is available at http://%s\n", kubernetes.GetMizuApiServerProxiedHostAndPath(config.Config.Tap.GuiPort))
|
||||
time.Sleep(time.Second * 5) // Waiting to be sure the proxy is ready
|
||||
requestForAnalysis()
|
||||
reportTappedPods()
|
||||
|
||||
if err := apiserver.Provider.InitAndTestConnection(GetApiServerUrl(), 20); err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, "Couldn't connect to API server, check logs")
|
||||
cancel()
|
||||
break
|
||||
}
|
||||
logger.Log.Infof("Mizu is available at %s\n", GetApiServerUrl())
|
||||
requestForAnalysisIfNeeded()
|
||||
if err := apiserver.Provider.ReportTappedPods(state.currentlyTappedPods); err != nil {
|
||||
logger.Log.Debugf("[Error] failed update tapped pods %v", err)
|
||||
}
|
||||
}
|
||||
case <-timeAfter:
|
||||
if !isPodReady {
|
||||
@ -539,34 +520,12 @@ func createProxyToApiServerPod(ctx context.Context, kubernetesProvider *kubernet
|
||||
}
|
||||
}
|
||||
|
||||
func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
|
||||
err := kubernetes.StartProxy(kubernetesProvider, config.Config.Tap.GuiPort, config.Config.MizuResourcesNamespace, mizu.ApiServerPodName)
|
||||
if err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error occured while running k8s proxy %v\n"+
|
||||
"Try setting different port by using --%s", errormessage.FormatError(err), configStructs.GuiPortTapName))
|
||||
cancel()
|
||||
}
|
||||
}
|
||||
|
||||
func requestForAnalysis() {
|
||||
func requestForAnalysisIfNeeded() {
|
||||
if !config.Config.Tap.Analysis {
|
||||
return
|
||||
}
|
||||
|
||||
mizuProxiedUrl := kubernetes.GetMizuApiServerProxiedHostAndPath(config.Config.Tap.GuiPort)
|
||||
urlPath := fmt.Sprintf("http://%s/api/uploadEntries?dest=%s&interval=%v", mizuProxiedUrl, url.QueryEscape(config.Config.Tap.AnalysisDestination), config.Config.Tap.SleepIntervalSec)
|
||||
u, parseErr := url.ParseRequestURI(urlPath)
|
||||
if parseErr != nil {
|
||||
logger.Log.Fatal("Failed parsing the URL (consider changing the analysis dest URL), err: %v", parseErr)
|
||||
}
|
||||
|
||||
logger.Log.Debugf("Sending get request to %v", u.String())
|
||||
if response, requestErr := http.Get(u.String()); requestErr != nil {
|
||||
logger.Log.Errorf("Failed to notify agent for analysis, err: %v", requestErr)
|
||||
} else if response.StatusCode != 200 {
|
||||
logger.Log.Errorf("Failed to notify agent for analysis, status code: %v", response.StatusCode)
|
||||
} else {
|
||||
logger.Log.Infof(uiUtils.Purple, "Traffic is uploading to UP9 for further analysis")
|
||||
if err := apiserver.Provider.RequestAnalysis(config.Config.Tap.AnalysisDestination, config.Config.Tap.SleepIntervalSec); err != nil {
|
||||
logger.Log.Debugf("[Error] failed requesting for analysis %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
@ -598,19 +557,6 @@ func getNodeHostToTappedPodIpsMap(tappedPods []core.Pod) map[string][]string {
|
||||
return nodeToTappedPodIPMap
|
||||
}
|
||||
|
||||
func waitForFinish(ctx context.Context, cancel context.CancelFunc) {
|
||||
sigChan := make(chan os.Signal, 1)
|
||||
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
|
||||
|
||||
// block until ctx cancel is called or termination signal is received
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
break
|
||||
case <-sigChan:
|
||||
cancel()
|
||||
}
|
||||
}
|
||||
|
||||
func getNamespaces(kubernetesProvider *kubernetes.Provider) []string {
|
||||
if config.Config.Tap.AllNamespaces {
|
||||
return []string{mizu.K8sAllNamespaces}
|
||||
|
@ -3,13 +3,14 @@ package cmd
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/up9inc/mizu/cli/apiserver"
|
||||
"github.com/up9inc/mizu/cli/config"
|
||||
"github.com/up9inc/mizu/cli/kubernetes"
|
||||
"github.com/up9inc/mizu/cli/logger"
|
||||
"github.com/up9inc/mizu/cli/mizu"
|
||||
"github.com/up9inc/mizu/cli/mizu/version"
|
||||
"github.com/up9inc/mizu/cli/uiUtils"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
func runMizuView() {
|
||||
@ -34,19 +35,21 @@ func runMizuView() {
|
||||
return
|
||||
}
|
||||
|
||||
mizuProxiedUrl := kubernetes.GetMizuApiServerProxiedHostAndPath(config.Config.View.GuiPort)
|
||||
_, err = http.Get(fmt.Sprintf("http://%s/", mizuProxiedUrl))
|
||||
if err == nil {
|
||||
response, err := http.Get(fmt.Sprintf("%s/", GetApiServerUrl()))
|
||||
if err == nil && response.StatusCode == 200 {
|
||||
logger.Log.Infof("Found a running service %s and open port %d", mizu.ApiServerPodName, config.Config.View.GuiPort)
|
||||
return
|
||||
}
|
||||
logger.Log.Infof("Establishing connection to k8s cluster...")
|
||||
go startProxyReportErrorIfAny(kubernetesProvider, cancel)
|
||||
|
||||
time.Sleep(time.Second * 5) // Waiting to be sure the proxy is ready
|
||||
if err := apiserver.Provider.InitAndTestConnection(GetApiServerUrl(), 10); err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, "Couldn't connect to API server, check logs")
|
||||
return
|
||||
}
|
||||
|
||||
logger.Log.Infof("Mizu is available at http://%s\n", kubernetes.GetMizuApiServerProxiedHostAndPath(config.Config.View.GuiPort))
|
||||
if isCompatible, err := version.CheckVersionCompatibility(config.Config.View.GuiPort); err != nil {
|
||||
logger.Log.Infof("Mizu is available at %s\n", GetApiServerUrl())
|
||||
if isCompatible, err := version.CheckVersionCompatibility(); err != nil {
|
||||
logger.Log.Errorf("Failed to check versions compatibility %v", err)
|
||||
cancel()
|
||||
return
|
||||
|
@ -3,9 +3,11 @@ package fsUtils
|
||||
import (
|
||||
"archive/zip"
|
||||
"fmt"
|
||||
"github.com/up9inc/mizu/cli/logger"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func AddFileToZip(zipWriter *zip.Writer, filename string) error {
|
||||
@ -53,3 +55,60 @@ func AddStrToZip(writer *zip.Writer, logs string, fileName string) error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func Unzip(reader *zip.Reader, dest string) error {
|
||||
dest, _ = filepath.Abs(dest)
|
||||
_ = os.MkdirAll(dest, os.ModePerm)
|
||||
|
||||
// Closure to address file descriptors issue with all the deferred .Close() methods
|
||||
extractAndWriteFile := func(f *zip.File) error {
|
||||
rc, err := f.Open()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
if err := rc.Close(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}()
|
||||
|
||||
path := filepath.Join(dest, f.Name)
|
||||
|
||||
// Check for ZipSlip (Directory traversal)
|
||||
if !strings.HasPrefix(path, filepath.Clean(dest)+string(os.PathSeparator)) {
|
||||
return fmt.Errorf("illegal file path: %s", path)
|
||||
}
|
||||
|
||||
if f.FileInfo().IsDir() {
|
||||
_ = os.MkdirAll(path, f.Mode())
|
||||
} else {
|
||||
_ = os.MkdirAll(filepath.Dir(path), f.Mode())
|
||||
logger.Log.Infof("writing HAR file [ %v ]", path)
|
||||
f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, f.Mode())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
if err := f.Close(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
logger.Log.Info(" done")
|
||||
}()
|
||||
|
||||
_, err = io.Copy(f, rc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, f := range reader.File {
|
||||
err := extractAndWriteFile(f)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -2,43 +2,21 @@ package version
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/up9inc/mizu/cli/apiserver"
|
||||
"github.com/up9inc/mizu/cli/logger"
|
||||
"github.com/up9inc/mizu/cli/mizu"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
"github.com/google/go-github/v37/github"
|
||||
"github.com/up9inc/mizu/cli/uiUtils"
|
||||
"github.com/up9inc/mizu/shared"
|
||||
"github.com/up9inc/mizu/shared/semver"
|
||||
)
|
||||
|
||||
func getApiVersion(port uint16) (string, error) {
|
||||
versionUrl, _ := url.Parse(fmt.Sprintf("http://localhost:%d/mizu/metadata/version", port))
|
||||
req := &http.Request{
|
||||
Method: http.MethodGet,
|
||||
URL: versionUrl,
|
||||
}
|
||||
statusResp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer statusResp.Body.Close()
|
||||
|
||||
versionResponse := &shared.VersionResponse{}
|
||||
if err := json.NewDecoder(statusResp.Body).Decode(&versionResponse); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return versionResponse.SemVer, nil
|
||||
}
|
||||
|
||||
func CheckVersionCompatibility(port uint16) (bool, error) {
|
||||
apiSemVer, err := getApiVersion(port)
|
||||
func CheckVersionCompatibility() (bool, error) {
|
||||
apiSemVer, err := apiserver.Provider.GetVersion()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
@ -5,11 +5,10 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/denisbrodbeck/machineid"
|
||||
"github.com/up9inc/mizu/cli/apiserver"
|
||||
"github.com/up9inc/mizu/cli/config"
|
||||
"github.com/up9inc/mizu/cli/kubernetes"
|
||||
"github.com/up9inc/mizu/cli/logger"
|
||||
"github.com/up9inc/mizu/cli/mizu"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
@ -35,35 +34,15 @@ func ReportRun(cmd string, args interface{}) {
|
||||
logger.Log.Debugf("successfully reported telemetry for cmd %v", cmd)
|
||||
}
|
||||
|
||||
func ReportAPICalls(mizuPort uint16) {
|
||||
func ReportAPICalls() {
|
||||
if !shouldRunTelemetry() {
|
||||
logger.Log.Debugf("not reporting telemetry")
|
||||
return
|
||||
}
|
||||
|
||||
mizuProxiedUrl := kubernetes.GetMizuApiServerProxiedHostAndPath(mizuPort)
|
||||
generalStatsUrl := fmt.Sprintf("http://%s/api/generalStats", mizuProxiedUrl)
|
||||
|
||||
response, requestErr := http.Get(generalStatsUrl)
|
||||
if requestErr != nil {
|
||||
logger.Log.Debugf("ERROR: failed to get general stats for telemetry, err: %v", requestErr)
|
||||
return
|
||||
} else if response.StatusCode != 200 {
|
||||
logger.Log.Debugf("ERROR: failed to get general stats for telemetry, status code: %v", response.StatusCode)
|
||||
return
|
||||
}
|
||||
|
||||
defer func() { _ = response.Body.Close() }()
|
||||
|
||||
data, readErr := ioutil.ReadAll(response.Body)
|
||||
if readErr != nil {
|
||||
logger.Log.Debugf("ERROR: failed to read general stats for telemetry, err: %v", readErr)
|
||||
return
|
||||
}
|
||||
|
||||
var generalStats map[string]interface{}
|
||||
if parseErr := json.Unmarshal(data, &generalStats); parseErr != nil {
|
||||
logger.Log.Debugf("ERROR: failed to parse general stats for telemetry, err: %v", parseErr)
|
||||
generalStats, err := apiserver.Provider.GetGeneralStats()
|
||||
if err != nil {
|
||||
logger.Log.Debugf("[ERROR] failed get general stats from api server %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user