🔨 Replace ApiServer naming with Hub

M. Mert Yildiran
2022-11-26 22:06:06 +03:00
parent 5bd8aea8b9
commit 671aa783c5
15 changed files with 100 additions and 100 deletions


@@ -36,7 +36,7 @@ type tapState struct {
var state tapState
var connector *connect.Connector
-var apiServerPodReady bool
+var hubPodReady bool
var frontPodReady bool
var proxyDone bool
@@ -88,7 +88,7 @@ func RunKubesharkTap() {
}
log.Printf("Waiting for Kubeshark Agent to start...")
-if state.kubesharkServiceAccountExists, err = resources.CreateTapKubesharkResources(ctx, kubernetesProvider, serializedKubesharkConfig, config.Config.IsNsRestrictedMode(), config.Config.KubesharkResourcesNamespace, config.Config.AgentImage, config.Config.Tap.MaxEntriesDBSizeBytes(), config.Config.Tap.ApiServerResources, config.Config.ImagePullPolicy(), config.Config.LogLevel(), config.Config.Tap.Profiler); err != nil {
+if state.kubesharkServiceAccountExists, err = resources.CreateTapKubesharkResources(ctx, kubernetesProvider, serializedKubesharkConfig, config.Config.IsNsRestrictedMode(), config.Config.KubesharkResourcesNamespace, config.Config.AgentImage, config.Config.Tap.MaxEntriesDBSizeBytes(), config.Config.Tap.HubResources, config.Config.ImagePullPolicy(), config.Config.LogLevel(), config.Config.Tap.Profiler); err != nil {
var statusError *k8serrors.StatusError
if errors.As(err, &statusError) && (statusError.ErrStatus.Reason == metav1.StatusReasonAlreadyExists) {
log.Print("Kubeshark is already running in this namespace, change the `kubeshark-resources-namespace` configuration or run `kubeshark clean` to remove the currently running Kubeshark instance")
@@ -102,8 +102,8 @@ func RunKubesharkTap() {
defer finishTapExecution(kubernetesProvider)
-go goUtils.HandleExcWrapper(watchApiServerEvents, ctx, kubernetesProvider, cancel)
-go goUtils.HandleExcWrapper(watchApiServerPod, ctx, kubernetesProvider, cancel)
+go goUtils.HandleExcWrapper(watchHubEvents, ctx, kubernetesProvider, cancel)
+go goUtils.HandleExcWrapper(watchHubPod, ctx, kubernetesProvider, cancel)
go goUtils.HandleExcWrapper(watchFrontPod, ctx, kubernetesProvider, cancel)
// block until exit signal or error
@@ -132,8 +132,8 @@ func getTapKubesharkAgentConfig() *models.Config {
}
/*
-this function is a bit problematic as it might be detached from the actual pods the kubeshark api server will tap.
-The alternative would be to wait for api server to be ready and then query it for the pods it listens to, this has
+this function is a bit problematic as it might be detached from the actual pods the Kubeshark Hub will tap.
+The alternative would be to wait for Hub to be ready and then query it for the pods it listens to, this has
the arguably worse drawback of taking a relatively very long time before the user sees which pods are targeted, if any.
*/
func printTappedPodsPreview(ctx context.Context, kubernetesProvider *kubernetes.Provider, namespaces []string) error {
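
To make the tradeoff in the comment above concrete, here is a hedged sketch of the alternative it describes: wait for the Hub to come up behind the local proxy, then ask it which pods it is tapping. The `/pods` path, the response shape, and the port are illustrative assumptions, not the real Hub API.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// fetchTappedPods asks a running Hub which pods it listens to.
// Endpoint, payload shape, and port are assumptions for illustration.
func fetchTappedPods(hubURL string) ([]string, error) {
	resp, err := http.Get(hubURL + "/pods") // hypothetical endpoint
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	var pods []string
	if err := json.NewDecoder(resp.Body).Decode(&pods); err != nil {
		return nil, err
	}
	return pods, nil
}

func main() {
	pods, err := fetchTappedPods("http://localhost:8899") // assumed proxy port
	if err != nil {
		fmt.Println("Hub not ready yet:", err)
		return
	}
	fmt.Println("tapped pods:", pods)
}
```

As the comment says, nothing comes back until the Hub is ready, so the user would wait longer before seeing any targeted pods.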
@@ -229,14 +229,14 @@ func getErrorDisplayTextForK8sTapManagerError(err kubernetes.K8sTapManagerError)
}
}
-func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
-podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", kubernetes.ApiServerPodName))
+func watchHubPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
+podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", kubernetes.HubPodName))
podWatchHelper := kubernetes.NewPodWatchHelper(kubernetesProvider, podExactRegex)
eventChan, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.KubesharkResourcesNamespace}, podWatchHelper)
isPodReady := false
-apiServerTimeoutSec := config.GetIntEnvConfig(config.ApiServerTimeoutSec, 120)
-timeAfter := time.After(time.Duration(apiServerTimeoutSec) * time.Second)
+hubTimeoutSec := config.GetIntEnvConfig(config.HubTimeoutSec, 120)
+timeAfter := time.After(time.Duration(hubTimeoutSec) * time.Second)
for {
select {
case wEvent, ok := <-eventChan:
@@ -247,9 +247,9 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
switch wEvent.Type {
case kubernetes.EventAdded:
log.Printf("Watching API Server pod loop, added")
log.Printf("Watching Hub pod loop, added")
case kubernetes.EventDeleted:
log.Printf("%s removed", kubernetes.ApiServerPodName)
log.Printf("%s removed", kubernetes.HubPodName)
cancel()
return
case kubernetes.EventModified:
@@ -260,15 +260,15 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
continue
}
log.Printf("Watching API Server pod loop, modified: %v, containers statuses: %v", modifiedPod.Status.Phase, modifiedPod.Status.ContainerStatuses)
log.Printf("Watching Hub pod loop, modified: %v, containers statuses: %v", modifiedPod.Status.Phase, modifiedPod.Status.ContainerStatuses)
if modifiedPod.Status.Phase == core.PodRunning && !isPodReady {
isPodReady = true
-apiServerPodReady = true
-postApiServerStarted(ctx, kubernetesProvider, cancel)
+hubPodReady = true
+postHubStarted(ctx, kubernetesProvider, cancel)
}
-if !proxyDone && apiServerPodReady && frontPodReady {
+if !proxyDone && hubPodReady && frontPodReady {
proxyDone = true
postFrontStarted(ctx, kubernetesProvider, cancel)
}
@@ -288,11 +288,11 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
case <-timeAfter:
if !isPodReady {
-log.Printf(utils.Error, "Kubeshark API server was not ready in time")
+log.Printf(utils.Error, "Kubeshark Hub was not ready in time")
cancel()
}
case <-ctx.Done():
log.Printf("Watching API Server pod loop, ctx done")
log.Printf("Watching Hub pod loop, ctx done")
return
}
}
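
Both pod watchers in this file share the same select-loop shape: an event channel from FilteredWatch, an error channel, a timeout armed with time.After, and context cancellation. A condensed, stdlib-only sketch of that pattern (the event type is a stand-in for the real Kubernetes watch event):

```go
package main

import (
	"context"
	"log"
	"time"
)

// podEvent is a stand-in for the watch events delivered by kubernetes.FilteredWatch.
type podEvent struct{ Running bool }

// watchUntilReady mirrors the loop in watchHubPod/watchFrontPod: select over
// events, watch errors, a one-shot timeout, and context cancellation.
func watchUntilReady(ctx context.Context, events <-chan podEvent, errs <-chan error, timeout time.Duration) bool {
	timeAfter := time.After(timeout)
	for {
		select {
		case ev, ok := <-events:
			if !ok {
				return false // watch channel closed
			}
			if ev.Running {
				return true // pod reached Running: report ready
			}
		case err := <-errs:
			log.Printf("watch error: %v", err)
		case <-timeAfter:
			log.Printf("pod was not ready in time")
			return false
		case <-ctx.Done():
			return false
		}
	}
}

func main() {
	events := make(chan podEvent, 1)
	events <- podEvent{Running: true}
	log.Printf("ready: %v", watchUntilReady(context.Background(), events, make(chan error), 2*time.Second))
}
```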
@@ -304,8 +304,8 @@ func watchFrontPod(ctx context.Context, kubernetesProvider *kubernetes.Provider,
eventChan, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.KubesharkResourcesNamespace}, podWatchHelper)
isPodReady := false
-apiServerTimeoutSec := config.GetIntEnvConfig(config.ApiServerTimeoutSec, 120)
-timeAfter := time.After(time.Duration(apiServerTimeoutSec) * time.Second)
+hubTimeoutSec := config.GetIntEnvConfig(config.HubTimeoutSec, 120)
+timeAfter := time.After(time.Duration(hubTimeoutSec) * time.Second)
for {
select {
case wEvent, ok := <-eventChan:
@@ -316,7 +316,7 @@ func watchFrontPod(ctx context.Context, kubernetesProvider *kubernetes.Provider,
switch wEvent.Type {
case kubernetes.EventAdded:
log.Printf("Watching API Server pod loop, added")
log.Printf("Watching Hub pod loop, added")
case kubernetes.EventDeleted:
log.Printf("%s removed", kubernetes.FrontPodName)
cancel()
@@ -329,14 +329,14 @@ func watchFrontPod(ctx context.Context, kubernetesProvider *kubernetes.Provider,
continue
}
log.Printf("Watching API Server pod loop, modified: %v, containers statuses: %v", modifiedPod.Status.Phase, modifiedPod.Status.ContainerStatuses)
log.Printf("Watching Hub pod loop, modified: %v, containers statuses: %v", modifiedPod.Status.Phase, modifiedPod.Status.ContainerStatuses)
if modifiedPod.Status.Phase == core.PodRunning && !isPodReady {
isPodReady = true
frontPodReady = true
}
-if !proxyDone && apiServerPodReady && frontPodReady {
+if !proxyDone && hubPodReady && frontPodReady {
proxyDone = true
postFrontStarted(ctx, kubernetesProvider, cancel)
}
@@ -356,18 +356,18 @@ func watchFrontPod(ctx context.Context, kubernetesProvider *kubernetes.Provider,
case <-timeAfter:
if !isPodReady {
-log.Printf(utils.Error, "Kubeshark API server was not ready in time")
+log.Printf(utils.Error, "Kubeshark Hub was not ready in time")
cancel()
}
case <-ctx.Done():
log.Printf("Watching API Server pod loop, ctx done")
log.Printf("Watching Hub pod loop, ctx done")
return
}
}
}
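
watchHubPod and watchFrontPod also share a rendezvous: whichever goroutine observes the second pod become ready flips proxyDone and calls postFrontStarted. The original relies on plain bools shared across the two goroutines; the sketch below expresses the same gate with a mutex and sync.Once, a deliberate substitution that makes the fire-exactly-once behavior explicit:

```go
package main

import (
	"fmt"
	"sync"
)

// readyGate mimics the hubPodReady/frontPodReady/proxyDone trio: the action
// fires exactly once, when the second of the two pods reports ready.
type readyGate struct {
	mu         sync.Mutex
	hub, front bool
	once       sync.Once
}

func (g *readyGate) markReady(isHub bool, action func()) {
	g.mu.Lock()
	if isHub {
		g.hub = true
	} else {
		g.front = true
	}
	both := g.hub && g.front
	g.mu.Unlock()
	if both {
		g.once.Do(action)
	}
}

func main() {
	var g readyGate
	start := func() { fmt.Println("postFrontStarted") }
	g.markReady(true, start)  // hub ready: gate stays closed
	g.markReady(false, start) // front ready: fires exactly once
}
```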
-func watchApiServerEvents(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
-podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s", kubernetes.ApiServerPodName))
+func watchHubEvents(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
+podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s", kubernetes.HubPodName))
eventWatchHelper := kubernetes.NewEventWatchHelper(kubernetesProvider, podExactRegex, "pod")
eventChan, errorChan := kubernetes.FilteredWatch(ctx, eventWatchHelper, []string{config.Config.KubesharkResourcesNamespace}, eventWatchHelper)
for {
@@ -389,7 +389,7 @@ func watchApiServerEvents(ctx context.Context, kubernetesProvider *kubernetes.Pr
}
log.Printf(
"Watching API server events loop, event %s, time: %v, resource: %s (%s), reason: %s, note: %s",
"Watching Hub events loop, event %s, time: %v, resource: %s (%s), reason: %s, note: %s",
event.Name,
event.CreationTimestamp.Time,
event.Regarding.Name,
@@ -400,7 +400,7 @@ func watchApiServerEvents(ctx context.Context, kubernetesProvider *kubernetes.Pr
switch event.Reason {
case "FailedScheduling", "Failed":
-log.Printf(utils.Error, fmt.Sprintf("Kubeshark API Server status: %s - %s", event.Reason, event.Note))
+log.Printf(utils.Error, fmt.Sprintf("Kubeshark Hub status: %s - %s", event.Reason, event.Note))
cancel()
}
@@ -410,16 +410,16 @@ func watchApiServerEvents(ctx context.Context, kubernetesProvider *kubernetes.Pr
continue
}
log.Printf("[Error] Watching API server events loop, error: %+v", err)
log.Printf("[Error] Watching Hub events loop, error: %+v", err)
case <-ctx.Done():
log.Printf("Watching API server events loop, ctx done")
log.Printf("Watching Hub events loop, ctx done")
return
}
}
}
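
Note the two regexes built from the same pod name: the pod watcher above anchors both ends ("^%s$") for an exact match, while the event watcher anchors only the start ("^%s"), so it also catches events on derived names. A quick illustration; the pod name value here is an assumption, not necessarily the real kubernetes.HubPodName:

```go
package main

import (
	"fmt"
	"regexp"
)

func main() {
	name := "kubeshark-hub" // assumed value of kubernetes.HubPodName
	exact := regexp.MustCompile(fmt.Sprintf("^%s$", name)) // watchHubPod: exact match
	prefix := regexp.MustCompile(fmt.Sprintf("^%s", name)) // watchHubEvents: prefix match
	fmt.Println(exact.MatchString("kubeshark-hub"))       // true
	fmt.Println(exact.MatchString("kubeshark-hub-6f4b"))  // false
	fmt.Println(prefix.MatchString("kubeshark-hub-6f4b")) // true
}
```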
-func postApiServerStarted(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
-startProxyReportErrorIfAny(kubernetesProvider, ctx, cancel, kubernetes.ApiServerServiceName, config.Config.Hub.PortForward.SrcPort, config.Config.Hub.PortForward.DstPort, "/echo")
+func postHubStarted(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
+startProxyReportErrorIfAny(kubernetesProvider, ctx, cancel, kubernetes.HubServiceName, config.Config.Hub.PortForward.SrcPort, config.Config.Hub.PortForward.DstPort, "/echo")
if err := startTapperSyncer(ctx, cancel, kubernetesProvider, state.targetNamespaces, state.startTime); err != nil {
log.Printf(utils.Error, fmt.Sprintf("Error starting kubeshark tapper syncer: %v", errormessage.FormatError(err)))
@@ -427,7 +427,7 @@ func postApiServerStarted(ctx context.Context, kubernetesProvider *kubernetes.Pr
}
url := kubernetes.GetLocalhostOnPort(config.Config.Hub.PortForward.SrcPort)
log.Printf("API Server is available at %s", url)
log.Printf("Hub is available at %s", url)
}
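
For reference, a minimal sketch of what kubernetes.GetLocalhostOnPort plausibly produces, inferred from this call site alone; both the implementation and the 8899 default are assumptions:

```go
package main

import "fmt"

// getLocalhostOnPort is an assumed stand-in for kubernetes.GetLocalhostOnPort.
func getLocalhostOnPort(port uint16) string {
	return fmt.Sprintf("http://localhost:%d", port)
}

func main() {
	fmt.Println("Hub is available at", getLocalhostOnPort(8899)) // 8899 is an assumed default SrcPort
}
```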
func postFrontStarted(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {