added support of multiple namespaces (#167)

This commit is contained in:
RoyUP9
2021-08-05 11:19:29 +03:00
committed by GitHub
parent fa632b49a7
commit 683d199774
5 changed files with 84 additions and 62 deletions

View File

@@ -56,7 +56,7 @@ func init() {
defaults.Set(&defaultTapConfig)
tapCmd.Flags().Uint16P(configStructs.GuiPortTapName, "p", defaultTapConfig.GuiPort, "Provide a custom port for the web interface webserver")
tapCmd.Flags().StringP(configStructs.NamespaceTapName, "n", defaultTapConfig.Namespace, "Namespace selector")
tapCmd.Flags().StringArrayP(configStructs.NamespacesTapName, "n", defaultTapConfig.Namespaces, "Namespaces selector")
tapCmd.Flags().Bool(configStructs.AnalysisTapName, defaultTapConfig.Analysis, "Uploads traffic to UP9 for further analysis (Beta)")
tapCmd.Flags().BoolP(configStructs.AllNamespacesTapName, "A", defaultTapConfig.AllNamespaces, "Tap all namespaces")
tapCmd.Flags().StringP(configStructs.KubeConfigPathTapName, "k", defaultTapConfig.KubeConfigPath, "Path to kube-config file")

View File

@@ -11,6 +11,7 @@ import (
"os/signal"
"path"
"regexp"
"strings"
"syscall"
"time"
@@ -65,24 +66,25 @@ func RunMizuTap() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // cancel will be called when this function exits
targetNamespace := getNamespace(kubernetesProvider)
targetNamespaces := getNamespaces(kubernetesProvider)
var namespacesStr string
if targetNamespace != mizu.K8sAllNamespaces {
namespacesStr = fmt.Sprintf("namespace \"%s\"", targetNamespace)
if targetNamespaces[0] != mizu.K8sAllNamespaces {
namespacesStr = fmt.Sprintf("namespaces \"%s\"", strings.Join(targetNamespaces, "\", \""))
} else {
namespacesStr = "all namespaces"
}
mizu.CheckNewerVersion()
mizu.Log.Infof("Tapping pods in %s", namespacesStr)
if err, _ := updateCurrentlyTappedPods(kubernetesProvider, ctx, targetNamespace); err != nil {
if err, _ := updateCurrentlyTappedPods(kubernetesProvider, ctx, targetNamespaces); err != nil {
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error getting pods by regex: %v", errormessage.FormatError(err)))
return
}
if len(state.currentlyTappedPods) == 0 {
var suggestionStr string
if targetNamespace != mizu.K8sAllNamespaces {
if targetNamespaces[0] != mizu.K8sAllNamespaces {
suggestionStr = ". Select a different namespace with -n or tap all namespaces with -A"
}
mizu.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Did not find any pods matching the regex argument%s", suggestionStr))
@@ -100,7 +102,7 @@ func RunMizuTap() {
}
go createProxyToApiServerPod(ctx, kubernetesProvider, cancel)
go watchPodsForTapping(ctx, kubernetesProvider, cancel)
go watchPodsForTapping(ctx, kubernetesProvider, targetNamespaces, cancel)
//block until exit signal or error
waitForFinish(ctx, cancel)
@@ -166,13 +168,13 @@ func createMizuApiServer(ctx context.Context, kubernetesProvider *kubernetes.Pro
}
opts := &kubernetes.ApiServerOptions{
Namespace: mizu.Config.ResourcesNamespace(),
PodName: mizu.ApiServerPodName,
PodImage: mizu.Config.MizuImage,
ServiceAccountName: serviceAccountName,
IsNamespaceRestricted: !mizu.Config.IsOwnNamespace(),
Namespace: mizu.Config.ResourcesNamespace(),
PodName: mizu.ApiServerPodName,
PodImage: mizu.Config.MizuImage,
ServiceAccountName: serviceAccountName,
IsNamespaceRestricted: !mizu.Config.IsOwnNamespace(),
MizuApiFilteringOptions: mizuApiFilteringOptions,
MaxEntriesDBSizeBytes: mizu.Config.Tap.MaxEntriesDBSizeBytes(),
MaxEntriesDBSizeBytes: mizu.Config.Tap.MaxEntriesDBSizeBytes(),
}
_, err = kubernetesProvider.CreateMizuApiServerPod(ctx, opts)
if err != nil {
@@ -347,12 +349,11 @@ func reportTappedPods() {
}
}
func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
targetNamespace := getNamespace(kubernetesProvider)
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, targetNamespace), mizu.Config.Tap.PodRegex())
func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Provider, targetNamespaces []string, cancel context.CancelFunc) {
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider, targetNamespaces, mizu.Config.Tap.PodRegex())
restartTappers := func() {
err, changeFound := updateCurrentlyTappedPods(kubernetesProvider, ctx, targetNamespace)
err, changeFound := updateCurrentlyTappedPods(kubernetesProvider, ctx, targetNamespaces)
if err != nil {
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error getting pods by regex: %v", errormessage.FormatError(err)))
cancel()
@@ -407,9 +408,9 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
}
}
func updateCurrentlyTappedPods(kubernetesProvider *kubernetes.Provider, ctx context.Context, targetNamespace string) (error, bool) {
func updateCurrentlyTappedPods(kubernetesProvider *kubernetes.Provider, ctx context.Context, targetNamespaces []string) (error, bool) {
changeFound := false
if matchingPods, err := kubernetesProvider.GetAllRunningPodsMatchingRegex(ctx, mizu.Config.Tap.PodRegex(), targetNamespace); err != nil {
if matchingPods, err := kubernetesProvider.GetAllRunningPodsMatchingRegex(ctx, mizu.Config.Tap.PodRegex(), targetNamespaces); err != nil {
return err, false
} else {
addedPods, removedPods := getPodArrayDiff(state.currentlyTappedPods, matchingPods)
@@ -454,7 +455,7 @@ func getMissingPods(pods1 []core.Pod, pods2 []core.Pod) []core.Pod {
func createProxyToApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", mizu.ApiServerPodName))
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, mizu.Config.ResourcesNamespace()), podExactRegex)
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider, []string{mizu.Config.ResourcesNamespace()}, podExactRegex)
isPodReady := false
timeAfter := time.After(25 * time.Second)
for {
@@ -567,12 +568,12 @@ func waitForFinish(ctx context.Context, cancel context.CancelFunc) {
}
}
func getNamespace(kubernetesProvider *kubernetes.Provider) string {
func getNamespaces(kubernetesProvider *kubernetes.Provider) []string {
if mizu.Config.Tap.AllNamespaces {
return mizu.K8sAllNamespaces
} else if len(mizu.Config.Tap.Namespace) > 0 {
return mizu.Config.Tap.Namespace
return []string{mizu.K8sAllNamespaces}
} else if len(mizu.Config.Tap.Namespaces) > 0 {
return mizu.Config.Tap.Namespaces
} else {
return kubernetesProvider.CurrentNamespace()
return []string{kubernetesProvider.CurrentNamespace()}
}
}

View File

@@ -691,13 +691,19 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
return err
}
func (provider *Provider) GetAllRunningPodsMatchingRegex(ctx context.Context, regex *regexp.Regexp, namespace string) ([]core.Pod, error) {
pods, err := provider.clientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
if err != nil {
return nil, err
func (provider *Provider) GetAllRunningPodsMatchingRegex(ctx context.Context, regex *regexp.Regexp, namespaces []string) ([]core.Pod, error) {
var pods []core.Pod
for _, namespace := range namespaces {
namespacePods, err := provider.clientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get pods in ns: %s, %w", namespace, err)
}
pods = append(pods, namespacePods.Items...)
}
matchingPods := make([]core.Pod, 0)
for _, pod := range pods.Items {
for _, pod := range pods {
if regex.MatchString(pod.Name) && isPodRunning(&pod) {
matchingPods = append(matchingPods, pod)
}

View File

@@ -4,49 +4,64 @@ import (
"context"
"errors"
"regexp"
"sync"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/watch"
)
func FilteredWatch(ctx context.Context, watcher watch.Interface, podFilter *regexp.Regexp) (chan *corev1.Pod, chan *corev1.Pod, chan *corev1.Pod, chan error) {
func FilteredWatch(ctx context.Context, kubernetesProvider *Provider, targetNamespaces []string, podFilter *regexp.Regexp) (chan *corev1.Pod, chan *corev1.Pod, chan *corev1.Pod, chan error) {
addedChan := make(chan *corev1.Pod)
modifiedChan := make(chan *corev1.Pod)
removedChan := make(chan *corev1.Pod)
errorChan := make(chan error)
go func() {
for {
select {
case e := <-watcher.ResultChan():
if e.Object == nil {
errorChan <- errors.New("kubernetes pod watch failed")
var wg sync.WaitGroup
for _, targetNamespace := range targetNamespaces {
wg.Add(1)
go func(targetNamespace string) {
defer wg.Done()
watcher := kubernetesProvider.GetPodWatcher(ctx, targetNamespace)
for {
select {
case e := <-watcher.ResultChan():
if e.Object == nil {
errorChan <- errors.New("kubernetes pod watch failed")
return
}
pod := e.Object.(*corev1.Pod)
if !podFilter.MatchString(pod.Name) {
continue
}
switch e.Type {
case watch.Added:
addedChan <- pod
case watch.Modified:
modifiedChan <- pod
case watch.Deleted:
removedChan <- pod
}
case <-ctx.Done():
watcher.Stop()
return
}
pod := e.Object.(*corev1.Pod)
if !podFilter.MatchString(pod.Name) {
continue
}
switch e.Type {
case watch.Added:
addedChan <- pod
case watch.Modified:
modifiedChan <- pod
case watch.Deleted:
removedChan <- pod
}
case <-ctx.Done():
watcher.Stop()
close(addedChan)
close(modifiedChan)
close(removedChan)
close(errorChan)
return
}
}
}(targetNamespace)
}
go func() {
<-ctx.Done()
wg.Wait()
close(addedChan)
close(modifiedChan)
close(removedChan)
close(errorChan)
}()
return addedChan, modifiedChan, removedChan, errorChan

View File

@@ -11,7 +11,7 @@ import (
const (
GuiPortTapName = "gui-port"
NamespaceTapName = "namespace"
NamespacesTapName = "namespaces"
AnalysisTapName = "analysis"
AllNamespacesTapName = "all-namespaces"
KubeConfigPathTapName = "kube-config"
@@ -29,7 +29,7 @@ type TapConfig struct {
SleepIntervalSec int `yaml:"upload-interval" default:"10"`
PodRegexStr string `yaml:"regex" default:".*"`
GuiPort uint16 `yaml:"gui-port" default:"8899"`
Namespace string `yaml:"namespace"`
Namespaces []string `yaml:"namespaces"`
Analysis bool `yaml:"analysis" default:"false"`
AllNamespaces bool `yaml:"all-namespaces" default:"false"`
KubeConfigPath string `yaml:"kube-config"`