mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-09-02 11:05:22 +00:00
Add /db/flush
and /db/reset
API endpoints (#896)
* Add `/db/flush` and `/db/reset` API endpoints * Handle the unmarshalling errors better in the WebSocket * Handle Basenine connection error better in the WebSocket * Upgrade to Basenine `v0.6.5` * Fix the duplicated `StartTime` state Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>
This commit is contained in:
@@ -8,6 +8,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/up9inc/mizu/agent/pkg/models"
|
||||
"github.com/up9inc/mizu/agent/pkg/utils"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/gorilla/websocket"
|
||||
@@ -59,13 +60,13 @@ func init() {
|
||||
connectedWebsockets = make(map[int]*SocketConnection)
|
||||
}
|
||||
|
||||
func WebSocketRoutes(app *gin.Engine, eventHandlers EventHandlers, startTime int64) {
|
||||
func WebSocketRoutes(app *gin.Engine, eventHandlers EventHandlers) {
|
||||
SocketGetBrowserHandler = func(c *gin.Context) {
|
||||
websocketHandler(c.Writer, c.Request, eventHandlers, false, startTime)
|
||||
websocketHandler(c.Writer, c.Request, eventHandlers, false)
|
||||
}
|
||||
|
||||
SocketGetTapperHandler = func(c *gin.Context) {
|
||||
websocketHandler(c.Writer, c.Request, eventHandlers, true, startTime)
|
||||
websocketHandler(c.Writer, c.Request, eventHandlers, true)
|
||||
}
|
||||
|
||||
app.GET("/ws", func(c *gin.Context) {
|
||||
@@ -77,7 +78,7 @@ func WebSocketRoutes(app *gin.Engine, eventHandlers EventHandlers, startTime int
|
||||
})
|
||||
}
|
||||
|
||||
func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers EventHandlers, isTapper bool, startTime int64) {
|
||||
func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers EventHandlers, isTapper bool) {
|
||||
ws, err := websocketUpgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
logger.Log.Errorf("Failed to set websocket upgrade: %v", err)
|
||||
@@ -99,7 +100,9 @@ func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers Even
|
||||
if !isTapper {
|
||||
connection, err = basenine.NewConnection(shared.BasenineHost, shared.BaseninePort)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
logger.Log.Errorf("Failed to establish a connection to Basenine: %v", err)
|
||||
socketCleanup(socketId, connectedWebsockets[socketId])
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
@@ -115,7 +118,7 @@ func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers Even
|
||||
|
||||
eventHandlers.WebSocketConnect(socketId, isTapper)
|
||||
|
||||
startTimeBytes, _ := models.CreateWebsocketStartTimeMessage(startTime)
|
||||
startTimeBytes, _ := models.CreateWebsocketStartTimeMessage(utils.StartTime)
|
||||
|
||||
if err = SendToSocket(socketId, startTimeBytes); err != nil {
|
||||
logger.Log.Error(err)
|
||||
@@ -137,7 +140,8 @@ func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers Even
|
||||
|
||||
if !isTapper && !isQuerySet {
|
||||
if err := json.Unmarshal(msg, ¶ms); err != nil {
|
||||
logger.Log.Errorf("Error: %v", socketId, err)
|
||||
logger.Log.Errorf("Error unmarshalling parameters: %v", socketId, err)
|
||||
continue
|
||||
}
|
||||
|
||||
query := params.Query
|
||||
@@ -166,6 +170,10 @@ func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers Even
|
||||
|
||||
var entry *tapApi.Entry
|
||||
err = json.Unmarshal(bytes, &entry)
|
||||
if err != nil {
|
||||
logger.Log.Debugf("Error unmarshalling entry: %v", err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
var message []byte
|
||||
if params.EnableFullEntries {
|
||||
@@ -193,7 +201,8 @@ func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers Even
|
||||
var metadata *basenine.Metadata
|
||||
err = json.Unmarshal(bytes, &metadata)
|
||||
if err != nil {
|
||||
logger.Log.Debugf("Error recieving metadata: %v", err.Error())
|
||||
logger.Log.Debugf("Error unmarshalling metadata: %v", err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
metadataBytes, _ := models.CreateWebsocketQueryMetadataMessage(metadata)
|
||||
|
Reference in New Issue
Block a user