diff --git a/src/runtime/cli/kata-monitor/main.go b/src/runtime/cli/kata-monitor/main.go index c3565bcaf..7702a7135 100644 --- a/src/runtime/cli/kata-monitor/main.go +++ b/src/runtime/cli/kata-monitor/main.go @@ -9,7 +9,7 @@ import ( "flag" "net/http" "os" - "runtime" + goruntime "runtime" "text/template" "time" @@ -18,8 +18,7 @@ import ( ) var monitorListenAddr = flag.String("listen-address", ":8090", "The address to listen on for HTTP requests.") -var containerdAddr = flag.String("containerd-address", "/run/containerd/containerd.sock", "Containerd address to accept client requests.") -var containerdConfig = flag.String("containerd-conf", "/etc/containerd/config.toml", "Containerd config file.") +var runtimeEndpoint = flag.String("runtime-endpoint", "/run/containerd/containerd.sock", `Endpoint of CRI container runtime service. (default: "/run/containerd/containerd.sock")`) var logLevel = flag.String("log-level", "info", "Log level of logrus(trace/debug/info/warn/error/fatal/panic).") // These values are overridden via ldflags @@ -59,9 +58,9 @@ func main() { ver := versionInfo{ AppName: appName, Version: version, - GoVersion: runtime.Version(), - Os: runtime.GOOS, - Arch: runtime.GOARCH, + GoVersion: goruntime.Version(), + Os: goruntime.GOOS, + Arch: goruntime.GOARCH, GitCommit: GitCommit, } @@ -85,16 +84,15 @@ func main() { "git-commit": ver.GitCommit, // properties from command-line options - "listen-address": *monitorListenAddr, - "containerd-address": *containerdAddr, - "containerd-conf": *containerdConfig, - "log-level": *logLevel, + "listen-address": *monitorListenAddr, + "runtime-endpoint": *runtimeEndpoint, + "log-level": *logLevel, } logrus.WithFields(announceFields).Info("announce") // create new kataMonitor - km, err := kataMonitor.NewKataMonitor(*containerdAddr, *containerdConfig) + km, err := kataMonitor.NewKataMonitor(*runtimeEndpoint) if err != nil { panic(err) } diff --git a/src/runtime/pkg/kata-monitor/containerd.go b/src/runtime/pkg/kata-monitor/containerd.go deleted file mode 100644 index 774cb98af..000000000 --- a/src/runtime/pkg/kata-monitor/containerd.go +++ /dev/null @@ -1,107 +0,0 @@ -// Copyright (c) 2020 Ant Financial -// -// SPDX-License-Identifier: Apache-2.0 -// - -package katamonitor - -import ( - "context" - - "github.com/sirupsen/logrus" - - "github.com/containerd/containerd" - "github.com/containerd/containerd/containers" - "github.com/containerd/containerd/namespaces" - "github.com/containerd/typeurl" - - "github.com/kata-containers/kata-containers/src/runtime/pkg/types" - vc "github.com/kata-containers/kata-containers/src/runtime/virtcontainers" - "github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/oci" - "github.com/opencontainers/runtime-spec/specs-go" -) - -func getContainer(containersClient containers.Store, namespace, cid string) (containers.Container, error) { - ctx := context.Background() - ctx = namespaces.WithNamespace(ctx, namespace) - return containersClient.Get(ctx, cid) -} - -// isSandboxContainer return true if the container is a sandbox container. -func isSandboxContainer(c *containers.Container) bool { - // unmarshal from any to spec. - if c.Spec == nil { - monitorLog.WithField("container", c.ID).Error("container spec is nil") - return false - } - - v, err := typeurl.UnmarshalAny(c.Spec) - if err != nil { - monitorLog.WithError(err).Error("failed to Unmarshal container spec") - return false - } - - // convert to oci spec type - ociSpec := v.(*specs.Spec) - - // get container type - containerType, err := oci.ContainerType(*ociSpec) - if err != nil { - monitorLog.WithError(err).Error("failed to get contaienr type") - return false - } - - // return if is a sandbox container - return containerType == vc.PodSandbox -} - -// getSandboxes get kata sandbox from containerd. -// this will be called only after monitor start. -func (ka *KataMonitor) getSandboxes() (map[string]string, error) { - client, err := containerd.New(ka.containerdAddr) - if err != nil { - return nil, err - } - defer client.Close() - - ctx := context.Background() - - // first all namespaces. - namespaceList, err := client.NamespaceService().List(ctx) - if err != nil { - return nil, err - } - - // map of type: value: namespace> - sandboxMap := make(map[string]string) - - for _, namespace := range namespaceList { - - initSandboxByNamespaceFunc := func(namespace string) error { - ctx := context.Background() - namespacedCtx := namespaces.WithNamespace(ctx, namespace) - // only list Kata Containers pods/containers - containers, err := client.ContainerService().List(namespacedCtx, - "runtime.name~="+types.KataRuntimeNameRegexp+`,labels."io.cri-containerd.kind"==sandbox`) - if err != nil { - return err - } - - for i := range containers { - c := containers[i] - isc := isSandboxContainer(&c) - monitorLog.WithFields(logrus.Fields{"container": c.ID, "result": isc}).Debug("is this a sandbox container?") - if isc { - sandboxMap[c.ID] = namespace - } - } - return nil - } - - if err := initSandboxByNamespaceFunc(namespace); err != nil { - return nil, err - } - } - - return sandboxMap, nil -} diff --git a/src/runtime/pkg/kata-monitor/containerd_test.go b/src/runtime/pkg/kata-monitor/containerd_test.go deleted file mode 100644 index b22448ee1..000000000 --- a/src/runtime/pkg/kata-monitor/containerd_test.go +++ /dev/null @@ -1,76 +0,0 @@ -// Copyright (c) 2020 Ant Financial -// -// SPDX-License-Identifier: Apache-2.0 -// - -package katamonitor - -import ( - "testing" - - criContainerdAnnotations "github.com/containerd/cri-containerd/pkg/annotations" - "github.com/containerd/typeurl" - - "github.com/containerd/containerd/containers" - "github.com/opencontainers/runtime-spec/specs-go" - "github.com/stretchr/testify/assert" -) - -func TestIsSandboxContainer(t *testing.T) { - assert := assert.New(t) - - c := &containers.Container{} - isc := isSandboxContainer(c) - assert.Equal(false, isc, "should not be a sandbox container") - - spec := &specs.Spec{ - Annotations: map[string]string{}, - } - - any, err := typeurl.MarshalAny(spec) - assert.Nil(err, "MarshalAny failed for spec") - - c.Spec = any - // default container is a pod(sandbox) container - isc = isSandboxContainer(c) - assert.Equal(true, isc, "should be a sandbox container") - - testCases := []struct { - annotationKey string - annotationValue string - result bool - }{ - { - annotationKey: criContainerdAnnotations.ContainerType, - annotationValue: "", - result: false, - }, - { - annotationKey: criContainerdAnnotations.ContainerType, - annotationValue: criContainerdAnnotations.ContainerTypeContainer, - result: false, - }, - { - annotationKey: criContainerdAnnotations.ContainerType, - annotationValue: "pod", - result: false, - }, - { - annotationKey: criContainerdAnnotations.ContainerType, - annotationValue: criContainerdAnnotations.ContainerTypeSandbox, - result: true, - }, - } - - for _, tc := range testCases { - spec.Annotations = map[string]string{ - tc.annotationKey: tc.annotationValue, - } - any, err := typeurl.MarshalAny(spec) - assert.Nil(err, "MarshalAny failed for spec") - c.Spec = any - isc = isSandboxContainer(c) - assert.Equal(tc.result, isc, "assert failed for checking if is a sandbox container") - } - -} diff --git a/src/runtime/pkg/kata-monitor/cri.go b/src/runtime/pkg/kata-monitor/cri.go new file mode 100644 index 000000000..087f09772 --- /dev/null +++ b/src/runtime/pkg/kata-monitor/cri.go @@ -0,0 +1,221 @@ +// Copyright (c) 2020 Ant Group +// Copyright (c) 2021 Red Hat Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// + +package katamonitor + +import ( + "context" + "encoding/json" + "fmt" + "net" + "net/url" + "regexp" + + "github.com/kata-containers/kata-containers/src/runtime/pkg/types" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "github.com/xeipuuv/gojsonpointer" + "google.golang.org/grpc" + + pb "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" +) + +const ( + // unixProtocol is the network protocol of unix socket. + unixProtocol = "unix" + + k8sContainerdNamespace = "k8s.io" +) + +// getAddressAndDialer returns the address parsed from the given endpoint and a context dialer. +func getAddressAndDialer(endpoint string) (string, func(ctx context.Context, addr string) (net.Conn, error), error) { + protocol, addr, err := parseEndpointWithFallbackProtocol(endpoint, unixProtocol) + if err != nil { + return "", nil, err + } + if protocol != unixProtocol { + return "", nil, fmt.Errorf("only support unix socket endpoint") + } + + return addr, dial, nil +} + +func getConnection(endPoint string) (*grpc.ClientConn, error) { + var conn *grpc.ClientConn + monitorLog.Debugf("connect using endpoint '%s' with '%s' timeout", endPoint, defaultTimeout) + addr, dialer, err := getAddressAndDialer(endPoint) + if err != nil { + return nil, err + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout) + defer cancel() + conn, err = grpc.DialContext(ctx, addr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithContextDialer(dialer)) + if err != nil { + errMsg := errors.Wrapf(err, "connect endpoint '%s', make sure you are running as root and the endpoint has been started", endPoint) + return nil, errMsg + } + monitorLog.Debugf("connected successfully using endpoint: %s", endPoint) + return conn, nil +} + +func matchesRegex(pattern, target string) bool { + if pattern == "" { + return true + } + matched, err := regexp.MatchString(pattern, target) + if err != nil { + // Assume it's not a match if an error occurs. + return false + } + return matched +} + +func closeConnection(conn *grpc.ClientConn) error { + if conn == nil { + return nil + } + return conn.Close() +} + +func getRuntimeClient(runtimeEndpoint string) (pb.RuntimeServiceClient, *grpc.ClientConn, error) { + var ( + conn *grpc.ClientConn + err error + ) + // Set up a connection to the server. + // If no EndPoint set then use the default endpoint types + conn, err = getConnection(runtimeEndpoint) + if err != nil { + return nil, nil, err + } + + runtimeClient := pb.NewRuntimeServiceClient(conn) + return runtimeClient, conn, nil +} + +func dial(ctx context.Context, addr string) (net.Conn, error) { + return (&net.Dialer{}).DialContext(ctx, unixProtocol, addr) +} + +func parseEndpointWithFallbackProtocol(endpoint string, fallbackProtocol string) (protocol string, addr string, err error) { + if protocol, addr, err = parseEndpoint(endpoint); err != nil && protocol == "" { + fallbackEndpoint := fallbackProtocol + "://" + endpoint + protocol, addr, err = parseEndpoint(fallbackEndpoint) + if err == nil { + monitorLog.Warningf("Using %q as endpoint is deprecated, please consider using full url format %q.", endpoint, fallbackEndpoint) + } + } + return +} + +func parseEndpoint(endpoint string) (string, string, error) { + u, err := url.Parse(endpoint) + if err != nil { + return "", "", err + } + + switch u.Scheme { + case "tcp": + return "tcp", u.Host, nil + + case "unix": + return "unix", u.Path, nil + + case "": + return "", "", fmt.Errorf("using %q as endpoint is deprecated, please consider using full url format", endpoint) + + default: + return u.Scheme, "", fmt.Errorf("protocol %q not supported", u.Scheme) + } +} + +// getSandboxes get kata sandbox from the container engine. +// this will be called only after monitor start. +func (km *KataMonitor) getSandboxes() (map[string]string, error) { + + sandboxMap := map[string]string{} + runtimeClient, runtimeConn, err := getRuntimeClient(km.runtimeEndpoint) + if err != nil { + return sandboxMap, err + } + defer closeConnection(runtimeConn) + + filter := &pb.PodSandboxFilter{ + State: &pb.PodSandboxStateValue{ + State: pb.PodSandboxState_SANDBOX_READY, + }, + } + + request := &pb.ListPodSandboxRequest{ + Filter: filter, + } + monitorLog.Debugf("ListPodSandboxRequest: %v", request) + r, err := runtimeClient.ListPodSandbox(context.Background(), request) + if err != nil { + return sandboxMap, err + } + monitorLog.Debugf("ListPodSandboxResponse: %v", r) + + for _, pod := range r.Items { + request := &pb.PodSandboxStatusRequest{ + PodSandboxId: pod.Id, + Verbose: true, + } + + r, err := runtimeClient.PodSandboxStatus(context.Background(), request) + if err != nil { + return sandboxMap, err + } + + lowRuntime := "" + highRuntime := "" + var res map[string]interface{} + if err := json.Unmarshal([]byte(r.Info["info"]), &res); err != nil { + monitorLog.WithError(err).WithField("pod", r).Error("failed to Unmarshal pod info") + continue + } else { + monitorLog.WithField("pod info", res).Debug("") + // get high level container runtime + pointer, _ := gojsonpointer.NewJsonPointer("/runtimeSpec/annotations/io.container.manager") + rt, _, _ := pointer.Get(res) + if rt != nil { + if str, ok := rt.(string); ok { + if str == "cri-o" { + highRuntime = RuntimeCRIO + } else { + highRuntime = RuntimeContainerd + } + } + } + + // get low level container runtime + // containerd stores the pod runtime in "/runtimeType" while CRI-O stores it the + // io.kubernetes.cri-o.RuntimeHandler annotation: check for both. + keys := []string{"/runtimeType", "/runtimeSpec/annotations/io.kubernetes.cri-o.RuntimeHandler"} + for _, key := range keys { + pointer, _ := gojsonpointer.NewJsonPointer(key) + rt, _, _ := pointer.Get(res) + if rt != nil { + if str, ok := rt.(string); ok { + lowRuntime = str + break + } + } + } + } + + // Filter by pod name/namespace regular expressions. + monitorLog.WithFields(logrus.Fields{ + "low runtime": lowRuntime, + "high runtime": highRuntime, + }).Debug("") + if matchesRegex(types.KataRuntimeNameRegexp, lowRuntime) || matchesRegex("kata*", lowRuntime) { + sandboxMap[pod.Id] = highRuntime + } + } + + return sandboxMap, nil +} diff --git a/src/runtime/pkg/kata-monitor/metrics.go b/src/runtime/pkg/kata-monitor/metrics.go index 8e0d4c7e4..bbc62d18d 100644 --- a/src/runtime/pkg/kata-monitor/metrics.go +++ b/src/runtime/pkg/kata-monitor/metrics.go @@ -74,8 +74,14 @@ func registerMetrics() { // getMonitorAddress get metrics address for a sandbox, the abstract unix socket address is saved // in `metrics_address` with the same place of `address`. -func (km *KataMonitor) getMonitorAddress(sandboxID, namespace string) (string, error) { - path := filepath.Join(km.containerdStatePath, types.ContainerdRuntimeTaskPath, namespace, sandboxID, "monitor_address") +func (km *KataMonitor) getMonitorAddress(sandboxID, runtime string) (string, error) { + path := filepath.Join("/run/containerd", types.ContainerdRuntimeTaskPath, k8sContainerdNamespace, sandboxID, "monitor_address") + if runtime == RuntimeCRIO { + path = filepath.Join("/run/containers/storage/overlay-containers", sandboxID, "userdata", "monitor_address") + } + + monitorLog.WithField("path", path).Debug("get monitor address") + data, err := ioutil.ReadFile(path) if err != nil { return "", err @@ -173,9 +179,9 @@ func (km *KataMonitor) aggregateSandboxMetrics(encoder expfmt.Encoder) error { monitorLog.WithField("sandbox_count", len(sandboxes)).Debugf("sandboxes count") // get metrics from sandbox's shim - for sandboxID, namespace := range sandboxes { + for sandboxID, runtime := range sandboxes { wg.Add(1) - go func(sandboxID, namespace string, results chan<- []*dto.MetricFamily) { + go func(sandboxID, runtime string, results chan<- []*dto.MetricFamily) { sandboxMetrics, err := getParsedMetrics(sandboxID) if err != nil { monitorLog.WithError(err).WithField("sandbox_id", sandboxID).Errorf("failed to get metrics for sandbox") @@ -184,7 +190,7 @@ func (km *KataMonitor) aggregateSandboxMetrics(encoder expfmt.Encoder) error { results <- sandboxMetrics wg.Done() monitorLog.WithField("sandbox_id", sandboxID).Debug("job finished") - }(sandboxID, namespace, results) + }(sandboxID, runtime, results) monitorLog.WithField("sandbox_id", sandboxID).Debug("job started") } @@ -278,7 +284,7 @@ func parsePrometheusMetrics(sandboxID string, body []byte) ([]*dto.MetricFamily, }) } - // Kata shim are using prometheus go client, add an prefix for metric name to avoid confusing + // Kata shim are using prometheus go client, add a prefix for metric name to avoid confusing if mf.Name != nil && (strings.HasPrefix(*mf.Name, "go_") || strings.HasPrefix(*mf.Name, "process_")) { mf.Name = mutils.String2Pointer("kata_shim_" + *mf.Name) } diff --git a/src/runtime/pkg/kata-monitor/monitor.go b/src/runtime/pkg/kata-monitor/monitor.go index 87cd1187b..677ac0d06 100644 --- a/src/runtime/pkg/kata-monitor/monitor.go +++ b/src/runtime/pkg/kata-monitor/monitor.go @@ -6,21 +6,24 @@ package katamonitor import ( + "errors" "fmt" "net/http" - "os" + "strings" "sync" + "time" - "github.com/containerd/containerd/defaults" - srvconfig "github.com/containerd/containerd/services/server/config" "github.com/sirupsen/logrus" - - // register grpc event types - _ "github.com/containerd/containerd/api/events" ) var monitorLog = logrus.WithField("source", "kata-monitor") +const ( + RuntimeContainerd = "containerd" + RuntimeCRIO = "cri-o" + podCacheRefreshTimeSeconds = 15 +) + // SetLogger sets the logger for katamonitor package. func SetLogger(logger *logrus.Entry) { fields := monitorLog.Data @@ -29,55 +32,48 @@ func SetLogger(logger *logrus.Entry) { // KataMonitor is monitor agent type KataMonitor struct { - sandboxCache *sandboxCache - containerdAddr string - containerdConfigFile string - containerdStatePath string + sandboxCache *sandboxCache + runtimeEndpoint string } // NewKataMonitor create and return a new KataMonitor instance -func NewKataMonitor(containerdAddr, containerdConfigFile string) (*KataMonitor, error) { - if containerdAddr == "" { - return nil, fmt.Errorf("containerd serve address missing") +func NewKataMonitor(runtimeEndpoint string) (*KataMonitor, error) { + if runtimeEndpoint == "" { + return nil, errors.New("runtime endpoint missing") } - containerdConf := &srvconfig.Config{ - State: defaults.DefaultStateDir, - } - - if err := srvconfig.LoadConfig(containerdConfigFile, containerdConf); err != nil && !os.IsNotExist(err) { - return nil, err + if !strings.HasPrefix(runtimeEndpoint, "unix") { + runtimeEndpoint = "unix://" + runtimeEndpoint } km := &KataMonitor{ - containerdAddr: containerdAddr, - containerdConfigFile: containerdConfigFile, - containerdStatePath: containerdConf.State, + runtimeEndpoint: runtimeEndpoint, sandboxCache: &sandboxCache{ Mutex: &sync.Mutex{}, sandboxes: make(map[string]string), }, } - if err := km.initSandboxCache(); err != nil { - return nil, err - } - // register metrics registerMetrics() - go km.sandboxCache.startEventsListener(km.containerdAddr) + go km.startPodCacheUpdater() return km, nil } -func (km *KataMonitor) initSandboxCache() error { - sandboxes, err := km.getSandboxes() - if err != nil { - return err +// startPodCacheUpdater will boot a thread to manage sandbox cache +func (km *KataMonitor) startPodCacheUpdater() { + for { + time.Sleep(podCacheRefreshTimeSeconds * time.Second) + sandboxes, err := km.getSandboxes() + if err != nil { + monitorLog.WithError(err).Error("failed to get sandboxes") + continue + } + monitorLog.WithField("count", len(sandboxes)).Debug("update sandboxes list") + km.sandboxCache.set(sandboxes) } - km.sandboxCache.init(sandboxes) - return nil } // GetAgentURL returns agent URL @@ -117,6 +113,6 @@ func (km *KataMonitor) getSandboxList() []string { return result } -func (km *KataMonitor) getSandboxNamespace(sandbox string) (string, error) { - return km.sandboxCache.getSandboxNamespace(sandbox) +func (km *KataMonitor) getSandboxRuntime(sandbox string) (string, error) { + return km.sandboxCache.getSandboxRuntime(sandbox) } diff --git a/src/runtime/pkg/kata-monitor/pprof.go b/src/runtime/pkg/kata-monitor/pprof.go index 4943455ff..11f5c34cf 100644 --- a/src/runtime/pkg/kata-monitor/pprof.go +++ b/src/runtime/pkg/kata-monitor/pprof.go @@ -27,12 +27,12 @@ func (km *KataMonitor) composeSocketAddress(r *http.Request) (string, error) { return "", err } - namespace, err := km.getSandboxNamespace(sandbox) + runtime, err := km.getSandboxRuntime(sandbox) if err != nil { return "", err } - return km.getMonitorAddress(sandbox, namespace) + return km.getMonitorAddress(sandbox, runtime) } func (km *KataMonitor) proxyRequest(w http.ResponseWriter, r *http.Request) { diff --git a/src/runtime/pkg/kata-monitor/pprof_test.go b/src/runtime/pkg/kata-monitor/pprof_test.go deleted file mode 100644 index 221261d40..000000000 --- a/src/runtime/pkg/kata-monitor/pprof_test.go +++ /dev/null @@ -1,84 +0,0 @@ -// Copyright (c) 2020 Ant Financial -// -// SPDX-License-Identifier: Apache-2.0 -// - -package katamonitor - -import ( - "fmt" - "io/ioutil" - "net/http" - "os" - "path/filepath" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -func TestComposeSocketAddress(t *testing.T) { - assert := assert.New(t) - path := fmt.Sprintf("/tmp/TestComposeSocketAddress-%d", time.Now().Nanosecond()) - statePath := filepath.Join(path, "io.containerd.runtime.v2.task") - - sandboxes := map[string]string{"foo": "ns-foo", "bar": "ns-bar"} - defer func() { - os.RemoveAll(path) - }() - - for sandbox, ns := range sandboxes { - err := os.MkdirAll(filepath.Join(statePath, ns, sandbox), 0755) - assert.Nil(err) - f := filepath.Join(statePath, ns, sandbox, "monitor_address") - err = ioutil.WriteFile(f, []byte(sandbox), 0644) - assert.Nil(err) - } - - km := &KataMonitor{ - containerdStatePath: path, - sandboxCache: &sandboxCache{ - Mutex: &sync.Mutex{}, - sandboxes: sandboxes, - }, - } - - // nolint: govet - testCases := []struct { - url string - err bool - addr string - }{ - { - url: "http://localhost:6060/debug/vars", - err: true, - addr: "", - }, - { - url: "http://localhost:6060/debug/vars?sandbox=abc", - err: true, - addr: "", - }, - { - url: "http://localhost:6060/debug/vars?sandbox=foo", - err: false, - addr: "foo", - }, - { - url: "http://localhost:6060/debug/vars?sandbox=bar", - err: false, - addr: "bar", - }, - } - - for _, tc := range testCases { - r, err := http.NewRequest("GET", tc.url, nil) - assert.Nil(err) - - addr, err := km.composeSocketAddress(r) - - assert.Equal(tc.err, err != nil) - assert.Equal(tc.addr, addr) - } -} diff --git a/src/runtime/pkg/kata-monitor/sandbox_cache.go b/src/runtime/pkg/kata-monitor/sandbox_cache.go index f749ef464..a55020359 100644 --- a/src/runtime/pkg/kata-monitor/sandbox_cache.go +++ b/src/runtime/pkg/kata-monitor/sandbox_cache.go @@ -6,23 +6,8 @@ package katamonitor import ( - "context" "fmt" - "regexp" "sync" - - "github.com/containerd/containerd" - "github.com/sirupsen/logrus" - - "encoding/json" - - eventstypes "github.com/containerd/containerd/api/events" - "github.com/containerd/containerd/events" - "github.com/containerd/typeurl" - "github.com/kata-containers/kata-containers/src/runtime/pkg/types" - - // Register grpc event types - _ "github.com/containerd/containerd/api/events" ) type sandboxCache struct { @@ -36,17 +21,6 @@ func (sc *sandboxCache) getAllSandboxes() map[string]string { return sc.sandboxes } -func (sc *sandboxCache) getSandboxNamespace(sandbox string) (string, error) { - sc.Lock() - defer sc.Unlock() - - if val, found := sc.sandboxes[sandbox]; found { - return val, nil - } - - return "", fmt.Errorf("sandbox %s not in cache", sandbox) -} - func (sc *sandboxCache) deleteIfExists(id string) (string, bool) { sc.Lock() defer sc.Unlock() @@ -73,116 +47,19 @@ func (sc *sandboxCache) putIfNotExists(id, value string) bool { return false } -func (sc *sandboxCache) init(sandboxes map[string]string) { +func (sc *sandboxCache) set(sandboxes map[string]string) { sc.Lock() defer sc.Unlock() sc.sandboxes = sandboxes } -// startEventsListener will boot a thread to listen container events to manage sandbox cache -func (sc *sandboxCache) startEventsListener(addr string) error { - client, err := containerd.New(addr) - if err != nil { - return err - } - defer client.Close() +func (sc *sandboxCache) getSandboxRuntime(sandbox string) (string, error) { + sc.Lock() + defer sc.Unlock() - ctx := context.Background() - - eventsClient := client.EventService() - containerClient := client.ContainerService() - - // only need create/delete events. - eventFilters := []string{ - `topic=="/containers/create"`, - `topic=="/containers/delete"`, + if val, found := sc.sandboxes[sandbox]; found { + return val, nil } - runtimeNameRegexp, err := regexp.Compile(types.KataRuntimeNameRegexp) - if err != nil { - return err - } - - eventsCh, errCh := eventsClient.Subscribe(ctx, eventFilters...) - for { - var e *events.Envelope - select { - case e = <-eventsCh: - case err = <-errCh: - monitorLog.WithError(err).Warn("get error from error chan") - return err - } - - if e != nil { - var eventBody []byte - if e.Event != nil { - v, err := typeurl.UnmarshalAny(e.Event) - if err != nil { - monitorLog.WithError(err).Warn("cannot unmarshal an event from Any") - continue - } - eventBody, err = json.Marshal(v) - if err != nil { - monitorLog.WithError(err).Warn("cannot marshal Any into JSON") - continue - } - } - - if e.Topic == "/containers/create" { - // Namespace: k8s.io - // Topic: /containers/create - // Event: { - // "id":"6a2e22e6fffaf1dec63ddabf587ed56069b1809ba67a0d7872fc470528364e66", - // "image":"k8s.gcr.io/pause:3.1", - // "runtime":{"name":"io.containerd.kata.v2"} - // } - cc := eventstypes.ContainerCreate{} - err := json.Unmarshal(eventBody, &cc) - if err != nil { - monitorLog.WithError(err).WithField("body", string(eventBody)).Warn("unmarshal ContainerCreate failed") - continue - } - - // skip non-kata contaienrs - if !runtimeNameRegexp.MatchString(cc.Runtime.Name) { - continue - } - - c, err := getContainer(containerClient, e.Namespace, cc.ID) - if err != nil { - monitorLog.WithError(err).WithField("container", cc.ID).Warn("failed to get container") - continue - } - - // if the container is a sandbox container, - // means the VM is started, and can start to collect metrics from the VM. - if isSandboxContainer(&c) { - // we can simply put the contaienrid in sandboxes list if the container is a sandbox container - sc.putIfNotExists(cc.ID, e.Namespace) - monitorLog.WithField("container", cc.ID).Info("add sandbox to cache") - } - } else if e.Topic == "/containers/delete" { - // Namespace: k8s.io - // Topic: /containers/delete - // Event: { - // "id":"73ec10d2e38070f930310687ab46bbaa532c79d5680fd7f18fff99f759d9385e" - // } - cd := &eventstypes.ContainerDelete{} - err := json.Unmarshal(eventBody, &cd) - if err != nil { - monitorLog.WithError(err).WithField("body", string(eventBody)).Warn("unmarshal ContainerDelete failed") - } - - // if container in sandboxes list, it must be the pause container in the sandbox, - // so the contaienr id is the sandbox id - // we can simply delete the contaienr from sandboxes list - // the last container in a sandbox is deleted, means the VM will stop. - _, deleted := sc.deleteIfExists(cd.ID) - monitorLog.WithFields(logrus.Fields{"container": cd.ID, "result": deleted}).Info("delete sandbox from cache") - } else { - monitorLog.WithFields(logrus.Fields{"Namespace": e.Namespace, "Topic": e.Topic, "Event": string(eventBody)}).Error("other events") - } - - } - } + return "", fmt.Errorf("sandbox %s not in cache", sandbox) } diff --git a/src/runtime/pkg/kata-monitor/sandbox_cache_test.go b/src/runtime/pkg/kata-monitor/sandbox_cache_test.go index a163a20a7..54f077e7d 100644 --- a/src/runtime/pkg/kata-monitor/sandbox_cache_test.go +++ b/src/runtime/pkg/kata-monitor/sandbox_cache_test.go @@ -21,7 +21,7 @@ func TestSandboxCache(t *testing.T) { scMap := map[string]string{"111": "222"} - sc.init(scMap) + sc.set(scMap) scMap = sc.getAllSandboxes() assert.Equal(1, len(scMap))