From b1df4b69ae422f30e87f5ebfb7f90c1b1a8e0a8e Mon Sep 17 00:00:00 2001 From: RamiBerm Date: Sun, 11 Jul 2021 13:07:25 +0300 Subject: [PATCH] Update public_routes.go, fetchRunner.go, and 3 more files... --- api/pkg/routes/public_routes.go | 1 + cli/cmd/fetchRunner.go | 5 ++- cli/cmd/tapRunner.go | 47 +++------------------------- cli/kubernetes/portForward.go | 52 ------------------------------- cli/kubernetes/proxy.go | 54 +++++++++++++++++++++------------ 5 files changed, 44 insertions(+), 115 deletions(-) delete mode 100644 cli/kubernetes/portForward.go diff --git a/api/pkg/routes/public_routes.go b/api/pkg/routes/public_routes.go index 3bdbe150a..f95521f4a 100644 --- a/api/pkg/routes/public_routes.go +++ b/api/pkg/routes/public_routes.go @@ -21,4 +21,5 @@ func EntriesRoutes(fiberApp *fiber.App) { routeGroup.Get("/tapStatus", controllers.GetTappingStatus) // get tapping status routeGroup.Get("/analyzeStatus", controllers.AnalyzeInformation) + } diff --git a/cli/cmd/fetchRunner.go b/cli/cmd/fetchRunner.go index 58db0352b..959a59acc 100644 --- a/cli/cmd/fetchRunner.go +++ b/cli/cmd/fetchRunner.go @@ -4,6 +4,8 @@ import ( "archive/zip" "bytes" "fmt" + "github.com/up9inc/mizu/cli/kubernetes" + "github.com/up9inc/mizu/cli/mizu" "io" "io/ioutil" "log" @@ -14,7 +16,8 @@ import ( ) func RunMizuFetch(fetch *MizuFetchOptions) { - resp, err := http.Get(fmt.Sprintf("http://localhost:%v/api/har?from=%v&to=%v", fetch.MizuPort, fetch.FromTimestamp, fetch.ToTimestamp)) + mizuProxiedUrl := kubernetes.GetMizuCollectorProxiesHostAndPath(uint16(fetch.MizuPort), mizu.ResourcesNamespace, mizu.AggregatorPodName) + resp, err := http.Get(fmt.Sprintf("http://%s/api/har?from=%v&to=%v", mizuProxiedUrl, fetch.FromTimestamp, fetch.ToTimestamp)) if err != nil { log.Fatal(err) } diff --git a/cli/cmd/tapRunner.go b/cli/cmd/tapRunner.go index 94aade36c..089a34c97 100644 --- a/cli/cmd/tapRunner.go +++ b/cli/cmd/tapRunner.go @@ -3,7 +3,6 @@ package cmd import ( "context" "fmt" - "net/http" "os" "os/signal" "regexp" @@ -232,7 +231,6 @@ func portForwardApiPod(ctx context.Context, kubernetesProvider *kubernetes.Provi podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", mizu.AggregatorPodName)) added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, mizu.ResourcesNamespace), podExactRegex) isPodReady := false - var portForward *kubernetes.PortForward for { select { case <-added: @@ -244,29 +242,16 @@ func portForwardApiPod(ctx context.Context, kubernetesProvider *kubernetes.Provi case modifiedPod := <-modified: if modifiedPod.Status.Phase == "Running" && !isPodReady { go func() { - err := kubernetes.StartProxy(ctx, kubernetesProvider, tappingOptions.GuiPort, mizu.ResourcesNamespace, mizu.AggregatorPodName) + err := kubernetes.StartProxy(kubernetesProvider, tappingOptions.GuiPort, mizu.ResourcesNamespace, mizu.AggregatorPodName) if err != nil { fmt.Printf("Error starting k8s proxy %v\n", err) + cancel() + } else { + fmt.Printf("Mizu is available at http://%s\n", kubernetes.GetMizuCollectorProxiesHostAndPath(tappingOptions.GuiPort, mizu.ResourcesNamespace, mizu.AggregatorPodName)) } }() isPodReady = true - //var portForwardCreateError error - //if portForward, portForwardCreateError = kubernetes.NewPortForward(kubernetesProvider, mizu.ResourcesNamespace, fmt.Sprintf(mizu.AggregatorPodName), tappingOptions.GuiPort, tappingOptions.MizuPodPort, cancel); portForwardCreateError != nil { - // fmt.Printf("error forwarding port to pod %s\n", portForwardCreateError) - // cancel() - //} else { - // fmt.Printf("Web interface is now available at http://localhost:%d\n", tappingOptions.GuiPort) - // time.Sleep(time.Second * 5) // Waiting to be sure port forwarding finished - // if tappingOptions.Analyze { - // if _, err := http.Get(fmt.Sprintf("http://localhost:%d/api/uploadEntries?dest=%s", tappingOptions.GuiPort, tappingOptions.AnalyzeDestination)); err != nil { - // fmt.Println(err) - // } else { - // fmt.Printf(mizu.Purple, "Traffic is uploading to UP9 cloud for further analsys") - // fmt.Println() - // } - // } - //} } case <-time.After(25 * time.Second): @@ -279,18 +264,11 @@ func portForwardApiPod(ctx context.Context, kubernetesProvider *kubernetes.Provi cancel() case <-ctx.Done(): - if portForward != nil { - portForward.Stop() - } return } } } -func printMizuReadyMessages(tappingOptions *MizuTapOptions) { - -} - func createRBACIfNecessary(ctx context.Context, kubernetesProvider *kubernetes.Provider) bool { mizuRBACExists, err := kubernetesProvider.DoesMizuRBACExist(ctx, mizu.ResourcesNamespace) if err != nil { @@ -334,7 +312,7 @@ func waitForFinish(ctx context.Context, cancel context.CancelFunc) { } func syncApiStatus(ctx context.Context, cancel context.CancelFunc, tappingOptions *MizuTapOptions) { - controlSocket, err := mizu.CreateControlSocket(fmt.Sprintf("ws://localhost:%d/ws", tappingOptions.GuiPort)) + controlSocket, err := mizu.CreateControlSocket(fmt.Sprintf("ws://%s/ws", kubernetes.GetMizuCollectorProxiesHostAndPath(tappingOptions.GuiPort, mizu.ResourcesNamespace, mizu.AggregatorPodName))) if err != nil { fmt.Printf("error establishing control socket connection %s\n", err) cancel() @@ -355,21 +333,6 @@ func syncApiStatus(ctx context.Context, cancel context.CancelFunc, tappingOption } -func spamHealthcheck(ctx context.Context, tappingOptions *MizuTapOptions) { - for { - select { - case <- ctx.Done(): - return - default: - _, err := http.Get(fmt.Sprintf("http://localhost:%d/health", tappingOptions.GuiPort)) - if err != nil { - fmt.Printf("Error sending healthcheck %v\n", err) - } - } - time.Sleep(1 * time.Second) - } -} - func getNamespace(tappingOptions *MizuTapOptions, kubernetesProvider *kubernetes.Provider) string { if tappingOptions.AllNamespaces { return mizu.K8sAllNamespaces diff --git a/cli/kubernetes/portForward.go b/cli/kubernetes/portForward.go deleted file mode 100644 index bc00ef8a1..000000000 --- a/cli/kubernetes/portForward.go +++ /dev/null @@ -1,52 +0,0 @@ -package kubernetes - -import ( - "bytes" - "context" - "fmt" - "k8s.io/apimachinery/pkg/util/httpstream" - "k8s.io/client-go/tools/portforward" - "k8s.io/client-go/transport/spdy" - "net/http" - "net/url" - "strings" -) - -type PortForward struct { - stopChan chan struct{} -} - -func NewPortForward(kubernetesProvider *Provider, namespace string, podName string, localPort uint16, podPort uint16, cancel context.CancelFunc) (*PortForward, error) { - dialer := getHttpDialer(kubernetesProvider, namespace, podName) - stopChan, readyChan := make(chan struct{}, 1), make(chan struct{}, 1) - out, errOut := new(bytes.Buffer), new(bytes.Buffer) - - forwarder, err := portforward.New(dialer, []string{fmt.Sprintf("%d:%d", localPort, podPort)}, stopChan, readyChan, out, errOut) - if err != nil { - return nil, err - } - go func() { - err = forwarder.ForwardPorts() // this is blocking - if err != nil { - fmt.Printf("kubernetes port-forwarding error: %s", err) - cancel() - } - }() - return &PortForward{stopChan: stopChan}, nil -} - -func (portForward *PortForward) Stop() { - close(portForward.stopChan) -} - -func getHttpDialer(kubernetesProvider *Provider, namespace string, podName string) httpstream.Dialer { - roundTripper, upgrader, err := spdy.RoundTripperFor(&kubernetesProvider.clientConfig) - if err != nil { - panic(err) - } - path := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/portforward", namespace, podName) - hostIP := strings.TrimLeft(kubernetesProvider.clientConfig.Host, "htps:/") - serverURL := url.URL{Scheme: "https", Path: path, Host: hostIP} - - return spdy.NewDialer(upgrader, &http.Client{Transport: roundTripper}, http.MethodPost, &serverURL) -} diff --git a/cli/kubernetes/proxy.go b/cli/kubernetes/proxy.go index 9c8a22192..9cbd59d3d 100644 --- a/cli/kubernetes/proxy.go +++ b/cli/kubernetes/proxy.go @@ -1,14 +1,17 @@ package kubernetes import ( - "context" "fmt" "k8s.io/kubectl/pkg/proxy" + "net" + "net/http" + "net/url" "time" ) -func StartProxy(ctx context.Context, kubernetesProvider *Provider, mizuPort uint16, mizuNamespace string, mizuServiceName string) error { - //o := cmdProxy.NewProxyOptions(genericclioptions.IOStreams{In: os.Stdin, Out: os.Stdout, ErrOut: os.Stderr}) +const k8sProxyApiPrefix = "/" + +func StartProxy(kubernetesProvider *Provider, mizuPort uint16, mizuNamespace string, mizuServiceName string) error { filter := &proxy.FilterServer{ AcceptPaths: proxy.MakeRegexpArrayOrDie(proxy.DefaultPathAcceptRE), RejectPaths: proxy.MakeRegexpArrayOrDie(proxy.DefaultPathRejectRE), @@ -16,26 +19,37 @@ func StartProxy(ctx context.Context, kubernetesProvider *Provider, mizuPort uint RejectMethods: proxy.MakeRegexpArrayOrDie(proxy.DefaultMethodRejectRE), } - server, err := proxy.NewServer("", "/", "/static/", filter, &kubernetesProvider.clientConfig, time.Second * 1) + mizuProxiedUrl := GetMizuCollectorProxiesHostAndPath(mizuPort, mizuNamespace, mizuServiceName) + proxyHandler, err := proxy.NewProxyHandler(k8sProxyApiPrefix, filter, &kubernetesProvider.clientConfig, time.Second * 2) + if err != nil { + return err + } + mux := http.NewServeMux() + mux.Handle(k8sProxyApiPrefix, proxyHandler) + //work around to make static resources available to the dashboard (all .svgs will not load without this) + mux.Handle("/static/", getRerouteHttpHandler(proxyHandler, mizuProxiedUrl)) - l, err := server.Listen("127.0.0.1", int(mizuPort)) + //l, err := server.Listen("127.0.0.1", int(mizuPort)) + l, err := net.Listen("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", int(mizuPort))) if err != nil { return err } - go func() { - for { - select { - case <-ctx.Done(): - fmt.Printf("Closing connection due to context done") - err := l.Close() - if err != nil { - fmt.Printf("Error stopping proxy network handler %v", err) - } - return - } - } - }() - fmt.Printf("Mizu is available at http://localhost:%d/api/v1/namespaces/%s/services/%s:80/proxy\n", mizuPort, mizuNamespace, mizuServiceName) - return server.ServeOnListener(l) + server := http.Server{ + Handler: mux, + } + return server.Serve(l) +} + +func GetMizuCollectorProxiesHostAndPath(mizuPort uint16, mizuNamespace string, mizuServiceName string) string { + return fmt.Sprintf("localhost:%d/api/v1/namespaces/%s/services/%s:80/proxy", mizuPort, mizuNamespace, mizuServiceName) +} + +// rewrites requests so they end up reaching the mizu-collector k8s service via the k8s proxy handler +func getRerouteHttpHandler(proxyHandler http.Handler, mizuProxyUrl string) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + newUrl, _ := url.Parse(fmt.Sprintf("http://%s%s", mizuProxyUrl, r.URL.Path)) + r.URL = newUrl + proxyHandler.ServeHTTP(w, r) + }) }