diff --git a/cmd/tapRunner.go b/cmd/tapRunner.go index 6b02d973b..e2b53bf5d 100644 --- a/cmd/tapRunner.go +++ b/cmd/tapRunner.go @@ -161,12 +161,6 @@ func startWorkerSyncer(ctx context.Context, cancel context.CancelFunc, provider return } go connector.PostTargetedPodsToHub(workerSyncer.CurrentlyTargetedPods) - case pod, ok := <-workerSyncer.WorkerPodsChanges: - if !ok { - log.Debug().Msg("workerSyncer worker status changed channel closed, ending listener loop") - return - } - go connector.PostWorkerPodToHub(pod) case <-ctx.Done(): log.Debug().Msg("workerSyncer event listener loop exiting due to context done") return diff --git a/internal/connect/hub.go b/internal/connect/hub.go index 1631a084d..e1e0ee083 100644 --- a/internal/connect/hub.go +++ b/internal/connect/hub.go @@ -64,7 +64,6 @@ func (connector *Connector) isReachable(path string) (bool, error) { } func (connector *Connector) PostWorkerPodToHub(pod *v1.Pod) { - // TODO: This request is responsible for proxy_server.go:147] Error while proxying request: context canceled log postWorkerUrl := fmt.Sprintf("%s/pods/worker", connector.url) if podMarshalled, err := json.Marshal(pod); err != nil { diff --git a/kubernetes/workerSyncer.go b/kubernetes/workerSyncer.go index 16927aad4..6217f5049 100644 --- a/kubernetes/workerSyncer.go +++ b/kubernetes/workerSyncer.go @@ -9,7 +9,6 @@ import ( "github.com/kubeshark/base/pkg/models" "github.com/kubeshark/kubeshark/debounce" "github.com/kubeshark/kubeshark/docker" - "github.com/kubeshark/kubeshark/misc" "github.com/kubeshark/kubeshark/utils" "github.com/rs/zerolog/log" v1 "k8s.io/api/core/v1" @@ -30,7 +29,6 @@ type WorkerSyncer struct { config WorkerSyncerConfig kubernetesProvider *Provider TapPodChangesOut chan TargetedPodChangeEvent - WorkerPodsChanges chan *v1.Pod ErrorOut chan K8sTapManagerError nodeToTargetedPodMap models.NodeToPodsMap targetedNodes []string @@ -57,7 +55,6 @@ func CreateAndStartWorkerSyncer(ctx context.Context, kubernetesProvider *Provide config: config, kubernetesProvider: kubernetesProvider, TapPodChangesOut: make(chan TargetedPodChangeEvent, 100), - WorkerPodsChanges: make(chan *v1.Pod, 100), ErrorOut: make(chan K8sTapManagerError, 100), } @@ -70,115 +67,9 @@ func CreateAndStartWorkerSyncer(ctx context.Context, kubernetesProvider *Provide } go syncer.watchPodsForTargeting() - go syncer.watchWorkerEvents() - go syncer.watchWorkerPods() return syncer, nil } -func (workerSyncer *WorkerSyncer) watchWorkerPods() { - selfResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", WorkerPodName)) - podWatchHelper := NewPodWatchHelper(workerSyncer.kubernetesProvider, selfResourceRegex) - eventChan, errorChan := FilteredWatch(workerSyncer.context, podWatchHelper, []string{workerSyncer.config.SelfNamespace}, podWatchHelper) - - for { - select { - case wEvent, ok := <-eventChan: - if !ok { - eventChan = nil - continue - } - - pod, err := wEvent.ToPod() - if err != nil { - log.Error().Str("pod", WorkerPodName).Err(err).Msg(fmt.Sprintf("While parsing %s resource!", misc.Software)) - continue - } - - log.Debug(). - Str("pod", pod.Name). - Str("node", pod.Spec.NodeName). - Interface("phase", pod.Status.Phase). - Msg("Watching pod events...") - if pod.Spec.NodeName != "" { - workerSyncer.WorkerPodsChanges <- pod - } - - case err, ok := <-errorChan: - if !ok { - errorChan = nil - continue - } - log.Error().Str("pod", WorkerPodName).Err(err).Msg("While watching pod!") - - case <-workerSyncer.context.Done(): - log.Debug(). - Str("pod", WorkerPodName). - Msg("Watching pod, context done.") - return - } - } -} - -func (workerSyncer *WorkerSyncer) watchWorkerEvents() { - selfResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", WorkerPodName)) - eventWatchHelper := NewEventWatchHelper(workerSyncer.kubernetesProvider, selfResourceRegex, "pod") - eventChan, errorChan := FilteredWatch(workerSyncer.context, eventWatchHelper, []string{workerSyncer.config.SelfNamespace}, eventWatchHelper) - - for { - select { - case wEvent, ok := <-eventChan: - if !ok { - eventChan = nil - continue - } - - event, err := wEvent.ToEvent() - if err != nil { - log.Error(). - Str("pod", WorkerPodName). - Err(err). - Msg("Parsing resource event.") - continue - } - - log.Debug(). - Str("pod", WorkerPodName). - Str("event", event.Name). - Time("time", event.CreationTimestamp.Time). - Str("name", event.Regarding.Name). - Str("kind", event.Regarding.Kind). - Str("reason", event.Reason). - Str("note", event.Note). - Msg("Watching events.") - - pod, err1 := workerSyncer.kubernetesProvider.GetPod(workerSyncer.context, workerSyncer.config.SelfNamespace, event.Regarding.Name) - if err1 != nil { - log.Error().Str("name", event.Regarding.Name).Msg("Couldn't get pod") - continue - } - - workerSyncer.WorkerPodsChanges <- pod - - case err, ok := <-errorChan: - if !ok { - errorChan = nil - continue - } - - log.Error(). - Str("pod", WorkerPodName). - Err(err). - Msg("While watching events.") - - case <-workerSyncer.context.Done(): - log.Debug(). - Str("pod", WorkerPodName). - Msg("Watching pod events, context done.") - return - } - } -} - func (workerSyncer *WorkerSyncer) watchPodsForTargeting() { podWatchHelper := NewPodWatchHelper(workerSyncer.kubernetesProvider, &workerSyncer.config.PodFilterRegex) eventChan, errorChan := FilteredWatch(workerSyncer.context, podWatchHelper, workerSyncer.config.TargetNamespaces, podWatchHelper)