mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-04-30 20:54:26 +00:00
kata-monitor: trivial: rename symbols & labels
We introduced collection of sandboxes metadata from the CRI that will be attached to the sandbox metrics: this will allow to immediately match sandboxes metrics with CRI workloads. Rename the symbols from *Kube* to *CRI* as the metadata will be there every time pods are created through CRI, also if kubernetes is not installed (e.g., 'crictl runp'). Signed-off-by: Francesco Giudici <fgiudici@redhat.com>
This commit is contained in:
parent
3ac52e8193
commit
fec26f8e51
@ -141,7 +141,7 @@ func (km *KataMonitor) syncSandboxes(sandboxList []string) ([]string, error) {
|
|||||||
for _, pod := range r.Items {
|
for _, pod := range r.Items {
|
||||||
for _, sandbox := range sandboxList {
|
for _, sandbox := range sandboxList {
|
||||||
if pod.Id == sandbox {
|
if pod.Id == sandbox {
|
||||||
km.sandboxCache.setMetadata(sandbox, sandboxKubeData{
|
km.sandboxCache.setCRIMetadata(sandbox, sandboxCRIMetadata{
|
||||||
uid: pod.Metadata.Uid,
|
uid: pod.Metadata.Uid,
|
||||||
name: pod.Metadata.Name,
|
name: pod.Metadata.Name,
|
||||||
namespace: pod.Metadata.Namespace,
|
namespace: pod.Metadata.Namespace,
|
||||||
@ -150,9 +150,9 @@ func (km *KataMonitor) syncSandboxes(sandboxList []string) ([]string, error) {
|
|||||||
sandboxList = removeFromSandboxList(sandboxList, sandbox)
|
sandboxList = removeFromSandboxList(sandboxList, sandbox)
|
||||||
|
|
||||||
monitorLog.WithFields(logrus.Fields{
|
monitorLog.WithFields(logrus.Fields{
|
||||||
"Pod Name": pod.Metadata.Name,
|
"cri-name": pod.Metadata.Name,
|
||||||
"Pod Namespace": pod.Metadata.Namespace,
|
"cri-namespace": pod.Metadata.Namespace,
|
||||||
"Pod UID": pod.Metadata.Uid,
|
"cri-uid": pod.Metadata.Uid,
|
||||||
}).Debugf("Synced KATA POD %s", pod.Id)
|
}).Debugf("Synced KATA POD %s", pod.Id)
|
||||||
|
|
||||||
break
|
break
|
||||||
|
@ -160,12 +160,12 @@ func (km *KataMonitor) aggregateSandboxMetrics(encoder expfmt.Encoder) error {
|
|||||||
|
|
||||||
// get metrics from sandbox's shim
|
// get metrics from sandbox's shim
|
||||||
for _, sandboxID := range sandboxes {
|
for _, sandboxID := range sandboxes {
|
||||||
sandboxMetadata, ok := km.sandboxCache.getMetadata(sandboxID)
|
sandboxMetadata, ok := km.sandboxCache.getCRIMetadata(sandboxID)
|
||||||
if !ok { // likely the sandbox has been just removed
|
if !ok { // likely the sandbox has been just removed
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(sandboxID string, sandboxMetadata sandboxKubeData, results chan<- []*dto.MetricFamily) {
|
go func(sandboxID string, sandboxMetadata sandboxCRIMetadata, results chan<- []*dto.MetricFamily) {
|
||||||
sandboxMetrics, err := getParsedMetrics(sandboxID, sandboxMetadata)
|
sandboxMetrics, err := getParsedMetrics(sandboxID, sandboxMetadata)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
monitorLog.WithError(err).WithField("sandbox_id", sandboxID).Errorf("failed to get metrics for sandbox")
|
monitorLog.WithError(err).WithField("sandbox_id", sandboxID).Errorf("failed to get metrics for sandbox")
|
||||||
@ -223,7 +223,7 @@ func (km *KataMonitor) aggregateSandboxMetrics(encoder expfmt.Encoder) error {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func getParsedMetrics(sandboxID string, sandboxMetadata sandboxKubeData) ([]*dto.MetricFamily, error) {
|
func getParsedMetrics(sandboxID string, sandboxMetadata sandboxCRIMetadata) ([]*dto.MetricFamily, error) {
|
||||||
body, err := doGet(sandboxID, defaultTimeout, "metrics")
|
body, err := doGet(sandboxID, defaultTimeout, "metrics")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -244,7 +244,7 @@ func GetSandboxMetrics(sandboxID string) (string, error) {
|
|||||||
|
|
||||||
// parsePrometheusMetrics will decode metrics from Prometheus text format
|
// parsePrometheusMetrics will decode metrics from Prometheus text format
|
||||||
// and return array of *dto.MetricFamily with an ASC order
|
// and return array of *dto.MetricFamily with an ASC order
|
||||||
func parsePrometheusMetrics(sandboxID string, sandboxMetadata sandboxKubeData, body []byte) ([]*dto.MetricFamily, error) {
|
func parsePrometheusMetrics(sandboxID string, sandboxMetadata sandboxCRIMetadata, body []byte) ([]*dto.MetricFamily, error) {
|
||||||
reader := bytes.NewReader(body)
|
reader := bytes.NewReader(body)
|
||||||
decoder := expfmt.NewDecoder(reader, expfmt.FmtText)
|
decoder := expfmt.NewDecoder(reader, expfmt.FmtText)
|
||||||
|
|
||||||
@ -268,15 +268,15 @@ func parsePrometheusMetrics(sandboxID string, sandboxMetadata sandboxKubeData, b
|
|||||||
Value: mutils.String2Pointer(sandboxID),
|
Value: mutils.String2Pointer(sandboxID),
|
||||||
},
|
},
|
||||||
&dto.LabelPair{
|
&dto.LabelPair{
|
||||||
Name: mutils.String2Pointer("kube_uid"),
|
Name: mutils.String2Pointer("cri_uid"),
|
||||||
Value: mutils.String2Pointer(sandboxMetadata.uid),
|
Value: mutils.String2Pointer(sandboxMetadata.uid),
|
||||||
},
|
},
|
||||||
&dto.LabelPair{
|
&dto.LabelPair{
|
||||||
Name: mutils.String2Pointer("kube_name"),
|
Name: mutils.String2Pointer("cri_name"),
|
||||||
Value: mutils.String2Pointer(sandboxMetadata.name),
|
Value: mutils.String2Pointer(sandboxMetadata.name),
|
||||||
},
|
},
|
||||||
&dto.LabelPair{
|
&dto.LabelPair{
|
||||||
Name: mutils.String2Pointer("kube_namespace"),
|
Name: mutils.String2Pointer("cri_namespace"),
|
||||||
Value: mutils.String2Pointer(sandboxMetadata.namespace),
|
Value: mutils.String2Pointer(sandboxMetadata.namespace),
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
@ -40,7 +40,7 @@ ttt 999
|
|||||||
func TestParsePrometheusMetrics(t *testing.T) {
|
func TestParsePrometheusMetrics(t *testing.T) {
|
||||||
assert := assert.New(t)
|
assert := assert.New(t)
|
||||||
sandboxID := "sandboxID-abc"
|
sandboxID := "sandboxID-abc"
|
||||||
sandboxMetadata := sandboxKubeData{"123", "pod-name", "pod-namespace"}
|
sandboxMetadata := sandboxCRIMetadata{"123", "pod-name", "pod-namespace"}
|
||||||
|
|
||||||
// parse metrics
|
// parse metrics
|
||||||
list, err := parsePrometheusMetrics(sandboxID, sandboxMetadata, []byte(shimMetricBody))
|
list, err := parsePrometheusMetrics(sandboxID, sandboxMetadata, []byte(shimMetricBody))
|
||||||
@ -60,12 +60,12 @@ func TestParsePrometheusMetrics(t *testing.T) {
|
|||||||
assert.Equal(4, len(m.Label), "should have 4 labels")
|
assert.Equal(4, len(m.Label), "should have 4 labels")
|
||||||
assert.Equal("sandbox_id", *m.Label[0].Name, "label name should be sandbox_id")
|
assert.Equal("sandbox_id", *m.Label[0].Name, "label name should be sandbox_id")
|
||||||
assert.Equal(sandboxID, *m.Label[0].Value, "label value should be", sandboxID)
|
assert.Equal(sandboxID, *m.Label[0].Value, "label value should be", sandboxID)
|
||||||
assert.Equal("kube_uid", *m.Label[1].Name, "label name should be kube_uid")
|
assert.Equal("cri_uid", *m.Label[1].Name, "label name should be cri_uid")
|
||||||
assert.Equal(sandboxMetadata.uid, *m.Label[1].Value, "label value should be", sandboxMetadata.uid)
|
assert.Equal(sandboxMetadata.uid, *m.Label[1].Value, "label value should be", sandboxMetadata.uid)
|
||||||
|
|
||||||
assert.Equal("kube_name", *m.Label[2].Name, "label name should be kube_name")
|
assert.Equal("cri_name", *m.Label[2].Name, "label name should be cri_name")
|
||||||
assert.Equal(sandboxMetadata.name, *m.Label[2].Value, "label value should be", sandboxMetadata.name)
|
assert.Equal(sandboxMetadata.name, *m.Label[2].Value, "label value should be", sandboxMetadata.name)
|
||||||
assert.Equal("kube_namespace", *m.Label[3].Name, "label name should be kube_namespace")
|
assert.Equal("cri_namespace", *m.Label[3].Name, "label name should be cri_namespace")
|
||||||
assert.Equal(sandboxMetadata.namespace, *m.Label[3].Value, "label value should be", sandboxMetadata.namespace)
|
assert.Equal(sandboxMetadata.namespace, *m.Label[3].Value, "label value should be", sandboxMetadata.namespace)
|
||||||
|
|
||||||
summary := m.Summary
|
summary := m.Summary
|
||||||
|
@ -53,7 +53,7 @@ func NewKataMonitor(runtimeEndpoint string) (*KataMonitor, error) {
|
|||||||
runtimeEndpoint: runtimeEndpoint,
|
runtimeEndpoint: runtimeEndpoint,
|
||||||
sandboxCache: &sandboxCache{
|
sandboxCache: &sandboxCache{
|
||||||
Mutex: &sync.Mutex{},
|
Mutex: &sync.Mutex{},
|
||||||
sandboxes: make(map[string]sandboxKubeData),
|
sandboxes: make(map[string]sandboxCRIMetadata),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -105,13 +105,13 @@ func (km *KataMonitor) startPodCacheUpdater() {
|
|||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
for _, sandbox := range sandboxList {
|
for _, sandbox := range sandboxList {
|
||||||
km.sandboxCache.putIfNotExists(sandbox, sandboxKubeData{})
|
km.sandboxCache.putIfNotExists(sandbox, sandboxCRIMetadata{})
|
||||||
}
|
}
|
||||||
|
|
||||||
monitorLog.Debug("initial sync of sbs directory completed")
|
monitorLog.Debug("initial sync of sbs directory completed")
|
||||||
monitorLog.Tracef("pod list from sbs: %v", sandboxList)
|
monitorLog.Tracef("pod list from sbs: %v", sandboxList)
|
||||||
|
|
||||||
// We should get kubernetes metadata from the container manager for each new kata sandbox we detect.
|
// We try to get CRI (kubernetes) metadata from the container manager for each new kata sandbox we detect.
|
||||||
// It may take a while for data to be available, so we always wait podCacheRefreshDelaySeconds before checking.
|
// It may take a while for data to be available, so we always wait podCacheRefreshDelaySeconds before checking.
|
||||||
cacheUpdateTimer := time.NewTimer(podCacheRefreshDelaySeconds * time.Second)
|
cacheUpdateTimer := time.NewTimer(podCacheRefreshDelaySeconds * time.Second)
|
||||||
cacheUpdateTimerIsSet := true
|
cacheUpdateTimerIsSet := true
|
||||||
@ -127,7 +127,7 @@ func (km *KataMonitor) startPodCacheUpdater() {
|
|||||||
case fsnotify.Create:
|
case fsnotify.Create:
|
||||||
splitPath := strings.Split(event.Name, string(os.PathSeparator))
|
splitPath := strings.Split(event.Name, string(os.PathSeparator))
|
||||||
id := splitPath[len(splitPath)-1]
|
id := splitPath[len(splitPath)-1]
|
||||||
if !km.sandboxCache.putIfNotExists(id, sandboxKubeData{}) {
|
if !km.sandboxCache.putIfNotExists(id, sandboxCRIMetadata{}) {
|
||||||
monitorLog.WithField("pod", id).Warn(
|
monitorLog.WithField("pod", id).Warn(
|
||||||
"CREATE event but pod already present in the sandbox cache")
|
"CREATE event but pod already present in the sandbox cache")
|
||||||
}
|
}
|
||||||
|
@ -9,15 +9,15 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
type sandboxKubeData struct {
|
type sandboxCRIMetadata struct {
|
||||||
uid string
|
uid string
|
||||||
name string
|
name string
|
||||||
namespace string
|
namespace string
|
||||||
}
|
}
|
||||||
type sandboxCache struct {
|
type sandboxCache struct {
|
||||||
*sync.Mutex
|
*sync.Mutex
|
||||||
// the sandboxKubeData links the sandbox id from the container manager to the pod metadata of kubernetes
|
// the sandboxCRIMetadata links the sandbox id from the container manager to the pod metadata of kubernetes
|
||||||
sandboxes map[string]sandboxKubeData
|
sandboxes map[string]sandboxCRIMetadata
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sc *sandboxCache) getSandboxList() []string {
|
func (sc *sandboxCache) getSandboxList() []string {
|
||||||
@ -43,7 +43,7 @@ func (sc *sandboxCache) deleteIfExists(id string) bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sc *sandboxCache) putIfNotExists(id string, value sandboxKubeData) bool {
|
func (sc *sandboxCache) putIfNotExists(id string, value sandboxCRIMetadata) bool {
|
||||||
sc.Lock()
|
sc.Lock()
|
||||||
defer sc.Unlock()
|
defer sc.Unlock()
|
||||||
|
|
||||||
@ -56,14 +56,14 @@ func (sc *sandboxCache) putIfNotExists(id string, value sandboxKubeData) bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sc *sandboxCache) setMetadata(id string, value sandboxKubeData) {
|
func (sc *sandboxCache) setCRIMetadata(id string, value sandboxCRIMetadata) {
|
||||||
sc.Lock()
|
sc.Lock()
|
||||||
defer sc.Unlock()
|
defer sc.Unlock()
|
||||||
|
|
||||||
sc.sandboxes[id] = value
|
sc.sandboxes[id] = value
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sc *sandboxCache) getMetadata(id string) (sandboxKubeData, bool) {
|
func (sc *sandboxCache) getCRIMetadata(id string) (sandboxCRIMetadata, bool) {
|
||||||
sc.Lock()
|
sc.Lock()
|
||||||
defer sc.Unlock()
|
defer sc.Unlock()
|
||||||
|
|
||||||
|
@ -16,19 +16,19 @@ func TestSandboxCache(t *testing.T) {
|
|||||||
assert := assert.New(t)
|
assert := assert.New(t)
|
||||||
sc := &sandboxCache{
|
sc := &sandboxCache{
|
||||||
Mutex: &sync.Mutex{},
|
Mutex: &sync.Mutex{},
|
||||||
sandboxes: map[string]sandboxKubeData{"111": {"1-2-3", "test-name", "test-namespace"}},
|
sandboxes: map[string]sandboxCRIMetadata{"111": {"1-2-3", "test-name", "test-namespace"}},
|
||||||
}
|
}
|
||||||
|
|
||||||
assert.Equal(1, len(sc.getSandboxList()))
|
assert.Equal(1, len(sc.getSandboxList()))
|
||||||
|
|
||||||
// put new item
|
// put new item
|
||||||
id := "new-id"
|
id := "new-id"
|
||||||
b := sc.putIfNotExists(id, sandboxKubeData{})
|
b := sc.putIfNotExists(id, sandboxCRIMetadata{})
|
||||||
assert.Equal(true, b)
|
assert.Equal(true, b)
|
||||||
assert.Equal(2, len(sc.getSandboxList()))
|
assert.Equal(2, len(sc.getSandboxList()))
|
||||||
|
|
||||||
// put key that alreay exists
|
// put key that alreay exists
|
||||||
b = sc.putIfNotExists(id, sandboxKubeData{})
|
b = sc.putIfNotExists(id, sandboxCRIMetadata{})
|
||||||
assert.Equal(false, b)
|
assert.Equal(false, b)
|
||||||
|
|
||||||
b = sc.deleteIfExists(id)
|
b = sc.deleteIfExists(id)
|
||||||
|
Loading…
Reference in New Issue
Block a user