Cli pkg refactor (2) (#200)

This commit is contained in:
Igor Gov
2021-08-11 09:56:03 +03:00
committed by GitHub
parent 56dc6843e0
commit 7b73004e85
29 changed files with 311 additions and 283 deletions

View File

@@ -5,9 +5,12 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/up9inc/mizu/cli/fsUtils"
"github.com/up9inc/mizu/cli/goUtils"
"github.com/up9inc/mizu/cli/mizu/configStructs"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/config/configStructs"
"github.com/up9inc/mizu/cli/logger"
"github.com/up9inc/mizu/cli/mizu/fsUtils"
"github.com/up9inc/mizu/cli/mizu/goUtils"
"github.com/up9inc/mizu/cli/mizu/version"
"net/http"
"net/url"
"os"
@@ -46,21 +49,21 @@ var state tapState
func RunMizuTap() {
mizuApiFilteringOptions, err := getMizuApiFilteringOptions()
if err != nil {
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error parsing regex-masking: %v", errormessage.FormatError(err)))
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error parsing regex-masking: %v", errormessage.FormatError(err)))
return
}
var mizuValidationRules string
if mizu.Config.Tap.EnforcePolicyFile != "" {
mizuValidationRules, err = readValidationRules(mizu.Config.Tap.EnforcePolicyFile)
if config.Config.Tap.EnforcePolicyFile != "" {
mizuValidationRules, err = readValidationRules(config.Config.Tap.EnforcePolicyFile)
if err != nil {
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error reading policy file: %v", errormessage.FormatError(err)))
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error reading policy file: %v", errormessage.FormatError(err)))
return
}
}
kubernetesProvider, err := kubernetes.NewProvider(mizu.Config.KubeConfigPath)
kubernetesProvider, err := kubernetes.NewProvider(config.Config.KubeConfigPath)
if err != nil {
mizu.Log.Error(err)
logger.Log.Error(err)
return
}
@@ -75,11 +78,11 @@ func RunMizuTap() {
} else {
namespacesStr = "all namespaces"
}
mizu.CheckNewerVersion()
mizu.Log.Infof("Tapping pods in %s", namespacesStr)
version.CheckNewerVersion()
logger.Log.Infof("Tapping pods in %s", namespacesStr)
if err, _ := updateCurrentlyTappedPods(kubernetesProvider, ctx, targetNamespaces); err != nil {
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error getting pods by regex: %v", errormessage.FormatError(err)))
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error getting pods by regex: %v", errormessage.FormatError(err)))
return
}
@@ -88,10 +91,10 @@ func RunMizuTap() {
if !mizu.Contains(targetNamespaces, mizu.K8sAllNamespaces) {
suggestionStr = ". Select a different namespace with -n or tap all namespaces with -A"
}
mizu.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Did not find any pods matching the regex argument%s", suggestionStr))
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Did not find any pods matching the regex argument%s", suggestionStr))
}
if mizu.Config.Tap.DryRun {
if config.Config.Tap.DryRun {
return
}
@@ -99,7 +102,7 @@ func RunMizuTap() {
defer cleanUpMizuResources(kubernetesProvider)
if err := createMizuResources(ctx, kubernetesProvider, nodeToTappedPodIPMap, mizuApiFilteringOptions, mizuValidationRules); err != nil {
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error creating resources: %v", errormessage.FormatError(err)))
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error creating resources: %v", errormessage.FormatError(err)))
return
}
@@ -120,7 +123,7 @@ func readValidationRules(file string) (string, error) {
}
func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, mizuApiFilteringOptions *shared.TrafficFilteringOptions, mizuValidationRules string) error {
if !mizu.Config.IsNsRestrictedMode() {
if !config.Config.IsNsRestrictedMode() {
if err := createMizuNamespace(ctx, kubernetesProvider); err != nil {
return err
}
@@ -135,7 +138,7 @@ func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Pro
}
if err := createMizuConfigmap(ctx, kubernetesProvider, mizuValidationRules); err != nil {
mizu.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Failed to create resources required for policy validation. Mizu will not validate policy rules. error: %v\n", errormessage.FormatError(err)))
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Failed to create resources required for policy validation. Mizu will not validate policy rules. error: %v\n", errormessage.FormatError(err)))
state.doNotRemoveConfigMap = true
} else if mizuValidationRules == "" {
state.doNotRemoveConfigMap = true
@@ -145,12 +148,12 @@ func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Pro
}
func createMizuConfigmap(ctx context.Context, kubernetesProvider *kubernetes.Provider, data string) error {
err := kubernetesProvider.CreateConfigMap(ctx, mizu.Config.MizuResourcesNamespace, mizu.ConfigMapName, data)
err := kubernetesProvider.CreateConfigMap(ctx, config.Config.MizuResourcesNamespace, mizu.ConfigMapName, data)
return err
}
func createMizuNamespace(ctx context.Context, kubernetesProvider *kubernetes.Provider) error {
_, err := kubernetesProvider.CreateNamespace(ctx, mizu.Config.MizuResourcesNamespace)
_, err := kubernetesProvider.CreateNamespace(ctx, config.Config.MizuResourcesNamespace)
return err
}
@@ -159,7 +162,7 @@ func createMizuApiServer(ctx context.Context, kubernetesProvider *kubernetes.Pro
state.mizuServiceAccountExists, err = createRBACIfNecessary(ctx, kubernetesProvider)
if err != nil {
mizu.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Failed to ensure the resources required for IP resolving. Mizu will not resolve target IPs to names. error: %v", errormessage.FormatError(err)))
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Failed to ensure the resources required for IP resolving. Mizu will not resolve target IPs to names. error: %v", errormessage.FormatError(err)))
}
var serviceAccountName string
@@ -170,25 +173,25 @@ func createMizuApiServer(ctx context.Context, kubernetesProvider *kubernetes.Pro
}
opts := &kubernetes.ApiServerOptions{
Namespace: mizu.Config.MizuResourcesNamespace,
Namespace: config.Config.MizuResourcesNamespace,
PodName: mizu.ApiServerPodName,
PodImage: mizu.Config.AgentImage,
PodImage: config.Config.AgentImage,
ServiceAccountName: serviceAccountName,
IsNamespaceRestricted: mizu.Config.IsNsRestrictedMode(),
IsNamespaceRestricted: config.Config.IsNsRestrictedMode(),
MizuApiFilteringOptions: mizuApiFilteringOptions,
MaxEntriesDBSizeBytes: mizu.Config.Tap.MaxEntriesDBSizeBytes(),
MaxEntriesDBSizeBytes: config.Config.Tap.MaxEntriesDBSizeBytes(),
}
_, err = kubernetesProvider.CreateMizuApiServerPod(ctx, opts)
if err != nil {
return err
}
mizu.Log.Debugf("Successfully created API server pod: %s", mizu.ApiServerPodName)
logger.Log.Debugf("Successfully created API server pod: %s", mizu.ApiServerPodName)
state.apiServerService, err = kubernetesProvider.CreateService(ctx, mizu.Config.MizuResourcesNamespace, mizu.ApiServerPodName, mizu.ApiServerPodName)
state.apiServerService, err = kubernetesProvider.CreateService(ctx, config.Config.MizuResourcesNamespace, mizu.ApiServerPodName, mizu.ApiServerPodName)
if err != nil {
return err
}
mizu.Log.Debugf("Successfully created service: %s", mizu.ApiServerPodName)
logger.Log.Debugf("Successfully created service: %s", mizu.ApiServerPodName)
return nil
}
@@ -196,9 +199,9 @@ func createMizuApiServer(ctx context.Context, kubernetesProvider *kubernetes.Pro
func getMizuApiFilteringOptions() (*shared.TrafficFilteringOptions, error) {
var compiledRegexSlice []*shared.SerializableRegexp
if mizu.Config.Tap.PlainTextFilterRegexes != nil && len(mizu.Config.Tap.PlainTextFilterRegexes) > 0 {
if config.Config.Tap.PlainTextFilterRegexes != nil && len(config.Config.Tap.PlainTextFilterRegexes) > 0 {
compiledRegexSlice = make([]*shared.SerializableRegexp, 0)
for _, regexStr := range mizu.Config.Tap.PlainTextFilterRegexes {
for _, regexStr := range config.Config.Tap.PlainTextFilterRegexes {
compiledRegex, err := shared.CompileRegexToSerializableRegexp(regexStr)
if err != nil {
return nil, err
@@ -209,8 +212,8 @@ func getMizuApiFilteringOptions() (*shared.TrafficFilteringOptions, error) {
return &shared.TrafficFilteringOptions{
PlainTextMaskingRegexes: compiledRegexSlice,
HealthChecksUserAgentHeaders: mizu.Config.Tap.HealthChecksUserAgentHeaders,
DisableRedaction: mizu.Config.Tap.DisableRedaction,
HealthChecksUserAgentHeaders: config.Config.Tap.HealthChecksUserAgentHeaders,
DisableRedaction: config.Config.Tap.DisableRedaction,
}, nil
}
@@ -225,20 +228,20 @@ func updateMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provi
if err := kubernetesProvider.ApplyMizuTapperDaemonSet(
ctx,
mizu.Config.MizuResourcesNamespace,
config.Config.MizuResourcesNamespace,
mizu.TapperDaemonSetName,
mizu.Config.AgentImage,
config.Config.AgentImage,
mizu.TapperPodName,
fmt.Sprintf("%s.%s.svc.cluster.local", state.apiServerService.Name, state.apiServerService.Namespace),
nodeToTappedPodIPMap,
serviceAccountName,
mizu.Config.Tap.TapOutgoing(),
config.Config.Tap.TapOutgoing(),
); err != nil {
return err
}
mizu.Log.Debugf("Successfully created %v tappers", len(nodeToTappedPodIPMap))
logger.Log.Debugf("Successfully created %v tappers", len(nodeToTappedPodIPMap))
} else {
if err := kubernetesProvider.RemoveDaemonSet(ctx, mizu.Config.MizuResourcesNamespace, mizu.TapperDaemonSetName); err != nil {
if err := kubernetesProvider.RemoveDaemonSet(ctx, config.Config.MizuResourcesNamespace, mizu.TapperDaemonSetName); err != nil {
return err
}
}
@@ -251,65 +254,65 @@ func cleanUpMizuResources(kubernetesProvider *kubernetes.Provider) {
removalCtx, cancel := context.WithTimeout(context.Background(), cleanupTimeout)
defer cancel()
if mizu.Config.DumpLogs {
if config.Config.DumpLogs {
mizuDir := mizu.GetMizuFolderPath()
filePath = path.Join(mizuDir, fmt.Sprintf("mizu_logs_%s.zip", time.Now().Format("2006_01_02__15_04_05")))
if err := fsUtils.DumpLogs(kubernetesProvider, removalCtx, filePath); err != nil {
mizu.Log.Errorf("Failed dump logs %v", err)
logger.Log.Errorf("Failed dump logs %v", err)
}
}
mizu.Log.Infof("\nRemoving mizu resources\n")
logger.Log.Infof("\nRemoving mizu resources\n")
if !mizu.Config.IsNsRestrictedMode() {
if err := kubernetesProvider.RemoveNamespace(removalCtx, mizu.Config.MizuResourcesNamespace); err != nil {
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing Namespace %s: %v", mizu.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
if !config.Config.IsNsRestrictedMode() {
if err := kubernetesProvider.RemoveNamespace(removalCtx, config.Config.MizuResourcesNamespace); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing Namespace %s: %v", config.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
return
}
} else {
if err := kubernetesProvider.RemovePod(removalCtx, mizu.Config.MizuResourcesNamespace, mizu.ApiServerPodName); err != nil {
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing Pod %s in namespace %s: %v", mizu.ApiServerPodName, mizu.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
if err := kubernetesProvider.RemovePod(removalCtx, config.Config.MizuResourcesNamespace, mizu.ApiServerPodName); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing Pod %s in namespace %s: %v", mizu.ApiServerPodName, config.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
}
if err := kubernetesProvider.RemoveService(removalCtx, mizu.Config.MizuResourcesNamespace, mizu.ApiServerPodName); err != nil {
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing Service %s in namespace %s: %v", mizu.ApiServerPodName, mizu.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
if err := kubernetesProvider.RemoveService(removalCtx, config.Config.MizuResourcesNamespace, mizu.ApiServerPodName); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing Service %s in namespace %s: %v", mizu.ApiServerPodName, config.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
}
if err := kubernetesProvider.RemoveDaemonSet(removalCtx, mizu.Config.MizuResourcesNamespace, mizu.TapperDaemonSetName); err != nil {
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing DaemonSet %s in namespace %s: %v", mizu.TapperDaemonSetName, mizu.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
if err := kubernetesProvider.RemoveDaemonSet(removalCtx, config.Config.MizuResourcesNamespace, mizu.TapperDaemonSetName); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing DaemonSet %s in namespace %s: %v", mizu.TapperDaemonSetName, config.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
}
if !state.doNotRemoveConfigMap {
if err := kubernetesProvider.RemoveConfigMap(removalCtx, mizu.Config.MizuResourcesNamespace, mizu.ConfigMapName); err != nil {
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing ConfigMap %s in namespace %s: %v", mizu.ConfigMapName, mizu.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
if err := kubernetesProvider.RemoveConfigMap(removalCtx, config.Config.MizuResourcesNamespace, mizu.ConfigMapName); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing ConfigMap %s in namespace %s: %v", mizu.ConfigMapName, config.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
}
}
}
if state.mizuServiceAccountExists {
if !mizu.Config.IsNsRestrictedMode() {
if !config.Config.IsNsRestrictedMode() {
if err := kubernetesProvider.RemoveNonNamespacedResources(removalCtx, mizu.ClusterRoleName, mizu.ClusterRoleBindingName); err != nil {
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing non-namespaced resources: %v", errormessage.FormatError(err)))
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing non-namespaced resources: %v", errormessage.FormatError(err)))
return
}
} else {
if err := kubernetesProvider.RemoveServicAccount(removalCtx, mizu.Config.MizuResourcesNamespace, mizu.ServiceAccountName); err != nil {
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing Service Account %s in namespace %s: %v", mizu.ServiceAccountName, mizu.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
if err := kubernetesProvider.RemoveServicAccount(removalCtx, config.Config.MizuResourcesNamespace, mizu.ServiceAccountName); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing Service Account %s in namespace %s: %v", mizu.ServiceAccountName, config.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
return
}
if err := kubernetesProvider.RemoveRole(removalCtx, mizu.Config.MizuResourcesNamespace, mizu.RoleName); err != nil {
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing Role %s in namespace %s: %v", mizu.RoleName, mizu.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
if err := kubernetesProvider.RemoveRole(removalCtx, config.Config.MizuResourcesNamespace, mizu.RoleName); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing Role %s in namespace %s: %v", mizu.RoleName, config.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
}
if err := kubernetesProvider.RemoveRoleBinding(removalCtx, mizu.Config.MizuResourcesNamespace, mizu.RoleBindingName); err != nil {
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing RoleBinding %s in namespace %s: %v", mizu.RoleBindingName, mizu.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
if err := kubernetesProvider.RemoveRoleBinding(removalCtx, config.Config.MizuResourcesNamespace, mizu.RoleBindingName); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing RoleBinding %s in namespace %s: %v", mizu.RoleBindingName, config.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
}
}
}
if !mizu.Config.IsNsRestrictedMode() {
if !config.Config.IsNsRestrictedMode() {
waitUntilNamespaceDeleted(removalCtx, cancel, kubernetesProvider)
}
}
@@ -320,20 +323,20 @@ func waitUntilNamespaceDeleted(ctx context.Context, cancel context.CancelFunc, k
waitForFinish(ctx, cancel)
}()
if err := kubernetesProvider.WaitUtilNamespaceDeleted(ctx, mizu.Config.MizuResourcesNamespace); err != nil {
if err := kubernetesProvider.WaitUtilNamespaceDeleted(ctx, config.Config.MizuResourcesNamespace); err != nil {
switch {
case ctx.Err() == context.Canceled:
// Do nothing. User interrupted the wait.
case err == wait.ErrWaitTimeout:
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Timeout while removing Namespace %s", mizu.Config.MizuResourcesNamespace))
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Timeout while removing Namespace %s", config.Config.MizuResourcesNamespace))
default:
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error while waiting for Namespace %s to be deleted: %v", mizu.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error while waiting for Namespace %s to be deleted: %v", config.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
}
}
}
func reportTappedPods() {
mizuProxiedUrl := kubernetes.GetMizuApiServerProxiedHostAndPath(mizu.Config.Tap.GuiPort)
mizuProxiedUrl := kubernetes.GetMizuApiServerProxiedHostAndPath(config.Config.Tap.GuiPort)
tappedPodsUrl := fmt.Sprintf("http://%s/status/tappedPods", mizuProxiedUrl)
podInfos := make([]shared.PodInfo, 0)
@@ -343,30 +346,30 @@ func reportTappedPods() {
tapStatus := shared.TapStatus{Pods: podInfos}
if jsonValue, err := json.Marshal(tapStatus); err != nil {
mizu.Log.Debugf("[ERROR] failed Marshal the tapped pods %v", err)
logger.Log.Debugf("[ERROR] failed Marshal the tapped pods %v", err)
} else {
if response, err := http.Post(tappedPodsUrl, "application/json", bytes.NewBuffer(jsonValue)); err != nil {
mizu.Log.Debugf("[ERROR] failed sending to API server the tapped pods %v", err)
logger.Log.Debugf("[ERROR] failed sending to API server the tapped pods %v", err)
} else if response.StatusCode != 200 {
mizu.Log.Debugf("[ERROR] failed sending to API server the tapped pods, response status code %v", response.StatusCode)
logger.Log.Debugf("[ERROR] failed sending to API server the tapped pods, response status code %v", response.StatusCode)
} else {
mizu.Log.Debugf("Reported to server API about %d taped pods successfully", len(podInfos))
logger.Log.Debugf("Reported to server API about %d taped pods successfully", len(podInfos))
}
}
}
func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Provider, targetNamespaces []string, cancel context.CancelFunc) {
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider, targetNamespaces, mizu.Config.Tap.PodRegex())
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider, targetNamespaces, config.Config.Tap.PodRegex())
restartTappers := func() {
err, changeFound := updateCurrentlyTappedPods(kubernetesProvider, ctx, targetNamespaces)
if err != nil {
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Failed to update currently tapped pods: %v", err))
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Failed to update currently tapped pods: %v", err))
cancel()
}
if !changeFound {
mizu.Log.Debugf("Nothing changed update tappers not needed")
logger.Log.Debugf("Nothing changed update tappers not needed")
return
}
@@ -374,11 +377,11 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
nodeToTappedPodIPMap := getNodeHostToTappedPodIpsMap(state.currentlyTappedPods)
if err != nil {
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error building node to ips map: %v", errormessage.FormatError(err)))
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error building node to ips map: %v", errormessage.FormatError(err)))
cancel()
}
if err := updateMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap); err != nil {
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error updating daemonset: %v", errormessage.FormatError(err)))
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error updating daemonset: %v", errormessage.FormatError(err)))
cancel()
}
}
@@ -387,13 +390,13 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
for {
select {
case pod := <-added:
mizu.Log.Debugf("Added matching pod %s, ns: %s", pod.Name, pod.Namespace)
logger.Log.Debugf("Added matching pod %s, ns: %s", pod.Name, pod.Namespace)
restartTappersDebouncer.SetOn()
case pod := <-removed:
mizu.Log.Debugf("Removed matching pod %s, ns: %s", pod.Name, pod.Namespace)
logger.Log.Debugf("Removed matching pod %s, ns: %s", pod.Name, pod.Namespace)
restartTappersDebouncer.SetOn()
case pod := <-modified:
mizu.Log.Debugf("Modified matching pod %s, ns: %s, phase: %s, ip: %s", pod.Name, pod.Namespace, pod.Status.Phase, pod.Status.PodIP)
logger.Log.Debugf("Modified matching pod %s, ns: %s, phase: %s, ip: %s", pod.Name, pod.Namespace, pod.Status.Phase, pod.Status.PodIP)
// Act only if the modified pod has already obtained an IP address.
// After filtering for IPs, on a normal pod restart this includes the following events:
// - Pod deletion
@@ -405,13 +408,13 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
}
case err := <-errorChan:
mizu.Log.Debugf("Watching pods loop, got error %v, stopping `restart tappers debouncer`", err)
logger.Log.Debugf("Watching pods loop, got error %v, stopping `restart tappers debouncer`", err)
restartTappersDebouncer.Cancel()
// TODO: Does this also perform cleanup?
cancel()
case <-ctx.Done():
mizu.Log.Debugf("Watching pods loop, context done, stopping `restart tappers debouncer`")
logger.Log.Debugf("Watching pods loop, context done, stopping `restart tappers debouncer`")
restartTappersDebouncer.Cancel()
return
}
@@ -420,18 +423,18 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
func updateCurrentlyTappedPods(kubernetesProvider *kubernetes.Provider, ctx context.Context, targetNamespaces []string) (error, bool) {
changeFound := false
if matchingPods, err := kubernetesProvider.ListAllRunningPodsMatchingRegex(ctx, mizu.Config.Tap.PodRegex(), targetNamespaces); err != nil {
if matchingPods, err := kubernetesProvider.ListAllRunningPodsMatchingRegex(ctx, config.Config.Tap.PodRegex(), targetNamespaces); err != nil {
return err, false
} else {
podsToTap := excludeMizuPods(matchingPods)
addedPods, removedPods := getPodArrayDiff(state.currentlyTappedPods, podsToTap)
for _, addedPod := range addedPods {
changeFound = true
mizu.Log.Infof(uiUtils.Green, fmt.Sprintf("+%s", addedPod.Name))
logger.Log.Infof(uiUtils.Green, fmt.Sprintf("+%s", addedPod.Name))
}
for _, removedPod := range removedPods {
changeFound = true
mizu.Log.Infof(uiUtils.Red, fmt.Sprintf("-%s", removedPod.Name))
logger.Log.Infof(uiUtils.Red, fmt.Sprintf("-%s", removedPod.Name))
}
state.currentlyTappedPods = podsToTap
}
@@ -479,86 +482,86 @@ func getMissingPods(pods1 []core.Pod, pods2 []core.Pod) []core.Pod {
func createProxyToApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", mizu.ApiServerPodName))
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider, []string{mizu.Config.MizuResourcesNamespace}, podExactRegex)
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider, []string{config.Config.MizuResourcesNamespace}, podExactRegex)
isPodReady := false
timeAfter := time.After(25 * time.Second)
for {
select {
case <-ctx.Done():
mizu.Log.Debugf("Watching API Server pod loop, ctx done")
logger.Log.Debugf("Watching API Server pod loop, ctx done")
return
case <-added:
mizu.Log.Debugf("Watching API Server pod loop, added")
logger.Log.Debugf("Watching API Server pod loop, added")
continue
case <-removed:
mizu.Log.Infof("%s removed", mizu.ApiServerPodName)
logger.Log.Infof("%s removed", mizu.ApiServerPodName)
cancel()
return
case modifiedPod := <-modified:
if modifiedPod == nil {
mizu.Log.Debugf("Watching API Server pod loop, modifiedPod with nil")
logger.Log.Debugf("Watching API Server pod loop, modifiedPod with nil")
continue
}
mizu.Log.Debugf("Watching API Server pod loop, modified: %v", modifiedPod.Status.Phase)
logger.Log.Debugf("Watching API Server pod loop, modified: %v", modifiedPod.Status.Phase)
if modifiedPod.Status.Phase == core.PodRunning && !isPodReady {
isPodReady = true
go startProxyReportErrorIfAny(kubernetesProvider, cancel)
mizu.Log.Infof("Mizu is available at http://%s\n", kubernetes.GetMizuApiServerProxiedHostAndPath(mizu.Config.Tap.GuiPort))
logger.Log.Infof("Mizu is available at http://%s\n", kubernetes.GetMizuApiServerProxiedHostAndPath(config.Config.Tap.GuiPort))
time.Sleep(time.Second * 5) // Waiting to be sure the proxy is ready
requestForAnalysis()
reportTappedPods()
}
case <-timeAfter:
if !isPodReady {
mizu.Log.Errorf(uiUtils.Error, "Mizu API server was not ready in time")
logger.Log.Errorf(uiUtils.Error, "Mizu API server was not ready in time")
cancel()
}
case <-errorChan:
mizu.Log.Debugf("[ERROR] Agent creation, watching %v namespace", mizu.Config.MizuResourcesNamespace)
logger.Log.Debugf("[ERROR] Agent creation, watching %v namespace", config.Config.MizuResourcesNamespace)
cancel()
}
}
}
func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
err := kubernetes.StartProxy(kubernetesProvider, mizu.Config.Tap.GuiPort, mizu.Config.MizuResourcesNamespace, mizu.ApiServerPodName)
err := kubernetes.StartProxy(kubernetesProvider, config.Config.Tap.GuiPort, config.Config.MizuResourcesNamespace, mizu.ApiServerPodName)
if err != nil {
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error occured while running k8s proxy %v\n"+
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error occured while running k8s proxy %v\n"+
"Try setting different port by using --%s", errormessage.FormatError(err), configStructs.GuiPortTapName))
cancel()
}
}
func requestForAnalysis() {
if !mizu.Config.Tap.Analysis {
if !config.Config.Tap.Analysis {
return
}
mizuProxiedUrl := kubernetes.GetMizuApiServerProxiedHostAndPath(mizu.Config.Tap.GuiPort)
urlPath := fmt.Sprintf("http://%s/api/uploadEntries?dest=%s&interval=%v", mizuProxiedUrl, url.QueryEscape(mizu.Config.Tap.AnalysisDestination), mizu.Config.Tap.SleepIntervalSec)
mizuProxiedUrl := kubernetes.GetMizuApiServerProxiedHostAndPath(config.Config.Tap.GuiPort)
urlPath := fmt.Sprintf("http://%s/api/uploadEntries?dest=%s&interval=%v", mizuProxiedUrl, url.QueryEscape(config.Config.Tap.AnalysisDestination), config.Config.Tap.SleepIntervalSec)
u, parseErr := url.ParseRequestURI(urlPath)
if parseErr != nil {
mizu.Log.Fatal("Failed parsing the URL (consider changing the analysis dest URL), err: %v", parseErr)
logger.Log.Fatal("Failed parsing the URL (consider changing the analysis dest URL), err: %v", parseErr)
}
mizu.Log.Debugf("Sending get request to %v", u.String())
logger.Log.Debugf("Sending get request to %v", u.String())
if response, requestErr := http.Get(u.String()); requestErr != nil {
mizu.Log.Errorf("Failed to notify agent for analysis, err: %v", requestErr)
logger.Log.Errorf("Failed to notify agent for analysis, err: %v", requestErr)
} else if response.StatusCode != 200 {
mizu.Log.Errorf("Failed to notify agent for analysis, status code: %v", response.StatusCode)
logger.Log.Errorf("Failed to notify agent for analysis, status code: %v", response.StatusCode)
} else {
mizu.Log.Infof(uiUtils.Purple, "Traffic is uploading to UP9 for further analysis")
logger.Log.Infof(uiUtils.Purple, "Traffic is uploading to UP9 for further analysis")
}
}
func createRBACIfNecessary(ctx context.Context, kubernetesProvider *kubernetes.Provider) (bool, error) {
if !mizu.Config.IsNsRestrictedMode() {
err := kubernetesProvider.CreateMizuRBAC(ctx, mizu.Config.MizuResourcesNamespace, mizu.ServiceAccountName, mizu.ClusterRoleName, mizu.ClusterRoleBindingName, mizu.RBACVersion)
if !config.Config.IsNsRestrictedMode() {
err := kubernetesProvider.CreateMizuRBAC(ctx, config.Config.MizuResourcesNamespace, mizu.ServiceAccountName, mizu.ClusterRoleName, mizu.ClusterRoleBindingName, mizu.RBACVersion)
if err != nil {
return false, err
}
} else {
err := kubernetesProvider.CreateMizuRBACNamespaceRestricted(ctx, mizu.Config.MizuResourcesNamespace, mizu.ServiceAccountName, mizu.RoleName, mizu.RoleBindingName, mizu.RBACVersion)
err := kubernetesProvider.CreateMizuRBACNamespaceRestricted(ctx, config.Config.MizuResourcesNamespace, mizu.ServiceAccountName, mizu.RoleName, mizu.RoleBindingName, mizu.RBACVersion)
if err != nil {
return false, err
}
@@ -593,10 +596,10 @@ func waitForFinish(ctx context.Context, cancel context.CancelFunc) {
}
func getNamespaces(kubernetesProvider *kubernetes.Provider) []string {
if mizu.Config.Tap.AllNamespaces {
if config.Config.Tap.AllNamespaces {
return []string{mizu.K8sAllNamespaces}
} else if len(mizu.Config.Tap.Namespaces) > 0 {
return mizu.Config.Tap.Namespaces
} else if len(config.Config.Tap.Namespaces) > 0 {
return config.Config.Tap.Namespaces
} else {
return []string{kubernetesProvider.CurrentNamespace()}
}