diff --git a/cli/cmd/tapRunner.go b/cli/cmd/tapRunner.go index 2eddcbb51..76011e1dc 100644 --- a/cli/cmd/tapRunner.go +++ b/cli/cmd/tapRunner.go @@ -3,13 +3,14 @@ package cmd import ( "context" "fmt" - "github.com/up9inc/mizu/shared" "os" "os/signal" "regexp" "syscall" "time" + "github.com/up9inc/mizu/shared" + core "k8s.io/api/core/v1" "github.com/up9inc/mizu/cli/debounce" @@ -45,6 +46,22 @@ func RunMizuTap(podRegexQuery *regexp.Regexp, tappingOptions *MizuTapOptions) { currentlyTappedPods = matchingPods } + var namespacesStr string + if targetNamespace != mizu.K8sAllNamespaces { + namespacesStr = fmt.Sprintf("namespace \"%s\"", targetNamespace) + } else { + namespacesStr = "all namespaces" + } + fmt.Printf("Tapping pods in %s\n", namespacesStr) + + if len(currentlyTappedPods) == 0 { + var suggestionStr string + if targetNamespace != mizu.K8sAllNamespaces { + suggestionStr = "\nSelect a different namespace with -n or tap all namespaces with -A" + } + fmt.Printf("Did not find any pods matching the regex argument%s\n", suggestionStr) + } + nodeToTappedPodIPMap, err := getNodeHostToTappedPodIpsMap(currentlyTappedPods) if err != nil { return @@ -67,7 +84,7 @@ func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Pro return err } - if err := createMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap, tappingOptions); err != nil { + if err := updateMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap, tappingOptions); err != nil { return err } @@ -111,20 +128,27 @@ func getMizuApiFilteringOptions(tappingOptions *MizuTapOptions) (*shared.Traffic return &shared.TrafficFilteringOptions{PlainTextMaskingRegexes: compiledRegexSlice}, nil } -func createMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, tappingOptions *MizuTapOptions) error { - if err := kubernetesProvider.ApplyMizuTapperDaemonSet( - ctx, - mizu.ResourcesNamespace, - mizu.TapperDaemonSetName, - tappingOptions.MizuImage, - mizu.TapperPodName, - fmt.Sprintf("%s.%s.svc.cluster.local", aggregatorService.Name, aggregatorService.Namespace), - nodeToTappedPodIPMap, - mizuServiceAccountExists, - tappingOptions.TapOutgoing, - ); err != nil { - fmt.Printf("Error creating mizu tapper daemonset: %v\n", err) - return err +func updateMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, tappingOptions *MizuTapOptions) error { + if len(nodeToTappedPodIPMap) > 0 { + if err := kubernetesProvider.ApplyMizuTapperDaemonSet( + ctx, + mizu.ResourcesNamespace, + mizu.TapperDaemonSetName, + tappingOptions.MizuImage, + mizu.TapperPodName, + fmt.Sprintf("%s.%s.svc.cluster.local", aggregatorService.Name, aggregatorService.Namespace), + nodeToTappedPodIPMap, + mizuServiceAccountExists, + tappingOptions.TapOutgoing, + ); err != nil { + fmt.Printf("Error creating mizu tapper daemonset: %v\n", err) + return err + } + } else { + if err := kubernetesProvider.RemoveDaemonSet(ctx, mizu.ResourcesNamespace, mizu.TapperDaemonSetName); err != nil { + fmt.Printf("Error deleting mizu tapper daemonset: %v\n", err) + return err + } } return nil @@ -164,7 +188,7 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro cancel() } - if err := createMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap, tappingOptions); err != nil { + if err := updateMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap, tappingOptions); err != nil { fmt.Printf("Error updating daemonset: %s (%v,%+v)\n", err, err, err) cancel() } diff --git a/cli/kubernetes/provider.go b/cli/kubernetes/provider.go index e416e940c..0bf40c1b8 100644 --- a/cli/kubernetes/provider.go +++ b/cli/kubernetes/provider.go @@ -102,7 +102,6 @@ func (provider *Provider) CreateMizuAggregatorPod(ctx context.Context, namespace }, DNSPolicy: core.DNSClusterFirstWithHostNet, TerminationGracePeriodSeconds: new(int64), - // Affinity: TODO: define node selector for all relevant nodes for this mizu instance }, } //define the service account only when it exists to prevent pod crash @@ -215,18 +214,94 @@ func (provider *Provider) CreateMizuRBAC(ctx context.Context, namespace string, } func (provider *Provider) RemovePod(ctx context.Context, namespace string, podName string) error { + if isFound, err := provider.CheckPodExists(ctx, namespace, podName); + err != nil { + return err + } else if !isFound { + return nil + } + return provider.clientSet.CoreV1().Pods(namespace).Delete(ctx, podName, metav1.DeleteOptions{}) } func (provider *Provider) RemoveService(ctx context.Context, namespace string, serviceName string) error { + if isFound, err := provider.CheckServiceExists(ctx, namespace, serviceName); + err != nil { + return err + } else if !isFound { + return nil + } + return provider.clientSet.CoreV1().Services(namespace).Delete(ctx, serviceName, metav1.DeleteOptions{}) } func (provider *Provider) RemoveDaemonSet(ctx context.Context, namespace string, daemonSetName string) error { + if isFound, err := provider.CheckDaemonSetExists(ctx, namespace, daemonSetName); + err != nil { + return err + } else if !isFound { + return nil + } + return provider.clientSet.AppsV1().DaemonSets(namespace).Delete(ctx, daemonSetName, metav1.DeleteOptions{}) } +func (provider *Provider) CheckPodExists(ctx context.Context, namespace string, name string) (bool, error) { + listOptions := metav1.ListOptions{ + FieldSelector: fmt.Sprintf("metadata.name=%s", name), + Limit: 1, + } + resourceList, err := provider.clientSet.CoreV1().Pods(namespace).List(ctx, listOptions) + if err != nil { + return false, err + } + + if len(resourceList.Items) > 0 { + return true, nil + } + + return false, nil +} + +func (provider *Provider) CheckServiceExists(ctx context.Context, namespace string, name string) (bool, error) { + listOptions := metav1.ListOptions{ + FieldSelector: fmt.Sprintf("metadata.name=%s", name), + Limit: 1, + } + resourceList, err := provider.clientSet.CoreV1().Services(namespace).List(ctx, listOptions) + if err != nil { + return false, err + } + + if len(resourceList.Items) > 0 { + return true, nil + } + + return false, nil +} + +func (provider *Provider) CheckDaemonSetExists(ctx context.Context, namespace string, name string) (bool, error) { + listOptions := metav1.ListOptions{ + FieldSelector: fmt.Sprintf("metadata.name=%s", name), + Limit: 1, + } + resourceList, err := provider.clientSet.AppsV1().DaemonSets(namespace).List(ctx, listOptions) + if err != nil { + return false, err + } + + if len(resourceList.Items) > 0 { + return true, nil + } + + return false, nil +} + func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string, aggregatorPodIp string, nodeToTappedPodIPMap map[string][]string, linkServiceAccount bool, tapOutgoing bool) error { + if len(nodeToTappedPodIPMap) == 0 { + return fmt.Errorf("Daemon set %s must tap at least 1 pod", daemonSetName) + } + nodeToTappedPodIPMapJsonStr, err := json.Marshal(nodeToTappedPodIPMap) if err != nil { return err