kubeshark/internal/connect/hub.go
M. Mert Yildiran 3d5c999be1
Make the scritps command directly use the K8s API without requiring a connector to Hub (#1615)
* Make the `scritps` command directly use the K8s API without requiring a connector to Hub

* Fix linter

* Fix linter

---------

Co-authored-by: Alon Girmonsky <1990761+alongir@users.noreply.github.com>
2024-09-23 10:11:44 -07:00

158 lines
4.2 KiB
Go

package connect
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"os"
"time"
"github.com/kubeshark/kubeshark/config"
"github.com/kubeshark/kubeshark/utils"
"github.com/rs/zerolog/log"
v1 "k8s.io/api/core/v1"
)
type Connector struct {
url string
retries int
client *http.Client
}
const DefaultRetries = 3
const DefaultTimeout = 2 * time.Second
const DefaultSleep = 1 * time.Second
func NewConnector(url string, retries int, timeout time.Duration) *Connector {
return &Connector{
url: url,
retries: retries,
client: &http.Client{
Timeout: timeout,
},
}
}
func (connector *Connector) TestConnection(path string) error {
retriesLeft := connector.retries
for retriesLeft > 0 {
if isReachable, err := connector.isReachable(path); err != nil || !isReachable {
log.Debug().Str("url", connector.url).Err(err).Msg("Not ready yet!")
} else {
log.Debug().Str("url", connector.url).Msg("Connection test passed successfully.")
break
}
retriesLeft -= 1
time.Sleep(5 * DefaultSleep)
}
if retriesLeft == 0 {
return fmt.Errorf("Couldn't reach the URL: %s after %d retries!", connector.url, connector.retries)
}
return nil
}
func (connector *Connector) isReachable(path string) (bool, error) {
targetUrl := fmt.Sprintf("%s%s", connector.url, path)
if _, err := utils.Get(targetUrl, connector.client); err != nil {
return false, err
} else {
return true, nil
}
}
func (connector *Connector) PostWorkerPodToHub(pod *v1.Pod) {
postWorkerUrl := fmt.Sprintf("%s/pods/worker", connector.url)
if podMarshalled, err := json.Marshal(pod); err != nil {
log.Error().Err(err).Msg("Failed to marshal the Worker pod:")
} else {
ok := false
for !ok {
var resp *http.Response
if resp, err = utils.Post(postWorkerUrl, "application/json", bytes.NewBuffer(podMarshalled), connector.client, config.Config.License); err != nil || resp.StatusCode != http.StatusOK {
if _, ok := err.(*url.Error); ok {
break
}
log.Warn().Err(err).Msg("Failed sending the Worker pod to Hub. Retrying...")
} else {
log.Debug().Interface("worker-pod", pod).Msg("Reported worker pod to Hub:")
return
}
time.Sleep(DefaultSleep)
}
}
}
type postLicenseRequest struct {
License string `json:"license"`
}
func (connector *Connector) PostLicense(license string) {
postLicenseUrl := fmt.Sprintf("%s/license", connector.url)
payload := postLicenseRequest{
License: license,
}
if payloadMarshalled, err := json.Marshal(payload); err != nil {
log.Error().Err(err).Msg("Failed to marshal the payload:")
} else {
ok := false
for !ok {
var resp *http.Response
if resp, err = utils.Post(postLicenseUrl, "application/json", bytes.NewBuffer(payloadMarshalled), connector.client, config.Config.License); err != nil || resp.StatusCode != http.StatusOK {
if _, ok := err.(*url.Error); ok {
break
}
log.Warn().Err(err).Msg("Failed sending the license to Hub. Retrying...")
} else {
log.Debug().Str("license", license).Msg("Reported license to Hub:")
return
}
time.Sleep(DefaultSleep)
}
}
}
func (connector *Connector) PostPcapsMerge(out *os.File) {
postEnvUrl := fmt.Sprintf("%s/pcaps/merge", connector.url)
if envMarshalled, err := json.Marshal(map[string]string{"query": ""}); err != nil {
log.Error().Err(err).Msg("Failed to marshal the env:")
} else {
ok := false
for !ok {
var resp *http.Response
if resp, err = utils.Post(postEnvUrl, "application/json", bytes.NewBuffer(envMarshalled), connector.client, config.Config.License); err != nil || resp.StatusCode != http.StatusOK {
if _, ok := err.(*url.Error); ok {
break
}
log.Warn().Err(err).Msg("Failed exported PCAP download. Retrying...")
} else {
defer resp.Body.Close()
// Check server response
if resp.StatusCode != http.StatusOK {
log.Error().Str("status", resp.Status).Err(err).Msg("Failed exported PCAP download.")
return
}
// Writer the body to file
_, err = io.Copy(out, resp.Body)
if err != nil {
log.Error().Err(err).Msg("Failed writing PCAP export:")
return
}
log.Info().Str("path", out.Name()).Msg("Downloaded exported PCAP:")
return
}
time.Sleep(DefaultSleep)
}
}
}