From 4429de93b673d36858e7a46b90ff7acd8dee85be Mon Sep 17 00:00:00 2001 From: RamiBerm Date: Mon, 10 May 2021 15:44:38 +0300 Subject: [PATCH] Update go.mod, main.go, and 4 more files... --- api/go.mod | 29 ++++++++++++ api/pkg/inserter/main.go | 2 +- cli/Makefile | 4 +- cli/cmd/version.go | 5 +- cli/kubernetes/provider.go | 94 +++++++++++++++++++++++++++++++++++--- cli/mizu/mizuRunner.go | 30 ++++++++++-- 6 files changed, 146 insertions(+), 18 deletions(-) diff --git a/api/go.mod b/api/go.mod index 42e3eca49..3069f5302 100644 --- a/api/go.mod +++ b/api/go.mod @@ -3,8 +3,37 @@ module mizuserver go 1.16 require ( + cloud.google.com/go v0.54.0 // indirect + cloud.google.com/go/bigquery v1.4.0 // indirect + cloud.google.com/go/datastore v1.1.0 // indirect + cloud.google.com/go/pubsub v1.2.0 // indirect + cloud.google.com/go/storage v1.6.0 // indirect + github.com/Azure/go-autorest v14.2.0+incompatible // indirect + github.com/Azure/go-autorest/autorest v0.11.12 // indirect + github.com/Azure/go-autorest/autorest/adal v0.9.5 // indirect + github.com/Azure/go-autorest/autorest/date v0.3.0 // indirect + github.com/Azure/go-autorest/autorest/mocks v0.4.1 // indirect + github.com/Azure/go-autorest/logger v0.2.0 // indirect + github.com/Azure/go-autorest/tracing v0.6.0 // indirect + github.com/BurntSushi/toml v0.3.1 // indirect + github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802 // indirect + github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46 // indirect + github.com/PuerkitoBio/purell v1.1.1 // indirect + github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect + github.com/andybalholm/brotli v1.0.1 // indirect github.com/antoniodipinto/ikisocket v0.0.0-20210417133349-f1502512d69a + github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a // indirect + github.com/aws/aws-sdk-go v1.34.28 // indirect + github.com/census-instrumentation/opencensus-proto v0.2.1 // indirect + github.com/chzyer/logex v1.1.10 // indirect + github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e // indirect + github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1 // indirect + github.com/client9/misspell v0.3.4 // indirect + github.com/creack/pty v1.1.9 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/djherbis/atime v1.0.0 + github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815 // indirect + github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153 // indirect github.com/fasthttp/websocket v1.4.3-beta.1 // indirect github.com/go-playground/locales v0.13.0 github.com/go-playground/universal-translator v0.17.0 diff --git a/api/pkg/inserter/main.go b/api/pkg/inserter/main.go index 63985b5c7..cd274a905 100644 --- a/api/pkg/inserter/main.go +++ b/api/pkg/inserter/main.go @@ -23,7 +23,7 @@ var k8sResolver *resolver.Resolver func init() { errOut := make(chan error, 100) - res, err := resolver.NewFromOutOfCluster("/home/rami/.kube/config", errOut) + res, err := resolver.NewFromInCluster(errOut) if err != nil { fmt.Printf("error creating k8s resolver %s", err) return diff --git a/cli/Makefile b/cli/Makefile index 56b2a633e..867f053f2 100644 --- a/cli/Makefile +++ b/cli/Makefile @@ -1,5 +1,5 @@ install: - go build install mizu.go + go install mizu.go build: go build -o bin/mizu mizu.go @@ -16,6 +16,6 @@ build-cr: @#GOOS=linux GOARCH=arm64 go build -o bin/mizu-linux-arm64 mizu.go @#GOOS=windows GOARCH=arm64 go build -o bin/mizu-windows-arm64 mizu.go -clean: +clean: #go clean rm -f ./bin/* diff --git a/cli/cmd/version.go b/cli/cmd/version.go index f292153dc..dac008d27 100644 --- a/cli/cmd/version.go +++ b/cli/cmd/version.go @@ -2,17 +2,16 @@ package cmd import ( "fmt" + "github.com/up9inc/mizu/cli/mizu" "github.com/spf13/cobra" ) -var Version = "0.1.0" - var versionCmd = &cobra.Command{ Use: "version", Short: "Print version info", RunE: func(cmd *cobra.Command, args []string) error { - fmt.Printf("mizu version %s\n", Version) + fmt.Printf("mizu version %s\n", mizu.Version) return nil }, } diff --git a/cli/kubernetes/provider.go b/cli/kubernetes/provider.go index d20a9c40c..f5db2c0f4 100644 --- a/cli/kubernetes/provider.go +++ b/cli/kubernetes/provider.go @@ -3,10 +3,11 @@ package kubernetes import ( _ "bytes" "context" + "errors" "fmt" - "strings" - core "k8s.io/api/core/v1" + rbac "k8s.io/api/rbac/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" @@ -19,6 +20,7 @@ import ( _ "k8s.io/client-go/tools/portforward" "k8s.io/client-go/util/homedir" "path/filepath" + "strings" ) type Provider struct { @@ -28,6 +30,11 @@ type Provider struct { Namespace string } +const ( + serviceAccountName = "mizu-service-account" + MizuResourcesNamespace = "default" +) + func NewProvider(kubeConfigPath string, overrideNamespace string) *Provider { kubernetesConfig := loadKubernetesConfiguration(kubeConfigPath) restClientConfig, err := kubernetesConfig.ClientConfig() @@ -55,8 +62,8 @@ func NewProvider(kubeConfigPath string, overrideNamespace string) *Provider { } } -func (provider *Provider) GetPodWatcher(ctx context.Context) watch.Interface { - watcher, err := provider.clientSet.CoreV1().Pods(provider.Namespace).Watch(ctx, metav1.ListOptions{Watch: true}) +func (provider *Provider) GetPodWatcher(ctx context.Context, namespace string) watch.Interface { + watcher, err := provider.clientSet.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{Watch: true}) if err != nil { panic(err.Error()) } @@ -87,7 +94,7 @@ func (provider *Provider) CreateMizuPod(ctx context.Context, podName string, pod pod := &core.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: podName, - Namespace: provider.Namespace, + Namespace: MizuResourcesNamespace, }, Spec: core.PodSpec{ HostNetwork: true, // very important to make passive tapper see traffic @@ -111,15 +118,88 @@ func (provider *Provider) CreateMizuPod(ctx context.Context, podName string, pod }, }, }, + ServiceAccountName: serviceAccountName, TerminationGracePeriodSeconds: new(int64), NodeSelector: map[string]string{"kubernetes.io/hostname": tappedPod.Spec.NodeName}, }, } - return provider.clientSet.CoreV1().Pods(provider.Namespace).Create(ctx, pod, metav1.CreateOptions{}) + return provider.clientSet.CoreV1().Pods(MizuResourcesNamespace).Create(ctx, pod, metav1.CreateOptions{}) +} + +func (provider *Provider) DoesMizuRBACExist(ctx context.Context) (bool, error){ + serviceAccount, err := provider.clientSet.CoreV1().ServiceAccounts(MizuResourcesNamespace).Get(ctx, serviceAccountName, metav1.GetOptions{}) + + var statusError *k8serrors.StatusError + if errors.As(err, &statusError) { + // expected behavior when resource does not exist + if statusError.ErrStatus.Reason == metav1.StatusReasonNotFound { + return false, nil + } + } + if err != nil { + return false, err + } + return serviceAccount != nil, nil +} + +func (provider *Provider) CreateMizuRBAC(ctx context.Context, version string) error { + clusterRoleName := "mizu-cluster-role" + + serviceAccount := &core.ServiceAccount{ + ObjectMeta: metav1.ObjectMeta{ + Name: serviceAccountName, + Namespace: MizuResourcesNamespace, + Labels: map[string]string{"mizu-cli-version": version}, + }, + } + clusterRole := &rbac.ClusterRole{ + ObjectMeta: metav1.ObjectMeta{ + Name: clusterRoleName, + Labels: map[string]string{"mizu-cli-version": version}, + }, + Rules: []rbac.PolicyRule{ + { + APIGroups: []string {"", "extensions", "apps"}, + Resources: []string {"pods", "services", "endpoints"}, + Verbs: []string {"list", "get", "watch"}, + }, + }, + } + clusterRoleBinding := &rbac.ClusterRoleBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: "mizu-cluster-role-binding", + Labels: map[string]string{"mizu-cli-version": version}, + }, + RoleRef: rbac.RoleRef{ + Name: clusterRoleName, + Kind: "ClusterRole", + APIGroup: "rbac.authorization.k8s.io", + }, + Subjects: []rbac.Subject{ + { + Kind: "ServiceAccount", + Name: serviceAccountName, + Namespace: MizuResourcesNamespace, + }, + }, + } + _, err := provider.clientSet.CoreV1().ServiceAccounts(MizuResourcesNamespace).Create(ctx, serviceAccount, metav1.CreateOptions{}) + if err != nil { + return err + } + _, err = provider.clientSet.RbacV1().ClusterRoles().Create(ctx, clusterRole, metav1.CreateOptions{}) + if err != nil { + return err + } + _, err = provider.clientSet.RbacV1().ClusterRoleBindings().Create(ctx, clusterRoleBinding, metav1.CreateOptions{}) + if err != nil { + return err + } + return nil } func (provider *Provider) RemovePod(ctx context.Context, podName string) { - provider.clientSet.CoreV1().Pods(provider.Namespace).Delete(ctx, podName, metav1.DeleteOptions{}) + provider.clientSet.CoreV1().Pods(MizuResourcesNamespace).Delete(ctx, podName, metav1.DeleteOptions{}) } func getClientSet(config *restclient.Config) *kubernetes.Clientset { diff --git a/cli/mizu/mizuRunner.go b/cli/mizu/mizuRunner.go index 254563f13..2407ae743 100644 --- a/cli/mizu/mizuRunner.go +++ b/cli/mizu/mizuRunner.go @@ -12,6 +12,8 @@ import ( "time" ) +const Version = "0.1.0" + func Run(tappedPodName string) { kubernetesProvider := kubernetes.NewProvider(config.Configuration.KubeConfigPath, config.Configuration.Namespace) ctx, cancel := context.WithCancel(context.Background()) @@ -19,7 +21,8 @@ func Run(tappedPodName string) { podName := "mizu-collector" - go createPodAndPortForward(ctx, kubernetesProvider, cancel, podName, tappedPodName) //TODO convert this to job for built in pod ttl or have the running app handle this + createRBACIfNecessary(ctx, kubernetesProvider, cancel) + go createPodAndPortForward(ctx, kubernetesProvider, cancel, podName, kubernetes.MizuResourcesNamespace, tappedPodName) //TODO convert this to job for built in pod ttl or have the running app handle this waitForFinish(ctx, cancel) //block until exit signal or error // TODO handle incoming traffic from tapper using a channel @@ -31,7 +34,7 @@ func Run(tappedPodName string) { } func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, podRegex *regexp.Regexp) { - added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx), podRegex) + added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, kubernetesProvider.Namespace), podRegex) for { select { case newTarget := <- added: @@ -52,7 +55,7 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro } } -func createPodAndPortForward(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, podName string, tappedPodName string) { +func createPodAndPortForward(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, podName string, namespace string, tappedPodName string) { pod, err := kubernetesProvider.CreateMizuPod(ctx, podName, config.Configuration.MizuImage, tappedPodName) if err != nil { fmt.Printf("error creating pod %s", err) @@ -60,7 +63,7 @@ func createPodAndPortForward(ctx context.Context, kubernetesProvider *kubernetes return } podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", pod.Name)) - added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx), podExactRegex) + added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, namespace), podExactRegex) isPodReady := false var portForward *kubernetes.PortForward for { @@ -75,7 +78,7 @@ func createPodAndPortForward(ctx context.Context, kubernetesProvider *kubernetes if modifiedPod.Status.Phase == "Running" && !isPodReady { isPodReady = true var err error - portForward, err = kubernetes.NewPortForward(kubernetesProvider, kubernetesProvider.Namespace, podName, config.Configuration.GuiPort, config.Configuration.MizuPodPort, cancel) + portForward, err = kubernetes.NewPortForward(kubernetesProvider, namespace, podName, config.Configuration.GuiPort, config.Configuration.MizuPodPort, cancel) fmt.Printf("Web interface is now available at http://localhost:%d\n", config.Configuration.GuiPort) if err != nil { fmt.Printf("error forwarding port to pod %s\n", err) @@ -101,6 +104,23 @@ func createPodAndPortForward(ctx context.Context, kubernetesProvider *kubernetes } } +func createRBACIfNecessary(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) { + mizuRBACExists, err := kubernetesProvider.DoesMizuRBACExist(ctx) + if err != nil { + fmt.Printf("error checking rbac %v", err) + cancel() + return + } + if !mizuRBACExists { + err := kubernetesProvider.CreateMizuRBAC(ctx, Version) + if err != nil { + fmt.Printf("error creating rbac %v", err) + cancel() + return + } + } +} + func waitForFinish(ctx context.Context, cancel context.CancelFunc) { sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)