From fc1822f09412a427741bb72465b60f42d460482e Mon Sep 17 00:00:00 2001 From: Francesco Giudici Date: Wed, 1 Sep 2021 14:49:20 +0200 Subject: [PATCH] kata-monitor: improve sandbox caching In order to retrieve the list of sandboxes, we poll the container engine every 15 seconds via the CRI. Once we have the list we have to inspect each pod to find out the kata ones. This commit extend the sandbox cache to keep track of all the pods, marking the kata ones, so that during the next polling only the new sandboxes should be inspected to figure out which ones are using the kata runtime. Fixes: #2563 Signed-off-by: Francesco Giudici (cherry picked from commit 245a12bbb71f04cf43811183ba6a1dfdb4c0ed78) --- src/runtime/pkg/kata-monitor/cri.go | 27 ++++++++++++------- src/runtime/pkg/kata-monitor/metrics.go | 6 ++--- src/runtime/pkg/kata-monitor/monitor.go | 18 +++---------- src/runtime/pkg/kata-monitor/sandbox_cache.go | 23 ++++++++++++---- .../pkg/kata-monitor/sandbox_cache_test.go | 8 +++--- 5 files changed, 45 insertions(+), 37 deletions(-) diff --git a/src/runtime/pkg/kata-monitor/cri.go b/src/runtime/pkg/kata-monitor/cri.go index a06faad67..eb1cea284 100644 --- a/src/runtime/pkg/kata-monitor/cri.go +++ b/src/runtime/pkg/kata-monitor/cri.go @@ -117,13 +117,12 @@ func parseEndpoint(endpoint string) (string, string, error) { } } -// getSandboxes gets kata sandboxes from the container engine. -func (km *KataMonitor) getSandboxes() (map[string]struct{}, error) { - - sandboxMap := make(map[string]struct{}) +// getSandboxes gets ready sandboxes from the container engine and returns an updated sandboxMap +func (km *KataMonitor) getSandboxes(sandboxMap map[string]bool) (map[string]bool, error) { + newMap := make(map[string]bool) runtimeClient, runtimeConn, err := getRuntimeClient(km.runtimeEndpoint) if err != nil { - return sandboxMap, err + return newMap, err } defer closeConnection(runtimeConn) @@ -139,11 +138,17 @@ func (km *KataMonitor) getSandboxes() (map[string]struct{}, error) { monitorLog.Debugf("ListPodSandboxRequest: %v", request) r, err := runtimeClient.ListPodSandbox(context.Background(), request) if err != nil { - return sandboxMap, err + return newMap, err } monitorLog.Debugf("ListPodSandboxResponse: %v", r) for _, pod := range r.Items { + // Use the cached data if available + if isKata, ok := sandboxMap[pod.Id]; ok { + newMap[pod.Id] = isKata + continue + } + request := &pb.PodSandboxStatusRequest{ PodSandboxId: pod.Id, Verbose: true, @@ -151,7 +156,7 @@ func (km *KataMonitor) getSandboxes() (map[string]struct{}, error) { r, err := runtimeClient.PodSandboxStatus(context.Background(), request) if err != nil { - return sandboxMap, err + return newMap, err } lowRuntime := "" @@ -187,7 +192,7 @@ func (km *KataMonitor) getSandboxes() (map[string]struct{}, error) { // only for kata PODs. if lowRuntime == "" { monitorLog.WithField("pod", r).Warning("unable to retrieve the runtime type") - sandboxMap[pod.Id] = struct{}{} + newMap[pod.Id] = true continue } @@ -195,9 +200,11 @@ func (km *KataMonitor) getSandboxes() (map[string]struct{}, error) { "low runtime": lowRuntime, }).Debug("") if strings.Contains(lowRuntime, "kata") { - sandboxMap[pod.Id] = struct{}{} + newMap[pod.Id] = true + } else { + newMap[pod.Id] = false } } - return sandboxMap, nil + return newMap, nil } diff --git a/src/runtime/pkg/kata-monitor/metrics.go b/src/runtime/pkg/kata-monitor/metrics.go index d9c1d324a..b1788de46 100644 --- a/src/runtime/pkg/kata-monitor/metrics.go +++ b/src/runtime/pkg/kata-monitor/metrics.go @@ -140,8 +140,8 @@ func encodeMetricFamily(mfs []*dto.MetricFamily, encoder expfmt.Encoder) error { // aggregateSandboxMetrics will get metrics from one sandbox and do some process func (km *KataMonitor) aggregateSandboxMetrics(encoder expfmt.Encoder) error { - // get all sandboxes from cache - sandboxes := km.sandboxCache.getAllSandboxes() + // get all kata sandboxes from cache + sandboxes := km.sandboxCache.getKataSandboxes() // save running kata pods as a metrics. runningShimCount.Set(float64(len(sandboxes))) @@ -159,7 +159,7 @@ func (km *KataMonitor) aggregateSandboxMetrics(encoder expfmt.Encoder) error { monitorLog.WithField("sandbox_count", len(sandboxes)).Debugf("sandboxes count") // get metrics from sandbox's shim - for sandboxID := range sandboxes { + for _, sandboxID := range sandboxes { wg.Add(1) go func(sandboxID string, results chan<- []*dto.MetricFamily) { sandboxMetrics, err := getParsedMetrics(sandboxID) diff --git a/src/runtime/pkg/kata-monitor/monitor.go b/src/runtime/pkg/kata-monitor/monitor.go index 85cdeddf7..89419a6df 100644 --- a/src/runtime/pkg/kata-monitor/monitor.go +++ b/src/runtime/pkg/kata-monitor/monitor.go @@ -50,7 +50,7 @@ func NewKataMonitor(runtimeEndpoint string) (*KataMonitor, error) { runtimeEndpoint: runtimeEndpoint, sandboxCache: &sandboxCache{ Mutex: &sync.Mutex{}, - sandboxes: make(map[string]struct{}), + sandboxes: make(map[string]bool), }, } @@ -66,7 +66,7 @@ func NewKataMonitor(runtimeEndpoint string) (*KataMonitor, error) { func (km *KataMonitor) startPodCacheUpdater() { for { time.Sleep(podCacheRefreshTimeSeconds * time.Second) - sandboxes, err := km.getSandboxes() + sandboxes, err := km.getSandboxes(km.sandboxCache.getAllSandboxes()) if err != nil { monitorLog.WithError(err).Error("failed to get sandboxes") continue @@ -95,20 +95,8 @@ func (km *KataMonitor) GetAgentURL(w http.ResponseWriter, r *http.Request) { // ListSandboxes list all sandboxes running in Kata func (km *KataMonitor) ListSandboxes(w http.ResponseWriter, r *http.Request) { - sandboxes := km.getSandboxList() + sandboxes := km.sandboxCache.getKataSandboxes() for _, s := range sandboxes { w.Write([]byte(fmt.Sprintf("%s\n", s))) } } - -func (km *KataMonitor) getSandboxList() []string { - sn := km.sandboxCache.getAllSandboxes() - result := make([]string, len(sn)) - - i := 0 - for k := range sn { - result[i] = k - i++ - } - return result -} diff --git a/src/runtime/pkg/kata-monitor/sandbox_cache.go b/src/runtime/pkg/kata-monitor/sandbox_cache.go index 7e6d63488..98978f193 100644 --- a/src/runtime/pkg/kata-monitor/sandbox_cache.go +++ b/src/runtime/pkg/kata-monitor/sandbox_cache.go @@ -11,15 +11,28 @@ import ( type sandboxCache struct { *sync.Mutex - sandboxes map[string]struct{} + // the bool value tracks if the pod is a kata one (true) or not (false) + sandboxes map[string]bool } -func (sc *sandboxCache) getAllSandboxes() map[string]struct{} { +func (sc *sandboxCache) getAllSandboxes() map[string]bool { sc.Lock() defer sc.Unlock() return sc.sandboxes } +func (sc *sandboxCache) getKataSandboxes() []string { + sc.Lock() + defer sc.Unlock() + var katasandboxes []string + for id, isKata := range sc.sandboxes { + if isKata { + katasandboxes = append(katasandboxes, id) + } + } + return katasandboxes +} + func (sc *sandboxCache) deleteIfExists(id string) bool { sc.Lock() defer sc.Unlock() @@ -33,12 +46,12 @@ func (sc *sandboxCache) deleteIfExists(id string) bool { return false } -func (sc *sandboxCache) putIfNotExists(id string) bool { +func (sc *sandboxCache) putIfNotExists(id string, value bool) bool { sc.Lock() defer sc.Unlock() if _, found := sc.sandboxes[id]; !found { - sc.sandboxes[id] = struct{}{} + sc.sandboxes[id] = value return true } @@ -46,7 +59,7 @@ func (sc *sandboxCache) putIfNotExists(id string) bool { return false } -func (sc *sandboxCache) set(sandboxes map[string]struct{}) { +func (sc *sandboxCache) set(sandboxes map[string]bool) { sc.Lock() defer sc.Unlock() sc.sandboxes = sandboxes diff --git a/src/runtime/pkg/kata-monitor/sandbox_cache_test.go b/src/runtime/pkg/kata-monitor/sandbox_cache_test.go index fc07fbcdc..43b8c8c99 100644 --- a/src/runtime/pkg/kata-monitor/sandbox_cache_test.go +++ b/src/runtime/pkg/kata-monitor/sandbox_cache_test.go @@ -16,10 +16,10 @@ func TestSandboxCache(t *testing.T) { assert := assert.New(t) sc := &sandboxCache{ Mutex: &sync.Mutex{}, - sandboxes: make(map[string]struct{}), + sandboxes: make(map[string]bool), } - scMap := map[string]struct{}{"111": {}} + scMap := map[string]bool{"111": true} sc.set(scMap) @@ -28,12 +28,12 @@ func TestSandboxCache(t *testing.T) { // put new item id := "new-id" - b := sc.putIfNotExists(id) + b := sc.putIfNotExists(id, true) assert.Equal(true, b) assert.Equal(2, len(scMap)) // put key that alreay exists - b = sc.putIfNotExists(id) + b = sc.putIfNotExists(id, true) assert.Equal(false, b) b = sc.deleteIfExists(id)