Added post install connectivity check (#686)

This commit is contained in:
RoyUP9 2022-01-26 12:11:34 +02:00 committed by GitHub
parent 0c56c0f541
commit be3375f797
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 83 additions and 93 deletions

View File

@ -9,7 +9,7 @@ import (
"github.com/up9inc/mizu/shared/kubernetes" "github.com/up9inc/mizu/shared/kubernetes"
"github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/shared/semver" "github.com/up9inc/mizu/shared/semver"
"net/http" "regexp"
) )
func runMizuCheck() { func runMizuCheck() {
@ -34,7 +34,7 @@ func runMizuCheck() {
} }
if checkPassed { if checkPassed {
checkPassed = checkServerConnection(kubernetesProvider, cancel) checkPassed = checkServerConnection(kubernetesProvider)
} }
if checkPassed { if checkPassed {
@ -91,23 +91,75 @@ func checkKubernetesVersion(kubernetesVersion *semver.SemVersion) bool {
return true return true
} }
func checkServerConnection(kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) bool { func checkServerConnection(kubernetesProvider *kubernetes.Provider) bool {
logger.Log.Infof("\nmizu-connectivity\n--------------------") logger.Log.Infof("\nmizu-connectivity\n--------------------")
serverUrl := GetApiServerUrl() serverUrl := GetApiServerUrl()
if response, err := http.Get(fmt.Sprintf("%s/", serverUrl)); err != nil || response.StatusCode != 200 { apiServerProvider := apiserver.NewProviderWithoutRetries(serverUrl, apiserver.DefaultTimeout)
startProxyReportErrorIfAny(kubernetesProvider, cancel) if err := apiServerProvider.TestConnection(); err == nil {
logger.Log.Infof("%v found Mizu server tunnel available and connected successfully to API server", fmt.Sprintf(uiUtils.Green, "√"))
return true
}
connectedToApiServer := false
if err := checkProxy(serverUrl, kubernetesProvider); err != nil {
logger.Log.Errorf("%v couldn't connect to API server using proxy, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err)
} else {
connectedToApiServer = true
logger.Log.Infof("%v connected successfully to API server using proxy", fmt.Sprintf(uiUtils.Green, "√"))
}
if err := checkPortForward(serverUrl, kubernetesProvider); err != nil {
logger.Log.Errorf("%v couldn't connect to API server using port-forward, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err)
} else {
connectedToApiServer = true
logger.Log.Infof("%v connected successfully to API server using port-forward", fmt.Sprintf(uiUtils.Green, "√"))
}
return connectedToApiServer
}
func checkProxy(serverUrl string, kubernetesProvider *kubernetes.Provider) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
httpServer, err := kubernetes.StartProxy(kubernetesProvider, config.Config.Tap.ProxyHost, config.Config.Tap.GuiPort, config.Config.MizuResourcesNamespace, kubernetes.ApiServerPodName, cancel)
if err != nil {
return err
} }
apiServerProvider := apiserver.NewProvider(serverUrl, apiserver.DefaultRetries, apiserver.DefaultTimeout) apiServerProvider := apiserver.NewProvider(serverUrl, apiserver.DefaultRetries, apiserver.DefaultTimeout)
if err := apiServerProvider.TestConnection(); err != nil { if err := apiServerProvider.TestConnection(); err != nil {
logger.Log.Errorf("%v couldn't connect to API server, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err) return err
return false
} }
logger.Log.Infof("%v connected successfully to API server", fmt.Sprintf(uiUtils.Green, "√")) if err := httpServer.Shutdown(ctx); err != nil {
return true logger.Log.Debugf("Error occurred while stopping proxy, err: %v", err)
}
return nil
}
func checkPortForward(serverUrl string, kubernetesProvider *kubernetes.Provider) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
podRegex, _ := regexp.Compile(kubernetes.ApiServerPodName)
forwarder, err := kubernetes.NewPortForward(kubernetesProvider, config.Config.MizuResourcesNamespace, podRegex, config.Config.Tap.GuiPort, ctx, cancel)
if err != nil {
return err
}
apiServerProvider := apiserver.NewProvider(serverUrl, apiserver.DefaultRetries, apiserver.DefaultTimeout)
if err := apiServerProvider.TestConnection(); err != nil {
return err
}
forwarder.Close()
return nil
} }
func checkAllResourcesExist(ctx context.Context, kubernetesProvider *kubernetes.Provider, isInstallCommand bool) bool { func checkAllResourcesExist(ctx context.Context, kubernetesProvider *kubernetes.Provider, isInstallCommand bool) bool {

View File

@ -14,6 +14,7 @@ import (
"github.com/up9inc/mizu/cli/uiUtils" "github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/shared"
"path" "path"
"regexp"
"time" "time"
"github.com/up9inc/mizu/cli/config" "github.com/up9inc/mizu/cli/config"
@ -25,7 +26,7 @@ func GetApiServerUrl() string {
return fmt.Sprintf("http://%s", kubernetes.GetMizuApiServerProxiedHostAndPath(config.Config.Tap.GuiPort)) return fmt.Sprintf("http://%s", kubernetes.GetMizuApiServerProxiedHostAndPath(config.Config.Tap.GuiPort))
} }
func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) { func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, ctx context.Context, cancel context.CancelFunc) {
httpServer, err := kubernetes.StartProxy(kubernetesProvider, config.Config.Tap.ProxyHost, config.Config.Tap.GuiPort, config.Config.MizuResourcesNamespace, kubernetes.ApiServerPodName, cancel) httpServer, err := kubernetes.StartProxy(kubernetesProvider, config.Config.Tap.ProxyHost, config.Config.Tap.GuiPort, config.Config.MizuResourcesNamespace, kubernetes.ApiServerPodName, cancel)
if err != nil { if err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error occured while running k8s proxy %v\n"+ logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error occured while running k8s proxy %v\n"+
@ -41,7 +42,8 @@ func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, cancel
logger.Log.Debugf("Error occurred while stopping proxy %v", errormessage.FormatError(err)) logger.Log.Debugf("Error occurred while stopping proxy %v", errormessage.FormatError(err))
} }
if err := kubernetes.NewPortForward(kubernetesProvider, config.Config.MizuResourcesNamespace, kubernetes.ApiServerPodName, config.Config.Tap.GuiPort, cancel); err != nil { podRegex, _ := regexp.Compile(kubernetes.ApiServerPodName)
if _, err := kubernetes.NewPortForward(kubernetesProvider, config.Config.MizuResourcesNamespace, podRegex, config.Config.Tap.GuiPort, ctx, cancel); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error occured while running port forward %v\n"+ logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error occured while running port forward %v\n"+
"Try setting different port by using --%s", errormessage.FormatError(err), configStructs.GuiPortTapName)) "Try setting different port by using --%s", errormessage.FormatError(err), configStructs.GuiPortTapName))
cancel() cancel()

View File

@ -4,11 +4,6 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"github.com/up9inc/mizu/shared/kubernetes"
core "k8s.io/api/core/v1"
"regexp"
"time"
"github.com/creasty/defaults" "github.com/creasty/defaults"
"github.com/up9inc/mizu/cli/config" "github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/errormessage" "github.com/up9inc/mizu/cli/errormessage"
@ -63,20 +58,6 @@ func runMizuInstall() {
return return
} }
logger.Log.Infof("Waiting for Mizu server to start...")
readyChan := make(chan string)
readyErrorChan := make(chan error)
go watchApiServerPodReady(ctx, kubernetesProvider, readyChan, readyErrorChan)
select {
case readyMessage := <-readyChan:
logger.Log.Infof(readyMessage)
case err := <-readyErrorChan:
defer resources.CleanUpMizuResources(ctx, cancel, kubernetesProvider, config.Config.IsNsRestrictedMode(), config.Config.MizuResourcesNamespace)
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("%v", errormessage.FormatError(err)))
return
}
logger.Log.Infof(uiUtils.Magenta, "Installation completed, run `mizu view` to connect to the mizu daemon instance") logger.Log.Infof(uiUtils.Magenta, "Installation completed, run `mizu view` to connect to the mizu daemon instance")
} }
@ -96,59 +77,3 @@ func getInstallMizuAgentConfig(maxDBSizeBytes int64, tapperResources shared.Reso
return &mizuAgentConfig return &mizuAgentConfig
} }
func watchApiServerPodReady(ctx context.Context, kubernetesProvider *kubernetes.Provider, readyChan chan string, readyErrorChan chan error) {
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", kubernetes.ApiServerPodName))
podWatchHelper := kubernetes.NewPodWatchHelper(kubernetesProvider, podExactRegex)
eventChan, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.MizuResourcesNamespace}, podWatchHelper)
apiServerTimeoutSec := config.GetIntEnvConfig(config.ApiServerTimeoutSec, 120)
timeAfter := time.After(time.Duration(apiServerTimeoutSec) * time.Second)
for {
select {
case wEvent, ok := <-eventChan:
if !ok {
eventChan = nil
continue
}
switch wEvent.Type {
case kubernetes.EventAdded:
logger.Log.Debugf("Watching API Server pod ready loop, added")
case kubernetes.EventDeleted:
logger.Log.Debugf("Watching API Server pod ready loop, %s removed", kubernetes.ApiServerPodName)
case kubernetes.EventModified:
modifiedPod, err := wEvent.ToPod()
if err != nil {
readyErrorChan <- err
return
}
logger.Log.Debugf("Watching API Server pod ready loop, modified: %v", modifiedPod.Status.Phase)
if modifiedPod.Status.Phase == core.PodRunning {
readyChan <- fmt.Sprintf("%v pod is running", modifiedPod.Name)
return
}
case kubernetes.EventBookmark:
break
case kubernetes.EventError:
break
}
case err, ok := <-errorChan:
if !ok {
errorChan = nil
continue
}
readyErrorChan <- fmt.Errorf("[ERROR] Agent creation, watching %v namespace, error: %v", config.Config.MizuResourcesNamespace, err)
return
case <-timeAfter:
readyErrorChan <- fmt.Errorf("mizu API server was not ready in time")
return
case <-ctx.Done():
logger.Log.Debugf("Watching API Server pod ready loop, ctx done")
return
}
}
}

View File

@ -422,7 +422,7 @@ func watchApiServerEvents(ctx context.Context, kubernetesProvider *kubernetes.Pr
} }
func postApiServerStarted(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, err error) { func postApiServerStarted(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, err error) {
startProxyReportErrorIfAny(kubernetesProvider, cancel) startProxyReportErrorIfAny(kubernetesProvider, ctx, cancel)
options, _ := getMizuApiFilteringOptions() options, _ := getMizuApiFilteringOptions()
if err = startTapperSyncer(ctx, cancel, kubernetesProvider, state.targetNamespaces, *options, state.startTime); err != nil { if err = startTapperSyncer(ctx, cancel, kubernetesProvider, state.targetNamespaces, *options, state.startTime); err != nil {

View File

@ -47,7 +47,7 @@ func runMizuView() {
return return
} }
logger.Log.Infof("Establishing connection to k8s cluster...") logger.Log.Infof("Establishing connection to k8s cluster...")
startProxyReportErrorIfAny(kubernetesProvider, cancel) startProxyReportErrorIfAny(kubernetesProvider, ctx, cancel)
} }
apiServerProvider := apiserver.NewProvider(url, apiserver.DefaultRetries, apiserver.DefaultTimeout) apiServerProvider := apiserver.NewProvider(url, apiserver.DefaultRetries, apiserver.DefaultTimeout)

View File

@ -11,6 +11,7 @@ import (
"net" "net"
"net/http" "net/http"
"net/url" "net/url"
"regexp"
"strings" "strings"
"time" "time"
@ -84,12 +85,21 @@ func getRerouteHttpHandlerMizuStatic(proxyHandler http.Handler, mizuNamespace st
}) })
} }
func NewPortForward(kubernetesProvider *Provider, namespace string, podName string, localPort uint16, cancel context.CancelFunc) error { func NewPortForward(kubernetesProvider *Provider, namespace string, podRegex *regexp.Regexp, localPort uint16, ctx context.Context, cancel context.CancelFunc) (*portforward.PortForwarder, error) {
logger.Log.Debugf("Starting proxy using port-forward method. namespace: [%v], service name: [%s], port: [%v]", namespace, podName, localPort) pods, err := kubernetesProvider.ListAllRunningPodsMatchingRegex(ctx, podRegex, []string{namespace})
if err != nil {
return nil, err
} else if len(pods) == 0 {
return nil, fmt.Errorf("didn't find pod to port-forward")
}
podName := pods[0].Name
logger.Log.Debugf("Starting proxy using port-forward method. namespace: [%v], pod name: [%s], port: [%v]", namespace, podName, localPort)
dialer, err := getHttpDialer(kubernetesProvider, namespace, podName) dialer, err := getHttpDialer(kubernetesProvider, namespace, podName)
if err != nil { if err != nil {
return err return nil, err
} }
stopChan, readyChan := make(chan struct{}, 1), make(chan struct{}, 1) stopChan, readyChan := make(chan struct{}, 1), make(chan struct{}, 1)
@ -97,7 +107,7 @@ func NewPortForward(kubernetesProvider *Provider, namespace string, podName stri
forwarder, err := portforward.New(dialer, []string{fmt.Sprintf("%d:%d", localPort, shared.DefaultApiServerPort)}, stopChan, readyChan, out, errOut) forwarder, err := portforward.New(dialer, []string{fmt.Sprintf("%d:%d", localPort, shared.DefaultApiServerPort)}, stopChan, readyChan, out, errOut)
if err != nil { if err != nil {
return err return nil, err
} }
go func() { go func() {
@ -107,7 +117,7 @@ func NewPortForward(kubernetesProvider *Provider, namespace string, podName stri
} }
}() }()
return nil return forwarder, nil
} }
func getHttpDialer(kubernetesProvider *Provider, namespace string, podName string) (httpstream.Dialer, error) { func getHttpDialer(kubernetesProvider *Provider, namespace string, podName string) (httpstream.Dialer, error) {
@ -116,6 +126,7 @@ func getHttpDialer(kubernetesProvider *Provider, namespace string, podName strin
logger.Log.Errorf("Error creating http dialer") logger.Log.Errorf("Error creating http dialer")
return nil, err return nil, err
} }
path := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/portforward", namespace, podName) path := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/portforward", namespace, podName)
hostIP := strings.TrimLeft(kubernetesProvider.clientConfig.Host, "htps:/") // no need specify "t" twice hostIP := strings.TrimLeft(kubernetesProvider.clientConfig.Host, "htps:/") // no need specify "t" twice
serverURL := url.URL{Scheme: "https", Path: path, Host: hostIP} serverURL := url.URL{Scheme: "https", Path: path, Host: hostIP}