diff --git a/cmd/deployRunner.go b/cmd/deployRunner.go index 6b46b4a7c..cdb0b42f9 100644 --- a/cmd/deployRunner.go +++ b/cmd/deployRunner.go @@ -177,13 +177,13 @@ func startWorkerSyncer(ctx context.Context, cancel context.CancelFunc, provider if err := connector.ReportTargettedPods(workerSyncer.CurrentlyTargettedPods); err != nil { log.Error().Err(err).Msg("failed update targetted pods.") } - case workerStatus, ok := <-workerSyncer.WorkerStatusChangedOut: + case pod, ok := <-workerSyncer.WorkerPodsChanges: if !ok { log.Debug().Msg("workerSyncer worker status changed channel closed, ending listener loop") return } - if err := connector.ReportWorkerStatus(workerStatus); err != nil { - log.Error().Err(err).Msg("failed update worker status.") + if err := connector.PostWorkerPodToHub(pod); err != nil { + log.Error().Err(err).Msg("Failed to POST Worker pod to Hub.") } case <-ctx.Done(): log.Debug().Msg("workerSyncer event listener loop exiting due to context done") diff --git a/internal/connect/hub.go b/internal/connect/hub.go index 28425a3b8..8ebf67eaf 100644 --- a/internal/connect/hub.go +++ b/internal/connect/hub.go @@ -7,12 +7,12 @@ import ( "net/http" "time" - "github.com/kubeshark/base/pkg/models" "github.com/kubeshark/kubeshark/utils" "github.com/kubeshark/kubeshark/config" "github.com/rs/zerolog/log" core "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" ) type Connector struct { @@ -62,16 +62,16 @@ func (connector *Connector) isReachable(path string) (bool, error) { } } -func (connector *Connector) ReportWorkerStatus(workerStatus models.WorkerStatus) error { - workerStatusUrl := fmt.Sprintf("%s/status/workerStatus", connector.url) +func (connector *Connector) PostWorkerPodToHub(pod *v1.Pod) error { + setWorkerUrl := fmt.Sprintf("%s/pods/set-worker", connector.url) - if jsonValue, err := json.Marshal(workerStatus); err != nil { - return fmt.Errorf("Failed Marshal the worker status %w", err) + if jsonValue, err := json.Marshal(pod); err != nil { + return fmt.Errorf("Failed to marshal the Worker pod: %w", err) } else { - if _, err := utils.Post(workerStatusUrl, "application/json", bytes.NewBuffer(jsonValue), connector.client); err != nil { - return fmt.Errorf("Failed sending to Hub the targetted pods %w", err) + if _, err := utils.Post(setWorkerUrl, "application/json", bytes.NewBuffer(jsonValue), connector.client); err != nil { + return fmt.Errorf("Failed sending the Worker pod to Hub: %w", err) } else { - log.Debug().Interface("worker-status", workerStatus).Msg("Reported to Hub about Worker status:") + log.Debug().Interface("worker-pod", pod).Msg("Reported to Hub about Worker status:") return nil } } diff --git a/kubernetes/kubesharkTapperSyncer.go b/kubernetes/kubesharkTapperSyncer.go index f0887bd30..5936a1e35 100644 --- a/kubernetes/kubesharkTapperSyncer.go +++ b/kubernetes/kubesharkTapperSyncer.go @@ -13,25 +13,25 @@ import ( "github.com/kubeshark/kubeshark/utils" "github.com/rs/zerolog" "github.com/rs/zerolog/log" - core "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" ) const updateWorkersDelay = 5 * time.Second type TargettedPodChangeEvent struct { - Added []core.Pod - Removed []core.Pod + Added []v1.Pod + Removed []v1.Pod } // WorkerSyncer uses a k8s pod watch to update Worker daemonsets when targeted pods are removed or created type WorkerSyncer struct { startTime time.Time context context.Context - CurrentlyTargettedPods []core.Pod + CurrentlyTargettedPods []v1.Pod config WorkerSyncerConfig kubernetesProvider *Provider DeployPodChangesOut chan TargettedPodChangeEvent - WorkerStatusChangedOut chan models.WorkerStatus + WorkerPodsChanges chan *v1.Pod ErrorOut chan K8sDeployManagerError nodeToTargettedPodMap models.NodeToPodsMap targettedNodes []string @@ -42,7 +42,7 @@ type WorkerSyncerConfig struct { PodFilterRegex regexp.Regexp KubesharkResourcesNamespace string WorkerResources models.Resources - ImagePullPolicy core.PullPolicy + ImagePullPolicy v1.PullPolicy LogLevel zerolog.Level KubesharkApiFilteringOptions api.TrafficFilteringOptions KubesharkServiceAccountExists bool @@ -54,11 +54,11 @@ func CreateAndStartWorkerSyncer(ctx context.Context, kubernetesProvider *Provide syncer := &WorkerSyncer{ startTime: startTime.Truncate(time.Second), // Round down because k8s CreationTimestamp is given in 1 sec resolution. context: ctx, - CurrentlyTargettedPods: make([]core.Pod, 0), + CurrentlyTargettedPods: make([]v1.Pod, 0), config: config, kubernetesProvider: kubernetesProvider, DeployPodChangesOut: make(chan TargettedPodChangeEvent, 100), - WorkerStatusChangedOut: make(chan models.WorkerStatus, 100), + WorkerPodsChanges: make(chan *v1.Pod, 100), ErrorOut: make(chan K8sDeployManagerError, 100), } @@ -101,8 +101,7 @@ func (workerSyncer *WorkerSyncer) watchWorkerPods() { Interface("phase", pod.Status.Phase). Msg("Watching pod events...") if pod.Spec.NodeName != "" { - workerStatus := models.WorkerStatus{Name: pod.Name, NodeName: pod.Spec.NodeName, Status: string(pod.Status.Phase)} - workerSyncer.WorkerStatusChangedOut <- workerStatus + workerSyncer.WorkerPodsChanges <- pod } case err, ok := <-errorChan: @@ -159,15 +158,7 @@ func (workerSyncer *WorkerSyncer) watchWorkerEvents() { continue } - nodeName := "" - if event.Reason != "FailedScheduling" { - nodeName = pod.Spec.NodeName - } else { - nodeName = pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchFields[0].Values[0] - } - - workerStatus := models.WorkerStatus{Name: pod.Name, NodeName: nodeName, Status: string(pod.Status.Phase)} - workerSyncer.WorkerStatusChangedOut <- workerStatus + workerSyncer.WorkerPodsChanges <- pod case err, ok := <-errorChan: if !ok {