Watch cm creation and sync scripts (#1637)

* Fix graceful shutdown

* add helpers

* Watch for configmap changes

---------

Co-authored-by: Alon Girmonsky <1990761+alongir@users.noreply.github.com>
This commit is contained in:
Volodymyr Stoiko 2024-11-05 23:35:17 +02:00 committed by GitHub
parent 4a6628a3e8
commit bba1bbd1fb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 171 additions and 99 deletions

66
.gitignore vendored
View File

@ -1,66 +0,0 @@
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib
# Test binary, built with `go test -c`
*.test
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
# Dependency directories (remove the comment below to include it)
# vendor/
.idea/
build
# Mac OS
.DS_Store
.vscode/
# Ignore the scripts that are created for development
*dev.*
# Environment variables
.env
# pprof
pprof/*
# Database Files
*.db
*.gob
# Nohup Files - https://man7.org/linux/man-pages/man1/nohup.1p.html
nohup.*
# Cypress tests
cypress.env.json
*/cypress/downloads
*/cypress/fixtures
*/cypress/plugins
*/cypress/screenshots
*/cypress/videos
*/cypress/support
# UI folders to ignore
**/node_modules/**
**/dist/**
*.editorconfig
# Ignore *.log files
*.log
# Object files
*.o
# Binaries
bin
# Scripts
scripts/
# CWD config YAML
kubeshark.yaml

View File

@ -3,7 +3,12 @@ package cmd
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"errors"
"os"
"os/signal"
"strings" "strings"
"sync"
"time"
"github.com/creasty/defaults" "github.com/creasty/defaults"
"github.com/fsnotify/fsnotify" "github.com/fsnotify/fsnotify"
@ -11,9 +16,11 @@ import (
"github.com/kubeshark/kubeshark/config/configStructs" "github.com/kubeshark/kubeshark/config/configStructs"
"github.com/kubeshark/kubeshark/kubernetes" "github.com/kubeshark/kubeshark/kubernetes"
"github.com/kubeshark/kubeshark/misc" "github.com/kubeshark/kubeshark/misc"
"github.com/kubeshark/kubeshark/utils"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
"github.com/spf13/cobra" "github.com/spf13/cobra"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
) )
var scriptsCmd = &cobra.Command{ var scriptsCmd = &cobra.Command{
@ -50,39 +57,79 @@ func runScripts() {
return return
} }
watchScripts(kubernetesProvider, true) var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt)
wg.Add(1)
go func() {
defer wg.Done()
watchConfigMap(ctx, kubernetesProvider)
}()
wg.Add(1)
go func() {
defer wg.Done()
watchScripts(ctx, kubernetesProvider, true)
}()
go func() {
<-signalChan
log.Debug().Msg("Received interrupt, stopping watchers.")
cancel()
}()
wg.Wait()
} }
func createScript(provider *kubernetes.Provider, script misc.ConfigMapScript) (index int64, err error) { func createScript(provider *kubernetes.Provider, script misc.ConfigMapScript) (index int64, err error) {
const maxRetries = 5
var scripts map[int64]misc.ConfigMapScript var scripts map[int64]misc.ConfigMapScript
scripts, err = kubernetes.ConfigGetScripts(provider)
if err != nil { for i := 0; i < maxRetries; i++ {
return scripts, err = kubernetes.ConfigGetScripts(provider)
} if err != nil {
script.Active = kubernetes.IsActiveScript(provider, script.Title) return
index = int64(len(scripts)) }
if script.Title != "New Script" { script.Active = kubernetes.IsActiveScript(provider, script.Title)
for i, v := range scripts { index = int64(len(scripts))
if v.Title == script.Title { if script.Title != "New Script" {
index = int64(i) for i, v := range scripts {
if v.Title == script.Title {
index = int64(i)
}
} }
} }
} scripts[index] = script
scripts[index] = script
log.Info().Str("title", script.Title).Bool("Active", script.Active).Int64("Index", index).Msg("Creating script") log.Info().Str("title", script.Title).Bool("Active", script.Active).Int64("Index", index).Msg("Creating script")
var data []byte var data []byte
data, err = json.Marshal(scripts) data, err = json.Marshal(scripts)
if err != nil { if err != nil {
return return
}
_, err = kubernetes.SetConfig(provider, kubernetes.CONFIG_SCRIPTING_SCRIPTS, string(data))
if err == nil {
return index, nil
}
if k8serrors.IsConflict(err) {
log.Warn().Err(err).Msg("Conflict detected, retrying update...")
time.Sleep(500 * time.Millisecond)
continue
}
return 0, err
} }
_, err = kubernetes.SetConfig(provider, kubernetes.CONFIG_SCRIPTING_SCRIPTS, string(data)) log.Error().Msg("Max retries reached for creating script due to conflicts.")
if err != nil { return 0, errors.New("max retries reached due to conflicts while creating script")
return
}
return
} }
func updateScript(provider *kubernetes.Provider, index int64, script misc.ConfigMapScript) (err error) { func updateScript(provider *kubernetes.Provider, index int64, script misc.ConfigMapScript) (err error) {
@ -134,7 +181,7 @@ func deleteScript(provider *kubernetes.Provider, index int64) (err error) {
return return
} }
func watchScripts(provider *kubernetes.Provider, block bool) { func watchScripts(ctx context.Context, provider *kubernetes.Provider, block bool) {
files := make(map[string]int64) files := make(map[string]int64)
scripts, err := config.Config.Scripting.GetScripts() scripts, err := config.Config.Scripting.GetScripts()
@ -162,9 +209,31 @@ func watchScripts(provider *kubernetes.Provider, block bool) {
defer watcher.Close() defer watcher.Close()
} }
ctx, cancel := context.WithCancel(ctx)
defer cancel()
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt)
go func() {
<-signalChan
log.Debug().Msg("Received interrupt, stopping script watch.")
cancel()
watcher.Close()
}()
if err := watcher.Add(config.Config.Scripting.Source); err != nil {
log.Error().Err(err).Msg("Failed to add scripting source to watcher")
return
}
go func() { go func() {
for { for {
select { select {
case <-ctx.Done():
log.Debug().Msg("Script watcher exiting gracefully.")
return
// watch for events // watch for events
case event := <-watcher.Events: case event := <-watcher.Events:
if !strings.HasSuffix(event.Name, "js") { if !strings.HasSuffix(event.Name, "js") {
@ -213,9 +282,12 @@ func watchScripts(provider *kubernetes.Provider, block bool) {
// pass // pass
} }
// watch for errors case err, ok := <-watcher.Errors:
case err := <-watcher.Errors: if !ok {
log.Error().Err(err).Send() log.Info().Msg("Watcher errors channel closed.")
return
}
log.Error().Err(err).Msg("Watcher error encountered")
} }
} }
}() }()
@ -227,8 +299,70 @@ func watchScripts(provider *kubernetes.Provider, block bool) {
log.Info().Str("directory", config.Config.Scripting.Source).Msg("Watching scripts against changes:") log.Info().Str("directory", config.Config.Scripting.Source).Msg("Watching scripts against changes:")
if block { if block {
ctx, cancel := context.WithCancel(context.Background()) <-ctx.Done()
defer cancel()
utils.WaitForTermination(ctx, cancel)
} }
} }
func watchConfigMap(ctx context.Context, provider *kubernetes.Provider) {
clientset := provider.GetClientSet()
configMapName := kubernetes.SELF_RESOURCES_PREFIX + kubernetes.SUFFIX_CONFIG_MAP
for {
select {
case <-ctx.Done():
log.Info().Msg("ConfigMap watcher exiting gracefully.")
return
default:
watcher, err := clientset.CoreV1().ConfigMaps(config.Config.Tap.Release.Namespace).Watch(context.TODO(), metav1.ListOptions{
FieldSelector: "metadata.name=" + configMapName,
})
if err != nil {
log.Warn().Err(err).Msg("ConfigMap not found, retrying in 5 seconds...")
time.Sleep(5 * time.Second)
continue
}
for event := range watcher.ResultChan() {
select {
case <-ctx.Done():
log.Info().Msg("ConfigMap watcher loop exiting gracefully.")
watcher.Stop()
return
default:
if event.Type == watch.Added {
log.Info().Msg("ConfigMap created or modified")
runScriptsSync(provider)
} else if event.Type == watch.Deleted {
log.Warn().Msg("ConfigMap deleted, waiting for recreation...")
watcher.Stop()
break
}
}
}
time.Sleep(5 * time.Second)
}
}
}
func runScriptsSync(provider *kubernetes.Provider) {
files := make(map[string]int64)
scripts, err := config.Config.Scripting.GetScripts()
if err != nil {
log.Error().Err(err).Send()
return
}
for _, script := range scripts {
index, err := createScript(provider, script.ConfigMap())
if err != nil {
log.Error().Err(err).Send()
continue
}
files[script.Path] = index
}
log.Info().Msg("Synchronized scripts with ConfigMap.")
}

View File

@ -425,7 +425,7 @@ func postFrontStarted(ctx context.Context, kubernetesProvider *kubernetes.Provid
} }
if config.Config.Scripting.Source != "" && config.Config.Scripting.WatchScripts { if config.Config.Scripting.Source != "" && config.Config.Scripting.WatchScripts {
watchScripts(kubernetesProvider, false) watchScripts(ctx, kubernetesProvider, false)
} }
if config.Config.Scripting.Console { if config.Config.Scripting.Console {

View File

@ -247,6 +247,10 @@ func (provider *Provider) GetNamespaces() (namespaces []string) {
return return
} }
func (provider *Provider) GetClientSet() *kubernetes.Clientset {
return provider.clientSet
}
func getClientSet(config *rest.Config) (*kubernetes.Clientset, error) { func getClientSet(config *rest.Config) (*kubernetes.Clientset, error) {
clientSet, err := kubernetes.NewForConfig(config) clientSet, err := kubernetes.NewForConfig(config)
if err != nil { if err != nil {