From 57257025d2607967aff2c38dcea19e84e8438d28 Mon Sep 17 00:00:00 2001 From: "M. Mert Yildiran" Date: Mon, 26 Dec 2022 07:23:00 +0300 Subject: [PATCH] :bug: Fix `PostWorkerPodToHub` and `PostTargettedPodsToHub` failure --- cmd/tapRunner.go | 8 ++------ internal/connect/hub.go | 43 +++++++++++++++++++++++++++-------------- 2 files changed, 31 insertions(+), 20 deletions(-) diff --git a/cmd/tapRunner.go b/cmd/tapRunner.go index 5ca005bb8..c2c00e0ef 100644 --- a/cmd/tapRunner.go +++ b/cmd/tapRunner.go @@ -174,17 +174,13 @@ func startWorkerSyncer(ctx context.Context, cancel context.CancelFunc, provider log.Debug().Msg("workerSyncer pod changes channel closed, ending listener loop") return } - if err := connector.PostTargettedPodsToHub(workerSyncer.CurrentlyTargettedPods); err != nil { - log.Error().Err(err).Msg("Failed to POST targetted pods to Hub.") - } + go connector.PostTargettedPodsToHub(workerSyncer.CurrentlyTargettedPods) case pod, ok := <-workerSyncer.WorkerPodsChanges: if !ok { log.Debug().Msg("workerSyncer worker status changed channel closed, ending listener loop") return } - if err := connector.PostWorkerPodToHub(pod); err != nil { - log.Error().Err(err).Msg("Failed to POST Worker pod to Hub.") - } + go connector.PostWorkerPodToHub(pod) case <-ctx.Done(): log.Debug().Msg("workerSyncer event listener loop exiting due to context done") return diff --git a/internal/connect/hub.go b/internal/connect/hub.go index 57f54a66e..4507f049d 100644 --- a/internal/connect/hub.go +++ b/internal/connect/hub.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "net/http" + "net/url" "time" "github.com/kubeshark/kubeshark/utils" @@ -62,32 +63,46 @@ func (connector *Connector) isReachable(path string) (bool, error) { } } -func (connector *Connector) PostWorkerPodToHub(pod *v1.Pod) error { +func (connector *Connector) PostWorkerPodToHub(pod *v1.Pod) { postWorkerUrl := fmt.Sprintf("%s/pods/worker", connector.url) if podMarshalled, err := json.Marshal(pod); err != nil { - return fmt.Errorf("Failed to marshal the Worker pod: %w", err) + log.Error().Err(err).Msg("Failed to marshal the Worker pod:") } else { - if _, err := utils.Post(postWorkerUrl, "application/json", bytes.NewBuffer(podMarshalled), connector.client); err != nil { - return fmt.Errorf("Failed sending the Worker pod to Hub: %w", err) - } else { - log.Debug().Interface("worker-pod", pod).Msg("Reported worker pod to Hub:") - return nil + ok := false + for !ok { + if _, err = utils.Post(postWorkerUrl, "application/json", bytes.NewBuffer(podMarshalled), connector.client); err != nil { + if _, ok := err.(*url.Error); ok { + break + } + log.Debug().Err(err).Msg("Failed sending the Worker pod to Hub:") + } else { + ok = true + log.Debug().Interface("worker-pod", pod).Msg("Reported worker pod to Hub:") + } + time.Sleep(1 * time.Second) } } } -func (connector *Connector) PostTargettedPodsToHub(pods []core.Pod) error { +func (connector *Connector) PostTargettedPodsToHub(pods []core.Pod) { postTargettedUrl := fmt.Sprintf("%s/pods/targetted", connector.url) if podsMarshalled, err := json.Marshal(pods); err != nil { - return fmt.Errorf("Failed to marshal the targetted pods: %w", err) + log.Error().Err(err).Msg("Failed to marshal the targetted pods:") } else { - if _, err := utils.Post(postTargettedUrl, "application/json", bytes.NewBuffer(podsMarshalled), connector.client); err != nil { - return fmt.Errorf("Failed sending the targetted pods to Hub: %w", err) - } else { - log.Debug().Int("pod-count", len(pods)).Msg("Reported targetted pods to Hub:") - return nil + ok := false + for !ok { + if _, err = utils.Post(postTargettedUrl, "application/json", bytes.NewBuffer(podsMarshalled), connector.client); err != nil { + if _, ok := err.(*url.Error); ok { + break + } + log.Debug().Err(err).Msg("Failed sending the targetted pods to Hub:") + } else { + ok = true + log.Debug().Int("pod-count", len(pods)).Msg("Reported targetted pods to Hub:") + } + time.Sleep(1 * time.Second) } } }