Feature/tra 3474 config refactor (#155)

This commit is contained in:
RoyUP9
2021-08-03 17:23:52 +03:00
committed by GitHub
parent f9396e01ca
commit 69a9deab4b
17 changed files with 393 additions and 329 deletions

View File

@@ -11,7 +11,6 @@ import (
core "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/clientcmd"
"log"
"net/http"
"net/url"
"os"
@@ -31,13 +30,13 @@ const (
var currentlyTappedPods []core.Pod
func RunMizuTap(podRegexQuery *regexp.Regexp, tappingOptions *MizuTapOptions) {
mizuApiFilteringOptions, err := getMizuApiFilteringOptions(tappingOptions)
func RunMizuTap() {
mizuApiFilteringOptions, err := getMizuApiFilteringOptions()
if err != nil {
return
}
kubernetesProvider, err := kubernetes.NewProvider(tappingOptions.KubeConfigPath)
kubernetesProvider, err := kubernetes.NewProvider(mizu.Config.Tap.KubeConfigPath)
if err != nil {
if clientcmd.IsEmptyConfig(err) {
mizu.Log.Infof(uiUtils.Red, "Couldn't find the kube config file, or file is empty. Try adding '--kube-config=<path to kube config file>'\n")
@@ -53,17 +52,21 @@ func RunMizuTap(podRegexQuery *regexp.Regexp, tappingOptions *MizuTapOptions) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // cancel will be called when this function exits
targetNamespace := getNamespace(tappingOptions, kubernetesProvider)
if err := updateCurrentlyTappedPods(kubernetesProvider, ctx, podRegexQuery, targetNamespace); err != nil {
targetNamespace := getNamespace(kubernetesProvider)
if err := updateCurrentlyTappedPods(kubernetesProvider, ctx, targetNamespace); err != nil {
mizu.Log.Infof("Error listing pods: %v", err)
return
}
if mizu.Config.Tap.DryRun {
return
}
urlReadyChan := make(chan string)
go func() {
mizu.Log.Infof("Mizu is available at http://%s", <-urlReadyChan)
}()
var namespacesStr string
if targetNamespace != mizu.K8sAllNamespaces {
namespacesStr = fmt.Sprintf("namespace \"%s\"", targetNamespace)
@@ -85,29 +88,29 @@ func RunMizuTap(podRegexQuery *regexp.Regexp, tappingOptions *MizuTapOptions) {
return
}
if err := createMizuResources(ctx, kubernetesProvider, nodeToTappedPodIPMap, tappingOptions, mizuApiFilteringOptions); err != nil {
if err := createMizuResources(ctx, kubernetesProvider, nodeToTappedPodIPMap, mizuApiFilteringOptions); err != nil {
return
}
mizu.CheckNewerVersion()
go portForwardApiPod(ctx, kubernetesProvider, cancel, tappingOptions, urlReadyChan) // TODO convert this to job for built in pod ttl or have the running app handle this
go watchPodsForTapping(ctx, kubernetesProvider, cancel, podRegexQuery, tappingOptions)
go syncApiStatus(ctx, cancel, tappingOptions)
go portForwardApiPod(ctx, kubernetesProvider, cancel, urlReadyChan) // TODO convert this to job for built in pod ttl or have the running app handle this
go watchPodsForTapping(ctx, kubernetesProvider, cancel)
go syncApiStatus(ctx, cancel)
//block until exit signal or error
waitForFinish(ctx, cancel)
}
func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, tappingOptions *MizuTapOptions, mizuApiFilteringOptions *shared.TrafficFilteringOptions) error {
func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, mizuApiFilteringOptions *shared.TrafficFilteringOptions) error {
if err := createMizuNamespace(ctx, kubernetesProvider); err != nil {
return err
}
if err := createMizuApiServer(ctx, kubernetesProvider, tappingOptions, mizuApiFilteringOptions); err != nil {
if err := createMizuApiServer(ctx, kubernetesProvider, mizuApiFilteringOptions); err != nil {
return err
}
if err := updateMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap, tappingOptions); err != nil {
if err := updateMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap); err != nil {
return err
}
@@ -123,7 +126,7 @@ func createMizuNamespace(ctx context.Context, kubernetesProvider *kubernetes.Pro
return err
}
func createMizuApiServer(ctx context.Context, kubernetesProvider *kubernetes.Provider, tappingOptions *MizuTapOptions, mizuApiFilteringOptions *shared.TrafficFilteringOptions) error {
func createMizuApiServer(ctx context.Context, kubernetesProvider *kubernetes.Provider, mizuApiFilteringOptions *shared.TrafficFilteringOptions) error {
var err error
mizuServiceAccountExists = createRBACIfNecessary(ctx, kubernetesProvider)
@@ -133,7 +136,7 @@ func createMizuApiServer(ctx context.Context, kubernetesProvider *kubernetes.Pro
} else {
serviceAccountName = ""
}
_, err = kubernetesProvider.CreateMizuApiServerPod(ctx, mizu.ResourcesNamespace, mizu.ApiServerPodName, tappingOptions.MizuImage, serviceAccountName, mizuApiFilteringOptions, tappingOptions.MaxEntriesDBSizeBytes)
_, err = kubernetesProvider.CreateMizuApiServerPod(ctx, mizu.ResourcesNamespace, mizu.ApiServerPodName, mizu.Config.MizuImage, serviceAccountName, mizuApiFilteringOptions, mizu.Config.Tap.MaxEntriesDBSizeBytes())
if err != nil {
mizu.Log.Infof("Error creating mizu %s pod: %v", mizu.ApiServerPodName, err)
return err
@@ -148,12 +151,12 @@ func createMizuApiServer(ctx context.Context, kubernetesProvider *kubernetes.Pro
return nil
}
func getMizuApiFilteringOptions(tappingOptions *MizuTapOptions) (*shared.TrafficFilteringOptions, error) {
func getMizuApiFilteringOptions() (*shared.TrafficFilteringOptions, error) {
var compiledRegexSlice []*shared.SerializableRegexp
if tappingOptions.PlainTextFilterRegexes != nil && len(tappingOptions.PlainTextFilterRegexes) > 0 {
if mizu.Config.Tap.PlainTextFilterRegexes != nil && len(mizu.Config.Tap.PlainTextFilterRegexes) > 0 {
compiledRegexSlice = make([]*shared.SerializableRegexp, 0)
for _, regexStr := range tappingOptions.PlainTextFilterRegexes {
for _, regexStr := range mizu.Config.Tap.PlainTextFilterRegexes {
compiledRegex, err := shared.CompileRegexToSerializableRegexp(regexStr)
if err != nil {
mizu.Log.Infof("Regex %s is invalid: %v", regexStr, err)
@@ -163,10 +166,10 @@ func getMizuApiFilteringOptions(tappingOptions *MizuTapOptions) (*shared.Traffic
}
}
return &shared.TrafficFilteringOptions{PlainTextMaskingRegexes: compiledRegexSlice, HideHealthChecks: tappingOptions.HideHealthChecks, DisableRedaction: tappingOptions.DisableRedaction}, nil
return &shared.TrafficFilteringOptions{PlainTextMaskingRegexes: compiledRegexSlice, HideHealthChecks: mizu.Config.Tap.HideHealthChecks, DisableRedaction: mizu.Config.Tap.DisableRedaction}, nil
}
func updateMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, tappingOptions *MizuTapOptions) error {
func updateMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string) error {
if len(nodeToTappedPodIPMap) > 0 {
var serviceAccountName string
if mizuServiceAccountExists {
@@ -179,12 +182,12 @@ func updateMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provi
ctx,
mizu.ResourcesNamespace,
mizu.TapperDaemonSetName,
tappingOptions.MizuImage,
mizu.Config.MizuImage,
mizu.TapperPodName,
fmt.Sprintf("%s.%s.svc.cluster.local", apiServerService.Name, apiServerService.Namespace),
nodeToTappedPodIPMap,
serviceAccountName,
tappingOptions.TapOutgoing,
mizu.Config.Tap.TapOutgoing(),
); err != nil {
mizu.Log.Infof("Error creating mizu tapper daemonset: %v", err)
return err
@@ -234,13 +237,12 @@ func cleanUpMizuResources(kubernetesProvider *kubernetes.Provider) {
}
}
func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, podRegex *regexp.Regexp, tappingOptions *MizuTapOptions) {
targetNamespace := getNamespace(tappingOptions, kubernetesProvider)
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, targetNamespace), podRegex)
func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
targetNamespace := getNamespace(kubernetesProvider)
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, targetNamespace), mizu.Config.Tap.PodRegex())
restartTappers := func() {
if err := updateCurrentlyTappedPods(kubernetesProvider, ctx, podRegex, targetNamespace); err != nil {
if err := updateCurrentlyTappedPods(kubernetesProvider, ctx, targetNamespace); err != nil {
mizu.Log.Infof("Error getting pods by regex: %s (%v,%+v)", err, err, err)
cancel()
}
@@ -251,7 +253,7 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
cancel()
}
if err := updateMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap, tappingOptions); err != nil {
if err := updateMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap); err != nil {
mizu.Log.Infof("Error updating daemonset: %s (%v,%+v)", err, err, err)
cancel()
}
@@ -284,8 +286,8 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
}
}
func updateCurrentlyTappedPods(kubernetesProvider *kubernetes.Provider, ctx context.Context, podRegex *regexp.Regexp, targetNamespace string) error {
if matchingPods, err := kubernetesProvider.GetAllRunningPodsMatchingRegex(ctx, podRegex, targetNamespace); err != nil {
func updateCurrentlyTappedPods(kubernetesProvider *kubernetes.Provider, ctx context.Context, targetNamespace string) error {
if matchingPods, err := kubernetesProvider.GetAllRunningPodsMatchingRegex(ctx, mizu.Config.Tap.PodRegex(), targetNamespace); err != nil {
mizu.Log.Infof("Error getting pods by regex: %s (%v,%+v)", err, err, err)
return err
} else {
@@ -298,6 +300,7 @@ func updateCurrentlyTappedPods(kubernetesProvider *kubernetes.Provider, ctx cont
}
currentlyTappedPods = matchingPods
}
return nil
}
@@ -326,16 +329,16 @@ func getMissingPods(pods1 []core.Pod, pods2 []core.Pod) []core.Pod {
return missingPods
}
func portForwardApiPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, tappingOptions *MizuTapOptions, urlReadyChan chan string) {
func portForwardApiPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, urlReadyChan chan string) {
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", mizu.ApiServerPodName))
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, mizu.ResourcesNamespace), podExactRegex)
isPodReady := false
timeAfter := time.After(25 * time.Second)
for {
select {
case <-ctx.Done():
return
case <-added:
continue
case <-removed:
@@ -346,43 +349,45 @@ func portForwardApiPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
if modifiedPod.Status.Phase == "Running" && !isPodReady {
isPodReady = true
go func() {
err := kubernetes.StartProxy(kubernetesProvider, tappingOptions.GuiPort, mizu.ResourcesNamespace, mizu.ApiServerPodName)
err := kubernetes.StartProxy(kubernetesProvider, mizu.Config.Tap.GuiPort, mizu.ResourcesNamespace, mizu.ApiServerPodName)
if err != nil {
mizu.Log.Infof("Error occurred while running k8s proxy %v", err)
cancel()
}
}()
}
urlReadyChan <- kubernetes.GetMizuApiServerProxiedHostAndPath(tappingOptions.GuiPort)
time.Sleep(time.Second * 5) // Waiting to be sure the proxy is ready
requestForAnalysis(tappingOptions)
urlReadyChan <- kubernetes.GetMizuApiServerProxiedHostAndPath(mizu.Config.Tap.GuiPort)
time.Sleep(time.Second * 5) // Waiting to be sure the proxy is ready
requestForAnalysis()
case <-timeAfter:
if !isPodReady {
mizu.Log.Errorf("error: %s pod was not ready in time", mizu.ApiServerPodName)
cancel()
}
case <-errorChan:
cancel()
}
}
}
func requestForAnalysis(tappingOptions *MizuTapOptions) {
if !tappingOptions.Analysis {
func requestForAnalysis() {
if !mizu.Config.Tap.Analysis {
return
}
mizuProxiedUrl := kubernetes.GetMizuApiServerProxiedHostAndPath(tappingOptions.GuiPort)
urlPath := fmt.Sprintf("http://%s/api/uploadEntries?dest=%s&interval=%v", mizuProxiedUrl, url.QueryEscape(tappingOptions.AnalysisDestination), tappingOptions.SleepIntervalSec)
u, err := url.ParseRequestURI(urlPath)
if err != nil {
log.Fatal(fmt.Sprintf("Failed parsing the URL %v\n", err))
mizuProxiedUrl := kubernetes.GetMizuApiServerProxiedHostAndPath(mizu.Config.Tap.GuiPort)
urlPath := fmt.Sprintf("http://%s/api/uploadEntries?dest=%s&interval=%v", mizuProxiedUrl, url.QueryEscape(mizu.Config.Tap.AnalysisDestination), mizu.Config.Tap.SleepIntervalSec)
u, parseErr := url.ParseRequestURI(urlPath)
if parseErr != nil {
mizu.Log.Fatal("Failed parsing the URL (consider changing the analysis dest URL), err: %v", parseErr)
}
mizu.Log.Debugf("Sending get request to %v", u.String())
if response, err := http.Get(u.String()); err != nil || response.StatusCode != 200 {
mizu.Log.Infof("error sending upload entries req, status code: %v, err: %v", response.StatusCode, err)
if response, requestErr := http.Get(u.String()); requestErr != nil {
mizu.Log.Errorf("Failed to notify agent for analysis, err: %v", requestErr)
} else if response.StatusCode != 200 {
mizu.Log.Errorf("Failed to notify agent for analysis, status code: %v", response.StatusCode)
} else {
mizu.Log.Infof(uiUtils.Purple, "Traffic is uploading to UP9 for further analysis")
}
@@ -430,8 +435,8 @@ func waitForFinish(ctx context.Context, cancel context.CancelFunc) {
}
}
func syncApiStatus(ctx context.Context, cancel context.CancelFunc, tappingOptions *MizuTapOptions) {
controlSocketStr := fmt.Sprintf("ws://%s/ws", kubernetes.GetMizuApiServerProxiedHostAndPath(tappingOptions.GuiPort))
func syncApiStatus(ctx context.Context, cancel context.CancelFunc) {
controlSocketStr := fmt.Sprintf("ws://%s/ws", kubernetes.GetMizuApiServerProxiedHostAndPath(mizu.Config.Tap.GuiPort))
controlSocket, err := mizu.CreateControlSocket(controlSocketStr)
if err != nil {
mizu.Log.Infof("error establishing control socket connection %s", err)
@@ -452,11 +457,11 @@ func syncApiStatus(ctx context.Context, cancel context.CancelFunc, tappingOption
}
}
func getNamespace(tappingOptions *MizuTapOptions, kubernetesProvider *kubernetes.Provider) string {
if tappingOptions.AllNamespaces {
func getNamespace(kubernetesProvider *kubernetes.Provider) string {
if mizu.Config.Tap.AllNamespaces {
return mizu.K8sAllNamespaces
} else if len(tappingOptions.Namespace) > 0 {
return tappingOptions.Namespace
} else if len(mizu.Config.Tap.Namespace) > 0 {
return mizu.Config.Tap.Namespace
} else {
return kubernetesProvider.CurrentNamespace()
}