Tap multiple pods statically (#51)

* WIP

* Update tap.go, provider.go, and 2 more files...

* WIP

* WIP

* Solved routine hanging forever: Added missing flag when calling mizuagent.

* Iterate channel with range.

* Panic if har channel is nil or if websocket connection is nil.

* StartPassiveTapper returns read only channel.

* Solved program exiting immediately: Wait for interrupt signal instead of exiting.

* Solve connecting issue - Retry a few times.

* Use lib const instead of magic.

* Nicer error prints.

* Don't continue piping messages if there is an error.

* Comment.

* Dependency injection.

* no message

* Fixed comment.

* Print tapped addresses when they are updated.

* Print errors in cleanup if there are any.

Co-authored-by: RamiBerm <rami.berman@up9.com>
Co-authored-by: Roee Gadot <roee.gadot@up9.com>
This commit is contained in:
nimrod-up9
2021-05-20 12:22:23 +03:00
committed by GitHub
parent 1ddc7f2f6b
commit da24608bec
14 changed files with 461 additions and 162 deletions

View File

@@ -2,6 +2,8 @@ package cmd
import (
"errors"
"fmt"
"regexp"
"github.com/spf13/cobra"
@@ -10,20 +12,25 @@ import (
)
var tapCmd = &cobra.Command{
Use: "tap [PODNAME]",
Use: "tap [POD REGEX]",
Short: "Record ingoing traffic of a kubernetes pod",
Long: `Record the ingoing traffic of a kubernetes pod.
Supported protocols are HTTP and gRPC.`,
RunE: func(cmd *cobra.Command, args []string) error {
if len(args) == 0 {
return errors.New("PODNAME argument is required")
return errors.New("POD REGEX argument is required")
} else if len(args) > 1 {
return errors.New("Unexpected number of arguments")
}
podName := args[0]
regex, err := regexp.Compile(args[0])
if err != nil {
mizu.Run(podName)
return errors.New(fmt.Sprintf("%s is not a valid regex %s", args[0], err))
return nil
}
mizu.Run(regex)
return nil
},
}

View File

@@ -3,12 +3,15 @@ package kubernetes
import (
_ "bytes"
"context"
"encoding/json"
"errors"
"fmt"
apps "k8s.io/api/apps/v1"
core "k8s.io/api/core/v1"
rbac "k8s.io/api/rbac/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
_ "k8s.io/client-go/plugin/pkg/client/auth/azure"
@@ -20,7 +23,7 @@ import (
_ "k8s.io/client-go/tools/portforward"
"k8s.io/client-go/util/homedir"
"path/filepath"
"strings"
"regexp"
)
type Provider struct {
@@ -77,48 +80,30 @@ func (provider *Provider) GetPods(ctx context.Context, namespace string) {
fmt.Printf("There are %d pods in Namespace %s\n", len(pods.Items), namespace)
}
func (provider *Provider) CreateMizuPod(ctx context.Context, namespace string, podName string, podImage string, tappedPodNamespace string, tappedPodName string, linkServiceAccount bool) (*core.Pod, error) {
tappedPod, err := provider.clientSet.CoreV1().Pods(tappedPodNamespace).Get(ctx, tappedPodName, metav1.GetOptions{})
if err != nil {
panic(err.Error())
}
podIps := make([]string, len(tappedPod.Status.PodIPs))
for ii, podIp := range tappedPod.Status.PodIPs {
podIps[ii] = podIp.IP
}
podIpsString := strings.Join(podIps, ",")
privileged := true
func (provider *Provider) CreateMizuAggregatorPod(ctx context.Context, namespace string, podName string, podImage string, linkServiceAccount bool) (*core.Pod, error) {
pod := &core.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Namespace: namespace,
Labels: map[string]string{"app": podName},
},
Spec: core.PodSpec{
HostNetwork: true, // very important to make passive tapper see traffic
Containers: []core.Container{
{
Name: podName,
Image: podImage,
ImagePullPolicy: core.PullAlways,
SecurityContext: &core.SecurityContext{
Privileged: &privileged, // must be privileged to get node level traffic
},
Command: []string {"./mizuagent", "--aggregator"},
Env: []core.EnvVar{
{
Name: "HOST_MODE",
Value: "1",
},
{
Name: "TAPPED_ADDRESSES",
Value: podIpsString,
},
},
},
},
DNSPolicy: "ClusterFirstWithHostNet",
TerminationGracePeriodSeconds: new(int64),
NodeSelector: map[string]string{"kubernetes.io/hostname": tappedPod.Spec.NodeName},
},
}
//define the service account only when it exists to prevent pod crash
@@ -128,6 +113,21 @@ func (provider *Provider) CreateMizuPod(ctx context.Context, namespace string, p
return provider.clientSet.CoreV1().Pods(namespace).Create(ctx, pod, metav1.CreateOptions{})
}
// CreateService creates a ClusterIP Service called serviceName in namespace.
// The service selects pods labelled app=appLabelValue and maps service port 80
// to target port 8899 on those pods. It returns the Service as returned by the
// API server, or the error from the create call.
func (provider *Provider) CreateService(ctx context.Context, namespace string, serviceName string, appLabelValue string) (*core.Service, error) {
	ports := []core.ServicePort{
		{
			Port:       80,
			TargetPort: intstr.FromInt(8899),
		},
	}

	svc := &core.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      serviceName,
			Namespace: namespace,
		},
		Spec: core.ServiceSpec{
			Type:     core.ServiceTypeClusterIP,
			Selector: map[string]string{"app": appLabelValue},
			Ports:    ports,
		},
	}

	servicesClient := provider.clientSet.CoreV1().Services(namespace)
	return servicesClient.Create(ctx, svc, metav1.CreateOptions{})
}
func (provider *Provider) DoesMizuRBACExist(ctx context.Context, namespace string) (bool, error){
serviceAccount, err := provider.clientSet.CoreV1().ServiceAccounts(namespace).Get(ctx, serviceAccountName, metav1.GetOptions{})
@@ -200,8 +200,102 @@ func (provider *Provider) CreateMizuRBAC(ctx context.Context, namespace string ,
return nil
}
func (provider *Provider) RemovePod(ctx context.Context, namespace string, podName string) {
provider.clientSet.CoreV1().Pods(namespace).Delete(ctx, podName, metav1.DeleteOptions{})
// RemovePod deletes the pod podName from namespace using default delete
// options and returns the error from the delete call, if any.
func (provider *Provider) RemovePod(ctx context.Context, namespace string, podName string) error {
	podsClient := provider.clientSet.CoreV1().Pods(namespace)
	return podsClient.Delete(ctx, podName, metav1.DeleteOptions{})
}
// RemoveService deletes the service serviceName from namespace using default
// delete options and returns the error from the delete call, if any.
func (provider *Provider) RemoveService(ctx context.Context, namespace string, serviceName string) error {
	servicesClient := provider.clientSet.CoreV1().Services(namespace)
	return servicesClient.Delete(ctx, serviceName, metav1.DeleteOptions{})
}
// RemoveDaemonSet deletes the daemon set daemonSetName from namespace using
// default delete options and returns the error from the delete call, if any.
func (provider *Provider) RemoveDaemonSet(ctx context.Context, namespace string, daemonSetName string) error {
	daemonSetsClient := provider.clientSet.AppsV1().DaemonSets(namespace)
	return daemonSetsClient.Delete(ctx, daemonSetName, metav1.DeleteOptions{})
}
// CreateMizuTapperDaemonSet creates the DaemonSet daemonSetName in namespace.
//
// Each pod of the daemon set runs a single privileged, host-network container
// named tapperPodName from podImage. The container starts mizuagent in tap mode
// pointed at ws://<aggregatorPodIp>/wsTapper and receives these environment
// variables:
//   - HOST_MODE: "1"
//   - AGGREGATOR_ADDRESS: aggregatorPodIp
//   - TAPPED_ADDRESSES_PER_HOST: nodeToTappedPodIPMap encoded as JSON
//   - NODE_NAME: the pod's spec.nodeName, taken from the downward API field ref
//
// When linkServiceAccount is true the pod template uses serviceAccountName.
// The function returns the JSON encoding error or the error from the create
// call; the created DaemonSet object itself is discarded.
func (provider *Provider) CreateMizuTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string, aggregatorPodIp string, nodeToTappedPodIPMap map[string][]string, linkServiceAccount bool) error {
	tappedAddressesJSON, err := json.Marshal(nodeToTappedPodIPMap)
	if err != nil {
		return err
	}

	aggregatorURL := fmt.Sprintf("ws://%s/wsTapper", aggregatorPodIp)

	envVars := []core.EnvVar{
		{Name: "HOST_MODE", Value: "1"},
		{Name: "AGGREGATOR_ADDRESS", Value: aggregatorPodIp},
		{Name: "TAPPED_ADDRESSES_PER_HOST", Value: string(tappedAddressesJSON)},
		{
			// Filled in per pod from the pod's own spec.nodeName.
			Name: "NODE_NAME",
			ValueFrom: &core.EnvVarSource{
				FieldRef: &core.ObjectFieldSelector{
					APIVersion: "v1",
					FieldPath:  "spec.nodeName",
				},
			},
		},
	}

	isPrivileged := true
	tapperContainer := core.Container{
		Name:            tapperPodName,
		Image:           podImage,
		ImagePullPolicy: core.PullAlways,
		SecurityContext: &core.SecurityContext{
			Privileged: &isPrivileged, // must be privileged to get node level traffic
		},
		Command: []string{"./mizuagent", "-i", "any", "--tap", "--hardump", "--aggregator-address", aggregatorURL},
		Env:     envVars,
	}

	template := core.PodTemplateSpec{
		ObjectMeta: metav1.ObjectMeta{
			Labels: map[string]string{"app": tapperPodName},
		},
		Spec: core.PodSpec{
			HostNetwork:                   true, // very important to make passive tapper see traffic
			Containers:                    []core.Container{tapperContainer},
			DNSPolicy:                     "ClusterFirstWithHostNet",
			TerminationGracePeriodSeconds: new(int64), // pointer to zero
			// Affinity: TODO: define node selector for all relevant nodes for this mizu instance
		},
	}
	if linkServiceAccount {
		template.Spec.ServiceAccountName = serviceAccountName
	}

	ds := &apps.DaemonSet{
		ObjectMeta: metav1.ObjectMeta{
			Name:      daemonSetName,
			Namespace: namespace,
		},
		Spec: apps.DaemonSetSpec{
			// The selector matches the label set on the pod template above.
			Selector: &metav1.LabelSelector{
				MatchLabels: map[string]string{"app": tapperPodName},
			},
			Template: template,
		},
	}

	_, createErr := provider.clientSet.AppsV1().DaemonSets(namespace).Create(ctx, ds, metav1.CreateOptions{})
	return createErr
}
// GetAllPodsMatchingRegex lists pods using an empty namespace argument (which
// client-go treats as "all namespaces") and returns those whose name matches
// regex. The returned slice is empty but non-nil when nothing matches; on a
// list failure it returns nil and the error.
func (provider *Provider) GetAllPodsMatchingRegex(ctx context.Context, regex *regexp.Regexp) ([]core.Pod, error) {
	podList, err := provider.clientSet.CoreV1().Pods("").List(ctx, metav1.ListOptions{})
	if err != nil {
		return nil, err
	}

	matches := []core.Pod{}
	for i := range podList.Items {
		if !regex.MatchString(podList.Items[i].Name) {
			continue
		}
		matches = append(matches, podList.Items[i])
	}
	return matches, nil
}
func getClientSet(config *restclient.Config) *kubernetes.Clientset {

View File

@@ -7,4 +7,7 @@ var (
// Names of the Kubernetes resources mizu creates and the namespace they live in.
const (
// MizuResourcesNamespace is the namespace in which the aggregator pod, its
// service and the tapper daemon set are created and later removed.
MizuResourcesNamespace = "default"
// TapperDaemonSetName is the name given to the tapper DaemonSet.
TapperDaemonSetName = "mizu-tapper-daemon-set"
// aggregatorPodName is used as the aggregator pod's name, as the name of the
// service in front of it, and as the "app" label value the service selects on.
aggregatorPodName = "mizu-collector"
// tapperPodName is passed to CreateMizuTapperDaemonSet as the tapper pod name.
tapperPodName = "mizu-tapper"
)

View File

@@ -12,56 +12,90 @@ import (
"time"
)
func Run(tappedPodName string) {
func Run(podRegexQuery *regexp.Regexp) {
kubernetesProvider := kubernetes.NewProvider(config.Configuration.KubeConfigPath, config.Configuration.Namespace)
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // cancel will be called when this function exits
podName := "mizu-collector"
mizuServiceAccountExists := createRBACIfNecessary(ctx, kubernetesProvider)
go createPodAndPortForward(ctx, kubernetesProvider, cancel, podName, MizuResourcesNamespace, tappedPodName, mizuServiceAccountExists) //TODO convert this to job for built in pod ttl or have the running app handle this
waitForFinish(ctx, cancel) //block until exit signal or error
nodeToTappedPodIPMap, err := getNodeHostToTappedPodIpsMap(ctx, kubernetesProvider, podRegexQuery)
if err != nil {
cleanUpMizuResources(kubernetesProvider)
return
}
err = createMizuResources(ctx, kubernetesProvider, nodeToTappedPodIPMap)
if err != nil {
cleanUpMizuResources(kubernetesProvider)
return
}
go portForwardApiPod(ctx, kubernetesProvider, cancel) //TODO convert this to job for built in pod ttl or have the running app handle this
waitForFinish(ctx, cancel) //block until exit signal or error
// TODO handle incoming traffic from tapper using a channel
//cleanup
fmt.Printf("\nremoving pod %s\n", podName)
removalCtx, _ := context.WithTimeout(context.Background(), 2 * time.Second)
kubernetesProvider.RemovePod(removalCtx, MizuResourcesNamespace, podName)
fmt.Printf("\nRemoving mizu resources\n")
cleanUpMizuResources(kubernetesProvider)
}
// watchPodsForTapping consumes the channels returned by kubernetes.FilteredWatch
// for pods in the provider's namespace filtered by podRegex. It prints "+<name>"
// for every added pod and "-<name>" for every removed pod, ignores modification
// events, calls cancel when the error channel delivers a value, and returns
// once ctx is done.
func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, podRegex *regexp.Regexp) {
	watcher := kubernetesProvider.GetPodWatcher(ctx, kubernetesProvider.Namespace)
	addedCh, modifiedCh, removedCh, errCh := kubernetes.FilteredWatch(ctx, watcher, podRegex)

	for {
		select {
		case <-ctx.Done():
			return
		case <-errCh:
			// No return here: the loop exits through ctx.Done() on a later
			// iteration, assuming cancel is the cancel function of ctx.
			cancel()
		case <-modifiedCh:
			// Modification events are drained and ignored.
		case pod := <-addedCh:
			fmt.Printf("+%s\n", pod.Name)
		case pod := <-removedCh:
			fmt.Printf("-%s\n", pod.Name)
		}
	}
}
func createPodAndPortForward(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, podName string, namespace string, tappedPodName string, linkServiceAccount bool) {
pod, err := kubernetesProvider.CreateMizuPod(ctx, MizuResourcesNamespace, podName, config.Configuration.MizuImage, kubernetesProvider.Namespace, tappedPodName, linkServiceAccount)
func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string) error {
mizuServiceAccountExists := createRBACIfNecessary(ctx, kubernetesProvider)
_, err := kubernetesProvider.CreateMizuAggregatorPod(ctx, MizuResourcesNamespace, aggregatorPodName, config.Configuration.MizuImage, mizuServiceAccountExists)
if err != nil {
fmt.Printf("error creating pod %s", err)
cancel()
return
fmt.Printf("Error creating mizu collector pod: %v\n", err)
return err
}
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", pod.Name))
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, namespace), podExactRegex)
aggregatorService, err := kubernetesProvider.CreateService(ctx, MizuResourcesNamespace, aggregatorPodName, aggregatorPodName)
if err != nil {
fmt.Printf("Error creating mizu collector service: %v\n", err)
return err
}
err = kubernetesProvider.CreateMizuTapperDaemonSet(ctx, MizuResourcesNamespace, TapperDaemonSetName, config.Configuration.MizuImage, tapperPodName, fmt.Sprintf("%s.%s.svc.cluster.local", aggregatorService.Name, aggregatorService.Namespace), nodeToTappedPodIPMap, mizuServiceAccountExists)
if err != nil {
fmt.Printf("Error creating mizu tapper daemonset: %v\n", err)
return err
}
return nil
}
// cleanUpMizuResources removes the resources mizu created in
// MizuResourcesNamespace: the aggregator pod, the aggregator service and the
// tapper daemon set. Removal is best effort: each failure is printed and the
// remaining deletions are still attempted.
func cleanUpMizuResources(kubernetesProvider *kubernetes.Provider) {
	// A fresh context is used so cleanup still runs after the main context has
	// been cancelled. All three deletions share the same 5 second deadline.
	removalCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	// Release the timer held by the context as soon as cleanup finishes
	// instead of keeping it until the deadline expires.
	defer cancel()

	if err := kubernetesProvider.RemovePod(removalCtx, MizuResourcesNamespace, aggregatorPodName); err != nil {
		fmt.Printf("Error removing Pod %s in namespace %s: %s (%v,%+v)\n", aggregatorPodName, MizuResourcesNamespace, err, err, err)
	}
	// The service was created under the same name as the aggregator pod.
	if err := kubernetesProvider.RemoveService(removalCtx, MizuResourcesNamespace, aggregatorPodName); err != nil {
		fmt.Printf("Error removing Service %s in namespace %s: %s (%v,%+v)\n", aggregatorPodName, MizuResourcesNamespace, err, err, err)
	}
	if err := kubernetesProvider.RemoveDaemonSet(removalCtx, MizuResourcesNamespace, TapperDaemonSetName); err != nil {
		fmt.Printf("Error removing DaemonSet %s in namespace %s: %s (%v,%+v)\n", TapperDaemonSetName, MizuResourcesNamespace, err, err, err)
	}
}
// will be relevant in the future
//func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, podRegex *regexp.Regexp) {
// added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, kubernetesProvider.Namespace), podRegex)
// for {
// select {
// case newTarget := <- added:
// fmt.Printf("+%s\n", newTarget.Name)
//
// case removedTarget := <- removed:
// fmt.Printf("-%s\n", removedTarget.Name)
//
// case <- modified:
// continue
//
// case <- errorChan:
// cancel()
//
// case <- ctx.Done():
// return
// }
// }
//}
func portForwardApiPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", aggregatorPodName))
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, MizuResourcesNamespace), podExactRegex)
isPodReady := false
var portForward *kubernetes.PortForward
for {
@@ -69,14 +103,14 @@ func createPodAndPortForward(ctx context.Context, kubernetesProvider *kubernetes
case <- added:
continue
case <- removed:
fmt.Printf("%s removed\n", podName)
fmt.Printf("%s removed\n", aggregatorPodName)
cancel()
return
case modifiedPod := <- modified:
if modifiedPod.Status.Phase == "Running" && !isPodReady {
isPodReady = true
var err error
portForward, err = kubernetes.NewPortForward(kubernetesProvider, namespace, podName, config.Configuration.GuiPort, config.Configuration.MizuPodPort, cancel)
portForward, err = kubernetes.NewPortForward(kubernetesProvider, MizuResourcesNamespace, aggregatorPodName, config.Configuration.GuiPort, config.Configuration.MizuPodPort, cancel)
fmt.Printf("Web interface is now available at http://localhost:%d\n", config.Configuration.GuiPort)
if err != nil {
fmt.Printf("error forwarding port to pod %s\n", err)
@@ -86,7 +120,7 @@ func createPodAndPortForward(ctx context.Context, kubernetesProvider *kubernetes
case <- time.After(25 * time.Second):
if !isPodReady {
fmt.Printf("error: %s pod was not ready in time", podName)
fmt.Printf("error: %s pod was not ready in time", aggregatorPodName)
cancel()
}
@@ -122,6 +156,23 @@ func createRBACIfNecessary(ctx context.Context, kubernetesProvider *kubernetes.P
return true
}
// getNodeHostToTappedPodIpsMap finds every pod whose name matches regex and
// groups the pods' IPs by the name of the node each pod runs on.
//
// Pods that have no node name or no pod IP are skipped, so the result never
// contains an empty node key or an empty address. The returned map is non-nil
// and may be empty. On a lookup failure it returns nil and the error from
// GetAllPodsMatchingRegex.
func getNodeHostToTappedPodIpsMap(ctx context.Context, kubernetesProvider *kubernetes.Provider, regex *regexp.Regexp) (map[string][]string, error) {
	matchingPods, err := kubernetesProvider.GetAllPodsMatchingRegex(ctx, regex)
	if err != nil {
		return nil, err
	}

	nodeToTappedPodIPMap := make(map[string][]string)
	for i := range matchingPods {
		nodeName := matchingPods[i].Spec.NodeName
		podIP := matchingPods[i].Status.PodIP
		if nodeName == "" || podIP == "" {
			// Nothing can be tapped for a pod without a node or an address.
			continue
		}
		// append on the nil slice of a missing key allocates it, so no
		// existence check is needed.
		nodeToTappedPodIPMap[nodeName] = append(nodeToTappedPodIPMap[nodeName], podIP)
	}
	return nodeToTappedPodIPMap, nil
}
func waitForFinish(ctx context.Context, cancel context.CancelFunc) {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)