From e9eb34cea8b72e255ffe436531cb267d1092cf06 Mon Sep 17 00:00:00 2001 From: Francesco Giudici Date: Wed, 19 Jan 2022 18:41:19 +0100 Subject: [PATCH 1/5] kata-monitor: improve debug logging Improve debug log formatting of the sandbox cache update process. Move raw and tracing logs from the DEBUG to the TRACE log level. Signed-off-by: Francesco Giudici --- src/runtime/pkg/kata-monitor/cri.go | 15 ++++++++++----- src/runtime/pkg/kata-monitor/metrics.go | 2 +- src/runtime/pkg/kata-monitor/monitor.go | 3 ++- 3 files changed, 13 insertions(+), 7 deletions(-) diff --git a/src/runtime/pkg/kata-monitor/cri.go b/src/runtime/pkg/kata-monitor/cri.go index f9cbf3c9f..b0cd82a01 100644 --- a/src/runtime/pkg/kata-monitor/cri.go +++ b/src/runtime/pkg/kata-monitor/cri.go @@ -39,7 +39,6 @@ func getAddressAndDialer(endpoint string) (string, func(ctx context.Context, add func getConnection(endPoint string) (*grpc.ClientConn, error) { var conn *grpc.ClientConn - monitorLog.Debugf("connect using endpoint '%s' with '%s' timeout", endPoint, defaultTimeout) addr, dialer, err := getAddressAndDialer(endPoint) if err != nil { return nil, err @@ -51,7 +50,7 @@ func getConnection(endPoint string) (*grpc.ClientConn, error) { errMsg := errors.Wrapf(err, "connect endpoint '%s', make sure you are running as root and the endpoint has been started", endPoint) return nil, errMsg } - monitorLog.Debugf("connected successfully using endpoint: %s", endPoint) + monitorLog.Tracef("connected successfully using endpoint: %s", endPoint) return conn, nil } @@ -132,28 +131,34 @@ func (km *KataMonitor) getSandboxes(sandboxMap map[string]bool) (map[string]bool request := &pb.ListPodSandboxRequest{ Filter: filter, } - monitorLog.Debugf("ListPodSandboxRequest: %v", request) + monitorLog.Tracef("ListPodSandboxRequest: %v", request) r, err := runtimeClient.ListPodSandbox(context.Background(), request) if err != nil { return newMap, err } - monitorLog.Debugf("ListPodSandboxResponse: %v", r) + monitorLog.Tracef("ListPodSandboxResponse: %v", r) for _, pod := range r.Items { // Use the cached data if available if isKata, ok := sandboxMap[pod.Id]; ok { newMap[pod.Id] = isKata + if isKata { + monitorLog.Debugf("KATA POD %s (cached)", pod.Id) + } continue } // Check if a directory associated with the POD ID exist on the kata fs: // if so we know that the POD is a kata one. newMap[pod.Id] = checkSandboxFSExists(pod.Id) + if newMap[pod.Id] { + monitorLog.Debugf("KATA POD %s (new)", pod.Id) + } monitorLog.WithFields(logrus.Fields{ "id": pod.Id, "is kata": newMap[pod.Id], "pod": pod, - }).Debug("") + }).Trace("") } return newMap, nil diff --git a/src/runtime/pkg/kata-monitor/metrics.go b/src/runtime/pkg/kata-monitor/metrics.go index b1788de46..5f5fa3e4c 100644 --- a/src/runtime/pkg/kata-monitor/metrics.go +++ b/src/runtime/pkg/kata-monitor/metrics.go @@ -156,7 +156,7 @@ func (km *KataMonitor) aggregateSandboxMetrics(encoder expfmt.Encoder) error { // used to receive response results := make(chan []*dto.MetricFamily, len(sandboxes)) - monitorLog.WithField("sandbox_count", len(sandboxes)).Debugf("sandboxes count") + monitorLog.WithField("sandboxes count", len(sandboxes)).Debugf("aggregate sandbox metrics") // get metrics from sandbox's shim for _, sandboxID := range sandboxes { diff --git a/src/runtime/pkg/kata-monitor/monitor.go b/src/runtime/pkg/kata-monitor/monitor.go index 6d0003af5..c5c166d94 100644 --- a/src/runtime/pkg/kata-monitor/monitor.go +++ b/src/runtime/pkg/kata-monitor/monitor.go @@ -126,13 +126,14 @@ func (km *KataMonitor) startPodCacheUpdater() { "cache update timer fires in %d secs", podCacheRefreshDelaySeconds) case <-cacheUpdateTimer.C: + monitorLog.Debugf("sync sandbox cache with the container engine") sandboxes, err := km.getSandboxes(km.sandboxCache.getAllSandboxes()) if err != nil { monitorLog.WithError(err).Error("failed to get sandboxes") continue } monitorLog.WithField("count", len(sandboxes)).Info("synced sandbox cache with the container engine") - monitorLog.WithField("sandboxes", sandboxes).Debug("dump sandbox cache") + monitorLog.WithField("sandboxes", sandboxes).Trace("dump sandbox cache") km.sandboxCache.set(sandboxes) } } From e78d80ea0d9f7133b7d8f10a6380f4a5e7eda98e Mon Sep 17 00:00:00 2001 From: Francesco Giudici Date: Thu, 20 Jan 2022 15:09:27 +0100 Subject: [PATCH 2/5] kata-monitor: silently ignore CHMOD events on the sandboxes fs We currently WARN about unexpected fs events, which includes CHMOD operations (which should be actually expected...). Just ignore all the fs events we don't care about without any warn. We dump all the events with debug log in any case. Signed-off-by: Francesco Giudici --- src/runtime/pkg/kata-monitor/monitor.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/runtime/pkg/kata-monitor/monitor.go b/src/runtime/pkg/kata-monitor/monitor.go index c5c166d94..593a15c88 100644 --- a/src/runtime/pkg/kata-monitor/monitor.go +++ b/src/runtime/pkg/kata-monitor/monitor.go @@ -113,9 +113,6 @@ func (km *KataMonitor) startPodCacheUpdater() { "REMOVE event but pod was missing from the sandbox cache") } monitorLog.WithField("pod", id).Info("sandbox cache: removed pod") - - default: - monitorLog.WithField("event", event).Warn("got unexpected fs event") } // While we process fs events directly to update the sandbox cache we need to sync with the From 7516a8c51b40e5b84b46f1888b2337c50e1acce5 Mon Sep 17 00:00:00 2001 From: Francesco Giudici Date: Tue, 25 Jan 2022 09:33:07 +0100 Subject: [PATCH 3/5] kata-monitor: rework the sandbox cache sync with the container manager Kata-monitor detects started and terminated kata pods by monitoring the vc/sbs fs (this makes sense since we will have to access that path to access the sockets there to get the metrics from the shim). While kata-monitor updates its sandbox cache based on the sbs fs events, it will schedule also a sync with the container manager via the CRI in order to sync the list of sandboxes there. The container manager will be the ultimate source of truth, so we will stick with the response from the container manager, removing the sandboxes not reported from the container manager. May happen anyway that when we check the container manager, the new kata pod is not reported yet, and we will remove it from the kata-monitor pod cache. If we don't get any new kata pod added or removed, we will not check with the container manager again, missing reporting metrics about that kata pod. Let's stick with the sbs fs as the source of truth: we will update the cache just following what happens on the sbs fs. At this point we may have also decided to drop the container manager connection... better instead to keep it in order to get the kube pod metadata from it, i.e., the kube UID, Name and Namespace associated with the sandbox. Every time we get a new sandbox from the sbs fs we will try to retrieve the pod metadata associated with it. Right now we just attach the container manager sandbox id as a label to the exposed metrics, making hard to link the metrics to the running pod in the kubernetes cluster. With kubernetes pod metadata we will be able to add them as labels to map explicitly the metrics to the kubernetes workloads. Fixes: #3550 Signed-off-by: Francesco Giudici --- src/runtime/pkg/kata-monitor/cri.go | 50 +++++++------- src/runtime/pkg/kata-monitor/metrics.go | 2 +- src/runtime/pkg/kata-monitor/monitor.go | 68 ++++++++++++++----- src/runtime/pkg/kata-monitor/sandbox_cache.go | 34 ++++++---- .../pkg/kata-monitor/sandbox_cache_test.go | 10 +-- 5 files changed, 103 insertions(+), 61 deletions(-) diff --git a/src/runtime/pkg/kata-monitor/cri.go b/src/runtime/pkg/kata-monitor/cri.go index b0cd82a01..0e9e06884 100644 --- a/src/runtime/pkg/kata-monitor/cri.go +++ b/src/runtime/pkg/kata-monitor/cri.go @@ -113,15 +113,15 @@ func parseEndpoint(endpoint string) (string, string, error) { } } -// getSandboxes gets ready sandboxes from the container engine and returns an updated sandboxMap -func (km *KataMonitor) getSandboxes(sandboxMap map[string]bool) (map[string]bool, error) { - newMap := make(map[string]bool) +// syncSandboxes gets pods metadata from the container manager and updates the sandbox cache. +func (km *KataMonitor) syncSandboxes(sandboxList []string) ([]string, error) { runtimeClient, runtimeConn, err := getRuntimeClient(km.runtimeEndpoint) if err != nil { - return newMap, err + return sandboxList, err } defer closeConnection(runtimeConn) + // TODO: if len(sandboxList) is 1, better we just runtimeClient.PodSandboxStatus(...) targeting the single sandbox filter := &pb.PodSandboxFilter{ State: &pb.PodSandboxStateValue{ State: pb.PodSandboxState_SANDBOX_READY, @@ -134,32 +134,32 @@ func (km *KataMonitor) getSandboxes(sandboxMap map[string]bool) (map[string]bool monitorLog.Tracef("ListPodSandboxRequest: %v", request) r, err := runtimeClient.ListPodSandbox(context.Background(), request) if err != nil { - return newMap, err + return sandboxList, err } monitorLog.Tracef("ListPodSandboxResponse: %v", r) for _, pod := range r.Items { - // Use the cached data if available - if isKata, ok := sandboxMap[pod.Id]; ok { - newMap[pod.Id] = isKata - if isKata { - monitorLog.Debugf("KATA POD %s (cached)", pod.Id) + for _, sandbox := range sandboxList { + if pod.Id == sandbox { + km.sandboxCache.setMetadata(sandbox, sandboxKubeData{ + uid: pod.Metadata.Uid, + name: pod.Metadata.Name, + namespace: pod.Metadata.Namespace, + }) + + sandboxList = removeFromSandboxList(sandboxList, sandbox) + + monitorLog.WithFields(logrus.Fields{ + "Pod Name": pod.Metadata.Name, + "Pod Namespace": pod.Metadata.Namespace, + "Pod UID": pod.Metadata.Uid, + }).Debugf("Synced KATA POD %s", pod.Id) + + break } - continue } - - // Check if a directory associated with the POD ID exist on the kata fs: - // if so we know that the POD is a kata one. - newMap[pod.Id] = checkSandboxFSExists(pod.Id) - if newMap[pod.Id] { - monitorLog.Debugf("KATA POD %s (new)", pod.Id) - } - monitorLog.WithFields(logrus.Fields{ - "id": pod.Id, - "is kata": newMap[pod.Id], - "pod": pod, - }).Trace("") } - - return newMap, nil + // TODO: here we should mark the sandboxes we failed to retrieve info from: we should try a finite number of times + // to retrieve their metadata: if we fail resign and remove them from the sanbox cache (with a Warning log). + return sandboxList, nil } diff --git a/src/runtime/pkg/kata-monitor/metrics.go b/src/runtime/pkg/kata-monitor/metrics.go index 5f5fa3e4c..d2958441b 100644 --- a/src/runtime/pkg/kata-monitor/metrics.go +++ b/src/runtime/pkg/kata-monitor/metrics.go @@ -141,7 +141,7 @@ func encodeMetricFamily(mfs []*dto.MetricFamily, encoder expfmt.Encoder) error { // aggregateSandboxMetrics will get metrics from one sandbox and do some process func (km *KataMonitor) aggregateSandboxMetrics(encoder expfmt.Encoder) error { // get all kata sandboxes from cache - sandboxes := km.sandboxCache.getKataSandboxes() + sandboxes := km.sandboxCache.getSandboxList() // save running kata pods as a metrics. runningShimCount.Set(float64(len(sandboxes))) diff --git a/src/runtime/pkg/kata-monitor/monitor.go b/src/runtime/pkg/kata-monitor/monitor.go index 593a15c88..caacfb222 100644 --- a/src/runtime/pkg/kata-monitor/monitor.go +++ b/src/runtime/pkg/kata-monitor/monitor.go @@ -53,7 +53,7 @@ func NewKataMonitor(runtimeEndpoint string) (*KataMonitor, error) { runtimeEndpoint: runtimeEndpoint, sandboxCache: &sandboxCache{ Mutex: &sync.Mutex{}, - sandboxes: make(map[string]bool), + sandboxes: make(map[string]sandboxKubeData), }, } @@ -65,6 +65,15 @@ func NewKataMonitor(runtimeEndpoint string) (*KataMonitor, error) { return km, nil } +func removeFromSandboxList(sandboxList []string, sandboxToRemove string) []string { + for i, sandbox := range sandboxList { + if sandbox == sandboxToRemove { + return append(sandboxList[:i], sandboxList[i+1:]...) + } + } + return sandboxList +} + // startPodCacheUpdater will boot a thread to manage sandbox cache func (km *KataMonitor) startPodCacheUpdater() { sbsWatcher, err := fsnotify.NewWatcher() @@ -84,9 +93,24 @@ func (km *KataMonitor) startPodCacheUpdater() { monitorLog.Debugf("started fs monitoring @%s", getSandboxFS()) break } - // we refresh the pod cache once if we get multiple add/delete pod events in a short time (< podCacheRefreshDelaySeconds) + // Initial sync with the kata sandboxes already running + sbsFile, err := os.Open(getSandboxFS()) + if err != nil { + monitorLog.WithError(err).Fatal("cannot open sandboxes fs") + os.Exit(1) + } + sandboxList, err := sbsFile.Readdirnames(0) + if err != nil { + monitorLog.WithError(err).Fatal("cannot read sandboxes fs") + os.Exit(1) + } + monitorLog.Debug("initial sync of sbs directory completed") + monitorLog.Tracef("pod list from sbs: %v", sandboxList) + + // We should get kubernetes metadata from the container manager for each new kata sandbox we detect. + // It may take a while for data to be available, so we always wait podCacheRefreshDelaySeconds before checking. cacheUpdateTimer := time.NewTimer(podCacheRefreshDelaySeconds * time.Second) - cacheUpdateTimerWasSet := false + cacheUpdateTimerIsSet := true for { select { case event, ok := <-sbsWatcher.Events: @@ -99,11 +123,18 @@ func (km *KataMonitor) startPodCacheUpdater() { case fsnotify.Create: splitPath := strings.Split(event.Name, string(os.PathSeparator)) id := splitPath[len(splitPath)-1] - if !km.sandboxCache.putIfNotExists(id, true) { + if !km.sandboxCache.putIfNotExists(id, sandboxKubeData{}) { monitorLog.WithField("pod", id).Warn( "CREATE event but pod already present in the sandbox cache") } + sandboxList = append(sandboxList, id) monitorLog.WithField("pod", id).Info("sandbox cache: added pod") + if !cacheUpdateTimerIsSet { + cacheUpdateTimer.Reset(podCacheRefreshDelaySeconds * time.Second) + cacheUpdateTimerIsSet = true + monitorLog.Debugf( + "cache update timer fires in %d secs", podCacheRefreshDelaySeconds) + } case fsnotify.Remove: splitPath := strings.Split(event.Name, string(os.PathSeparator)) @@ -112,26 +143,27 @@ func (km *KataMonitor) startPodCacheUpdater() { monitorLog.WithField("pod", id).Warn( "REMOVE event but pod was missing from the sandbox cache") } + sandboxList = removeFromSandboxList(sandboxList, id) monitorLog.WithField("pod", id).Info("sandbox cache: removed pod") } - // While we process fs events directly to update the sandbox cache we need to sync with the - // container engine to ensure we are on sync with it: we can get out of sync in environments - // where kata workloads can be started by other processes than the container engine. - cacheUpdateTimerWasSet = cacheUpdateTimer.Reset(podCacheRefreshDelaySeconds * time.Second) - monitorLog.WithField("was reset", cacheUpdateTimerWasSet).Debugf( - "cache update timer fires in %d secs", podCacheRefreshDelaySeconds) - case <-cacheUpdateTimer.C: - monitorLog.Debugf("sync sandbox cache with the container engine") - sandboxes, err := km.getSandboxes(km.sandboxCache.getAllSandboxes()) + cacheUpdateTimerIsSet = false + monitorLog.WithField("pod list", sandboxList).Debugf( + "retrieve pods metadata from the container manager") + sandboxList, err = km.syncSandboxes(sandboxList) if err != nil { - monitorLog.WithError(err).Error("failed to get sandboxes") + monitorLog.WithError(err).Error("failed to get sandboxes metadata") continue } - monitorLog.WithField("count", len(sandboxes)).Info("synced sandbox cache with the container engine") - monitorLog.WithField("sandboxes", sandboxes).Trace("dump sandbox cache") - km.sandboxCache.set(sandboxes) + if len(sandboxList) > 0 { + monitorLog.WithField("sandboxes", sandboxList).Debugf( + "%d sandboxes still miss metadata", len(sandboxList)) + cacheUpdateTimer.Reset(podCacheRefreshDelaySeconds * time.Second) + cacheUpdateTimerIsSet = true + } + + monitorLog.WithField("sandboxes", km.sandboxCache.getSandboxList()).Trace("dump sandbox cache") } } } @@ -155,7 +187,7 @@ func (km *KataMonitor) GetAgentURL(w http.ResponseWriter, r *http.Request) { // ListSandboxes list all sandboxes running in Kata func (km *KataMonitor) ListSandboxes(w http.ResponseWriter, r *http.Request) { - sandboxes := km.sandboxCache.getKataSandboxes() + sandboxes := km.sandboxCache.getSandboxList() for _, s := range sandboxes { w.Write([]byte(fmt.Sprintf("%s\n", s))) } diff --git a/src/runtime/pkg/kata-monitor/sandbox_cache.go b/src/runtime/pkg/kata-monitor/sandbox_cache.go index 98978f193..f4f542b41 100644 --- a/src/runtime/pkg/kata-monitor/sandbox_cache.go +++ b/src/runtime/pkg/kata-monitor/sandbox_cache.go @@ -9,28 +9,31 @@ import ( "sync" ) +type sandboxKubeData struct { + uid string + name string + namespace string +} type sandboxCache struct { *sync.Mutex - // the bool value tracks if the pod is a kata one (true) or not (false) - sandboxes map[string]bool + // the sandboxKubeData links the sandbox id from the container manager to the pod metadata of kubernetes + sandboxes map[string]sandboxKubeData } -func (sc *sandboxCache) getAllSandboxes() map[string]bool { +func (sc *sandboxCache) getSandboxes() map[string]sandboxKubeData { sc.Lock() defer sc.Unlock() return sc.sandboxes } -func (sc *sandboxCache) getKataSandboxes() []string { +func (sc *sandboxCache) getSandboxList() []string { sc.Lock() defer sc.Unlock() - var katasandboxes []string - for id, isKata := range sc.sandboxes { - if isKata { - katasandboxes = append(katasandboxes, id) - } + var sandboxList []string + for id := range sc.sandboxes { + sandboxList = append(sandboxList, id) } - return katasandboxes + return sandboxList } func (sc *sandboxCache) deleteIfExists(id string) bool { @@ -46,7 +49,7 @@ func (sc *sandboxCache) deleteIfExists(id string) bool { return false } -func (sc *sandboxCache) putIfNotExists(id string, value bool) bool { +func (sc *sandboxCache) putIfNotExists(id string, value sandboxKubeData) bool { sc.Lock() defer sc.Unlock() @@ -59,7 +62,14 @@ func (sc *sandboxCache) putIfNotExists(id string, value bool) bool { return false } -func (sc *sandboxCache) set(sandboxes map[string]bool) { +func (sc *sandboxCache) setMetadata(id string, value sandboxKubeData) { + sc.Lock() + defer sc.Unlock() + + sc.sandboxes[id] = value +} + +func (sc *sandboxCache) set(sandboxes map[string]sandboxKubeData) { sc.Lock() defer sc.Unlock() sc.sandboxes = sandboxes diff --git a/src/runtime/pkg/kata-monitor/sandbox_cache_test.go b/src/runtime/pkg/kata-monitor/sandbox_cache_test.go index 43b8c8c99..6dd2e7894 100644 --- a/src/runtime/pkg/kata-monitor/sandbox_cache_test.go +++ b/src/runtime/pkg/kata-monitor/sandbox_cache_test.go @@ -16,24 +16,24 @@ func TestSandboxCache(t *testing.T) { assert := assert.New(t) sc := &sandboxCache{ Mutex: &sync.Mutex{}, - sandboxes: make(map[string]bool), + sandboxes: make(map[string]sandboxKubeData), } - scMap := map[string]bool{"111": true} + scMap := map[string]sandboxKubeData{"111": {"1-2-3", "test-name", "test-namespace"}} sc.set(scMap) - scMap = sc.getAllSandboxes() + scMap = sc.getSandboxes() assert.Equal(1, len(scMap)) // put new item id := "new-id" - b := sc.putIfNotExists(id, true) + b := sc.putIfNotExists(id, sandboxKubeData{}) assert.Equal(true, b) assert.Equal(2, len(scMap)) // put key that alreay exists - b = sc.putIfNotExists(id, true) + b = sc.putIfNotExists(id, sandboxKubeData{}) assert.Equal(false, b) b = sc.deleteIfExists(id) From 834e199eee19c430f519417677dfc7f5f99dc7e7 Mon Sep 17 00:00:00 2001 From: Francesco Giudici Date: Tue, 25 Jan 2022 11:13:44 +0100 Subject: [PATCH 4/5] kata-monitor: drop unused functions Drop the functions we are not using anymore. Update the tests too. Signed-off-by: Francesco Giudici --- src/runtime/pkg/kata-monitor/sandbox_cache.go | 12 ------------ .../pkg/kata-monitor/sandbox_cache_test.go | 15 +++++---------- src/runtime/pkg/kata-monitor/shim_client.go | 9 --------- 3 files changed, 5 insertions(+), 31 deletions(-) diff --git a/src/runtime/pkg/kata-monitor/sandbox_cache.go b/src/runtime/pkg/kata-monitor/sandbox_cache.go index f4f542b41..4e3e77845 100644 --- a/src/runtime/pkg/kata-monitor/sandbox_cache.go +++ b/src/runtime/pkg/kata-monitor/sandbox_cache.go @@ -20,12 +20,6 @@ type sandboxCache struct { sandboxes map[string]sandboxKubeData } -func (sc *sandboxCache) getSandboxes() map[string]sandboxKubeData { - sc.Lock() - defer sc.Unlock() - return sc.sandboxes -} - func (sc *sandboxCache) getSandboxList() []string { sc.Lock() defer sc.Unlock() @@ -68,9 +62,3 @@ func (sc *sandboxCache) setMetadata(id string, value sandboxKubeData) { sc.sandboxes[id] = value } - -func (sc *sandboxCache) set(sandboxes map[string]sandboxKubeData) { - sc.Lock() - defer sc.Unlock() - sc.sandboxes = sandboxes -} diff --git a/src/runtime/pkg/kata-monitor/sandbox_cache_test.go b/src/runtime/pkg/kata-monitor/sandbox_cache_test.go index 6dd2e7894..4eedf778a 100644 --- a/src/runtime/pkg/kata-monitor/sandbox_cache_test.go +++ b/src/runtime/pkg/kata-monitor/sandbox_cache_test.go @@ -16,21 +16,16 @@ func TestSandboxCache(t *testing.T) { assert := assert.New(t) sc := &sandboxCache{ Mutex: &sync.Mutex{}, - sandboxes: make(map[string]sandboxKubeData), + sandboxes: map[string]sandboxKubeData{"111": {"1-2-3", "test-name", "test-namespace"}}, } - scMap := map[string]sandboxKubeData{"111": {"1-2-3", "test-name", "test-namespace"}} - - sc.set(scMap) - - scMap = sc.getSandboxes() - assert.Equal(1, len(scMap)) + assert.Equal(1, len(sc.getSandboxList())) // put new item id := "new-id" b := sc.putIfNotExists(id, sandboxKubeData{}) assert.Equal(true, b) - assert.Equal(2, len(scMap)) + assert.Equal(2, len(sc.getSandboxList())) // put key that alreay exists b = sc.putIfNotExists(id, sandboxKubeData{}) @@ -38,9 +33,9 @@ func TestSandboxCache(t *testing.T) { b = sc.deleteIfExists(id) assert.Equal(true, b) - assert.Equal(1, len(scMap)) + assert.Equal(1, len(sc.getSandboxList())) b = sc.deleteIfExists(id) assert.Equal(false, b) - assert.Equal(1, len(scMap)) + assert.Equal(1, len(sc.getSandboxList())) } diff --git a/src/runtime/pkg/kata-monitor/shim_client.go b/src/runtime/pkg/kata-monitor/shim_client.go index 31043c847..bdb62d401 100644 --- a/src/runtime/pkg/kata-monitor/shim_client.go +++ b/src/runtime/pkg/kata-monitor/shim_client.go @@ -10,8 +10,6 @@ import ( "io" "net" "net/http" - "os" - "path/filepath" "time" cdshim "github.com/containerd/containerd/runtime/v2/shim" @@ -43,13 +41,6 @@ func getSandboxFS() string { return shim.GetSandboxesStoragePath() } -func checkSandboxFSExists(sandboxID string) bool { - sbsPath := filepath.Join(string(filepath.Separator), getSandboxFS(), sandboxID) - _, err := os.Stat(sbsPath) - - return !os.IsNotExist(err) -} - // BuildShimClient builds and returns an http client for communicating with the provided sandbox func BuildShimClient(sandboxID string, timeout time.Duration) (*http.Client, error) { return buildUnixSocketClient(shim.SocketAddress(sandboxID), timeout) From ab447285badb68ecda94afe4a306fd188d292e0b Mon Sep 17 00:00:00 2001 From: Francesco Giudici Date: Tue, 25 Jan 2022 13:42:39 +0100 Subject: [PATCH 5/5] kata-monitor: add kubernetes pod metadata labels to metrics Add the POD metadata we get from the container manager to the metrics by adding more labels. Fixes: #3551 Signed-off-by: Francesco Giudici --- src/runtime/pkg/kata-monitor/metrics.go | 38 ++++++++++++++----- src/runtime/pkg/kata-monitor/metrics_test.go | 12 +++++- src/runtime/pkg/kata-monitor/sandbox_cache.go | 8 ++++ 3 files changed, 46 insertions(+), 12 deletions(-) diff --git a/src/runtime/pkg/kata-monitor/metrics.go b/src/runtime/pkg/kata-monitor/metrics.go index d2958441b..0216969cb 100644 --- a/src/runtime/pkg/kata-monitor/metrics.go +++ b/src/runtime/pkg/kata-monitor/metrics.go @@ -160,9 +160,13 @@ func (km *KataMonitor) aggregateSandboxMetrics(encoder expfmt.Encoder) error { // get metrics from sandbox's shim for _, sandboxID := range sandboxes { + sandboxMetadata, ok := km.sandboxCache.getMetadata(sandboxID) + if !ok { // likely the sandbox has been just removed + continue + } wg.Add(1) - go func(sandboxID string, results chan<- []*dto.MetricFamily) { - sandboxMetrics, err := getParsedMetrics(sandboxID) + go func(sandboxID string, sandboxMetadata sandboxKubeData, results chan<- []*dto.MetricFamily) { + sandboxMetrics, err := getParsedMetrics(sandboxID, sandboxMetadata) if err != nil { monitorLog.WithError(err).WithField("sandbox_id", sandboxID).Errorf("failed to get metrics for sandbox") } @@ -170,7 +174,7 @@ func (km *KataMonitor) aggregateSandboxMetrics(encoder expfmt.Encoder) error { results <- sandboxMetrics wg.Done() monitorLog.WithField("sandbox_id", sandboxID).Debug("job finished") - }(sandboxID, results) + }(sandboxID, sandboxMetadata, results) monitorLog.WithField("sandbox_id", sandboxID).Debug("job started") } @@ -219,13 +223,13 @@ func (km *KataMonitor) aggregateSandboxMetrics(encoder expfmt.Encoder) error { } -func getParsedMetrics(sandboxID string) ([]*dto.MetricFamily, error) { +func getParsedMetrics(sandboxID string, sandboxMetadata sandboxKubeData) ([]*dto.MetricFamily, error) { body, err := doGet(sandboxID, defaultTimeout, "metrics") if err != nil { return nil, err } - return parsePrometheusMetrics(sandboxID, body) + return parsePrometheusMetrics(sandboxID, sandboxMetadata, body) } // GetSandboxMetrics will get sandbox's metrics from shim @@ -240,7 +244,7 @@ func GetSandboxMetrics(sandboxID string) (string, error) { // parsePrometheusMetrics will decode metrics from Prometheus text format // and return array of *dto.MetricFamily with an ASC order -func parsePrometheusMetrics(sandboxID string, body []byte) ([]*dto.MetricFamily, error) { +func parsePrometheusMetrics(sandboxID string, sandboxMetadata sandboxKubeData, body []byte) ([]*dto.MetricFamily, error) { reader := bytes.NewReader(body) decoder := expfmt.NewDecoder(reader, expfmt.FmtText) @@ -258,10 +262,24 @@ func parsePrometheusMetrics(sandboxID string, body []byte) ([]*dto.MetricFamily, metricList := mf.Metric for j := range metricList { metric := metricList[j] - metric.Label = append(metric.Label, &dto.LabelPair{ - Name: mutils.String2Pointer("sandbox_id"), - Value: mutils.String2Pointer(sandboxID), - }) + metric.Label = append(metric.Label, + &dto.LabelPair{ + Name: mutils.String2Pointer("sandbox_id"), + Value: mutils.String2Pointer(sandboxID), + }, + &dto.LabelPair{ + Name: mutils.String2Pointer("kube_uid"), + Value: mutils.String2Pointer(sandboxMetadata.uid), + }, + &dto.LabelPair{ + Name: mutils.String2Pointer("kube_name"), + Value: mutils.String2Pointer(sandboxMetadata.name), + }, + &dto.LabelPair{ + Name: mutils.String2Pointer("kube_namespace"), + Value: mutils.String2Pointer(sandboxMetadata.namespace), + }, + ) } // Kata shim are using prometheus go client, add a prefix for metric name to avoid confusing diff --git a/src/runtime/pkg/kata-monitor/metrics_test.go b/src/runtime/pkg/kata-monitor/metrics_test.go index 5263d2a93..1055a6d36 100644 --- a/src/runtime/pkg/kata-monitor/metrics_test.go +++ b/src/runtime/pkg/kata-monitor/metrics_test.go @@ -40,9 +40,10 @@ ttt 999 func TestParsePrometheusMetrics(t *testing.T) { assert := assert.New(t) sandboxID := "sandboxID-abc" + sandboxMetadata := sandboxKubeData{"123", "pod-name", "pod-namespace"} // parse metrics - list, err := parsePrometheusMetrics(sandboxID, []byte(shimMetricBody)) + list, err := parsePrometheusMetrics(sandboxID, sandboxMetadata, []byte(shimMetricBody)) assert.Nil(err, "parsePrometheusMetrics should not return error") assert.Equal(4, len(list), "should return 3 metric families") @@ -56,9 +57,16 @@ func TestParsePrometheusMetrics(t *testing.T) { // get the metric m := mf.Metric[0] - assert.Equal(1, len(m.Label), "should have only 1 labels") + assert.Equal(4, len(m.Label), "should have 4 labels") assert.Equal("sandbox_id", *m.Label[0].Name, "label name should be sandbox_id") assert.Equal(sandboxID, *m.Label[0].Value, "label value should be", sandboxID) + assert.Equal("kube_uid", *m.Label[1].Name, "label name should be kube_uid") + assert.Equal(sandboxMetadata.uid, *m.Label[1].Value, "label value should be", sandboxMetadata.uid) + + assert.Equal("kube_name", *m.Label[2].Name, "label name should be kube_name") + assert.Equal(sandboxMetadata.name, *m.Label[2].Value, "label value should be", sandboxMetadata.name) + assert.Equal("kube_namespace", *m.Label[3].Name, "label name should be kube_namespace") + assert.Equal(sandboxMetadata.namespace, *m.Label[3].Value, "label value should be", sandboxMetadata.namespace) summary := m.Summary assert.NotNil(summary, "summary should not be nil") diff --git a/src/runtime/pkg/kata-monitor/sandbox_cache.go b/src/runtime/pkg/kata-monitor/sandbox_cache.go index 4e3e77845..ba98a121f 100644 --- a/src/runtime/pkg/kata-monitor/sandbox_cache.go +++ b/src/runtime/pkg/kata-monitor/sandbox_cache.go @@ -62,3 +62,11 @@ func (sc *sandboxCache) setMetadata(id string, value sandboxKubeData) { sc.sandboxes[id] = value } + +func (sc *sandboxCache) getMetadata(id string) (sandboxKubeData, bool) { + sc.Lock() + defer sc.Unlock() + + metadata, ok := sc.sandboxes[id] + return metadata, ok +}