diff --git a/.gitignore b/.gitignore deleted file mode 100644 index 6ec498773..000000000 --- a/.gitignore +++ /dev/null @@ -1,66 +0,0 @@ -# Binaries for programs and plugins -*.exe -*.exe~ -*.dll -*.so -*.dylib - -# Test binary, built with `go test -c` -*.test - -# Output of the go coverage tool, specifically when used with LiteIDE -*.out - -# Dependency directories (remove the comment below to include it) -# vendor/ -.idea/ -build - -# Mac OS -.DS_Store -.vscode/ - -# Ignore the scripts that are created for development -*dev.* - -# Environment variables -.env - -# pprof -pprof/* - -# Database Files -*.db -*.gob - -# Nohup Files - https://man7.org/linux/man-pages/man1/nohup.1p.html -nohup.* - -# Cypress tests -cypress.env.json -*/cypress/downloads -*/cypress/fixtures -*/cypress/plugins -*/cypress/screenshots -*/cypress/videos -*/cypress/support - -# UI folders to ignore -**/node_modules/** -**/dist/** -*.editorconfig - -# Ignore *.log files -*.log - -# Object files -*.o - -# Binaries -bin - -# Scripts -scripts/ - -# CWD config YAML -kubeshark.yaml diff --git a/cmd/scripts.go b/cmd/scripts.go index 50798b50e..64614b869 100644 --- a/cmd/scripts.go +++ b/cmd/scripts.go @@ -3,7 +3,12 @@ package cmd import ( "context" "encoding/json" + "errors" + "os" + "os/signal" "strings" + "sync" + "time" "github.com/creasty/defaults" "github.com/fsnotify/fsnotify" @@ -11,9 +16,11 @@ import ( "github.com/kubeshark/kubeshark/config/configStructs" "github.com/kubeshark/kubeshark/kubernetes" "github.com/kubeshark/kubeshark/misc" - "github.com/kubeshark/kubeshark/utils" "github.com/rs/zerolog/log" "github.com/spf13/cobra" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/watch" ) var scriptsCmd = &cobra.Command{ @@ -50,39 +57,79 @@ func runScripts() { return } - watchScripts(kubernetesProvider, true) + var wg sync.WaitGroup + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + signalChan := make(chan os.Signal, 1) + signal.Notify(signalChan, os.Interrupt) + + wg.Add(1) + go func() { + defer wg.Done() + watchConfigMap(ctx, kubernetesProvider) + }() + + wg.Add(1) + go func() { + defer wg.Done() + watchScripts(ctx, kubernetesProvider, true) + }() + + go func() { + <-signalChan + log.Debug().Msg("Received interrupt, stopping watchers.") + cancel() + }() + + wg.Wait() + } func createScript(provider *kubernetes.Provider, script misc.ConfigMapScript) (index int64, err error) { + const maxRetries = 5 var scripts map[int64]misc.ConfigMapScript - scripts, err = kubernetes.ConfigGetScripts(provider) - if err != nil { - return - } - script.Active = kubernetes.IsActiveScript(provider, script.Title) - index = int64(len(scripts)) - if script.Title != "New Script" { - for i, v := range scripts { - if v.Title == script.Title { - index = int64(i) + + for i := 0; i < maxRetries; i++ { + scripts, err = kubernetes.ConfigGetScripts(provider) + if err != nil { + return + } + script.Active = kubernetes.IsActiveScript(provider, script.Title) + index = int64(len(scripts)) + if script.Title != "New Script" { + for i, v := range scripts { + if v.Title == script.Title { + index = int64(i) + } } } - } - scripts[index] = script + scripts[index] = script - log.Info().Str("title", script.Title).Bool("Active", script.Active).Int64("Index", index).Msg("Creating script") - var data []byte - data, err = json.Marshal(scripts) - if err != nil { - return + log.Info().Str("title", script.Title).Bool("Active", script.Active).Int64("Index", index).Msg("Creating script") + var data []byte + data, err = json.Marshal(scripts) + if err != nil { + return + } + + _, err = kubernetes.SetConfig(provider, kubernetes.CONFIG_SCRIPTING_SCRIPTS, string(data)) + if err == nil { + return index, nil + } + + if k8serrors.IsConflict(err) { + log.Warn().Err(err).Msg("Conflict detected, retrying update...") + time.Sleep(500 * time.Millisecond) + continue + } + + return 0, err } - _, err = kubernetes.SetConfig(provider, kubernetes.CONFIG_SCRIPTING_SCRIPTS, string(data)) - if err != nil { - return - } - - return + log.Error().Msg("Max retries reached for creating script due to conflicts.") + return 0, errors.New("max retries reached due to conflicts while creating script") } func updateScript(provider *kubernetes.Provider, index int64, script misc.ConfigMapScript) (err error) { @@ -134,7 +181,7 @@ func deleteScript(provider *kubernetes.Provider, index int64) (err error) { return } -func watchScripts(provider *kubernetes.Provider, block bool) { +func watchScripts(ctx context.Context, provider *kubernetes.Provider, block bool) { files := make(map[string]int64) scripts, err := config.Config.Scripting.GetScripts() @@ -162,9 +209,31 @@ func watchScripts(provider *kubernetes.Provider, block bool) { defer watcher.Close() } + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + signalChan := make(chan os.Signal, 1) + signal.Notify(signalChan, os.Interrupt) + + go func() { + <-signalChan + log.Debug().Msg("Received interrupt, stopping script watch.") + cancel() + watcher.Close() + }() + + if err := watcher.Add(config.Config.Scripting.Source); err != nil { + log.Error().Err(err).Msg("Failed to add scripting source to watcher") + return + } + go func() { for { select { + case <-ctx.Done(): + log.Debug().Msg("Script watcher exiting gracefully.") + return + // watch for events case event := <-watcher.Events: if !strings.HasSuffix(event.Name, "js") { @@ -213,9 +282,12 @@ func watchScripts(provider *kubernetes.Provider, block bool) { // pass } - // watch for errors - case err := <-watcher.Errors: - log.Error().Err(err).Send() + case err, ok := <-watcher.Errors: + if !ok { + log.Info().Msg("Watcher errors channel closed.") + return + } + log.Error().Err(err).Msg("Watcher error encountered") } } }() @@ -227,8 +299,70 @@ func watchScripts(provider *kubernetes.Provider, block bool) { log.Info().Str("directory", config.Config.Scripting.Source).Msg("Watching scripts against changes:") if block { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - utils.WaitForTermination(ctx, cancel) + <-ctx.Done() } } + +func watchConfigMap(ctx context.Context, provider *kubernetes.Provider) { + clientset := provider.GetClientSet() + configMapName := kubernetes.SELF_RESOURCES_PREFIX + kubernetes.SUFFIX_CONFIG_MAP + + for { + select { + case <-ctx.Done(): + log.Info().Msg("ConfigMap watcher exiting gracefully.") + return + + default: + watcher, err := clientset.CoreV1().ConfigMaps(config.Config.Tap.Release.Namespace).Watch(context.TODO(), metav1.ListOptions{ + FieldSelector: "metadata.name=" + configMapName, + }) + if err != nil { + log.Warn().Err(err).Msg("ConfigMap not found, retrying in 5 seconds...") + time.Sleep(5 * time.Second) + continue + } + + for event := range watcher.ResultChan() { + select { + case <-ctx.Done(): + log.Info().Msg("ConfigMap watcher loop exiting gracefully.") + watcher.Stop() + return + + default: + if event.Type == watch.Added { + log.Info().Msg("ConfigMap created or modified") + runScriptsSync(provider) + } else if event.Type == watch.Deleted { + log.Warn().Msg("ConfigMap deleted, waiting for recreation...") + watcher.Stop() + break + } + } + } + + time.Sleep(5 * time.Second) + } + } +} + +func runScriptsSync(provider *kubernetes.Provider) { + files := make(map[string]int64) + + scripts, err := config.Config.Scripting.GetScripts() + if err != nil { + log.Error().Err(err).Send() + return + } + + for _, script := range scripts { + index, err := createScript(provider, script.ConfigMap()) + if err != nil { + log.Error().Err(err).Send() + continue + } + files[script.Path] = index + } + log.Info().Msg("Synchronized scripts with ConfigMap.") +} diff --git a/cmd/tapRunner.go b/cmd/tapRunner.go index 7f2c0f684..bff13e778 100644 --- a/cmd/tapRunner.go +++ b/cmd/tapRunner.go @@ -425,7 +425,7 @@ func postFrontStarted(ctx context.Context, kubernetesProvider *kubernetes.Provid } if config.Config.Scripting.Source != "" && config.Config.Scripting.WatchScripts { - watchScripts(kubernetesProvider, false) + watchScripts(ctx, kubernetesProvider, false) } if config.Config.Scripting.Console { diff --git a/kubernetes/provider.go b/kubernetes/provider.go index a4f66d50c..721ff5f42 100644 --- a/kubernetes/provider.go +++ b/kubernetes/provider.go @@ -247,6 +247,10 @@ func (provider *Provider) GetNamespaces() (namespaces []string) { return } +func (provider *Provider) GetClientSet() *kubernetes.Clientset { + return provider.clientSet +} + func getClientSet(config *rest.Config) (*kubernetes.Clientset, error) { clientSet, err := kubernetes.NewForConfig(config) if err != nil {