diff --git a/agent/main.go b/agent/main.go index e9609c586..40b92fd54 100644 --- a/agent/main.go +++ b/agent/main.go @@ -1,7 +1,6 @@ package main import ( - "context" "encoding/json" "errors" "flag" @@ -11,7 +10,6 @@ import ( "mizuserver/pkg/config" "mizuserver/pkg/controllers" "mizuserver/pkg/models" - "mizuserver/pkg/providers" "mizuserver/pkg/routes" "mizuserver/pkg/up9" "mizuserver/pkg/utils" @@ -22,14 +20,12 @@ import ( "path" "path/filepath" "plugin" - "regexp" "sort" "strconv" "strings" "syscall" "time" - "github.com/up9inc/mizu/shared/kubernetes" v1 "k8s.io/api/core/v1" "github.com/antelman107/net-wait-go/wait" @@ -256,13 +252,18 @@ func hostApi(socketHarOutputChannel chan<- *tapApi.OutputChannelItem) { app.Use(DisableRootStaticCache()) if err := setUIMode(); err != nil { - logger.Log.Panicf("Error setting ui mode, err: %v", err) + logger.Log.Errorf("Error setting ui mode, err: %v", err) } app.Use(static.ServeRoot("/", "./site")) app.Use(CORSMiddleware()) // This has to be called after the static middleware, does not work if its called before api.WebSocketRoutes(app, &eventHandlers, startTime) + + if config.Config.StandaloneMode { + routes.StandaloneRoutes(app) + } + routes.QueryRoutes(app) routes.EntriesRoutes(app) routes.MetadataRoutes(app) @@ -471,67 +472,3 @@ func handleIncomingMessageAsTapper(socketConnection *websocket.Conn) { } } } - -func startMizuTapperSyncer(ctx context.Context, provider *kubernetes.Provider, targetNamespaces []string, podFilterRegex regexp.Regexp, ignoredUserAgents []string, mizuApiFilteringOptions tapApi.TrafficFilteringOptions, istio bool) (*kubernetes.MizuTapperSyncer, error) { - tapperSyncer, err := kubernetes.CreateAndStartMizuTapperSyncer(ctx, provider, kubernetes.TapperSyncerConfig{ - TargetNamespaces: targetNamespaces, - PodFilterRegex: podFilterRegex, - MizuResourcesNamespace: config.Config.MizuResourcesNamespace, - AgentImage: config.Config.AgentImage, - TapperResources: config.Config.TapperResources, - ImagePullPolicy: v1.PullPolicy(config.Config.PullPolicy), - LogLevel: config.Config.LogLevel, - IgnoredUserAgents: ignoredUserAgents, - MizuApiFilteringOptions: mizuApiFilteringOptions, - MizuServiceAccountExists: true, //assume service account exists since install mode will not function without it anyway - Istio: istio, - }, time.Now()) - - if err != nil { - return nil, err - } - - // handle tapperSyncer events (pod changes and errors) - go func() { - for { - select { - case syncerErr, ok := <-tapperSyncer.ErrorOut: - if !ok { - logger.Log.Debug("mizuTapperSyncer err channel closed, ending listener loop") - return - } - logger.Log.Fatalf("fatal tap syncer error: %v", syncerErr) - case tapPodChangeEvent, ok := <-tapperSyncer.TapPodChangesOut: - if !ok { - logger.Log.Debug("mizuTapperSyncer pod changes channel closed, ending listener loop") - return - } - providers.TapStatus = shared.TapStatus{Pods: kubernetes.GetPodInfosForPods(tapperSyncer.CurrentlyTappedPods)} - - tappedPodsStatus := utils.GetTappedPodsStatus() - - serializedTapStatus, err := json.Marshal(shared.CreateWebSocketStatusMessage(tappedPodsStatus)) - if err != nil { - logger.Log.Fatalf("error serializing tap status: %v", err) - } - api.BroadcastToBrowserClients(serializedTapStatus) - providers.ExpectedTapperAmount = tapPodChangeEvent.ExpectedTapperAmount - case tapperStatus, ok := <-tapperSyncer.TapperStatusChangedOut: - if !ok { - logger.Log.Debug("mizuTapperSyncer tapper status changed channel closed, ending listener loop") - return - } - if providers.TappersStatus == nil { - providers.TappersStatus = make(map[string]shared.TapperStatus) - } - providers.TappersStatus[tapperStatus.NodeName] = tapperStatus - - case <-ctx.Done(): - logger.Log.Debug("mizuTapperSyncer event listener loop exiting due to context done") - return - } - } - }() - - return tapperSyncer, nil -} diff --git a/agent/pkg/api/socket_routes.go b/agent/pkg/api/socket_routes.go index a96bb11ff..89e8f45f0 100644 --- a/agent/pkg/api/socket_routes.go +++ b/agent/pkg/api/socket_routes.go @@ -83,10 +83,10 @@ func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers Even meta := make(chan []byte) defer func() { + socketCleanup(socketId, connectedWebsockets[socketId]) data <- []byte(basenine.CloseChannel) meta <- []byte(basenine.CloseChannel) connection.Close() - socketCleanup(socketId, connectedWebsockets[socketId]) }() eventHandlers.WebSocketConnect(socketId, isTapper) @@ -97,7 +97,12 @@ func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers Even for { _, msg, err := ws.ReadMessage() if err != nil { - logger.Log.Errorf("Error reading message, socket id: %d, error: %v", socketId, err) + if _, ok := err.(*websocket.CloseError); ok { + logger.Log.Debugf("Received websocket close message, socket id: %d", socketId) + } else { + logger.Log.Errorf("Error reading message, socket id: %d, error: %v", socketId, err) + } + break } diff --git a/agent/pkg/controllers/standalone_controller.go b/agent/pkg/controllers/standalone_controller.go new file mode 100644 index 000000000..ea9468772 --- /dev/null +++ b/agent/pkg/controllers/standalone_controller.go @@ -0,0 +1,133 @@ +package controllers + +import ( + "context" + "github.com/gin-gonic/gin" + "github.com/up9inc/mizu/shared" + "github.com/up9inc/mizu/shared/kubernetes" + "github.com/up9inc/mizu/shared/logger" + tapApi "github.com/up9inc/mizu/tap/api" + v1 "k8s.io/api/core/v1" + "mizuserver/pkg/config" + "mizuserver/pkg/models" + "mizuserver/pkg/providers" + "net/http" + "regexp" + "time" +) + +var globalTapConfig *models.StandaloneTapConfig +var cancelTapperSyncer context.CancelFunc +var kubernetesProvider *kubernetes.Provider + +func PostTapConfig(c *gin.Context) { + tapConfig := &models.StandaloneTapConfig{} + + if err := c.Bind(tapConfig); err != nil { + c.JSON(http.StatusBadRequest, err) + return + } + + if cancelTapperSyncer != nil { + cancelTapperSyncer() + + providers.TapStatus = shared.TapStatus{} + providers.TappersStatus = make(map[string]shared.TapperStatus) + + broadcastTappedPodsStatus() + } + + if kubernetesProvider == nil { + var err error + kubernetesProvider, err = kubernetes.NewProviderInCluster() + if err != nil { + c.JSON(http.StatusBadRequest, err) + return + } + } + + ctx, cancel := context.WithCancel(context.Background()) + + var tappedNamespaces []string + for namespace, tapped := range tapConfig.TappedNamespaces { + if tapped { + tappedNamespaces = append(tappedNamespaces, namespace) + } + } + + podRegex, _ := regexp.Compile(".*") + + if _, err := startMizuTapperSyncer(ctx, kubernetesProvider, tappedNamespaces, *podRegex, []string{} , tapApi.TrafficFilteringOptions{}, false); err != nil { + c.JSON(http.StatusBadRequest, err) + cancel() + return + } + + cancelTapperSyncer = cancel + globalTapConfig = tapConfig + + c.JSON(http.StatusOK, "OK") +} + +func GetTapConfig(c *gin.Context) { + if globalTapConfig != nil { + c.JSON(http.StatusOK, globalTapConfig) + } + + c.JSON(http.StatusBadRequest, "Not config found") +} + +func startMizuTapperSyncer(ctx context.Context, provider *kubernetes.Provider, targetNamespaces []string, podFilterRegex regexp.Regexp, ignoredUserAgents []string, mizuApiFilteringOptions tapApi.TrafficFilteringOptions, istio bool) (*kubernetes.MizuTapperSyncer, error) { + tapperSyncer, err := kubernetes.CreateAndStartMizuTapperSyncer(ctx, provider, kubernetes.TapperSyncerConfig{ + TargetNamespaces: targetNamespaces, + PodFilterRegex: podFilterRegex, + MizuResourcesNamespace: config.Config.MizuResourcesNamespace, + AgentImage: config.Config.AgentImage, + TapperResources: config.Config.TapperResources, + ImagePullPolicy: v1.PullPolicy(config.Config.PullPolicy), + LogLevel: config.Config.LogLevel, + IgnoredUserAgents: ignoredUserAgents, + MizuApiFilteringOptions: mizuApiFilteringOptions, + MizuServiceAccountExists: true, //assume service account exists since install mode will not function without it anyway + Istio: istio, + }, time.Now()) + + if err != nil { + return nil, err + } + + // handle tapperSyncer events (pod changes and errors) + go func() { + for { + select { + case syncerErr, ok := <-tapperSyncer.ErrorOut: + if !ok { + logger.Log.Debug("mizuTapperSyncer err channel closed, ending listener loop") + return + } + logger.Log.Fatalf("fatal tap syncer error: %v", syncerErr) + case _, ok := <-tapperSyncer.TapPodChangesOut: + if !ok { + logger.Log.Debug("mizuTapperSyncer pod changes channel closed, ending listener loop") + return + } + + providers.TapStatus = shared.TapStatus{Pods: kubernetes.GetPodInfosForPods(tapperSyncer.CurrentlyTappedPods)} + broadcastTappedPodsStatus() + case tapperStatus, ok := <-tapperSyncer.TapperStatusChangedOut: + if !ok { + logger.Log.Debug("mizuTapperSyncer tapper status changed channel closed, ending listener loop") + return + } + + addTapperStatus(tapperStatus) + broadcastTappedPodsStatus() + case <-ctx.Done(): + logger.Log.Debug("mizuTapperSyncer event listener loop exiting due to context done") + return + } + } + }() + + return tapperSyncer, nil +} diff --git a/agent/pkg/controllers/status_controller.go b/agent/pkg/controllers/status_controller.go index 8819238db..7ffef36b6 100644 --- a/agent/pkg/controllers/status_controller.go +++ b/agent/pkg/controllers/status_controller.go @@ -54,21 +54,28 @@ func broadcastTappedPodsStatus() { } } +func addTapperStatus(tapperStatus shared.TapperStatus) { + if providers.TappersStatus == nil { + providers.TappersStatus = make(map[string]shared.TapperStatus) + } + + providers.TappersStatus[tapperStatus.NodeName] = tapperStatus +} + func PostTapperStatus(c *gin.Context) { tapperStatus := &shared.TapperStatus{} if err := c.Bind(tapperStatus); err != nil { c.JSON(http.StatusBadRequest, err) return } + if err := validation.Validate(tapperStatus); err != nil { c.JSON(http.StatusBadRequest, err) return } + logger.Log.Infof("[Status] POST request, tapper status: %v", tapperStatus) - if providers.TappersStatus == nil { - providers.TappersStatus = make(map[string]shared.TapperStatus) - } - providers.TappersStatus[tapperStatus.NodeName] = *tapperStatus + addTapperStatus(*tapperStatus) broadcastTappedPodsStatus() } diff --git a/agent/pkg/models/models.go b/agent/pkg/models/models.go index 84f8e9842..fc55d5ac6 100644 --- a/agent/pkg/models/models.go +++ b/agent/pkg/models/models.go @@ -16,6 +16,10 @@ func GetEntry(r *tapApi.MizuEntry, v tapApi.DataUnmarshaler) error { return v.UnmarshalData(r) } +type StandaloneTapConfig struct { + TappedNamespaces map[string]bool `json:"tappedNamespaces"` +} + type EntriesRequest struct { LeftOff int `form:"leftOff" validate:"required,min=-1"` Direction int `form:"direction" validate:"required,oneof='1' '-1'"` diff --git a/agent/pkg/providers/status_provider.go b/agent/pkg/providers/status_provider.go index 9ed1a2a46..2f90b6d75 100644 --- a/agent/pkg/providers/status_provider.go +++ b/agent/pkg/providers/status_provider.go @@ -20,7 +20,6 @@ var ( TappersStatus map[string]shared.TapperStatus authStatus *models.AuthStatus RecentTLSLinks = cache.New(tlsLinkRetainmentTime, tlsLinkRetainmentTime) - ExpectedTapperAmount = -1 //only relevant in install mode as cli manages tappers otherwise tappersCountLock = sync.Mutex{} ) diff --git a/agent/pkg/routes/standalone_routes.go b/agent/pkg/routes/standalone_routes.go new file mode 100644 index 000000000..1245c6506 --- /dev/null +++ b/agent/pkg/routes/standalone_routes.go @@ -0,0 +1,13 @@ +package routes + +import ( + "github.com/gin-gonic/gin" + "mizuserver/pkg/controllers" +) + +func StandaloneRoutes(ginApp *gin.Engine) { + routeGroup := ginApp.Group("/standalone") + + routeGroup.POST("/tapConfig", controllers.PostTapConfig) + routeGroup.GET("/tapConfig", controllers.GetTapConfig) +} diff --git a/shared/kubernetes/mizuTapperSyncer.go b/shared/kubernetes/mizuTapperSyncer.go index 2f9748a57..8b79540a9 100644 --- a/shared/kubernetes/mizuTapperSyncer.go +++ b/shared/kubernetes/mizuTapperSyncer.go @@ -18,7 +18,6 @@ const updateTappersDelay = 5 * time.Second type TappedPodChangeEvent struct { Added []core.Pod Removed []core.Pod - ExpectedTapperAmount int } // MizuTapperSyncer uses a k8s pod watch to update tapper daemonsets when targeted pods are removed or created @@ -93,6 +92,10 @@ func (tapperSyncer *MizuTapperSyncer) watchTapperPods() { continue } + if tapperSyncer.startTime.After(pod.CreationTimestamp.Time) { + continue + } + logger.Log.Debugf("Watching tapper pods loop, tapper: %v, node: %v, status: %v", pod.Name, pod.Spec.NodeName, pod.Status.Phase) if pod.Spec.NodeName != "" { tapperStatus := shared.TapperStatus{TapperName: pod.Name, NodeName: pod.Spec.NodeName, Status: string(pod.Status.Phase)} @@ -147,7 +150,7 @@ func (tapperSyncer *MizuTapperSyncer) watchTapperEvents() { pod, err1 := tapperSyncer.kubernetesProvider.GetPod(tapperSyncer.context, tapperSyncer.config.MizuResourcesNamespace, event.Regarding.Name) if err1 != nil { - logger.Log.Debugf(fmt.Sprintf("Failed to get tapper pod %s", event.Regarding.Name)) + logger.Log.Debugf(fmt.Sprintf("Couldn't get tapper pod %s", event.Regarding.Name)) continue } @@ -284,7 +287,6 @@ func (tapperSyncer *MizuTapperSyncer) updateCurrentlyTappedPods() (err error, ch tapperSyncer.TapPodChangesOut <- TappedPodChangeEvent{ Added: addedPods, Removed: removedPods, - ExpectedTapperAmount: len(tapperSyncer.nodeToTappedPodMap), } return nil, true }