diff --git a/src/runtime/pkg/kata-monitor/cri.go b/src/runtime/pkg/kata-monitor/cri.go
index f9cbf3c9fd..0e9e068849 100644
--- a/src/runtime/pkg/kata-monitor/cri.go
+++ b/src/runtime/pkg/kata-monitor/cri.go
@@ -39,7 +39,6 @@ func getAddressAndDialer(endpoint string) (string, func(ctx context.Context, add
 
 func getConnection(endPoint string) (*grpc.ClientConn, error) {
     var conn *grpc.ClientConn
-    monitorLog.Debugf("connect using endpoint '%s' with '%s' timeout", endPoint, defaultTimeout)
     addr, dialer, err := getAddressAndDialer(endPoint)
     if err != nil {
         return nil, err
@@ -51,7 +50,7 @@ func getConnection(endPoint string) (*grpc.ClientConn, error) {
         errMsg := errors.Wrapf(err, "connect endpoint '%s', make sure you are running as root and the endpoint has been started", endPoint)
         return nil, errMsg
     }
-    monitorLog.Debugf("connected successfully using endpoint: %s", endPoint)
+    monitorLog.Tracef("connected successfully using endpoint: %s", endPoint)
     return conn, nil
 }
 
@@ -114,15 +113,15 @@ func parseEndpoint(endpoint string) (string, string, error) {
     }
 }
 
-// getSandboxes gets ready sandboxes from the container engine and returns an updated sandboxMap
-func (km *KataMonitor) getSandboxes(sandboxMap map[string]bool) (map[string]bool, error) {
-    newMap := make(map[string]bool)
+// syncSandboxes gets pod metadata from the container manager and updates the sandbox cache.
+func (km *KataMonitor) syncSandboxes(sandboxList []string) ([]string, error) {
     runtimeClient, runtimeConn, err := getRuntimeClient(km.runtimeEndpoint)
     if err != nil {
-        return newMap, err
+        return sandboxList, err
     }
     defer closeConnection(runtimeConn)
 
+    // TODO: if len(sandboxList) is 1, it may be better to just call runtimeClient.PodSandboxStatus(...) targeting the single sandbox
     filter := &pb.PodSandboxFilter{
         State: &pb.PodSandboxStateValue{
             State: pb.PodSandboxState_SANDBOX_READY,
@@ -132,29 +131,35 @@ func (km *KataMonitor) getSandboxes(sandboxMap map[string]bool) (map[string]bool
     request := &pb.ListPodSandboxRequest{
         Filter: filter,
     }
-    monitorLog.Debugf("ListPodSandboxRequest: %v", request)
+    monitorLog.Tracef("ListPodSandboxRequest: %v", request)
     r, err := runtimeClient.ListPodSandbox(context.Background(), request)
     if err != nil {
-        return newMap, err
+        return sandboxList, err
     }
-    monitorLog.Debugf("ListPodSandboxResponse: %v", r)
+    monitorLog.Tracef("ListPodSandboxResponse: %v", r)
 
     for _, pod := range r.Items {
-        // Use the cached data if available
-        if isKata, ok := sandboxMap[pod.Id]; ok {
-            newMap[pod.Id] = isKata
-            continue
+        for _, sandbox := range sandboxList {
+            if pod.Id == sandbox {
+                km.sandboxCache.setMetadata(sandbox, sandboxKubeData{
+                    uid:       pod.Metadata.Uid,
+                    name:      pod.Metadata.Name,
+                    namespace: pod.Metadata.Namespace,
+                })
+
+                sandboxList = removeFromSandboxList(sandboxList, sandbox)
+
+                monitorLog.WithFields(logrus.Fields{
+                    "Pod Name":      pod.Metadata.Name,
+                    "Pod Namespace": pod.Metadata.Namespace,
+                    "Pod UID":       pod.Metadata.Uid,
+                }).Debugf("Synced KATA POD %s", pod.Id)
+
+                break
+            }
         }
-
-        // Check if a directory associated with the POD ID exist on the kata fs:
-        // if so we know that the POD is a kata one.
-        newMap[pod.Id] = checkSandboxFSExists(pod.Id)
-        monitorLog.WithFields(logrus.Fields{
-            "id":      pod.Id,
-            "is kata": newMap[pod.Id],
-            "pod":     pod,
-        }).Debug("")
     }
-
-    return newMap, nil
+    // TODO: here we should mark the sandboxes we failed to retrieve info from: we should try a finite number of times
+    // to retrieve their metadata: if we fail, give up and remove them from the sandbox cache (with a Warning log).
+    return sandboxList, nil
 }
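The TODO above suggests a cheaper path once only a single sandbox is left to sync. A possible shape for that variant — a hypothetical helper, not part of this patch, assuming the package's existing getRuntimeClient/closeConnection helpers and the CRI PodSandboxStatus call:

    // syncSingleSandbox is an illustrative sketch of the TODO above: ask the
    // container manager about one specific sandbox instead of listing all
    // ready pod sandboxes.
    func (km *KataMonitor) syncSingleSandbox(sandboxID string) error {
        runtimeClient, runtimeConn, err := getRuntimeClient(km.runtimeEndpoint)
        if err != nil {
            return err
        }
        defer closeConnection(runtimeConn)

        // PodSandboxStatus targets a single sandbox id, so no filter is needed.
        r, err := runtimeClient.PodSandboxStatus(context.Background(),
            &pb.PodSandboxStatusRequest{PodSandboxId: sandboxID})
        if err != nil {
            return err
        }
        meta := r.GetStatus().GetMetadata()
        km.sandboxCache.setMetadata(sandboxID, sandboxKubeData{
            uid:       meta.GetUid(),
            name:      meta.GetName(),
            namespace: meta.GetNamespace(),
        })
        return nil
    }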
diff --git a/src/runtime/pkg/kata-monitor/metrics.go b/src/runtime/pkg/kata-monitor/metrics.go
index b1788de463..0216969cb1 100644
--- a/src/runtime/pkg/kata-monitor/metrics.go
+++ b/src/runtime/pkg/kata-monitor/metrics.go
@@ -141,7 +141,7 @@ func encodeMetricFamily(mfs []*dto.MetricFamily, encoder expfmt.Encoder) error {
 
 // aggregateSandboxMetrics will get metrics from one sandbox and do some process
 func (km *KataMonitor) aggregateSandboxMetrics(encoder expfmt.Encoder) error {
     // get all kata sandboxes from cache
-    sandboxes := km.sandboxCache.getKataSandboxes()
+    sandboxes := km.sandboxCache.getSandboxList()
     // save running kata pods as a metrics.
     runningShimCount.Set(float64(len(sandboxes)))
@@ -156,13 +156,17 @@ func (km *KataMonitor) aggregateSandboxMetrics(encoder expfmt.Encoder) error {
     // used to receive response
     results := make(chan []*dto.MetricFamily, len(sandboxes))
 
-    monitorLog.WithField("sandbox_count", len(sandboxes)).Debugf("sandboxes count")
+    monitorLog.WithField("sandboxes count", len(sandboxes)).Debugf("aggregate sandbox metrics")
 
     // get metrics from sandbox's shim
     for _, sandboxID := range sandboxes {
+        sandboxMetadata, ok := km.sandboxCache.getMetadata(sandboxID)
+        if !ok { // likely the sandbox has just been removed
+            continue
+        }
         wg.Add(1)
-        go func(sandboxID string, results chan<- []*dto.MetricFamily) {
-            sandboxMetrics, err := getParsedMetrics(sandboxID)
+        go func(sandboxID string, sandboxMetadata sandboxKubeData, results chan<- []*dto.MetricFamily) {
+            sandboxMetrics, err := getParsedMetrics(sandboxID, sandboxMetadata)
             if err != nil {
                 monitorLog.WithError(err).WithField("sandbox_id", sandboxID).Errorf("failed to get metrics for sandbox")
             }
@@ -170,7 +174,7 @@ func (km *KataMonitor) aggregateSandboxMetrics(encoder expfmt.Encoder) error {
             results <- sandboxMetrics
             wg.Done()
             monitorLog.WithField("sandbox_id", sandboxID).Debug("job finished")
-        }(sandboxID, results)
+        }(sandboxID, sandboxMetadata, results)
         monitorLog.WithField("sandbox_id", sandboxID).Debug("job started")
     }
 
@@ -219,13 +223,13 @@ func (km *KataMonitor) aggregateSandboxMetrics(encoder expfmt.Encoder) error {
 }
 
-func getParsedMetrics(sandboxID string) ([]*dto.MetricFamily, error) {
+func getParsedMetrics(sandboxID string, sandboxMetadata sandboxKubeData) ([]*dto.MetricFamily, error) {
     body, err := doGet(sandboxID, defaultTimeout, "metrics")
     if err != nil {
         return nil, err
     }
 
-    return parsePrometheusMetrics(sandboxID, body)
+    return parsePrometheusMetrics(sandboxID, sandboxMetadata, body)
 }
 
 // GetSandboxMetrics will get sandbox's metrics from shim
@@ -240,7 +244,7 @@ func GetSandboxMetrics(sandboxID string) (string, error) {
 
 // parsePrometheusMetrics will decode metrics from Prometheus text format
 // and return array of *dto.MetricFamily with an ASC order
-func parsePrometheusMetrics(sandboxID string, body []byte) ([]*dto.MetricFamily, error) {
+func parsePrometheusMetrics(sandboxID string, sandboxMetadata sandboxKubeData, body []byte) ([]*dto.MetricFamily, error) {
     reader := bytes.NewReader(body)
     decoder := expfmt.NewDecoder(reader, expfmt.FmtText)
@@ -258,10 +262,24 @@ func parsePrometheusMetrics(sandboxID string, body []byte) ([]*dto.MetricFamily,
         metricList := mf.Metric
         for j := range metricList {
             metric := metricList[j]
-            metric.Label = append(metric.Label, &dto.LabelPair{
-                Name:  mutils.String2Pointer("sandbox_id"),
-                Value: mutils.String2Pointer(sandboxID),
-            })
+            metric.Label = append(metric.Label,
+                &dto.LabelPair{
+                    Name:  mutils.String2Pointer("sandbox_id"),
+                    Value: mutils.String2Pointer(sandboxID),
+                },
+                &dto.LabelPair{
+                    Name:  mutils.String2Pointer("kube_uid"),
+                    Value: mutils.String2Pointer(sandboxMetadata.uid),
+                },
+                &dto.LabelPair{
+                    Name:  mutils.String2Pointer("kube_name"),
+                    Value: mutils.String2Pointer(sandboxMetadata.name),
+                },
+                &dto.LabelPair{
+                    Name:  mutils.String2Pointer("kube_namespace"),
+                    Value: mutils.String2Pointer(sandboxMetadata.namespace),
+                },
+            )
         }
 
         // Kata shim are using prometheus go client, add a prefix for metric name to avoid confusing
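With the kube_uid, kube_name and kube_namespace pairs appended above, each decoded sample carries the kubernetes pod identity next to the sandbox id. Using the fixtures from the test below, the ttt family would come out roughly as follows in Prometheus text format (before the shim name prefix mentioned in the comment above is applied; values are the test's, the rendering is illustrative):

    ttt{sandbox_id="sandboxID-abc",kube_uid="123",kube_name="pod-name",kube_namespace="pod-namespace"} 999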
diff --git a/src/runtime/pkg/kata-monitor/metrics_test.go b/src/runtime/pkg/kata-monitor/metrics_test.go
index 5263d2a932..1055a6d361 100644
--- a/src/runtime/pkg/kata-monitor/metrics_test.go
+++ b/src/runtime/pkg/kata-monitor/metrics_test.go
@@ -40,9 +40,10 @@ ttt 999
 func TestParsePrometheusMetrics(t *testing.T) {
     assert := assert.New(t)
     sandboxID := "sandboxID-abc"
+    sandboxMetadata := sandboxKubeData{"123", "pod-name", "pod-namespace"}
 
     // parse metrics
-    list, err := parsePrometheusMetrics(sandboxID, []byte(shimMetricBody))
+    list, err := parsePrometheusMetrics(sandboxID, sandboxMetadata, []byte(shimMetricBody))
     assert.Nil(err, "parsePrometheusMetrics should not return error")
     assert.Equal(4, len(list), "should return 3 metric families")
@@ -56,9 +57,16 @@ func TestParsePrometheusMetrics(t *testing.T) {
 
     // get the metric
     m := mf.Metric[0]
-    assert.Equal(1, len(m.Label), "should have only 1 labels")
+    assert.Equal(4, len(m.Label), "should have 4 labels")
     assert.Equal("sandbox_id", *m.Label[0].Name, "label name should be sandbox_id")
     assert.Equal(sandboxID, *m.Label[0].Value, "label value should be", sandboxID)
+    assert.Equal("kube_uid", *m.Label[1].Name, "label name should be kube_uid")
+    assert.Equal(sandboxMetadata.uid, *m.Label[1].Value, "label value should be", sandboxMetadata.uid)
+
+    assert.Equal("kube_name", *m.Label[2].Name, "label name should be kube_name")
+    assert.Equal(sandboxMetadata.name, *m.Label[2].Value, "label value should be", sandboxMetadata.name)
+    assert.Equal("kube_namespace", *m.Label[3].Name, "label name should be kube_namespace")
+    assert.Equal(sandboxMetadata.namespace, *m.Label[3].Value, "label value should be", sandboxMetadata.namespace)
 
     summary := m.Summary
     assert.NotNil(summary, "summary should not be nil")
diff --git a/src/runtime/pkg/kata-monitor/monitor.go b/src/runtime/pkg/kata-monitor/monitor.go
index 6d0003af59..caacfb2228 100644
--- a/src/runtime/pkg/kata-monitor/monitor.go
+++ b/src/runtime/pkg/kata-monitor/monitor.go
@@ -53,7 +53,7 @@ func NewKataMonitor(runtimeEndpoint string) (*KataMonitor, error) {
         runtimeEndpoint: runtimeEndpoint,
         sandboxCache: &sandboxCache{
             Mutex:     &sync.Mutex{},
-            sandboxes: make(map[string]bool),
+            sandboxes: make(map[string]sandboxKubeData),
         },
     }
 
@@ -65,6 +65,15 @@ func NewKataMonitor(runtimeEndpoint string) (*KataMonitor, error) {
     return km, nil
 }
 
+func removeFromSandboxList(sandboxList []string, sandboxToRemove string) []string {
+    for i, sandbox := range sandboxList {
+        if sandbox == sandboxToRemove {
+            return append(sandboxList[:i], sandboxList[i+1:]...)
+        }
+    }
+    return sandboxList
+}
+
 // startPodCacheUpdater will boot a thread to manage sandbox cache
 func (km *KataMonitor) startPodCacheUpdater() {
     sbsWatcher, err := fsnotify.NewWatcher()
@@ -84,9 +93,24 @@ func (km *KataMonitor) startPodCacheUpdater() {
         monitorLog.Debugf("started fs monitoring @%s", getSandboxFS())
         break
     }
-    // we refresh the pod cache once if we get multiple add/delete pod events in a short time (< podCacheRefreshDelaySeconds)
+    // Initial sync with the kata sandboxes already running
+    sbsFile, err := os.Open(getSandboxFS())
+    if err != nil {
+        monitorLog.WithError(err).Fatal("cannot open sandboxes fs")
+        os.Exit(1)
+    }
+    sandboxList, err := sbsFile.Readdirnames(0)
+    if err != nil {
+        monitorLog.WithError(err).Fatal("cannot read sandboxes fs")
+        os.Exit(1)
+    }
+    monitorLog.Debug("initial sync of sbs directory completed")
+    monitorLog.Tracef("pod list from sbs: %v", sandboxList)
+
+    // We should get kubernetes metadata from the container manager for each new kata sandbox we detect.
+    // It may take a while for data to be available, so we always wait podCacheRefreshDelaySeconds before checking.
     cacheUpdateTimer := time.NewTimer(podCacheRefreshDelaySeconds * time.Second)
-    cacheUpdateTimerWasSet := false
+    cacheUpdateTimerIsSet := true
     for {
         select {
         case event, ok := <-sbsWatcher.Events:
@@ -99,11 +123,18 @@ func (km *KataMonitor) startPodCacheUpdater() {
             case fsnotify.Create:
                 splitPath := strings.Split(event.Name, string(os.PathSeparator))
                 id := splitPath[len(splitPath)-1]
-                if !km.sandboxCache.putIfNotExists(id, true) {
+                if !km.sandboxCache.putIfNotExists(id, sandboxKubeData{}) {
                     monitorLog.WithField("pod", id).Warn(
                         "CREATE event but pod already present in the sandbox cache")
                 }
+                sandboxList = append(sandboxList, id)
                 monitorLog.WithField("pod", id).Info("sandbox cache: added pod")
+                if !cacheUpdateTimerIsSet {
+                    cacheUpdateTimer.Reset(podCacheRefreshDelaySeconds * time.Second)
+                    cacheUpdateTimerIsSet = true
+                    monitorLog.Debugf(
+                        "cache update timer fires in %d secs", podCacheRefreshDelaySeconds)
+                }
 
             case fsnotify.Remove:
                 splitPath := strings.Split(event.Name, string(os.PathSeparator))
@@ -112,28 +143,27 @@ func (km *KataMonitor) startPodCacheUpdater() {
                 id := splitPath[len(splitPath)-1]
                 if !km.sandboxCache.deleteIfExists(id) {
                     monitorLog.WithField("pod", id).Warn(
                         "REMOVE event but pod was missing from the sandbox cache")
                 }
+                sandboxList = removeFromSandboxList(sandboxList, id)
                 monitorLog.WithField("pod", id).Info("sandbox cache: removed pod")
-
-            default:
-                monitorLog.WithField("event", event).Warn("got unexpected fs event")
             }
-            // While we process fs events directly to update the sandbox cache we need to sync with the
-            // container engine to ensure we are on sync with it: we can get out of sync in environments
-            // where kata workloads can be started by other processes than the container engine.
-            cacheUpdateTimerWasSet = cacheUpdateTimer.Reset(podCacheRefreshDelaySeconds * time.Second)
-            monitorLog.WithField("was reset", cacheUpdateTimerWasSet).Debugf(
-                "cache update timer fires in %d secs", podCacheRefreshDelaySeconds)
-
         case <-cacheUpdateTimer.C:
-            sandboxes, err := km.getSandboxes(km.sandboxCache.getAllSandboxes())
+            cacheUpdateTimerIsSet = false
+            monitorLog.WithField("pod list", sandboxList).Debugf(
+                "retrieve pods metadata from the container manager")
+            sandboxList, err = km.syncSandboxes(sandboxList)
             if err != nil {
-                monitorLog.WithError(err).Error("failed to get sandboxes")
+                monitorLog.WithError(err).Error("failed to get sandboxes metadata")
                 continue
             }
-            monitorLog.WithField("count", len(sandboxes)).Info("synced sandbox cache with the container engine")
-            monitorLog.WithField("sandboxes", sandboxes).Debug("dump sandbox cache")
-            km.sandboxCache.set(sandboxes)
+            if len(sandboxList) > 0 {
+                monitorLog.WithField("sandboxes", sandboxList).Debugf(
+                    "%d sandboxes still miss metadata", len(sandboxList))
+                cacheUpdateTimer.Reset(podCacheRefreshDelaySeconds * time.Second)
+                cacheUpdateTimerIsSet = true
+            }
+
+            monitorLog.WithField("sandboxes", km.sandboxCache.getSandboxList()).Trace("dump sandbox cache")
         }
     }
 }
@@ -157,7 +187,7 @@ func (km *KataMonitor) GetAgentURL(w http.ResponseWriter, r *http.Request) {
 
 // ListSandboxes list all sandboxes running in Kata
 func (km *KataMonitor) ListSandboxes(w http.ResponseWriter, r *http.Request) {
-    sandboxes := km.sandboxCache.getKataSandboxes()
+    sandboxes := km.sandboxCache.getSandboxList()
     for _, s := range sandboxes {
         w.Write([]byte(fmt.Sprintf("%s\n", s)))
     }
diff --git a/src/runtime/pkg/kata-monitor/sandbox_cache.go b/src/runtime/pkg/kata-monitor/sandbox_cache.go
index 98978f193f..ba98a121f2 100644
--- a/src/runtime/pkg/kata-monitor/sandbox_cache.go
+++ b/src/runtime/pkg/kata-monitor/sandbox_cache.go
@@ -9,28 +9,25 @@ import (
     "sync"
 )
 
+type sandboxKubeData struct {
+    uid       string
+    name      string
+    namespace string
+}
 type sandboxCache struct {
     *sync.Mutex
-    // the bool value tracks if the pod is a kata one (true) or not (false)
-    sandboxes map[string]bool
+    // the sandboxKubeData links the sandbox id from the container manager to the kubernetes pod metadata
+    sandboxes map[string]sandboxKubeData
 }
 
-func (sc *sandboxCache) getAllSandboxes() map[string]bool {
+func (sc *sandboxCache) getSandboxList() []string {
     sc.Lock()
     defer sc.Unlock()
 
-    return sc.sandboxes
-}
-
-func (sc *sandboxCache) getKataSandboxes() []string {
-    sc.Lock()
-    defer sc.Unlock()
-
-    var katasandboxes []string
-    for id, isKata := range sc.sandboxes {
-        if isKata {
-            katasandboxes = append(katasandboxes, id)
-        }
+    var sandboxList []string
+    for id := range sc.sandboxes {
+        sandboxList = append(sandboxList, id)
     }
-    return katasandboxes
+    return sandboxList
 }
 
@@ -46,7 +43,7 @@ func (sc *sandboxCache) deleteIfExists(id string) bool {
     return false
 }
 
-func (sc *sandboxCache) putIfNotExists(id string, value bool) bool {
+func (sc *sandboxCache) putIfNotExists(id string, value sandboxKubeData) bool {
     sc.Lock()
     defer sc.Unlock()
@@ -59,8 +56,17 @@ func (sc *sandboxCache) putIfNotExists(id string, value bool) bool {
     return false
 }
 
-func (sc *sandboxCache) set(sandboxes map[string]bool) {
+func (sc *sandboxCache) setMetadata(id string, value sandboxKubeData) {
     sc.Lock()
     defer sc.Unlock()
-    sc.sandboxes = sandboxes
+
+    sc.sandboxes[id] = value
+}
+
+func (sc *sandboxCache) getMetadata(id string) (sandboxKubeData, bool) {
+    sc.Lock()
+    defer sc.Unlock()
+
+    metadata, ok := sc.sandboxes[id]
+    return metadata, ok
 }
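The cache now separates registration from metadata: the fs watcher registers an id with an empty sandboxKubeData, syncSandboxes fills it in, and the metrics path reads it back. A minimal usage sketch of that lifecycle (sandbox id and values invented for the example, assuming this package's types and logger):

    sc := &sandboxCache{Mutex: &sync.Mutex{}, sandboxes: make(map[string]sandboxKubeData)}
    // fs watcher saw a new sandbox directory: register a placeholder entry
    sc.putIfNotExists("6f0d8cb4", sandboxKubeData{})
    // container manager answered: attach the kubernetes pod metadata
    sc.setMetadata("6f0d8cb4", sandboxKubeData{uid: "123", name: "pod-name", namespace: "pod-namespace"})
    // metrics scraper: look the metadata up before labeling
    if metadata, ok := sc.getMetadata("6f0d8cb4"); ok {
        monitorLog.WithField("pod", metadata.name).Debug("found pod metadata")
    }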
diff --git a/src/runtime/pkg/kata-monitor/sandbox_cache_test.go b/src/runtime/pkg/kata-monitor/sandbox_cache_test.go
index 43b8c8c99b..4eedf778ad 100644
--- a/src/runtime/pkg/kata-monitor/sandbox_cache_test.go
+++ b/src/runtime/pkg/kata-monitor/sandbox_cache_test.go
@@ -16,31 +16,26 @@ func TestSandboxCache(t *testing.T) {
     assert := assert.New(t)
     sc := &sandboxCache{
         Mutex:     &sync.Mutex{},
-        sandboxes: make(map[string]bool),
+        sandboxes: map[string]sandboxKubeData{"111": {"1-2-3", "test-name", "test-namespace"}},
     }
 
-    scMap := map[string]bool{"111": true}
-
-    sc.set(scMap)
-
-    scMap = sc.getAllSandboxes()
-    assert.Equal(1, len(scMap))
+    assert.Equal(1, len(sc.getSandboxList()))
 
     // put new item
     id := "new-id"
-    b := sc.putIfNotExists(id, true)
+    b := sc.putIfNotExists(id, sandboxKubeData{})
     assert.Equal(true, b)
-    assert.Equal(2, len(scMap))
+    assert.Equal(2, len(sc.getSandboxList()))
 
     // put key that alreay exists
-    b = sc.putIfNotExists(id, true)
+    b = sc.putIfNotExists(id, sandboxKubeData{})
     assert.Equal(false, b)
 
     b = sc.deleteIfExists(id)
     assert.Equal(true, b)
-    assert.Equal(1, len(scMap))
+    assert.Equal(1, len(sc.getSandboxList()))
 
     b = sc.deleteIfExists(id)
     assert.Equal(false, b)
-    assert.Equal(1, len(scMap))
+    assert.Equal(1, len(sc.getSandboxList()))
 }
diff --git a/src/runtime/pkg/kata-monitor/shim_client.go b/src/runtime/pkg/kata-monitor/shim_client.go
index 31043c847b..bdb62d401b 100644
--- a/src/runtime/pkg/kata-monitor/shim_client.go
+++ b/src/runtime/pkg/kata-monitor/shim_client.go
@@ -10,8 +10,6 @@ import (
     "io"
     "net"
     "net/http"
-    "os"
-    "path/filepath"
     "time"
 
     cdshim "github.com/containerd/containerd/runtime/v2/shim"
@@ -43,13 +41,6 @@ func getSandboxFS() string {
     return shim.GetSandboxesStoragePath()
 }
 
-func checkSandboxFSExists(sandboxID string) bool {
-    sbsPath := filepath.Join(string(filepath.Separator), getSandboxFS(), sandboxID)
-    _, err := os.Stat(sbsPath)
-
-    return !os.IsNotExist(err)
-}
-
 // BuildShimClient builds and returns an http client for communicating with the provided sandbox
 func BuildShimClient(sandboxID string, timeout time.Duration) (*http.Client, error) {
     return buildUnixSocketClient(shim.SocketAddress(sandboxID), timeout)