mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-06-01 04:26:49 +00:00
332 lines
10 KiB
Go
332 lines
10 KiB
Go
package kubernetes
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"github.com/op/go-logging"
|
|
"github.com/up9inc/mizu/shared"
|
|
"github.com/up9inc/mizu/shared/debounce"
|
|
"github.com/up9inc/mizu/shared/logger"
|
|
"github.com/up9inc/mizu/tap/api"
|
|
core "k8s.io/api/core/v1"
|
|
"regexp"
|
|
"time"
|
|
)
|
|
|
|
const updateTappersDelay = 5 * time.Second
|
|
|
|
type TappedPodChangeEvent struct {
|
|
Added []core.Pod
|
|
Removed []core.Pod
|
|
}
|
|
|
|
// MizuTapperSyncer uses a k8s pod watch to update tapper daemonsets when targeted pods are removed or created
|
|
type MizuTapperSyncer struct {
|
|
startTime time.Time
|
|
context context.Context
|
|
CurrentlyTappedPods []core.Pod
|
|
config TapperSyncerConfig
|
|
kubernetesProvider *Provider
|
|
TapPodChangesOut chan TappedPodChangeEvent
|
|
TapperStatusChangedOut chan shared.TapperStatus
|
|
ErrorOut chan K8sTapManagerError
|
|
nodeToTappedPodMap map[string][]core.Pod
|
|
}
|
|
|
|
type TapperSyncerConfig struct {
|
|
TargetNamespaces []string
|
|
PodFilterRegex regexp.Regexp
|
|
MizuResourcesNamespace string
|
|
AgentImage string
|
|
TapperResources shared.Resources
|
|
ImagePullPolicy core.PullPolicy
|
|
LogLevel logging.Level
|
|
IgnoredUserAgents []string
|
|
MizuApiFilteringOptions api.TrafficFilteringOptions
|
|
MizuServiceAccountExists bool
|
|
Istio bool
|
|
}
|
|
|
|
func CreateAndStartMizuTapperSyncer(ctx context.Context, kubernetesProvider *Provider, config TapperSyncerConfig, startTime time.Time) (*MizuTapperSyncer, error) {
|
|
syncer := &MizuTapperSyncer{
|
|
startTime: startTime.Truncate(time.Second), // Round down because k8s CreationTimestamp is given in 1 sec resolution.
|
|
context: ctx,
|
|
CurrentlyTappedPods: make([]core.Pod, 0),
|
|
config: config,
|
|
kubernetesProvider: kubernetesProvider,
|
|
TapPodChangesOut: make(chan TappedPodChangeEvent, 100),
|
|
TapperStatusChangedOut: make(chan shared.TapperStatus, 100),
|
|
ErrorOut: make(chan K8sTapManagerError, 100),
|
|
}
|
|
|
|
if err, _ := syncer.updateCurrentlyTappedPods(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := syncer.updateMizuTappers(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
go syncer.watchPodsForTapping()
|
|
go syncer.watchTapperEvents()
|
|
go syncer.watchTapperPods()
|
|
return syncer, nil
|
|
}
|
|
|
|
func (tapperSyncer *MizuTapperSyncer) watchTapperPods() {
|
|
mizuResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", TapperPodName))
|
|
podWatchHelper := NewPodWatchHelper(tapperSyncer.kubernetesProvider, mizuResourceRegex)
|
|
eventChan, errorChan := FilteredWatch(tapperSyncer.context, podWatchHelper, []string{tapperSyncer.config.MizuResourcesNamespace}, podWatchHelper)
|
|
|
|
for {
|
|
select {
|
|
case wEvent, ok := <-eventChan:
|
|
if !ok {
|
|
eventChan = nil
|
|
continue
|
|
}
|
|
|
|
pod, err := wEvent.ToPod()
|
|
if err != nil {
|
|
logger.Log.Debugf("[ERROR] parsing Mizu resource pod: %+v", err)
|
|
continue
|
|
}
|
|
|
|
if tapperSyncer.startTime.After(pod.CreationTimestamp.Time) {
|
|
continue
|
|
}
|
|
|
|
logger.Log.Debugf("Watching tapper pods loop, tapper: %v, node: %v, status: %v", pod.Name, pod.Spec.NodeName, pod.Status.Phase)
|
|
if pod.Spec.NodeName != "" {
|
|
tapperStatus := shared.TapperStatus{TapperName: pod.Name, NodeName: pod.Spec.NodeName, Status: string(pod.Status.Phase)}
|
|
tapperSyncer.TapperStatusChangedOut <- tapperStatus
|
|
}
|
|
|
|
case err, ok := <-errorChan:
|
|
if !ok {
|
|
errorChan = nil
|
|
continue
|
|
}
|
|
logger.Log.Debugf("[ERROR] Watching tapper pods loop, error: %+v", err)
|
|
|
|
case <-tapperSyncer.context.Done():
|
|
logger.Log.Debugf("Watching tapper pods loop, ctx done")
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (tapperSyncer *MizuTapperSyncer) watchTapperEvents() {
|
|
mizuResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", TapperPodName))
|
|
eventWatchHelper := NewEventWatchHelper(tapperSyncer.kubernetesProvider, mizuResourceRegex, "pod")
|
|
eventChan, errorChan := FilteredWatch(tapperSyncer.context, eventWatchHelper, []string{tapperSyncer.config.MizuResourcesNamespace}, eventWatchHelper)
|
|
|
|
for {
|
|
select {
|
|
case wEvent, ok := <-eventChan:
|
|
if !ok {
|
|
eventChan = nil
|
|
continue
|
|
}
|
|
|
|
event, err := wEvent.ToEvent()
|
|
if err != nil {
|
|
logger.Log.Debugf("[ERROR] parsing Mizu resource event: %+v", err)
|
|
continue
|
|
}
|
|
|
|
if tapperSyncer.startTime.After(event.CreationTimestamp.Time) {
|
|
continue
|
|
}
|
|
|
|
logger.Log.Debugf(
|
|
fmt.Sprintf("Watching tapper events loop, event %s, time: %v, resource: %s (%s), reason: %s, note: %s",
|
|
event.Name,
|
|
event.CreationTimestamp.Time,
|
|
event.Regarding.Name,
|
|
event.Regarding.Kind,
|
|
event.Reason,
|
|
event.Note))
|
|
|
|
pod, err1 := tapperSyncer.kubernetesProvider.GetPod(tapperSyncer.context, tapperSyncer.config.MizuResourcesNamespace, event.Regarding.Name)
|
|
if err1 != nil {
|
|
logger.Log.Debugf(fmt.Sprintf("Couldn't get tapper pod %s", event.Regarding.Name))
|
|
continue
|
|
}
|
|
|
|
nodeName := ""
|
|
if event.Reason != "FailedScheduling" {
|
|
nodeName = pod.Spec.NodeName
|
|
} else {
|
|
nodeName = pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchFields[0].Values[0]
|
|
}
|
|
|
|
tapperStatus := shared.TapperStatus{TapperName: pod.Name, NodeName: nodeName, Status: string(pod.Status.Phase)}
|
|
tapperSyncer.TapperStatusChangedOut <- tapperStatus
|
|
|
|
case err, ok := <-errorChan:
|
|
if !ok {
|
|
errorChan = nil
|
|
continue
|
|
}
|
|
|
|
logger.Log.Debugf("[ERROR] Watching tapper events loop, error: %+v", err)
|
|
|
|
case <-tapperSyncer.context.Done():
|
|
logger.Log.Debugf("Watching tapper events loop, ctx done")
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (tapperSyncer *MizuTapperSyncer) watchPodsForTapping() {
|
|
podWatchHelper := NewPodWatchHelper(tapperSyncer.kubernetesProvider, &tapperSyncer.config.PodFilterRegex)
|
|
eventChan, errorChan := FilteredWatch(tapperSyncer.context, podWatchHelper, tapperSyncer.config.TargetNamespaces, podWatchHelper)
|
|
|
|
restartTappers := func() {
|
|
err, changeFound := tapperSyncer.updateCurrentlyTappedPods()
|
|
if err != nil {
|
|
tapperSyncer.ErrorOut <- K8sTapManagerError{
|
|
OriginalError: err,
|
|
TapManagerReason: TapManagerPodListError,
|
|
}
|
|
}
|
|
|
|
if !changeFound {
|
|
logger.Log.Debugf("Nothing changed update tappers not needed")
|
|
return
|
|
}
|
|
if err := tapperSyncer.updateMizuTappers(); err != nil {
|
|
tapperSyncer.ErrorOut <- K8sTapManagerError{
|
|
OriginalError: err,
|
|
TapManagerReason: TapManagerTapperUpdateError,
|
|
}
|
|
}
|
|
}
|
|
restartTappersDebouncer := debounce.NewDebouncer(updateTappersDelay, restartTappers)
|
|
|
|
for {
|
|
select {
|
|
case wEvent, ok := <-eventChan:
|
|
if !ok {
|
|
eventChan = nil
|
|
continue
|
|
}
|
|
|
|
pod, err := wEvent.ToPod()
|
|
if err != nil {
|
|
tapperSyncer.handleErrorInWatchLoop(err, restartTappersDebouncer)
|
|
continue
|
|
}
|
|
|
|
switch wEvent.Type {
|
|
case EventAdded:
|
|
logger.Log.Debugf("Added matching pod %s, ns: %s", pod.Name, pod.Namespace)
|
|
restartTappersDebouncer.SetOn()
|
|
case EventDeleted:
|
|
logger.Log.Debugf("Removed matching pod %s, ns: %s", pod.Name, pod.Namespace)
|
|
restartTappersDebouncer.SetOn()
|
|
case EventModified:
|
|
logger.Log.Debugf("Modified matching pod %s, ns: %s, phase: %s, ip: %s", pod.Name, pod.Namespace, pod.Status.Phase, pod.Status.PodIP)
|
|
// Act only if the modified pod has already obtained an IP address.
|
|
// After filtering for IPs, on a normal pod restart this includes the following events:
|
|
// - Pod deletion
|
|
// - Pod reaches start state
|
|
// - Pod reaches ready state
|
|
// Ready/unready transitions might also trigger this event.
|
|
if pod.Status.PodIP != "" {
|
|
restartTappersDebouncer.SetOn()
|
|
}
|
|
case EventBookmark:
|
|
break
|
|
case EventError:
|
|
break
|
|
}
|
|
case err, ok := <-errorChan:
|
|
if !ok {
|
|
errorChan = nil
|
|
continue
|
|
}
|
|
|
|
tapperSyncer.handleErrorInWatchLoop(err, restartTappersDebouncer)
|
|
continue
|
|
|
|
case <-tapperSyncer.context.Done():
|
|
logger.Log.Debugf("Watching pods loop, context done, stopping `restart tappers debouncer`")
|
|
restartTappersDebouncer.Cancel()
|
|
// TODO: Does this also perform cleanup?
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (tapperSyncer *MizuTapperSyncer) handleErrorInWatchLoop(err error, restartTappersDebouncer *debounce.Debouncer) {
|
|
logger.Log.Debugf("Watching pods loop, got error %v, stopping `restart tappers debouncer`", err)
|
|
restartTappersDebouncer.Cancel()
|
|
tapperSyncer.ErrorOut <- K8sTapManagerError{
|
|
OriginalError: err,
|
|
TapManagerReason: TapManagerPodWatchError,
|
|
}
|
|
}
|
|
|
|
func (tapperSyncer *MizuTapperSyncer) updateCurrentlyTappedPods() (err error, changesFound bool) {
|
|
if matchingPods, err := tapperSyncer.kubernetesProvider.ListAllRunningPodsMatchingRegex(tapperSyncer.context, &tapperSyncer.config.PodFilterRegex, tapperSyncer.config.TargetNamespaces); err != nil {
|
|
return err, false
|
|
} else {
|
|
podsToTap := excludeMizuPods(matchingPods)
|
|
addedPods, removedPods := getPodArrayDiff(tapperSyncer.CurrentlyTappedPods, podsToTap)
|
|
for _, addedPod := range addedPods {
|
|
logger.Log.Debugf("tapping new pod %s", addedPod.Name)
|
|
}
|
|
for _, removedPod := range removedPods {
|
|
logger.Log.Debugf("pod %s is no longer running, tapping for it stopped", removedPod.Name)
|
|
}
|
|
if len(addedPods) > 0 || len(removedPods) > 0 {
|
|
tapperSyncer.CurrentlyTappedPods = podsToTap
|
|
tapperSyncer.nodeToTappedPodMap = GetNodeHostToTappedPodsMap(tapperSyncer.CurrentlyTappedPods)
|
|
tapperSyncer.TapPodChangesOut <- TappedPodChangeEvent{
|
|
Added: addedPods,
|
|
Removed: removedPods,
|
|
}
|
|
return nil, true
|
|
}
|
|
return nil, false
|
|
}
|
|
}
|
|
|
|
func (tapperSyncer *MizuTapperSyncer) updateMizuTappers() error {
|
|
if len(tapperSyncer.nodeToTappedPodMap) > 0 {
|
|
var serviceAccountName string
|
|
if tapperSyncer.config.MizuServiceAccountExists {
|
|
serviceAccountName = ServiceAccountName
|
|
} else {
|
|
serviceAccountName = ""
|
|
}
|
|
|
|
if err := tapperSyncer.kubernetesProvider.ApplyMizuTapperDaemonSet(
|
|
tapperSyncer.context,
|
|
tapperSyncer.config.MizuResourcesNamespace,
|
|
TapperDaemonSetName,
|
|
tapperSyncer.config.AgentImage,
|
|
TapperPodName,
|
|
fmt.Sprintf("%s.%s.svc.cluster.local", ApiServerPodName, tapperSyncer.config.MizuResourcesNamespace),
|
|
tapperSyncer.nodeToTappedPodMap,
|
|
serviceAccountName,
|
|
tapperSyncer.config.TapperResources,
|
|
tapperSyncer.config.ImagePullPolicy,
|
|
tapperSyncer.config.MizuApiFilteringOptions,
|
|
tapperSyncer.config.LogLevel,
|
|
tapperSyncer.config.Istio,
|
|
); err != nil {
|
|
return err
|
|
}
|
|
logger.Log.Debugf("Successfully created %v tappers", len(tapperSyncer.nodeToTappedPodMap))
|
|
} else {
|
|
if err := tapperSyncer.kubernetesProvider.RemoveDaemonSet(tapperSyncer.context, tapperSyncer.config.MizuResourcesNamespace, TapperDaemonSetName); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|