kata-monitor: improve sandbox caching

In order to retrieve the list of sandboxes, we poll the container engine
every 15 seconds via the CRI. Once we have the list we have to inspect
each pod to find out the kata ones.
This commit extend the sandbox cache to keep track of all the pods,
marking the kata ones, so that during the next polling only the new
sandboxes should be inspected to figure out which ones are using the
kata runtime.

Fixes: #2563
Signed-off-by: Francesco Giudici <fgiudici@redhat.com>
This commit is contained in:
Francesco Giudici 2021-09-01 14:49:20 +02:00
parent fc067d61d4
commit 245a12bbb7
5 changed files with 45 additions and 37 deletions

View File

@ -117,13 +117,12 @@ func parseEndpoint(endpoint string) (string, string, error) {
}
}
// getSandboxes gets kata sandboxes from the container engine.
func (km *KataMonitor) getSandboxes() (map[string]struct{}, error) {
sandboxMap := make(map[string]struct{})
// getSandboxes gets ready sandboxes from the container engine and returns an updated sandboxMap
func (km *KataMonitor) getSandboxes(sandboxMap map[string]bool) (map[string]bool, error) {
newMap := make(map[string]bool)
runtimeClient, runtimeConn, err := getRuntimeClient(km.runtimeEndpoint)
if err != nil {
return sandboxMap, err
return newMap, err
}
defer closeConnection(runtimeConn)
@ -139,11 +138,17 @@ func (km *KataMonitor) getSandboxes() (map[string]struct{}, error) {
monitorLog.Debugf("ListPodSandboxRequest: %v", request)
r, err := runtimeClient.ListPodSandbox(context.Background(), request)
if err != nil {
return sandboxMap, err
return newMap, err
}
monitorLog.Debugf("ListPodSandboxResponse: %v", r)
for _, pod := range r.Items {
// Use the cached data if available
if isKata, ok := sandboxMap[pod.Id]; ok {
newMap[pod.Id] = isKata
continue
}
request := &pb.PodSandboxStatusRequest{
PodSandboxId: pod.Id,
Verbose: true,
@ -151,7 +156,7 @@ func (km *KataMonitor) getSandboxes() (map[string]struct{}, error) {
r, err := runtimeClient.PodSandboxStatus(context.Background(), request)
if err != nil {
return sandboxMap, err
return newMap, err
}
lowRuntime := ""
@ -187,7 +192,7 @@ func (km *KataMonitor) getSandboxes() (map[string]struct{}, error) {
// only for kata PODs.
if lowRuntime == "" {
monitorLog.WithField("pod", r).Warning("unable to retrieve the runtime type")
sandboxMap[pod.Id] = struct{}{}
newMap[pod.Id] = true
continue
}
@ -195,9 +200,11 @@ func (km *KataMonitor) getSandboxes() (map[string]struct{}, error) {
"low runtime": lowRuntime,
}).Debug("")
if strings.Contains(lowRuntime, "kata") {
sandboxMap[pod.Id] = struct{}{}
newMap[pod.Id] = true
} else {
newMap[pod.Id] = false
}
}
return sandboxMap, nil
return newMap, nil
}

View File

@ -140,8 +140,8 @@ func encodeMetricFamily(mfs []*dto.MetricFamily, encoder expfmt.Encoder) error {
// aggregateSandboxMetrics will get metrics from one sandbox and do some process
func (km *KataMonitor) aggregateSandboxMetrics(encoder expfmt.Encoder) error {
// get all sandboxes from cache
sandboxes := km.sandboxCache.getAllSandboxes()
// get all kata sandboxes from cache
sandboxes := km.sandboxCache.getKataSandboxes()
// save running kata pods as a metrics.
runningShimCount.Set(float64(len(sandboxes)))
@ -159,7 +159,7 @@ func (km *KataMonitor) aggregateSandboxMetrics(encoder expfmt.Encoder) error {
monitorLog.WithField("sandbox_count", len(sandboxes)).Debugf("sandboxes count")
// get metrics from sandbox's shim
for sandboxID := range sandboxes {
for _, sandboxID := range sandboxes {
wg.Add(1)
go func(sandboxID string, results chan<- []*dto.MetricFamily) {
sandboxMetrics, err := getParsedMetrics(sandboxID)

View File

@ -50,7 +50,7 @@ func NewKataMonitor(runtimeEndpoint string) (*KataMonitor, error) {
runtimeEndpoint: runtimeEndpoint,
sandboxCache: &sandboxCache{
Mutex: &sync.Mutex{},
sandboxes: make(map[string]struct{}),
sandboxes: make(map[string]bool),
},
}
@ -66,7 +66,7 @@ func NewKataMonitor(runtimeEndpoint string) (*KataMonitor, error) {
func (km *KataMonitor) startPodCacheUpdater() {
for {
time.Sleep(podCacheRefreshTimeSeconds * time.Second)
sandboxes, err := km.getSandboxes()
sandboxes, err := km.getSandboxes(km.sandboxCache.getAllSandboxes())
if err != nil {
monitorLog.WithError(err).Error("failed to get sandboxes")
continue
@ -95,20 +95,8 @@ func (km *KataMonitor) GetAgentURL(w http.ResponseWriter, r *http.Request) {
// ListSandboxes list all sandboxes running in Kata
func (km *KataMonitor) ListSandboxes(w http.ResponseWriter, r *http.Request) {
sandboxes := km.getSandboxList()
sandboxes := km.sandboxCache.getKataSandboxes()
for _, s := range sandboxes {
w.Write([]byte(fmt.Sprintf("%s\n", s)))
}
}
func (km *KataMonitor) getSandboxList() []string {
sn := km.sandboxCache.getAllSandboxes()
result := make([]string, len(sn))
i := 0
for k := range sn {
result[i] = k
i++
}
return result
}

View File

@ -11,15 +11,28 @@ import (
type sandboxCache struct {
*sync.Mutex
sandboxes map[string]struct{}
// the bool value tracks if the pod is a kata one (true) or not (false)
sandboxes map[string]bool
}
func (sc *sandboxCache) getAllSandboxes() map[string]struct{} {
func (sc *sandboxCache) getAllSandboxes() map[string]bool {
sc.Lock()
defer sc.Unlock()
return sc.sandboxes
}
func (sc *sandboxCache) getKataSandboxes() []string {
sc.Lock()
defer sc.Unlock()
var katasandboxes []string
for id, isKata := range sc.sandboxes {
if isKata {
katasandboxes = append(katasandboxes, id)
}
}
return katasandboxes
}
func (sc *sandboxCache) deleteIfExists(id string) bool {
sc.Lock()
defer sc.Unlock()
@ -33,12 +46,12 @@ func (sc *sandboxCache) deleteIfExists(id string) bool {
return false
}
func (sc *sandboxCache) putIfNotExists(id string) bool {
func (sc *sandboxCache) putIfNotExists(id string, value bool) bool {
sc.Lock()
defer sc.Unlock()
if _, found := sc.sandboxes[id]; !found {
sc.sandboxes[id] = struct{}{}
sc.sandboxes[id] = value
return true
}
@ -46,7 +59,7 @@ func (sc *sandboxCache) putIfNotExists(id string) bool {
return false
}
func (sc *sandboxCache) set(sandboxes map[string]struct{}) {
func (sc *sandboxCache) set(sandboxes map[string]bool) {
sc.Lock()
defer sc.Unlock()
sc.sandboxes = sandboxes

View File

@ -16,10 +16,10 @@ func TestSandboxCache(t *testing.T) {
assert := assert.New(t)
sc := &sandboxCache{
Mutex: &sync.Mutex{},
sandboxes: make(map[string]struct{}),
sandboxes: make(map[string]bool),
}
scMap := map[string]struct{}{"111": {}}
scMap := map[string]bool{"111": true}
sc.set(scMap)
@ -28,12 +28,12 @@ func TestSandboxCache(t *testing.T) {
// put new item
id := "new-id"
b := sc.putIfNotExists(id)
b := sc.putIfNotExists(id, true)
assert.Equal(true, b)
assert.Equal(2, len(scMap))
// put key that alreay exists
b = sc.putIfNotExists(id)
b = sc.putIfNotExists(id, true)
assert.Equal(false, b)
b = sc.deleteIfExists(id)