mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-09-02 19:15:26 +00:00
@@ -30,6 +30,11 @@ type SocketConnection struct {
|
|||||||
isTapper bool
|
isTapper bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type WebSocketParams struct {
|
||||||
|
Query string `json:"query"`
|
||||||
|
EnableFullEntries bool `json:"enableFullEntries"`
|
||||||
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
websocketUpgrader = websocket.Upgrader{
|
websocketUpgrader = websocket.Upgrader{
|
||||||
ReadBufferSize: 1024,
|
ReadBufferSize: 1024,
|
||||||
@@ -110,31 +115,26 @@ func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers Even
|
|||||||
logger.Log.Error(err)
|
logger.Log.Error(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
out:
|
var params WebSocketParams
|
||||||
|
|
||||||
for {
|
for {
|
||||||
// params[0]: query
|
_, msg, err := ws.ReadMessage()
|
||||||
// params[1]: enableFullEntries (empty: disable, non-empty: enable)
|
if err != nil {
|
||||||
params := make([][]byte, 2)
|
if _, ok := err.(*websocket.CloseError); ok {
|
||||||
for i := range params {
|
logger.Log.Debugf("Received websocket close message, socket id: %d", socketId)
|
||||||
_, params[i], err = ws.ReadMessage()
|
} else {
|
||||||
if err != nil {
|
logger.Log.Errorf("Error reading message, socket id: %d, error: %v", socketId, err)
|
||||||
if _, ok := err.(*websocket.CloseError); ok {
|
|
||||||
logger.Log.Debugf("Received websocket close message, socket id: %d", socketId)
|
|
||||||
} else {
|
|
||||||
logger.Log.Errorf("Error reading message, socket id: %d, error: %v", socketId, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
break out
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
enableFullEntries := false
|
break
|
||||||
if len(params[1]) > 0 {
|
|
||||||
enableFullEntries = true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if !isTapper && !isQuerySet {
|
if !isTapper && !isQuerySet {
|
||||||
query := string(params[0])
|
if err := json.Unmarshal(msg, ¶ms); err != nil {
|
||||||
|
logger.Log.Errorf("Error: %v", socketId, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
query := params.Query
|
||||||
err = basenine.Validate(shared.BasenineHost, shared.BaseninePort, query)
|
err = basenine.Validate(shared.BasenineHost, shared.BaseninePort, query)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
toastBytes, _ := models.CreateWebsocketToastMessage(&models.ToastMessage{
|
toastBytes, _ := models.CreateWebsocketToastMessage(&models.ToastMessage{
|
||||||
@@ -162,7 +162,7 @@ out:
|
|||||||
err = json.Unmarshal(bytes, &entry)
|
err = json.Unmarshal(bytes, &entry)
|
||||||
|
|
||||||
var message []byte
|
var message []byte
|
||||||
if enableFullEntries {
|
if params.EnableFullEntries {
|
||||||
message, _ = models.CreateFullEntryWebSocketMessage(entry)
|
message, _ = models.CreateFullEntryWebSocketMessage(entry)
|
||||||
} else {
|
} else {
|
||||||
base := tapApi.Summarize(entry)
|
base := tapApi.Summarize(entry)
|
||||||
@@ -201,7 +201,7 @@ out:
|
|||||||
|
|
||||||
connection.Query(query, data, meta)
|
connection.Query(query, data, meta)
|
||||||
} else {
|
} else {
|
||||||
eventHandlers.WebSocketMessage(socketId, params[0])
|
eventHandlers.WebSocketMessage(socketId, msg)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -121,8 +121,7 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus}) => {
|
|||||||
ws.current = new WebSocket(MizuWebsocketURL);
|
ws.current = new WebSocket(MizuWebsocketURL);
|
||||||
ws.current.onopen = () => {
|
ws.current.onopen = () => {
|
||||||
setWsConnection(WsConnectionStatus.Connected);
|
setWsConnection(WsConnectionStatus.Connected);
|
||||||
ws.current.send(query);
|
ws.current.send(JSON.stringify({"query": query, "enableFullEntries": false}));
|
||||||
ws.current.send("");
|
|
||||||
}
|
}
|
||||||
ws.current.onclose = () => {
|
ws.current.onclose = () => {
|
||||||
setWsConnection(WsConnectionStatus.Closed);
|
setWsConnection(WsConnectionStatus.Closed);
|
||||||
|
Reference in New Issue
Block a user