mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-06-25 06:52:13 +00:00
kata-monitor: talk to the container engine via the CRI
kata-monitor uses containerd client to retrieve information from the container engine. This makes kata-monitor work with the containerd container engine only. Bin Liu (bin <bin@hyper.sh>) worked on a kata-monitor version able to talk to any container engine leveraging the standard CRI[1]. Here, the original work of Bin Lui has been adapted on the current kata-monitor to make it container engine independent. [1] https://github.com/liubin/kata-containers/tree/fix/1030-use-cri-in-kata-monitor Fixes: #1030 Signed-off-by: Francesco Giudici <fgiudici@redhat.com>
This commit is contained in:
parent
eac05ad6d6
commit
c2f03e8993
@ -9,7 +9,7 @@ import (
|
||||
"flag"
|
||||
"net/http"
|
||||
"os"
|
||||
"runtime"
|
||||
goruntime "runtime"
|
||||
"text/template"
|
||||
"time"
|
||||
|
||||
@ -18,8 +18,7 @@ import (
|
||||
)
|
||||
|
||||
var monitorListenAddr = flag.String("listen-address", ":8090", "The address to listen on for HTTP requests.")
|
||||
var containerdAddr = flag.String("containerd-address", "/run/containerd/containerd.sock", "Containerd address to accept client requests.")
|
||||
var containerdConfig = flag.String("containerd-conf", "/etc/containerd/config.toml", "Containerd config file.")
|
||||
var runtimeEndpoint = flag.String("runtime-endpoint", "/run/containerd/containerd.sock", `Endpoint of CRI container runtime service. (default: "/run/containerd/containerd.sock")`)
|
||||
var logLevel = flag.String("log-level", "info", "Log level of logrus(trace/debug/info/warn/error/fatal/panic).")
|
||||
|
||||
// These values are overridden via ldflags
|
||||
@ -59,9 +58,9 @@ func main() {
|
||||
ver := versionInfo{
|
||||
AppName: appName,
|
||||
Version: version,
|
||||
GoVersion: runtime.Version(),
|
||||
Os: runtime.GOOS,
|
||||
Arch: runtime.GOARCH,
|
||||
GoVersion: goruntime.Version(),
|
||||
Os: goruntime.GOOS,
|
||||
Arch: goruntime.GOARCH,
|
||||
GitCommit: GitCommit,
|
||||
}
|
||||
|
||||
@ -85,16 +84,15 @@ func main() {
|
||||
"git-commit": ver.GitCommit,
|
||||
|
||||
// properties from command-line options
|
||||
"listen-address": *monitorListenAddr,
|
||||
"containerd-address": *containerdAddr,
|
||||
"containerd-conf": *containerdConfig,
|
||||
"log-level": *logLevel,
|
||||
"listen-address": *monitorListenAddr,
|
||||
"runtime-endpoint": *runtimeEndpoint,
|
||||
"log-level": *logLevel,
|
||||
}
|
||||
|
||||
logrus.WithFields(announceFields).Info("announce")
|
||||
|
||||
// create new kataMonitor
|
||||
km, err := kataMonitor.NewKataMonitor(*containerdAddr, *containerdConfig)
|
||||
km, err := kataMonitor.NewKataMonitor(*runtimeEndpoint)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@ -1,107 +0,0 @@
|
||||
// Copyright (c) 2020 Ant Financial
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
package katamonitor
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/containerd/containerd"
|
||||
"github.com/containerd/containerd/containers"
|
||||
"github.com/containerd/containerd/namespaces"
|
||||
"github.com/containerd/typeurl"
|
||||
|
||||
"github.com/kata-containers/kata-containers/src/runtime/pkg/types"
|
||||
vc "github.com/kata-containers/kata-containers/src/runtime/virtcontainers"
|
||||
"github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/oci"
|
||||
"github.com/opencontainers/runtime-spec/specs-go"
|
||||
)
|
||||
|
||||
func getContainer(containersClient containers.Store, namespace, cid string) (containers.Container, error) {
|
||||
ctx := context.Background()
|
||||
ctx = namespaces.WithNamespace(ctx, namespace)
|
||||
return containersClient.Get(ctx, cid)
|
||||
}
|
||||
|
||||
// isSandboxContainer return true if the container is a sandbox container.
|
||||
func isSandboxContainer(c *containers.Container) bool {
|
||||
// unmarshal from any to spec.
|
||||
if c.Spec == nil {
|
||||
monitorLog.WithField("container", c.ID).Error("container spec is nil")
|
||||
return false
|
||||
}
|
||||
|
||||
v, err := typeurl.UnmarshalAny(c.Spec)
|
||||
if err != nil {
|
||||
monitorLog.WithError(err).Error("failed to Unmarshal container spec")
|
||||
return false
|
||||
}
|
||||
|
||||
// convert to oci spec type
|
||||
ociSpec := v.(*specs.Spec)
|
||||
|
||||
// get container type
|
||||
containerType, err := oci.ContainerType(*ociSpec)
|
||||
if err != nil {
|
||||
monitorLog.WithError(err).Error("failed to get contaienr type")
|
||||
return false
|
||||
}
|
||||
|
||||
// return if is a sandbox container
|
||||
return containerType == vc.PodSandbox
|
||||
}
|
||||
|
||||
// getSandboxes get kata sandbox from containerd.
|
||||
// this will be called only after monitor start.
|
||||
func (ka *KataMonitor) getSandboxes() (map[string]string, error) {
|
||||
client, err := containerd.New(ka.containerdAddr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer client.Close()
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// first all namespaces.
|
||||
namespaceList, err := client.NamespaceService().List(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// map of type: <key:sandbox_id => value: namespace>
|
||||
sandboxMap := make(map[string]string)
|
||||
|
||||
for _, namespace := range namespaceList {
|
||||
|
||||
initSandboxByNamespaceFunc := func(namespace string) error {
|
||||
ctx := context.Background()
|
||||
namespacedCtx := namespaces.WithNamespace(ctx, namespace)
|
||||
// only list Kata Containers pods/containers
|
||||
containers, err := client.ContainerService().List(namespacedCtx,
|
||||
"runtime.name~="+types.KataRuntimeNameRegexp+`,labels."io.cri-containerd.kind"==sandbox`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for i := range containers {
|
||||
c := containers[i]
|
||||
isc := isSandboxContainer(&c)
|
||||
monitorLog.WithFields(logrus.Fields{"container": c.ID, "result": isc}).Debug("is this a sandbox container?")
|
||||
if isc {
|
||||
sandboxMap[c.ID] = namespace
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := initSandboxByNamespaceFunc(namespace); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return sandboxMap, nil
|
||||
}
|
@ -1,76 +0,0 @@
|
||||
// Copyright (c) 2020 Ant Financial
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
package katamonitor
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
criContainerdAnnotations "github.com/containerd/cri-containerd/pkg/annotations"
|
||||
"github.com/containerd/typeurl"
|
||||
|
||||
"github.com/containerd/containerd/containers"
|
||||
"github.com/opencontainers/runtime-spec/specs-go"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestIsSandboxContainer(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
c := &containers.Container{}
|
||||
isc := isSandboxContainer(c)
|
||||
assert.Equal(false, isc, "should not be a sandbox container")
|
||||
|
||||
spec := &specs.Spec{
|
||||
Annotations: map[string]string{},
|
||||
}
|
||||
|
||||
any, err := typeurl.MarshalAny(spec)
|
||||
assert.Nil(err, "MarshalAny failed for spec")
|
||||
|
||||
c.Spec = any
|
||||
// default container is a pod(sandbox) container
|
||||
isc = isSandboxContainer(c)
|
||||
assert.Equal(true, isc, "should be a sandbox container")
|
||||
|
||||
testCases := []struct {
|
||||
annotationKey string
|
||||
annotationValue string
|
||||
result bool
|
||||
}{
|
||||
{
|
||||
annotationKey: criContainerdAnnotations.ContainerType,
|
||||
annotationValue: "",
|
||||
result: false,
|
||||
},
|
||||
{
|
||||
annotationKey: criContainerdAnnotations.ContainerType,
|
||||
annotationValue: criContainerdAnnotations.ContainerTypeContainer,
|
||||
result: false,
|
||||
},
|
||||
{
|
||||
annotationKey: criContainerdAnnotations.ContainerType,
|
||||
annotationValue: "pod",
|
||||
result: false,
|
||||
},
|
||||
{
|
||||
annotationKey: criContainerdAnnotations.ContainerType,
|
||||
annotationValue: criContainerdAnnotations.ContainerTypeSandbox,
|
||||
result: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
spec.Annotations = map[string]string{
|
||||
tc.annotationKey: tc.annotationValue,
|
||||
}
|
||||
any, err := typeurl.MarshalAny(spec)
|
||||
assert.Nil(err, "MarshalAny failed for spec")
|
||||
c.Spec = any
|
||||
isc = isSandboxContainer(c)
|
||||
assert.Equal(tc.result, isc, "assert failed for checking if is a sandbox container")
|
||||
}
|
||||
|
||||
}
|
221
src/runtime/pkg/kata-monitor/cri.go
Normal file
221
src/runtime/pkg/kata-monitor/cri.go
Normal file
@ -0,0 +1,221 @@
|
||||
// Copyright (c) 2020 Ant Group
|
||||
// Copyright (c) 2021 Red Hat Inc.
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
package katamonitor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/url"
|
||||
"regexp"
|
||||
|
||||
"github.com/kata-containers/kata-containers/src/runtime/pkg/types"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/xeipuuv/gojsonpointer"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
pb "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
|
||||
)
|
||||
|
||||
const (
|
||||
// unixProtocol is the network protocol of unix socket.
|
||||
unixProtocol = "unix"
|
||||
|
||||
k8sContainerdNamespace = "k8s.io"
|
||||
)
|
||||
|
||||
// getAddressAndDialer returns the address parsed from the given endpoint and a context dialer.
|
||||
func getAddressAndDialer(endpoint string) (string, func(ctx context.Context, addr string) (net.Conn, error), error) {
|
||||
protocol, addr, err := parseEndpointWithFallbackProtocol(endpoint, unixProtocol)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
if protocol != unixProtocol {
|
||||
return "", nil, fmt.Errorf("only support unix socket endpoint")
|
||||
}
|
||||
|
||||
return addr, dial, nil
|
||||
}
|
||||
|
||||
func getConnection(endPoint string) (*grpc.ClientConn, error) {
|
||||
var conn *grpc.ClientConn
|
||||
monitorLog.Debugf("connect using endpoint '%s' with '%s' timeout", endPoint, defaultTimeout)
|
||||
addr, dialer, err := getAddressAndDialer(endPoint)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
|
||||
defer cancel()
|
||||
conn, err = grpc.DialContext(ctx, addr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithContextDialer(dialer))
|
||||
if err != nil {
|
||||
errMsg := errors.Wrapf(err, "connect endpoint '%s', make sure you are running as root and the endpoint has been started", endPoint)
|
||||
return nil, errMsg
|
||||
}
|
||||
monitorLog.Debugf("connected successfully using endpoint: %s", endPoint)
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
func matchesRegex(pattern, target string) bool {
|
||||
if pattern == "" {
|
||||
return true
|
||||
}
|
||||
matched, err := regexp.MatchString(pattern, target)
|
||||
if err != nil {
|
||||
// Assume it's not a match if an error occurs.
|
||||
return false
|
||||
}
|
||||
return matched
|
||||
}
|
||||
|
||||
func closeConnection(conn *grpc.ClientConn) error {
|
||||
if conn == nil {
|
||||
return nil
|
||||
}
|
||||
return conn.Close()
|
||||
}
|
||||
|
||||
func getRuntimeClient(runtimeEndpoint string) (pb.RuntimeServiceClient, *grpc.ClientConn, error) {
|
||||
var (
|
||||
conn *grpc.ClientConn
|
||||
err error
|
||||
)
|
||||
// Set up a connection to the server.
|
||||
// If no EndPoint set then use the default endpoint types
|
||||
conn, err = getConnection(runtimeEndpoint)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
runtimeClient := pb.NewRuntimeServiceClient(conn)
|
||||
return runtimeClient, conn, nil
|
||||
}
|
||||
|
||||
func dial(ctx context.Context, addr string) (net.Conn, error) {
|
||||
return (&net.Dialer{}).DialContext(ctx, unixProtocol, addr)
|
||||
}
|
||||
|
||||
func parseEndpointWithFallbackProtocol(endpoint string, fallbackProtocol string) (protocol string, addr string, err error) {
|
||||
if protocol, addr, err = parseEndpoint(endpoint); err != nil && protocol == "" {
|
||||
fallbackEndpoint := fallbackProtocol + "://" + endpoint
|
||||
protocol, addr, err = parseEndpoint(fallbackEndpoint)
|
||||
if err == nil {
|
||||
monitorLog.Warningf("Using %q as endpoint is deprecated, please consider using full url format %q.", endpoint, fallbackEndpoint)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func parseEndpoint(endpoint string) (string, string, error) {
|
||||
u, err := url.Parse(endpoint)
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
|
||||
switch u.Scheme {
|
||||
case "tcp":
|
||||
return "tcp", u.Host, nil
|
||||
|
||||
case "unix":
|
||||
return "unix", u.Path, nil
|
||||
|
||||
case "":
|
||||
return "", "", fmt.Errorf("using %q as endpoint is deprecated, please consider using full url format", endpoint)
|
||||
|
||||
default:
|
||||
return u.Scheme, "", fmt.Errorf("protocol %q not supported", u.Scheme)
|
||||
}
|
||||
}
|
||||
|
||||
// getSandboxes get kata sandbox from the container engine.
|
||||
// this will be called only after monitor start.
|
||||
func (km *KataMonitor) getSandboxes() (map[string]string, error) {
|
||||
|
||||
sandboxMap := map[string]string{}
|
||||
runtimeClient, runtimeConn, err := getRuntimeClient(km.runtimeEndpoint)
|
||||
if err != nil {
|
||||
return sandboxMap, err
|
||||
}
|
||||
defer closeConnection(runtimeConn)
|
||||
|
||||
filter := &pb.PodSandboxFilter{
|
||||
State: &pb.PodSandboxStateValue{
|
||||
State: pb.PodSandboxState_SANDBOX_READY,
|
||||
},
|
||||
}
|
||||
|
||||
request := &pb.ListPodSandboxRequest{
|
||||
Filter: filter,
|
||||
}
|
||||
monitorLog.Debugf("ListPodSandboxRequest: %v", request)
|
||||
r, err := runtimeClient.ListPodSandbox(context.Background(), request)
|
||||
if err != nil {
|
||||
return sandboxMap, err
|
||||
}
|
||||
monitorLog.Debugf("ListPodSandboxResponse: %v", r)
|
||||
|
||||
for _, pod := range r.Items {
|
||||
request := &pb.PodSandboxStatusRequest{
|
||||
PodSandboxId: pod.Id,
|
||||
Verbose: true,
|
||||
}
|
||||
|
||||
r, err := runtimeClient.PodSandboxStatus(context.Background(), request)
|
||||
if err != nil {
|
||||
return sandboxMap, err
|
||||
}
|
||||
|
||||
lowRuntime := ""
|
||||
highRuntime := ""
|
||||
var res map[string]interface{}
|
||||
if err := json.Unmarshal([]byte(r.Info["info"]), &res); err != nil {
|
||||
monitorLog.WithError(err).WithField("pod", r).Error("failed to Unmarshal pod info")
|
||||
continue
|
||||
} else {
|
||||
monitorLog.WithField("pod info", res).Debug("")
|
||||
// get high level container runtime
|
||||
pointer, _ := gojsonpointer.NewJsonPointer("/runtimeSpec/annotations/io.container.manager")
|
||||
rt, _, _ := pointer.Get(res)
|
||||
if rt != nil {
|
||||
if str, ok := rt.(string); ok {
|
||||
if str == "cri-o" {
|
||||
highRuntime = RuntimeCRIO
|
||||
} else {
|
||||
highRuntime = RuntimeContainerd
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// get low level container runtime
|
||||
// containerd stores the pod runtime in "/runtimeType" while CRI-O stores it the
|
||||
// io.kubernetes.cri-o.RuntimeHandler annotation: check for both.
|
||||
keys := []string{"/runtimeType", "/runtimeSpec/annotations/io.kubernetes.cri-o.RuntimeHandler"}
|
||||
for _, key := range keys {
|
||||
pointer, _ := gojsonpointer.NewJsonPointer(key)
|
||||
rt, _, _ := pointer.Get(res)
|
||||
if rt != nil {
|
||||
if str, ok := rt.(string); ok {
|
||||
lowRuntime = str
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Filter by pod name/namespace regular expressions.
|
||||
monitorLog.WithFields(logrus.Fields{
|
||||
"low runtime": lowRuntime,
|
||||
"high runtime": highRuntime,
|
||||
}).Debug("")
|
||||
if matchesRegex(types.KataRuntimeNameRegexp, lowRuntime) || matchesRegex("kata*", lowRuntime) {
|
||||
sandboxMap[pod.Id] = highRuntime
|
||||
}
|
||||
}
|
||||
|
||||
return sandboxMap, nil
|
||||
}
|
@ -74,8 +74,14 @@ func registerMetrics() {
|
||||
|
||||
// getMonitorAddress get metrics address for a sandbox, the abstract unix socket address is saved
|
||||
// in `metrics_address` with the same place of `address`.
|
||||
func (km *KataMonitor) getMonitorAddress(sandboxID, namespace string) (string, error) {
|
||||
path := filepath.Join(km.containerdStatePath, types.ContainerdRuntimeTaskPath, namespace, sandboxID, "monitor_address")
|
||||
func (km *KataMonitor) getMonitorAddress(sandboxID, runtime string) (string, error) {
|
||||
path := filepath.Join("/run/containerd", types.ContainerdRuntimeTaskPath, k8sContainerdNamespace, sandboxID, "monitor_address")
|
||||
if runtime == RuntimeCRIO {
|
||||
path = filepath.Join("/run/containers/storage/overlay-containers", sandboxID, "userdata", "monitor_address")
|
||||
}
|
||||
|
||||
monitorLog.WithField("path", path).Debug("get monitor address")
|
||||
|
||||
data, err := ioutil.ReadFile(path)
|
||||
if err != nil {
|
||||
return "", err
|
||||
@ -173,9 +179,9 @@ func (km *KataMonitor) aggregateSandboxMetrics(encoder expfmt.Encoder) error {
|
||||
monitorLog.WithField("sandbox_count", len(sandboxes)).Debugf("sandboxes count")
|
||||
|
||||
// get metrics from sandbox's shim
|
||||
for sandboxID, namespace := range sandboxes {
|
||||
for sandboxID, runtime := range sandboxes {
|
||||
wg.Add(1)
|
||||
go func(sandboxID, namespace string, results chan<- []*dto.MetricFamily) {
|
||||
go func(sandboxID, runtime string, results chan<- []*dto.MetricFamily) {
|
||||
sandboxMetrics, err := getParsedMetrics(sandboxID)
|
||||
if err != nil {
|
||||
monitorLog.WithError(err).WithField("sandbox_id", sandboxID).Errorf("failed to get metrics for sandbox")
|
||||
@ -184,7 +190,7 @@ func (km *KataMonitor) aggregateSandboxMetrics(encoder expfmt.Encoder) error {
|
||||
results <- sandboxMetrics
|
||||
wg.Done()
|
||||
monitorLog.WithField("sandbox_id", sandboxID).Debug("job finished")
|
||||
}(sandboxID, namespace, results)
|
||||
}(sandboxID, runtime, results)
|
||||
|
||||
monitorLog.WithField("sandbox_id", sandboxID).Debug("job started")
|
||||
}
|
||||
@ -278,7 +284,7 @@ func parsePrometheusMetrics(sandboxID string, body []byte) ([]*dto.MetricFamily,
|
||||
})
|
||||
}
|
||||
|
||||
// Kata shim are using prometheus go client, add an prefix for metric name to avoid confusing
|
||||
// Kata shim are using prometheus go client, add a prefix for metric name to avoid confusing
|
||||
if mf.Name != nil && (strings.HasPrefix(*mf.Name, "go_") || strings.HasPrefix(*mf.Name, "process_")) {
|
||||
mf.Name = mutils.String2Pointer("kata_shim_" + *mf.Name)
|
||||
}
|
||||
|
@ -6,21 +6,24 @@
|
||||
package katamonitor
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/containerd/containerd/defaults"
|
||||
srvconfig "github.com/containerd/containerd/services/server/config"
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
// register grpc event types
|
||||
_ "github.com/containerd/containerd/api/events"
|
||||
)
|
||||
|
||||
var monitorLog = logrus.WithField("source", "kata-monitor")
|
||||
|
||||
const (
|
||||
RuntimeContainerd = "containerd"
|
||||
RuntimeCRIO = "cri-o"
|
||||
podCacheRefreshTimeSeconds = 15
|
||||
)
|
||||
|
||||
// SetLogger sets the logger for katamonitor package.
|
||||
func SetLogger(logger *logrus.Entry) {
|
||||
fields := monitorLog.Data
|
||||
@ -29,55 +32,48 @@ func SetLogger(logger *logrus.Entry) {
|
||||
|
||||
// KataMonitor is monitor agent
|
||||
type KataMonitor struct {
|
||||
sandboxCache *sandboxCache
|
||||
containerdAddr string
|
||||
containerdConfigFile string
|
||||
containerdStatePath string
|
||||
sandboxCache *sandboxCache
|
||||
runtimeEndpoint string
|
||||
}
|
||||
|
||||
// NewKataMonitor create and return a new KataMonitor instance
|
||||
func NewKataMonitor(containerdAddr, containerdConfigFile string) (*KataMonitor, error) {
|
||||
if containerdAddr == "" {
|
||||
return nil, fmt.Errorf("containerd serve address missing")
|
||||
func NewKataMonitor(runtimeEndpoint string) (*KataMonitor, error) {
|
||||
if runtimeEndpoint == "" {
|
||||
return nil, errors.New("runtime endpoint missing")
|
||||
}
|
||||
|
||||
containerdConf := &srvconfig.Config{
|
||||
State: defaults.DefaultStateDir,
|
||||
}
|
||||
|
||||
if err := srvconfig.LoadConfig(containerdConfigFile, containerdConf); err != nil && !os.IsNotExist(err) {
|
||||
return nil, err
|
||||
if !strings.HasPrefix(runtimeEndpoint, "unix") {
|
||||
runtimeEndpoint = "unix://" + runtimeEndpoint
|
||||
}
|
||||
|
||||
km := &KataMonitor{
|
||||
containerdAddr: containerdAddr,
|
||||
containerdConfigFile: containerdConfigFile,
|
||||
containerdStatePath: containerdConf.State,
|
||||
runtimeEndpoint: runtimeEndpoint,
|
||||
sandboxCache: &sandboxCache{
|
||||
Mutex: &sync.Mutex{},
|
||||
sandboxes: make(map[string]string),
|
||||
},
|
||||
}
|
||||
|
||||
if err := km.initSandboxCache(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// register metrics
|
||||
registerMetrics()
|
||||
|
||||
go km.sandboxCache.startEventsListener(km.containerdAddr)
|
||||
go km.startPodCacheUpdater()
|
||||
|
||||
return km, nil
|
||||
}
|
||||
|
||||
func (km *KataMonitor) initSandboxCache() error {
|
||||
sandboxes, err := km.getSandboxes()
|
||||
if err != nil {
|
||||
return err
|
||||
// startPodCacheUpdater will boot a thread to manage sandbox cache
|
||||
func (km *KataMonitor) startPodCacheUpdater() {
|
||||
for {
|
||||
time.Sleep(podCacheRefreshTimeSeconds * time.Second)
|
||||
sandboxes, err := km.getSandboxes()
|
||||
if err != nil {
|
||||
monitorLog.WithError(err).Error("failed to get sandboxes")
|
||||
continue
|
||||
}
|
||||
monitorLog.WithField("count", len(sandboxes)).Debug("update sandboxes list")
|
||||
km.sandboxCache.set(sandboxes)
|
||||
}
|
||||
km.sandboxCache.init(sandboxes)
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetAgentURL returns agent URL
|
||||
@ -117,6 +113,6 @@ func (km *KataMonitor) getSandboxList() []string {
|
||||
return result
|
||||
}
|
||||
|
||||
func (km *KataMonitor) getSandboxNamespace(sandbox string) (string, error) {
|
||||
return km.sandboxCache.getSandboxNamespace(sandbox)
|
||||
func (km *KataMonitor) getSandboxRuntime(sandbox string) (string, error) {
|
||||
return km.sandboxCache.getSandboxRuntime(sandbox)
|
||||
}
|
||||
|
@ -27,12 +27,12 @@ func (km *KataMonitor) composeSocketAddress(r *http.Request) (string, error) {
|
||||
return "", err
|
||||
}
|
||||
|
||||
namespace, err := km.getSandboxNamespace(sandbox)
|
||||
runtime, err := km.getSandboxRuntime(sandbox)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return km.getMonitorAddress(sandbox, namespace)
|
||||
return km.getMonitorAddress(sandbox, runtime)
|
||||
}
|
||||
|
||||
func (km *KataMonitor) proxyRequest(w http.ResponseWriter, r *http.Request) {
|
||||
|
@ -1,84 +0,0 @@
|
||||
// Copyright (c) 2020 Ant Financial
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
package katamonitor
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestComposeSocketAddress(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
path := fmt.Sprintf("/tmp/TestComposeSocketAddress-%d", time.Now().Nanosecond())
|
||||
statePath := filepath.Join(path, "io.containerd.runtime.v2.task")
|
||||
|
||||
sandboxes := map[string]string{"foo": "ns-foo", "bar": "ns-bar"}
|
||||
defer func() {
|
||||
os.RemoveAll(path)
|
||||
}()
|
||||
|
||||
for sandbox, ns := range sandboxes {
|
||||
err := os.MkdirAll(filepath.Join(statePath, ns, sandbox), 0755)
|
||||
assert.Nil(err)
|
||||
f := filepath.Join(statePath, ns, sandbox, "monitor_address")
|
||||
err = ioutil.WriteFile(f, []byte(sandbox), 0644)
|
||||
assert.Nil(err)
|
||||
}
|
||||
|
||||
km := &KataMonitor{
|
||||
containerdStatePath: path,
|
||||
sandboxCache: &sandboxCache{
|
||||
Mutex: &sync.Mutex{},
|
||||
sandboxes: sandboxes,
|
||||
},
|
||||
}
|
||||
|
||||
// nolint: govet
|
||||
testCases := []struct {
|
||||
url string
|
||||
err bool
|
||||
addr string
|
||||
}{
|
||||
{
|
||||
url: "http://localhost:6060/debug/vars",
|
||||
err: true,
|
||||
addr: "",
|
||||
},
|
||||
{
|
||||
url: "http://localhost:6060/debug/vars?sandbox=abc",
|
||||
err: true,
|
||||
addr: "",
|
||||
},
|
||||
{
|
||||
url: "http://localhost:6060/debug/vars?sandbox=foo",
|
||||
err: false,
|
||||
addr: "foo",
|
||||
},
|
||||
{
|
||||
url: "http://localhost:6060/debug/vars?sandbox=bar",
|
||||
err: false,
|
||||
addr: "bar",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
r, err := http.NewRequest("GET", tc.url, nil)
|
||||
assert.Nil(err)
|
||||
|
||||
addr, err := km.composeSocketAddress(r)
|
||||
|
||||
assert.Equal(tc.err, err != nil)
|
||||
assert.Equal(tc.addr, addr)
|
||||
}
|
||||
}
|
@ -6,23 +6,8 @@
|
||||
package katamonitor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"regexp"
|
||||
"sync"
|
||||
|
||||
"github.com/containerd/containerd"
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
"encoding/json"
|
||||
|
||||
eventstypes "github.com/containerd/containerd/api/events"
|
||||
"github.com/containerd/containerd/events"
|
||||
"github.com/containerd/typeurl"
|
||||
"github.com/kata-containers/kata-containers/src/runtime/pkg/types"
|
||||
|
||||
// Register grpc event types
|
||||
_ "github.com/containerd/containerd/api/events"
|
||||
)
|
||||
|
||||
type sandboxCache struct {
|
||||
@ -36,17 +21,6 @@ func (sc *sandboxCache) getAllSandboxes() map[string]string {
|
||||
return sc.sandboxes
|
||||
}
|
||||
|
||||
func (sc *sandboxCache) getSandboxNamespace(sandbox string) (string, error) {
|
||||
sc.Lock()
|
||||
defer sc.Unlock()
|
||||
|
||||
if val, found := sc.sandboxes[sandbox]; found {
|
||||
return val, nil
|
||||
}
|
||||
|
||||
return "", fmt.Errorf("sandbox %s not in cache", sandbox)
|
||||
}
|
||||
|
||||
func (sc *sandboxCache) deleteIfExists(id string) (string, bool) {
|
||||
sc.Lock()
|
||||
defer sc.Unlock()
|
||||
@ -73,116 +47,19 @@ func (sc *sandboxCache) putIfNotExists(id, value string) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (sc *sandboxCache) init(sandboxes map[string]string) {
|
||||
func (sc *sandboxCache) set(sandboxes map[string]string) {
|
||||
sc.Lock()
|
||||
defer sc.Unlock()
|
||||
sc.sandboxes = sandboxes
|
||||
}
|
||||
|
||||
// startEventsListener will boot a thread to listen container events to manage sandbox cache
|
||||
func (sc *sandboxCache) startEventsListener(addr string) error {
|
||||
client, err := containerd.New(addr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer client.Close()
|
||||
func (sc *sandboxCache) getSandboxRuntime(sandbox string) (string, error) {
|
||||
sc.Lock()
|
||||
defer sc.Unlock()
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
eventsClient := client.EventService()
|
||||
containerClient := client.ContainerService()
|
||||
|
||||
// only need create/delete events.
|
||||
eventFilters := []string{
|
||||
`topic=="/containers/create"`,
|
||||
`topic=="/containers/delete"`,
|
||||
if val, found := sc.sandboxes[sandbox]; found {
|
||||
return val, nil
|
||||
}
|
||||
|
||||
runtimeNameRegexp, err := regexp.Compile(types.KataRuntimeNameRegexp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
eventsCh, errCh := eventsClient.Subscribe(ctx, eventFilters...)
|
||||
for {
|
||||
var e *events.Envelope
|
||||
select {
|
||||
case e = <-eventsCh:
|
||||
case err = <-errCh:
|
||||
monitorLog.WithError(err).Warn("get error from error chan")
|
||||
return err
|
||||
}
|
||||
|
||||
if e != nil {
|
||||
var eventBody []byte
|
||||
if e.Event != nil {
|
||||
v, err := typeurl.UnmarshalAny(e.Event)
|
||||
if err != nil {
|
||||
monitorLog.WithError(err).Warn("cannot unmarshal an event from Any")
|
||||
continue
|
||||
}
|
||||
eventBody, err = json.Marshal(v)
|
||||
if err != nil {
|
||||
monitorLog.WithError(err).Warn("cannot marshal Any into JSON")
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
if e.Topic == "/containers/create" {
|
||||
// Namespace: k8s.io
|
||||
// Topic: /containers/create
|
||||
// Event: {
|
||||
// "id":"6a2e22e6fffaf1dec63ddabf587ed56069b1809ba67a0d7872fc470528364e66",
|
||||
// "image":"k8s.gcr.io/pause:3.1",
|
||||
// "runtime":{"name":"io.containerd.kata.v2"}
|
||||
// }
|
||||
cc := eventstypes.ContainerCreate{}
|
||||
err := json.Unmarshal(eventBody, &cc)
|
||||
if err != nil {
|
||||
monitorLog.WithError(err).WithField("body", string(eventBody)).Warn("unmarshal ContainerCreate failed")
|
||||
continue
|
||||
}
|
||||
|
||||
// skip non-kata contaienrs
|
||||
if !runtimeNameRegexp.MatchString(cc.Runtime.Name) {
|
||||
continue
|
||||
}
|
||||
|
||||
c, err := getContainer(containerClient, e.Namespace, cc.ID)
|
||||
if err != nil {
|
||||
monitorLog.WithError(err).WithField("container", cc.ID).Warn("failed to get container")
|
||||
continue
|
||||
}
|
||||
|
||||
// if the container is a sandbox container,
|
||||
// means the VM is started, and can start to collect metrics from the VM.
|
||||
if isSandboxContainer(&c) {
|
||||
// we can simply put the contaienrid in sandboxes list if the container is a sandbox container
|
||||
sc.putIfNotExists(cc.ID, e.Namespace)
|
||||
monitorLog.WithField("container", cc.ID).Info("add sandbox to cache")
|
||||
}
|
||||
} else if e.Topic == "/containers/delete" {
|
||||
// Namespace: k8s.io
|
||||
// Topic: /containers/delete
|
||||
// Event: {
|
||||
// "id":"73ec10d2e38070f930310687ab46bbaa532c79d5680fd7f18fff99f759d9385e"
|
||||
// }
|
||||
cd := &eventstypes.ContainerDelete{}
|
||||
err := json.Unmarshal(eventBody, &cd)
|
||||
if err != nil {
|
||||
monitorLog.WithError(err).WithField("body", string(eventBody)).Warn("unmarshal ContainerDelete failed")
|
||||
}
|
||||
|
||||
// if container in sandboxes list, it must be the pause container in the sandbox,
|
||||
// so the contaienr id is the sandbox id
|
||||
// we can simply delete the contaienr from sandboxes list
|
||||
// the last container in a sandbox is deleted, means the VM will stop.
|
||||
_, deleted := sc.deleteIfExists(cd.ID)
|
||||
monitorLog.WithFields(logrus.Fields{"container": cd.ID, "result": deleted}).Info("delete sandbox from cache")
|
||||
} else {
|
||||
monitorLog.WithFields(logrus.Fields{"Namespace": e.Namespace, "Topic": e.Topic, "Event": string(eventBody)}).Error("other events")
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
return "", fmt.Errorf("sandbox %s not in cache", sandbox)
|
||||
}
|
||||
|
@ -21,7 +21,7 @@ func TestSandboxCache(t *testing.T) {
|
||||
|
||||
scMap := map[string]string{"111": "222"}
|
||||
|
||||
sc.init(scMap)
|
||||
sc.set(scMap)
|
||||
|
||||
scMap = sc.getAllSandboxes()
|
||||
assert.Equal(1, len(scMap))
|
||||
|
Loading…
Reference in New Issue
Block a user