Print Mizu URL only after the watched pods (#156)

* Print Mizu URL after the watched pods
This commit is contained in:
Igor Gov
2021-08-03 08:54:10 +03:00
committed by GitHub
parent ceb8d714e3
commit 793bb97e51
3 changed files with 41 additions and 31 deletions

View File

@@ -86,9 +86,10 @@ func RunMizuTap(podRegexQuery *regexp.Regexp, tappingOptions *MizuTapOptions) {
return return
} }
urlReadyChan := make(chan string)
mizu.CheckNewerVersion() mizu.CheckNewerVersion()
go portForwardApiPod(ctx, kubernetesProvider, cancel, tappingOptions) // TODO convert this to job for built in pod ttl or have the running app handle this go portForwardApiPod(ctx, kubernetesProvider, cancel, tappingOptions, urlReadyChan) // TODO convert this to job for built in pod ttl or have the running app handle this
go watchPodsForTapping(ctx, kubernetesProvider, cancel, podRegexQuery, tappingOptions) go watchPodsForTapping(ctx, kubernetesProvider, cancel, podRegexQuery, tappingOptions, urlReadyChan)
go syncApiStatus(ctx, cancel, tappingOptions) go syncApiStatus(ctx, cancel, tappingOptions)
//block until exit signal or error //block until exit signal or error
@@ -231,7 +232,7 @@ func cleanUpMizuResources(kubernetesProvider *kubernetes.Provider) {
} }
} }
func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, podRegex *regexp.Regexp, tappingOptions *MizuTapOptions) { func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, podRegex *regexp.Regexp, tappingOptions *MizuTapOptions, urlReadyChan chan string) {
targetNamespace := getNamespace(tappingOptions, kubernetesProvider) targetNamespace := getNamespace(tappingOptions, kubernetesProvider)
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, targetNamespace), podRegex) added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, targetNamespace), podRegex)
@@ -256,16 +257,20 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
} }
} }
restartTappersDebouncer := debounce.NewDebouncer(updateTappersDelay, restartTappers) restartTappersDebouncer := debounce.NewDebouncer(updateTappersDelay, restartTappers)
timer := time.AfterFunc(time.Second*10, func() {
mizu.Log.Debugf("Waiting for URL...")
mizu.Log.Infof("Mizu is available at http://%s", <-urlReadyChan)
})
for { for {
select { select {
case newTarget := <-added: case newTarget := <-added:
mizu.Log.Infof(uiUtils.Green, fmt.Sprintf("+%s", newTarget.Name)) mizu.Log.Infof(uiUtils.Green, fmt.Sprintf("+%s", newTarget.Name))
timer.Reset(time.Second * 2)
case removedTarget := <-removed: case removedTarget := <-removed:
mizu.Log.Infof(uiUtils.Red, fmt.Sprintf("-%s", removedTarget.Name)) mizu.Log.Infof(uiUtils.Red, fmt.Sprintf("-%s", removedTarget.Name))
timer.Reset(time.Second * 2)
restartTappersDebouncer.SetOn() restartTappersDebouncer.SetOn()
case modifiedTarget := <-modified: case modifiedTarget := <-modified:
// Act only if the modified pod has already obtained an IP address. // Act only if the modified pod has already obtained an IP address.
// After filtering for IPs, on a normal pod restart this includes the following events: // After filtering for IPs, on a normal pod restart this includes the following events:
@@ -287,7 +292,7 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
} }
} }
func portForwardApiPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, tappingOptions *MizuTapOptions) { func portForwardApiPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, tappingOptions *MizuTapOptions, urlReadyChan chan string) {
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", mizu.ApiServerPodName)) podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", mizu.ApiServerPodName))
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, mizu.ResourcesNamespace), podExactRegex) added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, mizu.ResourcesNamespace), podExactRegex)
isPodReady := false isPodReady := false
@@ -309,15 +314,32 @@ func portForwardApiPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
go func() { go func() {
err := kubernetes.StartProxy(kubernetesProvider, tappingOptions.GuiPort, mizu.ResourcesNamespace, mizu.ApiServerPodName) err := kubernetes.StartProxy(kubernetesProvider, tappingOptions.GuiPort, mizu.ResourcesNamespace, mizu.ApiServerPodName)
if err != nil { if err != nil {
mizu.Log.Infof("Error occured while running k8s proxy %v", err) mizu.Log.Infof("Error occurred while running k8s proxy %v", err)
cancel() cancel()
} }
}() }()
mizuProxiedUrl := kubernetes.GetMizuApiServerProxiedHostAndPath(tappingOptions.GuiPort) }
mizu.Log.Infof("Mizu is available at http://%s", mizuProxiedUrl) urlReadyChan <- kubernetes.GetMizuApiServerProxiedHostAndPath(tappingOptions.GuiPort)
time.Sleep(time.Second * 5) // Waiting to be sure the proxy is ready time.Sleep(time.Second * 5) // Waiting to be sure the proxy is ready
if tappingOptions.Analysis { requestForAnalysis(tappingOptions)
case <-timeAfter:
if !isPodReady {
mizu.Log.Errorf("error: %s pod was not ready in time", mizu.ApiServerPodName)
cancel()
}
case <-errorChan:
cancel()
}
}
}
func requestForAnalysis(tappingOptions *MizuTapOptions) {
if !tappingOptions.Analysis {
return
}
mizuProxiedUrl := kubernetes.GetMizuApiServerProxiedHostAndPath(tappingOptions.GuiPort)
urlPath := fmt.Sprintf("http://%s/api/uploadEntries?dest=%s&interval=%v", mizuProxiedUrl, url.QueryEscape(tappingOptions.AnalysisDestination), tappingOptions.SleepIntervalSec) urlPath := fmt.Sprintf("http://%s/api/uploadEntries?dest=%s&interval=%v", mizuProxiedUrl, url.QueryEscape(tappingOptions.AnalysisDestination), tappingOptions.SleepIntervalSec)
u, err := url.ParseRequestURI(urlPath) u, err := url.ParseRequestURI(urlPath)
@@ -330,19 +352,6 @@ func portForwardApiPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
} else { } else {
mizu.Log.Infof(uiUtils.Purple, "Traffic is uploading to UP9 for further analysis") mizu.Log.Infof(uiUtils.Purple, "Traffic is uploading to UP9 for further analysis")
} }
}
}
case <-timeAfter:
if !isPodReady {
mizu.Log.Infof("error: %s pod was not ready in time", mizu.ApiServerPodName)
cancel()
}
case <-errorChan:
cancel()
}
}
} }
func createRBACIfNecessary(ctx context.Context, kubernetesProvider *kubernetes.Provider) bool { func createRBACIfNecessary(ctx context.Context, kubernetesProvider *kubernetes.Provider) bool {
@@ -407,7 +416,6 @@ func syncApiStatus(ctx context.Context, cancel context.CancelFunc, tappingOption
time.Sleep(10 * time.Second) time.Sleep(10 * time.Second)
} }
} }
} }
func getNamespace(tappingOptions *MizuTapOptions, kubernetesProvider *kubernetes.Provider) string { func getNamespace(tappingOptions *MizuTapOptions, kubernetesProvider *kubernetes.Provider) string {

View File

@@ -2,6 +2,7 @@ package kubernetes
import ( import (
"fmt" "fmt"
"github.com/up9inc/mizu/cli/mizu"
"k8s.io/kubectl/pkg/proxy" "k8s.io/kubectl/pkg/proxy"
"net" "net"
"net/http" "net/http"
@@ -13,6 +14,7 @@ const k8sProxyApiPrefix = "/"
const mizuServicePort = 80 const mizuServicePort = 80
func StartProxy(kubernetesProvider *Provider, mizuPort uint16, mizuNamespace string, mizuServiceName string) error { func StartProxy(kubernetesProvider *Provider, mizuPort uint16, mizuNamespace string, mizuServiceName string) error {
mizu.Log.Debugf("Starting proxy. namespace: [%v], service name: [%s], port: [%v]", mizuNamespace, mizuServiceName, mizuPort)
filter := &proxy.FilterServer{ filter := &proxy.FilterServer{
AcceptPaths: proxy.MakeRegexpArrayOrDie(proxy.DefaultPathAcceptRE), AcceptPaths: proxy.MakeRegexpArrayOrDie(proxy.DefaultPathAcceptRE),
RejectPaths: proxy.MakeRegexpArrayOrDie(proxy.DefaultPathRejectRE), RejectPaths: proxy.MakeRegexpArrayOrDie(proxy.DefaultPathRejectRE),

View File

@@ -55,7 +55,7 @@ func CheckNewerVersion() {
client := github.NewClient(nil) client := github.NewClient(nil)
latestRelease, _, err := client.Repositories.GetLatestRelease(context.Background(), "up9inc", "mizu") latestRelease, _, err := client.Repositories.GetLatestRelease(context.Background(), "up9inc", "mizu")
if err != nil { if err != nil {
Log.Debugf("Failed to get latest release") Log.Debugf("[ERROR] Failed to get latest release")
return return
} }
@@ -67,20 +67,20 @@ func CheckNewerVersion() {
} }
} }
if versionFileUrl == "" { if versionFileUrl == "" {
Log.Debugf("Version file not found in the latest release") Log.Debugf("[ERROR] Version file not found in the latest release")
return return
} }
res, err := http.Get(versionFileUrl) res, err := http.Get(versionFileUrl)
if err != nil { if err != nil {
Log.Debugf("http.Get version asset -> %v", err) Log.Debugf("[ERROR] Failed to get the version file %v", err)
return return
} }
data, err := ioutil.ReadAll(res.Body) data, err := ioutil.ReadAll(res.Body)
res.Body.Close() res.Body.Close()
if err != nil { if err != nil {
Log.Debugf("ioutil.ReadAll -> %v", err) Log.Debugf("[ERROR] Failed to read the version file -> %v", err)
return return
} }
gitHubVersion := string(data) gitHubVersion := string(data)