Fix crash when there are no pods matching the regex (#85)

* Removed done todo.

* Error when trying to apply tapper-daemonset with 0 pods in affinity.

* Reorder imports.

* Create/update mizu tappers if there are tapped pods. Delete if there are no tapped pods.

* Skip deletion if tapper daemonset is not installed.

* Renamed createMizuTappers -> updateMizuTappers.

* Renamed IsDaemonSetApplied -> CheckDaemonSetExists.

* Skip deletion if pod / service is not installed.

* Fixed: Inverted logic.

* Rename.

* Fixed compilation bugs.

* Warn if no pods are found. Suggest changing the namespace.

* Use consts.

* Removed empty line.
This commit is contained in:
nimrod-up9 2021-06-27 18:24:14 +03:00 committed by GitHub
parent 6aaee4b519
commit c59aadb221
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 117 additions and 18 deletions

View File

@ -3,13 +3,14 @@ package cmd
import (
"context"
"fmt"
"github.com/up9inc/mizu/shared"
"os"
"os/signal"
"regexp"
"syscall"
"time"
"github.com/up9inc/mizu/shared"
core "k8s.io/api/core/v1"
"github.com/up9inc/mizu/cli/debounce"
@ -45,6 +46,22 @@ func RunMizuTap(podRegexQuery *regexp.Regexp, tappingOptions *MizuTapOptions) {
currentlyTappedPods = matchingPods
}
var namespacesStr string
if targetNamespace != mizu.K8sAllNamespaces {
namespacesStr = fmt.Sprintf("namespace \"%s\"", targetNamespace)
} else {
namespacesStr = "all namespaces"
}
fmt.Printf("Tapping pods in %s\n", namespacesStr)
if len(currentlyTappedPods) == 0 {
var suggestionStr string
if targetNamespace != mizu.K8sAllNamespaces {
suggestionStr = "\nSelect a different namespace with -n or tap all namespaces with -A"
}
fmt.Printf("Did not find any pods matching the regex argument%s\n", suggestionStr)
}
nodeToTappedPodIPMap, err := getNodeHostToTappedPodIpsMap(currentlyTappedPods)
if err != nil {
return
@ -67,7 +84,7 @@ func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Pro
return err
}
if err := createMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap, tappingOptions); err != nil {
if err := updateMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap, tappingOptions); err != nil {
return err
}
@ -111,20 +128,27 @@ func getMizuApiFilteringOptions(tappingOptions *MizuTapOptions) (*shared.Traffic
return &shared.TrafficFilteringOptions{PlainTextMaskingRegexes: compiledRegexSlice}, nil
}
func createMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, tappingOptions *MizuTapOptions) error {
if err := kubernetesProvider.ApplyMizuTapperDaemonSet(
ctx,
mizu.ResourcesNamespace,
mizu.TapperDaemonSetName,
tappingOptions.MizuImage,
mizu.TapperPodName,
fmt.Sprintf("%s.%s.svc.cluster.local", aggregatorService.Name, aggregatorService.Namespace),
nodeToTappedPodIPMap,
mizuServiceAccountExists,
tappingOptions.TapOutgoing,
); err != nil {
fmt.Printf("Error creating mizu tapper daemonset: %v\n", err)
return err
func updateMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, tappingOptions *MizuTapOptions) error {
if len(nodeToTappedPodIPMap) > 0 {
if err := kubernetesProvider.ApplyMizuTapperDaemonSet(
ctx,
mizu.ResourcesNamespace,
mizu.TapperDaemonSetName,
tappingOptions.MizuImage,
mizu.TapperPodName,
fmt.Sprintf("%s.%s.svc.cluster.local", aggregatorService.Name, aggregatorService.Namespace),
nodeToTappedPodIPMap,
mizuServiceAccountExists,
tappingOptions.TapOutgoing,
); err != nil {
fmt.Printf("Error creating mizu tapper daemonset: %v\n", err)
return err
}
} else {
if err := kubernetesProvider.RemoveDaemonSet(ctx, mizu.ResourcesNamespace, mizu.TapperDaemonSetName); err != nil {
fmt.Printf("Error deleting mizu tapper daemonset: %v\n", err)
return err
}
}
return nil
@ -164,7 +188,7 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
cancel()
}
if err := createMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap, tappingOptions); err != nil {
if err := updateMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap, tappingOptions); err != nil {
fmt.Printf("Error updating daemonset: %s (%v,%+v)\n", err, err, err)
cancel()
}

View File

@ -102,7 +102,6 @@ func (provider *Provider) CreateMizuAggregatorPod(ctx context.Context, namespace
},
DNSPolicy: core.DNSClusterFirstWithHostNet,
TerminationGracePeriodSeconds: new(int64),
// Affinity: TODO: define node selector for all relevant nodes for this mizu instance
},
}
//define the service account only when it exists to prevent pod crash
@ -215,18 +214,94 @@ func (provider *Provider) CreateMizuRBAC(ctx context.Context, namespace string,
}
func (provider *Provider) RemovePod(ctx context.Context, namespace string, podName string) error {
if isFound, err := provider.CheckPodExists(ctx, namespace, podName);
err != nil {
return err
} else if !isFound {
return nil
}
return provider.clientSet.CoreV1().Pods(namespace).Delete(ctx, podName, metav1.DeleteOptions{})
}
func (provider *Provider) RemoveService(ctx context.Context, namespace string, serviceName string) error {
if isFound, err := provider.CheckServiceExists(ctx, namespace, serviceName);
err != nil {
return err
} else if !isFound {
return nil
}
return provider.clientSet.CoreV1().Services(namespace).Delete(ctx, serviceName, metav1.DeleteOptions{})
}
func (provider *Provider) RemoveDaemonSet(ctx context.Context, namespace string, daemonSetName string) error {
if isFound, err := provider.CheckDaemonSetExists(ctx, namespace, daemonSetName);
err != nil {
return err
} else if !isFound {
return nil
}
return provider.clientSet.AppsV1().DaemonSets(namespace).Delete(ctx, daemonSetName, metav1.DeleteOptions{})
}
func (provider *Provider) CheckPodExists(ctx context.Context, namespace string, name string) (bool, error) {
listOptions := metav1.ListOptions{
FieldSelector: fmt.Sprintf("metadata.name=%s", name),
Limit: 1,
}
resourceList, err := provider.clientSet.CoreV1().Pods(namespace).List(ctx, listOptions)
if err != nil {
return false, err
}
if len(resourceList.Items) > 0 {
return true, nil
}
return false, nil
}
func (provider *Provider) CheckServiceExists(ctx context.Context, namespace string, name string) (bool, error) {
listOptions := metav1.ListOptions{
FieldSelector: fmt.Sprintf("metadata.name=%s", name),
Limit: 1,
}
resourceList, err := provider.clientSet.CoreV1().Services(namespace).List(ctx, listOptions)
if err != nil {
return false, err
}
if len(resourceList.Items) > 0 {
return true, nil
}
return false, nil
}
func (provider *Provider) CheckDaemonSetExists(ctx context.Context, namespace string, name string) (bool, error) {
listOptions := metav1.ListOptions{
FieldSelector: fmt.Sprintf("metadata.name=%s", name),
Limit: 1,
}
resourceList, err := provider.clientSet.AppsV1().DaemonSets(namespace).List(ctx, listOptions)
if err != nil {
return false, err
}
if len(resourceList.Items) > 0 {
return true, nil
}
return false, nil
}
func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string, aggregatorPodIp string, nodeToTappedPodIPMap map[string][]string, linkServiceAccount bool, tapOutgoing bool) error {
if len(nodeToTappedPodIPMap) == 0 {
return fmt.Errorf("Daemon set %s must tap at least 1 pod", daemonSetName)
}
nodeToTappedPodIPMapJsonStr, err := json.Marshal(nodeToTappedPodIPMap)
if err != nil {
return err