API server stores tappers status (#531)

This commit is contained in:
Igor Gov
2021-12-15 14:52:49 +02:00
committed by GitHub
parent 0a915b3fe7
commit 90c210452d
11 changed files with 243 additions and 290 deletions

View File

@@ -3,6 +3,7 @@ package kubernetes
import (
"context"
"regexp"
"strings"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
@@ -10,13 +11,15 @@ import (
type EventWatchHelper struct {
kubernetesProvider *Provider
NameRegexFilter *regexp.Regexp
NameRegexFilter *regexp.Regexp
Kind string
}
func NewEventWatchHelper(kubernetesProvider *Provider, NameRegexFilter *regexp.Regexp) *EventWatchHelper {
func NewEventWatchHelper(kubernetesProvider *Provider, NameRegexFilter *regexp.Regexp, kind string) *EventWatchHelper {
return &EventWatchHelper{
kubernetesProvider: kubernetesProvider,
NameRegexFilter: NameRegexFilter,
NameRegexFilter: NameRegexFilter,
Kind: kind,
}
}
@@ -31,6 +34,10 @@ func (wh *EventWatchHelper) Filter(wEvent *WatchEvent) (bool, error) {
return false, nil
}
if strings.ToLower(event.Regarding.Kind) != strings.ToLower(wh.Kind) {
return false, nil
}
return true, nil
}

View File

@@ -23,13 +23,15 @@ type TappedPodChangeEvent struct {
// MizuTapperSyncer uses a k8s pod watch to update tapper daemonsets when targeted pods are removed or created
type MizuTapperSyncer struct {
context context.Context
CurrentlyTappedPods []core.Pod
config TapperSyncerConfig
kubernetesProvider *Provider
TapPodChangesOut chan TappedPodChangeEvent
ErrorOut chan K8sTapManagerError
nodeToTappedPodIPMap map[string][]string
startTime time.Time
context context.Context
CurrentlyTappedPods []core.Pod
config TapperSyncerConfig
kubernetesProvider *Provider
TapPodChangesOut chan TappedPodChangeEvent
TapperStatusChangedOut chan shared.TapperStatus
ErrorOut chan K8sTapManagerError
nodeToTappedPodIPMap map[string][]string
}
type TapperSyncerConfig struct {
@@ -46,14 +48,16 @@ type TapperSyncerConfig struct {
Istio bool
}
func CreateAndStartMizuTapperSyncer(ctx context.Context, kubernetesProvider *Provider, config TapperSyncerConfig) (*MizuTapperSyncer, error) {
func CreateAndStartMizuTapperSyncer(ctx context.Context, kubernetesProvider *Provider, config TapperSyncerConfig, startTime time.Time) (*MizuTapperSyncer, error) {
syncer := &MizuTapperSyncer{
context: ctx,
CurrentlyTappedPods: make([]core.Pod, 0),
config: config,
kubernetesProvider: kubernetesProvider,
TapPodChangesOut: make(chan TappedPodChangeEvent, 100),
ErrorOut: make(chan K8sTapManagerError, 100),
startTime: startTime.Truncate(time.Second), // Round down because k8s CreationTimestamp is given in 1 sec resolution.
context: ctx,
CurrentlyTappedPods: make([]core.Pod, 0),
config: config,
kubernetesProvider: kubernetesProvider,
TapPodChangesOut: make(chan TappedPodChangeEvent, 100),
TapperStatusChangedOut: make(chan shared.TapperStatus, 100),
ErrorOut: make(chan K8sTapManagerError, 100),
}
if err, _ := syncer.updateCurrentlyTappedPods(); err != nil {
@@ -65,9 +69,72 @@ func CreateAndStartMizuTapperSyncer(ctx context.Context, kubernetesProvider *Pro
}
go syncer.watchPodsForTapping()
go syncer.watchTapperEvents()
return syncer, nil
}
func (tapperSyncer *MizuTapperSyncer) watchTapperEvents() {
mizuResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", TapperPodName))
eventWatchHelper := NewEventWatchHelper(tapperSyncer.kubernetesProvider, mizuResourceRegex, "pod")
eventChan, errorChan := FilteredWatch(tapperSyncer.context, eventWatchHelper, []string{tapperSyncer.config.MizuResourcesNamespace}, eventWatchHelper)
for {
select {
case wEvent, ok := <-eventChan:
if !ok {
eventChan = nil
continue
}
event, err := wEvent.ToEvent()
if err != nil {
logger.Log.Errorf(fmt.Sprintf("Error parsing Mizu resource event: %+v", err))
}
if tapperSyncer.startTime.After(event.CreationTimestamp.Time) {
continue
}
logger.Log.Debugf(
fmt.Sprintf("Watching tapper events loop, event %s, time: %v, resource: %s (%s), reason: %s, note: %s",
event.Name,
event.CreationTimestamp.Time,
event.Regarding.Name,
event.Regarding.Kind,
event.Reason,
event.Note))
pod, err1 := tapperSyncer.kubernetesProvider.GetPod(tapperSyncer.context, tapperSyncer.config.MizuResourcesNamespace, event.Regarding.Name)
if err1 != nil {
logger.Log.Debugf(fmt.Sprintf("Failed to get tapper pod %s", event.Regarding.Name))
continue
}
nodeName := ""
if event.Reason != "FailedScheduling" {
nodeName = pod.Spec.NodeName
} else {
nodeName = pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchFields[0].Values[0]
}
taperStatus := shared.TapperStatus{TapperName: pod.Name, NodeName: nodeName, Status: event.Reason}
tapperSyncer.TapperStatusChangedOut <- taperStatus
case err, ok := <-errorChan:
if !ok {
errorChan = nil
continue
}
logger.Log.Errorf("Watching tapper events loop, error: %+v", err)
case <-tapperSyncer.context.Done():
logger.Log.Debugf("Watching tapper events loop, ctx done")
return
}
}
}
func (tapperSyncer *MizuTapperSyncer) watchPodsForTapping() {
podWatchHelper := NewPodWatchHelper(tapperSyncer.kubernetesProvider, &tapperSyncer.config.PodFilterRegex)
eventChan, errorChan := FilteredWatch(tapperSyncer.context, podWatchHelper, tapperSyncer.config.TargetNamespaces, podWatchHelper)
@@ -108,7 +175,6 @@ func (tapperSyncer *MizuTapperSyncer) watchPodsForTapping() {
continue
}
switch wEvent.Type {
case EventAdded:
logger.Log.Debugf("Added matching pod %s, ns: %s", pod.Name, pod.Namespace)

View File

@@ -351,8 +351,8 @@ func (provider *Provider) CreateService(ctx context.Context, namespace string, s
}
func (provider *Provider) DoesServicesExist(ctx context.Context, namespace string, name string) (bool, error) {
resource, err := provider.clientSet.CoreV1().Services(namespace).Get(ctx, name, metav1.GetOptions{})
return provider.doesResourceExist(resource, err)
serviceResource, err := provider.clientSet.CoreV1().Services(namespace).Get(ctx, name, metav1.GetOptions{})
return provider.doesResourceExist(serviceResource, err)
}
func (provider *Provider) doesResourceExist(resource interface{}, err error) (bool, error) {
@@ -642,7 +642,7 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
"--api-server-address", fmt.Sprintf("ws://%s/wsTapper", apiServerPodIp),
"--nodefrag",
}
if istio {
mizuCmd = append(mizuCmd, "--procfs", procfsMountPath, "--istio")
}
@@ -653,13 +653,13 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
agentContainer.WithImagePullPolicy(imagePullPolicy)
caps := applyconfcore.Capabilities().WithDrop("ALL").WithAdd("NET_RAW").WithAdd("NET_ADMIN")
if istio {
caps = caps.WithAdd("SYS_ADMIN") // for reading /proc/PID/net/ns
caps = caps.WithAdd("SYS_PTRACE") // for setting netns to other process
caps = caps.WithAdd("SYS_ADMIN") // for reading /proc/PID/net/ns
caps = caps.WithAdd("SYS_PTRACE") // for setting netns to other process
caps = caps.WithAdd("DAC_OVERRIDE") // for reading /proc/PID/environ
}
agentContainer.WithSecurityContext(applyconfcore.SecurityContext().WithCapabilities(caps))
agentContainer.WithCommand(mizuCmd...)
@@ -780,10 +780,10 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
return err
}
func (provider *Provider) ListAllPodsMatchingRegex(ctx context.Context, regex *regexp.Regexp, namespaces []string) ([]core.Pod, error) {
func (provider *Provider) listPodsImpl(ctx context.Context, regex *regexp.Regexp, namespaces []string, listOptions metav1.ListOptions) ([]core.Pod, error) {
var pods []core.Pod
for _, namespace := range namespaces {
namespacePods, err := provider.clientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
namespacePods, err := provider.clientSet.CoreV1().Pods(namespace).List(ctx, listOptions)
if err != nil {
return nil, fmt.Errorf("failed to get pods in ns: [%s], %w", namespace, err)
}
@@ -800,6 +800,14 @@ func (provider *Provider) ListAllPodsMatchingRegex(ctx context.Context, regex *r
return matchingPods, nil
}
func (provider *Provider) ListAllPodsMatchingRegex(ctx context.Context, regex *regexp.Regexp, namespaces []string) ([]core.Pod, error) {
return provider.listPodsImpl(ctx, regex, namespaces, metav1.ListOptions{})
}
func (provider *Provider) GetPod(ctx context.Context, namespaces string, podName string) (*core.Pod, error) {
return provider.clientSet.CoreV1().Pods(namespaces).Get(ctx, podName, metav1.GetOptions{})
}
func (provider *Provider) ListAllRunningPodsMatchingRegex(ctx context.Context, regex *regexp.Regexp, namespaces []string) ([]core.Pod, error) {
pods, err := provider.ListAllPodsMatchingRegex(ctx, regex, namespaces)
if err != nil {

View File

@@ -57,11 +57,10 @@ func getMissingPods(pods1 []core.Pod, pods2 []core.Pod) []core.Pod {
return missingPods
}
func GetPodInfosForPods(pods []core.Pod) []shared.PodInfo {
podInfos := make([]shared.PodInfo, 0)
for _, pod := range pods {
podInfos = append(podInfos, shared.PodInfo{Name: pod.Name, Namespace: pod.Namespace})
podInfos = append(podInfos, shared.PodInfo{Name: pod.Name, Namespace: pod.Namespace, NodeName: pod.Spec.NodeName})
}
return podInfos
}