diff --git a/agent/main.go b/agent/main.go index 31e421a19..d315a5ffd 100644 --- a/agent/main.go +++ b/agent/main.go @@ -77,7 +77,7 @@ func main() { filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem) tap.StartPassiveTapper(tapOpts, filteredOutputItemsChannel, extensions, filteringOptions) - socketConnection, err := utils.ConnectToSocketServer(*apiServerAddress) + socketConnection, _, err := websocket.DefaultDialer.Dial(*apiServerAddress, nil) if err != nil { panic(fmt.Sprintf("Error connecting to socket server at %s %v", *apiServerAddress, err)) } diff --git a/agent/pkg/utils/socket_client.go b/agent/pkg/utils/socket_client.go deleted file mode 100644 index aa532852a..000000000 --- a/agent/pkg/utils/socket_client.go +++ /dev/null @@ -1,38 +0,0 @@ -package utils - -import ( - "github.com/gorilla/websocket" - "github.com/romana/rlog" - "time" -) - -const ( - DEFAULT_SOCKET_RETRIES = 3 - DEFAULT_SOCKET_RETRY_SLEEP_TIME = time.Second * 10 -) - -func ConnectToSocketServer(address string) (*websocket.Conn, error) { - var err error - var connection *websocket.Conn - try := 0 - - // Connection to server fails if client pod is up before server. - // Retries solve this issue. - for try < DEFAULT_SOCKET_RETRIES { - rlog.Infof("Trying to connect to websocket: %s, attempt: %v/%v", address, try, DEFAULT_SOCKET_RETRIES) - connection, _, err = websocket.DefaultDialer.Dial(address, nil) - if err != nil { - rlog.Warnf("Failed connecting to websocket: %s, attempt: %v/%v, err: %s, (%v,%+v)", address, try, DEFAULT_SOCKET_RETRIES, err, err, err) - try++ - } else { - break - } - time.Sleep(DEFAULT_SOCKET_RETRY_SLEEP_TIME) - } - - if err != nil { - return nil, err - } - - return connection, nil -} diff --git a/cli/cmd/tapRunner.go b/cli/cmd/tapRunner.go index 7f71a2d99..3545b4632 100644 --- a/cli/cmd/tapRunner.go +++ b/cli/cmd/tapRunner.go @@ -109,19 +109,17 @@ func RunMizuTap() { return } - nodeToTappedPodIPMap := getNodeHostToTappedPodIpsMap(state.currentlyTappedPods) - defer finishMizuExecution(kubernetesProvider) - if err := createMizuResources(ctx, kubernetesProvider, nodeToTappedPodIPMap, mizuApiFilteringOptions, mizuValidationRules); err != nil { + if err := createMizuResources(ctx, kubernetesProvider, mizuApiFilteringOptions, mizuValidationRules); err != nil { logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error creating resources: %v", errormessage.FormatError(err))) return } - go goUtils.HandleExcWrapper(watchApiServerPod, ctx, kubernetesProvider, cancel) - go goUtils.HandleExcWrapper(watchTapperPod, ctx, kubernetesProvider, cancel, nodeToTappedPodIPMap) + go goUtils.HandleExcWrapper(watchApiServerPod, ctx, kubernetesProvider, cancel, mizuApiFilteringOptions) + go goUtils.HandleExcWrapper(watchTapperPod, ctx, kubernetesProvider, cancel) go goUtils.HandleExcWrapper(watchPodsForTapping, ctx, kubernetesProvider, targetNamespaces, cancel, mizuApiFilteringOptions) - //block until exit signal or error + // block until exit signal or error waitForFinish(ctx, cancel) } @@ -134,7 +132,7 @@ func readValidationRules(file string) (string, error) { return string(newContent), nil } -func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, mizuApiFilteringOptions *api.TrafficFilteringOptions, mizuValidationRules string) error { +func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, mizuApiFilteringOptions *api.TrafficFilteringOptions, mizuValidationRules string) error { if !config.Config.IsNsRestrictedMode() { if err := createMizuNamespace(ctx, kubernetesProvider); err != nil { return err @@ -145,10 +143,6 @@ func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Pro return err } - if err := updateMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap, mizuApiFilteringOptions); err != nil { - return err - } - if err := createMizuConfigmap(ctx, kubernetesProvider, mizuValidationRules); err != nil { logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Failed to create resources required for policy validation. Mizu will not validate policy rules. error: %v\n", errormessage.FormatError(err))) } @@ -228,7 +222,9 @@ func getMizuApiFilteringOptions() (*api.TrafficFilteringOptions, error) { }, nil } -func updateMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, mizuApiFilteringOptions *api.TrafficFilteringOptions) error { +func updateMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provider, mizuApiFilteringOptions *api.TrafficFilteringOptions) error { + nodeToTappedPodIPMap := getNodeHostToTappedPodIpsMap(state.currentlyTappedPods) + if len(nodeToTappedPodIPMap) > 0 { var serviceAccountName string if state.mizuServiceAccountExists { @@ -407,12 +403,7 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro logger.Log.Debugf("[Error] failed update tapped pods %v", err) } - nodeToTappedPodIPMap := getNodeHostToTappedPodIpsMap(state.currentlyTappedPods) - if err != nil { - logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error building node to ips map: %v", errormessage.FormatError(err))) - cancel() - } - if err := updateMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap, mizuApiFilteringOptions); err != nil { + if err := updateMizuTappers(ctx, kubernetesProvider, mizuApiFilteringOptions); err != nil { logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error updating daemonset: %v", errormessage.FormatError(err))) cancel() } @@ -531,7 +522,7 @@ func getMissingPods(pods1 []core.Pod, pods2 []core.Pod) []core.Pod { return missingPods } -func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) { +func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, mizuApiFilteringOptions *api.TrafficFilteringOptions) { podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", mizu.ApiServerPodName)) added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider, []string{config.Config.MizuResourcesNamespace}, podExactRegex) isPodReady := false @@ -571,6 +562,10 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi cancel() break } + if err := updateMizuTappers(ctx, kubernetesProvider, mizuApiFilteringOptions); err != nil { + logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error updating daemonset: %v", errormessage.FormatError(err))) + cancel() + } logger.Log.Infof("Mizu is available at %s\n", url) openBrowser(url) @@ -579,13 +574,13 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi logger.Log.Debugf("[Error] failed update tapped pods %v", err) } } - case _, ok := <-errorChan: + case err, ok := <-errorChan: if !ok { errorChan = nil continue } - logger.Log.Debugf("[ERROR] Agent creation, watching %v namespace", config.Config.MizuResourcesNamespace) + logger.Log.Debugf("[ERROR] Agent creation, watching %v namespace, error: %v", config.Config.MizuResourcesNamespace, err) cancel() case <-timeAfter: @@ -600,14 +595,10 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi } } -func watchTapperPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, nodeToTappedPodIPMap map[string][]string) { +func watchTapperPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) { podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", mizu.TapperDaemonSetName)) added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider, []string{config.Config.MizuResourcesNamespace}, podExactRegex) var prevPodPhase core.PodPhase - var appendMetaname bool - if len(nodeToTappedPodIPMap) > 1 { - appendMetaname = true - } for { select { case addedPod, ok := <-added: @@ -616,22 +607,14 @@ func watchTapperPod(ctx context.Context, kubernetesProvider *kubernetes.Provider continue } - if appendMetaname { - logger.Log.Debugf("Tapper is created [%s]", addedPod.ObjectMeta.Name) - } else { - logger.Log.Debugf("Tapper is created") - } + logger.Log.Debugf("Tapper is created [%s]", addedPod.Name) case removedPod, ok := <-removed: if !ok { removed = nil continue } - if appendMetaname { - logger.Log.Debugf("Tapper is removed [%s]", removedPod.ObjectMeta.Name) - } else { - logger.Log.Debugf("Tapper is removed") - } + logger.Log.Debugf("Tapper is removed [%s]", removedPod.Name) case modifiedPod, ok := <-modified: if !ok { modified = nil @@ -639,13 +622,14 @@ func watchTapperPod(ctx context.Context, kubernetesProvider *kubernetes.Provider } if modifiedPod.Status.Phase == core.PodPending && modifiedPod.Status.Conditions[0].Type == core.PodScheduled && modifiedPod.Status.Conditions[0].Status != core.ConditionTrue { - logger.Log.Infof(uiUtils.Red, "Cannot deploy the tapper. Reason: \"%s\"", modifiedPod.Status.Conditions[0].Message) + logger.Log.Infof(uiUtils.Red, "Wasn't able to deploy the tapper %s. Reason: \"%s\"", modifiedPod.Name, modifiedPod.Status.Conditions[0].Message) cancel() break } podStatus := modifiedPod.Status if podStatus.Phase == core.PodPending && prevPodPhase == podStatus.Phase { + logger.Log.Debugf("Tapper %s is %s", modifiedPod.Name, strings.ToLower(string(podStatus.Phase))) continue } prevPodPhase = podStatus.Phase @@ -655,22 +639,19 @@ func watchTapperPod(ctx context.Context, kubernetesProvider *kubernetes.Provider if state.Terminated != nil { switch state.Terminated.Reason { case "OOMKilled": - logger.Log.Infof(uiUtils.Red, "Tapper is terminated! OOMKilled. Increase pod resources.") + logger.Log.Infof(uiUtils.Red, "Tapper %s was terminated (reason: OOMKilled). You should consider increasing machine resources.", modifiedPod.Name) } - } else { - logger.Log.Debugf("Tapper is %s", strings.ToLower(string(podStatus.Phase))) } - } else { - logger.Log.Debugf("Tapper is %s", strings.ToLower(string(podStatus.Phase))) } - case _, ok := <-errorChan: + logger.Log.Debugf("Tapper %s is %s", modifiedPod.Name, strings.ToLower(string(podStatus.Phase))) + case err, ok := <-errorChan: if !ok { errorChan = nil continue } - logger.Log.Errorf("[ERROR] Tapper creation, watching %v namespace", config.Config.MizuResourcesNamespace) + logger.Log.Debugf("[Error] Error in mizu tapper watch, err: %v", err) cancel() case <-ctx.Done():