From 9e34662511bb99f75b36d05f017c83f3d521d3db Mon Sep 17 00:00:00 2001 From: Igor Gov Date: Wed, 4 Aug 2021 08:18:07 +0300 Subject: [PATCH] Adding logs and fixing several issues (#162) * Config grooming and several general fixes --- agent/main.go | 2 +- cli/cmd/config.go | 22 ++++--- cli/cmd/tapRunner.go | 131 +++++++++++++++++++------------------ cli/kubernetes/provider.go | 5 +- cli/mizu/config.go | 32 ++++----- cli/mizu/consts.go | 13 ++++ cli/mizu/logger.go | 6 +- cli/mizu/telemetry.go | 5 +- 8 files changed, 121 insertions(+), 95 deletions(-) diff --git a/agent/main.go b/agent/main.go index b44978917..445d856c2 100644 --- a/agent/main.go +++ b/agent/main.go @@ -85,7 +85,7 @@ func hostApi(socketHarOutputChannel chan<- *tap.OutputChannelItem) { app := gin.Default() app.GET("/echo", func(c *gin.Context) { - c.String(http.StatusOK, "Hello, World 👋!") + c.String(http.StatusOK, "Here is Mizu agent") }) eventHandlers := api.RoutesEventHandlers{ diff --git a/cli/cmd/config.go b/cli/cmd/config.go index 3949924e6..66d4485d8 100644 --- a/cli/cmd/config.go +++ b/cli/cmd/config.go @@ -8,17 +8,24 @@ import ( "io/ioutil" ) -var outputFileName string +var regenerateFile bool var configCmd = &cobra.Command{ Use: "config", - Short: "Generate example config file to stdout", + Short: "Generate config with default values", RunE: func(cmd *cobra.Command, args []string) error { - template := mizu.GetTemplateConfig() - if outputFileName != "" { + template, err := mizu.GetConfigWithDefaults() + if err != nil { + mizu.Log.Errorf("Failed generating config with defaults %v", err) + return nil + } + if regenerateFile { data := []byte(template) - _ = ioutil.WriteFile(outputFileName, data, 0644) - mizu.Log.Infof(fmt.Sprintf("Template File written to %s", fmt.Sprintf(uiUtils.Purple, outputFileName))) + if err := ioutil.WriteFile(mizu.GetConfigFilePath(), data, 0644); err != nil { + mizu.Log.Errorf("Failed writing config %v", err) + return nil + } + mizu.Log.Infof(fmt.Sprintf("Template File written to %s", fmt.Sprintf(uiUtils.Purple, mizu.GetConfigFilePath()))) } else { mizu.Log.Debugf("Writing template config.\n%v", template) fmt.Printf("%v", template) @@ -29,6 +36,5 @@ var configCmd = &cobra.Command{ func init() { rootCmd.AddCommand(configCmd) - - configCmd.Flags().StringVarP(&outputFileName, "file", "f", "", "Save content to local file") + configCmd.Flags().BoolVarP(®enerateFile, "regenerate", "r", false, fmt.Sprintf("Regenerate the config file with default values %s", mizu.GetConfigFilePath())) } diff --git a/cli/cmd/tapRunner.go b/cli/cmd/tapRunner.go index d2b11b304..36ddedd1c 100644 --- a/cli/cmd/tapRunner.go +++ b/cli/cmd/tapRunner.go @@ -53,28 +53,20 @@ func RunMizuTap() { defer cancel() // cancel will be called when this function exits targetNamespace := getNamespace(kubernetesProvider) - if err := updateCurrentlyTappedPods(kubernetesProvider, ctx, targetNamespace); err != nil { - mizu.Log.Infof("Error listing pods: %v", err) - return - } - - if mizu.Config.Tap.DryRun { - return - } - - urlReadyChan := make(chan string) - go func() { - mizu.Log.Infof("Mizu is available at http://%s", <-urlReadyChan) - }() - var namespacesStr string if targetNamespace != mizu.K8sAllNamespaces { namespacesStr = fmt.Sprintf("namespace \"%s\"", targetNamespace) } else { namespacesStr = "all namespaces" } + mizu.CheckNewerVersion() mizu.Log.Infof("Tapping pods in %s", namespacesStr) + if err, _ := updateCurrentlyTappedPods(kubernetesProvider, ctx, targetNamespace); err != nil { + mizu.Log.Infof("Error listing pods: %v", err) + return + } + if len(currentlyTappedPods) == 0 { var suggestionStr string if targetNamespace != mizu.K8sAllNamespaces { @@ -83,6 +75,10 @@ func RunMizuTap() { mizu.Log.Infof("Did not find any pods matching the regex argument%s", suggestionStr) } + if mizu.Config.Tap.DryRun { + return + } + nodeToTappedPodIPMap, err := getNodeHostToTappedPodIpsMap(currentlyTappedPods) if err != nil { return @@ -92,10 +88,8 @@ func RunMizuTap() { return } - mizu.CheckNewerVersion() - go portForwardApiPod(ctx, kubernetesProvider, cancel, urlReadyChan) // TODO convert this to job for built in pod ttl or have the running app handle this + go createProxyToApiServerPod(ctx, kubernetesProvider, cancel) go watchPodsForTapping(ctx, kubernetesProvider, cancel) - go syncApiStatus(ctx, cancel) //block until exit signal or error waitForFinish(ctx, cancel) @@ -121,9 +115,10 @@ func createMizuNamespace(ctx context.Context, kubernetesProvider *kubernetes.Pro _, err := kubernetesProvider.CreateNamespace(ctx, mizu.ResourcesNamespace) if err != nil { mizu.Log.Infof("Error creating Namespace %s: %v", mizu.ResourcesNamespace, err) + return err } - - return err + mizu.Log.Debugf("Successfully creating Namespace %s", mizu.ResourcesNamespace) + return nil } func createMizuApiServer(ctx context.Context, kubernetesProvider *kubernetes.Provider, mizuApiFilteringOptions *shared.TrafficFilteringOptions) error { @@ -141,17 +136,20 @@ func createMizuApiServer(ctx context.Context, kubernetesProvider *kubernetes.Pro mizu.Log.Infof("Error creating mizu %s pod: %v", mizu.ApiServerPodName, err) return err } + mizu.Log.Debugf("Successfully created API server pod: %s", mizu.ApiServerPodName) apiServerService, err = kubernetesProvider.CreateService(ctx, mizu.ResourcesNamespace, mizu.ApiServerPodName, mizu.ApiServerPodName) if err != nil { mizu.Log.Infof("Error creating mizu %s service: %v", mizu.ApiServerPodName, err) return err } + mizu.Log.Debugf("Successfully created service: %s", mizu.ApiServerPodName) return nil } func getMizuApiFilteringOptions() (*shared.TrafficFilteringOptions, error) { + var compiledRegexSlice []*shared.SerializableRegexp if mizu.Config.Tap.PlainTextFilterRegexes != nil && len(mizu.Config.Tap.PlainTextFilterRegexes) > 0 { @@ -192,9 +190,10 @@ func updateMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provi mizu.Log.Infof("Error creating mizu tapper daemonset: %v", err) return err } + mizu.Log.Debugf("Successfully created %v tappers", len(nodeToTappedPodIPMap)) } else { if err := kubernetesProvider.RemoveDaemonSet(ctx, mizu.ResourcesNamespace, mizu.TapperDaemonSetName); err != nil { - mizu.Log.Infof("Error deleting mizu tapper daemonset: %v", err) + mizu.Log.Errorf("Error deleting mizu tapper daemonset: %v", err) return err } } @@ -241,20 +240,42 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro targetNamespace := getNamespace(kubernetesProvider) added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, targetNamespace), mizu.Config.Tap.PodRegex()) + controlSocketStr := fmt.Sprintf("ws://%s/ws", kubernetes.GetMizuApiServerProxiedHostAndPath(mizu.Config.Tap.GuiPort)) + controlSocket, err := mizu.CreateControlSocket(controlSocketStr) + if err != nil { + mizu.Log.Infof("error establishing control socket connection %s", err) + cancel() + } + mizu.Log.Debugf("Control socket created %s", controlSocketStr) + err = controlSocket.SendNewTappedPodsListMessage(currentlyTappedPods) + if err != nil { + mizu.Log.Debugf("error Sending message via control socket %v, error: %s", controlSocketStr, err) + } restartTappers := func() { - if err := updateCurrentlyTappedPods(kubernetesProvider, ctx, targetNamespace); err != nil { - mizu.Log.Infof("Error getting pods by regex: %s (%v,%+v)", err, err, err) + err, changeFound := updateCurrentlyTappedPods(kubernetesProvider, ctx, targetNamespace) + if err != nil { + mizu.Log.Errorf("Error getting pods by regex: %s (%v,%+v)", err, err, err) cancel() } + if !changeFound { + mizu.Log.Debugf("Nothing changed update tappers not needed") + return + } + + err = controlSocket.SendNewTappedPodsListMessage(currentlyTappedPods) + if err != nil { + mizu.Log.Debugf("error Sending message via control socket %v, error: %s", controlSocketStr, err) + } + nodeToTappedPodIPMap, err := getNodeHostToTappedPodIpsMap(currentlyTappedPods) if err != nil { - mizu.Log.Infof("Error building node to ips map: %s (%v,%+v)", err, err, err) + mizu.Log.Errorf("Error building node to ips map: %s (%v,%+v)", err, err, err) cancel() } if err := updateMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap); err != nil { - mizu.Log.Infof("Error updating daemonset: %s (%v,%+v)", err, err, err) + mizu.Log.Errorf("Error updating daemonset: %s (%v,%+v)", err, err, err) cancel() } } @@ -262,17 +283,21 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro for { select { - case <-added: - case <-removed: + case pod := <-added: + mizu.Log.Debugf("Added matching pod %s, ns: %s", pod.Name, pod.Namespace) restartTappersDebouncer.SetOn() - case modifiedTarget := <-modified: + case pod := <-removed: + mizu.Log.Debugf("Removed matching pod %s, ns: %s", pod.Name, pod.Namespace) + restartTappersDebouncer.SetOn() + case pod := <-modified: + mizu.Log.Debugf("Modified matching pod %s, ns: %s, phase: %s, ip: %s", pod.Name, pod.Namespace, pod.Status.Phase, pod.Status.PodIP) // Act only if the modified pod has already obtained an IP address. // After filtering for IPs, on a normal pod restart this includes the following events: // - Pod deletion // - Pod reaches start state // - Pod reaches ready state // Ready/unready transitions might also trigger this event. - if modifiedTarget.Status.PodIP != "" { + if pod.Status.PodIP != "" { restartTappersDebouncer.SetOn() } @@ -286,22 +311,25 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro } } -func updateCurrentlyTappedPods(kubernetesProvider *kubernetes.Provider, ctx context.Context, targetNamespace string) error { +func updateCurrentlyTappedPods(kubernetesProvider *kubernetes.Provider, ctx context.Context, targetNamespace string) (error, bool) { + changeFound := false if matchingPods, err := kubernetesProvider.GetAllRunningPodsMatchingRegex(ctx, mizu.Config.Tap.PodRegex(), targetNamespace); err != nil { mizu.Log.Infof("Error getting pods by regex: %s (%v,%+v)", err, err, err) - return err + return err, false } else { addedPods, removedPods := getPodArrayDiff(currentlyTappedPods, matchingPods) for _, addedPod := range addedPods { + changeFound = true mizu.Log.Infof(uiUtils.Green, fmt.Sprintf("+%s", addedPod.Name)) } for _, removedPod := range removedPods { + changeFound = true mizu.Log.Infof(uiUtils.Red, fmt.Sprintf("-%s", removedPod.Name)) } currentlyTappedPods = matchingPods } - return nil + return nil, changeFound } func getPodArrayDiff(oldPods []core.Pod, newPods []core.Pod) (added []core.Pod, removed []core.Pod) { @@ -329,43 +357,44 @@ func getMissingPods(pods1 []core.Pod, pods2 []core.Pod) []core.Pod { return missingPods } -func portForwardApiPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, urlReadyChan chan string) { +func createProxyToApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) { podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", mizu.ApiServerPodName)) added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, mizu.ResourcesNamespace), podExactRegex) isPodReady := false timeAfter := time.After(25 * time.Second) - for { select { case <-ctx.Done(): return case <-added: + mizu.Log.Debugf("Got agent pod added event") continue case <-removed: mizu.Log.Infof("%s removed", mizu.ApiServerPodName) cancel() return case modifiedPod := <-modified: - if modifiedPod.Status.Phase == "Running" && !isPodReady { + mizu.Log.Debugf("Got agent pod modified event, status phase: %v", modifiedPod.Status.Phase) + if modifiedPod.Status.Phase == core.PodRunning && !isPodReady { isPodReady = true go func() { err := kubernetes.StartProxy(kubernetesProvider, mizu.Config.Tap.GuiPort, mizu.ResourcesNamespace, mizu.ApiServerPodName) if err != nil { - mizu.Log.Infof("Error occurred while running k8s proxy %v", err) + mizu.Log.Errorf("Error occurred while running k8s proxy %v", err) cancel() } }() + mizu.Log.Infof("Mizu is available at http://%s\n", kubernetes.GetMizuApiServerProxiedHostAndPath(mizu.Config.Tap.GuiPort)) + time.Sleep(time.Second * 5) // Waiting to be sure the proxy is ready + requestForAnalysis() } - - urlReadyChan <- kubernetes.GetMizuApiServerProxiedHostAndPath(mizu.Config.Tap.GuiPort) - time.Sleep(time.Second * 5) // Waiting to be sure the proxy is ready - requestForAnalysis() case <-timeAfter: if !isPodReady { - mizu.Log.Errorf("error: %s pod was not ready in time", mizu.ApiServerPodName) + mizu.Log.Errorf("Error: %s pod was not ready in time", mizu.ApiServerPodName) cancel() } case <-errorChan: + mizu.Log.Debugf("[ERROR] Agent creation, watching %v namespace", mizu.ResourcesNamespace) cancel() } } @@ -435,28 +464,6 @@ func waitForFinish(ctx context.Context, cancel context.CancelFunc) { } } -func syncApiStatus(ctx context.Context, cancel context.CancelFunc) { - controlSocketStr := fmt.Sprintf("ws://%s/ws", kubernetes.GetMizuApiServerProxiedHostAndPath(mizu.Config.Tap.GuiPort)) - controlSocket, err := mizu.CreateControlSocket(controlSocketStr) - if err != nil { - mizu.Log.Infof("error establishing control socket connection %s", err) - cancel() - } - - for { - select { - case <-ctx.Done(): - return - default: - err = controlSocket.SendNewTappedPodsListMessage(currentlyTappedPods) - if err != nil { - mizu.Log.Debugf("error Sending message via control socket %v, error: %s", controlSocketStr, err) - } - time.Sleep(10 * time.Second) - } - } -} - func getNamespace(kubernetesProvider *kubernetes.Provider) string { if mizu.Config.Tap.AllNamespaces { return mizu.K8sAllNamespaces diff --git a/cli/kubernetes/provider.go b/cli/kubernetes/provider.go index 3ef165fb7..bf4cdc0c9 100644 --- a/cli/kubernetes/provider.go +++ b/cli/kubernetes/provider.go @@ -473,6 +473,8 @@ func (provider *Provider) CheckDaemonSetExists(ctx context.Context, namespace st } func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string, apiServerPodIp string, nodeToTappedPodIPMap map[string][]string, serviceAccountName string, tapOutgoing bool) error { + mizu.Log.Debugf("Applying %d tapper deamonsets, ns: %s, daemonSetName: %s, podImage: %s, tapperPodName: %s", len(nodeToTappedPodIPMap), namespace, daemonSetName, podImage, tapperPodName) + if len(nodeToTappedPodIPMap) == 0 { return fmt.Errorf("Daemon set %s must tap at least 1 pod", daemonSetName) } @@ -493,12 +495,11 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac mizuCmd = append(mizuCmd, "--anydirection") } - privileged := true agentContainer := applyconfcore.Container() agentContainer.WithName(tapperPodName) agentContainer.WithImage(podImage) agentContainer.WithImagePullPolicy(core.PullAlways) - agentContainer.WithSecurityContext(applyconfcore.SecurityContext().WithPrivileged(privileged)) + agentContainer.WithSecurityContext(applyconfcore.SecurityContext().WithPrivileged(true)) agentContainer.WithCommand(mizuCmd...) agentContainer.WithEnv( applyconfcore.EnvVar().WithName(shared.HostModeEnvVar).WithValue("1"), diff --git a/cli/mizu/config.go b/cli/mizu/config.go index 1809abb0c..33293812d 100644 --- a/cli/mizu/config.go +++ b/cli/mizu/config.go @@ -17,7 +17,7 @@ import ( ) const ( - Separator = "=" + Separator = "=" SetCommandName = "set" ) @@ -29,33 +29,34 @@ func InitConfig(cmd *cobra.Command) error { } if err := mergeConfigFile(); err != nil { - Log.Infof(uiUtils.Red, "Invalid config file") - return err + Log.Errorf("Could not load config file, error %v", err) + Log.Fatalf("You can regenerate the file using `mizu config -r` or just remove it %v", GetConfigFilePath()) } cmd.Flags().Visit(initFlag) finalConfigPrettified, _ := uiUtils.PrettyJson(Config) - Log.Debugf("Merged all config successfully\n Final config: %v", finalConfigPrettified) + Log.Debugf("Init config finished\n Final config: %v", finalConfigPrettified) return nil } -func GetTemplateConfig() string { - prettifiedConfig, _ := uiUtils.PrettyYaml(Config) - return prettifiedConfig +func GetConfigWithDefaults() (string, error) { + defaultConf := ConfigStruct{} + if err := defaults.Set(&defaultConf); err != nil { + return "", err + } + return uiUtils.PrettyYaml(defaultConf) +} + +func GetConfigFilePath() string { + return path.Join(getMizuFolderPath(), "config.yaml") } func mergeConfigFile() error { - Log.Debugf("Merging config file values") - home, homeDirErr := os.UserHomeDir() - if homeDirErr != nil { - return homeDirErr - } - - reader, openErr := os.Open(path.Join(home, ".mizu", "config.yaml")) + reader, openErr := os.Open(GetConfigFilePath()) if openErr != nil { - return openErr + return nil } buf, readErr := ioutil.ReadAll(reader) @@ -66,6 +67,7 @@ func mergeConfigFile() error { if err := yaml.Unmarshal(buf, &Config); err != nil { return err } + Log.Debugf("Found config file, merged to default options") return nil } diff --git a/cli/mizu/consts.go b/cli/mizu/consts.go index 0f966ea9b..1924f8c5c 100644 --- a/cli/mizu/consts.go +++ b/cli/mizu/consts.go @@ -1,5 +1,10 @@ package mizu +import ( + "os" + "path" +) + var ( SemVer = "0.0.1" Branch = "develop" @@ -18,3 +23,11 @@ const ( TapperDaemonSetName = "mizu-tapper-daemon-set" TapperPodName = "mizu-tapper" ) + +func getMizuFolderPath() string { + home, homeDirErr := os.UserHomeDir() + if homeDirErr != nil { + return "" + } + return path.Join(home, ".mizu") +} diff --git a/cli/mizu/logger.go b/cli/mizu/logger.go index a708b4879..20ca87856 100644 --- a/cli/mizu/logger.go +++ b/cli/mizu/logger.go @@ -14,10 +14,9 @@ var format = logging.MustStringFormatter( ) func InitLogger() { - homeDirPath, _ := os.UserHomeDir() - mizuDirPath := path.Join(homeDirPath, ".mizu") + mizuDirPath := getMizuFolderPath() if err := os.MkdirAll(mizuDirPath, os.ModePerm); err != nil { - panic(fmt.Sprintf("Failed creating .mizu dir: %v, err %v", mizuDirPath, err)) + panic(fmt.Sprintf("Failed creating mizu dir: %v, err %v", mizuDirPath, err)) } logPath := path.Join(mizuDirPath, "log.log") f, err := os.OpenFile(logPath, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) @@ -35,5 +34,6 @@ func InitLogger() { logging.SetBackend(backend1Leveled, backend2Formatter) + Log.Debugf("\n\n\n") Log.Debugf("Running mizu version %v", SemVer) } diff --git a/cli/mizu/telemetry.go b/cli/mizu/telemetry.go index a8924f972..e36956cd9 100644 --- a/cli/mizu/telemetry.go +++ b/cli/mizu/telemetry.go @@ -15,10 +15,6 @@ func ReportRun(cmd string, args interface{}) { return } - if Branch != "main" { - Log.Debugf("reporting only on main branch") - return - } argsBytes, _ := json.Marshal(args) argsMap := map[string]string{ "telemetry_type": "execution", @@ -26,6 +22,7 @@ func ReportRun(cmd string, args interface{}) { "args": string(argsBytes), "component": "mizu_cli", "BuildTimestamp": BuildTimestamp, + "Branch": Branch, "version": SemVer} argsMap["message"] = fmt.Sprintf("mizu %v - %v", argsMap["cmd"], string(argsBytes))