diff --git a/agent/pkg/api/socket_server_handlers.go b/agent/pkg/api/socket_server_handlers.go index a02aaa82b..ac292eb0d 100644 --- a/agent/pkg/api/socket_server_handlers.go +++ b/agent/pkg/api/socket_server_handlers.go @@ -5,6 +5,7 @@ import ( "fmt" "mizuserver/pkg/models" "mizuserver/pkg/providers" + "mizuserver/pkg/providers/tappersCount" "mizuserver/pkg/up9" "sync" @@ -29,7 +30,7 @@ func init() { func (h *RoutesEventHandlers) WebSocketConnect(socketId int, isTapper bool) { if isTapper { logger.Log.Infof("Websocket event - Tapper connected, socket ID: %d", socketId) - providers.TapperAdded() + tappersCount.Add() } else { logger.Log.Infof("Websocket event - Browser socket connected, socket ID: %d", socketId) socketListLock.Lock() @@ -41,7 +42,7 @@ func (h *RoutesEventHandlers) WebSocketConnect(socketId int, isTapper bool) { func (h *RoutesEventHandlers) WebSocketDisconnect(socketId int, isTapper bool) { if isTapper { logger.Log.Infof("Websocket event - Tapper disconnected, socket ID: %d", socketId) - providers.TapperRemoved() + tappersCount.Remove() } else { logger.Log.Infof("Websocket event - Browser socket disconnected, socket ID: %d", socketId) socketListLock.Lock() diff --git a/agent/pkg/controllers/config_controller.go b/agent/pkg/controllers/config_controller.go index fbdcd6bd1..877e04576 100644 --- a/agent/pkg/controllers/config_controller.go +++ b/agent/pkg/controllers/config_controller.go @@ -12,6 +12,8 @@ import ( "mizuserver/pkg/models" "mizuserver/pkg/providers" "mizuserver/pkg/providers/tapConfig" + "mizuserver/pkg/providers/tappedPods" + "mizuserver/pkg/providers/tappersStatus" "net/http" "regexp" "time" @@ -30,8 +32,8 @@ func PostTapConfig(c *gin.Context) { if cancelTapperSyncer != nil { cancelTapperSyncer() - providers.TapStatus = shared.TapStatus{} - providers.TappersStatus = make(map[string]shared.TapperStatus) + tappedPods.Set([]*shared.PodInfo{}) + tappersStatus.Reset() broadcastTappedPodsStatus() } @@ -131,7 +133,7 @@ func startMizuTapperSyncer(ctx context.Context, provider *kubernetes.Provider, t return } - providers.TapStatus = shared.TapStatus{Pods: kubernetes.GetPodInfosForPods(tapperSyncer.CurrentlyTappedPods)} + tappedPods.Set(kubernetes.GetPodInfosForPods(tapperSyncer.CurrentlyTappedPods)) broadcastTappedPodsStatus() case tapperStatus, ok := <-tapperSyncer.TapperStatusChangedOut: if !ok { @@ -139,7 +141,7 @@ func startMizuTapperSyncer(ctx context.Context, provider *kubernetes.Provider, t return } - addTapperStatus(tapperStatus) + tappersStatus.Set(&tapperStatus) broadcastTappedPodsStatus() case <-ctx.Done(): logger.Log.Debug("mizuTapperSyncer event listener loop exiting due to context done") diff --git a/agent/pkg/controllers/status_controller.go b/agent/pkg/controllers/status_controller.go index 7ffef36b6..f3329d0cc 100644 --- a/agent/pkg/controllers/status_controller.go +++ b/agent/pkg/controllers/status_controller.go @@ -8,43 +8,42 @@ import ( "mizuserver/pkg/api" "mizuserver/pkg/holder" "mizuserver/pkg/providers" + "mizuserver/pkg/providers/tappedPods" + "mizuserver/pkg/providers/tappersCount" + "mizuserver/pkg/providers/tappersStatus" "mizuserver/pkg/up9" - "mizuserver/pkg/utils" "mizuserver/pkg/validation" "net/http" ) func HealthCheck(c *gin.Context) { - tappers := make([]shared.TapperStatus, 0) - for _, value := range providers.TappersStatus { + tappers := make([]*shared.TapperStatus, 0) + for _, value := range tappersStatus.Get() { tappers = append(tappers, value) } response := shared.HealthResponse{ - TapStatus: providers.TapStatus, - TappersCount: providers.TappersCount, + TappedPods: tappedPods.Get(), + TappersCount: tappersCount.Get(), TappersStatus: tappers, } c.JSON(http.StatusOK, response) } func PostTappedPods(c *gin.Context) { - tapStatus := &shared.TapStatus{} - if err := c.Bind(tapStatus); err != nil { + var requestTappedPods []*shared.PodInfo + if err := c.Bind(&requestTappedPods); err != nil { c.JSON(http.StatusBadRequest, err) return } - if err := validation.Validate(tapStatus); err != nil { - c.JSON(http.StatusBadRequest, err) - return - } - logger.Log.Infof("[Status] POST request: %d tapped pods", len(tapStatus.Pods)) - providers.TapStatus.Pods = tapStatus.Pods + + logger.Log.Infof("[Status] POST request: %d tapped pods", len(requestTappedPods)) + tappedPods.Set(requestTappedPods) broadcastTappedPodsStatus() } func broadcastTappedPodsStatus() { - tappedPodsStatus := utils.GetTappedPodsStatus() + tappedPodsStatus := tappedPods.GetTappedPodsStatus() message := shared.CreateWebSocketStatusMessage(tappedPodsStatus) if jsonBytes, err := json.Marshal(message); err != nil { @@ -54,14 +53,6 @@ func broadcastTappedPodsStatus() { } } -func addTapperStatus(tapperStatus shared.TapperStatus) { - if providers.TappersStatus == nil { - providers.TappersStatus = make(map[string]shared.TapperStatus) - } - - providers.TappersStatus[tapperStatus.NodeName] = tapperStatus -} - func PostTapperStatus(c *gin.Context) { tapperStatus := &shared.TapperStatus{} if err := c.Bind(tapperStatus); err != nil { @@ -75,12 +66,12 @@ func PostTapperStatus(c *gin.Context) { } logger.Log.Infof("[Status] POST request, tapper status: %v", tapperStatus) - addTapperStatus(*tapperStatus) + tappersStatus.Set(tapperStatus) broadcastTappedPodsStatus() } func GetTappersCount(c *gin.Context) { - c.JSON(http.StatusOK, providers.TappersCount) + c.JSON(http.StatusOK, tappersCount.Get()) } func GetAuthStatus(c *gin.Context) { @@ -94,7 +85,7 @@ func GetAuthStatus(c *gin.Context) { } func GetTappingStatus(c *gin.Context) { - tappedPodsStatus := utils.GetTappedPodsStatus() + tappedPodsStatus := tappedPods.GetTappedPodsStatus() c.JSON(http.StatusOK, tappedPodsStatus) } diff --git a/agent/pkg/providers/status_provider.go b/agent/pkg/providers/status_provider.go index 2f90b6d75..ab8ae4ab2 100644 --- a/agent/pkg/providers/status_provider.go +++ b/agent/pkg/providers/status_provider.go @@ -8,19 +8,14 @@ import ( "github.com/up9inc/mizu/tap" "mizuserver/pkg/models" "os" - "sync" "time" ) const tlsLinkRetainmentTime = time.Minute * 15 var ( - TappersCount int - TapStatus shared.TapStatus - TappersStatus map[string]shared.TapperStatus - authStatus *models.AuthStatus - RecentTLSLinks = cache.New(tlsLinkRetainmentTime, tlsLinkRetainmentTime) - tappersCountLock = sync.Mutex{} + authStatus *models.AuthStatus + RecentTLSLinks = cache.New(tlsLinkRetainmentTime, tlsLinkRetainmentTime) ) func GetAuthStatus() (*models.AuthStatus, error) { @@ -68,15 +63,3 @@ func GetAllRecentTLSAddresses() []string { return recentTLSLinks } - -func TapperAdded() { - tappersCountLock.Lock() - TappersCount++ - tappersCountLock.Unlock() -} - -func TapperRemoved() { - tappersCountLock.Lock() - TappersCount-- - tappersCountLock.Unlock() -} diff --git a/agent/pkg/providers/tappedPods/tapped_pods_provider.go b/agent/pkg/providers/tappedPods/tapped_pods_provider.go new file mode 100644 index 000000000..5e43d1d74 --- /dev/null +++ b/agent/pkg/providers/tappedPods/tapped_pods_provider.go @@ -0,0 +1,32 @@ +package tappedPods + +import ( + "github.com/up9inc/mizu/shared" + "mizuserver/pkg/providers/tappersStatus" + "strings" +) + +var tappedPods []*shared.PodInfo + +func Get() []*shared.PodInfo { + return tappedPods +} + +func Set(tappedPodsToSet []*shared.PodInfo) { + tappedPods = tappedPodsToSet +} + +func GetTappedPodsStatus() []shared.TappedPodStatus { + tappedPodsStatus := make([]shared.TappedPodStatus, 0) + for _, pod := range Get() { + var status string + if tapperStatus, ok := tappersStatus.Get()[pod.NodeName]; ok { + status = strings.ToLower(tapperStatus.Status) + } + + isTapped := status == "running" + tappedPodsStatus = append(tappedPodsStatus, shared.TappedPodStatus{Name: pod.Name, Namespace: pod.Namespace, IsTapped: isTapped}) + } + + return tappedPodsStatus +} diff --git a/agent/pkg/providers/tappersCount/tappers_count_provider.go b/agent/pkg/providers/tappersCount/tappers_count_provider.go new file mode 100644 index 000000000..f1aa793e1 --- /dev/null +++ b/agent/pkg/providers/tappersCount/tappers_count_provider.go @@ -0,0 +1,25 @@ +package tappersCount + +import "sync" + +var lock = &sync.Mutex{} + +var tappersCount int + +func Add() { + lock.Lock() + defer lock.Unlock() + + tappersCount++ +} + +func Remove() { + lock.Lock() + defer lock.Unlock() + + tappersCount-- +} + +func Get() int { + return tappersCount +} diff --git a/agent/pkg/providers/tappersStatus/tappers_status_provider.go b/agent/pkg/providers/tappersStatus/tappers_status_provider.go new file mode 100644 index 000000000..ad802e632 --- /dev/null +++ b/agent/pkg/providers/tappersStatus/tappers_status_provider.go @@ -0,0 +1,25 @@ +package tappersStatus + +import "github.com/up9inc/mizu/shared" + +var tappersStatus map[string]*shared.TapperStatus + +func Get() map[string]*shared.TapperStatus { + if tappersStatus == nil { + tappersStatus = make(map[string]*shared.TapperStatus) + } + + return tappersStatus +} + +func Set(tapperStatus *shared.TapperStatus) { + if tappersStatus == nil { + tappersStatus = make(map[string]*shared.TapperStatus) + } + + tappersStatus[tapperStatus.NodeName] = tapperStatus +} + +func Reset() { + tappersStatus = make(map[string]*shared.TapperStatus) +} diff --git a/agent/pkg/utils/utils.go b/agent/pkg/utils/utils.go index a783d128a..83152b96b 100644 --- a/agent/pkg/utils/utils.go +++ b/agent/pkg/utils/utils.go @@ -3,12 +3,10 @@ package utils import ( "context" "fmt" - "mizuserver/pkg/providers" "net/http" "net/url" "os" "os/signal" - "strings" "syscall" "time" @@ -45,16 +43,6 @@ func StartServer(app *gin.Engine) { } } -func GetTappedPodsStatus() []shared.TappedPodStatus { - tappedPodsStatus := make([]shared.TappedPodStatus, 0) - for _, pod := range providers.TapStatus.Pods { - status := strings.ToLower(providers.TappersStatus[pod.NodeName].Status) - isTapped := status == "running" - tappedPodsStatus = append(tappedPodsStatus, shared.TappedPodStatus{Name: pod.Name, Namespace: pod.Namespace, IsTapped: isTapped}) - } - return tappedPodsStatus -} - func CheckErr(e error) { if e != nil { logger.Log.Errorf("%v", e) diff --git a/cli/apiserver/provider.go b/cli/apiserver/provider.go index 67c6eb7a1..5c0bda4d6 100644 --- a/cli/apiserver/provider.go +++ b/cli/apiserver/provider.go @@ -87,9 +87,8 @@ func (provider *Provider) ReportTappedPods(pods []core.Pod) error { tappedPodsUrl := fmt.Sprintf("%s/status/tappedPods", provider.url) podInfos := kubernetes.GetPodInfosForPods(pods) - tapStatus := shared.TapStatus{Pods: podInfos} - if jsonValue, err := json.Marshal(tapStatus); err != nil { + if jsonValue, err := json.Marshal(podInfos); err != nil { return fmt.Errorf("failed Marshal the tapped pods %w", err) } else { if response, err := provider.client.Post(tappedPodsUrl, "application/json", bytes.NewBuffer(jsonValue)); err != nil { diff --git a/cli/cmd/installRunner.go b/cli/cmd/installRunner.go index c8d87d614..cbb4602b9 100644 --- a/cli/cmd/installRunner.go +++ b/cli/cmd/installRunner.go @@ -99,7 +99,7 @@ func watchApiServerPodReady(ctx context.Context, kubernetesProvider *kubernetes. podWatchHelper := kubernetes.NewPodWatchHelper(kubernetesProvider, podExactRegex) eventChan, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.MizuResourcesNamespace}, podWatchHelper) - timeAfter := time.After(30 * time.Second) + timeAfter := time.After(1 * time.Minute) for { select { case wEvent, ok := <-eventChan: diff --git a/shared/kubernetes/utils.go b/shared/kubernetes/utils.go index 39871e269..d8488989b 100644 --- a/shared/kubernetes/utils.go +++ b/shared/kubernetes/utils.go @@ -73,10 +73,10 @@ func getMissingPods(pods1 []core.Pod, pods2 []core.Pod) []core.Pod { return missingPods } -func GetPodInfosForPods(pods []core.Pod) []shared.PodInfo { - podInfos := make([]shared.PodInfo, 0) +func GetPodInfosForPods(pods []core.Pod) []*shared.PodInfo { + podInfos := make([]*shared.PodInfo, 0) for _, pod := range pods { - podInfos = append(podInfos, shared.PodInfo{Name: pod.Name, Namespace: pod.Namespace, NodeName: pod.Spec.NodeName}) + podInfos = append(podInfos, &shared.PodInfo{Name: pod.Name, Namespace: pod.Namespace, NodeName: pod.Spec.NodeName}) } return podInfos } diff --git a/shared/models.go b/shared/models.go index abf0c8b67..d16ca1d92 100644 --- a/shared/models.go +++ b/shared/models.go @@ -81,10 +81,6 @@ type TappedPodStatus struct { IsTapped bool `json:"isTapped"` } -type TapStatus struct { - Pods []PodInfo `json:"pods"` -} - type PodInfo struct { Namespace string `json:"namespace"` Name string `json:"name"` @@ -124,9 +120,9 @@ func CreateWebSocketMessageTypeAnalyzeStatus(analyzeStatus AnalyzeStatus) WebSoc } type HealthResponse struct { - TapStatus TapStatus `json:"tapStatus"` - TappersCount int `json:"tappersCount"` - TappersStatus []TapperStatus `json:"tappersStatus"` + TappedPods []*PodInfo `json:"tappedPods"` + TappersCount int `json:"tappersCount"` + TappersStatus []*TapperStatus `json:"tappersStatus"` } type VersionResponse struct {