diff --git a/agent/main.go b/agent/main.go index 445d856c2..2d7c8096e 100644 --- a/agent/main.go +++ b/agent/main.go @@ -95,9 +95,10 @@ func hostApi(socketHarOutputChannel chan<- *tap.OutputChannelItem) { app.Use(static.ServeRoot("/", "./site")) app.Use(CORSMiddleware()) // This has to be called after the static middleware, does not work if its called before - routes.WebSocketRoutes(app, &eventHandlers) + api.WebSocketRoutes(app, &eventHandlers) routes.EntriesRoutes(app) routes.MetadataRoutes(app) + routes.StatusRoutes(app) routes.NotFoundRoute(app) utils.StartServer(app) diff --git a/agent/pkg/api/main.go b/agent/pkg/api/main.go index 711b64bf2..2e90d8141 100644 --- a/agent/pkg/api/main.go +++ b/agent/pkg/api/main.go @@ -88,9 +88,9 @@ func startReadingFiles(workingDir string) { for _, entry := range inputHar.Log.Entries { time.Sleep(time.Millisecond * 250) connectionInfo := &tap.ConnectionInfo{ - ClientIP: fileInfo.Name(), + ClientIP: fileInfo.Name(), ClientPort: "", - ServerIP: "", + ServerIP: "", ServerPort: "", IsOutgoing: false, } @@ -118,7 +118,6 @@ func StartReadingOutbound(outboundLinkChannel <-chan *tap.OutboundLink) { } } - func saveHarToDb(entry *har.Entry, connectionInfo *tap.ConnectionInfo) { entryBytes, _ := json.Marshal(entry) serviceName, urlPath := getServiceNameFromUrl(entry.Request.URL) @@ -168,7 +167,7 @@ func saveHarToDb(entry *har.Entry, connectionInfo *tap.ConnectionInfo) { return } baseEntryBytes, _ := models.CreateBaseEntryWebSocketMessage(&baseEntry) - broadcastToBrowserClients(baseEntryBytes) + BroadcastToBrowserClients(baseEntryBytes) } func getServiceNameFromUrl(inputUrl string) (string, string) { @@ -196,6 +195,5 @@ func getEstimatedEntrySizeBytes(mizuEntry models.MizuEntry) int { sizeBytes += 8 // SizeBytes bytes sizeBytes += 1 // IsOutgoing bytes - return sizeBytes } diff --git a/agent/pkg/routes/socket_routes.go b/agent/pkg/api/socket_routes.go similarity index 93% rename from agent/pkg/routes/socket_routes.go rename to agent/pkg/api/socket_routes.go index 31831c4ee..140b07f6d 100644 --- a/agent/pkg/routes/socket_routes.go +++ b/agent/pkg/api/socket_routes.go @@ -1,4 +1,4 @@ -package routes +package api import ( "errors" @@ -18,10 +18,10 @@ type EventHandlers interface { } type SocketConnection struct { - connection *websocket.Conn - lock *sync.Mutex + connection *websocket.Conn + lock *sync.Mutex eventHandlers EventHandlers - isTapper bool + isTapper bool } var websocketUpgrader = websocket.Upgrader{ @@ -91,7 +91,7 @@ func socketCleanup(socketId int, socketConnection *SocketConnection) { socketConnection.eventHandlers.WebSocketDisconnect(socketId, socketConnection.isTapper) } -var db = debounce.NewDebouncer(time.Second * 5, func() { +var db = debounce.NewDebouncer(time.Second*5, func() { fmt.Println("Successfully sent to socket") }) @@ -102,7 +102,7 @@ func SendToSocket(socketId int, message []byte) error { } var sent = false - time.AfterFunc(time.Second * 5, func() { + time.AfterFunc(time.Second*5, func() { if !sent { fmt.Println("Socket timed out") socketCleanup(socketId, socketObj) diff --git a/agent/pkg/api/socket_server_handlers.go b/agent/pkg/api/socket_server_handlers.go index d43a44197..398f450f8 100644 --- a/agent/pkg/api/socket_server_handlers.go +++ b/agent/pkg/api/socket_server_handlers.go @@ -8,7 +8,6 @@ import ( "github.com/up9inc/mizu/tap" "mizuserver/pkg/models" "mizuserver/pkg/providers" - "mizuserver/pkg/routes" "mizuserver/pkg/up9" "sync" ) @@ -17,12 +16,12 @@ var browserClientSocketUUIDs = make([]int, 0) var socketListLock = sync.Mutex{} type RoutesEventHandlers struct { - routes.EventHandlers + EventHandlers SocketHarOutChannel chan<- *tap.OutputChannelItem } func init() { - go up9.UpdateAnalyzeStatus(broadcastToBrowserClients) + go up9.UpdateAnalyzeStatus(BroadcastToBrowserClients) } func (h *RoutesEventHandlers) WebSocketConnect(socketId int, isTapper bool) { @@ -47,15 +46,14 @@ func (h *RoutesEventHandlers) WebSocketDisconnect(socketId int, isTapper bool) { } } -func broadcastToBrowserClients(message []byte) { +func BroadcastToBrowserClients(message []byte) { for _, socketId := range browserClientSocketUUIDs { go func(socketId int) { - err := routes.SendToSocket(socketId, message) + err := SendToSocket(socketId, message) if err != nil { fmt.Printf("error sending message to socket ID %d: %v", socketId, err) } }(socketId) - } } @@ -81,7 +79,7 @@ func (h *RoutesEventHandlers) WebSocketMessage(_ int, message []byte) { rlog.Infof("Could not unmarshal message of message type %s %v\n", socketMessageBase.MessageType, err) } else { providers.TapStatus.Pods = statusMessage.TappingStatus.Pods - broadcastToBrowserClients(message) + BroadcastToBrowserClients(message) } case shared.WebsocketMessageTypeOutboundLink: var outboundLinkMessage models.WebsocketOutboundLinkMessage @@ -116,7 +114,7 @@ func handleTLSLink(outboundLinkMessage models.WebsocketOutboundLinkMessage) { rlog.Errorf("Error marshaling outbound link message for broadcasting: %v", err) } else { fmt.Printf("Broadcasting outboundlink message %s\n", string(marshaledMessage)) - broadcastToBrowserClients(marshaledMessage) + BroadcastToBrowserClients(marshaledMessage) } } diff --git a/agent/pkg/controllers/entries_controller.go b/agent/pkg/controllers/entries_controller.go index 57dc611e9..669a64f85 100644 --- a/agent/pkg/controllers/entries_controller.go +++ b/agent/pkg/controllers/entries_controller.go @@ -8,6 +8,7 @@ import ( "github.com/romana/rlog" "mizuserver/pkg/database" "mizuserver/pkg/models" + "mizuserver/pkg/providers" "mizuserver/pkg/up9" "mizuserver/pkg/utils" "mizuserver/pkg/validation" @@ -241,3 +242,15 @@ func GetGeneralStats(c *gin.Context) { database.GetEntriesTable().Raw(sqlQuery).Scan(&result) c.JSON(http.StatusOK, result) } + +func GetTappingStatus(c *gin.Context) { + c.JSON(http.StatusOK, providers.TapStatus) +} + +func AnalyzeInformation(c *gin.Context) { + c.JSON(http.StatusOK, up9.GetAnalyzeInfo()) +} + +func GetRecentTLSLinks(c *gin.Context) { + c.JSON(http.StatusOK, providers.GetAllRecentTLSAddresses()) +} diff --git a/agent/pkg/controllers/status_controller.go b/agent/pkg/controllers/status_controller.go index eee2df95f..5e046ee82 100644 --- a/agent/pkg/controllers/status_controller.go +++ b/agent/pkg/controllers/status_controller.go @@ -1,20 +1,32 @@ package controllers import ( + "encoding/json" "github.com/gin-gonic/gin" + "github.com/romana/rlog" + "github.com/up9inc/mizu/shared" + "mizuserver/pkg/api" "mizuserver/pkg/providers" - "mizuserver/pkg/up9" + "mizuserver/pkg/validation" "net/http" ) -func GetTappingStatus(c *gin.Context) { - c.JSON(http.StatusOK, providers.TapStatus) -} - -func AnalyzeInformation(c *gin.Context) { - c.JSON(http.StatusOK, up9.GetAnalyzeInfo()) -} - -func GetRecentTLSLinks(c *gin.Context) { - c.JSON(http.StatusOK, providers.GetAllRecentTLSAddresses()) +func PostTappedPods(c *gin.Context) { + tapStatus := &shared.TapStatus{} + if err := c.Bind(tapStatus); err != nil { + c.JSON(http.StatusBadRequest, err) + return + } + if err := validation.Validate(tapStatus); err != nil { + c.JSON(http.StatusBadRequest, err) + return + } + rlog.Infof("[Status] POST request: %d tapped pods", len(tapStatus.Pods)) + providers.TapStatus.Pods = tapStatus.Pods + message := shared.CreateWebSocketStatusMessage(*tapStatus) + if jsonBytes, err := json.Marshal(message); err != nil { + rlog.Errorf("Could not Marshal message %v\n", err) + } else { + api.BroadcastToBrowserClients(jsonBytes) + } } diff --git a/agent/pkg/routes/status_routes.go b/agent/pkg/routes/status_routes.go new file mode 100644 index 000000000..a6aa41151 --- /dev/null +++ b/agent/pkg/routes/status_routes.go @@ -0,0 +1,12 @@ +package routes + +import ( + "github.com/gin-gonic/gin" + "mizuserver/pkg/controllers" +) + +func StatusRoutes(ginApp *gin.Engine) { + routeGroup := ginApp.Group("/status") + + routeGroup.POST("/tappedPods", controllers.PostTappedPods) +} diff --git a/cli/cmd/tapRunner.go b/cli/cmd/tapRunner.go index c72dc0e5e..949f76f90 100644 --- a/cli/cmd/tapRunner.go +++ b/cli/cmd/tapRunner.go @@ -1,7 +1,9 @@ package cmd import ( + "bytes" "context" + "encoding/json" "fmt" "github.com/up9inc/mizu/cli/kubernetes" "github.com/up9inc/mizu/cli/mizu" @@ -237,21 +239,33 @@ func cleanUpMizuResources(kubernetesProvider *kubernetes.Provider) { } } +func reportTappedPods() { + mizuProxiedUrl := kubernetes.GetMizuApiServerProxiedHostAndPath(mizu.Config.Fetch.MizuPort) + tappedPodsUrl := fmt.Sprintf("http://%s/status/tappedPods", mizuProxiedUrl) + + podInfos := make([]shared.PodInfo, 0) + for _, pod := range currentlyTappedPods { + podInfos = append(podInfos, shared.PodInfo{Name: pod.Name, Namespace: pod.Namespace}) + } + tapStatus := shared.TapStatus{Pods: podInfos} + + if jsonValue, err := json.Marshal(tapStatus); err != nil { + mizu.Log.Debugf("[ERROR] failed Marshal the tapped pods %v", err) + } else { + if response, err := http.Post(tappedPodsUrl, "application/json", bytes.NewBuffer(jsonValue)); err != nil { + mizu.Log.Debugf("[ERROR] failed sending to API server the tapped pods %v", err) + } else if response.StatusCode != 200 { + mizu.Log.Debugf("[ERROR] failed sending to API server the tapped pods, response status code %v", response.StatusCode) + } else { + mizu.Log.Debugf("Reported to server API about %d taped pods successfully", len(podInfos)) + } + } +} + func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) { targetNamespace := getNamespace(kubernetesProvider) added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, targetNamespace), mizu.Config.Tap.PodRegex()) - controlSocketStr := fmt.Sprintf("ws://%s/ws", kubernetes.GetMizuApiServerProxiedHostAndPath(mizu.Config.Tap.GuiPort)) - controlSocket, err := mizu.CreateControlSocket(controlSocketStr) - if err != nil { - mizu.Log.Infof("error establishing control socket connection %s", err) - cancel() - } - mizu.Log.Debugf("Control socket created %s", controlSocketStr) - err = controlSocket.SendNewTappedPodsListMessage(currentlyTappedPods) - if err != nil { - mizu.Log.Debugf("error Sending message via control socket %v, error: %s", controlSocketStr, err) - } restartTappers := func() { err, changeFound := updateCurrentlyTappedPods(kubernetesProvider, ctx, targetNamespace) if err != nil { @@ -264,10 +278,7 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro return } - err = controlSocket.SendNewTappedPodsListMessage(currentlyTappedPods) - if err != nil { - mizu.Log.Debugf("error Sending message via control socket %v, error: %s", controlSocketStr, err) - } + reportTappedPods() nodeToTappedPodIPMap, err := getNodeHostToTappedPodIpsMap(currentlyTappedPods) if err != nil { @@ -388,6 +399,7 @@ func createProxyToApiServerPod(ctx context.Context, kubernetesProvider *kubernet mizu.Log.Infof("Mizu is available at http://%s\n", kubernetes.GetMizuApiServerProxiedHostAndPath(mizu.Config.Tap.GuiPort)) time.Sleep(time.Second * 5) // Waiting to be sure the proxy is ready requestForAnalysis() + reportTappedPods() } case <-timeAfter: if !isPodReady {