From 7516a8c51b40e5b84b46f1888b2337c50e1acce5 Mon Sep 17 00:00:00 2001 From: Francesco Giudici Date: Tue, 25 Jan 2022 09:33:07 +0100 Subject: [PATCH] kata-monitor: rework the sandbox cache sync with the container manager Kata-monitor detects started and terminated kata pods by monitoring the vc/sbs fs (this makes sense since we will have to access that path to access the sockets there to get the metrics from the shim). While kata-monitor updates its sandbox cache based on the sbs fs events, it will also schedule a sync with the container manager via the CRI in order to sync the list of sandboxes there. The container manager will be the ultimate source of truth, so we will stick with the response from the container manager, removing the sandboxes not reported by the container manager. It may happen anyway that, when we check the container manager, the new kata pod is not reported yet, and we will remove it from the kata-monitor pod cache. If we don't get any new kata pod added or removed, we will not check with the container manager again, and so will miss reporting metrics about that kata pod. Let's stick with the sbs fs as the source of truth: we will update the cache just by following what happens on the sbs fs. At this point we may have also decided to drop the container manager connection... better instead to keep it in order to get the kube pod metadata from it, i.e., the kube UID, Name and Namespace associated with the sandbox. Every time we get a new sandbox from the sbs fs we will try to retrieve the pod metadata associated with it. Right now we just attach the container manager sandbox id as a label to the exposed metrics, making it hard to link the metrics to the running pod in the kubernetes cluster. With kubernetes pod metadata we will be able to add them as labels to map the metrics explicitly to the kubernetes workloads. 
Fixes: #3550 Signed-off-by: Francesco Giudici --- src/runtime/pkg/kata-monitor/cri.go | 50 +++++++------- src/runtime/pkg/kata-monitor/metrics.go | 2 +- src/runtime/pkg/kata-monitor/monitor.go | 68 ++++++++++++++----- src/runtime/pkg/kata-monitor/sandbox_cache.go | 34 ++++++---- .../pkg/kata-monitor/sandbox_cache_test.go | 10 +-- 5 files changed, 103 insertions(+), 61 deletions(-) diff --git a/src/runtime/pkg/kata-monitor/cri.go b/src/runtime/pkg/kata-monitor/cri.go index b0cd82a016..0e9e068849 100644 --- a/src/runtime/pkg/kata-monitor/cri.go +++ b/src/runtime/pkg/kata-monitor/cri.go @@ -113,15 +113,15 @@ func parseEndpoint(endpoint string) (string, string, error) { } } -// getSandboxes gets ready sandboxes from the container engine and returns an updated sandboxMap -func (km *KataMonitor) getSandboxes(sandboxMap map[string]bool) (map[string]bool, error) { - newMap := make(map[string]bool) +// syncSandboxes gets pods metadata from the container manager and updates the sandbox cache. +func (km *KataMonitor) syncSandboxes(sandboxList []string) ([]string, error) { runtimeClient, runtimeConn, err := getRuntimeClient(km.runtimeEndpoint) if err != nil { - return newMap, err + return sandboxList, err } defer closeConnection(runtimeConn) + // TODO: if len(sandboxList) is 1, better we just runtimeClient.PodSandboxStatus(...) 
targeting the single sandbox filter := &pb.PodSandboxFilter{ State: &pb.PodSandboxStateValue{ State: pb.PodSandboxState_SANDBOX_READY, @@ -134,32 +134,32 @@ func (km *KataMonitor) getSandboxes(sandboxMap map[string]bool) (map[string]bool monitorLog.Tracef("ListPodSandboxRequest: %v", request) r, err := runtimeClient.ListPodSandbox(context.Background(), request) if err != nil { - return newMap, err + return sandboxList, err } monitorLog.Tracef("ListPodSandboxResponse: %v", r) for _, pod := range r.Items { - // Use the cached data if available - if isKata, ok := sandboxMap[pod.Id]; ok { - newMap[pod.Id] = isKata - if isKata { - monitorLog.Debugf("KATA POD %s (cached)", pod.Id) + for _, sandbox := range sandboxList { + if pod.Id == sandbox { + km.sandboxCache.setMetadata(sandbox, sandboxKubeData{ + uid: pod.Metadata.Uid, + name: pod.Metadata.Name, + namespace: pod.Metadata.Namespace, + }) + + sandboxList = removeFromSandboxList(sandboxList, sandbox) + + monitorLog.WithFields(logrus.Fields{ + "Pod Name": pod.Metadata.Name, + "Pod Namespace": pod.Metadata.Namespace, + "Pod UID": pod.Metadata.Uid, + }).Debugf("Synced KATA POD %s", pod.Id) + + break } - continue } - - // Check if a directory associated with the POD ID exist on the kata fs: - // if so we know that the POD is a kata one. - newMap[pod.Id] = checkSandboxFSExists(pod.Id) - if newMap[pod.Id] { - monitorLog.Debugf("KATA POD %s (new)", pod.Id) - } - monitorLog.WithFields(logrus.Fields{ - "id": pod.Id, - "is kata": newMap[pod.Id], - "pod": pod, - }).Trace("") } - - return newMap, nil + // TODO: here we should mark the sandboxes we failed to retrieve info from: we should try a finite number of times + // to retrieve their metadata: if we fail, resign and remove them from the sandbox cache (with a Warning log). 
+ return sandboxList, nil } diff --git a/src/runtime/pkg/kata-monitor/metrics.go b/src/runtime/pkg/kata-monitor/metrics.go index 5f5fa3e4c7..d2958441b9 100644 --- a/src/runtime/pkg/kata-monitor/metrics.go +++ b/src/runtime/pkg/kata-monitor/metrics.go @@ -141,7 +141,7 @@ func encodeMetricFamily(mfs []*dto.MetricFamily, encoder expfmt.Encoder) error { // aggregateSandboxMetrics will get metrics from one sandbox and do some process func (km *KataMonitor) aggregateSandboxMetrics(encoder expfmt.Encoder) error { // get all kata sandboxes from cache - sandboxes := km.sandboxCache.getKataSandboxes() + sandboxes := km.sandboxCache.getSandboxList() // save running kata pods as a metrics. runningShimCount.Set(float64(len(sandboxes))) diff --git a/src/runtime/pkg/kata-monitor/monitor.go b/src/runtime/pkg/kata-monitor/monitor.go index 593a15c887..caacfb2228 100644 --- a/src/runtime/pkg/kata-monitor/monitor.go +++ b/src/runtime/pkg/kata-monitor/monitor.go @@ -53,7 +53,7 @@ func NewKataMonitor(runtimeEndpoint string) (*KataMonitor, error) { runtimeEndpoint: runtimeEndpoint, sandboxCache: &sandboxCache{ Mutex: &sync.Mutex{}, - sandboxes: make(map[string]bool), + sandboxes: make(map[string]sandboxKubeData), }, } @@ -65,6 +65,15 @@ func NewKataMonitor(runtimeEndpoint string) (*KataMonitor, error) { return km, nil } +func removeFromSandboxList(sandboxList []string, sandboxToRemove string) []string { + for i, sandbox := range sandboxList { + if sandbox == sandboxToRemove { + return append(sandboxList[:i], sandboxList[i+1:]...) 
+ } + } + return sandboxList +} + // startPodCacheUpdater will boot a thread to manage sandbox cache func (km *KataMonitor) startPodCacheUpdater() { sbsWatcher, err := fsnotify.NewWatcher() @@ -84,9 +93,24 @@ func (km *KataMonitor) startPodCacheUpdater() { monitorLog.Debugf("started fs monitoring @%s", getSandboxFS()) break } - // we refresh the pod cache once if we get multiple add/delete pod events in a short time (< podCacheRefreshDelaySeconds) + // Initial sync with the kata sandboxes already running + sbsFile, err := os.Open(getSandboxFS()) + if err != nil { + monitorLog.WithError(err).Fatal("cannot open sandboxes fs") + os.Exit(1) + } + sandboxList, err := sbsFile.Readdirnames(0) + if err != nil { + monitorLog.WithError(err).Fatal("cannot read sandboxes fs") + os.Exit(1) + } + monitorLog.Debug("initial sync of sbs directory completed") + monitorLog.Tracef("pod list from sbs: %v", sandboxList) + + // We should get kubernetes metadata from the container manager for each new kata sandbox we detect. + // It may take a while for data to be available, so we always wait podCacheRefreshDelaySeconds before checking. 
cacheUpdateTimer := time.NewTimer(podCacheRefreshDelaySeconds * time.Second) - cacheUpdateTimerWasSet := false + cacheUpdateTimerIsSet := true for { select { case event, ok := <-sbsWatcher.Events: @@ -99,11 +123,18 @@ func (km *KataMonitor) startPodCacheUpdater() { case fsnotify.Create: splitPath := strings.Split(event.Name, string(os.PathSeparator)) id := splitPath[len(splitPath)-1] - if !km.sandboxCache.putIfNotExists(id, true) { + if !km.sandboxCache.putIfNotExists(id, sandboxKubeData{}) { monitorLog.WithField("pod", id).Warn( "CREATE event but pod already present in the sandbox cache") } + sandboxList = append(sandboxList, id) monitorLog.WithField("pod", id).Info("sandbox cache: added pod") + if !cacheUpdateTimerIsSet { + cacheUpdateTimer.Reset(podCacheRefreshDelaySeconds * time.Second) + cacheUpdateTimerIsSet = true + monitorLog.Debugf( + "cache update timer fires in %d secs", podCacheRefreshDelaySeconds) + } case fsnotify.Remove: splitPath := strings.Split(event.Name, string(os.PathSeparator)) @@ -112,26 +143,27 @@ func (km *KataMonitor) startPodCacheUpdater() { monitorLog.WithField("pod", id).Warn( "REMOVE event but pod was missing from the sandbox cache") } + sandboxList = removeFromSandboxList(sandboxList, id) monitorLog.WithField("pod", id).Info("sandbox cache: removed pod") } - // While we process fs events directly to update the sandbox cache we need to sync with the - // container engine to ensure we are on sync with it: we can get out of sync in environments - // where kata workloads can be started by other processes than the container engine. 
- cacheUpdateTimerWasSet = cacheUpdateTimer.Reset(podCacheRefreshDelaySeconds * time.Second) - monitorLog.WithField("was reset", cacheUpdateTimerWasSet).Debugf( - "cache update timer fires in %d secs", podCacheRefreshDelaySeconds) - case <-cacheUpdateTimer.C: - monitorLog.Debugf("sync sandbox cache with the container engine") - sandboxes, err := km.getSandboxes(km.sandboxCache.getAllSandboxes()) + cacheUpdateTimerIsSet = false + monitorLog.WithField("pod list", sandboxList).Debugf( + "retrieve pods metadata from the container manager") + sandboxList, err = km.syncSandboxes(sandboxList) if err != nil { - monitorLog.WithError(err).Error("failed to get sandboxes") + monitorLog.WithError(err).Error("failed to get sandboxes metadata") continue } - monitorLog.WithField("count", len(sandboxes)).Info("synced sandbox cache with the container engine") - monitorLog.WithField("sandboxes", sandboxes).Trace("dump sandbox cache") - km.sandboxCache.set(sandboxes) + if len(sandboxList) > 0 { + monitorLog.WithField("sandboxes", sandboxList).Debugf( + "%d sandboxes still miss metadata", len(sandboxList)) + cacheUpdateTimer.Reset(podCacheRefreshDelaySeconds * time.Second) + cacheUpdateTimerIsSet = true + } + + monitorLog.WithField("sandboxes", km.sandboxCache.getSandboxList()).Trace("dump sandbox cache") } } } @@ -155,7 +187,7 @@ func (km *KataMonitor) GetAgentURL(w http.ResponseWriter, r *http.Request) { // ListSandboxes list all sandboxes running in Kata func (km *KataMonitor) ListSandboxes(w http.ResponseWriter, r *http.Request) { - sandboxes := km.sandboxCache.getKataSandboxes() + sandboxes := km.sandboxCache.getSandboxList() for _, s := range sandboxes { w.Write([]byte(fmt.Sprintf("%s\n", s))) } diff --git a/src/runtime/pkg/kata-monitor/sandbox_cache.go b/src/runtime/pkg/kata-monitor/sandbox_cache.go index 98978f193f..f4f542b41a 100644 --- a/src/runtime/pkg/kata-monitor/sandbox_cache.go +++ b/src/runtime/pkg/kata-monitor/sandbox_cache.go @@ -9,28 +9,31 @@ import ( "sync" ) +type 
sandboxKubeData struct { + uid string + name string + namespace string +} type sandboxCache struct { *sync.Mutex - // the bool value tracks if the pod is a kata one (true) or not (false) - sandboxes map[string]bool + // the sandboxKubeData links the sandbox id from the container manager to the pod metadata of kubernetes + sandboxes map[string]sandboxKubeData } -func (sc *sandboxCache) getAllSandboxes() map[string]bool { +func (sc *sandboxCache) getSandboxes() map[string]sandboxKubeData { sc.Lock() defer sc.Unlock() return sc.sandboxes } -func (sc *sandboxCache) getKataSandboxes() []string { +func (sc *sandboxCache) getSandboxList() []string { sc.Lock() defer sc.Unlock() - var katasandboxes []string - for id, isKata := range sc.sandboxes { - if isKata { - katasandboxes = append(katasandboxes, id) - } + var sandboxList []string + for id := range sc.sandboxes { + sandboxList = append(sandboxList, id) } - return katasandboxes + return sandboxList } func (sc *sandboxCache) deleteIfExists(id string) bool { @@ -46,7 +49,7 @@ func (sc *sandboxCache) deleteIfExists(id string) bool { return false } -func (sc *sandboxCache) putIfNotExists(id string, value bool) bool { +func (sc *sandboxCache) putIfNotExists(id string, value sandboxKubeData) bool { sc.Lock() defer sc.Unlock() @@ -59,7 +62,14 @@ func (sc *sandboxCache) putIfNotExists(id string, value bool) bool { return false } -func (sc *sandboxCache) set(sandboxes map[string]bool) { +func (sc *sandboxCache) setMetadata(id string, value sandboxKubeData) { + sc.Lock() + defer sc.Unlock() + + sc.sandboxes[id] = value +} + +func (sc *sandboxCache) set(sandboxes map[string]sandboxKubeData) { sc.Lock() defer sc.Unlock() sc.sandboxes = sandboxes diff --git a/src/runtime/pkg/kata-monitor/sandbox_cache_test.go b/src/runtime/pkg/kata-monitor/sandbox_cache_test.go index 43b8c8c99b..6dd2e78943 100644 --- a/src/runtime/pkg/kata-monitor/sandbox_cache_test.go +++ b/src/runtime/pkg/kata-monitor/sandbox_cache_test.go @@ -16,24 +16,24 @@ func 
TestSandboxCache(t *testing.T) { assert := assert.New(t) sc := &sandboxCache{ Mutex: &sync.Mutex{}, - sandboxes: make(map[string]bool), + sandboxes: make(map[string]sandboxKubeData), } - scMap := map[string]bool{"111": true} + scMap := map[string]sandboxKubeData{"111": {"1-2-3", "test-name", "test-namespace"}} sc.set(scMap) - scMap = sc.getAllSandboxes() + scMap = sc.getSandboxes() assert.Equal(1, len(scMap)) // put new item id := "new-id" - b := sc.putIfNotExists(id, true) + b := sc.putIfNotExists(id, sandboxKubeData{}) assert.Equal(true, b) assert.Equal(2, len(scMap)) // put key that alreay exists - b = sc.putIfNotExists(id, true) + b = sc.putIfNotExists(id, sandboxKubeData{}) assert.Equal(false, b) b = sc.deleteIfExists(id)