Install Mizu in a dedicated namespace (#123)

* Use "mizu" namespace instead of "default". Create and delete as necessary.

* Wait until namespace is deleted.

* Distinguish between timeout and other errors.

* Sorted consts.

* k8s provider gets the names of Mizu serviceaccount, clusterrole and clusterrolebindings from caller.

* Renames.

* Remove non-namespaced mizu resources when finished: clusterrole and clusterrolebindings.

* Don't wait for namespace deletion if it was already deleted.

* When watching pods, check for cancellation before reading from channels.

* Allow user to cancel resource deletion and to cancel the wait.

* Increased cleanup timeout.

* go mod tidy.

* Ignore cli build products.

* Print err.

* Don't delete clusterrole and clusterrolebinding if we do not have permissions.

* Added roles list in README.

* Added clusterrole and clusterrolebindings examples.
This commit is contained in:
nimrod-up9
2021-07-22 14:26:12 +03:00
committed by GitHub
parent e42c4f8648
commit 2996c1a4bc
8 changed files with 402 additions and 40 deletions

View File

@@ -9,6 +9,7 @@ import (
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/debounce"
core "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
"log"
"net/http"
"net/url"
@@ -24,6 +25,7 @@ var aggregatorService *core.Service
const (
updateTappersDelay = 5 * time.Second
cleanupTimeout = time.Minute
)
var currentlyTappedPods []core.Pod
@@ -42,6 +44,7 @@ func RunMizuTap(podRegexQuery *regexp.Regexp, tappingOptions *MizuTapOptions) {
targetNamespace := getNamespace(tappingOptions, kubernetesProvider)
if matchingPods, err := kubernetesProvider.GetAllPodsMatchingRegex(ctx, podRegexQuery, targetNamespace); err != nil {
fmt.Printf("Error listing pods: %v", err)
return
} else {
currentlyTappedPods = matchingPods
@@ -82,6 +85,10 @@ func RunMizuTap(podRegexQuery *regexp.Regexp, tappingOptions *MizuTapOptions) {
}
func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, tappingOptions *MizuTapOptions, mizuApiFilteringOptions *shared.TrafficFilteringOptions) error {
if err := createMizuNamespace(ctx, kubernetesProvider); err != nil {
return err
}
if err := createMizuAggregator(ctx, kubernetesProvider, tappingOptions, mizuApiFilteringOptions); err != nil {
return err
}
@@ -93,11 +100,26 @@ func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Pro
return nil
}
func createMizuNamespace(ctx context.Context, kubernetesProvider *kubernetes.Provider) error {
_, err := kubernetesProvider.CreateNamespace(ctx, mizu.ResourcesNamespace)
if err != nil {
fmt.Printf("Error creating Namespace %s: %v\n", mizu.ResourcesNamespace, err)
}
return err
}
func createMizuAggregator(ctx context.Context, kubernetesProvider *kubernetes.Provider, tappingOptions *MizuTapOptions, mizuApiFilteringOptions *shared.TrafficFilteringOptions) error {
var err error
mizuServiceAccountExists = createRBACIfNecessary(ctx, kubernetesProvider)
_, err = kubernetesProvider.CreateMizuAggregatorPod(ctx, mizu.ResourcesNamespace, mizu.AggregatorPodName, tappingOptions.MizuImage, mizuServiceAccountExists, mizuApiFilteringOptions, tappingOptions.MaxEntriesDBSizeBytes)
var serviceAccountName string
if mizuServiceAccountExists {
serviceAccountName = mizu.ServiceAccountName
} else {
serviceAccountName = ""
}
_, err = kubernetesProvider.CreateMizuAggregatorPod(ctx, mizu.ResourcesNamespace, mizu.AggregatorPodName, tappingOptions.MizuImage, serviceAccountName, mizuApiFilteringOptions, tappingOptions.MaxEntriesDBSizeBytes)
if err != nil {
fmt.Printf("Error creating mizu collector pod: %v\n", err)
return err
@@ -132,6 +154,13 @@ func getMizuApiFilteringOptions(tappingOptions *MizuTapOptions) (*shared.Traffic
func updateMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, tappingOptions *MizuTapOptions) error {
if len(nodeToTappedPodIPMap) > 0 {
var serviceAccountName string
if mizuServiceAccountExists {
serviceAccountName = mizu.ServiceAccountName
} else {
serviceAccountName = ""
}
if err := kubernetesProvider.ApplyMizuTapperDaemonSet(
ctx,
mizu.ResourcesNamespace,
@@ -140,7 +169,7 @@ func updateMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provi
mizu.TapperPodName,
fmt.Sprintf("%s.%s.svc.cluster.local", aggregatorService.Name, aggregatorService.Namespace),
nodeToTappedPodIPMap,
mizuServiceAccountExists,
serviceAccountName,
tappingOptions.TapOutgoing,
); err != nil {
fmt.Printf("Error creating mizu tapper daemonset: %v\n", err)
@@ -159,15 +188,35 @@ func updateMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provi
func cleanUpMizuResources(kubernetesProvider *kubernetes.Provider) {
fmt.Printf("\nRemoving mizu resources\n")
removalCtx, _ := context.WithTimeout(context.Background(), 5*time.Second)
if err := kubernetesProvider.RemovePod(removalCtx, mizu.ResourcesNamespace, mizu.AggregatorPodName); err != nil {
fmt.Printf("Error removing Pod %s in namespace %s: %s (%v,%+v)\n", mizu.AggregatorPodName, mizu.ResourcesNamespace, err, err, err)
removalCtx, cancel := context.WithTimeout(context.Background(), cleanupTimeout)
defer cancel()
if err := kubernetesProvider.RemoveNamespace(removalCtx, mizu.ResourcesNamespace); err != nil {
fmt.Printf("Error removing Namespace %s: %s (%v,%+v)\n", mizu.ResourcesNamespace, err, err, err)
return
}
if err := kubernetesProvider.RemoveService(removalCtx, mizu.ResourcesNamespace, mizu.AggregatorPodName); err != nil {
fmt.Printf("Error removing Service %s in namespace %s: %s (%v,%+v)\n", mizu.AggregatorPodName, mizu.ResourcesNamespace, err, err, err)
if mizuServiceAccountExists {
if err := kubernetesProvider.RemoveNonNamespacedResources(removalCtx, mizu.ClusterRoleName, mizu.ClusterRoleBindingName); err != nil {
fmt.Printf("Error removing non-namespaced resources: %s (%v,%+v)\n", err, err, err)
return
}
}
if err := kubernetesProvider.RemoveDaemonSet(removalCtx, mizu.ResourcesNamespace, mizu.TapperDaemonSetName); err != nil {
fmt.Printf("Error removing DaemonSet %s in namespace %s: %s (%v,%+v)\n", mizu.TapperDaemonSetName, mizu.ResourcesNamespace, err, err, err)
// Call cancel if a terminating signal was received. Allows user to skip the wait.
go func() {
waitForFinish(removalCtx, cancel)
}()
if err := kubernetesProvider.WaitUtilNamespaceDeleted(removalCtx, mizu.ResourcesNamespace); err != nil {
switch {
case removalCtx.Err() == context.Canceled:
// Do nothing. User interrupted the wait.
case err == wait.ErrWaitTimeout:
fmt.Printf("Timeout while removing Namespace %s\n", mizu.ResourcesNamespace)
default:
fmt.Printf("Error while waiting for Namespace %s to be deleted: %s (%v,%+v)\n", mizu.ResourcesNamespace, err, err, err)
}
}
}
@@ -234,6 +283,9 @@ func portForwardApiPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
timeAfter := time.After(25 * time.Second)
for {
select {
case <-ctx.Done():
return
case <-added:
continue
case <-removed:
@@ -279,21 +331,18 @@ func portForwardApiPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
case <-errorChan:
cancel()
case <-ctx.Done():
return
}
}
}
func createRBACIfNecessary(ctx context.Context, kubernetesProvider *kubernetes.Provider) bool {
mizuRBACExists, err := kubernetesProvider.DoesMizuRBACExist(ctx, mizu.ResourcesNamespace)
mizuRBACExists, err := kubernetesProvider.DoesServiceAccountExist(ctx, mizu.ResourcesNamespace, mizu.ServiceAccountName)
if err != nil {
fmt.Printf("warning: could not ensure mizu rbac resources exist %v\n", err)
return false
}
if !mizuRBACExists {
err := kubernetesProvider.CreateMizuRBAC(ctx, mizu.ResourcesNamespace, mizu.RBACVersion)
err := kubernetesProvider.CreateMizuRBAC(ctx, mizu.ResourcesNamespace, mizu.ServiceAccountName, mizu.ClusterRoleName, mizu.ClusterRoleBindingName, mizu.RBACVersion)
if err != nil {
fmt.Printf("warning: could not create mizu rbac resources %v\n", err)
return false