mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-09-01 10:36:55 +00:00
Fix: tapper status race condition (#788)
This commit is contained in:
@@ -34,9 +34,12 @@ func (h *RoutesEventHandlers) WebSocketConnect(socketId int, isTapper bool) {
|
|||||||
tappers.Connected()
|
tappers.Connected()
|
||||||
} else {
|
} else {
|
||||||
logger.Log.Infof("Websocket event - Browser socket connected, socket ID: %d", socketId)
|
logger.Log.Infof("Websocket event - Browser socket connected, socket ID: %d", socketId)
|
||||||
|
|
||||||
socketListLock.Lock()
|
socketListLock.Lock()
|
||||||
browserClientSocketUUIDs = append(browserClientSocketUUIDs, socketId)
|
browserClientSocketUUIDs = append(browserClientSocketUUIDs, socketId)
|
||||||
socketListLock.Unlock()
|
socketListLock.Unlock()
|
||||||
|
|
||||||
|
BroadcastTappedPodsStatus()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
20
agent/pkg/api/utils.go
Normal file
20
agent/pkg/api/utils.go
Normal file
@@ -0,0 +1,20 @@
|
|||||||
|
package api
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
|
||||||
|
"github.com/up9inc/mizu/agent/pkg/providers/tappedPods"
|
||||||
|
"github.com/up9inc/mizu/shared"
|
||||||
|
"github.com/up9inc/mizu/shared/logger"
|
||||||
|
)
|
||||||
|
|
||||||
|
func BroadcastTappedPodsStatus() {
|
||||||
|
tappedPodsStatus := tappedPods.GetTappedPodsStatus()
|
||||||
|
|
||||||
|
message := shared.CreateWebSocketStatusMessage(tappedPodsStatus)
|
||||||
|
if jsonBytes, err := json.Marshal(message); err != nil {
|
||||||
|
logger.Log.Errorf("Could not Marshal message %v", err)
|
||||||
|
} else {
|
||||||
|
BroadcastToBrowserClients(jsonBytes)
|
||||||
|
}
|
||||||
|
}
|
@@ -7,6 +7,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
|
"github.com/up9inc/mizu/agent/pkg/api"
|
||||||
"github.com/up9inc/mizu/agent/pkg/config"
|
"github.com/up9inc/mizu/agent/pkg/config"
|
||||||
"github.com/up9inc/mizu/agent/pkg/models"
|
"github.com/up9inc/mizu/agent/pkg/models"
|
||||||
"github.com/up9inc/mizu/agent/pkg/providers"
|
"github.com/up9inc/mizu/agent/pkg/providers"
|
||||||
@@ -36,7 +37,7 @@ func PostTapConfig(c *gin.Context) {
|
|||||||
tappedPods.Set([]*shared.PodInfo{})
|
tappedPods.Set([]*shared.PodInfo{})
|
||||||
tappers.ResetStatus()
|
tappers.ResetStatus()
|
||||||
|
|
||||||
broadcastTappedPodsStatus()
|
api.BroadcastTappedPodsStatus()
|
||||||
}
|
}
|
||||||
|
|
||||||
var tappedNamespaces []string
|
var tappedNamespaces []string
|
||||||
@@ -135,7 +136,7 @@ func startMizuTapperSyncer(ctx context.Context, provider *kubernetes.Provider, t
|
|||||||
}
|
}
|
||||||
|
|
||||||
tappedPods.Set(kubernetes.GetPodInfosForPods(tapperSyncer.CurrentlyTappedPods))
|
tappedPods.Set(kubernetes.GetPodInfosForPods(tapperSyncer.CurrentlyTappedPods))
|
||||||
broadcastTappedPodsStatus()
|
api.BroadcastTappedPodsStatus()
|
||||||
case tapperStatus, ok := <-tapperSyncer.TapperStatusChangedOut:
|
case tapperStatus, ok := <-tapperSyncer.TapperStatusChangedOut:
|
||||||
if !ok {
|
if !ok {
|
||||||
logger.Log.Debug("mizuTapperSyncer tapper status changed channel closed, ending listener loop")
|
logger.Log.Debug("mizuTapperSyncer tapper status changed channel closed, ending listener loop")
|
||||||
@@ -143,7 +144,7 @@ func startMizuTapperSyncer(ctx context.Context, provider *kubernetes.Provider, t
|
|||||||
}
|
}
|
||||||
|
|
||||||
tappers.SetStatus(&tapperStatus)
|
tappers.SetStatus(&tapperStatus)
|
||||||
broadcastTappedPodsStatus()
|
api.BroadcastTappedPodsStatus()
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
logger.Log.Debug("mizuTapperSyncer event listener loop exiting due to context done")
|
logger.Log.Debug("mizuTapperSyncer event listener loop exiting due to context done")
|
||||||
return
|
return
|
||||||
|
@@ -1,7 +1,6 @@
|
|||||||
package controllers
|
package controllers
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
@@ -39,18 +38,7 @@ func PostTappedPods(c *gin.Context) {
|
|||||||
|
|
||||||
logger.Log.Infof("[Status] POST request: %d tapped pods", len(requestTappedPods))
|
logger.Log.Infof("[Status] POST request: %d tapped pods", len(requestTappedPods))
|
||||||
tappedPods.Set(requestTappedPods)
|
tappedPods.Set(requestTappedPods)
|
||||||
broadcastTappedPodsStatus()
|
api.BroadcastTappedPodsStatus()
|
||||||
}
|
|
||||||
|
|
||||||
func broadcastTappedPodsStatus() {
|
|
||||||
tappedPodsStatus := tappedPods.GetTappedPodsStatus()
|
|
||||||
|
|
||||||
message := shared.CreateWebSocketStatusMessage(tappedPodsStatus)
|
|
||||||
if jsonBytes, err := json.Marshal(message); err != nil {
|
|
||||||
logger.Log.Errorf("Could not Marshal message %v", err)
|
|
||||||
} else {
|
|
||||||
api.BroadcastToBrowserClients(jsonBytes)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func PostTapperStatus(c *gin.Context) {
|
func PostTapperStatus(c *gin.Context) {
|
||||||
@@ -67,7 +55,7 @@ func PostTapperStatus(c *gin.Context) {
|
|||||||
|
|
||||||
logger.Log.Infof("[Status] POST request, tapper status: %v", tapperStatus)
|
logger.Log.Infof("[Status] POST request, tapper status: %v", tapperStatus)
|
||||||
tappers.SetStatus(tapperStatus)
|
tappers.SetStatus(tapperStatus)
|
||||||
broadcastTappedPodsStatus()
|
api.BroadcastTappedPodsStatus()
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetConnectedTappersCount(c *gin.Context) {
|
func GetConnectedTappersCount(c *gin.Context) {
|
||||||
|
Reference in New Issue
Block a user