Mirror of https://github.com/kata-containers/kata-containers.git (synced 2025-05-01 05:04:26 +00:00)
kata-monitor: rework the sandbox cache sync with the container manager
Kata-monitor detects started and terminated kata pods by monitoring the
vc/sbs fs (this makes sense, since we have to access that path anyway to
reach the shim sockets and retrieve the metrics). While kata-monitor
updates its sandbox cache from the sbs fs events, it also schedules a
sync with the container manager via the CRI to reconcile its list of
sandboxes. The container manager is treated as the ultimate source of
truth, so we stick with its response and remove the sandboxes it does
not report.

It may happen, however, that a new kata pod is not reported yet when we
query the container manager, in which case we remove it from the
kata-monitor pod cache. If no kata pod is added or removed afterwards,
we never check with the container manager again and miss the metrics
for that pod.

Let's make the sbs fs the source of truth instead: we now update the
cache purely from what happens on the sbs fs. At this point we could
have dropped the container manager connection altogether; it is better
to keep it, though, in order to get the kube pod metadata from it,
i.e., the kube UID, Name and Namespace associated with each sandbox.
Every time a new sandbox shows up on the sbs fs we try to retrieve the
pod metadata associated with it.

Right now we only attach the container manager sandbox id as a label to
the exposed metrics, which makes it hard to link the metrics to the
running pods in the kubernetes cluster. With the kubernetes pod
metadata we will be able to add labels that map the metrics explicitly
to the kubernetes workloads.

Fixes: #3550

Signed-off-by: Francesco Giudici <fgiudici@redhat.com>
parent: e78d80ea0d
commit: 7516a8c51b
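The kube-metadata labels mentioned at the end of the commit message are not added by this commit. As a rough illustration of where the metadata is headed, here is a minimal sketch (my own, with hypothetical metric and label names) of how it could be attached to an exposed metric with the Prometheus Go client:

    package main

    import "github.com/prometheus/client_golang/prometheus"

    // sandboxKubeData mirrors the struct introduced by this commit.
    type sandboxKubeData struct {
    	uid, name, namespace string
    }

    // Hypothetical info-style gauge: one time series per kata pod, with the
    // kubernetes metadata carried as labels (names are illustrative only).
    var kataPodInfo = prometheus.NewGaugeVec(
    	prometheus.GaugeOpts{
    		Name: "kata_pod_info",
    		Help: "Running kata pods with their kubernetes metadata.",
    	},
    	[]string{"sandbox_id", "kube_uid", "kube_name", "kube_namespace"},
    )

    func init() {
    	prometheus.MustRegister(kataPodInfo)
    }

    // exportPodInfo publishes one labeled sample per known kata pod.
    func exportPodInfo(sandboxID string, md sandboxKubeData) {
    	kataPodInfo.WithLabelValues(sandboxID, md.uid, md.name, md.namespace).Set(1)
    }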
@@ -113,15 +113,15 @@ func parseEndpoint(endpoint string) (string, string, error) {
 	}
 }
 
-// getSandboxes gets ready sandboxes from the container engine and returns an updated sandboxMap
-func (km *KataMonitor) getSandboxes(sandboxMap map[string]bool) (map[string]bool, error) {
-	newMap := make(map[string]bool)
+// syncSandboxes gets pods metadata from the container manager and updates the sandbox cache.
+func (km *KataMonitor) syncSandboxes(sandboxList []string) ([]string, error) {
 	runtimeClient, runtimeConn, err := getRuntimeClient(km.runtimeEndpoint)
 	if err != nil {
-		return newMap, err
+		return sandboxList, err
 	}
 	defer closeConnection(runtimeConn)
 
+	// TODO: if len(sandboxList) is 1, better we just runtimeClient.PodSandboxStatus(...) targeting the single sandbox
 	filter := &pb.PodSandboxFilter{
 		State: &pb.PodSandboxStateValue{
 			State: pb.PodSandboxState_SANDBOX_READY,
@@ -134,32 +134,32 @@ func (km *KataMonitor) getSandboxes(sandboxMap map[string]bool) (map[string]bool
 	monitorLog.Tracef("ListPodSandboxRequest: %v", request)
 	r, err := runtimeClient.ListPodSandbox(context.Background(), request)
 	if err != nil {
-		return newMap, err
+		return sandboxList, err
 	}
 	monitorLog.Tracef("ListPodSandboxResponse: %v", r)
 
 	for _, pod := range r.Items {
-		// Use the cached data if available
-		if isKata, ok := sandboxMap[pod.Id]; ok {
-			newMap[pod.Id] = isKata
-			if isKata {
-				monitorLog.Debugf("KATA POD %s (cached)", pod.Id)
-			}
-			continue
-		}
-
-		// Check if a directory associated with the POD ID exist on the kata fs:
-		// if so we know that the POD is a kata one.
-		newMap[pod.Id] = checkSandboxFSExists(pod.Id)
-		if newMap[pod.Id] {
-			monitorLog.Debugf("KATA POD %s (new)", pod.Id)
-		}
-		monitorLog.WithFields(logrus.Fields{
-			"id":      pod.Id,
-			"is kata": newMap[pod.Id],
-			"pod":     pod,
-		}).Trace("")
-	}
-
-	return newMap, nil
+		for _, sandbox := range sandboxList {
+			if pod.Id == sandbox {
+				km.sandboxCache.setMetadata(sandbox, sandboxKubeData{
+					uid:       pod.Metadata.Uid,
+					name:      pod.Metadata.Name,
+					namespace: pod.Metadata.Namespace,
+				})
+
+				sandboxList = removeFromSandboxList(sandboxList, sandbox)
+
+				monitorLog.WithFields(logrus.Fields{
+					"Pod Name":      pod.Metadata.Name,
+					"Pod Namespace": pod.Metadata.Namespace,
+					"Pod UID":       pod.Metadata.Uid,
+				}).Debugf("Synced KATA POD %s", pod.Id)
+
+				break
+			}
+		}
+	}
+	// TODO: here we should mark the sandboxes we failed to retrieve info from: we should try a finite number of times
+	// to retrieve their metadata: if we fail resign and remove them from the sandbox cache (with a Warning log).
+	return sandboxList, nil
 }
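The TODO above hints at querying a single sandbox via PodSandboxStatus instead of listing them all. A possible shape of that path, assuming the same pb CRI client used in this file (untested sketch with a hypothetical helper name, not part of the commit):

    // syncSandbox fetches metadata for one sandbox id via the CRI
    // PodSandboxStatus call and stores it in the cache.
    func (km *KataMonitor) syncSandbox(runtimeClient pb.RuntimeServiceClient, id string) error {
    	status, err := runtimeClient.PodSandboxStatus(context.Background(),
    		&pb.PodSandboxStatusRequest{PodSandboxId: id})
    	if err != nil {
    		return err
    	}
    	meta := status.GetStatus().GetMetadata()
    	km.sandboxCache.setMetadata(id, sandboxKubeData{
    		uid:       meta.Uid,
    		name:      meta.Name,
    		namespace: meta.Namespace,
    	})
    	return nil
    }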
@@ -141,7 +141,7 @@ func encodeMetricFamily(mfs []*dto.MetricFamily, encoder expfmt.Encoder) error {
 // aggregateSandboxMetrics will get metrics from one sandbox and do some process
 func (km *KataMonitor) aggregateSandboxMetrics(encoder expfmt.Encoder) error {
 	// get all kata sandboxes from cache
-	sandboxes := km.sandboxCache.getKataSandboxes()
+	sandboxes := km.sandboxCache.getSandboxList()
 	// save running kata pods as a metrics.
 	runningShimCount.Set(float64(len(sandboxes)))
 
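For context, encodeMetricFamily and aggregateSandboxMetrics stream metric families through an expfmt.Encoder. A minimal standalone sketch of that encoding step, gathering from the default registry rather than from the shims (my example, not code from this patch):

    package main

    import (
    	"os"

    	"github.com/prometheus/client_golang/prometheus"
    	"github.com/prometheus/common/expfmt"
    )

    func main() {
    	// Gather registered metric families and write them out in the
    	// Prometheus text exposition format.
    	mfs, err := prometheus.DefaultGatherer.Gather()
    	if err != nil {
    		panic(err)
    	}
    	encoder := expfmt.NewEncoder(os.Stdout, expfmt.FmtText)
    	for _, mf := range mfs {
    		if err := encoder.Encode(mf); err != nil {
    			panic(err)
    		}
    	}
    }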
@@ -53,7 +53,7 @@ func NewKataMonitor(runtimeEndpoint string) (*KataMonitor, error) {
 		runtimeEndpoint: runtimeEndpoint,
 		sandboxCache: &sandboxCache{
 			Mutex:     &sync.Mutex{},
-			sandboxes: make(map[string]bool),
+			sandboxes: make(map[string]sandboxKubeData),
 		},
 	}
@@ -65,6 +65,15 @@ func NewKataMonitor(runtimeEndpoint string) (*KataMonitor, error) {
 	return km, nil
 }
 
+func removeFromSandboxList(sandboxList []string, sandboxToRemove string) []string {
+	for i, sandbox := range sandboxList {
+		if sandbox == sandboxToRemove {
+			return append(sandboxList[:i], sandboxList[i+1:]...)
+		}
+	}
+	return sandboxList
+}
+
 // startPodCacheUpdater will boot a thread to manage sandbox cache
 func (km *KataMonitor) startPodCacheUpdater() {
 	sbsWatcher, err := fsnotify.NewWatcher()
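removeFromSandboxList uses the standard Go idiom for deleting from a slice: append the tail over the element to drop. It preserves order, runs in O(n), and mutates the backing array of the input, so callers must use the returned slice. A quick usage check (my example):

    list := []string{"sb-a", "sb-b", "sb-c"}
    list = removeFromSandboxList(list, "sb-b")
    fmt.Println(list) // [sb-a sb-c]
    // Removing an id that is not present returns the slice unchanged.
    fmt.Println(removeFromSandboxList(list, "sb-x")) // [sb-a sb-c]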
@@ -84,9 +93,24 @@ func (km *KataMonitor) startPodCacheUpdater() {
 		monitorLog.Debugf("started fs monitoring @%s", getSandboxFS())
 		break
 	}
-	// we refresh the pod cache once if we get multiple add/delete pod events in a short time (< podCacheRefreshDelaySeconds)
+	// Initial sync with the kata sandboxes already running
+	sbsFile, err := os.Open(getSandboxFS())
+	if err != nil {
+		monitorLog.WithError(err).Fatal("cannot open sandboxes fs")
+		os.Exit(1)
+	}
+	sandboxList, err := sbsFile.Readdirnames(0)
+	if err != nil {
+		monitorLog.WithError(err).Fatal("cannot read sandboxes fs")
+		os.Exit(1)
+	}
+	monitorLog.Debug("initial sync of sbs directory completed")
+	monitorLog.Tracef("pod list from sbs: %v", sandboxList)
+
+	// We should get kubernetes metadata from the container manager for each new kata sandbox we detect.
+	// It may take a while for data to be available, so we always wait podCacheRefreshDelaySeconds before checking.
 	cacheUpdateTimer := time.NewTimer(podCacheRefreshDelaySeconds * time.Second)
-	cacheUpdateTimerWasSet := false
+	cacheUpdateTimerIsSet := true
 	for {
 		select {
 		case event, ok := <-sbsWatcher.Events:
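The cacheUpdateTimerIsSet flag exists because time.Timer offers no way to ask whether it is armed, so the loop tracks that by hand. Condensed into a generic form, the pattern looks roughly like this (illustrative sketch, not code from the patch):

    // debounceLoop coalesces bursts of events into one doWork call per
    // delay window, re-arming the timer while doWork reports pending work.
    func debounceLoop(events <-chan struct{}, delay time.Duration, doWork func() (done bool)) {
    	timer := time.NewTimer(delay)
    	timerIsSet := true
    	for {
    		select {
    		case <-events:
    			// Arm the timer on the first event of a burst; later events
    			// within the window are absorbed without resetting it.
    			if !timerIsSet {
    				timer.Reset(delay)
    				timerIsSet = true
    			}
    		case <-timer.C:
    			timerIsSet = false
    			if !doWork() {
    				// Work remains: try again after another delay window.
    				timer.Reset(delay)
    				timerIsSet = true
    			}
    		}
    	}
    }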
@@ -99,11 +123,18 @@ func (km *KataMonitor) startPodCacheUpdater() {
 			case fsnotify.Create:
 				splitPath := strings.Split(event.Name, string(os.PathSeparator))
 				id := splitPath[len(splitPath)-1]
-				if !km.sandboxCache.putIfNotExists(id, true) {
+				if !km.sandboxCache.putIfNotExists(id, sandboxKubeData{}) {
 					monitorLog.WithField("pod", id).Warn(
 						"CREATE event but pod already present in the sandbox cache")
 				}
+				sandboxList = append(sandboxList, id)
 				monitorLog.WithField("pod", id).Info("sandbox cache: added pod")
+				if !cacheUpdateTimerIsSet {
+					cacheUpdateTimer.Reset(podCacheRefreshDelaySeconds * time.Second)
+					cacheUpdateTimerIsSet = true
+					monitorLog.Debugf(
+						"cache update timer fires in %d secs", podCacheRefreshDelaySeconds)
+				}
 
 			case fsnotify.Remove:
 				splitPath := strings.Split(event.Name, string(os.PathSeparator))
@@ -112,26 +143,27 @@ func (km *KataMonitor) startPodCacheUpdater() {
 					monitorLog.WithField("pod", id).Warn(
 						"REMOVE event but pod was missing from the sandbox cache")
 				}
+				sandboxList = removeFromSandboxList(sandboxList, id)
 				monitorLog.WithField("pod", id).Info("sandbox cache: removed pod")
 			}
-
-			// While we process fs events directly to update the sandbox cache we need to sync with the
-			// container engine to ensure we are on sync with it: we can get out of sync in environments
-			// where kata workloads can be started by other processes than the container engine.
-			cacheUpdateTimerWasSet = cacheUpdateTimer.Reset(podCacheRefreshDelaySeconds * time.Second)
-			monitorLog.WithField("was reset", cacheUpdateTimerWasSet).Debugf(
-				"cache update timer fires in %d secs", podCacheRefreshDelaySeconds)
 
 		case <-cacheUpdateTimer.C:
-			monitorLog.Debugf("sync sandbox cache with the container engine")
-			sandboxes, err := km.getSandboxes(km.sandboxCache.getAllSandboxes())
+			cacheUpdateTimerIsSet = false
+			monitorLog.WithField("pod list", sandboxList).Debugf(
+				"retrieve pods metadata from the container manager")
+			sandboxList, err = km.syncSandboxes(sandboxList)
 			if err != nil {
-				monitorLog.WithError(err).Error("failed to get sandboxes")
+				monitorLog.WithError(err).Error("failed to get sandboxes metadata")
 				continue
 			}
-			monitorLog.WithField("count", len(sandboxes)).Info("synced sandbox cache with the container engine")
-			monitorLog.WithField("sandboxes", sandboxes).Trace("dump sandbox cache")
-			km.sandboxCache.set(sandboxes)
+			if len(sandboxList) > 0 {
+				monitorLog.WithField("sandboxes", sandboxList).Debugf(
+					"%d sandboxes still miss metadata", len(sandboxList))
+				cacheUpdateTimer.Reset(podCacheRefreshDelaySeconds * time.Second)
+				cacheUpdateTimerIsSet = true
+			}
+
+			monitorLog.WithField("sandboxes", km.sandboxCache.getSandboxList()).Trace("dump sandbox cache")
 		}
 	}
 }
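The events driving this loop come from fsnotify watching the sandbox directory. A self-contained sketch of that mechanism, using the usual kata sbs location but treating the path as an assumption:

    package main

    import (
    	"log"

    	"github.com/fsnotify/fsnotify"
    )

    func main() {
    	watcher, err := fsnotify.NewWatcher()
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer watcher.Close()

    	// kata-monitor watches getSandboxFS(); /run/vc/sbs is assumed here.
    	if err := watcher.Add("/run/vc/sbs"); err != nil {
    		log.Fatal(err)
    	}
    	// Each per-sandbox directory creation/removal arrives as one event.
    	for event := range watcher.Events {
    		switch event.Op {
    		case fsnotify.Create:
    			log.Printf("sandbox created: %s", event.Name)
    		case fsnotify.Remove:
    			log.Printf("sandbox removed: %s", event.Name)
    		}
    	}
    }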
@@ -155,7 +187,7 @@ func (km *KataMonitor) GetAgentURL(w http.ResponseWriter, r *http.Request) {
 
 // ListSandboxes list all sandboxes running in Kata
 func (km *KataMonitor) ListSandboxes(w http.ResponseWriter, r *http.Request) {
-	sandboxes := km.sandboxCache.getKataSandboxes()
+	sandboxes := km.sandboxCache.getSandboxList()
 	for _, s := range sandboxes {
 		w.Write([]byte(fmt.Sprintf("%s\n", s)))
 	}
@@ -9,28 +9,31 @@ import (
 	"sync"
 )
 
+type sandboxKubeData struct {
+	uid       string
+	name      string
+	namespace string
+}
 type sandboxCache struct {
 	*sync.Mutex
-	// the bool value tracks if the pod is a kata one (true) or not (false)
-	sandboxes map[string]bool
+	// the sandboxKubeData links the sandbox id from the container manager to the pod metadata of kubernetes
+	sandboxes map[string]sandboxKubeData
 }
 
-func (sc *sandboxCache) getAllSandboxes() map[string]bool {
+func (sc *sandboxCache) getSandboxes() map[string]sandboxKubeData {
 	sc.Lock()
 	defer sc.Unlock()
 	return sc.sandboxes
 }
 
-func (sc *sandboxCache) getKataSandboxes() []string {
+func (sc *sandboxCache) getSandboxList() []string {
 	sc.Lock()
 	defer sc.Unlock()
-	var katasandboxes []string
-	for id, isKata := range sc.sandboxes {
-		if isKata {
-			katasandboxes = append(katasandboxes, id)
-		}
-	}
-	return katasandboxes
+	var sandboxList []string
+	for id := range sc.sandboxes {
+		sandboxList = append(sandboxList, id)
+	}
+	return sandboxList
 }
 
 func (sc *sandboxCache) deleteIfExists(id string) bool {
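Note that getSandboxes hands back the internal map itself, so a caller holding the result can observe later mutations without the lock. If that ever matters, a copying variant is straightforward (sketch of an alternative, not what the patch does):

    // getSandboxesCopy returns a snapshot of the cache, so callers cannot
    // race with subsequent updates after the lock is released.
    func (sc *sandboxCache) getSandboxesCopy() map[string]sandboxKubeData {
    	sc.Lock()
    	defer sc.Unlock()
    	out := make(map[string]sandboxKubeData, len(sc.sandboxes))
    	for id, md := range sc.sandboxes {
    		out[id] = md
    	}
    	return out
    }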
@@ -46,7 +49,7 @@ func (sc *sandboxCache) deleteIfExists(id string) bool {
 	return false
 }
 
-func (sc *sandboxCache) putIfNotExists(id string, value bool) bool {
+func (sc *sandboxCache) putIfNotExists(id string, value sandboxKubeData) bool {
 	sc.Lock()
 	defer sc.Unlock()
 
@@ -59,7 +62,14 @@ func (sc *sandboxCache) putIfNotExists(id string, value bool) bool {
 	return false
 }
 
-func (sc *sandboxCache) set(sandboxes map[string]bool) {
+func (sc *sandboxCache) setMetadata(id string, value sandboxKubeData) {
+	sc.Lock()
+	defer sc.Unlock()
+
+	sc.sandboxes[id] = value
+}
+
+func (sc *sandboxCache) set(sandboxes map[string]sandboxKubeData) {
 	sc.Lock()
 	defer sc.Unlock()
 	sc.sandboxes = sandboxes
@@ -16,24 +16,24 @@ func TestSandboxCache(t *testing.T) {
 	assert := assert.New(t)
 	sc := &sandboxCache{
 		Mutex:     &sync.Mutex{},
-		sandboxes: make(map[string]bool),
+		sandboxes: make(map[string]sandboxKubeData),
 	}
 
-	scMap := map[string]bool{"111": true}
+	scMap := map[string]sandboxKubeData{"111": {"1-2-3", "test-name", "test-namespace"}}
 
 	sc.set(scMap)
 
-	scMap = sc.getAllSandboxes()
+	scMap = sc.getSandboxes()
 	assert.Equal(1, len(scMap))
 
 	// put new item
 	id := "new-id"
-	b := sc.putIfNotExists(id, true)
+	b := sc.putIfNotExists(id, sandboxKubeData{})
 	assert.Equal(true, b)
 	assert.Equal(2, len(scMap))
 
 	// put key that already exists
-	b = sc.putIfNotExists(id, true)
+	b = sc.putIfNotExists(id, sandboxKubeData{})
 	assert.Equal(false, b)
 
 	b = sc.deleteIfExists(id)