🐛 Fix PostWorkerPodToHub and PostTargettedPodsToHub failure

This commit is contained in:
M. Mert Yildiran 2022-12-26 07:23:00 +03:00
parent 8235dc7d5f
commit 57257025d2
No known key found for this signature in database
GPG Key ID: DA5D6DCBB758A461
2 changed files with 31 additions and 20 deletions

View File

@ -174,17 +174,13 @@ func startWorkerSyncer(ctx context.Context, cancel context.CancelFunc, provider
log.Debug().Msg("workerSyncer pod changes channel closed, ending listener loop") log.Debug().Msg("workerSyncer pod changes channel closed, ending listener loop")
return return
} }
if err := connector.PostTargettedPodsToHub(workerSyncer.CurrentlyTargettedPods); err != nil { go connector.PostTargettedPodsToHub(workerSyncer.CurrentlyTargettedPods)
log.Error().Err(err).Msg("Failed to POST targetted pods to Hub.")
}
case pod, ok := <-workerSyncer.WorkerPodsChanges: case pod, ok := <-workerSyncer.WorkerPodsChanges:
if !ok { if !ok {
log.Debug().Msg("workerSyncer worker status changed channel closed, ending listener loop") log.Debug().Msg("workerSyncer worker status changed channel closed, ending listener loop")
return return
} }
if err := connector.PostWorkerPodToHub(pod); err != nil { go connector.PostWorkerPodToHub(pod)
log.Error().Err(err).Msg("Failed to POST Worker pod to Hub.")
}
case <-ctx.Done(): case <-ctx.Done():
log.Debug().Msg("workerSyncer event listener loop exiting due to context done") log.Debug().Msg("workerSyncer event listener loop exiting due to context done")
return return

View File

@ -5,6 +5,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"net/http" "net/http"
"net/url"
"time" "time"
"github.com/kubeshark/kubeshark/utils" "github.com/kubeshark/kubeshark/utils"
@ -62,32 +63,46 @@ func (connector *Connector) isReachable(path string) (bool, error) {
} }
} }
func (connector *Connector) PostWorkerPodToHub(pod *v1.Pod) error { func (connector *Connector) PostWorkerPodToHub(pod *v1.Pod) {
postWorkerUrl := fmt.Sprintf("%s/pods/worker", connector.url) postWorkerUrl := fmt.Sprintf("%s/pods/worker", connector.url)
if podMarshalled, err := json.Marshal(pod); err != nil { if podMarshalled, err := json.Marshal(pod); err != nil {
return fmt.Errorf("Failed to marshal the Worker pod: %w", err) log.Error().Err(err).Msg("Failed to marshal the Worker pod:")
} else { } else {
if _, err := utils.Post(postWorkerUrl, "application/json", bytes.NewBuffer(podMarshalled), connector.client); err != nil { ok := false
return fmt.Errorf("Failed sending the Worker pod to Hub: %w", err) for !ok {
if _, err = utils.Post(postWorkerUrl, "application/json", bytes.NewBuffer(podMarshalled), connector.client); err != nil {
if _, ok := err.(*url.Error); ok {
break
}
log.Debug().Err(err).Msg("Failed sending the Worker pod to Hub:")
} else { } else {
ok = true
log.Debug().Interface("worker-pod", pod).Msg("Reported worker pod to Hub:") log.Debug().Interface("worker-pod", pod).Msg("Reported worker pod to Hub:")
return nil }
time.Sleep(1 * time.Second)
} }
} }
} }
func (connector *Connector) PostTargettedPodsToHub(pods []core.Pod) error { func (connector *Connector) PostTargettedPodsToHub(pods []core.Pod) {
postTargettedUrl := fmt.Sprintf("%s/pods/targetted", connector.url) postTargettedUrl := fmt.Sprintf("%s/pods/targetted", connector.url)
if podsMarshalled, err := json.Marshal(pods); err != nil { if podsMarshalled, err := json.Marshal(pods); err != nil {
return fmt.Errorf("Failed to marshal the targetted pods: %w", err) log.Error().Err(err).Msg("Failed to marshal the targetted pods:")
} else { } else {
if _, err := utils.Post(postTargettedUrl, "application/json", bytes.NewBuffer(podsMarshalled), connector.client); err != nil { ok := false
return fmt.Errorf("Failed sending the targetted pods to Hub: %w", err) for !ok {
if _, err = utils.Post(postTargettedUrl, "application/json", bytes.NewBuffer(podsMarshalled), connector.client); err != nil {
if _, ok := err.(*url.Error); ok {
break
}
log.Debug().Err(err).Msg("Failed sending the targetted pods to Hub:")
} else { } else {
ok = true
log.Debug().Int("pod-count", len(pods)).Msg("Reported targetted pods to Hub:") log.Debug().Int("pod-count", len(pods)).Msg("Reported targetted pods to Hub:")
return nil }
time.Sleep(1 * time.Second)
} }
} }
} }