mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-04-27 19:47:31 +00:00
Make the scritps
command directly use the K8s API without requiring a connector to Hub (#1615)
* Make the `scritps` command directly use the K8s API without requiring a connector to Hub * Fix linter * Fix linter --------- Co-authored-by: Alon Girmonsky <1990761+alongir@users.noreply.github.com>
This commit is contained in:
parent
98173350ec
commit
3d5c999be1
110
cmd/scripts.go
110
cmd/scripts.go
@ -2,14 +2,12 @@ package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/creasty/defaults"
|
||||
"github.com/fsnotify/fsnotify"
|
||||
"github.com/kubeshark/kubeshark/config"
|
||||
"github.com/kubeshark/kubeshark/config/configStructs"
|
||||
"github.com/kubeshark/kubeshark/internal/connect"
|
||||
"github.com/kubeshark/kubeshark/kubernetes"
|
||||
"github.com/kubeshark/kubeshark/misc"
|
||||
"github.com/kubeshark/kubeshark/utils"
|
||||
@ -45,19 +43,99 @@ func runScripts() {
|
||||
return
|
||||
}
|
||||
|
||||
hubUrl := kubernetes.GetHubUrl()
|
||||
response, err := http.Get(fmt.Sprintf("%s/echo", hubUrl))
|
||||
if err != nil || response.StatusCode != 200 {
|
||||
log.Info().Msg(fmt.Sprintf(utils.Yellow, "Couldn't connect to Hub. Establishing proxy..."))
|
||||
runProxy(false, true)
|
||||
kubernetesProvider, err := getKubernetesProviderForCli(false, false)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Send()
|
||||
return
|
||||
}
|
||||
|
||||
connector = connect.NewConnector(kubernetes.GetHubUrl(), connect.DefaultRetries, connect.DefaultTimeout)
|
||||
|
||||
watchScripts(true)
|
||||
watchScripts(kubernetesProvider, true)
|
||||
}
|
||||
|
||||
func watchScripts(block bool) {
|
||||
func createScript(provider *kubernetes.Provider, script misc.ConfigMapScript) (index int64, err error) {
|
||||
var scripts map[int64]misc.ConfigMapScript
|
||||
scripts, err = kubernetes.ConfigGetScripts(provider)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Turn it into updateScript if there is a script with the same title
|
||||
var setScript bool
|
||||
if script.Title != "New Script" {
|
||||
for i, v := range scripts {
|
||||
if v.Title == script.Title {
|
||||
scripts[i] = script
|
||||
setScript = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !setScript {
|
||||
index = int64(len(scripts))
|
||||
scripts[index] = script
|
||||
}
|
||||
|
||||
var data []byte
|
||||
data, err = json.Marshal(scripts)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
_, err = kubernetes.SetConfig(provider, kubernetes.CONFIG_SCRIPTING_SCRIPTS, string(data))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func updateScript(provider *kubernetes.Provider, index int64, script misc.ConfigMapScript) (err error) {
|
||||
var scripts map[int64]misc.ConfigMapScript
|
||||
scripts, err = kubernetes.ConfigGetScripts(provider)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
scripts[index] = script
|
||||
|
||||
var data []byte
|
||||
data, err = json.Marshal(scripts)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
_, err = kubernetes.SetConfig(provider, kubernetes.CONFIG_SCRIPTING_SCRIPTS, string(data))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func deleteScript(provider *kubernetes.Provider, index int64) (err error) {
|
||||
var scripts map[int64]misc.ConfigMapScript
|
||||
scripts, err = kubernetes.ConfigGetScripts(provider)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
delete(scripts, index)
|
||||
|
||||
var data []byte
|
||||
data, err = json.Marshal(scripts)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
_, err = kubernetes.SetConfig(provider, kubernetes.CONFIG_SCRIPTING_SCRIPTS, string(data))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func watchScripts(provider *kubernetes.Provider, block bool) {
|
||||
files := make(map[string]int64)
|
||||
|
||||
scripts, err := config.Config.Scripting.GetScripts()
|
||||
@ -67,7 +145,7 @@ func watchScripts(block bool) {
|
||||
}
|
||||
|
||||
for _, script := range scripts {
|
||||
index, err := connector.PostScript(script)
|
||||
index, err := createScript(provider, script.ConfigMap())
|
||||
if err != nil {
|
||||
log.Error().Err(err).Send()
|
||||
return
|
||||
@ -98,7 +176,7 @@ func watchScripts(block bool) {
|
||||
continue
|
||||
}
|
||||
|
||||
index, err := connector.PostScript(script)
|
||||
index, err := createScript(provider, script.ConfigMap())
|
||||
if err != nil {
|
||||
log.Error().Err(err).Send()
|
||||
continue
|
||||
@ -114,7 +192,7 @@ func watchScripts(block bool) {
|
||||
continue
|
||||
}
|
||||
|
||||
err = connector.PutScript(script, index)
|
||||
err = updateScript(provider, index, script.ConfigMap())
|
||||
if err != nil {
|
||||
log.Error().Err(err).Send()
|
||||
continue
|
||||
@ -122,7 +200,7 @@ func watchScripts(block bool) {
|
||||
|
||||
case fsnotify.Rename:
|
||||
index := files[event.Name]
|
||||
err := connector.DeleteScript(index)
|
||||
err := deleteScript(provider, index)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Send()
|
||||
continue
|
||||
|
@ -10,7 +10,6 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/kubeshark/kubeshark/internal/connect"
|
||||
"github.com/kubeshark/kubeshark/kubernetes/helm"
|
||||
"github.com/kubeshark/kubeshark/misc"
|
||||
"github.com/kubeshark/kubeshark/utils"
|
||||
@ -32,7 +31,6 @@ type tapState struct {
|
||||
}
|
||||
|
||||
var state tapState
|
||||
var connector *connect.Connector
|
||||
|
||||
type Readiness struct {
|
||||
Hub bool
|
||||
@ -52,8 +50,6 @@ func tap() {
|
||||
Str("limit", config.Config.Tap.StorageLimit).
|
||||
Msg(fmt.Sprintf("%s will store the traffic up to a limit (per node). Oldest TCP/UDP streams will be removed once the limit is reached.", misc.Software))
|
||||
|
||||
connector = connect.NewConnector(kubernetes.GetHubUrl(), connect.DefaultRetries, connect.DefaultTimeout)
|
||||
|
||||
kubernetesProvider, err := getKubernetesProviderForCli(false, false)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Send()
|
||||
@ -429,7 +425,7 @@ func postFrontStarted(ctx context.Context, kubernetesProvider *kubernetes.Provid
|
||||
}
|
||||
|
||||
if config.Config.Scripting.Source != "" && config.Config.Scripting.WatchScripts {
|
||||
watchScripts(false)
|
||||
watchScripts(kubernetesProvider, false)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3,7 +3,6 @@ package connect
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
@ -12,7 +11,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/kubeshark/kubeshark/config"
|
||||
"github.com/kubeshark/kubeshark/misc"
|
||||
"github.com/kubeshark/kubeshark/utils"
|
||||
|
||||
"github.com/rs/zerolog/log"
|
||||
@ -121,140 +119,6 @@ func (connector *Connector) PostLicense(license string) {
|
||||
}
|
||||
}
|
||||
|
||||
type postScriptRequest struct {
|
||||
Title string `json:"title"`
|
||||
Code string `json:"code"`
|
||||
}
|
||||
|
||||
func (connector *Connector) PostScript(script *misc.Script) (index int64, err error) {
|
||||
postScriptUrl := fmt.Sprintf("%s/scripts", connector.url)
|
||||
|
||||
payload := postScriptRequest{
|
||||
Title: script.Title,
|
||||
Code: script.Code,
|
||||
}
|
||||
|
||||
var scriptMarshalled []byte
|
||||
if scriptMarshalled, err = json.Marshal(payload); err != nil {
|
||||
log.Error().Err(err).Msg("Failed to marshal the script:")
|
||||
} else {
|
||||
ok := false
|
||||
for !ok {
|
||||
var resp *http.Response
|
||||
if resp, err = utils.Post(postScriptUrl, "application/json", bytes.NewBuffer(scriptMarshalled), connector.client, config.Config.License); err != nil || resp.StatusCode != http.StatusOK {
|
||||
if _, ok := err.(*url.Error); ok {
|
||||
break
|
||||
}
|
||||
log.Warn().Err(err).Msg("Failed creating script Hub:")
|
||||
} else {
|
||||
|
||||
var j map[string]interface{}
|
||||
err = json.NewDecoder(resp.Body).Decode(&j)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
val, ok := j["index"]
|
||||
if !ok {
|
||||
err = errors.New("Response does not contain `key` field!")
|
||||
return
|
||||
}
|
||||
|
||||
index = int64(val.(float64))
|
||||
|
||||
log.Debug().Int("index", int(index)).Interface("script", script).Msg("Created script on Hub:")
|
||||
return
|
||||
}
|
||||
time.Sleep(DefaultSleep)
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (connector *Connector) PutScript(script *misc.Script, index int64) (err error) {
|
||||
putScriptUrl := fmt.Sprintf("%s/scripts/%d", connector.url, index)
|
||||
|
||||
var scriptMarshalled []byte
|
||||
if scriptMarshalled, err = json.Marshal(script); err != nil {
|
||||
log.Error().Err(err).Msg("Failed to marshal the script:")
|
||||
} else {
|
||||
ok := false
|
||||
for !ok {
|
||||
client := &http.Client{}
|
||||
|
||||
var req *http.Request
|
||||
req, err = http.NewRequest(http.MethodPut, putScriptUrl, bytes.NewBuffer(scriptMarshalled))
|
||||
if err != nil {
|
||||
log.Error().Err(err).Send()
|
||||
return
|
||||
}
|
||||
utils.AddIgnoreCaptureHeader(req)
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set("License-Key", config.Config.License)
|
||||
|
||||
var resp *http.Response
|
||||
resp, err = client.Do(req)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Send()
|
||||
return
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
if _, ok := err.(*url.Error); ok {
|
||||
break
|
||||
}
|
||||
log.Warn().Err(err).Msg("Failed updating script on Hub:")
|
||||
} else {
|
||||
log.Debug().Int("index", int(index)).Interface("script", script).Msg("Updated script on Hub:")
|
||||
return
|
||||
}
|
||||
time.Sleep(DefaultSleep)
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (connector *Connector) DeleteScript(index int64) (err error) {
|
||||
deleteScriptUrl := fmt.Sprintf("%s/scripts/%d", connector.url, index)
|
||||
|
||||
ok := false
|
||||
for !ok {
|
||||
client := &http.Client{}
|
||||
|
||||
var req *http.Request
|
||||
req, err = http.NewRequest(http.MethodDelete, deleteScriptUrl, nil)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Send()
|
||||
return
|
||||
}
|
||||
utils.AddIgnoreCaptureHeader(req)
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set("License-Key", config.Config.License)
|
||||
|
||||
var resp *http.Response
|
||||
resp, err = client.Do(req)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Send()
|
||||
return
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
if _, ok := err.(*url.Error); ok {
|
||||
break
|
||||
}
|
||||
log.Warn().Err(err).Msg("Failed deleting script on Hub:")
|
||||
} else {
|
||||
log.Debug().Int("index", int(index)).Msg("Deleted script on Hub:")
|
||||
return
|
||||
}
|
||||
time.Sleep(DefaultSleep)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (connector *Connector) PostPcapsMerge(out *os.File) {
|
||||
postEnvUrl := fmt.Sprintf("%s/pcaps/merge", connector.url)
|
||||
|
||||
|
@ -2,8 +2,10 @@ package kubernetes
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/kubeshark/kubeshark/config"
|
||||
"github.com/kubeshark/kubeshark/misc"
|
||||
"github.com/rs/zerolog/log"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
@ -23,6 +25,7 @@ const (
|
||||
CONFIG_AUTH_ENABLED = "AUTH_ENABLED"
|
||||
CONFIG_AUTH_TYPE = "AUTH_TYPE"
|
||||
CONFIG_AUTH_SAML_IDP_METADATA_URL = "AUTH_SAML_IDP_METADATA_URL"
|
||||
CONFIG_SCRIPTING_SCRIPTS = "SCRIPTING_SCRIPTS"
|
||||
)
|
||||
|
||||
func SetSecret(provider *Provider, key string, value string) (updated bool, err error) {
|
||||
@ -48,6 +51,17 @@ func SetSecret(provider *Provider, key string, value string) (updated bool, err
|
||||
return
|
||||
}
|
||||
|
||||
func GetConfig(provider *Provider, key string) (value string, err error) {
|
||||
var configMap *v1.ConfigMap
|
||||
configMap, err = provider.clientSet.CoreV1().ConfigMaps(config.Config.Tap.Release.Namespace).Get(context.TODO(), SELF_RESOURCES_PREFIX+SUFFIX_CONFIG_MAP, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
value = configMap.Data[key]
|
||||
return
|
||||
}
|
||||
|
||||
func SetConfig(provider *Provider, key string, value string) (updated bool, err error) {
|
||||
var configMap *v1.ConfigMap
|
||||
configMap, err = provider.clientSet.CoreV1().ConfigMaps(config.Config.Tap.Release.Namespace).Get(context.TODO(), SELF_RESOURCES_PREFIX+SUFFIX_CONFIG_MAP, metav1.GetOptions{})
|
||||
@ -70,3 +84,14 @@ func SetConfig(provider *Provider, key string, value string) (updated bool, err
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func ConfigGetScripts(provider *Provider) (scripts map[int64]misc.ConfigMapScript, err error) {
|
||||
var data string
|
||||
data, err = GetConfig(provider, CONFIG_SCRIPTING_SCRIPTS)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
err = json.Unmarshal([]byte(data), &scripts)
|
||||
return
|
||||
}
|
||||
|
@ -15,6 +15,18 @@ type Script struct {
|
||||
Code string `json:"code"`
|
||||
}
|
||||
|
||||
type ConfigMapScript struct {
|
||||
Title string `json:"title"`
|
||||
Code string `json:"code"`
|
||||
}
|
||||
|
||||
func (s *Script) ConfigMap() ConfigMapScript {
|
||||
return ConfigMapScript{
|
||||
Title: s.Title,
|
||||
Code: s.Code,
|
||||
}
|
||||
}
|
||||
|
||||
func ReadScriptFile(path string) (script *Script, err error) {
|
||||
filename := filepath.Base(path)
|
||||
var body []byte
|
||||
|
Loading…
Reference in New Issue
Block a user