mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-06-29 17:49:40 +00:00
🔨 Move apiserver
package into internal/connect
This commit is contained in:
parent
fa5a87b9d5
commit
919fe81982
@ -6,8 +6,8 @@ import (
|
||||
"log"
|
||||
"regexp"
|
||||
|
||||
"github.com/kubeshark/kubeshark/apiserver"
|
||||
"github.com/kubeshark/kubeshark/config"
|
||||
"github.com/kubeshark/kubeshark/internal/connect"
|
||||
"github.com/kubeshark/kubeshark/kubernetes"
|
||||
"github.com/kubeshark/kubeshark/utils"
|
||||
)
|
||||
@ -17,8 +17,8 @@ func ServerConnection(kubernetesProvider *kubernetes.Provider) bool {
|
||||
|
||||
serverUrl := kubernetes.GetLocalhostOnPort(config.Config.Hub.PortForward.SrcPort)
|
||||
|
||||
apiServerProvider := apiserver.NewProvider(serverUrl, 1, apiserver.DefaultTimeout)
|
||||
if err := apiServerProvider.TestConnection(""); err == nil {
|
||||
connector := connect.NewConnector(serverUrl, 1, connect.DefaultTimeout)
|
||||
if err := connector.TestConnection(""); err == nil {
|
||||
log.Printf("%v found Kubeshark server tunnel available and connected successfully to API server", fmt.Sprintf(utils.Green, "√"))
|
||||
return true
|
||||
}
|
||||
@ -51,8 +51,8 @@ func checkProxy(serverUrl string, kubernetesProvider *kubernetes.Provider) error
|
||||
return err
|
||||
}
|
||||
|
||||
apiServerProvider := apiserver.NewProvider(serverUrl, apiserver.DefaultRetries, apiserver.DefaultTimeout)
|
||||
if err := apiServerProvider.TestConnection(""); err != nil {
|
||||
connector := connect.NewConnector(serverUrl, connect.DefaultRetries, connect.DefaultTimeout)
|
||||
if err := connector.TestConnection(""); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -73,8 +73,8 @@ func checkPortForward(serverUrl string, kubernetesProvider *kubernetes.Provider)
|
||||
return err
|
||||
}
|
||||
|
||||
apiServerProvider := apiserver.NewProvider(serverUrl, apiserver.DefaultRetries, apiserver.DefaultTimeout)
|
||||
if err := apiServerProvider.TestConnection(""); err != nil {
|
||||
connector := connect.NewConnector(serverUrl, connect.DefaultRetries, connect.DefaultTimeout)
|
||||
if err := connector.TestConnection(""); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -10,9 +10,9 @@ import (
|
||||
"regexp"
|
||||
"time"
|
||||
|
||||
"github.com/kubeshark/kubeshark/apiserver"
|
||||
"github.com/kubeshark/kubeshark/config/configStructs"
|
||||
"github.com/kubeshark/kubeshark/errormessage"
|
||||
"github.com/kubeshark/kubeshark/internal/connect"
|
||||
"github.com/kubeshark/kubeshark/kubeshark"
|
||||
"github.com/kubeshark/kubeshark/kubeshark/fsUtils"
|
||||
"github.com/kubeshark/kubeshark/resources"
|
||||
@ -32,8 +32,8 @@ func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, ctx con
|
||||
return
|
||||
}
|
||||
|
||||
provider := apiserver.NewProvider(kubernetes.GetLocalhostOnPort(srcPort), apiserver.DefaultRetries, apiserver.DefaultTimeout)
|
||||
if err := provider.TestConnection(healthCheck); err != nil {
|
||||
connector := connect.NewConnector(kubernetes.GetLocalhostOnPort(srcPort), connect.DefaultRetries, connect.DefaultTimeout)
|
||||
if err := connector.TestConnection(healthCheck); err != nil {
|
||||
log.Printf("Couldn't connect using proxy, stopping proxy and trying to create port-forward")
|
||||
if err := httpServer.Shutdown(ctx); err != nil {
|
||||
log.Printf("Error occurred while stopping proxy %v", errormessage.FormatError(err))
|
||||
@ -47,8 +47,8 @@ func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, ctx con
|
||||
return
|
||||
}
|
||||
|
||||
provider = apiserver.NewProvider(kubernetes.GetLocalhostOnPort(srcPort), apiserver.DefaultRetries, apiserver.DefaultTimeout)
|
||||
if err := provider.TestConnection(healthCheck); err != nil {
|
||||
connector = connect.NewConnector(kubernetes.GetLocalhostOnPort(srcPort), connect.DefaultRetries, connect.DefaultTimeout)
|
||||
if err := connector.TestConnection(healthCheck); err != nil {
|
||||
log.Printf(utils.Error, fmt.Sprintf("Couldn't connect to [%s].", serviceName))
|
||||
cancel()
|
||||
return
|
||||
|
@ -9,6 +9,7 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/kubeshark/kubeshark/internal/connect"
|
||||
"github.com/kubeshark/kubeshark/resources"
|
||||
"github.com/kubeshark/kubeshark/utils"
|
||||
|
||||
@ -16,7 +17,6 @@ import (
|
||||
k8serrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
|
||||
"github.com/kubeshark/kubeshark/apiserver"
|
||||
"github.com/kubeshark/kubeshark/cmd/goUtils"
|
||||
"github.com/kubeshark/kubeshark/config"
|
||||
"github.com/kubeshark/kubeshark/config/configStructs"
|
||||
@ -35,7 +35,7 @@ type tapState struct {
|
||||
}
|
||||
|
||||
var state tapState
|
||||
var apiProvider *apiserver.Provider
|
||||
var connector *connect.Connector
|
||||
var apiServerPodReady bool
|
||||
var frontPodReady bool
|
||||
var proxyDone bool
|
||||
@ -43,7 +43,7 @@ var proxyDone bool
|
||||
func RunKubesharkTap() {
|
||||
state.startTime = time.Now()
|
||||
|
||||
apiProvider = apiserver.NewProvider(kubernetes.GetLocalhostOnPort(config.Config.Hub.PortForward.SrcPort), apiserver.DefaultRetries, apiserver.DefaultTimeout)
|
||||
connector = connect.NewProvider(kubernetes.GetLocalhostOnPort(config.Config.Hub.PortForward.SrcPort), connect.DefaultRetries, connect.DefaultTimeout)
|
||||
|
||||
kubernetesProvider, err := getKubernetesProviderForCli()
|
||||
if err != nil {
|
||||
|
@ -6,9 +6,9 @@ import (
|
||||
"log"
|
||||
"net/http"
|
||||
|
||||
"github.com/kubeshark/kubeshark/internal/connect"
|
||||
"github.com/kubeshark/kubeshark/utils"
|
||||
|
||||
"github.com/kubeshark/kubeshark/apiserver"
|
||||
"github.com/kubeshark/kubeshark/config"
|
||||
"github.com/kubeshark/kubeshark/kubernetes"
|
||||
)
|
||||
@ -48,8 +48,8 @@ func runKubesharkView() {
|
||||
startProxyReportErrorIfAny(kubernetesProvider, ctx, cancel, kubernetes.FrontServiceName, config.Config.Front.PortForward.SrcPort, config.Config.Front.PortForward.DstPort, "")
|
||||
}
|
||||
|
||||
apiServerProvider := apiserver.NewProvider(url, apiserver.DefaultRetries, apiserver.DefaultTimeout)
|
||||
if err := apiServerProvider.TestConnection(""); err != nil {
|
||||
connector := connect.NewConnector(url, connect.DefaultRetries, connect.DefaultTimeout)
|
||||
if err := connector.TestConnection(""); err != nil {
|
||||
log.Printf(utils.Error, "Couldn't connect to API server.")
|
||||
return
|
||||
}
|
||||
|
@ -1,4 +1,4 @@
|
||||
package apiserver
|
||||
package connect
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
@ -15,7 +15,7 @@ import (
|
||||
core "k8s.io/api/core/v1"
|
||||
)
|
||||
|
||||
type Provider struct {
|
||||
type Connector struct {
|
||||
url string
|
||||
retries int
|
||||
client *http.Client
|
||||
@ -24,8 +24,8 @@ type Provider struct {
|
||||
const DefaultRetries = 3
|
||||
const DefaultTimeout = 2 * time.Second
|
||||
|
||||
func NewProvider(url string, retries int, timeout time.Duration) *Provider {
|
||||
return &Provider{
|
||||
func NewConnector(url string, retries int, timeout time.Duration) *Connector {
|
||||
return &Connector{
|
||||
url: url,
|
||||
retries: config.GetIntEnvConfig(config.ApiServerRetries, retries),
|
||||
client: &http.Client{
|
||||
@ -34,10 +34,10 @@ func NewProvider(url string, retries int, timeout time.Duration) *Provider {
|
||||
}
|
||||
}
|
||||
|
||||
func (provider *Provider) TestConnection(path string) error {
|
||||
retriesLeft := provider.retries
|
||||
func (connector *Connector) TestConnection(path string) error {
|
||||
retriesLeft := connector.retries
|
||||
for retriesLeft > 0 {
|
||||
if isReachable, err := provider.isReachable(path); err != nil || !isReachable {
|
||||
if isReachable, err := connector.isReachable(path); err != nil || !isReachable {
|
||||
log.Printf("api server not ready yet %v", err)
|
||||
} else {
|
||||
log.Printf("connection test to api server passed successfully")
|
||||
@ -48,27 +48,27 @@ func (provider *Provider) TestConnection(path string) error {
|
||||
}
|
||||
|
||||
if retriesLeft == 0 {
|
||||
return fmt.Errorf("couldn't reach the api server after %v retries", provider.retries)
|
||||
return fmt.Errorf("couldn't reach the api server after %v retries", connector.retries)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (provider *Provider) isReachable(path string) (bool, error) {
|
||||
targetUrl := fmt.Sprintf("%s%s", provider.url, path)
|
||||
if _, err := utils.Get(targetUrl, provider.client); err != nil {
|
||||
func (connector *Connector) isReachable(path string) (bool, error) {
|
||||
targetUrl := fmt.Sprintf("%s%s", connector.url, path)
|
||||
if _, err := utils.Get(targetUrl, connector.client); err != nil {
|
||||
return false, err
|
||||
} else {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (provider *Provider) ReportTapperStatus(tapperStatus models.TapperStatus) error {
|
||||
tapperStatusUrl := fmt.Sprintf("%s/status/tapperStatus", provider.url)
|
||||
func (connector *Connector) ReportTapperStatus(tapperStatus models.TapperStatus) error {
|
||||
tapperStatusUrl := fmt.Sprintf("%s/status/tapperStatus", connector.url)
|
||||
|
||||
if jsonValue, err := json.Marshal(tapperStatus); err != nil {
|
||||
return fmt.Errorf("failed Marshal the tapper status %w", err)
|
||||
} else {
|
||||
if _, err := utils.Post(tapperStatusUrl, "application/json", bytes.NewBuffer(jsonValue), provider.client); err != nil {
|
||||
if _, err := utils.Post(tapperStatusUrl, "application/json", bytes.NewBuffer(jsonValue), connector.client); err != nil {
|
||||
return fmt.Errorf("failed sending to API server the tapped pods %w", err)
|
||||
} else {
|
||||
log.Printf("Reported to server API about tapper status: %v", tapperStatus)
|
||||
@ -77,13 +77,13 @@ func (provider *Provider) ReportTapperStatus(tapperStatus models.TapperStatus) e
|
||||
}
|
||||
}
|
||||
|
||||
func (provider *Provider) ReportTappedPods(pods []core.Pod) error {
|
||||
tappedPodsUrl := fmt.Sprintf("%s/status/tappedPods", provider.url)
|
||||
func (connector *Connector) ReportTappedPods(pods []core.Pod) error {
|
||||
tappedPodsUrl := fmt.Sprintf("%s/status/tappedPods", connector.url)
|
||||
|
||||
if jsonValue, err := json.Marshal(pods); err != nil {
|
||||
return fmt.Errorf("failed Marshal the tapped pods %w", err)
|
||||
} else {
|
||||
if _, err := utils.Post(tappedPodsUrl, "application/json", bytes.NewBuffer(jsonValue), provider.client); err != nil {
|
||||
if _, err := utils.Post(tappedPodsUrl, "application/json", bytes.NewBuffer(jsonValue), connector.client); err != nil {
|
||||
return fmt.Errorf("failed sending to API server the tapped pods %w", err)
|
||||
} else {
|
||||
log.Printf("Reported to server API about %d taped pods successfully", len(pods))
|
Loading…
Reference in New Issue
Block a user