Send pod info to tapper (#532)

This commit is contained in:
David Levanon
2021-12-16 10:51:03 +02:00
committed by GitHub
parent a06d5cfbde
commit ab029f4394
10 changed files with 96 additions and 46 deletions

View File

@@ -31,7 +31,7 @@ type MizuTapperSyncer struct {
TapPodChangesOut chan TappedPodChangeEvent
TapperStatusChangedOut chan shared.TapperStatus
ErrorOut chan K8sTapManagerError
nodeToTappedPodIPMap map[string][]string
nodeToTappedPodMap map[string][]core.Pod
}
type TapperSyncerConfig struct {
@@ -239,11 +239,11 @@ func (tapperSyncer *MizuTapperSyncer) updateCurrentlyTappedPods() (err error, ch
}
if len(addedPods) > 0 || len(removedPods) > 0 {
tapperSyncer.CurrentlyTappedPods = podsToTap
tapperSyncer.nodeToTappedPodIPMap = GetNodeHostToTappedPodIpsMap(tapperSyncer.CurrentlyTappedPods)
tapperSyncer.nodeToTappedPodMap = GetNodeHostToTappedPodsMap(tapperSyncer.CurrentlyTappedPods)
tapperSyncer.TapPodChangesOut <- TappedPodChangeEvent{
Added: addedPods,
Removed: removedPods,
ExpectedTapperAmount: len(tapperSyncer.nodeToTappedPodIPMap),
ExpectedTapperAmount: len(tapperSyncer.nodeToTappedPodMap),
}
return nil, true
}
@@ -252,7 +252,7 @@ func (tapperSyncer *MizuTapperSyncer) updateCurrentlyTappedPods() (err error, ch
}
func (tapperSyncer *MizuTapperSyncer) updateMizuTappers() error {
if len(tapperSyncer.nodeToTappedPodIPMap) > 0 {
if len(tapperSyncer.nodeToTappedPodMap) > 0 {
var serviceAccountName string
if tapperSyncer.config.MizuServiceAccountExists {
serviceAccountName = ServiceAccountName
@@ -267,7 +267,7 @@ func (tapperSyncer *MizuTapperSyncer) updateMizuTappers() error {
tapperSyncer.config.AgentImage,
TapperPodName,
fmt.Sprintf("%s.%s.svc.cluster.local", ApiServerPodName, tapperSyncer.config.MizuResourcesNamespace),
tapperSyncer.nodeToTappedPodIPMap,
tapperSyncer.nodeToTappedPodMap,
serviceAccountName,
tapperSyncer.config.TapperResources,
tapperSyncer.config.ImagePullPolicy,
@@ -277,7 +277,7 @@ func (tapperSyncer *MizuTapperSyncer) updateMizuTappers() error {
); err != nil {
return err
}
logger.Log.Debugf("Successfully created %v tappers", len(tapperSyncer.nodeToTappedPodIPMap))
logger.Log.Debugf("Successfully created %v tappers", len(tapperSyncer.nodeToTappedPodMap))
} else {
if err := tapperSyncer.kubernetesProvider.RemoveDaemonSet(tapperSyncer.context, tapperSyncer.config.MizuResourcesNamespace, TapperDaemonSetName); err != nil {
return err

View File

@@ -597,14 +597,14 @@ func (provider *Provider) CreateConfigMap(ctx context.Context, namespace string,
return nil
}
func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string, apiServerPodIp string, nodeToTappedPodIPMap map[string][]string, serviceAccountName string, resources shared.Resources, imagePullPolicy core.PullPolicy, mizuApiFilteringOptions api.TrafficFilteringOptions, logLevel logging.Level, istio bool) error {
logger.Log.Debugf("Applying %d tapper daemon sets, ns: %s, daemonSetName: %s, podImage: %s, tapperPodName: %s", len(nodeToTappedPodIPMap), namespace, daemonSetName, podImage, tapperPodName)
func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string, apiServerPodIp string, nodeToTappedPodMap map[string][]core.Pod, serviceAccountName string, resources shared.Resources, imagePullPolicy core.PullPolicy, mizuApiFilteringOptions api.TrafficFilteringOptions, logLevel logging.Level, istio bool) error {
logger.Log.Debugf("Applying %d tapper daemon sets, ns: %s, daemonSetName: %s, podImage: %s, tapperPodName: %s", len(nodeToTappedPodMap), namespace, daemonSetName, podImage, tapperPodName)
if len(nodeToTappedPodIPMap) == 0 {
if len(nodeToTappedPodMap) == 0 {
return fmt.Errorf("daemon set %s must tap at least 1 pod", daemonSetName)
}
nodeToTappedPodIPMapJsonStr, err := json.Marshal(nodeToTappedPodIPMap)
nodeToTappedPodMapJsonStr, err := json.Marshal(nodeToTappedPodMap)
if err != nil {
return err
}
@@ -645,7 +645,7 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
agentContainer.WithEnv(
applyconfcore.EnvVar().WithName(shared.LogLevelEnvVar).WithValue(logLevel.String()),
applyconfcore.EnvVar().WithName(shared.HostModeEnvVar).WithValue("1"),
applyconfcore.EnvVar().WithName(shared.TappedAddressesPerNodeDictEnvVar).WithValue(string(nodeToTappedPodIPMapJsonStr)),
applyconfcore.EnvVar().WithName(shared.TappedAddressesPerNodeDictEnvVar).WithValue(string(nodeToTappedPodMapJsonStr)),
applyconfcore.EnvVar().WithName(shared.GoGCEnvVar).WithValue("12800"),
applyconfcore.EnvVar().WithName(shared.MizuFilteringOptionsEnvVar).WithValue(string(mizuApiFilteringOptionsJsonStr)),
)
@@ -683,8 +683,8 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
agentResources := applyconfcore.ResourceRequirements().WithRequests(agentResourceRequests).WithLimits(agentResourceLimits)
agentContainer.WithResources(agentResources)
nodeNames := make([]string, 0, len(nodeToTappedPodIPMap))
for nodeName := range nodeToTappedPodIPMap {
nodeNames := make([]string, 0, len(nodeToTappedPodMap))
for nodeName := range nodeToTappedPodMap {
nodeNames = append(nodeNames, nodeName)
}
nodeSelectorRequirement := applyconfcore.NodeSelectorRequirement()

View File

@@ -1,22 +1,38 @@
package kubernetes
import (
"regexp"
"github.com/up9inc/mizu/shared"
core "k8s.io/api/core/v1"
"regexp"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func GetNodeHostToTappedPodIpsMap(tappedPods []core.Pod) map[string][]string {
nodeToTappedPodIPMap := make(map[string][]string, 0)
func GetNodeHostToTappedPodsMap(tappedPods []core.Pod) map[string][]core.Pod {
nodeToTappedPodMap := make(map[string][]core.Pod, 0)
for _, pod := range tappedPods {
existingList := nodeToTappedPodIPMap[pod.Spec.NodeName]
minimizedPod := getMinimizedPod(pod)
existingList := nodeToTappedPodMap[pod.Spec.NodeName]
if existingList == nil {
nodeToTappedPodIPMap[pod.Spec.NodeName] = []string{pod.Status.PodIP}
nodeToTappedPodMap[pod.Spec.NodeName] = []core.Pod{minimizedPod}
} else {
nodeToTappedPodIPMap[pod.Spec.NodeName] = append(nodeToTappedPodIPMap[pod.Spec.NodeName], pod.Status.PodIP)
nodeToTappedPodMap[pod.Spec.NodeName] = append(nodeToTappedPodMap[pod.Spec.NodeName], minimizedPod)
}
}
return nodeToTappedPodIPMap
return nodeToTappedPodMap
}
func getMinimizedPod(fullPod core.Pod) core.Pod {
return core.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fullPod.Name,
},
Status: v1.PodStatus{
PodIP: fullPod.Status.PodIP,
},
}
}
func excludeMizuPods(pods []core.Pod) []core.Pod {