Send worker pod to POST /pods/set-worker HTTP endpoint of Hub

M. Mert Yildiran 2022-12-11 16:08:07 +03:00
parent 24ebd77183
commit bbf6bbcc22
GPG Key ID: DA5D6DCBB758A461
3 changed files with 21 additions and 30 deletions
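In short, the CLI stops sending a reduced models.WorkerStatus and instead serializes the full Kubernetes Pod object of each Worker and POSTs it to the Hub's /pods/set-worker endpoint. A minimal sketch of the resulting wire call, assuming a placeholder Hub URL and pod value (neither is taken from this commit):

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "net/http"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
    // Placeholder pod; in the real flow this is the Worker pod delivered by the k8s watch.
    pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kubeshark-worker-example"}}

    body, err := json.Marshal(pod)
    if err != nil {
        panic(err)
    }

    // Placeholder Hub base URL; the connector builds the real one from its configuration.
    resp, err := http.Post("http://localhost:8898/pods/set-worker", "application/json", bytes.NewBuffer(body))
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()
    fmt.Println("Hub responded with status:", resp.Status)
}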

View File

@@ -177,13 +177,13 @@ func startWorkerSyncer(ctx context.Context, cancel context.CancelFunc, provider
             if err := connector.ReportTargettedPods(workerSyncer.CurrentlyTargettedPods); err != nil {
                 log.Error().Err(err).Msg("failed update targetted pods.")
             }
-        case workerStatus, ok := <-workerSyncer.WorkerStatusChangedOut:
+        case pod, ok := <-workerSyncer.WorkerPodsChanges:
             if !ok {
                 log.Debug().Msg("workerSyncer worker status changed channel closed, ending listener loop")
                 return
             }
-            if err := connector.ReportWorkerStatus(workerStatus); err != nil {
-                log.Error().Err(err).Msg("failed update worker status.")
+            if err := connector.PostWorkerPodToHub(pod); err != nil {
+                log.Error().Err(err).Msg("Failed to POST Worker pod to Hub.")
             }
         case <-ctx.Done():
             log.Debug().Msg("workerSyncer event listener loop exiting due to context done")

View File

@@ -7,12 +7,12 @@ import (
     "net/http"
     "time"

-    "github.com/kubeshark/base/pkg/models"
     "github.com/kubeshark/kubeshark/utils"

     "github.com/kubeshark/kubeshark/config"
     "github.com/rs/zerolog/log"
     core "k8s.io/api/core/v1"
+    v1 "k8s.io/api/core/v1"
 )

 type Connector struct {
@@ -62,16 +62,16 @@ func (connector *Connector) isReachable(path string) (bool, error) {
     }
 }

-func (connector *Connector) ReportWorkerStatus(workerStatus models.WorkerStatus) error {
-    workerStatusUrl := fmt.Sprintf("%s/status/workerStatus", connector.url)
+func (connector *Connector) PostWorkerPodToHub(pod *v1.Pod) error {
+    setWorkerUrl := fmt.Sprintf("%s/pods/set-worker", connector.url)

-    if jsonValue, err := json.Marshal(workerStatus); err != nil {
-        return fmt.Errorf("Failed Marshal the worker status %w", err)
+    if jsonValue, err := json.Marshal(pod); err != nil {
+        return fmt.Errorf("Failed to marshal the Worker pod: %w", err)
     } else {
-        if _, err := utils.Post(workerStatusUrl, "application/json", bytes.NewBuffer(jsonValue), connector.client); err != nil {
-            return fmt.Errorf("Failed sending to Hub the targetted pods %w", err)
+        if _, err := utils.Post(setWorkerUrl, "application/json", bytes.NewBuffer(jsonValue), connector.client); err != nil {
+            return fmt.Errorf("Failed sending the Worker pod to Hub: %w", err)
         } else {
-            log.Debug().Interface("worker-status", workerStatus).Msg("Reported to Hub about Worker status:")
+            log.Debug().Interface("worker-pod", pod).Msg("Reported to Hub about Worker status:")
             return nil
         }
     }
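The receiving end of this call is outside this commit; as a rough sketch only, a Hub-side handler for POST /pods/set-worker might look like the following (the handler name, router, and port are assumptions, not the Hub's actual code):

package main

import (
    "encoding/json"
    "net/http"

    v1 "k8s.io/api/core/v1"
)

// setWorkerHandler is a hypothetical receiving end: it decodes the POSTed Worker
// pod and would hand it to whatever component tracks Worker state inside the Hub.
func setWorkerHandler(w http.ResponseWriter, r *http.Request) {
    var pod v1.Pod
    if err := json.NewDecoder(r.Body).Decode(&pod); err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }

    // Placeholder for the Hub's real bookkeeping, e.g. tracking Workers by
    // pod.Spec.NodeName and pod.Status.Phase.
    _ = pod

    w.WriteHeader(http.StatusOK)
}

func main() {
    http.HandleFunc("/pods/set-worker", setWorkerHandler)
    _ = http.ListenAndServe(":8080", nil) // placeholder port
}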

View File

@@ -13,25 +13,25 @@ import (
     "github.com/kubeshark/kubeshark/utils"
     "github.com/rs/zerolog"
     "github.com/rs/zerolog/log"
-    core "k8s.io/api/core/v1"
+    v1 "k8s.io/api/core/v1"
 )

 const updateWorkersDelay = 5 * time.Second

 type TargettedPodChangeEvent struct {
-    Added   []core.Pod
-    Removed []core.Pod
+    Added   []v1.Pod
+    Removed []v1.Pod
 }

 // WorkerSyncer uses a k8s pod watch to update Worker daemonsets when targeted pods are removed or created
 type WorkerSyncer struct {
     startTime              time.Time
     context                context.Context
-    CurrentlyTargettedPods []core.Pod
+    CurrentlyTargettedPods []v1.Pod
     config                 WorkerSyncerConfig
     kubernetesProvider     *Provider
     DeployPodChangesOut    chan TargettedPodChangeEvent
-    WorkerStatusChangedOut chan models.WorkerStatus
+    WorkerPodsChanges      chan *v1.Pod
     ErrorOut               chan K8sDeployManagerError
     nodeToTargettedPodMap  models.NodeToPodsMap
     targettedNodes         []string
@@ -42,7 +42,7 @@ type WorkerSyncerConfig struct {
     PodFilterRegex                regexp.Regexp
     KubesharkResourcesNamespace   string
     WorkerResources               models.Resources
-    ImagePullPolicy               core.PullPolicy
+    ImagePullPolicy               v1.PullPolicy
     LogLevel                      zerolog.Level
     KubesharkApiFilteringOptions  api.TrafficFilteringOptions
     KubesharkServiceAccountExists bool
@@ -54,11 +54,11 @@ func CreateAndStartWorkerSyncer(ctx context.Context, kubernetesProvider *Provide
     syncer := &WorkerSyncer{
         startTime:              startTime.Truncate(time.Second), // Round down because k8s CreationTimestamp is given in 1 sec resolution.
         context:                ctx,
-        CurrentlyTargettedPods: make([]core.Pod, 0),
+        CurrentlyTargettedPods: make([]v1.Pod, 0),
         config:                 config,
         kubernetesProvider:     kubernetesProvider,
         DeployPodChangesOut:    make(chan TargettedPodChangeEvent, 100),
-        WorkerStatusChangedOut: make(chan models.WorkerStatus, 100),
+        WorkerPodsChanges:      make(chan *v1.Pod, 100),
         ErrorOut:               make(chan K8sDeployManagerError, 100),
     }

@@ -101,8 +101,7 @@ func (workerSyncer *WorkerSyncer) watchWorkerPods() {
                 Interface("phase", pod.Status.Phase).
                 Msg("Watching pod events...")
             if pod.Spec.NodeName != "" {
-                workerStatus := models.WorkerStatus{Name: pod.Name, NodeName: pod.Spec.NodeName, Status: string(pod.Status.Phase)}
-                workerSyncer.WorkerStatusChangedOut <- workerStatus
+                workerSyncer.WorkerPodsChanges <- pod
             }

         case err, ok := <-errorChan:
@@ -159,15 +158,7 @@ func (workerSyncer *WorkerSyncer) watchWorkerEvents() {
                 continue
             }

-            nodeName := ""
-            if event.Reason != "FailedScheduling" {
-                nodeName = pod.Spec.NodeName
-            } else {
-                nodeName = pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchFields[0].Values[0]
-            }
-
-            workerStatus := models.WorkerStatus{Name: pod.Name, NodeName: nodeName, Status: string(pod.Status.Phase)}
-            workerSyncer.WorkerStatusChangedOut <- workerStatus
+            workerSyncer.WorkerPodsChanges <- pod

         case err, ok := <-errorChan:
             if !ok {
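The FailedScheduling branch deleted above (reading the target node from the pod's required node affinity term) is no longer computed on the CLI side; since the whole pod is now posted, that derivation has to happen wherever the pod is consumed. A sketch of it as a standalone helper, reusing the exact field path from the removed lines (the helper name and package are assumptions):

package kubernetes // package name assumed to match workerSyncer.go

import v1 "k8s.io/api/core/v1"

// workerNodeName reproduces the removed branch: when a Worker pod failed
// scheduling, its target node is only visible in the required node affinity
// term; otherwise pod.Spec.NodeName is already populated by the scheduler.
func workerNodeName(pod *v1.Pod, eventReason string) string {
    if eventReason != "FailedScheduling" {
        return pod.Spec.NodeName
    }
    // Same field path the deleted code used; like the original, it assumes the
    // affinity chain is non-nil for FailedScheduling events.
    return pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchFields[0].Values[0]
}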