Update go.mod, main.go, and 4 more files...

This commit is contained in:
RamiBerm 2021-05-10 15:44:38 +03:00
parent c4d44ba9d3
commit 4429de93b6
6 changed files with 146 additions and 18 deletions

View File

@ -3,8 +3,37 @@ module mizuserver
go 1.16
require (
cloud.google.com/go v0.54.0 // indirect
cloud.google.com/go/bigquery v1.4.0 // indirect
cloud.google.com/go/datastore v1.1.0 // indirect
cloud.google.com/go/pubsub v1.2.0 // indirect
cloud.google.com/go/storage v1.6.0 // indirect
github.com/Azure/go-autorest v14.2.0+incompatible // indirect
github.com/Azure/go-autorest/autorest v0.11.12 // indirect
github.com/Azure/go-autorest/autorest/adal v0.9.5 // indirect
github.com/Azure/go-autorest/autorest/date v0.3.0 // indirect
github.com/Azure/go-autorest/autorest/mocks v0.4.1 // indirect
github.com/Azure/go-autorest/logger v0.2.0 // indirect
github.com/Azure/go-autorest/tracing v0.6.0 // indirect
github.com/BurntSushi/toml v0.3.1 // indirect
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802 // indirect
github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46 // indirect
github.com/PuerkitoBio/purell v1.1.1 // indirect
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
github.com/andybalholm/brotli v1.0.1 // indirect
github.com/antoniodipinto/ikisocket v0.0.0-20210417133349-f1502512d69a
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a // indirect
github.com/aws/aws-sdk-go v1.34.28 // indirect
github.com/census-instrumentation/opencensus-proto v0.2.1 // indirect
github.com/chzyer/logex v1.1.10 // indirect
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e // indirect
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1 // indirect
github.com/client9/misspell v0.3.4 // indirect
github.com/creack/pty v1.1.9 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/djherbis/atime v1.0.0
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815 // indirect
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153 // indirect
github.com/fasthttp/websocket v1.4.3-beta.1 // indirect
github.com/go-playground/locales v0.13.0
github.com/go-playground/universal-translator v0.17.0

View File

@ -23,7 +23,7 @@ var k8sResolver *resolver.Resolver
func init() {
errOut := make(chan error, 100)
res, err := resolver.NewFromOutOfCluster("/home/rami/.kube/config", errOut)
res, err := resolver.NewFromInCluster(errOut)
if err != nil {
fmt.Printf("error creating k8s resolver %s", err)
return

View File

@ -1,5 +1,5 @@
install:
go build install mizu.go
go install mizu.go
build:
go build -o bin/mizu mizu.go
@ -16,6 +16,6 @@ build-cr:
@#GOOS=linux GOARCH=arm64 go build -o bin/mizu-linux-arm64 mizu.go
@#GOOS=windows GOARCH=arm64 go build -o bin/mizu-windows-arm64 mizu.go
clean:
clean:
#go clean
rm -f ./bin/*

View File

@ -2,17 +2,16 @@ package cmd
import (
"fmt"
"github.com/up9inc/mizu/cli/mizu"
"github.com/spf13/cobra"
)
var Version = "0.1.0"
var versionCmd = &cobra.Command{
Use: "version",
Short: "Print version info",
RunE: func(cmd *cobra.Command, args []string) error {
fmt.Printf("mizu version %s\n", Version)
fmt.Printf("mizu version %s\n", mizu.Version)
return nil
},
}

View File

@ -3,10 +3,11 @@ package kubernetes
import (
_ "bytes"
"context"
"errors"
"fmt"
"strings"
core "k8s.io/api/core/v1"
rbac "k8s.io/api/rbac/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
@ -19,6 +20,7 @@ import (
_ "k8s.io/client-go/tools/portforward"
"k8s.io/client-go/util/homedir"
"path/filepath"
"strings"
)
type Provider struct {
@ -28,6 +30,11 @@ type Provider struct {
Namespace string
}
const (
serviceAccountName = "mizu-service-account"
MizuResourcesNamespace = "default"
)
func NewProvider(kubeConfigPath string, overrideNamespace string) *Provider {
kubernetesConfig := loadKubernetesConfiguration(kubeConfigPath)
restClientConfig, err := kubernetesConfig.ClientConfig()
@ -55,8 +62,8 @@ func NewProvider(kubeConfigPath string, overrideNamespace string) *Provider {
}
}
func (provider *Provider) GetPodWatcher(ctx context.Context) watch.Interface {
watcher, err := provider.clientSet.CoreV1().Pods(provider.Namespace).Watch(ctx, metav1.ListOptions{Watch: true})
func (provider *Provider) GetPodWatcher(ctx context.Context, namespace string) watch.Interface {
watcher, err := provider.clientSet.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{Watch: true})
if err != nil {
panic(err.Error())
}
@ -87,7 +94,7 @@ func (provider *Provider) CreateMizuPod(ctx context.Context, podName string, pod
pod := &core.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Namespace: provider.Namespace,
Namespace: MizuResourcesNamespace,
},
Spec: core.PodSpec{
HostNetwork: true, // very important to make passive tapper see traffic
@ -111,15 +118,88 @@ func (provider *Provider) CreateMizuPod(ctx context.Context, podName string, pod
},
},
},
ServiceAccountName: serviceAccountName,
TerminationGracePeriodSeconds: new(int64),
NodeSelector: map[string]string{"kubernetes.io/hostname": tappedPod.Spec.NodeName},
},
}
return provider.clientSet.CoreV1().Pods(provider.Namespace).Create(ctx, pod, metav1.CreateOptions{})
return provider.clientSet.CoreV1().Pods(MizuResourcesNamespace).Create(ctx, pod, metav1.CreateOptions{})
}
func (provider *Provider) DoesMizuRBACExist(ctx context.Context) (bool, error){
serviceAccount, err := provider.clientSet.CoreV1().ServiceAccounts(MizuResourcesNamespace).Get(ctx, serviceAccountName, metav1.GetOptions{})
var statusError *k8serrors.StatusError
if errors.As(err, &statusError) {
// expected behavior when resource does not exist
if statusError.ErrStatus.Reason == metav1.StatusReasonNotFound {
return false, nil
}
}
if err != nil {
return false, err
}
return serviceAccount != nil, nil
}
func (provider *Provider) CreateMizuRBAC(ctx context.Context, version string) error {
clusterRoleName := "mizu-cluster-role"
serviceAccount := &core.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Name: serviceAccountName,
Namespace: MizuResourcesNamespace,
Labels: map[string]string{"mizu-cli-version": version},
},
}
clusterRole := &rbac.ClusterRole{
ObjectMeta: metav1.ObjectMeta{
Name: clusterRoleName,
Labels: map[string]string{"mizu-cli-version": version},
},
Rules: []rbac.PolicyRule{
{
APIGroups: []string {"", "extensions", "apps"},
Resources: []string {"pods", "services", "endpoints"},
Verbs: []string {"list", "get", "watch"},
},
},
}
clusterRoleBinding := &rbac.ClusterRoleBinding{
ObjectMeta: metav1.ObjectMeta{
Name: "mizu-cluster-role-binding",
Labels: map[string]string{"mizu-cli-version": version},
},
RoleRef: rbac.RoleRef{
Name: clusterRoleName,
Kind: "ClusterRole",
APIGroup: "rbac.authorization.k8s.io",
},
Subjects: []rbac.Subject{
{
Kind: "ServiceAccount",
Name: serviceAccountName,
Namespace: MizuResourcesNamespace,
},
},
}
_, err := provider.clientSet.CoreV1().ServiceAccounts(MizuResourcesNamespace).Create(ctx, serviceAccount, metav1.CreateOptions{})
if err != nil {
return err
}
_, err = provider.clientSet.RbacV1().ClusterRoles().Create(ctx, clusterRole, metav1.CreateOptions{})
if err != nil {
return err
}
_, err = provider.clientSet.RbacV1().ClusterRoleBindings().Create(ctx, clusterRoleBinding, metav1.CreateOptions{})
if err != nil {
return err
}
return nil
}
func (provider *Provider) RemovePod(ctx context.Context, podName string) {
provider.clientSet.CoreV1().Pods(provider.Namespace).Delete(ctx, podName, metav1.DeleteOptions{})
provider.clientSet.CoreV1().Pods(MizuResourcesNamespace).Delete(ctx, podName, metav1.DeleteOptions{})
}
func getClientSet(config *restclient.Config) *kubernetes.Clientset {

View File

@ -12,6 +12,8 @@ import (
"time"
)
const Version = "0.1.0"
func Run(tappedPodName string) {
kubernetesProvider := kubernetes.NewProvider(config.Configuration.KubeConfigPath, config.Configuration.Namespace)
ctx, cancel := context.WithCancel(context.Background())
@ -19,7 +21,8 @@ func Run(tappedPodName string) {
podName := "mizu-collector"
go createPodAndPortForward(ctx, kubernetesProvider, cancel, podName, tappedPodName) //TODO convert this to job for built in pod ttl or have the running app handle this
createRBACIfNecessary(ctx, kubernetesProvider, cancel)
go createPodAndPortForward(ctx, kubernetesProvider, cancel, podName, kubernetes.MizuResourcesNamespace, tappedPodName) //TODO convert this to job for built in pod ttl or have the running app handle this
waitForFinish(ctx, cancel) //block until exit signal or error
// TODO handle incoming traffic from tapper using a channel
@ -31,7 +34,7 @@ func Run(tappedPodName string) {
}
func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, podRegex *regexp.Regexp) {
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx), podRegex)
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, kubernetesProvider.Namespace), podRegex)
for {
select {
case newTarget := <- added:
@ -52,7 +55,7 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
}
}
func createPodAndPortForward(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, podName string, tappedPodName string) {
func createPodAndPortForward(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, podName string, namespace string, tappedPodName string) {
pod, err := kubernetesProvider.CreateMizuPod(ctx, podName, config.Configuration.MizuImage, tappedPodName)
if err != nil {
fmt.Printf("error creating pod %s", err)
@ -60,7 +63,7 @@ func createPodAndPortForward(ctx context.Context, kubernetesProvider *kubernetes
return
}
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", pod.Name))
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx), podExactRegex)
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, namespace), podExactRegex)
isPodReady := false
var portForward *kubernetes.PortForward
for {
@ -75,7 +78,7 @@ func createPodAndPortForward(ctx context.Context, kubernetesProvider *kubernetes
if modifiedPod.Status.Phase == "Running" && !isPodReady {
isPodReady = true
var err error
portForward, err = kubernetes.NewPortForward(kubernetesProvider, kubernetesProvider.Namespace, podName, config.Configuration.GuiPort, config.Configuration.MizuPodPort, cancel)
portForward, err = kubernetes.NewPortForward(kubernetesProvider, namespace, podName, config.Configuration.GuiPort, config.Configuration.MizuPodPort, cancel)
fmt.Printf("Web interface is now available at http://localhost:%d\n", config.Configuration.GuiPort)
if err != nil {
fmt.Printf("error forwarding port to pod %s\n", err)
@ -101,6 +104,23 @@ func createPodAndPortForward(ctx context.Context, kubernetesProvider *kubernetes
}
}
func createRBACIfNecessary(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
mizuRBACExists, err := kubernetesProvider.DoesMizuRBACExist(ctx)
if err != nil {
fmt.Printf("error checking rbac %v", err)
cancel()
return
}
if !mizuRBACExists {
err := kubernetesProvider.CreateMizuRBAC(ctx, Version)
if err != nil {
fmt.Printf("error creating rbac %v", err)
cancel()
return
}
}
}
func waitForFinish(ctx context.Context, cancel context.CancelFunc) {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)