mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-06-26 16:24:54 +00:00
Refactor to agent status (#622)
This commit is contained in:
parent
a55f51f0e7
commit
b96542a8ed
@ -5,6 +5,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"mizuserver/pkg/models"
|
"mizuserver/pkg/models"
|
||||||
"mizuserver/pkg/providers"
|
"mizuserver/pkg/providers"
|
||||||
|
"mizuserver/pkg/providers/tappersCount"
|
||||||
"mizuserver/pkg/up9"
|
"mizuserver/pkg/up9"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
@ -29,7 +30,7 @@ func init() {
|
|||||||
func (h *RoutesEventHandlers) WebSocketConnect(socketId int, isTapper bool) {
|
func (h *RoutesEventHandlers) WebSocketConnect(socketId int, isTapper bool) {
|
||||||
if isTapper {
|
if isTapper {
|
||||||
logger.Log.Infof("Websocket event - Tapper connected, socket ID: %d", socketId)
|
logger.Log.Infof("Websocket event - Tapper connected, socket ID: %d", socketId)
|
||||||
providers.TapperAdded()
|
tappersCount.Add()
|
||||||
} else {
|
} else {
|
||||||
logger.Log.Infof("Websocket event - Browser socket connected, socket ID: %d", socketId)
|
logger.Log.Infof("Websocket event - Browser socket connected, socket ID: %d", socketId)
|
||||||
socketListLock.Lock()
|
socketListLock.Lock()
|
||||||
@ -41,7 +42,7 @@ func (h *RoutesEventHandlers) WebSocketConnect(socketId int, isTapper bool) {
|
|||||||
func (h *RoutesEventHandlers) WebSocketDisconnect(socketId int, isTapper bool) {
|
func (h *RoutesEventHandlers) WebSocketDisconnect(socketId int, isTapper bool) {
|
||||||
if isTapper {
|
if isTapper {
|
||||||
logger.Log.Infof("Websocket event - Tapper disconnected, socket ID: %d", socketId)
|
logger.Log.Infof("Websocket event - Tapper disconnected, socket ID: %d", socketId)
|
||||||
providers.TapperRemoved()
|
tappersCount.Remove()
|
||||||
} else {
|
} else {
|
||||||
logger.Log.Infof("Websocket event - Browser socket disconnected, socket ID: %d", socketId)
|
logger.Log.Infof("Websocket event - Browser socket disconnected, socket ID: %d", socketId)
|
||||||
socketListLock.Lock()
|
socketListLock.Lock()
|
||||||
|
@ -12,6 +12,8 @@ import (
|
|||||||
"mizuserver/pkg/models"
|
"mizuserver/pkg/models"
|
||||||
"mizuserver/pkg/providers"
|
"mizuserver/pkg/providers"
|
||||||
"mizuserver/pkg/providers/tapConfig"
|
"mizuserver/pkg/providers/tapConfig"
|
||||||
|
"mizuserver/pkg/providers/tappedPods"
|
||||||
|
"mizuserver/pkg/providers/tappersStatus"
|
||||||
"net/http"
|
"net/http"
|
||||||
"regexp"
|
"regexp"
|
||||||
"time"
|
"time"
|
||||||
@ -30,8 +32,8 @@ func PostTapConfig(c *gin.Context) {
|
|||||||
if cancelTapperSyncer != nil {
|
if cancelTapperSyncer != nil {
|
||||||
cancelTapperSyncer()
|
cancelTapperSyncer()
|
||||||
|
|
||||||
providers.TapStatus = shared.TapStatus{}
|
tappedPods.Set([]*shared.PodInfo{})
|
||||||
providers.TappersStatus = make(map[string]shared.TapperStatus)
|
tappersStatus.Reset()
|
||||||
|
|
||||||
broadcastTappedPodsStatus()
|
broadcastTappedPodsStatus()
|
||||||
}
|
}
|
||||||
@ -131,7 +133,7 @@ func startMizuTapperSyncer(ctx context.Context, provider *kubernetes.Provider, t
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
providers.TapStatus = shared.TapStatus{Pods: kubernetes.GetPodInfosForPods(tapperSyncer.CurrentlyTappedPods)}
|
tappedPods.Set(kubernetes.GetPodInfosForPods(tapperSyncer.CurrentlyTappedPods))
|
||||||
broadcastTappedPodsStatus()
|
broadcastTappedPodsStatus()
|
||||||
case tapperStatus, ok := <-tapperSyncer.TapperStatusChangedOut:
|
case tapperStatus, ok := <-tapperSyncer.TapperStatusChangedOut:
|
||||||
if !ok {
|
if !ok {
|
||||||
@ -139,7 +141,7 @@ func startMizuTapperSyncer(ctx context.Context, provider *kubernetes.Provider, t
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
addTapperStatus(tapperStatus)
|
tappersStatus.Set(&tapperStatus)
|
||||||
broadcastTappedPodsStatus()
|
broadcastTappedPodsStatus()
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
logger.Log.Debug("mizuTapperSyncer event listener loop exiting due to context done")
|
logger.Log.Debug("mizuTapperSyncer event listener loop exiting due to context done")
|
||||||
|
@ -8,43 +8,42 @@ import (
|
|||||||
"mizuserver/pkg/api"
|
"mizuserver/pkg/api"
|
||||||
"mizuserver/pkg/holder"
|
"mizuserver/pkg/holder"
|
||||||
"mizuserver/pkg/providers"
|
"mizuserver/pkg/providers"
|
||||||
|
"mizuserver/pkg/providers/tappedPods"
|
||||||
|
"mizuserver/pkg/providers/tappersCount"
|
||||||
|
"mizuserver/pkg/providers/tappersStatus"
|
||||||
"mizuserver/pkg/up9"
|
"mizuserver/pkg/up9"
|
||||||
"mizuserver/pkg/utils"
|
|
||||||
"mizuserver/pkg/validation"
|
"mizuserver/pkg/validation"
|
||||||
"net/http"
|
"net/http"
|
||||||
)
|
)
|
||||||
|
|
||||||
func HealthCheck(c *gin.Context) {
|
func HealthCheck(c *gin.Context) {
|
||||||
tappers := make([]shared.TapperStatus, 0)
|
tappers := make([]*shared.TapperStatus, 0)
|
||||||
for _, value := range providers.TappersStatus {
|
for _, value := range tappersStatus.Get() {
|
||||||
tappers = append(tappers, value)
|
tappers = append(tappers, value)
|
||||||
}
|
}
|
||||||
|
|
||||||
response := shared.HealthResponse{
|
response := shared.HealthResponse{
|
||||||
TapStatus: providers.TapStatus,
|
TappedPods: tappedPods.Get(),
|
||||||
TappersCount: providers.TappersCount,
|
TappersCount: tappersCount.Get(),
|
||||||
TappersStatus: tappers,
|
TappersStatus: tappers,
|
||||||
}
|
}
|
||||||
c.JSON(http.StatusOK, response)
|
c.JSON(http.StatusOK, response)
|
||||||
}
|
}
|
||||||
|
|
||||||
func PostTappedPods(c *gin.Context) {
|
func PostTappedPods(c *gin.Context) {
|
||||||
tapStatus := &shared.TapStatus{}
|
var requestTappedPods []*shared.PodInfo
|
||||||
if err := c.Bind(tapStatus); err != nil {
|
if err := c.Bind(&requestTappedPods); err != nil {
|
||||||
c.JSON(http.StatusBadRequest, err)
|
c.JSON(http.StatusBadRequest, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if err := validation.Validate(tapStatus); err != nil {
|
|
||||||
c.JSON(http.StatusBadRequest, err)
|
logger.Log.Infof("[Status] POST request: %d tapped pods", len(requestTappedPods))
|
||||||
return
|
tappedPods.Set(requestTappedPods)
|
||||||
}
|
|
||||||
logger.Log.Infof("[Status] POST request: %d tapped pods", len(tapStatus.Pods))
|
|
||||||
providers.TapStatus.Pods = tapStatus.Pods
|
|
||||||
broadcastTappedPodsStatus()
|
broadcastTappedPodsStatus()
|
||||||
}
|
}
|
||||||
|
|
||||||
func broadcastTappedPodsStatus() {
|
func broadcastTappedPodsStatus() {
|
||||||
tappedPodsStatus := utils.GetTappedPodsStatus()
|
tappedPodsStatus := tappedPods.GetTappedPodsStatus()
|
||||||
|
|
||||||
message := shared.CreateWebSocketStatusMessage(tappedPodsStatus)
|
message := shared.CreateWebSocketStatusMessage(tappedPodsStatus)
|
||||||
if jsonBytes, err := json.Marshal(message); err != nil {
|
if jsonBytes, err := json.Marshal(message); err != nil {
|
||||||
@ -54,14 +53,6 @@ func broadcastTappedPodsStatus() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func addTapperStatus(tapperStatus shared.TapperStatus) {
|
|
||||||
if providers.TappersStatus == nil {
|
|
||||||
providers.TappersStatus = make(map[string]shared.TapperStatus)
|
|
||||||
}
|
|
||||||
|
|
||||||
providers.TappersStatus[tapperStatus.NodeName] = tapperStatus
|
|
||||||
}
|
|
||||||
|
|
||||||
func PostTapperStatus(c *gin.Context) {
|
func PostTapperStatus(c *gin.Context) {
|
||||||
tapperStatus := &shared.TapperStatus{}
|
tapperStatus := &shared.TapperStatus{}
|
||||||
if err := c.Bind(tapperStatus); err != nil {
|
if err := c.Bind(tapperStatus); err != nil {
|
||||||
@ -75,12 +66,12 @@ func PostTapperStatus(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
logger.Log.Infof("[Status] POST request, tapper status: %v", tapperStatus)
|
logger.Log.Infof("[Status] POST request, tapper status: %v", tapperStatus)
|
||||||
addTapperStatus(*tapperStatus)
|
tappersStatus.Set(tapperStatus)
|
||||||
broadcastTappedPodsStatus()
|
broadcastTappedPodsStatus()
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetTappersCount(c *gin.Context) {
|
func GetTappersCount(c *gin.Context) {
|
||||||
c.JSON(http.StatusOK, providers.TappersCount)
|
c.JSON(http.StatusOK, tappersCount.Get())
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetAuthStatus(c *gin.Context) {
|
func GetAuthStatus(c *gin.Context) {
|
||||||
@ -94,7 +85,7 @@ func GetAuthStatus(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func GetTappingStatus(c *gin.Context) {
|
func GetTappingStatus(c *gin.Context) {
|
||||||
tappedPodsStatus := utils.GetTappedPodsStatus()
|
tappedPodsStatus := tappedPods.GetTappedPodsStatus()
|
||||||
c.JSON(http.StatusOK, tappedPodsStatus)
|
c.JSON(http.StatusOK, tappedPodsStatus)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -8,19 +8,14 @@ import (
|
|||||||
"github.com/up9inc/mizu/tap"
|
"github.com/up9inc/mizu/tap"
|
||||||
"mizuserver/pkg/models"
|
"mizuserver/pkg/models"
|
||||||
"os"
|
"os"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
const tlsLinkRetainmentTime = time.Minute * 15
|
const tlsLinkRetainmentTime = time.Minute * 15
|
||||||
|
|
||||||
var (
|
var (
|
||||||
TappersCount int
|
authStatus *models.AuthStatus
|
||||||
TapStatus shared.TapStatus
|
RecentTLSLinks = cache.New(tlsLinkRetainmentTime, tlsLinkRetainmentTime)
|
||||||
TappersStatus map[string]shared.TapperStatus
|
|
||||||
authStatus *models.AuthStatus
|
|
||||||
RecentTLSLinks = cache.New(tlsLinkRetainmentTime, tlsLinkRetainmentTime)
|
|
||||||
tappersCountLock = sync.Mutex{}
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func GetAuthStatus() (*models.AuthStatus, error) {
|
func GetAuthStatus() (*models.AuthStatus, error) {
|
||||||
@ -68,15 +63,3 @@ func GetAllRecentTLSAddresses() []string {
|
|||||||
|
|
||||||
return recentTLSLinks
|
return recentTLSLinks
|
||||||
}
|
}
|
||||||
|
|
||||||
func TapperAdded() {
|
|
||||||
tappersCountLock.Lock()
|
|
||||||
TappersCount++
|
|
||||||
tappersCountLock.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
func TapperRemoved() {
|
|
||||||
tappersCountLock.Lock()
|
|
||||||
TappersCount--
|
|
||||||
tappersCountLock.Unlock()
|
|
||||||
}
|
|
||||||
|
32
agent/pkg/providers/tappedPods/tapped_pods_provider.go
Normal file
32
agent/pkg/providers/tappedPods/tapped_pods_provider.go
Normal file
@ -0,0 +1,32 @@
|
|||||||
|
package tappedPods
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/up9inc/mizu/shared"
|
||||||
|
"mizuserver/pkg/providers/tappersStatus"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
var tappedPods []*shared.PodInfo
|
||||||
|
|
||||||
|
func Get() []*shared.PodInfo {
|
||||||
|
return tappedPods
|
||||||
|
}
|
||||||
|
|
||||||
|
func Set(tappedPodsToSet []*shared.PodInfo) {
|
||||||
|
tappedPods = tappedPodsToSet
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetTappedPodsStatus() []shared.TappedPodStatus {
|
||||||
|
tappedPodsStatus := make([]shared.TappedPodStatus, 0)
|
||||||
|
for _, pod := range Get() {
|
||||||
|
var status string
|
||||||
|
if tapperStatus, ok := tappersStatus.Get()[pod.NodeName]; ok {
|
||||||
|
status = strings.ToLower(tapperStatus.Status)
|
||||||
|
}
|
||||||
|
|
||||||
|
isTapped := status == "running"
|
||||||
|
tappedPodsStatus = append(tappedPodsStatus, shared.TappedPodStatus{Name: pod.Name, Namespace: pod.Namespace, IsTapped: isTapped})
|
||||||
|
}
|
||||||
|
|
||||||
|
return tappedPodsStatus
|
||||||
|
}
|
25
agent/pkg/providers/tappersCount/tappers_count_provider.go
Normal file
25
agent/pkg/providers/tappersCount/tappers_count_provider.go
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
package tappersCount
|
||||||
|
|
||||||
|
import "sync"
|
||||||
|
|
||||||
|
var lock = &sync.Mutex{}
|
||||||
|
|
||||||
|
var tappersCount int
|
||||||
|
|
||||||
|
func Add() {
|
||||||
|
lock.Lock()
|
||||||
|
defer lock.Unlock()
|
||||||
|
|
||||||
|
tappersCount++
|
||||||
|
}
|
||||||
|
|
||||||
|
func Remove() {
|
||||||
|
lock.Lock()
|
||||||
|
defer lock.Unlock()
|
||||||
|
|
||||||
|
tappersCount--
|
||||||
|
}
|
||||||
|
|
||||||
|
func Get() int {
|
||||||
|
return tappersCount
|
||||||
|
}
|
25
agent/pkg/providers/tappersStatus/tappers_status_provider.go
Normal file
25
agent/pkg/providers/tappersStatus/tappers_status_provider.go
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
package tappersStatus
|
||||||
|
|
||||||
|
import "github.com/up9inc/mizu/shared"
|
||||||
|
|
||||||
|
var tappersStatus map[string]*shared.TapperStatus
|
||||||
|
|
||||||
|
func Get() map[string]*shared.TapperStatus {
|
||||||
|
if tappersStatus == nil {
|
||||||
|
tappersStatus = make(map[string]*shared.TapperStatus)
|
||||||
|
}
|
||||||
|
|
||||||
|
return tappersStatus
|
||||||
|
}
|
||||||
|
|
||||||
|
func Set(tapperStatus *shared.TapperStatus) {
|
||||||
|
if tappersStatus == nil {
|
||||||
|
tappersStatus = make(map[string]*shared.TapperStatus)
|
||||||
|
}
|
||||||
|
|
||||||
|
tappersStatus[tapperStatus.NodeName] = tapperStatus
|
||||||
|
}
|
||||||
|
|
||||||
|
func Reset() {
|
||||||
|
tappersStatus = make(map[string]*shared.TapperStatus)
|
||||||
|
}
|
@ -3,12 +3,10 @@ package utils
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"mizuserver/pkg/providers"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
"strings"
|
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -45,16 +43,6 @@ func StartServer(app *gin.Engine) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetTappedPodsStatus() []shared.TappedPodStatus {
|
|
||||||
tappedPodsStatus := make([]shared.TappedPodStatus, 0)
|
|
||||||
for _, pod := range providers.TapStatus.Pods {
|
|
||||||
status := strings.ToLower(providers.TappersStatus[pod.NodeName].Status)
|
|
||||||
isTapped := status == "running"
|
|
||||||
tappedPodsStatus = append(tappedPodsStatus, shared.TappedPodStatus{Name: pod.Name, Namespace: pod.Namespace, IsTapped: isTapped})
|
|
||||||
}
|
|
||||||
return tappedPodsStatus
|
|
||||||
}
|
|
||||||
|
|
||||||
func CheckErr(e error) {
|
func CheckErr(e error) {
|
||||||
if e != nil {
|
if e != nil {
|
||||||
logger.Log.Errorf("%v", e)
|
logger.Log.Errorf("%v", e)
|
||||||
|
@ -87,9 +87,8 @@ func (provider *Provider) ReportTappedPods(pods []core.Pod) error {
|
|||||||
tappedPodsUrl := fmt.Sprintf("%s/status/tappedPods", provider.url)
|
tappedPodsUrl := fmt.Sprintf("%s/status/tappedPods", provider.url)
|
||||||
|
|
||||||
podInfos := kubernetes.GetPodInfosForPods(pods)
|
podInfos := kubernetes.GetPodInfosForPods(pods)
|
||||||
tapStatus := shared.TapStatus{Pods: podInfos}
|
|
||||||
|
|
||||||
if jsonValue, err := json.Marshal(tapStatus); err != nil {
|
if jsonValue, err := json.Marshal(podInfos); err != nil {
|
||||||
return fmt.Errorf("failed Marshal the tapped pods %w", err)
|
return fmt.Errorf("failed Marshal the tapped pods %w", err)
|
||||||
} else {
|
} else {
|
||||||
if response, err := provider.client.Post(tappedPodsUrl, "application/json", bytes.NewBuffer(jsonValue)); err != nil {
|
if response, err := provider.client.Post(tappedPodsUrl, "application/json", bytes.NewBuffer(jsonValue)); err != nil {
|
||||||
|
@ -99,7 +99,7 @@ func watchApiServerPodReady(ctx context.Context, kubernetesProvider *kubernetes.
|
|||||||
podWatchHelper := kubernetes.NewPodWatchHelper(kubernetesProvider, podExactRegex)
|
podWatchHelper := kubernetes.NewPodWatchHelper(kubernetesProvider, podExactRegex)
|
||||||
eventChan, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.MizuResourcesNamespace}, podWatchHelper)
|
eventChan, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.MizuResourcesNamespace}, podWatchHelper)
|
||||||
|
|
||||||
timeAfter := time.After(30 * time.Second)
|
timeAfter := time.After(1 * time.Minute)
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case wEvent, ok := <-eventChan:
|
case wEvent, ok := <-eventChan:
|
||||||
|
@ -73,10 +73,10 @@ func getMissingPods(pods1 []core.Pod, pods2 []core.Pod) []core.Pod {
|
|||||||
return missingPods
|
return missingPods
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetPodInfosForPods(pods []core.Pod) []shared.PodInfo {
|
func GetPodInfosForPods(pods []core.Pod) []*shared.PodInfo {
|
||||||
podInfos := make([]shared.PodInfo, 0)
|
podInfos := make([]*shared.PodInfo, 0)
|
||||||
for _, pod := range pods {
|
for _, pod := range pods {
|
||||||
podInfos = append(podInfos, shared.PodInfo{Name: pod.Name, Namespace: pod.Namespace, NodeName: pod.Spec.NodeName})
|
podInfos = append(podInfos, &shared.PodInfo{Name: pod.Name, Namespace: pod.Namespace, NodeName: pod.Spec.NodeName})
|
||||||
}
|
}
|
||||||
return podInfos
|
return podInfos
|
||||||
}
|
}
|
||||||
|
@ -81,10 +81,6 @@ type TappedPodStatus struct {
|
|||||||
IsTapped bool `json:"isTapped"`
|
IsTapped bool `json:"isTapped"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type TapStatus struct {
|
|
||||||
Pods []PodInfo `json:"pods"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type PodInfo struct {
|
type PodInfo struct {
|
||||||
Namespace string `json:"namespace"`
|
Namespace string `json:"namespace"`
|
||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
@ -124,9 +120,9 @@ func CreateWebSocketMessageTypeAnalyzeStatus(analyzeStatus AnalyzeStatus) WebSoc
|
|||||||
}
|
}
|
||||||
|
|
||||||
type HealthResponse struct {
|
type HealthResponse struct {
|
||||||
TapStatus TapStatus `json:"tapStatus"`
|
TappedPods []*PodInfo `json:"tappedPods"`
|
||||||
TappersCount int `json:"tappersCount"`
|
TappersCount int `json:"tappersCount"`
|
||||||
TappersStatus []TapperStatus `json:"tappersStatus"`
|
TappersStatus []*TapperStatus `json:"tappersStatus"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type VersionResponse struct {
|
type VersionResponse struct {
|
||||||
|
Loading…
Reference in New Issue
Block a user