mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-06-24 06:27:39 +00:00
kata-monitor: improve sandbox caching
In order to retrieve the list of sandboxes, we poll the container engine every 15 seconds via the CRI. Once we have the list, we have to inspect each pod to find out which ones are kata pods. This commit extends the sandbox cache to keep track of all the pods, marking the kata ones, so that during the next polling only the new sandboxes need to be inspected to figure out which ones are using the kata runtime. Fixes: #2563 Signed-off-by: Francesco Giudici <fgiudici@redhat.com>
This commit is contained in:
parent
fc067d61d4
commit
245a12bbb7
@ -117,13 +117,12 @@ func parseEndpoint(endpoint string) (string, string, error) {
|
||||
}
|
||||
}
|
||||
|
||||
// getSandboxes gets kata sandboxes from the container engine.
|
||||
func (km *KataMonitor) getSandboxes() (map[string]struct{}, error) {
|
||||
|
||||
sandboxMap := make(map[string]struct{})
|
||||
// getSandboxes gets ready sandboxes from the container engine and returns an updated sandboxMap
|
||||
func (km *KataMonitor) getSandboxes(sandboxMap map[string]bool) (map[string]bool, error) {
|
||||
newMap := make(map[string]bool)
|
||||
runtimeClient, runtimeConn, err := getRuntimeClient(km.runtimeEndpoint)
|
||||
if err != nil {
|
||||
return sandboxMap, err
|
||||
return newMap, err
|
||||
}
|
||||
defer closeConnection(runtimeConn)
|
||||
|
||||
@ -139,11 +138,17 @@ func (km *KataMonitor) getSandboxes() (map[string]struct{}, error) {
|
||||
monitorLog.Debugf("ListPodSandboxRequest: %v", request)
|
||||
r, err := runtimeClient.ListPodSandbox(context.Background(), request)
|
||||
if err != nil {
|
||||
return sandboxMap, err
|
||||
return newMap, err
|
||||
}
|
||||
monitorLog.Debugf("ListPodSandboxResponse: %v", r)
|
||||
|
||||
for _, pod := range r.Items {
|
||||
// Use the cached data if available
|
||||
if isKata, ok := sandboxMap[pod.Id]; ok {
|
||||
newMap[pod.Id] = isKata
|
||||
continue
|
||||
}
|
||||
|
||||
request := &pb.PodSandboxStatusRequest{
|
||||
PodSandboxId: pod.Id,
|
||||
Verbose: true,
|
||||
@ -151,7 +156,7 @@ func (km *KataMonitor) getSandboxes() (map[string]struct{}, error) {
|
||||
|
||||
r, err := runtimeClient.PodSandboxStatus(context.Background(), request)
|
||||
if err != nil {
|
||||
return sandboxMap, err
|
||||
return newMap, err
|
||||
}
|
||||
|
||||
lowRuntime := ""
|
||||
@ -187,7 +192,7 @@ func (km *KataMonitor) getSandboxes() (map[string]struct{}, error) {
|
||||
// only for kata PODs.
|
||||
if lowRuntime == "" {
|
||||
monitorLog.WithField("pod", r).Warning("unable to retrieve the runtime type")
|
||||
sandboxMap[pod.Id] = struct{}{}
|
||||
newMap[pod.Id] = true
|
||||
continue
|
||||
}
|
||||
|
||||
@ -195,9 +200,11 @@ func (km *KataMonitor) getSandboxes() (map[string]struct{}, error) {
|
||||
"low runtime": lowRuntime,
|
||||
}).Debug("")
|
||||
if strings.Contains(lowRuntime, "kata") {
|
||||
sandboxMap[pod.Id] = struct{}{}
|
||||
newMap[pod.Id] = true
|
||||
} else {
|
||||
newMap[pod.Id] = false
|
||||
}
|
||||
}
|
||||
|
||||
return sandboxMap, nil
|
||||
return newMap, nil
|
||||
}
|
||||
|
@ -140,8 +140,8 @@ func encodeMetricFamily(mfs []*dto.MetricFamily, encoder expfmt.Encoder) error {
|
||||
|
||||
// aggregateSandboxMetrics will get metrics from one sandbox and do some process
|
||||
func (km *KataMonitor) aggregateSandboxMetrics(encoder expfmt.Encoder) error {
|
||||
// get all sandboxes from cache
|
||||
sandboxes := km.sandboxCache.getAllSandboxes()
|
||||
// get all kata sandboxes from cache
|
||||
sandboxes := km.sandboxCache.getKataSandboxes()
|
||||
// save running kata pods as a metrics.
|
||||
runningShimCount.Set(float64(len(sandboxes)))
|
||||
|
||||
@ -159,7 +159,7 @@ func (km *KataMonitor) aggregateSandboxMetrics(encoder expfmt.Encoder) error {
|
||||
monitorLog.WithField("sandbox_count", len(sandboxes)).Debugf("sandboxes count")
|
||||
|
||||
// get metrics from sandbox's shim
|
||||
for sandboxID := range sandboxes {
|
||||
for _, sandboxID := range sandboxes {
|
||||
wg.Add(1)
|
||||
go func(sandboxID string, results chan<- []*dto.MetricFamily) {
|
||||
sandboxMetrics, err := getParsedMetrics(sandboxID)
|
||||
|
@ -50,7 +50,7 @@ func NewKataMonitor(runtimeEndpoint string) (*KataMonitor, error) {
|
||||
runtimeEndpoint: runtimeEndpoint,
|
||||
sandboxCache: &sandboxCache{
|
||||
Mutex: &sync.Mutex{},
|
||||
sandboxes: make(map[string]struct{}),
|
||||
sandboxes: make(map[string]bool),
|
||||
},
|
||||
}
|
||||
|
||||
@ -66,7 +66,7 @@ func NewKataMonitor(runtimeEndpoint string) (*KataMonitor, error) {
|
||||
func (km *KataMonitor) startPodCacheUpdater() {
|
||||
for {
|
||||
time.Sleep(podCacheRefreshTimeSeconds * time.Second)
|
||||
sandboxes, err := km.getSandboxes()
|
||||
sandboxes, err := km.getSandboxes(km.sandboxCache.getAllSandboxes())
|
||||
if err != nil {
|
||||
monitorLog.WithError(err).Error("failed to get sandboxes")
|
||||
continue
|
||||
@ -95,20 +95,8 @@ func (km *KataMonitor) GetAgentURL(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
// ListSandboxes list all sandboxes running in Kata
|
||||
func (km *KataMonitor) ListSandboxes(w http.ResponseWriter, r *http.Request) {
|
||||
sandboxes := km.getSandboxList()
|
||||
sandboxes := km.sandboxCache.getKataSandboxes()
|
||||
for _, s := range sandboxes {
|
||||
w.Write([]byte(fmt.Sprintf("%s\n", s)))
|
||||
}
|
||||
}
|
||||
|
||||
func (km *KataMonitor) getSandboxList() []string {
|
||||
sn := km.sandboxCache.getAllSandboxes()
|
||||
result := make([]string, len(sn))
|
||||
|
||||
i := 0
|
||||
for k := range sn {
|
||||
result[i] = k
|
||||
i++
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
@ -11,15 +11,28 @@ import (
|
||||
|
||||
type sandboxCache struct {
|
||||
*sync.Mutex
|
||||
sandboxes map[string]struct{}
|
||||
// the bool value tracks if the pod is a kata one (true) or not (false)
|
||||
sandboxes map[string]bool
|
||||
}
|
||||
|
||||
func (sc *sandboxCache) getAllSandboxes() map[string]struct{} {
|
||||
func (sc *sandboxCache) getAllSandboxes() map[string]bool {
|
||||
sc.Lock()
|
||||
defer sc.Unlock()
|
||||
return sc.sandboxes
|
||||
}
|
||||
|
||||
func (sc *sandboxCache) getKataSandboxes() []string {
|
||||
sc.Lock()
|
||||
defer sc.Unlock()
|
||||
var katasandboxes []string
|
||||
for id, isKata := range sc.sandboxes {
|
||||
if isKata {
|
||||
katasandboxes = append(katasandboxes, id)
|
||||
}
|
||||
}
|
||||
return katasandboxes
|
||||
}
|
||||
|
||||
func (sc *sandboxCache) deleteIfExists(id string) bool {
|
||||
sc.Lock()
|
||||
defer sc.Unlock()
|
||||
@ -33,12 +46,12 @@ func (sc *sandboxCache) deleteIfExists(id string) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (sc *sandboxCache) putIfNotExists(id string) bool {
|
||||
func (sc *sandboxCache) putIfNotExists(id string, value bool) bool {
|
||||
sc.Lock()
|
||||
defer sc.Unlock()
|
||||
|
||||
if _, found := sc.sandboxes[id]; !found {
|
||||
sc.sandboxes[id] = struct{}{}
|
||||
sc.sandboxes[id] = value
|
||||
return true
|
||||
}
|
||||
|
||||
@ -46,7 +59,7 @@ func (sc *sandboxCache) putIfNotExists(id string) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (sc *sandboxCache) set(sandboxes map[string]struct{}) {
|
||||
func (sc *sandboxCache) set(sandboxes map[string]bool) {
|
||||
sc.Lock()
|
||||
defer sc.Unlock()
|
||||
sc.sandboxes = sandboxes
|
||||
|
@ -16,10 +16,10 @@ func TestSandboxCache(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
sc := &sandboxCache{
|
||||
Mutex: &sync.Mutex{},
|
||||
sandboxes: make(map[string]struct{}),
|
||||
sandboxes: make(map[string]bool),
|
||||
}
|
||||
|
||||
scMap := map[string]struct{}{"111": {}}
|
||||
scMap := map[string]bool{"111": true}
|
||||
|
||||
sc.set(scMap)
|
||||
|
||||
@ -28,12 +28,12 @@ func TestSandboxCache(t *testing.T) {
|
||||
|
||||
// put new item
|
||||
id := "new-id"
|
||||
b := sc.putIfNotExists(id)
|
||||
b := sc.putIfNotExists(id, true)
|
||||
assert.Equal(true, b)
|
||||
assert.Equal(2, len(scMap))
|
||||
|
||||
// put key that already exists
|
||||
b = sc.putIfNotExists(id)
|
||||
b = sc.putIfNotExists(id, true)
|
||||
assert.Equal(false, b)
|
||||
|
||||
b = sc.deleteIfExists(id)
|
||||
|
Loading…
Reference in New Issue
Block a user