kata-monitor: talk to the container engine via the CRI

kata-monitor uses the containerd client to retrieve information from
the container engine. This ties kata-monitor to the containerd
container engine only.
Bin Liu (bin <bin@hyper.sh>) worked on a kata-monitor version able to
talk to any container engine by leveraging the standard CRI[1].
Here, Bin Liu's original work has been adapted to the current
kata-monitor code to make it container engine independent.

[1] https://github.com/liubin/kata-containers/tree/fix/1030-use-cri-in-kata-monitor

Fixes: #1030
Signed-off-by: Francesco Giudici <fgiudici@redhat.com>
Francesco Giudici 2021-07-02 08:47:02 +02:00
parent eac05ad6d6
commit c2f03e8993
10 changed files with 283 additions and 452 deletions
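
For orientation: with this change kata-monitor simply dials the CRI runtime
socket and lists the ready pod sandboxes over gRPC, as the reworked
getSandboxes() below does. A minimal, standalone sketch of that flow (the
endpoint path and the wiring here are illustrative assumptions, not code from
this patch):

package main

import (
	"context"
	"fmt"
	"net"
	"time"

	"google.golang.org/grpc"
	pb "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
)

func main() {
	// Illustrative endpoint: containerd's default CRI socket. kata-monitor now
	// takes this via --runtime-endpoint; CRI-O typically uses /run/crio/crio.sock.
	const endpoint = "/run/containerd/containerd.sock"

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// The CRI endpoint is a unix socket, so dial it with a custom dialer.
	conn, err := grpc.DialContext(ctx, endpoint, grpc.WithInsecure(), grpc.WithBlock(),
		grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
			return (&net.Dialer{}).DialContext(ctx, "unix", addr)
		}))
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	// Ask the runtime for pod sandboxes in READY state, the same request the
	// reworked getSandboxes() issues.
	client := pb.NewRuntimeServiceClient(conn)
	resp, err := client.ListPodSandbox(ctx, &pb.ListPodSandboxRequest{
		Filter: &pb.PodSandboxFilter{
			State: &pb.PodSandboxStateValue{State: pb.PodSandboxState_SANDBOX_READY},
		},
	})
	if err != nil {
		panic(err)
	}
	for _, pod := range resp.Items {
		fmt.Println(pod.Id, pod.Metadata.GetName())
	}
}

Any engine implementing the CRI RuntimeService can answer this ListPodSandbox
call, which is what makes the monitor container engine independent.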

View File

@@ -9,7 +9,7 @@ import (
 	"flag"
 	"net/http"
 	"os"
-	"runtime"
+	goruntime "runtime"
 	"text/template"
 	"time"
@@ -18,8 +18,7 @@ import (
 )
 
 var monitorListenAddr = flag.String("listen-address", ":8090", "The address to listen on for HTTP requests.")
-var containerdAddr = flag.String("containerd-address", "/run/containerd/containerd.sock", "Containerd address to accept client requests.")
-var containerdConfig = flag.String("containerd-conf", "/etc/containerd/config.toml", "Containerd config file.")
+var runtimeEndpoint = flag.String("runtime-endpoint", "/run/containerd/containerd.sock", `Endpoint of CRI container runtime service. (default: "/run/containerd/containerd.sock")`)
 var logLevel = flag.String("log-level", "info", "Log level of logrus(trace/debug/info/warn/error/fatal/panic).")
 
 // These values are overridden via ldflags
@@ -59,9 +58,9 @@ func main() {
 	ver := versionInfo{
 		AppName:   appName,
 		Version:   version,
-		GoVersion: runtime.Version(),
-		Os:        runtime.GOOS,
-		Arch:      runtime.GOARCH,
+		GoVersion: goruntime.Version(),
+		Os:        goruntime.GOOS,
+		Arch:      goruntime.GOARCH,
 		GitCommit: GitCommit,
 	}
@@ -86,15 +85,14 @@ func main() {
 		// properties from command-line options
 		"listen-address":     *monitorListenAddr,
-		"containerd-address": *containerdAddr,
-		"containerd-conf":    *containerdConfig,
+		"runtime-endpoint":   *runtimeEndpoint,
 		"log-level":          *logLevel,
 	}
 	logrus.WithFields(announceFields).Info("announce")
 
 	// create new kataMonitor
-	km, err := kataMonitor.NewKataMonitor(*containerdAddr, *containerdConfig)
+	km, err := kataMonitor.NewKataMonitor(*runtimeEndpoint)
 	if err != nil {
 		panic(err)
 	}

View File

@@ -1,107 +0,0 @@
// Copyright (c) 2020 Ant Financial
//
// SPDX-License-Identifier: Apache-2.0
//
package katamonitor
import (
"context"
"github.com/sirupsen/logrus"
"github.com/containerd/containerd"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/typeurl"
"github.com/kata-containers/kata-containers/src/runtime/pkg/types"
vc "github.com/kata-containers/kata-containers/src/runtime/virtcontainers"
"github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/oci"
"github.com/opencontainers/runtime-spec/specs-go"
)
func getContainer(containersClient containers.Store, namespace, cid string) (containers.Container, error) {
ctx := context.Background()
ctx = namespaces.WithNamespace(ctx, namespace)
return containersClient.Get(ctx, cid)
}
// isSandboxContainer return true if the container is a sandbox container.
func isSandboxContainer(c *containers.Container) bool {
// unmarshal from any to spec.
if c.Spec == nil {
monitorLog.WithField("container", c.ID).Error("container spec is nil")
return false
}
v, err := typeurl.UnmarshalAny(c.Spec)
if err != nil {
monitorLog.WithError(err).Error("failed to Unmarshal container spec")
return false
}
// convert to oci spec type
ociSpec := v.(*specs.Spec)
// get container type
containerType, err := oci.ContainerType(*ociSpec)
if err != nil {
monitorLog.WithError(err).Error("failed to get contaienr type")
return false
}
// return if is a sandbox container
return containerType == vc.PodSandbox
}
// getSandboxes get kata sandbox from containerd.
// this will be called only after monitor start.
func (ka *KataMonitor) getSandboxes() (map[string]string, error) {
client, err := containerd.New(ka.containerdAddr)
if err != nil {
return nil, err
}
defer client.Close()
ctx := context.Background()
// first all namespaces.
namespaceList, err := client.NamespaceService().List(ctx)
if err != nil {
return nil, err
}
// map of type: <key:sandbox_id => value: namespace>
sandboxMap := make(map[string]string)
for _, namespace := range namespaceList {
initSandboxByNamespaceFunc := func(namespace string) error {
ctx := context.Background()
namespacedCtx := namespaces.WithNamespace(ctx, namespace)
// only list Kata Containers pods/containers
containers, err := client.ContainerService().List(namespacedCtx,
"runtime.name~="+types.KataRuntimeNameRegexp+`,labels."io.cri-containerd.kind"==sandbox`)
if err != nil {
return err
}
for i := range containers {
c := containers[i]
isc := isSandboxContainer(&c)
monitorLog.WithFields(logrus.Fields{"container": c.ID, "result": isc}).Debug("is this a sandbox container?")
if isc {
sandboxMap[c.ID] = namespace
}
}
return nil
}
if err := initSandboxByNamespaceFunc(namespace); err != nil {
return nil, err
}
}
return sandboxMap, nil
}

View File

@@ -1,76 +0,0 @@
// Copyright (c) 2020 Ant Financial
//
// SPDX-License-Identifier: Apache-2.0
//
package katamonitor
import (
"testing"
criContainerdAnnotations "github.com/containerd/cri-containerd/pkg/annotations"
"github.com/containerd/typeurl"
"github.com/containerd/containerd/containers"
"github.com/opencontainers/runtime-spec/specs-go"
"github.com/stretchr/testify/assert"
)
func TestIsSandboxContainer(t *testing.T) {
assert := assert.New(t)
c := &containers.Container{}
isc := isSandboxContainer(c)
assert.Equal(false, isc, "should not be a sandbox container")
spec := &specs.Spec{
Annotations: map[string]string{},
}
any, err := typeurl.MarshalAny(spec)
assert.Nil(err, "MarshalAny failed for spec")
c.Spec = any
// default container is a pod(sandbox) container
isc = isSandboxContainer(c)
assert.Equal(true, isc, "should be a sandbox container")
testCases := []struct {
annotationKey string
annotationValue string
result bool
}{
{
annotationKey: criContainerdAnnotations.ContainerType,
annotationValue: "",
result: false,
},
{
annotationKey: criContainerdAnnotations.ContainerType,
annotationValue: criContainerdAnnotations.ContainerTypeContainer,
result: false,
},
{
annotationKey: criContainerdAnnotations.ContainerType,
annotationValue: "pod",
result: false,
},
{
annotationKey: criContainerdAnnotations.ContainerType,
annotationValue: criContainerdAnnotations.ContainerTypeSandbox,
result: true,
},
}
for _, tc := range testCases {
spec.Annotations = map[string]string{
tc.annotationKey: tc.annotationValue,
}
any, err := typeurl.MarshalAny(spec)
assert.Nil(err, "MarshalAny failed for spec")
c.Spec = any
isc = isSandboxContainer(c)
assert.Equal(tc.result, isc, "assert failed for checking if is a sandbox container")
}
}

View File

@@ -0,0 +1,221 @@
// Copyright (c) 2020 Ant Group
// Copyright (c) 2021 Red Hat Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
package katamonitor
import (
"context"
"encoding/json"
"fmt"
"net"
"net/url"
"regexp"
"github.com/kata-containers/kata-containers/src/runtime/pkg/types"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/xeipuuv/gojsonpointer"
"google.golang.org/grpc"
pb "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
)
const (
// unixProtocol is the network protocol of unix socket.
unixProtocol = "unix"
k8sContainerdNamespace = "k8s.io"
)
// getAddressAndDialer returns the address parsed from the given endpoint and a context dialer.
func getAddressAndDialer(endpoint string) (string, func(ctx context.Context, addr string) (net.Conn, error), error) {
protocol, addr, err := parseEndpointWithFallbackProtocol(endpoint, unixProtocol)
if err != nil {
return "", nil, err
}
if protocol != unixProtocol {
return "", nil, fmt.Errorf("only support unix socket endpoint")
}
return addr, dial, nil
}
func getConnection(endPoint string) (*grpc.ClientConn, error) {
var conn *grpc.ClientConn
monitorLog.Debugf("connect using endpoint '%s' with '%s' timeout", endPoint, defaultTimeout)
addr, dialer, err := getAddressAndDialer(endPoint)
if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel()
conn, err = grpc.DialContext(ctx, addr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithContextDialer(dialer))
if err != nil {
errMsg := errors.Wrapf(err, "connect endpoint '%s', make sure you are running as root and the endpoint has been started", endPoint)
return nil, errMsg
}
monitorLog.Debugf("connected successfully using endpoint: %s", endPoint)
return conn, nil
}
func matchesRegex(pattern, target string) bool {
if pattern == "" {
return true
}
matched, err := regexp.MatchString(pattern, target)
if err != nil {
// Assume it's not a match if an error occurs.
return false
}
return matched
}
func closeConnection(conn *grpc.ClientConn) error {
if conn == nil {
return nil
}
return conn.Close()
}
func getRuntimeClient(runtimeEndpoint string) (pb.RuntimeServiceClient, *grpc.ClientConn, error) {
var (
conn *grpc.ClientConn
err error
)
// Set up a connection to the server.
// If no EndPoint set then use the default endpoint types
conn, err = getConnection(runtimeEndpoint)
if err != nil {
return nil, nil, err
}
runtimeClient := pb.NewRuntimeServiceClient(conn)
return runtimeClient, conn, nil
}
func dial(ctx context.Context, addr string) (net.Conn, error) {
return (&net.Dialer{}).DialContext(ctx, unixProtocol, addr)
}
func parseEndpointWithFallbackProtocol(endpoint string, fallbackProtocol string) (protocol string, addr string, err error) {
if protocol, addr, err = parseEndpoint(endpoint); err != nil && protocol == "" {
fallbackEndpoint := fallbackProtocol + "://" + endpoint
protocol, addr, err = parseEndpoint(fallbackEndpoint)
if err == nil {
monitorLog.Warningf("Using %q as endpoint is deprecated, please consider using full url format %q.", endpoint, fallbackEndpoint)
}
}
return
}
func parseEndpoint(endpoint string) (string, string, error) {
u, err := url.Parse(endpoint)
if err != nil {
return "", "", err
}
switch u.Scheme {
case "tcp":
return "tcp", u.Host, nil
case "unix":
return "unix", u.Path, nil
case "":
return "", "", fmt.Errorf("using %q as endpoint is deprecated, please consider using full url format", endpoint)
default:
return u.Scheme, "", fmt.Errorf("protocol %q not supported", u.Scheme)
}
}
// getSandboxes get kata sandbox from the container engine.
// this will be called only after monitor start.
func (km *KataMonitor) getSandboxes() (map[string]string, error) {
sandboxMap := map[string]string{}
runtimeClient, runtimeConn, err := getRuntimeClient(km.runtimeEndpoint)
if err != nil {
return sandboxMap, err
}
defer closeConnection(runtimeConn)
filter := &pb.PodSandboxFilter{
State: &pb.PodSandboxStateValue{
State: pb.PodSandboxState_SANDBOX_READY,
},
}
request := &pb.ListPodSandboxRequest{
Filter: filter,
}
monitorLog.Debugf("ListPodSandboxRequest: %v", request)
r, err := runtimeClient.ListPodSandbox(context.Background(), request)
if err != nil {
return sandboxMap, err
}
monitorLog.Debugf("ListPodSandboxResponse: %v", r)
for _, pod := range r.Items {
request := &pb.PodSandboxStatusRequest{
PodSandboxId: pod.Id,
Verbose: true,
}
r, err := runtimeClient.PodSandboxStatus(context.Background(), request)
if err != nil {
return sandboxMap, err
}
lowRuntime := ""
highRuntime := ""
var res map[string]interface{}
if err := json.Unmarshal([]byte(r.Info["info"]), &res); err != nil {
monitorLog.WithError(err).WithField("pod", r).Error("failed to Unmarshal pod info")
continue
} else {
monitorLog.WithField("pod info", res).Debug("")
// get high level container runtime
pointer, _ := gojsonpointer.NewJsonPointer("/runtimeSpec/annotations/io.container.manager")
rt, _, _ := pointer.Get(res)
if rt != nil {
if str, ok := rt.(string); ok {
if str == "cri-o" {
highRuntime = RuntimeCRIO
} else {
highRuntime = RuntimeContainerd
}
}
}
// get low level container runtime
// containerd stores the pod runtime in "/runtimeType" while CRI-O stores it the
// io.kubernetes.cri-o.RuntimeHandler annotation: check for both.
keys := []string{"/runtimeType", "/runtimeSpec/annotations/io.kubernetes.cri-o.RuntimeHandler"}
for _, key := range keys {
pointer, _ := gojsonpointer.NewJsonPointer(key)
rt, _, _ := pointer.Get(res)
if rt != nil {
if str, ok := rt.(string); ok {
lowRuntime = str
break
}
}
}
}
// Filter by pod name/namespace regular expressions.
monitorLog.WithFields(logrus.Fields{
"low runtime": lowRuntime,
"high runtime": highRuntime,
}).Debug("")
if matchesRegex(types.KataRuntimeNameRegexp, lowRuntime) || matchesRegex("kata*", lowRuntime) {
sandboxMap[pod.Id] = highRuntime
}
}
return sandboxMap, nil
}
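
A note on the runtime detection in getSandboxes() above: the "info" entry
returned by PodSandboxStatus is an opaque JSON document, and the gojsonpointer
lookups simply pull the runtime fields out of it. A small sketch against a
made-up blob (the sample JSON is illustrative only, not a real PodSandboxStatus
payload):

package main

import (
	"encoding/json"
	"fmt"

	"github.com/xeipuuv/gojsonpointer"
)

func main() {
	// Hypothetical "info" blob: a containerd-style document carrying the
	// low-level runtime under /runtimeType. CRI-O would instead surface the
	// io.kubernetes.cri-o.RuntimeHandler annotation, hence the two keys below.
	sample := `{
		"runtimeType": "io.containerd.kata.v2",
		"runtimeSpec": {"annotations": {"io.container.manager": "cri"}}
	}`

	var info map[string]interface{}
	if err := json.Unmarshal([]byte(sample), &info); err != nil {
		panic(err)
	}

	keys := []string{
		"/runtimeType",
		"/runtimeSpec/annotations/io.kubernetes.cri-o.RuntimeHandler",
	}
	for _, key := range keys {
		pointer, err := gojsonpointer.NewJsonPointer(key)
		if err != nil {
			continue
		}
		if value, _, err := pointer.Get(info); err == nil {
			fmt.Printf("%s => %v\n", key, value)
		}
	}
}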

View File

@@ -74,8 +74,14 @@ func registerMetrics() {
 
 // getMonitorAddress get metrics address for a sandbox, the abstract unix socket address is saved
 // in `metrics_address` with the same place of `address`.
-func (km *KataMonitor) getMonitorAddress(sandboxID, namespace string) (string, error) {
-	path := filepath.Join(km.containerdStatePath, types.ContainerdRuntimeTaskPath, namespace, sandboxID, "monitor_address")
+func (km *KataMonitor) getMonitorAddress(sandboxID, runtime string) (string, error) {
+	path := filepath.Join("/run/containerd", types.ContainerdRuntimeTaskPath, k8sContainerdNamespace, sandboxID, "monitor_address")
+	if runtime == RuntimeCRIO {
+		path = filepath.Join("/run/containers/storage/overlay-containers", sandboxID, "userdata", "monitor_address")
+	}
+	monitorLog.WithField("path", path).Debug("get monitor address")
+
 	data, err := ioutil.ReadFile(path)
 	if err != nil {
 		return "", err
@@ -173,9 +179,9 @@ func (km *KataMonitor) aggregateSandboxMetrics(encoder expfmt.Encoder) error {
 	monitorLog.WithField("sandbox_count", len(sandboxes)).Debugf("sandboxes count")
 
 	// get metrics from sandbox's shim
-	for sandboxID, namespace := range sandboxes {
+	for sandboxID, runtime := range sandboxes {
 		wg.Add(1)
-		go func(sandboxID, namespace string, results chan<- []*dto.MetricFamily) {
+		go func(sandboxID, runtime string, results chan<- []*dto.MetricFamily) {
 			sandboxMetrics, err := getParsedMetrics(sandboxID)
 			if err != nil {
 				monitorLog.WithError(err).WithField("sandbox_id", sandboxID).Errorf("failed to get metrics for sandbox")
@@ -184,7 +190,7 @@ func (km *KataMonitor) aggregateSandboxMetrics(encoder expfmt.Encoder) error {
 			results <- sandboxMetrics
 			wg.Done()
 			monitorLog.WithField("sandbox_id", sandboxID).Debug("job finished")
-		}(sandboxID, namespace, results)
+		}(sandboxID, runtime, results)
 		monitorLog.WithField("sandbox_id", sandboxID).Debug("job started")
 	}
@@ -278,7 +284,7 @@ func parsePrometheusMetrics(sandboxID string, body []byte) ([]*dto.MetricFamily,
 		})
 	}
 
-	// Kata shim are using prometheus go client, add an prefix for metric name to avoid confusing
+	// Kata shim are using prometheus go client, add a prefix for metric name to avoid confusing
 	if mf.Name != nil && (strings.HasPrefix(*mf.Name, "go_") || strings.HasPrefix(*mf.Name, "process_")) {
 		mf.Name = mutils.String2Pointer("kata_shim_" + *mf.Name)
 	}

View File

@@ -6,21 +6,24 @@
 package katamonitor
 
 import (
+	"errors"
 	"fmt"
 	"net/http"
-	"os"
+	"strings"
 	"sync"
+	"time"
 
-	"github.com/containerd/containerd/defaults"
-	srvconfig "github.com/containerd/containerd/services/server/config"
 	"github.com/sirupsen/logrus"
-
-	// register grpc event types
-	_ "github.com/containerd/containerd/api/events"
 )
 
 var monitorLog = logrus.WithField("source", "kata-monitor")
 
+const (
+	RuntimeContainerd = "containerd"
+	RuntimeCRIO       = "cri-o"
+
+	podCacheRefreshTimeSeconds = 15
+)
+
 // SetLogger sets the logger for katamonitor package.
 func SetLogger(logger *logrus.Entry) {
 	fields := monitorLog.Data
@@ -30,54 +33,47 @@ func SetLogger(logger *logrus.Entry) {
 
 // KataMonitor is monitor agent
 type KataMonitor struct {
 	sandboxCache    *sandboxCache
-	containerdAddr       string
-	containerdConfigFile string
-	containerdStatePath  string
+	runtimeEndpoint string
 }
 
 // NewKataMonitor create and return a new KataMonitor instance
-func NewKataMonitor(containerdAddr, containerdConfigFile string) (*KataMonitor, error) {
-	if containerdAddr == "" {
-		return nil, fmt.Errorf("containerd serve address missing")
+func NewKataMonitor(runtimeEndpoint string) (*KataMonitor, error) {
+	if runtimeEndpoint == "" {
+		return nil, errors.New("runtime endpoint missing")
 	}
 
-	containerdConf := &srvconfig.Config{
-		State: defaults.DefaultStateDir,
-	}
-
-	if err := srvconfig.LoadConfig(containerdConfigFile, containerdConf); err != nil && !os.IsNotExist(err) {
-		return nil, err
+	if !strings.HasPrefix(runtimeEndpoint, "unix") {
+		runtimeEndpoint = "unix://" + runtimeEndpoint
 	}
 
 	km := &KataMonitor{
-		containerdAddr:       containerdAddr,
-		containerdConfigFile: containerdConfigFile,
-		containerdStatePath:  containerdConf.State,
+		runtimeEndpoint: runtimeEndpoint,
 		sandboxCache: &sandboxCache{
 			Mutex:     &sync.Mutex{},
 			sandboxes: make(map[string]string),
 		},
 	}
 
-	if err := km.initSandboxCache(); err != nil {
-		return nil, err
-	}
-
 	// register metrics
 	registerMetrics()
 
-	go km.sandboxCache.startEventsListener(km.containerdAddr)
+	go km.startPodCacheUpdater()
 
 	return km, nil
 }
 
-func (km *KataMonitor) initSandboxCache() error {
+// startPodCacheUpdater will boot a thread to manage sandbox cache
+func (km *KataMonitor) startPodCacheUpdater() {
+	for {
+		time.Sleep(podCacheRefreshTimeSeconds * time.Second)
 		sandboxes, err := km.getSandboxes()
 		if err != nil {
-			return err
+			monitorLog.WithError(err).Error("failed to get sandboxes")
+			continue
+		}
+		monitorLog.WithField("count", len(sandboxes)).Debug("update sandboxes list")
+		km.sandboxCache.set(sandboxes)
 	}
-	km.sandboxCache.init(sandboxes)
-	return nil
 }
 
 // GetAgentURL returns agent URL
@@ -117,6 +113,6 @@ func (km *KataMonitor) getSandboxList() []string {
 	return result
 }
 
-func (km *KataMonitor) getSandboxNamespace(sandbox string) (string, error) {
-	return km.sandboxCache.getSandboxNamespace(sandbox)
+func (km *KataMonitor) getSandboxRuntime(sandbox string) (string, error) {
+	return km.sandboxCache.getSandboxRuntime(sandbox)
 }

View File

@@ -27,12 +27,12 @@ func (km *KataMonitor) composeSocketAddress(r *http.Request) (string, error) {
 		return "", err
 	}
 
-	namespace, err := km.getSandboxNamespace(sandbox)
+	runtime, err := km.getSandboxRuntime(sandbox)
 	if err != nil {
 		return "", err
 	}
 
-	return km.getMonitorAddress(sandbox, namespace)
+	return km.getMonitorAddress(sandbox, runtime)
 }
 
 func (km *KataMonitor) proxyRequest(w http.ResponseWriter, r *http.Request) {

View File

@@ -1,84 +0,0 @@
// Copyright (c) 2020 Ant Financial
//
// SPDX-License-Identifier: Apache-2.0
//
package katamonitor
import (
"fmt"
"io/ioutil"
"net/http"
"os"
"path/filepath"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestComposeSocketAddress(t *testing.T) {
assert := assert.New(t)
path := fmt.Sprintf("/tmp/TestComposeSocketAddress-%d", time.Now().Nanosecond())
statePath := filepath.Join(path, "io.containerd.runtime.v2.task")
sandboxes := map[string]string{"foo": "ns-foo", "bar": "ns-bar"}
defer func() {
os.RemoveAll(path)
}()
for sandbox, ns := range sandboxes {
err := os.MkdirAll(filepath.Join(statePath, ns, sandbox), 0755)
assert.Nil(err)
f := filepath.Join(statePath, ns, sandbox, "monitor_address")
err = ioutil.WriteFile(f, []byte(sandbox), 0644)
assert.Nil(err)
}
km := &KataMonitor{
containerdStatePath: path,
sandboxCache: &sandboxCache{
Mutex: &sync.Mutex{},
sandboxes: sandboxes,
},
}
// nolint: govet
testCases := []struct {
url string
err bool
addr string
}{
{
url: "http://localhost:6060/debug/vars",
err: true,
addr: "",
},
{
url: "http://localhost:6060/debug/vars?sandbox=abc",
err: true,
addr: "",
},
{
url: "http://localhost:6060/debug/vars?sandbox=foo",
err: false,
addr: "foo",
},
{
url: "http://localhost:6060/debug/vars?sandbox=bar",
err: false,
addr: "bar",
},
}
for _, tc := range testCases {
r, err := http.NewRequest("GET", tc.url, nil)
assert.Nil(err)
addr, err := km.composeSocketAddress(r)
assert.Equal(tc.err, err != nil)
assert.Equal(tc.addr, addr)
}
}

View File

@@ -6,23 +6,8 @@
 package katamonitor
 
 import (
-	"context"
 	"fmt"
-	"regexp"
 	"sync"
-
-	"github.com/containerd/containerd"
-	"github.com/sirupsen/logrus"
-
-	"encoding/json"
-	eventstypes "github.com/containerd/containerd/api/events"
-	"github.com/containerd/containerd/events"
-	"github.com/containerd/typeurl"
-	"github.com/kata-containers/kata-containers/src/runtime/pkg/types"
-
-	// Register grpc event types
-	_ "github.com/containerd/containerd/api/events"
 )
 
 type sandboxCache struct {
@@ -36,17 +21,6 @@ func (sc *sandboxCache) getAllSandboxes() map[string]string {
 	return sc.sandboxes
 }
 
-func (sc *sandboxCache) getSandboxNamespace(sandbox string) (string, error) {
-	sc.Lock()
-	defer sc.Unlock()
-
-	if val, found := sc.sandboxes[sandbox]; found {
-		return val, nil
-	}
-
-	return "", fmt.Errorf("sandbox %s not in cache", sandbox)
-}
-
 func (sc *sandboxCache) deleteIfExists(id string) (string, bool) {
 	sc.Lock()
 	defer sc.Unlock()
@@ -73,116 +47,19 @@ func (sc *sandboxCache) putIfNotExists(id, value string) bool {
 	return false
 }
 
-func (sc *sandboxCache) init(sandboxes map[string]string) {
+func (sc *sandboxCache) set(sandboxes map[string]string) {
 	sc.Lock()
 	defer sc.Unlock()
 
 	sc.sandboxes = sandboxes
 }
 
+func (sc *sandboxCache) getSandboxRuntime(sandbox string) (string, error) {
+	sc.Lock()
+	defer sc.Unlock()
+
+	if val, found := sc.sandboxes[sandbox]; found {
+		return val, nil
+	}
+
+	return "", fmt.Errorf("sandbox %s not in cache", sandbox)
+}
// startEventsListener will boot a thread to listen container events to manage sandbox cache
func (sc *sandboxCache) startEventsListener(addr string) error {
client, err := containerd.New(addr)
if err != nil {
return err
}
defer client.Close()
ctx := context.Background()
eventsClient := client.EventService()
containerClient := client.ContainerService()
// only need create/delete events.
eventFilters := []string{
`topic=="/containers/create"`,
`topic=="/containers/delete"`,
}
runtimeNameRegexp, err := regexp.Compile(types.KataRuntimeNameRegexp)
if err != nil {
return err
}
eventsCh, errCh := eventsClient.Subscribe(ctx, eventFilters...)
for {
var e *events.Envelope
select {
case e = <-eventsCh:
case err = <-errCh:
monitorLog.WithError(err).Warn("get error from error chan")
return err
}
if e != nil {
var eventBody []byte
if e.Event != nil {
v, err := typeurl.UnmarshalAny(e.Event)
if err != nil {
monitorLog.WithError(err).Warn("cannot unmarshal an event from Any")
continue
}
eventBody, err = json.Marshal(v)
if err != nil {
monitorLog.WithError(err).Warn("cannot marshal Any into JSON")
continue
}
}
if e.Topic == "/containers/create" {
// Namespace: k8s.io
// Topic: /containers/create
// Event: {
// "id":"6a2e22e6fffaf1dec63ddabf587ed56069b1809ba67a0d7872fc470528364e66",
// "image":"k8s.gcr.io/pause:3.1",
// "runtime":{"name":"io.containerd.kata.v2"}
// }
cc := eventstypes.ContainerCreate{}
err := json.Unmarshal(eventBody, &cc)
if err != nil {
monitorLog.WithError(err).WithField("body", string(eventBody)).Warn("unmarshal ContainerCreate failed")
continue
}
// skip non-kata contaienrs
if !runtimeNameRegexp.MatchString(cc.Runtime.Name) {
continue
}
c, err := getContainer(containerClient, e.Namespace, cc.ID)
if err != nil {
monitorLog.WithError(err).WithField("container", cc.ID).Warn("failed to get container")
continue
}
// if the container is a sandbox container,
// means the VM is started, and can start to collect metrics from the VM.
if isSandboxContainer(&c) {
// we can simply put the contaienrid in sandboxes list if the container is a sandbox container
sc.putIfNotExists(cc.ID, e.Namespace)
monitorLog.WithField("container", cc.ID).Info("add sandbox to cache")
}
} else if e.Topic == "/containers/delete" {
// Namespace: k8s.io
// Topic: /containers/delete
// Event: {
// "id":"73ec10d2e38070f930310687ab46bbaa532c79d5680fd7f18fff99f759d9385e"
// }
cd := &eventstypes.ContainerDelete{}
err := json.Unmarshal(eventBody, &cd)
if err != nil {
monitorLog.WithError(err).WithField("body", string(eventBody)).Warn("unmarshal ContainerDelete failed")
}
// if container in sandboxes list, it must be the pause container in the sandbox,
// so the contaienr id is the sandbox id
// we can simply delete the contaienr from sandboxes list
// the last container in a sandbox is deleted, means the VM will stop.
_, deleted := sc.deleteIfExists(cd.ID)
monitorLog.WithFields(logrus.Fields{"container": cd.ID, "result": deleted}).Info("delete sandbox from cache")
} else {
monitorLog.WithFields(logrus.Fields{"Namespace": e.Namespace, "Topic": e.Topic, "Event": string(eventBody)}).Error("other events")
}
}
}
} }

View File

@@ -21,7 +21,7 @@ func TestSandboxCache(t *testing.T) {
 	scMap := map[string]string{"111": "222"}
 
-	sc.init(scMap)
+	sc.set(scMap)
 	scMap = sc.getAllSandboxes()
 	assert.Equal(1, len(scMap))