mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-09-17 07:16:09 +00:00
TRA-3842 daemon mode (#427)
* Update config.go, tapConfig.go, and models.go * WIP * Update go.sum * Update tapRunner.go * Update tap.go * WIP * WIP * Update Dockerfile, main.go, and 2 more files... * WIP * Update utils.go, tapClusterResourceManagement.go, and utils.go * Merge branch 'develop' * Update metadata_controller.go, utils.go, and 2 more files... * Update main.go, utils.go, and tapRunner.go * Update tapRunner.go * Update config.go, config.go, and models.go * Update main.go, main.go, and stats_provider_test.go * Update provider.go * bug fixes * Update main.go, metadata_controller.go, and 13 more files... * Update metadata_controller.go, status_controller.go, and 4 more files... * Update main.go, config.go, and 3 more files... * Update tapRunner.go * Update config.go, stats_provider_test.go, and consts.go
This commit is contained in:
@@ -3,7 +3,9 @@ package apiserver
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/up9inc/mizu/shared/kubernetes"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
@@ -15,30 +17,30 @@ import (
|
||||
core "k8s.io/api/core/v1"
|
||||
)
|
||||
|
||||
type apiServerProvider struct {
|
||||
type Provider struct {
|
||||
url string
|
||||
isReady bool
|
||||
retries int
|
||||
client *http.Client
|
||||
}
|
||||
|
||||
var Provider = apiServerProvider{retries: config.GetIntEnvConfig(config.ApiServerRetries, 20)}
|
||||
const DefaultRetries = 20
|
||||
const DefaultTimeout = 5 * time.Second
|
||||
|
||||
func (provider *apiServerProvider) InitAndTestConnection(url string) error {
|
||||
healthUrl := fmt.Sprintf("%s/", url)
|
||||
func NewProvider(url string, retries int, timeout time.Duration) *Provider {
|
||||
return &Provider{
|
||||
url: url,
|
||||
retries: config.GetIntEnvConfig(config.ApiServerRetries, retries),
|
||||
client: &http.Client{
|
||||
Timeout: timeout,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (provider *Provider) TestConnection() error {
|
||||
retriesLeft := provider.retries
|
||||
for retriesLeft > 0 {
|
||||
if response, err := http.Get(healthUrl); err != nil {
|
||||
logger.Log.Debugf("[ERROR] failed connecting to api server %v", err)
|
||||
} else if response.StatusCode != 200 {
|
||||
responseBody := ""
|
||||
data, readErr := ioutil.ReadAll(response.Body)
|
||||
if readErr == nil {
|
||||
responseBody = string(data)
|
||||
}
|
||||
|
||||
logger.Log.Debugf("can't connect to api server yet, response status code: %v, body: %v", response.StatusCode, responseBody)
|
||||
|
||||
response.Body.Close()
|
||||
if _, err := provider.GetHealthStatus(); err != nil {
|
||||
logger.Log.Debugf("[ERROR] api server not ready yet %v", err)
|
||||
} else {
|
||||
logger.Log.Debugf("connection test to api server passed successfully")
|
||||
break
|
||||
@@ -48,30 +50,38 @@ func (provider *apiServerProvider) InitAndTestConnection(url string) error {
|
||||
}
|
||||
|
||||
if retriesLeft == 0 {
|
||||
provider.isReady = false
|
||||
return fmt.Errorf("couldn't reach the api server after %v retries", provider.retries)
|
||||
}
|
||||
provider.url = url
|
||||
provider.isReady = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func (provider *apiServerProvider) ReportTappedPods(pods []core.Pod) error {
|
||||
if !provider.isReady {
|
||||
return fmt.Errorf("trying to reach api server when not initialized yet")
|
||||
func (provider *Provider) GetHealthStatus() (*shared.HealthResponse, error) {
|
||||
healthUrl := fmt.Sprintf("%s/status/health", provider.url)
|
||||
if response, err := provider.client.Get(healthUrl); err != nil {
|
||||
return nil, err
|
||||
} else if response.StatusCode > 299 {
|
||||
return nil, errors.New(fmt.Sprintf("status code: %d", response.StatusCode))
|
||||
} else {
|
||||
defer response.Body.Close()
|
||||
|
||||
healthResponse := &shared.HealthResponse{}
|
||||
if err := json.NewDecoder(response.Body).Decode(&healthResponse); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return healthResponse, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (provider *Provider) ReportTappedPods(pods []core.Pod) error {
|
||||
tappedPodsUrl := fmt.Sprintf("%s/status/tappedPods", provider.url)
|
||||
|
||||
podInfos := make([]shared.PodInfo, 0)
|
||||
for _, pod := range pods {
|
||||
podInfos = append(podInfos, shared.PodInfo{Name: pod.Name, Namespace: pod.Namespace})
|
||||
}
|
||||
podInfos := kubernetes.GetPodInfosForPods(pods)
|
||||
tapStatus := shared.TapStatus{Pods: podInfos}
|
||||
|
||||
if jsonValue, err := json.Marshal(tapStatus); err != nil {
|
||||
return fmt.Errorf("failed Marshal the tapped pods %w", err)
|
||||
} else {
|
||||
if response, err := http.Post(tappedPodsUrl, "application/json", bytes.NewBuffer(jsonValue)); err != nil {
|
||||
if response, err := provider.client.Post(tappedPodsUrl, "application/json", bytes.NewBuffer(jsonValue)); err != nil {
|
||||
return fmt.Errorf("failed sending to API server the tapped pods %w", err)
|
||||
} else if response.StatusCode != 200 {
|
||||
return fmt.Errorf("failed sending to API server the tapped pods, response status code %v", response.StatusCode)
|
||||
@@ -82,20 +92,17 @@ func (provider *apiServerProvider) ReportTappedPods(pods []core.Pod) error {
|
||||
}
|
||||
}
|
||||
|
||||
func (provider *apiServerProvider) GetGeneralStats() (map[string]interface{}, error) {
|
||||
if !provider.isReady {
|
||||
return nil, fmt.Errorf("trying to reach api server when not initialized yet")
|
||||
}
|
||||
func (provider *Provider) GetGeneralStats() (map[string]interface{}, error) {
|
||||
generalStatsUrl := fmt.Sprintf("%s/status/general", provider.url)
|
||||
|
||||
response, requestErr := http.Get(generalStatsUrl)
|
||||
response, requestErr := provider.client.Get(generalStatsUrl)
|
||||
if requestErr != nil {
|
||||
return nil, fmt.Errorf("failed to get general stats for telemetry, err: %w", requestErr)
|
||||
} else if response.StatusCode != 200 {
|
||||
return nil, fmt.Errorf("failed to get general stats for telemetry, status code: %v", response.StatusCode)
|
||||
}
|
||||
|
||||
defer func() { _ = response.Body.Close() }()
|
||||
defer response.Body.Close()
|
||||
|
||||
data, readErr := ioutil.ReadAll(response.Body)
|
||||
if readErr != nil {
|
||||
@@ -109,16 +116,13 @@ func (provider *apiServerProvider) GetGeneralStats() (map[string]interface{}, er
|
||||
return generalStats, nil
|
||||
}
|
||||
|
||||
func (provider *apiServerProvider) GetVersion() (string, error) {
|
||||
if !provider.isReady {
|
||||
return "", fmt.Errorf("trying to reach api server when not initialized yet")
|
||||
}
|
||||
func (provider *Provider) GetVersion() (string, error) {
|
||||
versionUrl, _ := url.Parse(fmt.Sprintf("%s/metadata/version", provider.url))
|
||||
req := &http.Request{
|
||||
Method: http.MethodGet,
|
||||
URL: versionUrl,
|
||||
}
|
||||
statusResp, err := http.DefaultClient.Do(req)
|
||||
statusResp, err := provider.client.Do(req)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
Reference in New Issue
Block a user