Extracted agent status to consistent volume (#628)

This commit is contained in:
RoyUP9 2022-01-12 16:03:50 +02:00 committed by GitHub
parent 68c4ee9a4f
commit 26a9c31d1e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 173 additions and 104 deletions

View File

@ -183,16 +183,16 @@ func tryExecuteFunc(executeFunc func() error) (err interface{}) {
} }
func waitTapPodsReady(apiServerUrl string) error { func waitTapPodsReady(apiServerUrl string) error {
resolvingUrl := fmt.Sprintf("%v/status/tappersCount", apiServerUrl) resolvingUrl := fmt.Sprintf("%v/status/connectedTappersCount", apiServerUrl)
tapPodsReadyFunc := func() error { tapPodsReadyFunc := func() error {
requestResult, requestErr := executeHttpGetRequest(resolvingUrl) requestResult, requestErr := executeHttpGetRequest(resolvingUrl)
if requestErr != nil { if requestErr != nil {
return requestErr return requestErr
} }
tappersCount := requestResult.(float64) connectedTappersCount := requestResult.(float64)
if tappersCount == 0 { if connectedTappersCount == 0 {
return fmt.Errorf("no tappers running") return fmt.Errorf("no connected tappers running")
} }
time.Sleep(waitAfterTapPodsReady) time.Sleep(waitAfterTapPodsReady)
return nil return nil

View File

@ -5,7 +5,7 @@ import (
"fmt" "fmt"
"mizuserver/pkg/models" "mizuserver/pkg/models"
"mizuserver/pkg/providers" "mizuserver/pkg/providers"
"mizuserver/pkg/providers/tappersCount" "mizuserver/pkg/providers/tappers"
"mizuserver/pkg/up9" "mizuserver/pkg/up9"
"sync" "sync"
@ -30,7 +30,7 @@ func init() {
func (h *RoutesEventHandlers) WebSocketConnect(socketId int, isTapper bool) { func (h *RoutesEventHandlers) WebSocketConnect(socketId int, isTapper bool) {
if isTapper { if isTapper {
logger.Log.Infof("Websocket event - Tapper connected, socket ID: %d", socketId) logger.Log.Infof("Websocket event - Tapper connected, socket ID: %d", socketId)
tappersCount.Add() tappers.Connected()
} else { } else {
logger.Log.Infof("Websocket event - Browser socket connected, socket ID: %d", socketId) logger.Log.Infof("Websocket event - Browser socket connected, socket ID: %d", socketId)
socketListLock.Lock() socketListLock.Lock()
@ -42,7 +42,7 @@ func (h *RoutesEventHandlers) WebSocketConnect(socketId int, isTapper bool) {
func (h *RoutesEventHandlers) WebSocketDisconnect(socketId int, isTapper bool) { func (h *RoutesEventHandlers) WebSocketDisconnect(socketId int, isTapper bool) {
if isTapper { if isTapper {
logger.Log.Infof("Websocket event - Tapper disconnected, socket ID: %d", socketId) logger.Log.Infof("Websocket event - Tapper disconnected, socket ID: %d", socketId)
tappersCount.Remove() tappers.Disconnected()
} else { } else {
logger.Log.Infof("Websocket event - Browser socket disconnected, socket ID: %d", socketId) logger.Log.Infof("Websocket event - Browser socket disconnected, socket ID: %d", socketId)
socketListLock.Lock() socketListLock.Lock()

View File

@ -13,7 +13,7 @@ import (
"mizuserver/pkg/providers" "mizuserver/pkg/providers"
"mizuserver/pkg/providers/tapConfig" "mizuserver/pkg/providers/tapConfig"
"mizuserver/pkg/providers/tappedPods" "mizuserver/pkg/providers/tappedPods"
"mizuserver/pkg/providers/tappersStatus" "mizuserver/pkg/providers/tappers"
"net/http" "net/http"
"regexp" "regexp"
"time" "time"
@ -33,7 +33,7 @@ func PostTapConfig(c *gin.Context) {
cancelTapperSyncer() cancelTapperSyncer()
tappedPods.Set([]*shared.PodInfo{}) tappedPods.Set([]*shared.PodInfo{})
tappersStatus.Reset() tappers.ResetStatus()
broadcastTappedPodsStatus() broadcastTappedPodsStatus()
} }
@ -141,7 +141,7 @@ func startMizuTapperSyncer(ctx context.Context, provider *kubernetes.Provider, t
return return
} }
tappersStatus.Set(&tapperStatus) tappers.SetStatus(&tapperStatus)
broadcastTappedPodsStatus() broadcastTappedPodsStatus()
case <-ctx.Done(): case <-ctx.Done():
logger.Log.Debug("mizuTapperSyncer event listener loop exiting due to context done") logger.Log.Debug("mizuTapperSyncer event listener loop exiting due to context done")

View File

@ -9,23 +9,22 @@ import (
"mizuserver/pkg/holder" "mizuserver/pkg/holder"
"mizuserver/pkg/providers" "mizuserver/pkg/providers"
"mizuserver/pkg/providers/tappedPods" "mizuserver/pkg/providers/tappedPods"
"mizuserver/pkg/providers/tappersCount" "mizuserver/pkg/providers/tappers"
"mizuserver/pkg/providers/tappersStatus"
"mizuserver/pkg/up9" "mizuserver/pkg/up9"
"mizuserver/pkg/validation" "mizuserver/pkg/validation"
"net/http" "net/http"
) )
func HealthCheck(c *gin.Context) { func HealthCheck(c *gin.Context) {
tappers := make([]*shared.TapperStatus, 0) tappersStatus := make([]*shared.TapperStatus, 0)
for _, value := range tappersStatus.Get() { for _, value := range tappers.GetStatus() {
tappers = append(tappers, value) tappersStatus = append(tappersStatus, value)
} }
response := shared.HealthResponse{ response := shared.HealthResponse{
TappedPods: tappedPods.Get(), TappedPods: tappedPods.Get(),
TappersCount: tappersCount.Get(), ConnectedTappersCount: tappers.GetConnectedCount(),
TappersStatus: tappers, TappersStatus: tappersStatus,
} }
c.JSON(http.StatusOK, response) c.JSON(http.StatusOK, response)
} }
@ -66,12 +65,12 @@ func PostTapperStatus(c *gin.Context) {
} }
logger.Log.Infof("[Status] POST request, tapper status: %v", tapperStatus) logger.Log.Infof("[Status] POST request, tapper status: %v", tapperStatus)
tappersStatus.Set(tapperStatus) tappers.SetStatus(tapperStatus)
broadcastTappedPodsStatus() broadcastTappedPodsStatus()
} }
func GetTappersCount(c *gin.Context) { func GetConnectedTappersCount(c *gin.Context) {
c.JSON(http.StatusOK, tappersCount.Get()) c.JSON(http.StatusOK, tappers.GetConnectedCount())
} }
func GetAuthStatus(c *gin.Context) { func GetAuthStatus(c *gin.Context) {

View File

@ -1,40 +1,32 @@
package tapConfig package tapConfig
import ( import (
"encoding/json"
"github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/shared/logger"
"io/ioutil"
"mizuserver/pkg/models" "mizuserver/pkg/models"
"mizuserver/pkg/utils"
"os" "os"
"sync" "sync"
) )
const FilePath = shared.DataDirPath + "tap-config.json" const FilePath = shared.DataDirPath + "tap-config.json"
var lock = &sync.Mutex{} var (
lock = &sync.Mutex{}
var config *models.TapConfig syncOnce sync.Once
config *models.TapConfig
)
func Get() *models.TapConfig { func Get() *models.TapConfig {
if config == nil { syncOnce.Do(func() {
lock.Lock() if err := utils.ReadJsonFile(FilePath, &config); err != nil {
defer lock.Unlock() config = &models.TapConfig{TappedNamespaces: make(map[string]bool)}
if config == nil { if !os.IsNotExist(err) {
if content, err := ioutil.ReadFile(FilePath); err != nil { logger.Log.Errorf("Error reading tap config from file, err: %v", err)
config = &models.TapConfig{TappedNamespaces: make(map[string]bool)}
if !os.IsNotExist(err) {
logger.Log.Errorf("Error loading tap config from file, err: %v", err)
}
} else {
if err = json.Unmarshal(content, &config); err != nil {
config = &models.TapConfig{TappedNamespaces: make(map[string]bool)}
logger.Log.Errorf("Error while unmarshal tap config, err: %v", err)
}
} }
} }
} })
return config return config
} }
@ -44,11 +36,7 @@ func Save(tapConfigToSave *models.TapConfig) {
defer lock.Unlock() defer lock.Unlock()
config = tapConfigToSave config = tapConfigToSave
if data, err := json.Marshal(config); err != nil { if err := utils.SaveJsonFile(FilePath, config); err != nil {
logger.Log.Errorf("Error while marshal tap config, err: %v", err) logger.Log.Errorf("Error saving tap config, err: %v", err)
} else {
if err := ioutil.WriteFile(FilePath, data, 0644); err != nil {
logger.Log.Errorf("Error writing tap config to file, err: %v", err)
}
} }
} }

View File

@ -2,25 +2,49 @@ package tappedPods
import ( import (
"github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/shared"
"mizuserver/pkg/providers/tappersStatus" "github.com/up9inc/mizu/shared/logger"
"mizuserver/pkg/providers/tappers"
"mizuserver/pkg/utils"
"os"
"strings" "strings"
"sync"
) )
var tappedPods []*shared.PodInfo const FilePath = shared.DataDirPath + "tapped-pods.json"
var (
lock = &sync.Mutex{}
syncOnce sync.Once
tappedPods []*shared.PodInfo
)
func Get() []*shared.PodInfo { func Get() []*shared.PodInfo {
syncOnce.Do(func() {
if err := utils.ReadJsonFile(FilePath, &tappedPods); err != nil {
if !os.IsNotExist(err) {
logger.Log.Errorf("Error reading tapped pods from file, err: %v", err)
}
}
})
return tappedPods return tappedPods
} }
func Set(tappedPodsToSet []*shared.PodInfo) { func Set(tappedPodsToSet []*shared.PodInfo) {
lock.Lock()
defer lock.Unlock()
tappedPods = tappedPodsToSet tappedPods = tappedPodsToSet
if err := utils.SaveJsonFile(FilePath, tappedPods); err != nil {
logger.Log.Errorf("Error saving tapped pods, err: %v", err)
}
} }
func GetTappedPodsStatus() []shared.TappedPodStatus { func GetTappedPodsStatus() []shared.TappedPodStatus {
tappedPodsStatus := make([]shared.TappedPodStatus, 0) tappedPodsStatus := make([]shared.TappedPodStatus, 0)
for _, pod := range Get() { for _, pod := range Get() {
var status string var status string
if tapperStatus, ok := tappersStatus.Get()[pod.NodeName]; ok { if tapperStatus, ok := tappers.GetStatus()[pod.NodeName]; ok {
status = strings.ToLower(tapperStatus.Status) status = strings.ToLower(tapperStatus.Status)
} }

View File

@ -0,0 +1,82 @@
package tappers
import (
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
"mizuserver/pkg/utils"
"os"
"sync"
)
const FilePath = shared.DataDirPath + "tappers-status.json"
var (
lockStatus = &sync.Mutex{}
syncOnce sync.Once
status map[string]*shared.TapperStatus
lockConnectedCount = &sync.Mutex{}
connectedCount int
)
func GetStatus() map[string]*shared.TapperStatus {
initStatus()
return status
}
func SetStatus(tapperStatus *shared.TapperStatus) {
initStatus()
lockStatus.Lock()
defer lockStatus.Unlock()
status[tapperStatus.NodeName] = tapperStatus
saveStatus()
}
func ResetStatus() {
lockStatus.Lock()
defer lockStatus.Unlock()
status = make(map[string]*shared.TapperStatus)
saveStatus()
}
func GetConnectedCount() int {
return connectedCount
}
func Connected() {
lockConnectedCount.Lock()
defer lockConnectedCount.Unlock()
connectedCount++
}
func Disconnected() {
lockConnectedCount.Lock()
defer lockConnectedCount.Unlock()
connectedCount--
}
func initStatus() {
syncOnce.Do(func() {
if err := utils.ReadJsonFile(FilePath, &status); err != nil {
status = make(map[string]*shared.TapperStatus)
if !os.IsNotExist(err) {
logger.Log.Errorf("Error reading tappers status from file, err: %v", err)
}
}
})
}
func saveStatus() {
if err := utils.SaveJsonFile(FilePath, status); err != nil {
logger.Log.Errorf("Error saving tappers status, err: %v", err)
}
}

View File

@ -1,25 +0,0 @@
package tappersCount
import "sync"
var lock = &sync.Mutex{}
var tappersCount int
func Add() {
lock.Lock()
defer lock.Unlock()
tappersCount++
}
func Remove() {
lock.Lock()
defer lock.Unlock()
tappersCount--
}
func Get() int {
return tappersCount
}

View File

@ -1,25 +0,0 @@
package tappersStatus
import "github.com/up9inc/mizu/shared"
var tappersStatus map[string]*shared.TapperStatus
func Get() map[string]*shared.TapperStatus {
if tappersStatus == nil {
tappersStatus = make(map[string]*shared.TapperStatus)
}
return tappersStatus
}
func Set(tapperStatus *shared.TapperStatus) {
if tappersStatus == nil {
tappersStatus = make(map[string]*shared.TapperStatus)
}
tappersStatus[tapperStatus.NodeName] = tapperStatus
}
func Reset() {
tappersStatus = make(map[string]*shared.TapperStatus)
}

View File

@ -15,7 +15,7 @@ func StatusRoutes(ginApp *gin.Engine) {
routeGroup.POST("/tappedPods", controllers.PostTappedPods) routeGroup.POST("/tappedPods", controllers.PostTappedPods)
routeGroup.POST("/tapperStatus", controllers.PostTapperStatus) routeGroup.POST("/tapperStatus", controllers.PostTapperStatus)
routeGroup.GET("/tappersCount", controllers.GetTappersCount) routeGroup.GET("/connectedTappersCount", controllers.GetConnectedTappersCount)
routeGroup.GET("/tap", controllers.GetTappingStatus) routeGroup.GET("/tap", controllers.GetTappingStatus)
routeGroup.GET("/auth", controllers.GetAuthStatus) routeGroup.GET("/auth", controllers.GetAuthStatus)

View File

@ -2,7 +2,9 @@ package utils
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"io/ioutil"
"net/http" "net/http"
"net/url" "net/url"
"os" "os"
@ -59,3 +61,27 @@ func SetHostname(address, newHostname string) string {
return replacedUrl.String() return replacedUrl.String()
} }
func ReadJsonFile(filePath string, value interface{}) error {
if content, err := ioutil.ReadFile(filePath); err != nil {
return err
} else {
if err = json.Unmarshal(content, value); err != nil {
return err
}
}
return nil
}
func SaveJsonFile(filePath string, value interface{}) error {
if data, err := json.Marshal(value); err != nil {
return err
} else {
if err = ioutil.WriteFile(filePath, data, 0644); err != nil {
return err
}
}
return nil
}

View File

@ -122,9 +122,9 @@ func CreateWebSocketMessageTypeAnalyzeStatus(analyzeStatus AnalyzeStatus) WebSoc
} }
type HealthResponse struct { type HealthResponse struct {
TappedPods []*PodInfo `json:"tappedPods"` TappedPods []*PodInfo `json:"tappedPods"`
TappersCount int `json:"tappersCount"` ConnectedTappersCount int `json:"connectedTappersCount"`
TappersStatus []*TapperStatus `json:"tappersStatus"` TappersStatus []*TapperStatus `json:"tappersStatus"`
} }
type VersionResponse struct { type VersionResponse struct {