mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-08-08 11:59:17 +00:00
Added update config route for install mode (#581)
This commit is contained in:
parent
2834ae1e85
commit
02b2cbaa03
@ -1,7 +1,6 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"flag"
|
"flag"
|
||||||
@ -11,7 +10,6 @@ import (
|
|||||||
"mizuserver/pkg/config"
|
"mizuserver/pkg/config"
|
||||||
"mizuserver/pkg/controllers"
|
"mizuserver/pkg/controllers"
|
||||||
"mizuserver/pkg/models"
|
"mizuserver/pkg/models"
|
||||||
"mizuserver/pkg/providers"
|
|
||||||
"mizuserver/pkg/routes"
|
"mizuserver/pkg/routes"
|
||||||
"mizuserver/pkg/up9"
|
"mizuserver/pkg/up9"
|
||||||
"mizuserver/pkg/utils"
|
"mizuserver/pkg/utils"
|
||||||
@ -22,14 +20,12 @@ import (
|
|||||||
"path"
|
"path"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"plugin"
|
"plugin"
|
||||||
"regexp"
|
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/up9inc/mizu/shared/kubernetes"
|
|
||||||
v1 "k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
|
|
||||||
"github.com/antelman107/net-wait-go/wait"
|
"github.com/antelman107/net-wait-go/wait"
|
||||||
@ -256,13 +252,18 @@ func hostApi(socketHarOutputChannel chan<- *tapApi.OutputChannelItem) {
|
|||||||
app.Use(DisableRootStaticCache())
|
app.Use(DisableRootStaticCache())
|
||||||
|
|
||||||
if err := setUIMode(); err != nil {
|
if err := setUIMode(); err != nil {
|
||||||
logger.Log.Panicf("Error setting ui mode, err: %v", err)
|
logger.Log.Errorf("Error setting ui mode, err: %v", err)
|
||||||
}
|
}
|
||||||
app.Use(static.ServeRoot("/", "./site"))
|
app.Use(static.ServeRoot("/", "./site"))
|
||||||
|
|
||||||
app.Use(CORSMiddleware()) // This has to be called after the static middleware, does not work if its called before
|
app.Use(CORSMiddleware()) // This has to be called after the static middleware, does not work if its called before
|
||||||
|
|
||||||
api.WebSocketRoutes(app, &eventHandlers, startTime)
|
api.WebSocketRoutes(app, &eventHandlers, startTime)
|
||||||
|
|
||||||
|
if config.Config.StandaloneMode {
|
||||||
|
routes.StandaloneRoutes(app)
|
||||||
|
}
|
||||||
|
|
||||||
routes.QueryRoutes(app)
|
routes.QueryRoutes(app)
|
||||||
routes.EntriesRoutes(app)
|
routes.EntriesRoutes(app)
|
||||||
routes.MetadataRoutes(app)
|
routes.MetadataRoutes(app)
|
||||||
@ -471,67 +472,3 @@ func handleIncomingMessageAsTapper(socketConnection *websocket.Conn) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func startMizuTapperSyncer(ctx context.Context, provider *kubernetes.Provider, targetNamespaces []string, podFilterRegex regexp.Regexp, ignoredUserAgents []string, mizuApiFilteringOptions tapApi.TrafficFilteringOptions, istio bool) (*kubernetes.MizuTapperSyncer, error) {
|
|
||||||
tapperSyncer, err := kubernetes.CreateAndStartMizuTapperSyncer(ctx, provider, kubernetes.TapperSyncerConfig{
|
|
||||||
TargetNamespaces: targetNamespaces,
|
|
||||||
PodFilterRegex: podFilterRegex,
|
|
||||||
MizuResourcesNamespace: config.Config.MizuResourcesNamespace,
|
|
||||||
AgentImage: config.Config.AgentImage,
|
|
||||||
TapperResources: config.Config.TapperResources,
|
|
||||||
ImagePullPolicy: v1.PullPolicy(config.Config.PullPolicy),
|
|
||||||
LogLevel: config.Config.LogLevel,
|
|
||||||
IgnoredUserAgents: ignoredUserAgents,
|
|
||||||
MizuApiFilteringOptions: mizuApiFilteringOptions,
|
|
||||||
MizuServiceAccountExists: true, //assume service account exists since install mode will not function without it anyway
|
|
||||||
Istio: istio,
|
|
||||||
}, time.Now())
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// handle tapperSyncer events (pod changes and errors)
|
|
||||||
go func() {
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case syncerErr, ok := <-tapperSyncer.ErrorOut:
|
|
||||||
if !ok {
|
|
||||||
logger.Log.Debug("mizuTapperSyncer err channel closed, ending listener loop")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
logger.Log.Fatalf("fatal tap syncer error: %v", syncerErr)
|
|
||||||
case tapPodChangeEvent, ok := <-tapperSyncer.TapPodChangesOut:
|
|
||||||
if !ok {
|
|
||||||
logger.Log.Debug("mizuTapperSyncer pod changes channel closed, ending listener loop")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
providers.TapStatus = shared.TapStatus{Pods: kubernetes.GetPodInfosForPods(tapperSyncer.CurrentlyTappedPods)}
|
|
||||||
|
|
||||||
tappedPodsStatus := utils.GetTappedPodsStatus()
|
|
||||||
|
|
||||||
serializedTapStatus, err := json.Marshal(shared.CreateWebSocketStatusMessage(tappedPodsStatus))
|
|
||||||
if err != nil {
|
|
||||||
logger.Log.Fatalf("error serializing tap status: %v", err)
|
|
||||||
}
|
|
||||||
api.BroadcastToBrowserClients(serializedTapStatus)
|
|
||||||
providers.ExpectedTapperAmount = tapPodChangeEvent.ExpectedTapperAmount
|
|
||||||
case tapperStatus, ok := <-tapperSyncer.TapperStatusChangedOut:
|
|
||||||
if !ok {
|
|
||||||
logger.Log.Debug("mizuTapperSyncer tapper status changed channel closed, ending listener loop")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if providers.TappersStatus == nil {
|
|
||||||
providers.TappersStatus = make(map[string]shared.TapperStatus)
|
|
||||||
}
|
|
||||||
providers.TappersStatus[tapperStatus.NodeName] = tapperStatus
|
|
||||||
|
|
||||||
case <-ctx.Done():
|
|
||||||
logger.Log.Debug("mizuTapperSyncer event listener loop exiting due to context done")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
return tapperSyncer, nil
|
|
||||||
}
|
|
||||||
|
@ -83,10 +83,10 @@ func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers Even
|
|||||||
meta := make(chan []byte)
|
meta := make(chan []byte)
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
|
socketCleanup(socketId, connectedWebsockets[socketId])
|
||||||
data <- []byte(basenine.CloseChannel)
|
data <- []byte(basenine.CloseChannel)
|
||||||
meta <- []byte(basenine.CloseChannel)
|
meta <- []byte(basenine.CloseChannel)
|
||||||
connection.Close()
|
connection.Close()
|
||||||
socketCleanup(socketId, connectedWebsockets[socketId])
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
eventHandlers.WebSocketConnect(socketId, isTapper)
|
eventHandlers.WebSocketConnect(socketId, isTapper)
|
||||||
@ -97,7 +97,12 @@ func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers Even
|
|||||||
for {
|
for {
|
||||||
_, msg, err := ws.ReadMessage()
|
_, msg, err := ws.ReadMessage()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if _, ok := err.(*websocket.CloseError); ok {
|
||||||
|
logger.Log.Debugf("Received websocket close message, socket id: %d", socketId)
|
||||||
|
} else {
|
||||||
logger.Log.Errorf("Error reading message, socket id: %d, error: %v", socketId, err)
|
logger.Log.Errorf("Error reading message, socket id: %d, error: %v", socketId, err)
|
||||||
|
}
|
||||||
|
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
|
133
agent/pkg/controllers/standalone_controller.go
Normal file
133
agent/pkg/controllers/standalone_controller.go
Normal file
@ -0,0 +1,133 @@
|
|||||||
|
package controllers
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"github.com/gin-gonic/gin"
|
||||||
|
"github.com/up9inc/mizu/shared"
|
||||||
|
"github.com/up9inc/mizu/shared/kubernetes"
|
||||||
|
"github.com/up9inc/mizu/shared/logger"
|
||||||
|
tapApi "github.com/up9inc/mizu/tap/api"
|
||||||
|
v1 "k8s.io/api/core/v1"
|
||||||
|
"mizuserver/pkg/config"
|
||||||
|
"mizuserver/pkg/models"
|
||||||
|
"mizuserver/pkg/providers"
|
||||||
|
"net/http"
|
||||||
|
"regexp"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
var globalTapConfig *models.StandaloneTapConfig
|
||||||
|
var cancelTapperSyncer context.CancelFunc
|
||||||
|
var kubernetesProvider *kubernetes.Provider
|
||||||
|
|
||||||
|
func PostTapConfig(c *gin.Context) {
|
||||||
|
tapConfig := &models.StandaloneTapConfig{}
|
||||||
|
|
||||||
|
if err := c.Bind(tapConfig); err != nil {
|
||||||
|
c.JSON(http.StatusBadRequest, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if cancelTapperSyncer != nil {
|
||||||
|
cancelTapperSyncer()
|
||||||
|
|
||||||
|
providers.TapStatus = shared.TapStatus{}
|
||||||
|
providers.TappersStatus = make(map[string]shared.TapperStatus)
|
||||||
|
|
||||||
|
broadcastTappedPodsStatus()
|
||||||
|
}
|
||||||
|
|
||||||
|
if kubernetesProvider == nil {
|
||||||
|
var err error
|
||||||
|
kubernetesProvider, err = kubernetes.NewProviderInCluster()
|
||||||
|
if err != nil {
|
||||||
|
c.JSON(http.StatusBadRequest, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
var tappedNamespaces []string
|
||||||
|
for namespace, tapped := range tapConfig.TappedNamespaces {
|
||||||
|
if tapped {
|
||||||
|
tappedNamespaces = append(tappedNamespaces, namespace)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
podRegex, _ := regexp.Compile(".*")
|
||||||
|
|
||||||
|
if _, err := startMizuTapperSyncer(ctx, kubernetesProvider, tappedNamespaces, *podRegex, []string{} , tapApi.TrafficFilteringOptions{}, false); err != nil {
|
||||||
|
c.JSON(http.StatusBadRequest, err)
|
||||||
|
cancel()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
cancelTapperSyncer = cancel
|
||||||
|
globalTapConfig = tapConfig
|
||||||
|
|
||||||
|
c.JSON(http.StatusOK, "OK")
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetTapConfig(c *gin.Context) {
|
||||||
|
if globalTapConfig != nil {
|
||||||
|
c.JSON(http.StatusOK, globalTapConfig)
|
||||||
|
}
|
||||||
|
|
||||||
|
c.JSON(http.StatusBadRequest, "Not config found")
|
||||||
|
}
|
||||||
|
|
||||||
|
func startMizuTapperSyncer(ctx context.Context, provider *kubernetes.Provider, targetNamespaces []string, podFilterRegex regexp.Regexp, ignoredUserAgents []string, mizuApiFilteringOptions tapApi.TrafficFilteringOptions, istio bool) (*kubernetes.MizuTapperSyncer, error) {
|
||||||
|
tapperSyncer, err := kubernetes.CreateAndStartMizuTapperSyncer(ctx, provider, kubernetes.TapperSyncerConfig{
|
||||||
|
TargetNamespaces: targetNamespaces,
|
||||||
|
PodFilterRegex: podFilterRegex,
|
||||||
|
MizuResourcesNamespace: config.Config.MizuResourcesNamespace,
|
||||||
|
AgentImage: config.Config.AgentImage,
|
||||||
|
TapperResources: config.Config.TapperResources,
|
||||||
|
ImagePullPolicy: v1.PullPolicy(config.Config.PullPolicy),
|
||||||
|
LogLevel: config.Config.LogLevel,
|
||||||
|
IgnoredUserAgents: ignoredUserAgents,
|
||||||
|
MizuApiFilteringOptions: mizuApiFilteringOptions,
|
||||||
|
MizuServiceAccountExists: true, //assume service account exists since install mode will not function without it anyway
|
||||||
|
Istio: istio,
|
||||||
|
}, time.Now())
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// handle tapperSyncer events (pod changes and errors)
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case syncerErr, ok := <-tapperSyncer.ErrorOut:
|
||||||
|
if !ok {
|
||||||
|
logger.Log.Debug("mizuTapperSyncer err channel closed, ending listener loop")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
logger.Log.Fatalf("fatal tap syncer error: %v", syncerErr)
|
||||||
|
case _, ok := <-tapperSyncer.TapPodChangesOut:
|
||||||
|
if !ok {
|
||||||
|
logger.Log.Debug("mizuTapperSyncer pod changes channel closed, ending listener loop")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
providers.TapStatus = shared.TapStatus{Pods: kubernetes.GetPodInfosForPods(tapperSyncer.CurrentlyTappedPods)}
|
||||||
|
broadcastTappedPodsStatus()
|
||||||
|
case tapperStatus, ok := <-tapperSyncer.TapperStatusChangedOut:
|
||||||
|
if !ok {
|
||||||
|
logger.Log.Debug("mizuTapperSyncer tapper status changed channel closed, ending listener loop")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
addTapperStatus(tapperStatus)
|
||||||
|
broadcastTappedPodsStatus()
|
||||||
|
case <-ctx.Done():
|
||||||
|
logger.Log.Debug("mizuTapperSyncer event listener loop exiting due to context done")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return tapperSyncer, nil
|
||||||
|
}
|
@ -54,21 +54,28 @@ func broadcastTappedPodsStatus() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func addTapperStatus(tapperStatus shared.TapperStatus) {
|
||||||
|
if providers.TappersStatus == nil {
|
||||||
|
providers.TappersStatus = make(map[string]shared.TapperStatus)
|
||||||
|
}
|
||||||
|
|
||||||
|
providers.TappersStatus[tapperStatus.NodeName] = tapperStatus
|
||||||
|
}
|
||||||
|
|
||||||
func PostTapperStatus(c *gin.Context) {
|
func PostTapperStatus(c *gin.Context) {
|
||||||
tapperStatus := &shared.TapperStatus{}
|
tapperStatus := &shared.TapperStatus{}
|
||||||
if err := c.Bind(tapperStatus); err != nil {
|
if err := c.Bind(tapperStatus); err != nil {
|
||||||
c.JSON(http.StatusBadRequest, err)
|
c.JSON(http.StatusBadRequest, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := validation.Validate(tapperStatus); err != nil {
|
if err := validation.Validate(tapperStatus); err != nil {
|
||||||
c.JSON(http.StatusBadRequest, err)
|
c.JSON(http.StatusBadRequest, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Log.Infof("[Status] POST request, tapper status: %v", tapperStatus)
|
logger.Log.Infof("[Status] POST request, tapper status: %v", tapperStatus)
|
||||||
if providers.TappersStatus == nil {
|
addTapperStatus(*tapperStatus)
|
||||||
providers.TappersStatus = make(map[string]shared.TapperStatus)
|
|
||||||
}
|
|
||||||
providers.TappersStatus[tapperStatus.NodeName] = *tapperStatus
|
|
||||||
broadcastTappedPodsStatus()
|
broadcastTappedPodsStatus()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -16,6 +16,10 @@ func GetEntry(r *tapApi.MizuEntry, v tapApi.DataUnmarshaler) error {
|
|||||||
return v.UnmarshalData(r)
|
return v.UnmarshalData(r)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type StandaloneTapConfig struct {
|
||||||
|
TappedNamespaces map[string]bool `json:"tappedNamespaces"`
|
||||||
|
}
|
||||||
|
|
||||||
type EntriesRequest struct {
|
type EntriesRequest struct {
|
||||||
LeftOff int `form:"leftOff" validate:"required,min=-1"`
|
LeftOff int `form:"leftOff" validate:"required,min=-1"`
|
||||||
Direction int `form:"direction" validate:"required,oneof='1' '-1'"`
|
Direction int `form:"direction" validate:"required,oneof='1' '-1'"`
|
||||||
|
@ -20,7 +20,6 @@ var (
|
|||||||
TappersStatus map[string]shared.TapperStatus
|
TappersStatus map[string]shared.TapperStatus
|
||||||
authStatus *models.AuthStatus
|
authStatus *models.AuthStatus
|
||||||
RecentTLSLinks = cache.New(tlsLinkRetainmentTime, tlsLinkRetainmentTime)
|
RecentTLSLinks = cache.New(tlsLinkRetainmentTime, tlsLinkRetainmentTime)
|
||||||
ExpectedTapperAmount = -1 //only relevant in install mode as cli manages tappers otherwise
|
|
||||||
tappersCountLock = sync.Mutex{}
|
tappersCountLock = sync.Mutex{}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
13
agent/pkg/routes/standalone_routes.go
Normal file
13
agent/pkg/routes/standalone_routes.go
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
package routes
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/gin-gonic/gin"
|
||||||
|
"mizuserver/pkg/controllers"
|
||||||
|
)
|
||||||
|
|
||||||
|
func StandaloneRoutes(ginApp *gin.Engine) {
|
||||||
|
routeGroup := ginApp.Group("/standalone")
|
||||||
|
|
||||||
|
routeGroup.POST("/tapConfig", controllers.PostTapConfig)
|
||||||
|
routeGroup.GET("/tapConfig", controllers.GetTapConfig)
|
||||||
|
}
|
@ -18,7 +18,6 @@ const updateTappersDelay = 5 * time.Second
|
|||||||
type TappedPodChangeEvent struct {
|
type TappedPodChangeEvent struct {
|
||||||
Added []core.Pod
|
Added []core.Pod
|
||||||
Removed []core.Pod
|
Removed []core.Pod
|
||||||
ExpectedTapperAmount int
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// MizuTapperSyncer uses a k8s pod watch to update tapper daemonsets when targeted pods are removed or created
|
// MizuTapperSyncer uses a k8s pod watch to update tapper daemonsets when targeted pods are removed or created
|
||||||
@ -93,6 +92,10 @@ func (tapperSyncer *MizuTapperSyncer) watchTapperPods() {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if tapperSyncer.startTime.After(pod.CreationTimestamp.Time) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
logger.Log.Debugf("Watching tapper pods loop, tapper: %v, node: %v, status: %v", pod.Name, pod.Spec.NodeName, pod.Status.Phase)
|
logger.Log.Debugf("Watching tapper pods loop, tapper: %v, node: %v, status: %v", pod.Name, pod.Spec.NodeName, pod.Status.Phase)
|
||||||
if pod.Spec.NodeName != "" {
|
if pod.Spec.NodeName != "" {
|
||||||
tapperStatus := shared.TapperStatus{TapperName: pod.Name, NodeName: pod.Spec.NodeName, Status: string(pod.Status.Phase)}
|
tapperStatus := shared.TapperStatus{TapperName: pod.Name, NodeName: pod.Spec.NodeName, Status: string(pod.Status.Phase)}
|
||||||
@ -147,7 +150,7 @@ func (tapperSyncer *MizuTapperSyncer) watchTapperEvents() {
|
|||||||
|
|
||||||
pod, err1 := tapperSyncer.kubernetesProvider.GetPod(tapperSyncer.context, tapperSyncer.config.MizuResourcesNamespace, event.Regarding.Name)
|
pod, err1 := tapperSyncer.kubernetesProvider.GetPod(tapperSyncer.context, tapperSyncer.config.MizuResourcesNamespace, event.Regarding.Name)
|
||||||
if err1 != nil {
|
if err1 != nil {
|
||||||
logger.Log.Debugf(fmt.Sprintf("Failed to get tapper pod %s", event.Regarding.Name))
|
logger.Log.Debugf(fmt.Sprintf("Couldn't get tapper pod %s", event.Regarding.Name))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -284,7 +287,6 @@ func (tapperSyncer *MizuTapperSyncer) updateCurrentlyTappedPods() (err error, ch
|
|||||||
tapperSyncer.TapPodChangesOut <- TappedPodChangeEvent{
|
tapperSyncer.TapPodChangesOut <- TappedPodChangeEvent{
|
||||||
Added: addedPods,
|
Added: addedPods,
|
||||||
Removed: removedPods,
|
Removed: removedPods,
|
||||||
ExpectedTapperAmount: len(tapperSyncer.nodeToTappedPodMap),
|
|
||||||
}
|
}
|
||||||
return nil, true
|
return nil, true
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user