From be3375f797cd912ba9951a9bcd3ba46aa94733be Mon Sep 17 00:00:00 2001 From: RoyUP9 <87927115+RoyUP9@users.noreply.github.com> Date: Wed, 26 Jan 2022 12:11:34 +0200 Subject: [PATCH] Added post install connectivity check (#686) --- cli/cmd/checkRunner.go | 70 ++++++++++++++++++++++++++++++----- cli/cmd/common.go | 6 ++- cli/cmd/installRunner.go | 75 -------------------------------------- cli/cmd/tapRunner.go | 2 +- cli/cmd/viewRunner.go | 2 +- shared/kubernetes/proxy.go | 21 ++++++++--- 6 files changed, 83 insertions(+), 93 deletions(-) diff --git a/cli/cmd/checkRunner.go b/cli/cmd/checkRunner.go index 71417e13c..77bdb6518 100644 --- a/cli/cmd/checkRunner.go +++ b/cli/cmd/checkRunner.go @@ -9,7 +9,7 @@ import ( "github.com/up9inc/mizu/shared/kubernetes" "github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/shared/semver" - "net/http" + "regexp" ) func runMizuCheck() { @@ -34,7 +34,7 @@ func runMizuCheck() { } if checkPassed { - checkPassed = checkServerConnection(kubernetesProvider, cancel) + checkPassed = checkServerConnection(kubernetesProvider) } if checkPassed { @@ -91,23 +91,75 @@ func checkKubernetesVersion(kubernetesVersion *semver.SemVersion) bool { return true } -func checkServerConnection(kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) bool { +func checkServerConnection(kubernetesProvider *kubernetes.Provider) bool { logger.Log.Infof("\nmizu-connectivity\n--------------------") serverUrl := GetApiServerUrl() - if response, err := http.Get(fmt.Sprintf("%s/", serverUrl)); err != nil || response.StatusCode != 200 { - startProxyReportErrorIfAny(kubernetesProvider, cancel) + apiServerProvider := apiserver.NewProviderWithoutRetries(serverUrl, apiserver.DefaultTimeout) + if err := apiServerProvider.TestConnection(); err == nil { + logger.Log.Infof("%v found Mizu server tunnel available and connected successfully to API server", fmt.Sprintf(uiUtils.Green, "√")) + return true + } + + connectedToApiServer := false + + if err := checkProxy(serverUrl, kubernetesProvider); err != nil { + logger.Log.Errorf("%v couldn't connect to API server using proxy, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err) + } else { + connectedToApiServer = true + logger.Log.Infof("%v connected successfully to API server using proxy", fmt.Sprintf(uiUtils.Green, "√")) + } + + if err := checkPortForward(serverUrl, kubernetesProvider); err != nil { + logger.Log.Errorf("%v couldn't connect to API server using port-forward, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err) + } else { + connectedToApiServer = true + logger.Log.Infof("%v connected successfully to API server using port-forward", fmt.Sprintf(uiUtils.Green, "√")) + } + + return connectedToApiServer +} + +func checkProxy(serverUrl string, kubernetesProvider *kubernetes.Provider) error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + httpServer, err := kubernetes.StartProxy(kubernetesProvider, config.Config.Tap.ProxyHost, config.Config.Tap.GuiPort, config.Config.MizuResourcesNamespace, kubernetes.ApiServerPodName, cancel) + if err != nil { + return err } apiServerProvider := apiserver.NewProvider(serverUrl, apiserver.DefaultRetries, apiserver.DefaultTimeout) if err := apiServerProvider.TestConnection(); err != nil { - logger.Log.Errorf("%v couldn't connect to API server, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err) - return false + return err } - logger.Log.Infof("%v connected successfully to API server", fmt.Sprintf(uiUtils.Green, "√")) - return true + if err := httpServer.Shutdown(ctx); err != nil { + logger.Log.Debugf("Error occurred while stopping proxy, err: %v", err) + } + + return nil +} + +func checkPortForward(serverUrl string, kubernetesProvider *kubernetes.Provider) error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + podRegex, _ := regexp.Compile(kubernetes.ApiServerPodName) + forwarder, err := kubernetes.NewPortForward(kubernetesProvider, config.Config.MizuResourcesNamespace, podRegex, config.Config.Tap.GuiPort, ctx, cancel) + if err != nil { + return err + } + + apiServerProvider := apiserver.NewProvider(serverUrl, apiserver.DefaultRetries, apiserver.DefaultTimeout) + if err := apiServerProvider.TestConnection(); err != nil { + return err + } + + forwarder.Close() + + return nil } func checkAllResourcesExist(ctx context.Context, kubernetesProvider *kubernetes.Provider, isInstallCommand bool) bool { diff --git a/cli/cmd/common.go b/cli/cmd/common.go index 4b31257bb..243409642 100644 --- a/cli/cmd/common.go +++ b/cli/cmd/common.go @@ -14,6 +14,7 @@ import ( "github.com/up9inc/mizu/cli/uiUtils" "github.com/up9inc/mizu/shared" "path" + "regexp" "time" "github.com/up9inc/mizu/cli/config" @@ -25,7 +26,7 @@ func GetApiServerUrl() string { return fmt.Sprintf("http://%s", kubernetes.GetMizuApiServerProxiedHostAndPath(config.Config.Tap.GuiPort)) } -func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) { +func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, ctx context.Context, cancel context.CancelFunc) { httpServer, err := kubernetes.StartProxy(kubernetesProvider, config.Config.Tap.ProxyHost, config.Config.Tap.GuiPort, config.Config.MizuResourcesNamespace, kubernetes.ApiServerPodName, cancel) if err != nil { logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error occured while running k8s proxy %v\n"+ @@ -41,7 +42,8 @@ func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, cancel logger.Log.Debugf("Error occurred while stopping proxy %v", errormessage.FormatError(err)) } - if err := kubernetes.NewPortForward(kubernetesProvider, config.Config.MizuResourcesNamespace, kubernetes.ApiServerPodName, config.Config.Tap.GuiPort, cancel); err != nil { + podRegex, _ := regexp.Compile(kubernetes.ApiServerPodName) + if _, err := kubernetes.NewPortForward(kubernetesProvider, config.Config.MizuResourcesNamespace, podRegex, config.Config.Tap.GuiPort, ctx, cancel); err != nil { logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error occured while running port forward %v\n"+ "Try setting different port by using --%s", errormessage.FormatError(err), configStructs.GuiPortTapName)) cancel() diff --git a/cli/cmd/installRunner.go b/cli/cmd/installRunner.go index fcb74395a..dd5d78321 100644 --- a/cli/cmd/installRunner.go +++ b/cli/cmd/installRunner.go @@ -4,11 +4,6 @@ import ( "context" "errors" "fmt" - "github.com/up9inc/mizu/shared/kubernetes" - core "k8s.io/api/core/v1" - "regexp" - "time" - "github.com/creasty/defaults" "github.com/up9inc/mizu/cli/config" "github.com/up9inc/mizu/cli/errormessage" @@ -63,20 +58,6 @@ func runMizuInstall() { return } - logger.Log.Infof("Waiting for Mizu server to start...") - readyChan := make(chan string) - readyErrorChan := make(chan error) - go watchApiServerPodReady(ctx, kubernetesProvider, readyChan, readyErrorChan) - - select { - case readyMessage := <-readyChan: - logger.Log.Infof(readyMessage) - case err := <-readyErrorChan: - defer resources.CleanUpMizuResources(ctx, cancel, kubernetesProvider, config.Config.IsNsRestrictedMode(), config.Config.MizuResourcesNamespace) - logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("%v", errormessage.FormatError(err))) - return - } - logger.Log.Infof(uiUtils.Magenta, "Installation completed, run `mizu view` to connect to the mizu daemon instance") } @@ -96,59 +77,3 @@ func getInstallMizuAgentConfig(maxDBSizeBytes int64, tapperResources shared.Reso return &mizuAgentConfig } - -func watchApiServerPodReady(ctx context.Context, kubernetesProvider *kubernetes.Provider, readyChan chan string, readyErrorChan chan error) { - podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", kubernetes.ApiServerPodName)) - podWatchHelper := kubernetes.NewPodWatchHelper(kubernetesProvider, podExactRegex) - eventChan, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.MizuResourcesNamespace}, podWatchHelper) - - apiServerTimeoutSec := config.GetIntEnvConfig(config.ApiServerTimeoutSec, 120) - timeAfter := time.After(time.Duration(apiServerTimeoutSec) * time.Second) - for { - select { - case wEvent, ok := <-eventChan: - if !ok { - eventChan = nil - continue - } - - switch wEvent.Type { - case kubernetes.EventAdded: - logger.Log.Debugf("Watching API Server pod ready loop, added") - case kubernetes.EventDeleted: - logger.Log.Debugf("Watching API Server pod ready loop, %s removed", kubernetes.ApiServerPodName) - case kubernetes.EventModified: - modifiedPod, err := wEvent.ToPod() - if err != nil { - readyErrorChan <- err - return - } - - logger.Log.Debugf("Watching API Server pod ready loop, modified: %v", modifiedPod.Status.Phase) - - if modifiedPod.Status.Phase == core.PodRunning { - readyChan <- fmt.Sprintf("%v pod is running", modifiedPod.Name) - return - } - case kubernetes.EventBookmark: - break - case kubernetes.EventError: - break - } - case err, ok := <-errorChan: - if !ok { - errorChan = nil - continue - } - - readyErrorChan <- fmt.Errorf("[ERROR] Agent creation, watching %v namespace, error: %v", config.Config.MizuResourcesNamespace, err) - return - case <-timeAfter: - readyErrorChan <- fmt.Errorf("mizu API server was not ready in time") - return - case <-ctx.Done(): - logger.Log.Debugf("Watching API Server pod ready loop, ctx done") - return - } - } -} diff --git a/cli/cmd/tapRunner.go b/cli/cmd/tapRunner.go index 3fe5de91a..6c104a6ff 100644 --- a/cli/cmd/tapRunner.go +++ b/cli/cmd/tapRunner.go @@ -422,7 +422,7 @@ func watchApiServerEvents(ctx context.Context, kubernetesProvider *kubernetes.Pr } func postApiServerStarted(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, err error) { - startProxyReportErrorIfAny(kubernetesProvider, cancel) + startProxyReportErrorIfAny(kubernetesProvider, ctx, cancel) options, _ := getMizuApiFilteringOptions() if err = startTapperSyncer(ctx, cancel, kubernetesProvider, state.targetNamespaces, *options, state.startTime); err != nil { diff --git a/cli/cmd/viewRunner.go b/cli/cmd/viewRunner.go index c1a4e1005..8161ddbe2 100644 --- a/cli/cmd/viewRunner.go +++ b/cli/cmd/viewRunner.go @@ -47,7 +47,7 @@ func runMizuView() { return } logger.Log.Infof("Establishing connection to k8s cluster...") - startProxyReportErrorIfAny(kubernetesProvider, cancel) + startProxyReportErrorIfAny(kubernetesProvider, ctx, cancel) } apiServerProvider := apiserver.NewProvider(url, apiserver.DefaultRetries, apiserver.DefaultTimeout) diff --git a/shared/kubernetes/proxy.go b/shared/kubernetes/proxy.go index 44d627451..57b919c2d 100644 --- a/shared/kubernetes/proxy.go +++ b/shared/kubernetes/proxy.go @@ -11,6 +11,7 @@ import ( "net" "net/http" "net/url" + "regexp" "strings" "time" @@ -84,12 +85,21 @@ func getRerouteHttpHandlerMizuStatic(proxyHandler http.Handler, mizuNamespace st }) } -func NewPortForward(kubernetesProvider *Provider, namespace string, podName string, localPort uint16, cancel context.CancelFunc) error { - logger.Log.Debugf("Starting proxy using port-forward method. namespace: [%v], service name: [%s], port: [%v]", namespace, podName, localPort) +func NewPortForward(kubernetesProvider *Provider, namespace string, podRegex *regexp.Regexp, localPort uint16, ctx context.Context, cancel context.CancelFunc) (*portforward.PortForwarder, error) { + pods, err := kubernetesProvider.ListAllRunningPodsMatchingRegex(ctx, podRegex, []string{namespace}) + if err != nil { + return nil, err + } else if len(pods) == 0 { + return nil, fmt.Errorf("didn't find pod to port-forward") + } + + podName := pods[0].Name + + logger.Log.Debugf("Starting proxy using port-forward method. namespace: [%v], pod name: [%s], port: [%v]", namespace, podName, localPort) dialer, err := getHttpDialer(kubernetesProvider, namespace, podName) if err != nil { - return err + return nil, err } stopChan, readyChan := make(chan struct{}, 1), make(chan struct{}, 1) @@ -97,7 +107,7 @@ func NewPortForward(kubernetesProvider *Provider, namespace string, podName stri forwarder, err := portforward.New(dialer, []string{fmt.Sprintf("%d:%d", localPort, shared.DefaultApiServerPort)}, stopChan, readyChan, out, errOut) if err != nil { - return err + return nil, err } go func() { @@ -107,7 +117,7 @@ func NewPortForward(kubernetesProvider *Provider, namespace string, podName stri } }() - return nil + return forwarder, nil } func getHttpDialer(kubernetesProvider *Provider, namespace string, podName string) (httpstream.Dialer, error) { @@ -116,6 +126,7 @@ func getHttpDialer(kubernetesProvider *Provider, namespace string, podName strin logger.Log.Errorf("Error creating http dialer") return nil, err } + path := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/portforward", namespace, podName) hostIP := strings.TrimLeft(kubernetesProvider.clientConfig.Host, "htps:/") // no need specify "t" twice serverURL := url.URL{Scheme: "https", Path: path, Host: hostIP}