TRA-3868 move tapped pod watch and tapper updating to shared (#416)

* WIP

* WIP

* WIP

* WIP

* WIP

* Update tapRunner.go and k8sTapManager.go

* Update cleanRunner.go, common.go, and 8 more files...

* Update common.go, tapConfig.go, and 2 more files...

* Update config.go, config.go, and 5 more files...

* Update tapRunner.go, config.go, and 7 more files...

* Update cleanRunner.go, logs.go, and 2 more files...

* Update k8sTapManager.go, provider.go, and watch.go

* Update go.sum, go.mod, and go.sum

* Update go.mod and go.sum

* Update go.mod, go.sum, and 2 more files...

* Revert "Update go.mod, go.sum, and 2 more files..."

This reverts commit 8140311349.

* Update funcWrappers.go, tapRunner.go, and 4 more files...

* Update main.go, tapRunner.go, and mizuTapperSyncer.go
RamiBerm
2021-11-01 14:12:32 +02:00
committed by GitHub
parent 35dbd5fde2
commit 655626bc42
29 changed files with 1653 additions and 425 deletions


@@ -0,0 +1,16 @@
package kubernetes
const (
MizuResourcesPrefix = "mizu-"
ApiServerPodName = MizuResourcesPrefix + "api-server"
ClusterRoleBindingName = MizuResourcesPrefix + "cluster-role-binding"
ClusterRoleName = MizuResourcesPrefix + "cluster-role"
K8sAllNamespaces = ""
RoleBindingName = MizuResourcesPrefix + "role-binding"
RoleName = MizuResourcesPrefix + "role"
ServiceAccountName = MizuResourcesPrefix + "service-account"
TapperDaemonSetName = MizuResourcesPrefix + "tapper-daemon-set"
TapperPodName = MizuResourcesPrefix + "tapper"
ConfigMapName = MizuResourcesPrefix + "config"
MinKubernetesServerVersion = "1.16.0"
)


@@ -0,0 +1,26 @@
package kubernetes
type K8sTapManagerErrorReason string
const (
TapManagerTapperUpdateError K8sTapManagerErrorReason = "TAPPER_UPDATE_ERROR"
TapManagerPodWatchError K8sTapManagerErrorReason = "POD_WATCH_ERROR"
TapManagerPodListError K8sTapManagerErrorReason = "POD_LIST_ERROR"
)
type K8sTapManagerError struct {
OriginalError error
TapManagerReason K8sTapManagerErrorReason
}
// Error implements the error interface.
func (e *K8sTapManagerError) Error() string {
return e.OriginalError.Error()
}
type ClusterBehindProxyError struct{}
// Error implements the error interface.
func (e *ClusterBehindProxyError) Error() string {
return "Cluster is behind proxy"
}
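
A minimal sketch of how a caller receiving a K8sTapManagerError might branch on the reason; the helper name is hypothetical, and only the types above come from this change:

func handleTapManagerError(err K8sTapManagerError) {
	switch err.TapManagerReason {
	case TapManagerPodListError:
		// Listing tapped pods failed, possibly a transient API or RBAC issue.
	case TapManagerPodWatchError:
		// The pod watch stream broke and could not be restarted.
	case TapManagerTapperUpdateError:
		// Applying or removing the tapper daemon set failed.
	}
	// The wrapped error remains reachable through the error interface.
	logger.Log.Debugf("tap manager error: %v", err.Error())
}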


@@ -0,0 +1,207 @@
package kubernetes
import (
"context"
"fmt"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/debounce"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/tap/api"
core "k8s.io/api/core/v1"
"regexp"
"time"
)
const updateTappersDelay = 5 * time.Second
type TappedPodChangeEvent struct {
Added []core.Pod
Removed []core.Pod
}
// MizuTapperSyncer uses a k8s pod watch to update tapper daemonsets when targeted pods are removed or created
type MizuTapperSyncer struct {
context context.Context
CurrentlyTappedPods []core.Pod
config TapperSyncerConfig
kubernetesProvider *Provider
TapPodChangesOut chan TappedPodChangeEvent
ErrorOut chan K8sTapManagerError
}
type TapperSyncerConfig struct {
TargetNamespaces []string
PodFilterRegex regexp.Regexp
MizuResourcesNamespace string
AgentImage string
TapperResources shared.Resources
ImagePullPolicy core.PullPolicy
DumpLogs bool
IgnoredUserAgents []string
MizuApiFilteringOptions api.TrafficFilteringOptions
MizuServiceAccountExists bool
}
func CreateAndStartMizuTapperSyncer(ctx context.Context, kubernetesProvider *Provider, config TapperSyncerConfig) (*MizuTapperSyncer, error) {
syncer := &MizuTapperSyncer{
context: ctx,
CurrentlyTappedPods: make([]core.Pod, 0),
config: config,
kubernetesProvider: kubernetesProvider,
TapPodChangesOut: make(chan TappedPodChangeEvent, 100),
ErrorOut: make(chan K8sTapManagerError, 100),
}
if err, _ := syncer.updateCurrentlyTappedPods(); err != nil {
return nil, err
}
if err := syncer.updateMizuTappers(); err != nil {
return nil, err
}
go syncer.watchPodsForTapping()
return syncer, nil
}
func (tapperSyncer *MizuTapperSyncer) watchPodsForTapping() {
added, modified, removed, errorChan := FilteredWatch(tapperSyncer.context, tapperSyncer.kubernetesProvider, tapperSyncer.config.TargetNamespaces, &tapperSyncer.config.PodFilterRegex)
restartTappers := func() {
err, changeFound := tapperSyncer.updateCurrentlyTappedPods()
if err != nil {
tapperSyncer.ErrorOut <- K8sTapManagerError{
OriginalError: err,
TapManagerReason: TapManagerPodListError,
}
}
if !changeFound {
logger.Log.Debugf("Nothing changed update tappers not needed")
return
}
if err := tapperSyncer.updateMizuTappers(); err != nil {
tapperSyncer.ErrorOut <- K8sTapManagerError{
OriginalError: err,
TapManagerReason: TapManagerTapperUpdateError,
}
}
}
restartTappersDebouncer := debounce.NewDebouncer(updateTappersDelay, restartTappers)
for {
select {
case pod, ok := <-added:
if !ok {
added = nil
continue
}
logger.Log.Debugf("Added matching pod %s, ns: %s", pod.Name, pod.Namespace)
restartTappersDebouncer.SetOn()
case pod, ok := <-removed:
if !ok {
removed = nil
continue
}
logger.Log.Debugf("Removed matching pod %s, ns: %s", pod.Name, pod.Namespace)
restartTappersDebouncer.SetOn()
case pod, ok := <-modified:
if !ok {
modified = nil
continue
}
logger.Log.Debugf("Modified matching pod %s, ns: %s, phase: %s, ip: %s", pod.Name, pod.Namespace, pod.Status.Phase, pod.Status.PodIP)
// Act only if the modified pod has already obtained an IP address.
// After filtering for IPs, on a normal pod restart this includes the following events:
// - Pod deletion
// - Pod reaches start state
// - Pod reaches ready state
// Ready/unready transitions might also trigger this event.
if pod.Status.PodIP != "" {
restartTappersDebouncer.SetOn()
}
case err, ok := <-errorChan:
if !ok {
errorChan = nil
continue
}
logger.Log.Debugf("Watching pods loop, got error %v, stopping `restart tappers debouncer`", err)
restartTappersDebouncer.Cancel()
tapperSyncer.ErrorOut <- K8sTapManagerError{
OriginalError: err,
TapManagerReason: TapManagerPodWatchError,
}
case <-tapperSyncer.context.Done():
logger.Log.Debugf("Watching pods loop, context done, stopping `restart tappers debouncer`")
restartTappersDebouncer.Cancel()
// TODO: Does this also perform cleanup?
return
}
}
}
func (tapperSyncer *MizuTapperSyncer) updateCurrentlyTappedPods() (err error, changesFound bool) {
if matchingPods, err := tapperSyncer.kubernetesProvider.ListAllRunningPodsMatchingRegex(tapperSyncer.context, &tapperSyncer.config.PodFilterRegex, tapperSyncer.config.TargetNamespaces); err != nil {
return err, false
} else {
podsToTap := excludeMizuPods(matchingPods)
addedPods, removedPods := getPodArrayDiff(tapperSyncer.CurrentlyTappedPods, podsToTap)
for _, addedPod := range addedPods {
logger.Log.Debugf("tapping new pod %s", addedPod.Name)
}
for _, removedPod := range removedPods {
logger.Log.Debugf("pod %s is no longer running, tapping for it stopped", removedPod.Name)
}
if len(addedPods) > 0 || len(removedPods) > 0 {
tapperSyncer.CurrentlyTappedPods = podsToTap
tapperSyncer.TapPodChangesOut <- TappedPodChangeEvent{
Added: addedPods,
Removed: removedPods,
}
return nil, true
}
return nil, false
}
}
func (tapperSyncer *MizuTapperSyncer) updateMizuTappers() error {
nodeToTappedPodIPMap := GetNodeHostToTappedPodIpsMap(tapperSyncer.CurrentlyTappedPods)
if len(nodeToTappedPodIPMap) > 0 {
var serviceAccountName string
if tapperSyncer.config.MizuServiceAccountExists {
serviceAccountName = ServiceAccountName
} else {
serviceAccountName = ""
}
if err := tapperSyncer.kubernetesProvider.ApplyMizuTapperDaemonSet(
tapperSyncer.context,
tapperSyncer.config.MizuResourcesNamespace,
TapperDaemonSetName,
tapperSyncer.config.AgentImage,
TapperPodName,
fmt.Sprintf("%s.%s.svc.cluster.local", ApiServerPodName, tapperSyncer.config.MizuResourcesNamespace),
nodeToTappedPodIPMap,
serviceAccountName,
tapperSyncer.config.TapperResources,
tapperSyncer.config.ImagePullPolicy,
tapperSyncer.config.MizuApiFilteringOptions,
tapperSyncer.config.DumpLogs,
); err != nil {
return err
}
logger.Log.Debugf("Successfully created %v tappers", len(nodeToTappedPodIPMap))
} else {
if err := tapperSyncer.kubernetesProvider.RemoveDaemonSet(tapperSyncer.context, tapperSyncer.config.MizuResourcesNamespace, TapperDaemonSetName); err != nil {
return err
}
}
return nil
}
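
As an illustration, a minimal caller of CreateAndStartMizuTapperSyncer might drain both output channels like this; the config values below are placeholders, not the CLI's real settings:

// Illustrative consumer sketch; config values are placeholders.
func runTapperSyncerExample(ctx context.Context, provider *Provider) error {
	config := TapperSyncerConfig{
		TargetNamespaces:       []string{K8sAllNamespaces},
		PodFilterRegex:         *regexp.MustCompile(".*"),
		MizuResourcesNamespace: "mizu",              // placeholder namespace
		AgentImage:             "mizu-agent:latest", // placeholder image
	}
	syncer, err := CreateAndStartMizuTapperSyncer(ctx, provider, config)
	if err != nil {
		return err
	}
	for {
		select {
		case change := <-syncer.TapPodChangesOut:
			logger.Log.Debugf("tapped pods changed: %d added, %d removed", len(change.Added), len(change.Removed))
		case tapErr := <-syncer.ErrorOut:
			return tapErr.OriginalError
		case <-ctx.Done():
			return nil
		}
	}
}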


@@ -0,0 +1,788 @@
package kubernetes
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/shared/semver"
"github.com/up9inc/mizu/tap/api"
"io"
core "k8s.io/api/core/v1"
rbac "k8s.io/api/rbac/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/version"
"k8s.io/apimachinery/pkg/watch"
applyconfapp "k8s.io/client-go/applyconfigurations/apps/v1"
applyconfcore "k8s.io/client-go/applyconfigurations/core/v1"
applyconfmeta "k8s.io/client-go/applyconfigurations/meta/v1"
"k8s.io/client-go/kubernetes"
_ "k8s.io/client-go/plugin/pkg/client/auth"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
watchtools "k8s.io/client-go/tools/watch"
"net/url"
"path/filepath"
"regexp"
)
type Provider struct {
clientSet *kubernetes.Clientset
kubernetesConfig clientcmd.ClientConfig
clientConfig restclient.Config
Namespace string
}
const (
fieldManagerName = "mizu-manager"
)
func NewProvider(kubeConfigPath string) (*Provider, error) {
kubernetesConfig := loadKubernetesConfiguration(kubeConfigPath)
restClientConfig, err := kubernetesConfig.ClientConfig()
if err != nil {
if clientcmd.IsEmptyConfig(err) {
return nil, fmt.Errorf("couldn't find the kube config file, or file is empty (%s)\n"+
"you can set alternative kube config file path by adding the kube-config-path field to the mizu config file, err: %w", kubeConfigPath, err)
}
if clientcmd.IsConfigurationInvalid(err) {
return nil, fmt.Errorf("invalid kube config file (%s)\n"+
"you can set alternative kube config file path by adding the kube-config-path field to the mizu config file, err: %w", kubeConfigPath, err)
}
return nil, fmt.Errorf("error while using kube config (%s)\n"+
"you can set alternative kube config file path by adding the kube-config-path field to the mizu config file, err: %w", kubeConfigPath, err)
}
clientSet, err := getClientSet(restClientConfig)
if err != nil {
return nil, fmt.Errorf("error while using kube config (%s)\n"+
"you can set alternative kube config file path by adding the kube-config-path field to the mizu config file, err: %w", kubeConfigPath, err)
}
if err := validateNotProxy(kubernetesConfig, restClientConfig); err != nil {
return nil, err
}
if err := validateKubernetesVersion(clientSet); err != nil {
return nil, err
}
return &Provider{
clientSet: clientSet,
kubernetesConfig: kubernetesConfig,
clientConfig: *restClientConfig,
}, nil
}
func (provider *Provider) CurrentNamespace() string {
ns, _, _ := provider.kubernetesConfig.Namespace()
return ns
}
func (provider *Provider) WaitUtilNamespaceDeleted(ctx context.Context, name string) error {
fieldSelector := fmt.Sprintf("metadata.name=%s", name)
var limit int64 = 1
lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.FieldSelector = fieldSelector
options.Limit = limit
return provider.clientSet.CoreV1().Namespaces().List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.FieldSelector = fieldSelector
options.Limit = limit
return provider.clientSet.CoreV1().Namespaces().Watch(ctx, options)
},
}
var preconditionFunc watchtools.PreconditionFunc = func(store cache.Store) (bool, error) {
_, exists, err := store.Get(&core.Namespace{ObjectMeta: metav1.ObjectMeta{Name: name}})
if err != nil {
return false, err
}
if exists {
return false, nil
}
return true, nil
}
conditionFunc := func(e watch.Event) (bool, error) {
if e.Type == watch.Deleted {
return true, nil
}
return false, nil
}
obj := &core.Namespace{}
_, err := watchtools.UntilWithSync(ctx, lw, obj, preconditionFunc, conditionFunc)
return err
}
func (provider *Provider) GetPodWatcher(ctx context.Context, namespace string) watch.Interface {
watcher, err := provider.clientSet.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{Watch: true})
if err != nil {
panic(err.Error())
}
return watcher
}
func (provider *Provider) CreateNamespace(ctx context.Context, name string) (*core.Namespace, error) {
namespaceSpec := &core.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
}
return provider.clientSet.CoreV1().Namespaces().Create(ctx, namespaceSpec, metav1.CreateOptions{})
}
type ApiServerOptions struct {
Namespace string
PodName string
PodImage string
ServiceAccountName string
IsNamespaceRestricted bool
SyncEntriesConfig *shared.SyncEntriesConfig
MaxEntriesDBSizeBytes int64
Resources shared.Resources
ImagePullPolicy core.PullPolicy
DumpLogs bool
}
func (provider *Provider) CreateMizuApiServerPod(ctx context.Context, opts *ApiServerOptions) (*core.Pod, error) {
var marshaledSyncEntriesConfig []byte
if opts.SyncEntriesConfig != nil {
var err error
if marshaledSyncEntriesConfig, err = json.Marshal(opts.SyncEntriesConfig); err != nil {
return nil, err
}
}
configMapVolume := &core.ConfigMapVolumeSource{}
configMapVolume.Name = ConfigMapName
cpuLimit, err := resource.ParseQuantity(opts.Resources.CpuLimit)
if err != nil {
return nil, fmt.Errorf("invalid cpu limit for %s container", opts.PodName)
}
memLimit, err := resource.ParseQuantity(opts.Resources.MemoryLimit)
if err != nil {
return nil, fmt.Errorf("invalid memory limit for %s container", opts.PodName)
}
cpuRequests, err := resource.ParseQuantity(opts.Resources.CpuRequests)
if err != nil {
return nil, fmt.Errorf("invalid cpu request for %s container", opts.PodName)
}
memRequests, err := resource.ParseQuantity(opts.Resources.MemoryRequests)
if err != nil {
return nil, fmt.Errorf("invalid memory request for %s container", opts.PodName)
}
command := []string{"./mizuagent", "--api-server"}
if opts.IsNamespaceRestricted {
command = append(command, "--namespace", opts.Namespace)
}
port := intstr.FromInt(shared.DefaultApiServerPort)
debugMode := ""
if opts.DumpLogs {
debugMode = "1"
}
pod := &core.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: opts.PodName,
Namespace: opts.Namespace,
Labels: map[string]string{"app": opts.PodName},
},
Spec: core.PodSpec{
Containers: []core.Container{
{
Name: opts.PodName,
Image: opts.PodImage,
ImagePullPolicy: opts.ImagePullPolicy,
VolumeMounts: []core.VolumeMount{
{
Name: ConfigMapName,
MountPath: shared.ConfigDirPath,
},
},
Command: command,
Env: []core.EnvVar{
{
Name: shared.SyncEntriesConfigEnvVar,
Value: string(marshaledSyncEntriesConfig),
},
{
Name: shared.DebugModeEnvVar,
Value: debugMode,
},
},
Resources: core.ResourceRequirements{
Limits: core.ResourceList{
"cpu": cpuLimit,
"memory": memLimit,
},
Requests: core.ResourceList{
"cpu": cpuRequests,
"memory": memRequests,
},
},
ReadinessProbe: &core.Probe{
Handler: core.Handler{
TCPSocket: &core.TCPSocketAction{
Port: port,
},
},
InitialDelaySeconds: 5,
PeriodSeconds: 10,
},
LivenessProbe: &core.Probe{
Handler: core.Handler{
HTTPGet: &core.HTTPGetAction{
Path: "/echo",
Port: port,
},
},
InitialDelaySeconds: 5,
PeriodSeconds: 10,
},
},
},
Volumes: []core.Volume{
{
Name: ConfigMapName,
VolumeSource: core.VolumeSource{
ConfigMap: configMapVolume,
},
},
},
DNSPolicy: core.DNSClusterFirstWithHostNet,
TerminationGracePeriodSeconds: new(int64),
},
}
// Define the service account only when it exists, to prevent a pod crash.
if opts.ServiceAccountName != "" {
pod.Spec.ServiceAccountName = opts.ServiceAccountName
}
return provider.clientSet.CoreV1().Pods(opts.Namespace).Create(ctx, pod, metav1.CreateOptions{})
}
func (provider *Provider) CreateService(ctx context.Context, namespace string, serviceName string, appLabelValue string) (*core.Service, error) {
service := core.Service{
ObjectMeta: metav1.ObjectMeta{
Name: serviceName,
Namespace: namespace,
},
Spec: core.ServiceSpec{
Ports: []core.ServicePort{{TargetPort: intstr.FromInt(shared.DefaultApiServerPort), Port: 80}},
Type: core.ServiceTypeClusterIP,
Selector: map[string]string{"app": appLabelValue},
},
}
return provider.clientSet.CoreV1().Services(namespace).Create(ctx, &service, metav1.CreateOptions{})
}
func (provider *Provider) DoesServicesExist(ctx context.Context, namespace string, name string) (bool, error) {
resource, err := provider.clientSet.CoreV1().Services(namespace).Get(ctx, name, metav1.GetOptions{})
return provider.doesResourceExist(resource, err)
}
func (provider *Provider) doesResourceExist(resource interface{}, err error) (bool, error) {
// Getting NotFound error is the expected behavior when a resource does not exist.
if k8serrors.IsNotFound(err) {
return false, nil
}
if err != nil {
return false, err
}
return resource != nil, nil
}
func (provider *Provider) CreateMizuRBAC(ctx context.Context, namespace string, serviceAccountName string, clusterRoleName string, clusterRoleBindingName string, version string) error {
serviceAccount := &core.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Name: serviceAccountName,
Namespace: namespace,
Labels: map[string]string{"mizu-cli-version": version},
},
}
clusterRole := &rbac.ClusterRole{
ObjectMeta: metav1.ObjectMeta{
Name: clusterRoleName,
Labels: map[string]string{"mizu-cli-version": version},
},
Rules: []rbac.PolicyRule{
{
APIGroups: []string{"", "extensions", "apps"},
Resources: []string{"pods", "services", "endpoints"},
Verbs: []string{"list", "get", "watch"},
},
},
}
clusterRoleBinding := &rbac.ClusterRoleBinding{
ObjectMeta: metav1.ObjectMeta{
Name: clusterRoleBindingName,
Labels: map[string]string{"mizu-cli-version": version},
},
RoleRef: rbac.RoleRef{
Name: clusterRoleName,
Kind: "ClusterRole",
APIGroup: "rbac.authorization.k8s.io",
},
Subjects: []rbac.Subject{
{
Kind: "ServiceAccount",
Name: serviceAccountName,
Namespace: namespace,
},
},
}
_, err := provider.clientSet.CoreV1().ServiceAccounts(namespace).Create(ctx, serviceAccount, metav1.CreateOptions{})
if err != nil && !k8serrors.IsAlreadyExists(err) {
return err
}
_, err = provider.clientSet.RbacV1().ClusterRoles().Create(ctx, clusterRole, metav1.CreateOptions{})
if err != nil && !k8serrors.IsAlreadyExists(err) {
return err
}
_, err = provider.clientSet.RbacV1().ClusterRoleBindings().Create(ctx, clusterRoleBinding, metav1.CreateOptions{})
if err != nil && !k8serrors.IsAlreadyExists(err) {
return err
}
return nil
}
func (provider *Provider) CreateMizuRBACNamespaceRestricted(ctx context.Context, namespace string, serviceAccountName string, roleName string, roleBindingName string, version string) error {
serviceAccount := &core.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Name: serviceAccountName,
Namespace: namespace,
Labels: map[string]string{"mizu-cli-version": version},
},
}
role := &rbac.Role{
ObjectMeta: metav1.ObjectMeta{
Name: roleName,
Labels: map[string]string{"mizu-cli-version": version},
},
Rules: []rbac.PolicyRule{
{
APIGroups: []string{"", "extensions", "apps"},
Resources: []string{"pods", "services", "endpoints"},
Verbs: []string{"list", "get", "watch"},
},
},
}
roleBinding := &rbac.RoleBinding{
ObjectMeta: metav1.ObjectMeta{
Name: roleBindingName,
Labels: map[string]string{"mizu-cli-version": version},
},
RoleRef: rbac.RoleRef{
Name: roleName,
Kind: "Role",
APIGroup: "rbac.authorization.k8s.io",
},
Subjects: []rbac.Subject{
{
Kind: "ServiceAccount",
Name: serviceAccountName,
Namespace: namespace,
},
},
}
_, err := provider.clientSet.CoreV1().ServiceAccounts(namespace).Create(ctx, serviceAccount, metav1.CreateOptions{})
if err != nil && !k8serrors.IsAlreadyExists(err) {
return err
}
_, err = provider.clientSet.RbacV1().Roles(namespace).Create(ctx, role, metav1.CreateOptions{})
if err != nil && !k8serrors.IsAlreadyExists(err) {
return err
}
_, err = provider.clientSet.RbacV1().RoleBindings(namespace).Create(ctx, roleBinding, metav1.CreateOptions{})
if err != nil && !k8serrors.IsAlreadyExists(err) {
return err
}
return nil
}
func (provider *Provider) RemoveNamespace(ctx context.Context, name string) error {
err := provider.clientSet.CoreV1().Namespaces().Delete(ctx, name, metav1.DeleteOptions{})
return provider.handleRemovalError(err)
}
func (provider *Provider) RemoveClusterRole(ctx context.Context, name string) error {
err := provider.clientSet.RbacV1().ClusterRoles().Delete(ctx, name, metav1.DeleteOptions{})
return provider.handleRemovalError(err)
}
func (provider *Provider) RemoveClusterRoleBinding(ctx context.Context, name string) error {
err := provider.clientSet.RbacV1().ClusterRoleBindings().Delete(ctx, name, metav1.DeleteOptions{})
return provider.handleRemovalError(err)
}
func (provider *Provider) RemoveRoleBinding(ctx context.Context, namespace string, name string) error {
err := provider.clientSet.RbacV1().RoleBindings(namespace).Delete(ctx, name, metav1.DeleteOptions{})
return provider.handleRemovalError(err)
}
func (provider *Provider) RemoveRole(ctx context.Context, namespace string, name string) error {
err := provider.clientSet.RbacV1().Roles(namespace).Delete(ctx, name, metav1.DeleteOptions{})
return provider.handleRemovalError(err)
}
func (provider *Provider) RemoveServicAccount(ctx context.Context, namespace string, name string) error {
err := provider.clientSet.CoreV1().ServiceAccounts(namespace).Delete(ctx, name, metav1.DeleteOptions{})
return provider.handleRemovalError(err)
}
func (provider *Provider) RemovePod(ctx context.Context, namespace string, podName string) error {
err := provider.clientSet.CoreV1().Pods(namespace).Delete(ctx, podName, metav1.DeleteOptions{})
return provider.handleRemovalError(err)
}
func (provider *Provider) RemoveConfigMap(ctx context.Context, namespace string, configMapName string) error {
err := provider.clientSet.CoreV1().ConfigMaps(namespace).Delete(ctx, configMapName, metav1.DeleteOptions{})
return provider.handleRemovalError(err)
}
func (provider *Provider) RemoveService(ctx context.Context, namespace string, serviceName string) error {
err := provider.clientSet.CoreV1().Services(namespace).Delete(ctx, serviceName, metav1.DeleteOptions{})
return provider.handleRemovalError(err)
}
func (provider *Provider) RemoveDaemonSet(ctx context.Context, namespace string, daemonSetName string) error {
err := provider.clientSet.AppsV1().DaemonSets(namespace).Delete(ctx, daemonSetName, metav1.DeleteOptions{})
return provider.handleRemovalError(err)
}
func (provider *Provider) handleRemovalError(err error) error {
// Ignore NotFound - There is nothing to delete.
// Ignore Forbidden - Assume that a user could not have created the resource in the first place.
if k8serrors.IsNotFound(err) || k8serrors.IsForbidden(err) {
return nil
}
return err
}
func (provider *Provider) CreateConfigMap(ctx context.Context, namespace string, configMapName string, serializedValidationRules string, serializedContract string, serializedMizuConfig string) error {
configMapData := make(map[string]string, 0)
if serializedValidationRules != "" {
configMapData[shared.ValidationRulesFileName] = serializedValidationRules
}
if serializedContract != "" {
configMapData[shared.ContractFileName] = serializedContract
}
configMapData[shared.ConfigFileName] = serializedMizuConfig
configMap := &core.ConfigMap{
TypeMeta: metav1.TypeMeta{
Kind: "ConfigMap",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: configMapName,
Namespace: namespace,
},
Data: configMapData,
}
if _, err := provider.clientSet.CoreV1().ConfigMaps(namespace).Create(ctx, configMap, metav1.CreateOptions{}); err != nil {
return err
}
return nil
}
func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string, apiServerPodIp string, nodeToTappedPodIPMap map[string][]string, serviceAccountName string, resources shared.Resources, imagePullPolicy core.PullPolicy, mizuApiFilteringOptions api.TrafficFilteringOptions, dumpLogs bool) error {
logger.Log.Debugf("Applying %d tapper daemon sets, ns: %s, daemonSetName: %s, podImage: %s, tapperPodName: %s", len(nodeToTappedPodIPMap), namespace, daemonSetName, podImage, tapperPodName)
if len(nodeToTappedPodIPMap) == 0 {
return fmt.Errorf("daemon set %s must tap at least 1 pod", daemonSetName)
}
nodeToTappedPodIPMapJsonStr, err := json.Marshal(nodeToTappedPodIPMap)
if err != nil {
return err
}
mizuApiFilteringOptionsJsonStr, err := json.Marshal(mizuApiFilteringOptions)
if err != nil {
return err
}
mizuCmd := []string{
"./mizuagent",
"-i", "any",
"--tap",
"--api-server-address", fmt.Sprintf("ws://%s/wsTapper", apiServerPodIp),
"--nodefrag",
}
debugMode := ""
if dumpLogs {
debugMode = "1"
}
agentContainer := applyconfcore.Container()
agentContainer.WithName(tapperPodName)
agentContainer.WithImage(podImage)
agentContainer.WithImagePullPolicy(imagePullPolicy)
agentContainer.WithSecurityContext(applyconfcore.SecurityContext().WithPrivileged(true))
agentContainer.WithCommand(mizuCmd...)
agentContainer.WithEnv(
applyconfcore.EnvVar().WithName(shared.DebugModeEnvVar).WithValue(debugMode),
applyconfcore.EnvVar().WithName(shared.HostModeEnvVar).WithValue("1"),
applyconfcore.EnvVar().WithName(shared.TappedAddressesPerNodeDictEnvVar).WithValue(string(nodeToTappedPodIPMapJsonStr)),
applyconfcore.EnvVar().WithName(shared.GoGCEnvVar).WithValue("12800"),
applyconfcore.EnvVar().WithName(shared.MizuFilteringOptionsEnvVar).WithValue(string(mizuApiFilteringOptionsJsonStr)),
)
agentContainer.WithEnv(
applyconfcore.EnvVar().WithName(shared.NodeNameEnvVar).WithValueFrom(
applyconfcore.EnvVarSource().WithFieldRef(
applyconfcore.ObjectFieldSelector().WithAPIVersion("v1").WithFieldPath("spec.nodeName"),
),
),
)
cpuLimit, err := resource.ParseQuantity(resources.CpuLimit)
if err != nil {
return fmt.Errorf("invalid cpu limit for %s container", tapperPodName)
}
memLimit, err := resource.ParseQuantity(resources.MemoryLimit)
if err != nil {
return fmt.Errorf("invalid memory limit for %s container", tapperPodName)
}
cpuRequests, err := resource.ParseQuantity(resources.CpuRequests)
if err != nil {
return fmt.Errorf("invalid cpu request for %s container", tapperPodName)
}
memRequests, err := resource.ParseQuantity(resources.MemoryRequests)
if err != nil {
return fmt.Errorf("invalid memory request for %s container", tapperPodName)
}
agentResourceLimits := core.ResourceList{
"cpu": cpuLimit,
"memory": memLimit,
}
agentResourceRequests := core.ResourceList{
"cpu": cpuRequests,
"memory": memRequests,
}
agentResources := applyconfcore.ResourceRequirements().WithRequests(agentResourceRequests).WithLimits(agentResourceLimits)
agentContainer.WithResources(agentResources)
nodeNames := make([]string, 0, len(nodeToTappedPodIPMap))
for nodeName := range nodeToTappedPodIPMap {
nodeNames = append(nodeNames, nodeName)
}
nodeSelectorRequirement := applyconfcore.NodeSelectorRequirement()
nodeSelectorRequirement.WithKey("kubernetes.io/hostname")
nodeSelectorRequirement.WithOperator(core.NodeSelectorOpIn)
nodeSelectorRequirement.WithValues(nodeNames...)
nodeSelectorTerm := applyconfcore.NodeSelectorTerm()
nodeSelectorTerm.WithMatchExpressions(nodeSelectorRequirement)
nodeSelector := applyconfcore.NodeSelector()
nodeSelector.WithNodeSelectorTerms(nodeSelectorTerm)
nodeAffinity := applyconfcore.NodeAffinity()
nodeAffinity.WithRequiredDuringSchedulingIgnoredDuringExecution(nodeSelector)
affinity := applyconfcore.Affinity()
affinity.WithNodeAffinity(nodeAffinity)
noExecuteToleration := applyconfcore.Toleration()
noExecuteToleration.WithOperator(core.TolerationOpExists)
noExecuteToleration.WithEffect(core.TaintEffectNoExecute)
noScheduleToleration := applyconfcore.Toleration()
noScheduleToleration.WithOperator(core.TolerationOpExists)
noScheduleToleration.WithEffect(core.TaintEffectNoSchedule)
volumeName := ConfigMapName
configMapVolume := applyconfcore.VolumeApplyConfiguration{
Name: &volumeName,
VolumeSourceApplyConfiguration: applyconfcore.VolumeSourceApplyConfiguration{
ConfigMap: &applyconfcore.ConfigMapVolumeSourceApplyConfiguration{
LocalObjectReferenceApplyConfiguration: applyconfcore.LocalObjectReferenceApplyConfiguration{
Name: &volumeName,
},
},
},
}
mountPath := shared.ConfigDirPath
configMapVolumeMount := applyconfcore.VolumeMountApplyConfiguration{
Name: &volumeName,
MountPath: &mountPath,
}
agentContainer.WithVolumeMounts(&configMapVolumeMount)
podSpec := applyconfcore.PodSpec()
podSpec.WithHostNetwork(true)
podSpec.WithDNSPolicy(core.DNSClusterFirstWithHostNet)
podSpec.WithTerminationGracePeriodSeconds(0)
if serviceAccountName != "" {
podSpec.WithServiceAccountName(serviceAccountName)
}
podSpec.WithContainers(agentContainer)
podSpec.WithAffinity(affinity)
podSpec.WithTolerations(noExecuteToleration, noScheduleToleration)
podSpec.WithVolumes(&configMapVolume)
podTemplate := applyconfcore.PodTemplateSpec()
podTemplate.WithLabels(map[string]string{"app": tapperPodName})
podTemplate.WithSpec(podSpec)
labelSelector := applyconfmeta.LabelSelector()
labelSelector.WithMatchLabels(map[string]string{"app": tapperPodName})
daemonSet := applyconfapp.DaemonSet(daemonSetName, namespace)
daemonSet.WithSpec(applyconfapp.DaemonSetSpec().WithSelector(labelSelector).WithTemplate(podTemplate))
_, err = provider.clientSet.AppsV1().DaemonSets(namespace).Apply(ctx, daemonSet, metav1.ApplyOptions{FieldManager: fieldManagerName})
return err
}
func (provider *Provider) ListAllPodsMatchingRegex(ctx context.Context, regex *regexp.Regexp, namespaces []string) ([]core.Pod, error) {
var pods []core.Pod
for _, namespace := range namespaces {
namespacePods, err := provider.clientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get pods in ns: [%s], %w", namespace, err)
}
pods = append(pods, namespacePods.Items...)
}
matchingPods := make([]core.Pod, 0)
for _, pod := range pods {
if regex.MatchString(pod.Name) {
matchingPods = append(matchingPods, pod)
}
}
return matchingPods, nil
}
func (provider *Provider) ListAllRunningPodsMatchingRegex(ctx context.Context, regex *regexp.Regexp, namespaces []string) ([]core.Pod, error) {
pods, err := provider.ListAllPodsMatchingRegex(ctx, regex, namespaces)
if err != nil {
return nil, err
}
matchingPods := make([]core.Pod, 0)
for _, pod := range pods {
if isPodRunning(&pod) {
matchingPods = append(matchingPods, pod)
}
}
return matchingPods, nil
}
func (provider *Provider) GetPodLogs(ctx context.Context, namespace string, podName string) (string, error) {
podLogOpts := core.PodLogOptions{}
req := provider.clientSet.CoreV1().Pods(namespace).GetLogs(podName, &podLogOpts)
podLogs, err := req.Stream(ctx)
if err != nil {
return "", fmt.Errorf("error opening log stream on ns: %s, pod: %s, %w", namespace, podName, err)
}
defer podLogs.Close()
buf := new(bytes.Buffer)
if _, err = io.Copy(buf, podLogs); err != nil {
return "", fmt.Errorf("error copy information from podLogs to buf, ns: %s, pod: %s, %w", namespace, podName, err)
}
str := buf.String()
return str, nil
}
func (provider *Provider) GetNamespaceEvents(ctx context.Context, namespace string) (string, error) {
eventsOpts := metav1.ListOptions{}
eventList, err := provider.clientSet.CoreV1().Events(namespace).List(ctx, eventsOpts)
if err != nil {
return "", fmt.Errorf("error getting events on ns: %s, %w", namespace, err)
}
return eventList.String(), nil
}
func getClientSet(config *restclient.Config) (*kubernetes.Clientset, error) {
clientSet, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}
return clientSet, nil
}
func loadKubernetesConfiguration(kubeConfigPath string) clientcmd.ClientConfig {
logger.Log.Debugf("Using kube config %s", kubeConfigPath)
configPathList := filepath.SplitList(kubeConfigPath)
configLoadingRules := &clientcmd.ClientConfigLoadingRules{}
if len(configPathList) <= 1 {
configLoadingRules.ExplicitPath = kubeConfigPath
} else {
configLoadingRules.Precedence = configPathList
}
contextName := ""
return clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
configLoadingRules,
&clientcmd.ConfigOverrides{
CurrentContext: contextName,
},
)
}
func isPodRunning(pod *core.Pod) bool {
return pod.Status.Phase == core.PodRunning
}
// We added this after a customer tried to run mizu from Lens, which used Lens's kube config, whose cluster server configuration points to Lens's local proxy.
// The workaround was to use the user's local default kube config.
// For now, we block the option to run mizu through a proxy to the k8s server.
func validateNotProxy(kubernetesConfig clientcmd.ClientConfig, restClientConfig *restclient.Config) error {
kubernetesUrl, err := url.Parse(restClientConfig.Host)
if err != nil {
logger.Log.Debugf("validateNotProxy - error while parsing kubernetes host, err: %v", err)
return nil
}
restProxyClientConfig, _ := kubernetesConfig.ClientConfig()
restProxyClientConfig.Host = kubernetesUrl.Host
clientProxySet, err := getClientSet(restProxyClientConfig)
if err == nil {
proxyServerVersion, err := clientProxySet.ServerVersion()
if err != nil {
return nil
}
if *proxyServerVersion == (version.Info{}) {
return &ClusterBehindProxyError{}
}
}
return nil
}
func validateKubernetesVersion(clientSet *kubernetes.Clientset) error {
serverVersion, err := clientSet.ServerVersion()
if err != nil {
logger.Log.Debugf("error while getting kubernetes server version, err: %v", err)
return nil
}
serverVersionSemVer := semver.SemVersion(serverVersion.GitVersion)
minKubernetesServerVersionSemVer := semver.SemVersion(MinKubernetesServerVersion)
if minKubernetesServerVersionSemVer.GreaterThan(serverVersionSemVer) {
return fmt.Errorf("kubernetes server version %v is not supported, supporting only kubernetes server version of %v or higher", serverVersion.GitVersion, MinKubernetesServerVersion)
}
return nil
}
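
For orientation, constructing a Provider and listing running pods with the helpers above might look like this sketch; the kube config path and pod-name regex are placeholders:

// Sketch only; path and regex are illustrative.
func listRunningPodsExample(ctx context.Context) ([]core.Pod, error) {
	provider, err := NewProvider("/home/user/.kube/config") // placeholder path
	if err != nil {
		return nil, err
	}
	regex := regexp.MustCompile("httpbin") // hypothetical pod-name filter
	return provider.ListAllRunningPodsMatchingRegex(ctx, regex, []string{K8sAllNamespaces})
}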


@@ -0,0 +1,67 @@
package kubernetes
import (
"fmt"
"net"
"net/http"
"strings"
"time"
"github.com/up9inc/mizu/shared/logger"
"k8s.io/kubectl/pkg/proxy"
)
const k8sProxyApiPrefix = "/"
const mizuServicePort = 80
func StartProxy(kubernetesProvider *Provider, proxyHost string, mizuPort uint16, mizuNamespace string, mizuServiceName string) error {
logger.Log.Debugf("Starting proxy. namespace: [%v], service name: [%s], port: [%v]", mizuNamespace, mizuServiceName, mizuPort)
filter := &proxy.FilterServer{
AcceptPaths: proxy.MakeRegexpArrayOrDie(proxy.DefaultPathAcceptRE),
RejectPaths: proxy.MakeRegexpArrayOrDie(proxy.DefaultPathRejectRE),
AcceptHosts: proxy.MakeRegexpArrayOrDie("^.*"),
RejectMethods: proxy.MakeRegexpArrayOrDie(proxy.DefaultMethodRejectRE),
}
proxyHandler, err := proxy.NewProxyHandler(k8sProxyApiPrefix, filter, &kubernetesProvider.clientConfig, time.Second*2)
if err != nil {
return err
}
mux := http.NewServeMux()
mux.Handle(k8sProxyApiPrefix, proxyHandler)
mux.Handle("/static/", getRerouteHttpHandlerMizuStatic(proxyHandler, mizuNamespace, mizuServiceName))
mux.Handle("/mizu/", getRerouteHttpHandlerMizuAPI(proxyHandler, mizuNamespace, mizuServiceName))
l, err := net.Listen("tcp", fmt.Sprintf("%s:%d", proxyHost, int(mizuPort)))
if err != nil {
return err
}
server := http.Server{
Handler: mux,
}
return server.Serve(l)
}
func getMizuApiServerProxiedHostAndPath(mizuNamespace string, mizuServiceName string) string {
return fmt.Sprintf("/api/v1/namespaces/%s/services/%s:%d/proxy/", mizuNamespace, mizuServiceName, mizuServicePort)
}
func GetMizuApiServerProxiedHostAndPath(mizuPort uint16) string {
return fmt.Sprintf("localhost:%d/mizu", mizuPort)
}
func getRerouteHttpHandlerMizuAPI(proxyHandler http.Handler, mizuNamespace string, mizuServiceName string) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
r.URL.Path = strings.Replace(r.URL.Path, "/mizu/", getMizuApiServerProxiedHostAndPath(mizuNamespace, mizuServiceName), 1)
proxyHandler.ServeHTTP(w, r)
})
}
func getRerouteHttpHandlerMizuStatic(proxyHandler http.Handler, mizuNamespace string, mizuServiceName string) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
r.URL.Path = strings.Replace(r.URL.Path, "/static/", fmt.Sprintf("%s/static/", getMizuApiServerProxiedHostAndPath(mizuNamespace, mizuServiceName)), 1)
proxyHandler.ServeHTTP(w, r)
})
}
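
A hedged usage sketch: StartProxy blocks on Serve, so callers would typically run it in a goroutine. The port, namespace, and service name below are illustrative assumptions:

// Sketch; port, namespace, and service name are placeholders.
func startProxyExample(provider *Provider) {
	go func() {
		if err := StartProxy(provider, "localhost", 8899, "mizu", ApiServerPodName); err != nil {
			logger.Log.Debugf("proxy stopped: %v", err)
		}
	}()
	logger.Log.Debugf("Mizu UI at http://%s", GetMizuApiServerProxiedHostAndPath(8899))
}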


@@ -0,0 +1,57 @@
package kubernetes
import (
core "k8s.io/api/core/v1"
"regexp"
)
func GetNodeHostToTappedPodIpsMap(tappedPods []core.Pod) map[string][]string {
nodeToTappedPodIPMap := make(map[string][]string, 0)
for _, pod := range tappedPods {
existingList := nodeToTappedPodIPMap[pod.Spec.NodeName]
if existingList == nil {
nodeToTappedPodIPMap[pod.Spec.NodeName] = []string{pod.Status.PodIP}
} else {
nodeToTappedPodIPMap[pod.Spec.NodeName] = append(nodeToTappedPodIPMap[pod.Spec.NodeName], pod.Status.PodIP)
}
}
return nodeToTappedPodIPMap
}
func excludeMizuPods(pods []core.Pod) []core.Pod {
mizuPrefixRegex := regexp.MustCompile("^" + MizuResourcesPrefix)
nonMizuPods := make([]core.Pod, 0)
for _, pod := range pods {
if !mizuPrefixRegex.MatchString(pod.Name) {
nonMizuPods = append(nonMizuPods, pod)
}
}
return nonMizuPods
}
func getPodArrayDiff(oldPods []core.Pod, newPods []core.Pod) (added []core.Pod, removed []core.Pod) {
added = getMissingPods(newPods, oldPods)
removed = getMissingPods(oldPods, newPods)
return added, removed
}
// getMissingPods returns the pods present in the pods1 array and missing from the pods2 array.
func getMissingPods(pods1 []core.Pod, pods2 []core.Pod) []core.Pod {
missingPods := make([]core.Pod, 0)
for _, pod1 := range pods1 {
var found = false
for _, pod2 := range pods2 {
if pod1.UID == pod2.UID {
found = true
break
}
}
if !found {
missingPods = append(missingPods, pod1)
}
}
return missingPods
}
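
To make the diff semantics concrete: getPodArrayDiff matches pods by UID, not by name, so a recreated pod with an identical name counts as one removal plus one addition. A tiny sketch (assumes extra imports: fmt, and metav1 from k8s.io/apimachinery/pkg/apis/meta/v1):

func podDiffExample() {
	// Pods are compared by UID, so a recreated pod with the same name
	// shows up as one removal plus one addition.
	oldPods := []core.Pod{{ObjectMeta: metav1.ObjectMeta{Name: "web", UID: "uid-1"}}}
	newPods := []core.Pod{{ObjectMeta: metav1.ObjectMeta{Name: "web", UID: "uid-2"}}}
	added, removed := getPodArrayDiff(oldPods, newPods)
	fmt.Printf("added=%d removed=%d\n", len(added), len(removed)) // added=1 removed=1
}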

shared/kubernetes/watch.go (new file, 109 lines)

@@ -0,0 +1,109 @@
package kubernetes
import (
"context"
"errors"
"fmt"
"github.com/up9inc/mizu/shared/debounce"
"github.com/up9inc/mizu/shared/logger"
"regexp"
"sync"
"time"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/watch"
)
func FilteredWatch(ctx context.Context, kubernetesProvider *Provider, targetNamespaces []string, podFilter *regexp.Regexp) (chan *corev1.Pod, chan *corev1.Pod, chan *corev1.Pod, chan error) {
addedChan := make(chan *corev1.Pod)
modifiedChan := make(chan *corev1.Pod)
removedChan := make(chan *corev1.Pod)
errorChan := make(chan error)
var wg sync.WaitGroup
for _, targetNamespace := range targetNamespaces {
wg.Add(1)
go func(targetNamespace string) {
defer wg.Done()
watchRestartDebouncer := debounce.NewDebouncer(1 * time.Minute, func() {})
for {
watcher := kubernetesProvider.GetPodWatcher(ctx, targetNamespace)
err := startWatchLoop(ctx, watcher, podFilter, addedChan, modifiedChan, removedChan) // blocking
watcher.Stop()
select {
case <-ctx.Done():
return
default:
}
if err != nil {
errorChan <- fmt.Errorf("error in k8 watch: %v", err)
break
} else {
if !watchRestartDebouncer.IsOn() {
watchRestartDebouncer.SetOn()
logger.Log.Debug("k8s watch channel closed, restarting watcher")
time.Sleep(time.Second * 5)
continue
} else {
errorChan <- errors.New("k8s watch unstable, closes frequently")
break
}
}
}
}(targetNamespace)
}
go func() {
<-ctx.Done()
wg.Wait()
close(addedChan)
close(modifiedChan)
close(removedChan)
close(errorChan)
}()
return addedChan, modifiedChan, removedChan, errorChan
}
func startWatchLoop(ctx context.Context, watcher watch.Interface, podFilter *regexp.Regexp, addedChan chan *corev1.Pod, modifiedChan chan *corev1.Pod, removedChan chan *corev1.Pod) error {
resultChan := watcher.ResultChan()
for {
select {
case e, isChannelOpen := <-resultChan:
if !isChannelOpen {
return nil
}
if e.Type == watch.Error {
return apierrors.FromObject(e.Object)
}
pod, ok := e.Object.(*corev1.Pod)
if !ok {
continue
}
if !podFilter.MatchString(pod.Name) {
continue
}
switch e.Type {
case watch.Added:
addedChan <- pod
case watch.Modified:
modifiedChan <- pod
case watch.Deleted:
removedChan <- pod
}
case <-ctx.Done():
return nil
}
}
}
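
Finally, a minimal consumer of FilteredWatch, mirroring the select loop MizuTapperSyncer uses; the namespace and regex are illustrative:

// Sketch of draining FilteredWatch's channels; all four close once ctx ends.
func watchExample(ctx context.Context, provider *Provider) {
	regex := regexp.MustCompile(".*") // match every pod name (illustrative)
	added, modified, removed, errs := FilteredWatch(ctx, provider, []string{"default"}, regex)
	for {
		select {
		case pod, ok := <-added:
			if !ok {
				return
			}
			logger.Log.Debugf("added: %s", pod.Name)
		case pod, ok := <-modified:
			if !ok {
				return
			}
			logger.Log.Debugf("modified: %s", pod.Name)
		case pod, ok := <-removed:
			if !ok {
				return
			}
			logger.Log.Debugf("removed: %s", pod.Name)
		case err, ok := <-errs:
			if !ok {
				return
			}
			logger.Log.Debugf("watch error: %v", err)
		}
	}
}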