This commit is contained in:
up9-github 2021-04-21 15:21:12 +03:00
parent 7167923a49
commit 4afd3ec9ac
5 changed files with 143 additions and 100 deletions

View File

@ -7,7 +7,18 @@ import (
)
// rootCmd represents the base command when called without any subcommands
var rootCmd = &cobra.Command{}
var (
rootCmd = &cobra.Command{}
// TODO: bundle these up into a single config object, consider using viper for this
DisplayVersion bool
Quiet bool
NoDashboard bool
DashboardPort uint16
Namespace string
AllNamespaces bool
KubeConfigPath string
)
func init() {
rootCmd.Use = "cmd pod-query"
rootCmd.Short = "Tail HTTP traffic from multiple pods"
@ -15,10 +26,19 @@ func init() {
if len(args) != 1 {
return rootCmd.Help()
}
regex := regexp.MustCompile(args[0]) // MustCompile panics if expression cant be compiled into regex
mizu.Run(regex)
return nil
}
rootCmd.Flags().BoolVarP(&DisplayVersion, "version", "v", false, "Print the version and exit")
rootCmd.Flags().BoolVarP(&Quiet, "quiet", "q", false, "No stdout output")
rootCmd.Flags().BoolVarP(&NoDashboard, "no-dashboard", "", false, "Dont host a dashboard")
rootCmd.Flags().Uint16VarP(&DashboardPort, "dashboard-port", "p", 3000, "Provide a custom port for the dashboard webserver")
rootCmd.Flags().StringVarP(&Namespace, "namespace", "n", "", "Namespace selector")
rootCmd.Flags().BoolVarP(&AllNamespaces, "all-namespaces", "A", false, "Select all namespaces")
rootCmd.Flags().StringVarP(&KubeConfigPath, "kubeconfig", "k", "", "Path to kubeconfig file")
}
// Execute adds all child commands to the root command and sets flags appropriately.
@ -26,13 +46,3 @@ func init() {
func Execute() {
cobra.CheckErr(rootCmd.Execute())
}
func init() {
rootCmd.Flags().BoolP("version", "v", false, "Print the version and exit")
rootCmd.Flags().BoolP("quiet", "q", false, "No stdout output")
rootCmd.Flags().BoolP("no-dashboard", "", false, "Dont host a dashboard")
rootCmd.Flags().Uint16P("dashboard-port", "p", 3000, "Provide a custom port for the dashboard webserver")
rootCmd.Flags().StringP("namespace", "n", "", "Namespace selector")
rootCmd.Flags().BoolP("all-namespaces", "A", false, "Select all namespaces")
rootCmd.Flags().StringP("kubeconfig", "k", "", "Path to kubeconfig file")
}

View File

@ -209,6 +209,7 @@ github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS4
github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY=
github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/moby/spdystream v0.2.0 h1:cjW1zVyyoiM0T7b6UoySUFqzXMoqRckQtXwGPiBhOM8=
github.com/moby/spdystream v0.2.0/go.mod h1:f7i0iNDQJ059oMTcWxx8MA/zKFIuD/lY+0GqbN2Wy8c=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=

View File

@ -0,0 +1,48 @@
package kubernetes
import (
"bytes"
"fmt"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/transport/spdy"
"net/http"
"net/url"
"strings"
)
type PortForward struct {
stopChan chan struct{}
}
func NewPortForward(kubernetesProvider *Provider, namespace string, podName string, localPort uint16, podPort uint16) (*PortForward, error) {
dialer := getHttpDialer(kubernetesProvider, namespace, podName)
stopChan, readyChan := make(chan struct{}, 1), make(chan struct{}, 1)
out, errOut := new(bytes.Buffer), new(bytes.Buffer)
forwarder, err := portforward.New(dialer, []string{fmt.Sprintf("%d:%d", localPort, podPort)}, stopChan, readyChan, out, errOut)
if err != nil {
return nil, err
}
err = forwarder.ForwardPorts()
if err != nil {
return nil, err
}
return &PortForward{stopChan: stopChan}, nil
}
func (portForward *PortForward) Stop() {
close(portForward.stopChan)
}
func getHttpDialer(kubernetesProvider *Provider, namespace string, podName string) httpstream.Dialer {
roundTripper, upgrader, err := spdy.RoundTripperFor(&kubernetesProvider.clientConfig)
if err != nil {
panic(err)
}
path := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/portforward", namespace, podName)
hostIP := strings.TrimLeft(kubernetesProvider.clientConfig.Host, "htps:/")
serverURL := url.URL{Scheme: "https", Path: path, Host: hostIP}
return spdy.NewDialer(upgrader, &http.Client{Transport: roundTripper}, http.MethodPost, &serverURL)
}

View File

@ -1,58 +1,98 @@
package kubernetes
import (
_ "bytes"
"context"
"fmt"
core "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
_ "k8s.io/client-go/tools/portforward"
"k8s.io/client-go/util/homedir"
"path/filepath"
)
type kubernetesProvider struct {
clientSet *kubernetes.Clientset
type Provider struct {
clientSet *kubernetes.Clientset
kubernetesConfig clientcmd.ClientConfig
clientConfig restclient.Config
Namespace string
}
func New(kubeConfigPath string) *kubernetesProvider {
func NewProvider(kubeConfigPath string, overrideNamespace string) *Provider {
kubernetesConfig := loadKubernetesConfiguration(kubeConfigPath)
restClientConfig, err := kubernetesConfig.ClientConfig()
if err != nil {
panic(err.Error())
}
clientSet := getClientAndConfig(restClientConfig)
return &kubernetesProvider{clientSet: clientSet, kubernetesConfig: kubernetesConfig}
clientSet := getClientSet(restClientConfig)
var namespace string
if len(overrideNamespace) > 0 {
namespace = overrideNamespace
} else {
configuredNamespace, _, err := kubernetesConfig.Namespace()
if err != nil {
panic(err)
}
namespace = configuredNamespace
}
return &Provider{
clientSet: clientSet,
kubernetesConfig: kubernetesConfig,
clientConfig: *restClientConfig,
Namespace: namespace,
}
}
func (provider *kubernetesProvider) GetPodWatcher(ctx context.Context) watch.Interface {
watcher, err := provider.clientSet.CoreV1().Pods(provider.getCurrentNamespace()).Watch(ctx, metav1.ListOptions{Watch: true})
func (provider *Provider) GetPodWatcher(ctx context.Context) watch.Interface {
watcher, err := provider.clientSet.CoreV1().Pods(provider.Namespace).Watch(ctx, metav1.ListOptions{Watch: true})
if err != nil {
panic(err.Error())
}
return watcher
}
func (provider *kubernetesProvider) GetPods() {
namespace := provider.getCurrentNamespace()
pods, err := provider.clientSet.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{})
func (provider *Provider) GetPods(ctx context.Context) {
pods, err := provider.clientSet.CoreV1().Pods(provider.Namespace).List(ctx, metav1.ListOptions{})
if err != nil {
panic(err.Error())
}
fmt.Printf("There are %d pods in namespace %s\n", len(pods.Items), namespace)
fmt.Printf("There are %d pods in Namespace %s\n", len(pods.Items), provider.Namespace)
}
func (provider *kubernetesProvider) getCurrentNamespace() string {
namespace, _, err := provider.kubernetesConfig.Namespace()
if err != nil {
panic(err.Error())
func (provider *Provider) CreatePod(ctx context.Context, podName string, podImage string) (*core.Pod, error) {
pod := &core.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Namespace: provider.Namespace,
},
Spec: core.PodSpec{
Containers: []core.Container{
{
Name: podName,
Image: podImage,
ImagePullPolicy: core.PullAlways,
},
},
TerminationGracePeriodSeconds: new(int64),
},
}
return namespace
return provider.clientSet.CoreV1().Pods(provider.Namespace).Create(ctx, pod, metav1.CreateOptions{})
}
func getClientAndConfig(config *restclient.Config) *kubernetes.Clientset {
func (provider *Provider) RemovePod(ctx context.Context, podName string) {
err := provider.clientSet.CoreV1().Pods(provider.Namespace).Delete(ctx, podName, metav1.DeleteOptions{})
if err != nil {
panic(err)
}
}
func getClientSet(config *restclient.Config) *kubernetes.Clientset {
clientSet, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err.Error())

View File

@ -1,55 +1,29 @@
// **Copied and modified from https://github.com/wercker/stern/blob/4fa46dd6987fca563d3ab42e61099658f4cade93/stern/watch.go**
// Copyright 2016 Wercker Holding BV
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kubernetes
import (
"context"
"fmt"
"errors"
"regexp"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/watch"
)
// Target is a target to watch
type Target struct {
Namespace string
Pod string
Container string
}
// GetID returns the ID of the object
func (t *Target) GetID() string {
return fmt.Sprintf("%s-%s-%s", t.Namespace, t.Pod, t.Container)
}
// FilteredWatch starts listening to Kubernetes events and emits modified
// containers/pods. The first result is targets added, the second is targets
// removed
func FilteredWatch(ctx context.Context, watcher watch.Interface, podFilter *regexp.Regexp, containerFilter *regexp.Regexp, containerExcludeFilter *regexp.Regexp) (chan *Target, chan *Target) {
added := make(chan *Target)
removed := make(chan *Target)
func FilteredWatch(ctx context.Context, watcher watch.Interface, podFilter *regexp.Regexp) (chan *corev1.Pod, chan *corev1.Pod, chan *corev1.Pod, chan error) {
addedChan := make(chan *corev1.Pod)
modifiedChan := make(chan *corev1.Pod)
removedChan := make(chan *corev1.Pod)
errorChan := make(chan error)
go func() {
for {
select {
case e := <-watcher.ResultChan():
if e.Object == nil {
// Closed because of error
return
errorChan <- errors.New("kubernetes pod watch failed")
}
pod := e.Object.(*corev1.Pod)
@ -60,52 +34,22 @@ func FilteredWatch(ctx context.Context, watcher watch.Interface, podFilter *rege
switch e.Type {
case watch.Added:
var statuses []corev1.ContainerStatus
statuses = append(statuses, pod.Status.InitContainerStatuses...)
statuses = append(statuses, pod.Status.ContainerStatuses...)
added <- &Target{
Namespace: pod.Namespace,
Pod: pod.Name,
Container: "",
}
//for _, c := range statuses {
// if !containerFilter.MatchString(c.Name) {
// continue
// }
// if containerExcludeFilter != nil && containerExcludeFilter.MatchString(c.Name) {
// continue
// }
//}
addedChan <- pod
case watch.Modified:
modifiedChan <- pod
case watch.Deleted:
var containers []corev1.Container
containers = append(containers, pod.Spec.Containers...)
containers = append(containers, pod.Spec.InitContainers...)
for _, c := range containers {
//if !containerFilter.MatchString(c.Name) {
// continue
//}
//if containerExcludeFilter != nil && containerExcludeFilter.MatchString(c.Name) {
// continue
//}
removed <- &Target{
Namespace: pod.Namespace,
Pod: pod.Name,
Container: c.Name,
}
}
removedChan <- pod
}
case <-ctx.Done():
watcher.Stop()
close(added)
close(removed)
close(addedChan)
close(modifiedChan)
close(removedChan)
close(errorChan)
return
}
}
}()
return added, removed
return addedChan, modifiedChan, removedChan, errorChan
}