From 671aa783c52d96a3e08c72cae6b842c65fece0b0 Mon Sep 17 00:00:00 2001 From: "M. Mert Yildiran" Date: Sat, 26 Nov 2022 22:06:06 +0300 Subject: [PATCH] :hammer: Replace `ApiServer` naming with `Hub` --- cmd/check/kubernetesResources.go | 14 +++--- cmd/check/serverConnection.go | 24 +++++----- cmd/common.go | 2 +- cmd/tapRunner.go | 68 ++++++++++++++--------------- cmd/viewRunner.go | 8 ++-- config/configStructs/tapConfig.go | 2 +- config/envConfig.go | 4 +- internal/connect/hub.go | 20 ++++----- kubernetes/consts.go | 4 +- kubernetes/kubesharkTapperSyncer.go | 2 +- kubernetes/provider.go | 10 ++--- kubernetes/proxy.go | 8 ++-- resources/cleanResources.go | 8 ++-- resources/createResources.go | 24 +++++----- utils/http.go | 2 +- 15 files changed, 100 insertions(+), 100 deletions(-) diff --git a/cmd/check/kubernetesResources.go b/cmd/check/kubernetesResources.go index 002729639..e02d81dad 100644 --- a/cmd/check/kubernetesResources.go +++ b/cmd/check/kubernetesResources.go @@ -36,8 +36,8 @@ func KubernetesResources(ctx context.Context, kubernetesProvider *kubernetes.Pro allResourcesExist = checkResourceExist(kubernetes.ClusterRoleBindingName, "cluster role binding", exist, err) && allResourcesExist } - exist, err = kubernetesProvider.DoesServiceExist(ctx, config.Config.KubesharkResourcesNamespace, kubernetes.ApiServerServiceName) - allResourcesExist = checkResourceExist(kubernetes.ApiServerServiceName, "service", exist, err) && allResourcesExist + exist, err = kubernetesProvider.DoesServiceExist(ctx, config.Config.KubesharkResourcesNamespace, kubernetes.HubServiceName) + allResourcesExist = checkResourceExist(kubernetes.HubServiceName, "service", exist, err) && allResourcesExist allResourcesExist = checkPodResourcesExist(ctx, kubernetesProvider) && allResourcesExist @@ -45,18 +45,18 @@ func KubernetesResources(ctx context.Context, kubernetesProvider *kubernetes.Pro } func checkPodResourcesExist(ctx context.Context, kubernetesProvider *kubernetes.Provider) bool { - if pods, err := kubernetesProvider.ListPodsByAppLabel(ctx, config.Config.KubesharkResourcesNamespace, kubernetes.ApiServerPodName); err != nil { - log.Printf("%v error checking if '%v' pod is running, err: %v", fmt.Sprintf(utils.Red, "✗"), kubernetes.ApiServerPodName, err) + if pods, err := kubernetesProvider.ListPodsByAppLabel(ctx, config.Config.KubesharkResourcesNamespace, kubernetes.HubPodName); err != nil { + log.Printf("%v error checking if '%v' pod is running, err: %v", fmt.Sprintf(utils.Red, "✗"), kubernetes.HubPodName, err) return false } else if len(pods) == 0 { - log.Printf("%v '%v' pod doesn't exist", fmt.Sprintf(utils.Red, "✗"), kubernetes.ApiServerPodName) + log.Printf("%v '%v' pod doesn't exist", fmt.Sprintf(utils.Red, "✗"), kubernetes.HubPodName) return false } else if !kubernetes.IsPodRunning(&pods[0]) { - log.Printf("%v '%v' pod not running", fmt.Sprintf(utils.Red, "✗"), kubernetes.ApiServerPodName) + log.Printf("%v '%v' pod not running", fmt.Sprintf(utils.Red, "✗"), kubernetes.HubPodName) return false } - log.Printf("%v '%v' pod running", fmt.Sprintf(utils.Green, "√"), kubernetes.ApiServerPodName) + log.Printf("%v '%v' pod running", fmt.Sprintf(utils.Green, "√"), kubernetes.HubPodName) if pods, err := kubernetesProvider.ListPodsByAppLabel(ctx, config.Config.KubesharkResourcesNamespace, kubernetes.TapperPodName); err != nil { log.Printf("%v error checking if '%v' pods are running, err: %v", fmt.Sprintf(utils.Red, "✗"), kubernetes.TapperPodName, err) diff --git a/cmd/check/serverConnection.go b/cmd/check/serverConnection.go index bd2a0ee0d..3155619cb 100644 --- a/cmd/check/serverConnection.go +++ b/cmd/check/serverConnection.go @@ -13,40 +13,40 @@ import ( ) func ServerConnection(kubernetesProvider *kubernetes.Provider) bool { - log.Printf("\nAPI-server-connectivity\n--------------------") + log.Printf("\nHub connectivity\n--------------------") serverUrl := kubernetes.GetLocalhostOnPort(config.Config.Hub.PortForward.SrcPort) connector := connect.NewConnector(serverUrl, 1, connect.DefaultTimeout) if err := connector.TestConnection(""); err == nil { - log.Printf("%v found Kubeshark server tunnel available and connected successfully to API server", fmt.Sprintf(utils.Green, "√")) + log.Printf("%v found Kubeshark server tunnel available and connected successfully to Hub", fmt.Sprintf(utils.Green, "√")) return true } - connectedToApiServer := false + connectedToHub := false if err := checkProxy(serverUrl, kubernetesProvider); err != nil { - log.Printf("%v couldn't connect to API server using proxy, err: %v", fmt.Sprintf(utils.Red, "✗"), err) + log.Printf("%v couldn't connect to Hub using proxy, err: %v", fmt.Sprintf(utils.Red, "✗"), err) } else { - connectedToApiServer = true - log.Printf("%v connected successfully to API server using proxy", fmt.Sprintf(utils.Green, "√")) + connectedToHub = true + log.Printf("%v connected successfully to Hub using proxy", fmt.Sprintf(utils.Green, "√")) } if err := checkPortForward(serverUrl, kubernetesProvider); err != nil { - log.Printf("%v couldn't connect to API server using port-forward, err: %v", fmt.Sprintf(utils.Red, "✗"), err) + log.Printf("%v couldn't connect to Hub using port-forward, err: %v", fmt.Sprintf(utils.Red, "✗"), err) } else { - connectedToApiServer = true - log.Printf("%v connected successfully to API server using port-forward", fmt.Sprintf(utils.Green, "√")) + connectedToHub = true + log.Printf("%v connected successfully to Hub using port-forward", fmt.Sprintf(utils.Green, "√")) } - return connectedToApiServer + return connectedToHub } func checkProxy(serverUrl string, kubernetesProvider *kubernetes.Provider) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - httpServer, err := kubernetes.StartProxy(kubernetesProvider, config.Config.Tap.ProxyHost, config.Config.Hub.PortForward.SrcPort, config.Config.Hub.PortForward.DstPort, config.Config.KubesharkResourcesNamespace, kubernetes.ApiServerServiceName, cancel) + httpServer, err := kubernetes.StartProxy(kubernetesProvider, config.Config.Tap.ProxyHost, config.Config.Hub.PortForward.SrcPort, config.Config.Hub.PortForward.DstPort, config.Config.KubesharkResourcesNamespace, kubernetes.HubServiceName, cancel) if err != nil { return err } @@ -67,7 +67,7 @@ func checkPortForward(serverUrl string, kubernetesProvider *kubernetes.Provider) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - podRegex, _ := regexp.Compile(kubernetes.ApiServerPodName) + podRegex, _ := regexp.Compile(kubernetes.HubPodName) forwarder, err := kubernetes.NewPortForward(kubernetesProvider, config.Config.KubesharkResourcesNamespace, podRegex, config.Config.Tap.GuiPort, config.Config.Tap.GuiPort, ctx, cancel) if err != nil { return err diff --git a/cmd/common.go b/cmd/common.go index fe8efa62c..0e024dd9f 100644 --- a/cmd/common.go +++ b/cmd/common.go @@ -39,7 +39,7 @@ func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, ctx con log.Printf("Error occurred while stopping proxy %v", errormessage.FormatError(err)) } - podRegex, _ := regexp.Compile(kubernetes.ApiServerPodName) + podRegex, _ := regexp.Compile(kubernetes.HubPodName) if _, err := kubernetes.NewPortForward(kubernetesProvider, config.Config.KubesharkResourcesNamespace, podRegex, srcPort, dstPort, ctx, cancel); err != nil { log.Printf(utils.Error, fmt.Sprintf("Error occured while running port forward [%s] %v\n"+ "Try setting different port by using --%s", podRegex, errormessage.FormatError(err), configStructs.GuiPortTapName)) diff --git a/cmd/tapRunner.go b/cmd/tapRunner.go index c0753b142..8a11a0099 100644 --- a/cmd/tapRunner.go +++ b/cmd/tapRunner.go @@ -36,7 +36,7 @@ type tapState struct { var state tapState var connector *connect.Connector -var apiServerPodReady bool +var hubPodReady bool var frontPodReady bool var proxyDone bool @@ -88,7 +88,7 @@ func RunKubesharkTap() { } log.Printf("Waiting for Kubeshark Agent to start...") - if state.kubesharkServiceAccountExists, err = resources.CreateTapKubesharkResources(ctx, kubernetesProvider, serializedKubesharkConfig, config.Config.IsNsRestrictedMode(), config.Config.KubesharkResourcesNamespace, config.Config.AgentImage, config.Config.Tap.MaxEntriesDBSizeBytes(), config.Config.Tap.ApiServerResources, config.Config.ImagePullPolicy(), config.Config.LogLevel(), config.Config.Tap.Profiler); err != nil { + if state.kubesharkServiceAccountExists, err = resources.CreateTapKubesharkResources(ctx, kubernetesProvider, serializedKubesharkConfig, config.Config.IsNsRestrictedMode(), config.Config.KubesharkResourcesNamespace, config.Config.AgentImage, config.Config.Tap.MaxEntriesDBSizeBytes(), config.Config.Tap.HubResources, config.Config.ImagePullPolicy(), config.Config.LogLevel(), config.Config.Tap.Profiler); err != nil { var statusError *k8serrors.StatusError if errors.As(err, &statusError) && (statusError.ErrStatus.Reason == metav1.StatusReasonAlreadyExists) { log.Print("Kubeshark is already running in this namespace, change the `kubeshark-resources-namespace` configuration or run `kubeshark clean` to remove the currently running Kubeshark instance") @@ -102,8 +102,8 @@ func RunKubesharkTap() { defer finishTapExecution(kubernetesProvider) - go goUtils.HandleExcWrapper(watchApiServerEvents, ctx, kubernetesProvider, cancel) - go goUtils.HandleExcWrapper(watchApiServerPod, ctx, kubernetesProvider, cancel) + go goUtils.HandleExcWrapper(watchHubEvents, ctx, kubernetesProvider, cancel) + go goUtils.HandleExcWrapper(watchHubPod, ctx, kubernetesProvider, cancel) go goUtils.HandleExcWrapper(watchFrontPod, ctx, kubernetesProvider, cancel) // block until exit signal or error @@ -132,8 +132,8 @@ func getTapKubesharkAgentConfig() *models.Config { } /* -this function is a bit problematic as it might be detached from the actual pods the kubeshark api server will tap. -The alternative would be to wait for api server to be ready and then query it for the pods it listens to, this has +this function is a bit problematic as it might be detached from the actual pods the Kubeshark Hub will tap. +The alternative would be to wait for Hub to be ready and then query it for the pods it listens to, this has the arguably worse drawback of taking a relatively very long time before the user sees which pods are targeted, if any. */ func printTappedPodsPreview(ctx context.Context, kubernetesProvider *kubernetes.Provider, namespaces []string) error { @@ -229,14 +229,14 @@ func getErrorDisplayTextForK8sTapManagerError(err kubernetes.K8sTapManagerError) } } -func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) { - podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", kubernetes.ApiServerPodName)) +func watchHubPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) { + podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", kubernetes.HubPodName)) podWatchHelper := kubernetes.NewPodWatchHelper(kubernetesProvider, podExactRegex) eventChan, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.KubesharkResourcesNamespace}, podWatchHelper) isPodReady := false - apiServerTimeoutSec := config.GetIntEnvConfig(config.ApiServerTimeoutSec, 120) - timeAfter := time.After(time.Duration(apiServerTimeoutSec) * time.Second) + hubTimeoutSec := config.GetIntEnvConfig(config.HubTimeoutSec, 120) + timeAfter := time.After(time.Duration(hubTimeoutSec) * time.Second) for { select { case wEvent, ok := <-eventChan: @@ -247,9 +247,9 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi switch wEvent.Type { case kubernetes.EventAdded: - log.Printf("Watching API Server pod loop, added") + log.Printf("Watching Hub pod loop, added") case kubernetes.EventDeleted: - log.Printf("%s removed", kubernetes.ApiServerPodName) + log.Printf("%s removed", kubernetes.HubPodName) cancel() return case kubernetes.EventModified: @@ -260,15 +260,15 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi continue } - log.Printf("Watching API Server pod loop, modified: %v, containers statuses: %v", modifiedPod.Status.Phase, modifiedPod.Status.ContainerStatuses) + log.Printf("Watching Hub pod loop, modified: %v, containers statuses: %v", modifiedPod.Status.Phase, modifiedPod.Status.ContainerStatuses) if modifiedPod.Status.Phase == core.PodRunning && !isPodReady { isPodReady = true - apiServerPodReady = true - postApiServerStarted(ctx, kubernetesProvider, cancel) + hubPodReady = true + postHubStarted(ctx, kubernetesProvider, cancel) } - if !proxyDone && apiServerPodReady && frontPodReady { + if !proxyDone && hubPodReady && frontPodReady { proxyDone = true postFrontStarted(ctx, kubernetesProvider, cancel) } @@ -288,11 +288,11 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi case <-timeAfter: if !isPodReady { - log.Printf(utils.Error, "Kubeshark API server was not ready in time") + log.Printf(utils.Error, "Kubeshark Hub was not ready in time") cancel() } case <-ctx.Done(): - log.Printf("Watching API Server pod loop, ctx done") + log.Printf("Watching Hub pod loop, ctx done") return } } @@ -304,8 +304,8 @@ func watchFrontPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, eventChan, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.KubesharkResourcesNamespace}, podWatchHelper) isPodReady := false - apiServerTimeoutSec := config.GetIntEnvConfig(config.ApiServerTimeoutSec, 120) - timeAfter := time.After(time.Duration(apiServerTimeoutSec) * time.Second) + hubTimeoutSec := config.GetIntEnvConfig(config.HubTimeoutSec, 120) + timeAfter := time.After(time.Duration(hubTimeoutSec) * time.Second) for { select { case wEvent, ok := <-eventChan: @@ -316,7 +316,7 @@ func watchFrontPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, switch wEvent.Type { case kubernetes.EventAdded: - log.Printf("Watching API Server pod loop, added") + log.Printf("Watching Hub pod loop, added") case kubernetes.EventDeleted: log.Printf("%s removed", kubernetes.FrontPodName) cancel() @@ -329,14 +329,14 @@ func watchFrontPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, continue } - log.Printf("Watching API Server pod loop, modified: %v, containers statuses: %v", modifiedPod.Status.Phase, modifiedPod.Status.ContainerStatuses) + log.Printf("Watching Hub pod loop, modified: %v, containers statuses: %v", modifiedPod.Status.Phase, modifiedPod.Status.ContainerStatuses) if modifiedPod.Status.Phase == core.PodRunning && !isPodReady { isPodReady = true frontPodReady = true } - if !proxyDone && apiServerPodReady && frontPodReady { + if !proxyDone && hubPodReady && frontPodReady { proxyDone = true postFrontStarted(ctx, kubernetesProvider, cancel) } @@ -356,18 +356,18 @@ func watchFrontPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, case <-timeAfter: if !isPodReady { - log.Printf(utils.Error, "Kubeshark API server was not ready in time") + log.Printf(utils.Error, "Kubeshark Hub was not ready in time") cancel() } case <-ctx.Done(): - log.Printf("Watching API Server pod loop, ctx done") + log.Printf("Watching Hub pod loop, ctx done") return } } } -func watchApiServerEvents(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) { - podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s", kubernetes.ApiServerPodName)) +func watchHubEvents(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) { + podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s", kubernetes.HubPodName)) eventWatchHelper := kubernetes.NewEventWatchHelper(kubernetesProvider, podExactRegex, "pod") eventChan, errorChan := kubernetes.FilteredWatch(ctx, eventWatchHelper, []string{config.Config.KubesharkResourcesNamespace}, eventWatchHelper) for { @@ -389,7 +389,7 @@ func watchApiServerEvents(ctx context.Context, kubernetesProvider *kubernetes.Pr } log.Printf( - "Watching API server events loop, event %s, time: %v, resource: %s (%s), reason: %s, note: %s", + "Watching Hub events loop, event %s, time: %v, resource: %s (%s), reason: %s, note: %s", event.Name, event.CreationTimestamp.Time, event.Regarding.Name, @@ -400,7 +400,7 @@ func watchApiServerEvents(ctx context.Context, kubernetesProvider *kubernetes.Pr switch event.Reason { case "FailedScheduling", "Failed": - log.Printf(utils.Error, fmt.Sprintf("Kubeshark API Server status: %s - %s", event.Reason, event.Note)) + log.Printf(utils.Error, fmt.Sprintf("Kubeshark Hub status: %s - %s", event.Reason, event.Note)) cancel() } @@ -410,16 +410,16 @@ func watchApiServerEvents(ctx context.Context, kubernetesProvider *kubernetes.Pr continue } - log.Printf("[Error] Watching API server events loop, error: %+v", err) + log.Printf("[Error] Watching Hub events loop, error: %+v", err) case <-ctx.Done(): - log.Printf("Watching API server events loop, ctx done") + log.Printf("Watching Hub events loop, ctx done") return } } } -func postApiServerStarted(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) { - startProxyReportErrorIfAny(kubernetesProvider, ctx, cancel, kubernetes.ApiServerServiceName, config.Config.Hub.PortForward.SrcPort, config.Config.Hub.PortForward.DstPort, "/echo") +func postHubStarted(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) { + startProxyReportErrorIfAny(kubernetesProvider, ctx, cancel, kubernetes.HubServiceName, config.Config.Hub.PortForward.SrcPort, config.Config.Hub.PortForward.DstPort, "/echo") if err := startTapperSyncer(ctx, cancel, kubernetesProvider, state.targetNamespaces, state.startTime); err != nil { log.Printf(utils.Error, fmt.Sprintf("Error starting kubeshark tapper syncer: %v", errormessage.FormatError(err))) @@ -427,7 +427,7 @@ func postApiServerStarted(ctx context.Context, kubernetesProvider *kubernetes.Pr } url := kubernetes.GetLocalhostOnPort(config.Config.Hub.PortForward.SrcPort) - log.Printf("API Server is available at %s", url) + log.Printf("Hub is available at %s", url) } func postFrontStarted(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) { diff --git a/cmd/viewRunner.go b/cmd/viewRunner.go index 58d31a890..2465bf607 100644 --- a/cmd/viewRunner.go +++ b/cmd/viewRunner.go @@ -25,14 +25,14 @@ func runKubesharkView() { url := config.Config.View.Url if url == "" { - exists, err := kubernetesProvider.DoesServiceExist(ctx, config.Config.KubesharkResourcesNamespace, kubernetes.ApiServerServiceName) + exists, err := kubernetesProvider.DoesServiceExist(ctx, config.Config.KubesharkResourcesNamespace, kubernetes.HubServiceName) if err != nil { log.Printf("Failed to found kubeshark service %v", err) cancel() return } if !exists { - log.Printf("%s service not found, you should run `kubeshark tap` command first", kubernetes.ApiServerServiceName) + log.Printf("%s service not found, you should run `kubeshark tap` command first", kubernetes.HubServiceName) cancel() return } @@ -41,7 +41,7 @@ func runKubesharkView() { response, err := http.Get(fmt.Sprintf("%s/", url)) if err == nil && response.StatusCode == 200 { - log.Printf("Found a running service %s and open port %d", kubernetes.ApiServerServiceName, config.Config.Front.PortForward.SrcPort) + log.Printf("Found a running service %s and open port %d", kubernetes.HubServiceName, config.Config.Front.PortForward.SrcPort) return } log.Printf("Establishing connection to k8s cluster...") @@ -50,7 +50,7 @@ func runKubesharkView() { connector := connect.NewConnector(url, connect.DefaultRetries, connect.DefaultTimeout) if err := connector.TestConnection(""); err != nil { - log.Printf(utils.Error, "Couldn't connect to API server.") + log.Printf(utils.Error, "Couldn't connect to Hub.") return } diff --git a/config/configStructs/tapConfig.go b/config/configStructs/tapConfig.go index 5a1679685..fd0a2a7fd 100644 --- a/config/configStructs/tapConfig.go +++ b/config/configStructs/tapConfig.go @@ -44,7 +44,7 @@ type TapConfig struct { HumanMaxEntriesDBSize string `yaml:"max-entries-db-size" default:"200MB"` InsertionFilter string `yaml:"insertion-filter" default:""` DryRun bool `yaml:"dry-run" default:"false"` - ApiServerResources models.Resources `yaml:"api-server-resources"` + HubResources models.Resources `yaml:"hub-resources"` TapperResources models.Resources `yaml:"tapper-resources"` ServiceMesh bool `yaml:"service-mesh" default:"false"` Tls bool `yaml:"tls" default:"false"` diff --git a/config/envConfig.go b/config/envConfig.go index acbe5e1ab..cf06d4ef3 100644 --- a/config/envConfig.go +++ b/config/envConfig.go @@ -6,8 +6,8 @@ import ( ) const ( - ApiServerRetries = "API_SERVER_RETRIES" - ApiServerTimeoutSec = "API_SERVER_TIMEOUT_SEC" + HubRetries = "HUB_SERVER_RETRIES" + HubTimeoutSec = "HUB_SERVER_TIMEOUT_SEC" ) func GetIntEnvConfig(key string, defaultValue int) int { diff --git a/internal/connect/hub.go b/internal/connect/hub.go index ccaf4ad40..b52515090 100644 --- a/internal/connect/hub.go +++ b/internal/connect/hub.go @@ -27,7 +27,7 @@ const DefaultTimeout = 2 * time.Second func NewConnector(url string, retries int, timeout time.Duration) *Connector { return &Connector{ url: url, - retries: config.GetIntEnvConfig(config.ApiServerRetries, retries), + retries: config.GetIntEnvConfig(config.HubRetries, retries), client: &http.Client{ Timeout: timeout, }, @@ -38,9 +38,9 @@ func (connector *Connector) TestConnection(path string) error { retriesLeft := connector.retries for retriesLeft > 0 { if isReachable, err := connector.isReachable(path); err != nil || !isReachable { - log.Printf("api server not ready yet %v", err) + log.Printf("Hub is not ready yet %v!", err) } else { - log.Printf("connection test to api server passed successfully") + log.Printf("Connection test to Hub passed successfully!") break } retriesLeft -= 1 @@ -48,7 +48,7 @@ func (connector *Connector) TestConnection(path string) error { } if retriesLeft == 0 { - return fmt.Errorf("couldn't reach the api server after %v retries", connector.retries) + return fmt.Errorf("Couldn't reach the Hub after %d retries!", connector.retries) } return nil } @@ -66,12 +66,12 @@ func (connector *Connector) ReportTapperStatus(tapperStatus models.TapperStatus) tapperStatusUrl := fmt.Sprintf("%s/status/tapperStatus", connector.url) if jsonValue, err := json.Marshal(tapperStatus); err != nil { - return fmt.Errorf("failed Marshal the tapper status %w", err) + return fmt.Errorf("Failed Marshal the tapper status %w", err) } else { if _, err := utils.Post(tapperStatusUrl, "application/json", bytes.NewBuffer(jsonValue), connector.client); err != nil { - return fmt.Errorf("failed sending to API server the tapped pods %w", err) + return fmt.Errorf("Failed sending to Hub the tapped pods %w", err) } else { - log.Printf("Reported to server API about tapper status: %v", tapperStatus) + log.Printf("Reported to Hub about tapper status: %v", tapperStatus) return nil } } @@ -81,12 +81,12 @@ func (connector *Connector) ReportTappedPods(pods []core.Pod) error { tappedPodsUrl := fmt.Sprintf("%s/status/tappedPods", connector.url) if jsonValue, err := json.Marshal(pods); err != nil { - return fmt.Errorf("failed Marshal the tapped pods %w", err) + return fmt.Errorf("Failed Marshal the tapped pods %w", err) } else { if _, err := utils.Post(tappedPodsUrl, "application/json", bytes.NewBuffer(jsonValue), connector.client); err != nil { - return fmt.Errorf("failed sending to API server the tapped pods %w", err) + return fmt.Errorf("Failed sending to Hub the tapped pods %w", err) } else { - log.Printf("Reported to server API about %d taped pods successfully", len(pods)) + log.Printf("Reported to Hub about %d taped pods successfully", len(pods)) return nil } } diff --git a/kubernetes/consts.go b/kubernetes/consts.go index 7487255d7..ac24ce6c0 100644 --- a/kubernetes/consts.go +++ b/kubernetes/consts.go @@ -4,8 +4,8 @@ const ( KubesharkResourcesPrefix = "ks-" FrontPodName = KubesharkResourcesPrefix + "front" FrontServiceName = FrontPodName - ApiServerPodName = KubesharkResourcesPrefix + "hub" - ApiServerServiceName = ApiServerPodName + HubPodName = KubesharkResourcesPrefix + "hub" + HubServiceName = HubPodName ClusterRoleBindingName = KubesharkResourcesPrefix + "cluster-role-binding" ClusterRoleName = KubesharkResourcesPrefix + "cluster-role" K8sAllNamespaces = "" diff --git a/kubernetes/kubesharkTapperSyncer.go b/kubernetes/kubesharkTapperSyncer.go index af0301966..ef0a87ef9 100644 --- a/kubernetes/kubesharkTapperSyncer.go +++ b/kubernetes/kubesharkTapperSyncer.go @@ -331,7 +331,7 @@ func (tapperSyncer *KubesharkTapperSyncer) updateKubesharkTappers() error { TapperDaemonSetName, "kubeshark/worker:latest", TapperPodName, - fmt.Sprintf("%s.%s.svc", ApiServerPodName, tapperSyncer.config.KubesharkResourcesNamespace), + fmt.Sprintf("%s.%s.svc", HubPodName, tapperSyncer.config.KubesharkResourcesNamespace), nodeNames, serviceAccountName, tapperSyncer.config.TapperResources, diff --git a/kubernetes/provider.go b/kubernetes/provider.go index 20164b23d..e2050b13a 100644 --- a/kubernetes/provider.go +++ b/kubernetes/provider.go @@ -170,7 +170,7 @@ func (provider *Provider) CreateNamespace(ctx context.Context, name string) (*co return provider.clientSet.CoreV1().Namespaces().Create(ctx, namespaceSpec, metav1.CreateOptions{}) } -type ApiServerOptions struct { +type HubOptions struct { Namespace string PodName string PodImage string @@ -185,7 +185,7 @@ type ApiServerOptions struct { Profiler bool } -func (provider *Provider) BuildApiServerPod(opts *ApiServerOptions, mountVolumeClaim bool, volumeClaimName string, createAuthContainer bool) (*core.Pod, error) { +func (provider *Provider) BuildHubPod(opts *HubOptions, mountVolumeClaim bool, volumeClaimName string, createAuthContainer bool) (*core.Pod, error) { configMapVolume := &core.ConfigMapVolumeSource{} configMapVolume.Name = ConfigMapName @@ -400,7 +400,7 @@ func (provider *Provider) BuildApiServerPod(opts *ApiServerOptions, mountVolumeC return pod, nil } -func (provider *Provider) BuildFrontPod(opts *ApiServerOptions, mountVolumeClaim bool, volumeClaimName string, createAuthContainer bool) (*core.Pod, error) { +func (provider *Provider) BuildFrontPod(opts *HubOptions, mountVolumeClaim bool, volumeClaimName string, createAuthContainer bool) (*core.Pod, error) { configMapVolume := &core.ConfigMapVolumeSource{} configMapVolume.Name = ConfigMapName @@ -806,7 +806,7 @@ func (provider *Provider) CreateConfigMap(ctx context.Context, namespace string, return nil } -func (provider *Provider) ApplyKubesharkTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string, apiServerPodIp string, nodeNames []string, serviceAccountName string, resources models.Resources, imagePullPolicy core.PullPolicy, kubesharkApiFilteringOptions api.TrafficFilteringOptions, logLevel logging.Level, serviceMesh bool, tls bool, maxLiveStreams int) error { +func (provider *Provider) ApplyKubesharkTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string, hubPodIp string, nodeNames []string, serviceAccountName string, resources models.Resources, imagePullPolicy core.PullPolicy, kubesharkApiFilteringOptions api.TrafficFilteringOptions, logLevel logging.Level, serviceMesh bool, tls bool, maxLiveStreams int) error { log.Printf("Applying %d tapper daemon sets, ns: %s, daemonSetName: %s, podImage: %s, tapperPodName: %s", len(nodeNames), namespace, daemonSetName, podImage, tapperPodName) if len(nodeNames) == 0 { @@ -821,7 +821,7 @@ func (provider *Provider) ApplyKubesharkTapperDaemonSet(ctx context.Context, nam kubesharkCmd := []string{ "./worker", "-i", "any", - "--api-server-address", fmt.Sprintf("ws://%s/wsTapper", apiServerPodIp), + "--api-server-address", fmt.Sprintf("ws://%s/wsTapper", hubPodIp), "--nodefrag", "--max-live-streams", strconv.Itoa(maxLiveStreams), } diff --git a/kubernetes/proxy.go b/kubernetes/proxy.go index 745fc60ac..322182a2d 100644 --- a/kubernetes/proxy.go +++ b/kubernetes/proxy.go @@ -58,7 +58,7 @@ func StartProxy(kubernetesProvider *Provider, proxyHost string, srcPort uint16, return server, nil } -func getKubesharkApiServerProxiedHostAndPath(kubesharkNamespace string, kubesharkServiceName string) string { +func getKubesharkHubProxiedHostAndPath(kubesharkNamespace string, kubesharkServiceName string) string { return fmt.Sprintf("/api/v1/namespaces/%s/services/%s:%d/proxy", kubesharkNamespace, kubesharkServiceName, kubesharkServicePort) } @@ -78,11 +78,11 @@ func getRerouteHttpHandlerKubesharkAPI(proxyHandler http.Handler, kubesharkNames return } - proxiedPath := getKubesharkApiServerProxiedHostAndPath(kubesharkNamespace, kubesharkServiceName) + proxiedPath := getKubesharkHubProxiedHostAndPath(kubesharkNamespace, kubesharkServiceName) //avoid redirecting several times if !strings.Contains(r.URL.Path, proxiedPath) { - r.URL.Path = fmt.Sprintf("%s%s", getKubesharkApiServerProxiedHostAndPath(kubesharkNamespace, kubesharkServiceName), r.URL.Path) + r.URL.Path = fmt.Sprintf("%s%s", getKubesharkHubProxiedHostAndPath(kubesharkNamespace, kubesharkServiceName), r.URL.Path) } proxyHandler.ServeHTTP(w, r) }) @@ -90,7 +90,7 @@ func getRerouteHttpHandlerKubesharkAPI(proxyHandler http.Handler, kubesharkNames func getRerouteHttpHandlerKubesharkStatic(proxyHandler http.Handler, kubesharkNamespace string, kubesharkServiceName string) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - r.URL.Path = strings.Replace(r.URL.Path, "/static/", fmt.Sprintf("%s/static/", getKubesharkApiServerProxiedHostAndPath(kubesharkNamespace, kubesharkServiceName)), 1) + r.URL.Path = strings.Replace(r.URL.Path, "/static/", fmt.Sprintf("%s/static/", getKubesharkHubProxiedHostAndPath(kubesharkNamespace, kubesharkServiceName)), 1) proxyHandler.ServeHTTP(w, r) }) } diff --git a/resources/cleanResources.go b/resources/cleanResources.go index f7a051d13..da20bbf33 100644 --- a/resources/cleanResources.go +++ b/resources/cleanResources.go @@ -89,8 +89,8 @@ func waitUntilNamespaceDeleted(ctx context.Context, cancel context.CancelFunc, k func cleanUpRestrictedMode(ctx context.Context, kubernetesProvider *kubernetes.Provider, kubesharkResourcesNamespace string) []string { leftoverResources := make([]string, 0) - if err := kubernetesProvider.RemoveService(ctx, kubesharkResourcesNamespace, kubernetes.ApiServerServiceName); err != nil { - resourceDesc := fmt.Sprintf("Service %s in namespace %s", kubernetes.ApiServerServiceName, kubesharkResourcesNamespace) + if err := kubernetesProvider.RemoveService(ctx, kubesharkResourcesNamespace, kubernetes.HubServiceName); err != nil { + resourceDesc := fmt.Sprintf("Service %s in namespace %s", kubernetes.HubServiceName, kubesharkResourcesNamespace) handleDeletionError(err, resourceDesc, &leftoverResources) } @@ -140,8 +140,8 @@ func cleanUpRestrictedMode(ctx context.Context, kubernetesProvider *kubernetes.P } } - if err := kubernetesProvider.RemovePod(ctx, kubesharkResourcesNamespace, kubernetes.ApiServerPodName); err != nil { - resourceDesc := fmt.Sprintf("Pod %s in namespace %s", kubernetes.ApiServerPodName, kubesharkResourcesNamespace) + if err := kubernetesProvider.RemovePod(ctx, kubesharkResourcesNamespace, kubernetes.HubPodName); err != nil { + resourceDesc := fmt.Sprintf("Pod %s in namespace %s", kubernetes.HubPodName, kubesharkResourcesNamespace) handleDeletionError(err, resourceDesc, &leftoverResources) } diff --git a/resources/createResources.go b/resources/createResources.go index c5ccf78e2..3668da03b 100644 --- a/resources/createResources.go +++ b/resources/createResources.go @@ -15,7 +15,7 @@ import ( core "k8s.io/api/core/v1" ) -func CreateTapKubesharkResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, serializedKubesharkConfig string, isNsRestrictedMode bool, kubesharkResourcesNamespace string, agentImage string, maxEntriesDBSizeBytes int64, apiServerResources models.Resources, imagePullPolicy core.PullPolicy, logLevel logging.Level, profiler bool) (bool, error) { +func CreateTapKubesharkResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, serializedKubesharkConfig string, isNsRestrictedMode bool, kubesharkResourcesNamespace string, agentImage string, maxEntriesDBSizeBytes int64, hubResources models.Resources, imagePullPolicy core.PullPolicy, logLevel logging.Level, profiler bool) (bool, error) { if !isNsRestrictedMode { if err := createKubesharkNamespace(ctx, kubernetesProvider, kubesharkResourcesNamespace); err != nil { return false, err @@ -38,22 +38,22 @@ func CreateTapKubesharkResources(ctx context.Context, kubernetesProvider *kubern serviceAccountName = "" } - opts := &kubernetes.ApiServerOptions{ + opts := &kubernetes.HubOptions{ Namespace: kubesharkResourcesNamespace, - PodName: kubernetes.ApiServerPodName, + PodName: kubernetes.HubPodName, PodImage: agentImage, KratosImage: "", KetoImage: "", ServiceAccountName: serviceAccountName, IsNamespaceRestricted: isNsRestrictedMode, MaxEntriesDBSizeBytes: maxEntriesDBSizeBytes, - Resources: apiServerResources, + Resources: hubResources, ImagePullPolicy: imagePullPolicy, LogLevel: logLevel, Profiler: profiler, } - frontOpts := &kubernetes.ApiServerOptions{ + frontOpts := &kubernetes.HubOptions{ Namespace: kubesharkResourcesNamespace, PodName: kubernetes.FrontPodName, PodImage: agentImage, @@ -62,13 +62,13 @@ func CreateTapKubesharkResources(ctx context.Context, kubernetesProvider *kubern ServiceAccountName: serviceAccountName, IsNamespaceRestricted: isNsRestrictedMode, MaxEntriesDBSizeBytes: maxEntriesDBSizeBytes, - Resources: apiServerResources, + Resources: hubResources, ImagePullPolicy: imagePullPolicy, LogLevel: logLevel, Profiler: profiler, } - if err := createKubesharkApiServerPod(ctx, kubernetesProvider, opts); err != nil { + if err := createKubesharkHubPod(ctx, kubernetesProvider, opts); err != nil { return kubesharkServiceAccountExists, err } @@ -76,12 +76,12 @@ func CreateTapKubesharkResources(ctx context.Context, kubernetesProvider *kubern return kubesharkServiceAccountExists, err } - _, err = kubernetesProvider.CreateService(ctx, kubesharkResourcesNamespace, kubernetes.ApiServerServiceName, kubernetes.ApiServerServiceName, 80, int32(config.Config.Hub.PortForward.DstPort), int32(config.Config.Hub.PortForward.SrcPort)) + _, err = kubernetesProvider.CreateService(ctx, kubesharkResourcesNamespace, kubernetes.HubServiceName, kubernetes.HubServiceName, 80, int32(config.Config.Hub.PortForward.DstPort), int32(config.Config.Hub.PortForward.SrcPort)) if err != nil { return kubesharkServiceAccountExists, err } - log.Printf("Successfully created service: %s", kubernetes.ApiServerServiceName) + log.Printf("Successfully created service: %s", kubernetes.HubServiceName) _, err = kubernetesProvider.CreateService(ctx, kubesharkResourcesNamespace, kubernetes.FrontServiceName, kubernetes.FrontServiceName, 80, int32(config.Config.Front.PortForward.DstPort), int32(config.Config.Front.PortForward.SrcPort)) if err != nil { @@ -117,8 +117,8 @@ func createRBACIfNecessary(ctx context.Context, kubernetesProvider *kubernetes.P return true, nil } -func createKubesharkApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, opts *kubernetes.ApiServerOptions) error { - pod, err := kubernetesProvider.BuildApiServerPod(opts, false, "", false) +func createKubesharkHubPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, opts *kubernetes.HubOptions) error { + pod, err := kubernetesProvider.BuildHubPod(opts, false, "", false) if err != nil { return err } @@ -129,7 +129,7 @@ func createKubesharkApiServerPod(ctx context.Context, kubernetesProvider *kubern return nil } -func createFrontPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, opts *kubernetes.ApiServerOptions) error { +func createFrontPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, opts *kubernetes.HubOptions) error { pod, err := kubernetesProvider.BuildFrontPod(opts, false, "", false) if err != nil { return err diff --git a/utils/http.go b/utils/http.go index 68aeb7dce..86fde791b 100644 --- a/utils/http.go +++ b/utils/http.go @@ -29,7 +29,7 @@ func Do(req *http.Request, client *http.Client) (*http.Response, error) { func checkError(response *http.Response, errInOperation error) (*http.Response, error) { if errInOperation != nil { return response, errInOperation - // Check only if status != 200 (and not status >= 300). Agent APIs return only 200 on success. + // Check only if status != 200 (and not status >= 300). Hub return only 200 on success. } else if response.StatusCode != http.StatusOK { body, err := io.ReadAll(response.Body) response.Body.Close()