kata-monitor: rework the sandbox cache sync with the container manager

Kata-monitor detects started and terminated kata pods by monitoring the
vc/sbs fs (this makes sense, since we have to access that path anyway to
reach the shim sockets and collect the metrics).
While kata-monitor updates its sandbox cache based on the sbs fs events,
it also schedules a sync with the container manager via the CRI in
order to reconcile the list of sandboxes.
The container manager is treated as the ultimate source of truth, so we
stick with its response, removing the sandboxes it does not report.

It may happen, however, that when we query the container manager the new
kata pod is not reported yet, so we remove it from the kata-monitor pod
cache. If no further kata pod is added or removed afterwards, we never
query the container manager again and end up never reporting metrics for
that kata pod.

Let's make the sbs fs the source of truth instead: we update the cache
just by following what happens on the sbs fs.
At this point we could even drop the container manager connection
altogether... better to keep it instead, in order to get the kube pod
metadata from it, i.e., the kube UID, Name and Namespace associated with
the sandbox.
Every time we detect a new sandbox on the sbs fs we will try to retrieve
the pod metadata associated with it.

Right now we only attach the container manager sandbox id as a label to
the exposed metrics, which makes it hard to link the metrics to the
running pods in the kubernetes cluster.
With the kubernetes pod metadata we will be able to add labels that map
the metrics explicitly to the kubernetes workloads.
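
For illustration only (nothing below is part of this commit): one common way
such metadata lands on metrics is an info-style gauge whose labels carry the
pod identity. Metric, package and label names here are hypothetical:

// Hypothetical sketch (not from this commit): expose a per-sandbox info
// gauge carrying the kube pod metadata as labels. All names are illustrative.
package monitor

import "github.com/prometheus/client_golang/prometheus"

type sandboxKubeData struct {
	uid, name, namespace string
}

var kataSandboxInfo = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: "kata_sandbox_info",
		Help: "Kata sandboxes with their kubernetes pod metadata.",
	},
	[]string{"sandbox_id", "pod_uid", "pod_name", "pod_namespace"},
)

// exportSandboxInfo publishes one labeled sample per known sandbox.
func exportSandboxInfo(id string, md sandboxKubeData) {
	kataSandboxInfo.WithLabelValues(id, md.uid, md.name, md.namespace).Set(1)
}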

Fixes: #3550

Signed-off-by: Francesco Giudici <fgiudici@redhat.com>
Author: Francesco Giudici <fgiudici@redhat.com>
Date:   2022-01-25 09:33:07 +01:00
Commit: 7516a8c51b (parent: e78d80ea0d)
5 changed files with 103 additions and 61 deletions

@@ -113,15 +113,15 @@ func parseEndpoint(endpoint string) (string, string, error) {
}
}
// getSandboxes gets ready sandboxes from the container engine and returns an updated sandboxMap
func (km *KataMonitor) getSandboxes(sandboxMap map[string]bool) (map[string]bool, error) {
newMap := make(map[string]bool)
// syncSandboxes gets pods metadata from the container manager and updates the sandbox cache.
func (km *KataMonitor) syncSandboxes(sandboxList []string) ([]string, error) {
runtimeClient, runtimeConn, err := getRuntimeClient(km.runtimeEndpoint)
if err != nil {
return newMap, err
return sandboxList, err
}
defer closeConnection(runtimeConn)
// TODO: if len(sandboxList) is 1, better we just runtimeClient.PodSandboxStatus(...) targeting the single sandbox
filter := &pb.PodSandboxFilter{
State: &pb.PodSandboxStateValue{
State: pb.PodSandboxState_SANDBOX_READY,
@@ -134,32 +134,32 @@ func (km *KataMonitor) getSandboxes(sandboxMap map[string]bool) (map[string]bool
monitorLog.Tracef("ListPodSandboxRequest: %v", request)
r, err := runtimeClient.ListPodSandbox(context.Background(), request)
if err != nil {
return newMap, err
return sandboxList, err
}
monitorLog.Tracef("ListPodSandboxResponse: %v", r)
for _, pod := range r.Items {
// Use the cached data if available
if isKata, ok := sandboxMap[pod.Id]; ok {
newMap[pod.Id] = isKata
if isKata {
monitorLog.Debugf("KATA POD %s (cached)", pod.Id)
for _, sandbox := range sandboxList {
if pod.Id == sandbox {
km.sandboxCache.setMetadata(sandbox, sandboxKubeData{
uid: pod.Metadata.Uid,
name: pod.Metadata.Name,
namespace: pod.Metadata.Namespace,
})
sandboxList = removeFromSandboxList(sandboxList, sandbox)
monitorLog.WithFields(logrus.Fields{
"Pod Name": pod.Metadata.Name,
"Pod Namespace": pod.Metadata.Namespace,
"Pod UID": pod.Metadata.Uid,
}).Debugf("Synced KATA POD %s", pod.Id)
break
}
continue
}
// Check if a directory associated with the POD ID exists on the kata fs:
// if so we know that the POD is a kata one.
newMap[pod.Id] = checkSandboxFSExists(pod.Id)
if newMap[pod.Id] {
monitorLog.Debugf("KATA POD %s (new)", pod.Id)
}
monitorLog.WithFields(logrus.Fields{
"id": pod.Id,
"is kata": newMap[pod.Id],
"pod": pod,
}).Trace("")
}
return newMap, nil
// TODO: here we should mark the sandboxes we failed to retrieve info from: we should try a finite number of times
// to retrieve their metadata: if we keep failing, give up and remove them from the sandbox cache (with a Warning log).
return sandboxList, nil
}
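
For reference, a rough sketch of the single-sandbox path the TODO above hints
at, querying one sandbox via the CRI PodSandboxStatus call instead of listing
everything. It assumes the same pb client, helpers and imports used by
syncSandboxes (plus fmt); a sketch, not the committed implementation:

// Sketch: resolve the metadata of a single sandbox instead of listing all pods.
func (km *KataMonitor) syncSingleSandbox(id string) error {
	runtimeClient, runtimeConn, err := getRuntimeClient(km.runtimeEndpoint)
	if err != nil {
		return err
	}
	defer closeConnection(runtimeConn)

	r, err := runtimeClient.PodSandboxStatus(context.Background(),
		&pb.PodSandboxStatusRequest{PodSandboxId: id})
	if err != nil {
		return err
	}
	if r.Status == nil || r.Status.Metadata == nil {
		return fmt.Errorf("no status metadata for sandbox %s", id)
	}
	// Store the kube pod metadata in the cache, as syncSandboxes does.
	km.sandboxCache.setMetadata(id, sandboxKubeData{
		uid:       r.Status.Metadata.Uid,
		name:      r.Status.Metadata.Name,
		namespace: r.Status.Metadata.Namespace,
	})
	return nil
}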

@@ -141,7 +141,7 @@ func encodeMetricFamily(mfs []*dto.MetricFamily, encoder expfmt.Encoder) error {
// aggregateSandboxMetrics will get metrics from one sandbox and do some process
func (km *KataMonitor) aggregateSandboxMetrics(encoder expfmt.Encoder) error {
// get all kata sandboxes from cache
sandboxes := km.sandboxCache.getKataSandboxes()
sandboxes := km.sandboxCache.getSandboxList()
// save running kata pods as a metrics.
runningShimCount.Set(float64(len(sandboxes)))
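
runningShimCount itself is outside this diff; a plausible shape, assuming it
is a plain prometheus Gauge living in the same package (the metric name below
is a guess, not taken from the kata-monitor sources):

// Assumed definition of runningShimCount: a Gauge registered at startup.
var runningShimCount = prometheus.NewGauge(prometheus.GaugeOpts{
	Name: "kata_monitor_running_shim_count", // illustrative name
	Help: "Number of running kata sandboxes (shims) tracked by kata-monitor.",
})

func init() {
	prometheus.MustRegister(runningShimCount)
}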

@@ -53,7 +53,7 @@ func NewKataMonitor(runtimeEndpoint string) (*KataMonitor, error) {
runtimeEndpoint: runtimeEndpoint,
sandboxCache: &sandboxCache{
Mutex: &sync.Mutex{},
sandboxes: make(map[string]bool),
sandboxes: make(map[string]sandboxKubeData),
},
}
@@ -65,6 +65,15 @@ func NewKataMonitor(runtimeEndpoint string) (*KataMonitor, error) {
return km, nil
}
func removeFromSandboxList(sandboxList []string, sandboxToRemove string) []string {
for i, sandbox := range sandboxList {
if sandbox == sandboxToRemove {
return append(sandboxList[:i], sandboxList[i+1:]...)
}
}
return sandboxList
}
// startPodCacheUpdater will boot a thread to manage sandbox cache
func (km *KataMonitor) startPodCacheUpdater() {
sbsWatcher, err := fsnotify.NewWatcher()
@@ -84,9 +93,24 @@ func (km *KataMonitor) startPodCacheUpdater() {
monitorLog.Debugf("started fs monitoring @%s", getSandboxFS())
break
}
// we refresh the pod cache once if we get multiple add/delete pod events in a short time (< podCacheRefreshDelaySeconds)
// Initial sync with the kata sandboxes already running
sbsFile, err := os.Open(getSandboxFS())
if err != nil {
monitorLog.WithError(err).Fatal("cannot open sandboxes fs")
os.Exit(1)
}
sandboxList, err := sbsFile.Readdirnames(0)
if err != nil {
monitorLog.WithError(err).Fatal("cannot read sandboxes fs")
os.Exit(1)
}
monitorLog.Debug("initial sync of sbs directory completed")
monitorLog.Tracef("pod list from sbs: %v", sandboxList)
// We should get kubernetes metadata from the container manager for each new kata sandbox we detect.
// It may take a while for data to be available, so we always wait podCacheRefreshDelaySeconds before checking.
cacheUpdateTimer := time.NewTimer(podCacheRefreshDelaySeconds * time.Second)
cacheUpdateTimerWasSet := false
cacheUpdateTimerIsSet := true
for {
select {
case event, ok := <-sbsWatcher.Events:
@@ -99,11 +123,18 @@ func (km *KataMonitor) startPodCacheUpdater() {
case fsnotify.Create:
splitPath := strings.Split(event.Name, string(os.PathSeparator))
id := splitPath[len(splitPath)-1]
if !km.sandboxCache.putIfNotExists(id, true) {
if !km.sandboxCache.putIfNotExists(id, sandboxKubeData{}) {
monitorLog.WithField("pod", id).Warn(
"CREATE event but pod already present in the sandbox cache")
}
sandboxList = append(sandboxList, id)
monitorLog.WithField("pod", id).Info("sandbox cache: added pod")
if !cacheUpdateTimerIsSet {
cacheUpdateTimer.Reset(podCacheRefreshDelaySeconds * time.Second)
cacheUpdateTimerIsSet = true
monitorLog.Debugf(
"cache update timer fires in %d secs", podCacheRefreshDelaySeconds)
}
case fsnotify.Remove:
splitPath := strings.Split(event.Name, string(os.PathSeparator))
@ -112,26 +143,27 @@ func (km *KataMonitor) startPodCacheUpdater() {
monitorLog.WithField("pod", id).Warn(
"REMOVE event but pod was missing from the sandbox cache")
}
sandboxList = removeFromSandboxList(sandboxList, id)
monitorLog.WithField("pod", id).Info("sandbox cache: removed pod")
}
// While we process fs events directly to update the sandbox cache we need to sync with the
container engine to ensure we are in sync with it: we can get out of sync in environments
// where kata workloads can be started by other processes than the container engine.
cacheUpdateTimerWasSet = cacheUpdateTimer.Reset(podCacheRefreshDelaySeconds * time.Second)
monitorLog.WithField("was reset", cacheUpdateTimerWasSet).Debugf(
"cache update timer fires in %d secs", podCacheRefreshDelaySeconds)
case <-cacheUpdateTimer.C:
monitorLog.Debugf("sync sandbox cache with the container engine")
sandboxes, err := km.getSandboxes(km.sandboxCache.getAllSandboxes())
cacheUpdateTimerIsSet = false
monitorLog.WithField("pod list", sandboxList).Debugf(
"retrieve pods metadata from the container manager")
sandboxList, err = km.syncSandboxes(sandboxList)
if err != nil {
monitorLog.WithError(err).Error("failed to get sandboxes")
monitorLog.WithError(err).Error("failed to get sandboxes metadata")
continue
}
monitorLog.WithField("count", len(sandboxes)).Info("synced sandbox cache with the container engine")
monitorLog.WithField("sandboxes", sandboxes).Trace("dump sandbox cache")
km.sandboxCache.set(sandboxes)
if len(sandboxList) > 0 {
monitorLog.WithField("sandboxes", sandboxList).Debugf(
"%d sandboxes still miss metadata", len(sandboxList))
cacheUpdateTimer.Reset(podCacheRefreshDelaySeconds * time.Second)
cacheUpdateTimerIsSet = true
}
monitorLog.WithField("sandboxes", km.sandboxCache.getSandboxList()).Trace("dump sandbox cache")
}
}
}
@@ -155,7 +187,7 @@ func (km *KataMonitor) GetAgentURL(w http.ResponseWriter, r *http.Request) {
// ListSandboxes list all sandboxes running in Kata
func (km *KataMonitor) ListSandboxes(w http.ResponseWriter, r *http.Request) {
sandboxes := km.sandboxCache.getKataSandboxes()
sandboxes := km.sandboxCache.getSandboxList()
for _, s := range sandboxes {
w.Write([]byte(fmt.Sprintf("%s\n", s)))
}
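
The timer handling in startPodCacheUpdater above is a simple debounce: fs
events arm the timer once, further events pile up, and the (expensive)
metadata sync runs only when the timer fires, re-arming while work remains.
A standalone sketch of the pattern (names are generic, not from the diff):

package main

import (
	"fmt"
	"time"
)

// debounceLoop arms the timer on the first event and runs sync only when it
// fires; sync returning true means work is pending, so the timer is re-armed.
func debounceLoop(events <-chan string, delay time.Duration, sync func() bool) {
	timer := time.NewTimer(delay)
	timerIsSet := true
	for {
		select {
		case ev, ok := <-events:
			if !ok {
				return
			}
			fmt.Println("event:", ev)
			if !timerIsSet {
				timer.Reset(delay)
				timerIsSet = true
			}
		case <-timer.C:
			timerIsSet = false
			if sync() {
				// Some sandboxes still miss metadata: retry after another delay.
				timer.Reset(delay)
				timerIsSet = true
			}
		}
	}
}

func main() {
	events := make(chan string)
	go debounceLoop(events, time.Second, func() bool {
		fmt.Println("sync pod metadata with the container manager")
		return false // nothing left to retry in this demo
	})
	events <- "sandbox-created"
	events <- "sandbox-removed"
	time.Sleep(2 * time.Second)
}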

@@ -9,28 +9,31 @@ import (
"sync"
)
type sandboxKubeData struct {
uid string
name string
namespace string
}
type sandboxCache struct {
*sync.Mutex
// the bool value tracks if the pod is a kata one (true) or not (false)
sandboxes map[string]bool
// sandboxKubeData links the sandbox id from the container manager to the kubernetes pod metadata
sandboxes map[string]sandboxKubeData
}
func (sc *sandboxCache) getAllSandboxes() map[string]bool {
func (sc *sandboxCache) getSandboxes() map[string]sandboxKubeData {
sc.Lock()
defer sc.Unlock()
return sc.sandboxes
}
func (sc *sandboxCache) getKataSandboxes() []string {
func (sc *sandboxCache) getSandboxList() []string {
sc.Lock()
defer sc.Unlock()
var katasandboxes []string
for id, isKata := range sc.sandboxes {
if isKata {
katasandboxes = append(katasandboxes, id)
}
var sandboxList []string
for id := range sc.sandboxes {
sandboxList = append(sandboxList, id)
}
return katasandboxes
return sandboxList
}
func (sc *sandboxCache) deleteIfExists(id string) bool {
@@ -46,7 +49,7 @@ func (sc *sandboxCache) deleteIfExists(id string) bool {
return false
}
func (sc *sandboxCache) putIfNotExists(id string, value bool) bool {
func (sc *sandboxCache) putIfNotExists(id string, value sandboxKubeData) bool {
sc.Lock()
defer sc.Unlock()
@@ -59,7 +62,14 @@ func (sc *sandboxCache) putIfNotExists(id string, value bool) bool {
return false
}
func (sc *sandboxCache) set(sandboxes map[string]bool) {
func (sc *sandboxCache) setMetadata(id string, value sandboxKubeData) {
sc.Lock()
defer sc.Unlock()
sc.sandboxes[id] = value
}
func (sc *sandboxCache) set(sandboxes map[string]sandboxKubeData) {
sc.Lock()
defer sc.Unlock()
sc.sandboxes = sandboxes
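
Putting the reworked cache API together, a minimal usage sketch; it assumes
the sandboxCache and sandboxKubeData definitions above are in scope, plus fmt
and sync imports:

// Illustrative lifecycle of a cache entry with the reworked API.
func usageSketch() {
	sc := &sandboxCache{
		Mutex:     &sync.Mutex{},
		sandboxes: make(map[string]sandboxKubeData),
	}
	// A CREATE event on the sbs fs adds the sandbox with empty metadata...
	sc.putIfNotExists("sandbox-id-1", sandboxKubeData{})
	// ...and a later sync with the container manager fills the metadata in.
	sc.setMetadata("sandbox-id-1", sandboxKubeData{
		uid:       "1-2-3",
		name:      "test-name",
		namespace: "test-namespace",
	})
	// Metrics aggregation and the HTTP handlers only need the list of ids.
	for _, id := range sc.getSandboxList() {
		fmt.Println(id)
	}
}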

@@ -16,24 +16,24 @@ func TestSandboxCache(t *testing.T) {
assert := assert.New(t)
sc := &sandboxCache{
Mutex: &sync.Mutex{},
sandboxes: make(map[string]bool),
sandboxes: make(map[string]sandboxKubeData),
}
scMap := map[string]bool{"111": true}
scMap := map[string]sandboxKubeData{"111": {"1-2-3", "test-name", "test-namespace"}}
sc.set(scMap)
scMap = sc.getAllSandboxes()
scMap = sc.getSandboxes()
assert.Equal(1, len(scMap))
// put new item
id := "new-id"
b := sc.putIfNotExists(id, true)
b := sc.putIfNotExists(id, sandboxKubeData{})
assert.Equal(true, b)
assert.Equal(2, len(scMap))
// put key that already exists
b = sc.putIfNotExists(id, true)
b = sc.putIfNotExists(id, sandboxKubeData{})
assert.Equal(false, b)
b = sc.deleteIfExists(id)