mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-08-14 14:43:46 +00:00
⚡ Don't watch and POST Worker pods to Hub
This commit is contained in:
parent
f128ae3993
commit
846f253a03
@ -161,12 +161,6 @@ func startWorkerSyncer(ctx context.Context, cancel context.CancelFunc, provider
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
go connector.PostTargetedPodsToHub(workerSyncer.CurrentlyTargetedPods)
|
go connector.PostTargetedPodsToHub(workerSyncer.CurrentlyTargetedPods)
|
||||||
case pod, ok := <-workerSyncer.WorkerPodsChanges:
|
|
||||||
if !ok {
|
|
||||||
log.Debug().Msg("workerSyncer worker status changed channel closed, ending listener loop")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
go connector.PostWorkerPodToHub(pod)
|
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
log.Debug().Msg("workerSyncer event listener loop exiting due to context done")
|
log.Debug().Msg("workerSyncer event listener loop exiting due to context done")
|
||||||
return
|
return
|
||||||
|
@ -64,7 +64,6 @@ func (connector *Connector) isReachable(path string) (bool, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (connector *Connector) PostWorkerPodToHub(pod *v1.Pod) {
|
func (connector *Connector) PostWorkerPodToHub(pod *v1.Pod) {
|
||||||
// TODO: This request is responsible for proxy_server.go:147] Error while proxying request: context canceled log
|
|
||||||
postWorkerUrl := fmt.Sprintf("%s/pods/worker", connector.url)
|
postWorkerUrl := fmt.Sprintf("%s/pods/worker", connector.url)
|
||||||
|
|
||||||
if podMarshalled, err := json.Marshal(pod); err != nil {
|
if podMarshalled, err := json.Marshal(pod); err != nil {
|
||||||
|
@ -9,7 +9,6 @@ import (
|
|||||||
"github.com/kubeshark/base/pkg/models"
|
"github.com/kubeshark/base/pkg/models"
|
||||||
"github.com/kubeshark/kubeshark/debounce"
|
"github.com/kubeshark/kubeshark/debounce"
|
||||||
"github.com/kubeshark/kubeshark/docker"
|
"github.com/kubeshark/kubeshark/docker"
|
||||||
"github.com/kubeshark/kubeshark/misc"
|
|
||||||
"github.com/kubeshark/kubeshark/utils"
|
"github.com/kubeshark/kubeshark/utils"
|
||||||
"github.com/rs/zerolog/log"
|
"github.com/rs/zerolog/log"
|
||||||
v1 "k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
@ -30,7 +29,6 @@ type WorkerSyncer struct {
|
|||||||
config WorkerSyncerConfig
|
config WorkerSyncerConfig
|
||||||
kubernetesProvider *Provider
|
kubernetesProvider *Provider
|
||||||
TapPodChangesOut chan TargetedPodChangeEvent
|
TapPodChangesOut chan TargetedPodChangeEvent
|
||||||
WorkerPodsChanges chan *v1.Pod
|
|
||||||
ErrorOut chan K8sTapManagerError
|
ErrorOut chan K8sTapManagerError
|
||||||
nodeToTargetedPodMap models.NodeToPodsMap
|
nodeToTargetedPodMap models.NodeToPodsMap
|
||||||
targetedNodes []string
|
targetedNodes []string
|
||||||
@ -57,7 +55,6 @@ func CreateAndStartWorkerSyncer(ctx context.Context, kubernetesProvider *Provide
|
|||||||
config: config,
|
config: config,
|
||||||
kubernetesProvider: kubernetesProvider,
|
kubernetesProvider: kubernetesProvider,
|
||||||
TapPodChangesOut: make(chan TargetedPodChangeEvent, 100),
|
TapPodChangesOut: make(chan TargetedPodChangeEvent, 100),
|
||||||
WorkerPodsChanges: make(chan *v1.Pod, 100),
|
|
||||||
ErrorOut: make(chan K8sTapManagerError, 100),
|
ErrorOut: make(chan K8sTapManagerError, 100),
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -70,115 +67,9 @@ func CreateAndStartWorkerSyncer(ctx context.Context, kubernetesProvider *Provide
|
|||||||
}
|
}
|
||||||
|
|
||||||
go syncer.watchPodsForTargeting()
|
go syncer.watchPodsForTargeting()
|
||||||
go syncer.watchWorkerEvents()
|
|
||||||
go syncer.watchWorkerPods()
|
|
||||||
return syncer, nil
|
return syncer, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (workerSyncer *WorkerSyncer) watchWorkerPods() {
|
|
||||||
selfResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", WorkerPodName))
|
|
||||||
podWatchHelper := NewPodWatchHelper(workerSyncer.kubernetesProvider, selfResourceRegex)
|
|
||||||
eventChan, errorChan := FilteredWatch(workerSyncer.context, podWatchHelper, []string{workerSyncer.config.SelfNamespace}, podWatchHelper)
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case wEvent, ok := <-eventChan:
|
|
||||||
if !ok {
|
|
||||||
eventChan = nil
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
pod, err := wEvent.ToPod()
|
|
||||||
if err != nil {
|
|
||||||
log.Error().Str("pod", WorkerPodName).Err(err).Msg(fmt.Sprintf("While parsing %s resource!", misc.Software))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Debug().
|
|
||||||
Str("pod", pod.Name).
|
|
||||||
Str("node", pod.Spec.NodeName).
|
|
||||||
Interface("phase", pod.Status.Phase).
|
|
||||||
Msg("Watching pod events...")
|
|
||||||
if pod.Spec.NodeName != "" {
|
|
||||||
workerSyncer.WorkerPodsChanges <- pod
|
|
||||||
}
|
|
||||||
|
|
||||||
case err, ok := <-errorChan:
|
|
||||||
if !ok {
|
|
||||||
errorChan = nil
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
log.Error().Str("pod", WorkerPodName).Err(err).Msg("While watching pod!")
|
|
||||||
|
|
||||||
case <-workerSyncer.context.Done():
|
|
||||||
log.Debug().
|
|
||||||
Str("pod", WorkerPodName).
|
|
||||||
Msg("Watching pod, context done.")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (workerSyncer *WorkerSyncer) watchWorkerEvents() {
|
|
||||||
selfResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", WorkerPodName))
|
|
||||||
eventWatchHelper := NewEventWatchHelper(workerSyncer.kubernetesProvider, selfResourceRegex, "pod")
|
|
||||||
eventChan, errorChan := FilteredWatch(workerSyncer.context, eventWatchHelper, []string{workerSyncer.config.SelfNamespace}, eventWatchHelper)
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case wEvent, ok := <-eventChan:
|
|
||||||
if !ok {
|
|
||||||
eventChan = nil
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
event, err := wEvent.ToEvent()
|
|
||||||
if err != nil {
|
|
||||||
log.Error().
|
|
||||||
Str("pod", WorkerPodName).
|
|
||||||
Err(err).
|
|
||||||
Msg("Parsing resource event.")
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Debug().
|
|
||||||
Str("pod", WorkerPodName).
|
|
||||||
Str("event", event.Name).
|
|
||||||
Time("time", event.CreationTimestamp.Time).
|
|
||||||
Str("name", event.Regarding.Name).
|
|
||||||
Str("kind", event.Regarding.Kind).
|
|
||||||
Str("reason", event.Reason).
|
|
||||||
Str("note", event.Note).
|
|
||||||
Msg("Watching events.")
|
|
||||||
|
|
||||||
pod, err1 := workerSyncer.kubernetesProvider.GetPod(workerSyncer.context, workerSyncer.config.SelfNamespace, event.Regarding.Name)
|
|
||||||
if err1 != nil {
|
|
||||||
log.Error().Str("name", event.Regarding.Name).Msg("Couldn't get pod")
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
workerSyncer.WorkerPodsChanges <- pod
|
|
||||||
|
|
||||||
case err, ok := <-errorChan:
|
|
||||||
if !ok {
|
|
||||||
errorChan = nil
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Error().
|
|
||||||
Str("pod", WorkerPodName).
|
|
||||||
Err(err).
|
|
||||||
Msg("While watching events.")
|
|
||||||
|
|
||||||
case <-workerSyncer.context.Done():
|
|
||||||
log.Debug().
|
|
||||||
Str("pod", WorkerPodName).
|
|
||||||
Msg("Watching pod events, context done.")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (workerSyncer *WorkerSyncer) watchPodsForTargeting() {
|
func (workerSyncer *WorkerSyncer) watchPodsForTargeting() {
|
||||||
podWatchHelper := NewPodWatchHelper(workerSyncer.kubernetesProvider, &workerSyncer.config.PodFilterRegex)
|
podWatchHelper := NewPodWatchHelper(workerSyncer.kubernetesProvider, &workerSyncer.config.PodFilterRegex)
|
||||||
eventChan, errorChan := FilteredWatch(workerSyncer.context, podWatchHelper, workerSyncer.config.TargetNamespaces, podWatchHelper)
|
eventChan, errorChan := FilteredWatch(workerSyncer.context, podWatchHelper, workerSyncer.config.TargetNamespaces, podWatchHelper)
|
||||||
|
Loading…
Reference in New Issue
Block a user