From 4afd3ec9ac60e34048912e7797c9b38aefb560af Mon Sep 17 00:00:00 2001 From: up9-github Date: Wed, 21 Apr 2021 15:21:12 +0300 Subject: [PATCH] WIP --- cli/cmd/root.go | 32 ++++++++----- cli/go.sum | 1 + cli/kubernetes/portForward.go | 48 +++++++++++++++++++ cli/kubernetes/provider.go | 74 ++++++++++++++++++++++------- cli/kubernetes/watch.go | 88 +++++++---------------------------- 5 files changed, 143 insertions(+), 100 deletions(-) create mode 100644 cli/kubernetes/portForward.go diff --git a/cli/cmd/root.go b/cli/cmd/root.go index 78642c146..c6789f89f 100644 --- a/cli/cmd/root.go +++ b/cli/cmd/root.go @@ -7,7 +7,18 @@ import ( ) // rootCmd represents the base command when called without any subcommands -var rootCmd = &cobra.Command{} +var ( + rootCmd = &cobra.Command{} + + // TODO: bundle these up into a single config object, consider using viper for this + DisplayVersion bool + Quiet bool + NoDashboard bool + DashboardPort uint16 + Namespace string + AllNamespaces bool + KubeConfigPath string +) func init() { rootCmd.Use = "cmd pod-query" rootCmd.Short = "Tail HTTP traffic from multiple pods" @@ -15,10 +26,19 @@ func init() { if len(args) != 1 { return rootCmd.Help() } + regex := regexp.MustCompile(args[0]) // MustCompile panics if expression cant be compiled into regex mizu.Run(regex) return nil } + + rootCmd.Flags().BoolVarP(&DisplayVersion, "version", "v", false, "Print the version and exit") + rootCmd.Flags().BoolVarP(&Quiet, "quiet", "q", false, "No stdout output") + rootCmd.Flags().BoolVarP(&NoDashboard, "no-dashboard", "", false, "Dont host a dashboard") + rootCmd.Flags().Uint16VarP(&DashboardPort, "dashboard-port", "p", 3000, "Provide a custom port for the dashboard webserver") + rootCmd.Flags().StringVarP(&Namespace, "namespace", "n", "", "Namespace selector") + rootCmd.Flags().BoolVarP(&AllNamespaces, "all-namespaces", "A", false, "Select all namespaces") + rootCmd.Flags().StringVarP(&KubeConfigPath, "kubeconfig", "k", "", "Path to kubeconfig file") } // Execute adds all child commands to the root command and sets flags appropriately. @@ -26,13 +46,3 @@ func init() { func Execute() { cobra.CheckErr(rootCmd.Execute()) } - -func init() { - rootCmd.Flags().BoolP("version", "v", false, "Print the version and exit") - rootCmd.Flags().BoolP("quiet", "q", false, "No stdout output") - rootCmd.Flags().BoolP("no-dashboard", "", false, "Dont host a dashboard") - rootCmd.Flags().Uint16P("dashboard-port", "p", 3000, "Provide a custom port for the dashboard webserver") - rootCmd.Flags().StringP("namespace", "n", "", "Namespace selector") - rootCmd.Flags().BoolP("all-namespaces", "A", false, "Select all namespaces") - rootCmd.Flags().StringP("kubeconfig", "k", "", "Path to kubeconfig file") -} diff --git a/cli/go.sum b/cli/go.sum index e303faaee..3136194bc 100644 --- a/cli/go.sum +++ b/cli/go.sum @@ -209,6 +209,7 @@ github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS4 github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY= github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= +github.com/moby/spdystream v0.2.0 h1:cjW1zVyyoiM0T7b6UoySUFqzXMoqRckQtXwGPiBhOM8= github.com/moby/spdystream v0.2.0/go.mod h1:f7i0iNDQJ059oMTcWxx8MA/zKFIuD/lY+0GqbN2Wy8c= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= diff --git a/cli/kubernetes/portForward.go b/cli/kubernetes/portForward.go new file mode 100644 index 000000000..8ed796d4b --- /dev/null +++ b/cli/kubernetes/portForward.go @@ -0,0 +1,48 @@ +package kubernetes + +import ( + "bytes" + "fmt" + "k8s.io/apimachinery/pkg/util/httpstream" + "k8s.io/client-go/tools/portforward" + "k8s.io/client-go/transport/spdy" + "net/http" + "net/url" + "strings" +) + +type PortForward struct { + stopChan chan struct{} +} + +func NewPortForward(kubernetesProvider *Provider, namespace string, podName string, localPort uint16, podPort uint16) (*PortForward, error) { + dialer := getHttpDialer(kubernetesProvider, namespace, podName) + stopChan, readyChan := make(chan struct{}, 1), make(chan struct{}, 1) + out, errOut := new(bytes.Buffer), new(bytes.Buffer) + + forwarder, err := portforward.New(dialer, []string{fmt.Sprintf("%d:%d", localPort, podPort)}, stopChan, readyChan, out, errOut) + if err != nil { + return nil, err + } + err = forwarder.ForwardPorts() + if err != nil { + return nil, err + } + return &PortForward{stopChan: stopChan}, nil +} + +func (portForward *PortForward) Stop() { + close(portForward.stopChan) +} + +func getHttpDialer(kubernetesProvider *Provider, namespace string, podName string) httpstream.Dialer { + roundTripper, upgrader, err := spdy.RoundTripperFor(&kubernetesProvider.clientConfig) + if err != nil { + panic(err) + } + path := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/portforward", namespace, podName) + hostIP := strings.TrimLeft(kubernetesProvider.clientConfig.Host, "htps:/") + serverURL := url.URL{Scheme: "https", Path: path, Host: hostIP} + + return spdy.NewDialer(upgrader, &http.Client{Transport: roundTripper}, http.MethodPost, &serverURL) +} diff --git a/cli/kubernetes/provider.go b/cli/kubernetes/provider.go index c1865c0a4..5de4c020b 100644 --- a/cli/kubernetes/provider.go +++ b/cli/kubernetes/provider.go @@ -1,58 +1,98 @@ package kubernetes import ( + _ "bytes" "context" "fmt" + core "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" + _ "k8s.io/client-go/tools/portforward" "k8s.io/client-go/util/homedir" "path/filepath" ) -type kubernetesProvider struct { - clientSet *kubernetes.Clientset +type Provider struct { + clientSet *kubernetes.Clientset kubernetesConfig clientcmd.ClientConfig + clientConfig restclient.Config + Namespace string } -func New(kubeConfigPath string) *kubernetesProvider { +func NewProvider(kubeConfigPath string, overrideNamespace string) *Provider { kubernetesConfig := loadKubernetesConfiguration(kubeConfigPath) restClientConfig, err := kubernetesConfig.ClientConfig() if err != nil { panic(err.Error()) } - clientSet := getClientAndConfig(restClientConfig) - return &kubernetesProvider{clientSet: clientSet, kubernetesConfig: kubernetesConfig} + clientSet := getClientSet(restClientConfig) + + var namespace string + if len(overrideNamespace) > 0 { + namespace = overrideNamespace + } else { + configuredNamespace, _, err := kubernetesConfig.Namespace() + if err != nil { + panic(err) + } + namespace = configuredNamespace + } + + return &Provider{ + clientSet: clientSet, + kubernetesConfig: kubernetesConfig, + clientConfig: *restClientConfig, + Namespace: namespace, + } } -func (provider *kubernetesProvider) GetPodWatcher(ctx context.Context) watch.Interface { - watcher, err := provider.clientSet.CoreV1().Pods(provider.getCurrentNamespace()).Watch(ctx, metav1.ListOptions{Watch: true}) +func (provider *Provider) GetPodWatcher(ctx context.Context) watch.Interface { + watcher, err := provider.clientSet.CoreV1().Pods(provider.Namespace).Watch(ctx, metav1.ListOptions{Watch: true}) if err != nil { panic(err.Error()) } return watcher } -func (provider *kubernetesProvider) GetPods() { - namespace := provider.getCurrentNamespace() - pods, err := provider.clientSet.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{}) +func (provider *Provider) GetPods(ctx context.Context) { + pods, err := provider.clientSet.CoreV1().Pods(provider.Namespace).List(ctx, metav1.ListOptions{}) if err != nil { panic(err.Error()) } - fmt.Printf("There are %d pods in namespace %s\n", len(pods.Items), namespace) + fmt.Printf("There are %d pods in Namespace %s\n", len(pods.Items), provider.Namespace) } -func (provider *kubernetesProvider) getCurrentNamespace() string { - namespace, _, err := provider.kubernetesConfig.Namespace() - if err != nil { - panic(err.Error()) +func (provider *Provider) CreatePod(ctx context.Context, podName string, podImage string) (*core.Pod, error) { + pod := &core.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: podName, + Namespace: provider.Namespace, + }, + Spec: core.PodSpec{ + Containers: []core.Container{ + { + Name: podName, + Image: podImage, + ImagePullPolicy: core.PullAlways, + }, + }, + TerminationGracePeriodSeconds: new(int64), + }, } - return namespace + return provider.clientSet.CoreV1().Pods(provider.Namespace).Create(ctx, pod, metav1.CreateOptions{}) } -func getClientAndConfig(config *restclient.Config) *kubernetes.Clientset { +func (provider *Provider) RemovePod(ctx context.Context, podName string) { + err := provider.clientSet.CoreV1().Pods(provider.Namespace).Delete(ctx, podName, metav1.DeleteOptions{}) + if err != nil { + panic(err) + } +} + +func getClientSet(config *restclient.Config) *kubernetes.Clientset { clientSet, err := kubernetes.NewForConfig(config) if err != nil { panic(err.Error()) diff --git a/cli/kubernetes/watch.go b/cli/kubernetes/watch.go index 73476afb8..b70107804 100644 --- a/cli/kubernetes/watch.go +++ b/cli/kubernetes/watch.go @@ -1,55 +1,29 @@ -// **Copied and modified from https://github.com/wercker/stern/blob/4fa46dd6987fca563d3ab42e61099658f4cade93/stern/watch.go** -// Copyright 2016 Wercker Holding BV -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - package kubernetes import ( "context" - "fmt" + "errors" "regexp" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/watch" ) -// Target is a target to watch -type Target struct { - Namespace string - Pod string - Container string -} - -// GetID returns the ID of the object -func (t *Target) GetID() string { - return fmt.Sprintf("%s-%s-%s", t.Namespace, t.Pod, t.Container) -} - // FilteredWatch starts listening to Kubernetes events and emits modified // containers/pods. The first result is targets added, the second is targets // removed -func FilteredWatch(ctx context.Context, watcher watch.Interface, podFilter *regexp.Regexp, containerFilter *regexp.Regexp, containerExcludeFilter *regexp.Regexp) (chan *Target, chan *Target) { - added := make(chan *Target) - removed := make(chan *Target) +func FilteredWatch(ctx context.Context, watcher watch.Interface, podFilter *regexp.Regexp) (chan *corev1.Pod, chan *corev1.Pod, chan *corev1.Pod, chan error) { + addedChan := make(chan *corev1.Pod) + modifiedChan := make(chan *corev1.Pod) + removedChan := make(chan *corev1.Pod) + errorChan := make(chan error) go func() { for { select { case e := <-watcher.ResultChan(): if e.Object == nil { - // Closed because of error - return + errorChan <- errors.New("kubernetes pod watch failed") } pod := e.Object.(*corev1.Pod) @@ -60,52 +34,22 @@ func FilteredWatch(ctx context.Context, watcher watch.Interface, podFilter *rege switch e.Type { case watch.Added: - var statuses []corev1.ContainerStatus - statuses = append(statuses, pod.Status.InitContainerStatuses...) - statuses = append(statuses, pod.Status.ContainerStatuses...) - - added <- &Target{ - Namespace: pod.Namespace, - Pod: pod.Name, - Container: "", - } - - //for _, c := range statuses { - // if !containerFilter.MatchString(c.Name) { - // continue - // } - // if containerExcludeFilter != nil && containerExcludeFilter.MatchString(c.Name) { - // continue - // } - //} + addedChan <- pod + case watch.Modified: + modifiedChan <- pod case watch.Deleted: - var containers []corev1.Container - containers = append(containers, pod.Spec.Containers...) - containers = append(containers, pod.Spec.InitContainers...) - - for _, c := range containers { - //if !containerFilter.MatchString(c.Name) { - // continue - //} - //if containerExcludeFilter != nil && containerExcludeFilter.MatchString(c.Name) { - // continue - //} - - removed <- &Target{ - Namespace: pod.Namespace, - Pod: pod.Name, - Container: c.Name, - } - } + removedChan <- pod } case <-ctx.Done(): watcher.Stop() - close(added) - close(removed) + close(addedChan) + close(modifiedChan) + close(removedChan) + close(errorChan) return } } }() - return added, removed + return addedChan, modifiedChan, removedChan, errorChan }