From 26a9c31d1e9f80a85f29f7409fdfd9d27630d4a7 Mon Sep 17 00:00:00 2001 From: RoyUP9 <87927115+RoyUP9@users.noreply.github.com> Date: Wed, 12 Jan 2022 16:03:50 +0200 Subject: [PATCH] Extracted agent status to consistent volume (#628) --- acceptanceTests/testsUtils.go | 8 +- agent/pkg/api/socket_server_handlers.go | 6 +- agent/pkg/controllers/config_controller.go | 6 +- agent/pkg/controllers/status_controller.go | 21 +++-- .../tapConfig/tap_config_provider.go | 40 ++++----- .../tappedPods/tapped_pods_provider.go | 30 ++++++- .../pkg/providers/tappers/tappers_provider.go | 82 +++++++++++++++++++ .../tappersCount/tappers_count_provider.go | 25 ------ .../tappersStatus/tappers_status_provider.go | 25 ------ agent/pkg/routes/status_routes.go | 2 +- agent/pkg/utils/utils.go | 26 ++++++ shared/models.go | 6 +- 12 files changed, 173 insertions(+), 104 deletions(-) create mode 100644 agent/pkg/providers/tappers/tappers_provider.go delete mode 100644 agent/pkg/providers/tappersCount/tappers_count_provider.go delete mode 100644 agent/pkg/providers/tappersStatus/tappers_status_provider.go diff --git a/acceptanceTests/testsUtils.go b/acceptanceTests/testsUtils.go index 58f4d69bd..2ec7adc48 100644 --- a/acceptanceTests/testsUtils.go +++ b/acceptanceTests/testsUtils.go @@ -183,16 +183,16 @@ func tryExecuteFunc(executeFunc func() error) (err interface{}) { } func waitTapPodsReady(apiServerUrl string) error { - resolvingUrl := fmt.Sprintf("%v/status/tappersCount", apiServerUrl) + resolvingUrl := fmt.Sprintf("%v/status/connectedTappersCount", apiServerUrl) tapPodsReadyFunc := func() error { requestResult, requestErr := executeHttpGetRequest(resolvingUrl) if requestErr != nil { return requestErr } - tappersCount := requestResult.(float64) - if tappersCount == 0 { - return fmt.Errorf("no tappers running") + connectedTappersCount := requestResult.(float64) + if connectedTappersCount == 0 { + return fmt.Errorf("no connected tappers running") } time.Sleep(waitAfterTapPodsReady) return nil diff --git a/agent/pkg/api/socket_server_handlers.go b/agent/pkg/api/socket_server_handlers.go index ac292eb0d..b40f66ecd 100644 --- a/agent/pkg/api/socket_server_handlers.go +++ b/agent/pkg/api/socket_server_handlers.go @@ -5,7 +5,7 @@ import ( "fmt" "mizuserver/pkg/models" "mizuserver/pkg/providers" - "mizuserver/pkg/providers/tappersCount" + "mizuserver/pkg/providers/tappers" "mizuserver/pkg/up9" "sync" @@ -30,7 +30,7 @@ func init() { func (h *RoutesEventHandlers) WebSocketConnect(socketId int, isTapper bool) { if isTapper { logger.Log.Infof("Websocket event - Tapper connected, socket ID: %d", socketId) - tappersCount.Add() + tappers.Connected() } else { logger.Log.Infof("Websocket event - Browser socket connected, socket ID: %d", socketId) socketListLock.Lock() @@ -42,7 +42,7 @@ func (h *RoutesEventHandlers) WebSocketConnect(socketId int, isTapper bool) { func (h *RoutesEventHandlers) WebSocketDisconnect(socketId int, isTapper bool) { if isTapper { logger.Log.Infof("Websocket event - Tapper disconnected, socket ID: %d", socketId) - tappersCount.Remove() + tappers.Disconnected() } else { logger.Log.Infof("Websocket event - Browser socket disconnected, socket ID: %d", socketId) socketListLock.Lock() diff --git a/agent/pkg/controllers/config_controller.go b/agent/pkg/controllers/config_controller.go index 877e04576..44927581f 100644 --- a/agent/pkg/controllers/config_controller.go +++ b/agent/pkg/controllers/config_controller.go @@ -13,7 +13,7 @@ import ( "mizuserver/pkg/providers" "mizuserver/pkg/providers/tapConfig" "mizuserver/pkg/providers/tappedPods" - "mizuserver/pkg/providers/tappersStatus" + "mizuserver/pkg/providers/tappers" "net/http" "regexp" "time" @@ -33,7 +33,7 @@ func PostTapConfig(c *gin.Context) { cancelTapperSyncer() tappedPods.Set([]*shared.PodInfo{}) - tappersStatus.Reset() + tappers.ResetStatus() broadcastTappedPodsStatus() } @@ -141,7 +141,7 @@ func startMizuTapperSyncer(ctx context.Context, provider *kubernetes.Provider, t return } - tappersStatus.Set(&tapperStatus) + tappers.SetStatus(&tapperStatus) broadcastTappedPodsStatus() case <-ctx.Done(): logger.Log.Debug("mizuTapperSyncer event listener loop exiting due to context done") diff --git a/agent/pkg/controllers/status_controller.go b/agent/pkg/controllers/status_controller.go index f3329d0cc..fce273cc6 100644 --- a/agent/pkg/controllers/status_controller.go +++ b/agent/pkg/controllers/status_controller.go @@ -9,23 +9,22 @@ import ( "mizuserver/pkg/holder" "mizuserver/pkg/providers" "mizuserver/pkg/providers/tappedPods" - "mizuserver/pkg/providers/tappersCount" - "mizuserver/pkg/providers/tappersStatus" + "mizuserver/pkg/providers/tappers" "mizuserver/pkg/up9" "mizuserver/pkg/validation" "net/http" ) func HealthCheck(c *gin.Context) { - tappers := make([]*shared.TapperStatus, 0) - for _, value := range tappersStatus.Get() { - tappers = append(tappers, value) + tappersStatus := make([]*shared.TapperStatus, 0) + for _, value := range tappers.GetStatus() { + tappersStatus = append(tappersStatus, value) } response := shared.HealthResponse{ - TappedPods: tappedPods.Get(), - TappersCount: tappersCount.Get(), - TappersStatus: tappers, + TappedPods: tappedPods.Get(), + ConnectedTappersCount: tappers.GetConnectedCount(), + TappersStatus: tappersStatus, } c.JSON(http.StatusOK, response) } @@ -66,12 +65,12 @@ func PostTapperStatus(c *gin.Context) { } logger.Log.Infof("[Status] POST request, tapper status: %v", tapperStatus) - tappersStatus.Set(tapperStatus) + tappers.SetStatus(tapperStatus) broadcastTappedPodsStatus() } -func GetTappersCount(c *gin.Context) { - c.JSON(http.StatusOK, tappersCount.Get()) +func GetConnectedTappersCount(c *gin.Context) { + c.JSON(http.StatusOK, tappers.GetConnectedCount()) } func GetAuthStatus(c *gin.Context) { diff --git a/agent/pkg/providers/tapConfig/tap_config_provider.go b/agent/pkg/providers/tapConfig/tap_config_provider.go index 8ecac79b4..b907d7395 100644 --- a/agent/pkg/providers/tapConfig/tap_config_provider.go +++ b/agent/pkg/providers/tapConfig/tap_config_provider.go @@ -1,40 +1,32 @@ package tapConfig import ( - "encoding/json" "github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/shared/logger" - "io/ioutil" "mizuserver/pkg/models" + "mizuserver/pkg/utils" "os" "sync" ) const FilePath = shared.DataDirPath + "tap-config.json" -var lock = &sync.Mutex{} - -var config *models.TapConfig +var ( + lock = &sync.Mutex{} + syncOnce sync.Once + config *models.TapConfig +) func Get() *models.TapConfig { - if config == nil { - lock.Lock() - defer lock.Unlock() + syncOnce.Do(func() { + if err := utils.ReadJsonFile(FilePath, &config); err != nil { + config = &models.TapConfig{TappedNamespaces: make(map[string]bool)} - if config == nil { - if content, err := ioutil.ReadFile(FilePath); err != nil { - config = &models.TapConfig{TappedNamespaces: make(map[string]bool)} - if !os.IsNotExist(err) { - logger.Log.Errorf("Error loading tap config from file, err: %v", err) - } - } else { - if err = json.Unmarshal(content, &config); err != nil { - config = &models.TapConfig{TappedNamespaces: make(map[string]bool)} - logger.Log.Errorf("Error while unmarshal tap config, err: %v", err) - } + if !os.IsNotExist(err) { + logger.Log.Errorf("Error reading tap config from file, err: %v", err) } } - } + }) return config } @@ -44,11 +36,7 @@ func Save(tapConfigToSave *models.TapConfig) { defer lock.Unlock() config = tapConfigToSave - if data, err := json.Marshal(config); err != nil { - logger.Log.Errorf("Error while marshal tap config, err: %v", err) - } else { - if err := ioutil.WriteFile(FilePath, data, 0644); err != nil { - logger.Log.Errorf("Error writing tap config to file, err: %v", err) - } + if err := utils.SaveJsonFile(FilePath, config); err != nil { + logger.Log.Errorf("Error saving tap config, err: %v", err) } } diff --git a/agent/pkg/providers/tappedPods/tapped_pods_provider.go b/agent/pkg/providers/tappedPods/tapped_pods_provider.go index 5e43d1d74..3791bd689 100644 --- a/agent/pkg/providers/tappedPods/tapped_pods_provider.go +++ b/agent/pkg/providers/tappedPods/tapped_pods_provider.go @@ -2,25 +2,49 @@ package tappedPods import ( "github.com/up9inc/mizu/shared" - "mizuserver/pkg/providers/tappersStatus" + "github.com/up9inc/mizu/shared/logger" + "mizuserver/pkg/providers/tappers" + "mizuserver/pkg/utils" + "os" "strings" + "sync" ) -var tappedPods []*shared.PodInfo +const FilePath = shared.DataDirPath + "tapped-pods.json" + +var ( + lock = &sync.Mutex{} + syncOnce sync.Once + tappedPods []*shared.PodInfo +) func Get() []*shared.PodInfo { + syncOnce.Do(func() { + if err := utils.ReadJsonFile(FilePath, &tappedPods); err != nil { + if !os.IsNotExist(err) { + logger.Log.Errorf("Error reading tapped pods from file, err: %v", err) + } + } + }) + return tappedPods } func Set(tappedPodsToSet []*shared.PodInfo) { + lock.Lock() + defer lock.Unlock() + tappedPods = tappedPodsToSet + if err := utils.SaveJsonFile(FilePath, tappedPods); err != nil { + logger.Log.Errorf("Error saving tapped pods, err: %v", err) + } } func GetTappedPodsStatus() []shared.TappedPodStatus { tappedPodsStatus := make([]shared.TappedPodStatus, 0) for _, pod := range Get() { var status string - if tapperStatus, ok := tappersStatus.Get()[pod.NodeName]; ok { + if tapperStatus, ok := tappers.GetStatus()[pod.NodeName]; ok { status = strings.ToLower(tapperStatus.Status) } diff --git a/agent/pkg/providers/tappers/tappers_provider.go b/agent/pkg/providers/tappers/tappers_provider.go new file mode 100644 index 000000000..cad982a7b --- /dev/null +++ b/agent/pkg/providers/tappers/tappers_provider.go @@ -0,0 +1,82 @@ +package tappers + +import ( + "github.com/up9inc/mizu/shared" + "github.com/up9inc/mizu/shared/logger" + "mizuserver/pkg/utils" + "os" + "sync" +) + +const FilePath = shared.DataDirPath + "tappers-status.json" + +var ( + lockStatus = &sync.Mutex{} + syncOnce sync.Once + status map[string]*shared.TapperStatus + + lockConnectedCount = &sync.Mutex{} + connectedCount int +) + +func GetStatus() map[string]*shared.TapperStatus { + initStatus() + + return status +} + +func SetStatus(tapperStatus *shared.TapperStatus) { + initStatus() + + lockStatus.Lock() + defer lockStatus.Unlock() + + status[tapperStatus.NodeName] = tapperStatus + + saveStatus() +} + +func ResetStatus() { + lockStatus.Lock() + defer lockStatus.Unlock() + + status = make(map[string]*shared.TapperStatus) + + saveStatus() +} + +func GetConnectedCount() int { + return connectedCount +} + +func Connected() { + lockConnectedCount.Lock() + defer lockConnectedCount.Unlock() + + connectedCount++ +} + +func Disconnected() { + lockConnectedCount.Lock() + defer lockConnectedCount.Unlock() + + connectedCount-- +} + +func initStatus() { + syncOnce.Do(func() { + if err := utils.ReadJsonFile(FilePath, &status); err != nil { + status = make(map[string]*shared.TapperStatus) + + if !os.IsNotExist(err) { + logger.Log.Errorf("Error reading tappers status from file, err: %v", err) + } + } + }) +} + +func saveStatus() { + if err := utils.SaveJsonFile(FilePath, status); err != nil { + logger.Log.Errorf("Error saving tappers status, err: %v", err) + } +} diff --git a/agent/pkg/providers/tappersCount/tappers_count_provider.go b/agent/pkg/providers/tappersCount/tappers_count_provider.go deleted file mode 100644 index f1aa793e1..000000000 --- a/agent/pkg/providers/tappersCount/tappers_count_provider.go +++ /dev/null @@ -1,25 +0,0 @@ -package tappersCount - -import "sync" - -var lock = &sync.Mutex{} - -var tappersCount int - -func Add() { - lock.Lock() - defer lock.Unlock() - - tappersCount++ -} - -func Remove() { - lock.Lock() - defer lock.Unlock() - - tappersCount-- -} - -func Get() int { - return tappersCount -} diff --git a/agent/pkg/providers/tappersStatus/tappers_status_provider.go b/agent/pkg/providers/tappersStatus/tappers_status_provider.go deleted file mode 100644 index ad802e632..000000000 --- a/agent/pkg/providers/tappersStatus/tappers_status_provider.go +++ /dev/null @@ -1,25 +0,0 @@ -package tappersStatus - -import "github.com/up9inc/mizu/shared" - -var tappersStatus map[string]*shared.TapperStatus - -func Get() map[string]*shared.TapperStatus { - if tappersStatus == nil { - tappersStatus = make(map[string]*shared.TapperStatus) - } - - return tappersStatus -} - -func Set(tapperStatus *shared.TapperStatus) { - if tappersStatus == nil { - tappersStatus = make(map[string]*shared.TapperStatus) - } - - tappersStatus[tapperStatus.NodeName] = tapperStatus -} - -func Reset() { - tappersStatus = make(map[string]*shared.TapperStatus) -} diff --git a/agent/pkg/routes/status_routes.go b/agent/pkg/routes/status_routes.go index 4d2a6a90f..3c58a2f6a 100644 --- a/agent/pkg/routes/status_routes.go +++ b/agent/pkg/routes/status_routes.go @@ -15,7 +15,7 @@ func StatusRoutes(ginApp *gin.Engine) { routeGroup.POST("/tappedPods", controllers.PostTappedPods) routeGroup.POST("/tapperStatus", controllers.PostTapperStatus) - routeGroup.GET("/tappersCount", controllers.GetTappersCount) + routeGroup.GET("/connectedTappersCount", controllers.GetConnectedTappersCount) routeGroup.GET("/tap", controllers.GetTappingStatus) routeGroup.GET("/auth", controllers.GetAuthStatus) diff --git a/agent/pkg/utils/utils.go b/agent/pkg/utils/utils.go index 83152b96b..d94be8c37 100644 --- a/agent/pkg/utils/utils.go +++ b/agent/pkg/utils/utils.go @@ -2,7 +2,9 @@ package utils import ( "context" + "encoding/json" "fmt" + "io/ioutil" "net/http" "net/url" "os" @@ -59,3 +61,27 @@ func SetHostname(address, newHostname string) string { return replacedUrl.String() } + +func ReadJsonFile(filePath string, value interface{}) error { + if content, err := ioutil.ReadFile(filePath); err != nil { + return err + } else { + if err = json.Unmarshal(content, value); err != nil { + return err + } + } + + return nil +} + +func SaveJsonFile(filePath string, value interface{}) error { + if data, err := json.Marshal(value); err != nil { + return err + } else { + if err = ioutil.WriteFile(filePath, data, 0644); err != nil { + return err + } + } + + return nil +} diff --git a/shared/models.go b/shared/models.go index 28dae2feb..912061a01 100644 --- a/shared/models.go +++ b/shared/models.go @@ -122,9 +122,9 @@ func CreateWebSocketMessageTypeAnalyzeStatus(analyzeStatus AnalyzeStatus) WebSoc } type HealthResponse struct { - TappedPods []*PodInfo `json:"tappedPods"` - TappersCount int `json:"tappersCount"` - TappersStatus []*TapperStatus `json:"tappersStatus"` + TappedPods []*PodInfo `json:"tappedPods"` + ConnectedTappersCount int `json:"connectedTappersCount"` + TappersStatus []*TapperStatus `json:"tappersStatus"` } type VersionResponse struct {