mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-06-25 15:02:45 +00:00
kata-monitor: talk to the container engine via the CRI
kata-monitor uses containerd client to retrieve information from the container engine. This makes kata-monitor work with the containerd container engine only. Bin Liu (bin <bin@hyper.sh>) worked on a kata-monitor version able to talk to any container engine leveraging the standard CRI[1]. Here, the original work of Bin Lui has been adapted on the current kata-monitor to make it container engine independent. [1] https://github.com/liubin/kata-containers/tree/fix/1030-use-cri-in-kata-monitor Fixes: #1030 Signed-off-by: Francesco Giudici <fgiudici@redhat.com>
This commit is contained in:
parent
eac05ad6d6
commit
c2f03e8993
@ -9,7 +9,7 @@ import (
|
|||||||
"flag"
|
"flag"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"runtime"
|
goruntime "runtime"
|
||||||
"text/template"
|
"text/template"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -18,8 +18,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
var monitorListenAddr = flag.String("listen-address", ":8090", "The address to listen on for HTTP requests.")
|
var monitorListenAddr = flag.String("listen-address", ":8090", "The address to listen on for HTTP requests.")
|
||||||
var containerdAddr = flag.String("containerd-address", "/run/containerd/containerd.sock", "Containerd address to accept client requests.")
|
var runtimeEndpoint = flag.String("runtime-endpoint", "/run/containerd/containerd.sock", `Endpoint of CRI container runtime service. (default: "/run/containerd/containerd.sock")`)
|
||||||
var containerdConfig = flag.String("containerd-conf", "/etc/containerd/config.toml", "Containerd config file.")
|
|
||||||
var logLevel = flag.String("log-level", "info", "Log level of logrus(trace/debug/info/warn/error/fatal/panic).")
|
var logLevel = flag.String("log-level", "info", "Log level of logrus(trace/debug/info/warn/error/fatal/panic).")
|
||||||
|
|
||||||
// These values are overridden via ldflags
|
// These values are overridden via ldflags
|
||||||
@ -59,9 +58,9 @@ func main() {
|
|||||||
ver := versionInfo{
|
ver := versionInfo{
|
||||||
AppName: appName,
|
AppName: appName,
|
||||||
Version: version,
|
Version: version,
|
||||||
GoVersion: runtime.Version(),
|
GoVersion: goruntime.Version(),
|
||||||
Os: runtime.GOOS,
|
Os: goruntime.GOOS,
|
||||||
Arch: runtime.GOARCH,
|
Arch: goruntime.GOARCH,
|
||||||
GitCommit: GitCommit,
|
GitCommit: GitCommit,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -86,15 +85,14 @@ func main() {
|
|||||||
|
|
||||||
// properties from command-line options
|
// properties from command-line options
|
||||||
"listen-address": *monitorListenAddr,
|
"listen-address": *monitorListenAddr,
|
||||||
"containerd-address": *containerdAddr,
|
"runtime-endpoint": *runtimeEndpoint,
|
||||||
"containerd-conf": *containerdConfig,
|
|
||||||
"log-level": *logLevel,
|
"log-level": *logLevel,
|
||||||
}
|
}
|
||||||
|
|
||||||
logrus.WithFields(announceFields).Info("announce")
|
logrus.WithFields(announceFields).Info("announce")
|
||||||
|
|
||||||
// create new kataMonitor
|
// create new kataMonitor
|
||||||
km, err := kataMonitor.NewKataMonitor(*containerdAddr, *containerdConfig)
|
km, err := kataMonitor.NewKataMonitor(*runtimeEndpoint)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
@ -1,107 +0,0 @@
|
|||||||
// Copyright (c) 2020 Ant Financial
|
|
||||||
//
|
|
||||||
// SPDX-License-Identifier: Apache-2.0
|
|
||||||
//
|
|
||||||
|
|
||||||
package katamonitor
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
|
|
||||||
"github.com/sirupsen/logrus"
|
|
||||||
|
|
||||||
"github.com/containerd/containerd"
|
|
||||||
"github.com/containerd/containerd/containers"
|
|
||||||
"github.com/containerd/containerd/namespaces"
|
|
||||||
"github.com/containerd/typeurl"
|
|
||||||
|
|
||||||
"github.com/kata-containers/kata-containers/src/runtime/pkg/types"
|
|
||||||
vc "github.com/kata-containers/kata-containers/src/runtime/virtcontainers"
|
|
||||||
"github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/oci"
|
|
||||||
"github.com/opencontainers/runtime-spec/specs-go"
|
|
||||||
)
|
|
||||||
|
|
||||||
func getContainer(containersClient containers.Store, namespace, cid string) (containers.Container, error) {
|
|
||||||
ctx := context.Background()
|
|
||||||
ctx = namespaces.WithNamespace(ctx, namespace)
|
|
||||||
return containersClient.Get(ctx, cid)
|
|
||||||
}
|
|
||||||
|
|
||||||
// isSandboxContainer return true if the container is a sandbox container.
|
|
||||||
func isSandboxContainer(c *containers.Container) bool {
|
|
||||||
// unmarshal from any to spec.
|
|
||||||
if c.Spec == nil {
|
|
||||||
monitorLog.WithField("container", c.ID).Error("container spec is nil")
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
v, err := typeurl.UnmarshalAny(c.Spec)
|
|
||||||
if err != nil {
|
|
||||||
monitorLog.WithError(err).Error("failed to Unmarshal container spec")
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// convert to oci spec type
|
|
||||||
ociSpec := v.(*specs.Spec)
|
|
||||||
|
|
||||||
// get container type
|
|
||||||
containerType, err := oci.ContainerType(*ociSpec)
|
|
||||||
if err != nil {
|
|
||||||
monitorLog.WithError(err).Error("failed to get contaienr type")
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// return if is a sandbox container
|
|
||||||
return containerType == vc.PodSandbox
|
|
||||||
}
|
|
||||||
|
|
||||||
// getSandboxes get kata sandbox from containerd.
|
|
||||||
// this will be called only after monitor start.
|
|
||||||
func (ka *KataMonitor) getSandboxes() (map[string]string, error) {
|
|
||||||
client, err := containerd.New(ka.containerdAddr)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer client.Close()
|
|
||||||
|
|
||||||
ctx := context.Background()
|
|
||||||
|
|
||||||
// first all namespaces.
|
|
||||||
namespaceList, err := client.NamespaceService().List(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// map of type: <key:sandbox_id => value: namespace>
|
|
||||||
sandboxMap := make(map[string]string)
|
|
||||||
|
|
||||||
for _, namespace := range namespaceList {
|
|
||||||
|
|
||||||
initSandboxByNamespaceFunc := func(namespace string) error {
|
|
||||||
ctx := context.Background()
|
|
||||||
namespacedCtx := namespaces.WithNamespace(ctx, namespace)
|
|
||||||
// only list Kata Containers pods/containers
|
|
||||||
containers, err := client.ContainerService().List(namespacedCtx,
|
|
||||||
"runtime.name~="+types.KataRuntimeNameRegexp+`,labels."io.cri-containerd.kind"==sandbox`)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := range containers {
|
|
||||||
c := containers[i]
|
|
||||||
isc := isSandboxContainer(&c)
|
|
||||||
monitorLog.WithFields(logrus.Fields{"container": c.ID, "result": isc}).Debug("is this a sandbox container?")
|
|
||||||
if isc {
|
|
||||||
sandboxMap[c.ID] = namespace
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := initSandboxByNamespaceFunc(namespace); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return sandboxMap, nil
|
|
||||||
}
|
|
@ -1,76 +0,0 @@
|
|||||||
// Copyright (c) 2020 Ant Financial
|
|
||||||
//
|
|
||||||
// SPDX-License-Identifier: Apache-2.0
|
|
||||||
//
|
|
||||||
|
|
||||||
package katamonitor
|
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
criContainerdAnnotations "github.com/containerd/cri-containerd/pkg/annotations"
|
|
||||||
"github.com/containerd/typeurl"
|
|
||||||
|
|
||||||
"github.com/containerd/containerd/containers"
|
|
||||||
"github.com/opencontainers/runtime-spec/specs-go"
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestIsSandboxContainer(t *testing.T) {
|
|
||||||
assert := assert.New(t)
|
|
||||||
|
|
||||||
c := &containers.Container{}
|
|
||||||
isc := isSandboxContainer(c)
|
|
||||||
assert.Equal(false, isc, "should not be a sandbox container")
|
|
||||||
|
|
||||||
spec := &specs.Spec{
|
|
||||||
Annotations: map[string]string{},
|
|
||||||
}
|
|
||||||
|
|
||||||
any, err := typeurl.MarshalAny(spec)
|
|
||||||
assert.Nil(err, "MarshalAny failed for spec")
|
|
||||||
|
|
||||||
c.Spec = any
|
|
||||||
// default container is a pod(sandbox) container
|
|
||||||
isc = isSandboxContainer(c)
|
|
||||||
assert.Equal(true, isc, "should be a sandbox container")
|
|
||||||
|
|
||||||
testCases := []struct {
|
|
||||||
annotationKey string
|
|
||||||
annotationValue string
|
|
||||||
result bool
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
annotationKey: criContainerdAnnotations.ContainerType,
|
|
||||||
annotationValue: "",
|
|
||||||
result: false,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
annotationKey: criContainerdAnnotations.ContainerType,
|
|
||||||
annotationValue: criContainerdAnnotations.ContainerTypeContainer,
|
|
||||||
result: false,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
annotationKey: criContainerdAnnotations.ContainerType,
|
|
||||||
annotationValue: "pod",
|
|
||||||
result: false,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
annotationKey: criContainerdAnnotations.ContainerType,
|
|
||||||
annotationValue: criContainerdAnnotations.ContainerTypeSandbox,
|
|
||||||
result: true,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, tc := range testCases {
|
|
||||||
spec.Annotations = map[string]string{
|
|
||||||
tc.annotationKey: tc.annotationValue,
|
|
||||||
}
|
|
||||||
any, err := typeurl.MarshalAny(spec)
|
|
||||||
assert.Nil(err, "MarshalAny failed for spec")
|
|
||||||
c.Spec = any
|
|
||||||
isc = isSandboxContainer(c)
|
|
||||||
assert.Equal(tc.result, isc, "assert failed for checking if is a sandbox container")
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
221
src/runtime/pkg/kata-monitor/cri.go
Normal file
221
src/runtime/pkg/kata-monitor/cri.go
Normal file
@ -0,0 +1,221 @@
|
|||||||
|
// Copyright (c) 2020 Ant Group
|
||||||
|
// Copyright (c) 2021 Red Hat Inc.
|
||||||
|
//
|
||||||
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
//
|
||||||
|
|
||||||
|
package katamonitor
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"net"
|
||||||
|
"net/url"
|
||||||
|
"regexp"
|
||||||
|
|
||||||
|
"github.com/kata-containers/kata-containers/src/runtime/pkg/types"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
|
"github.com/xeipuuv/gojsonpointer"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
|
||||||
|
pb "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// unixProtocol is the network protocol of unix socket.
|
||||||
|
unixProtocol = "unix"
|
||||||
|
|
||||||
|
k8sContainerdNamespace = "k8s.io"
|
||||||
|
)
|
||||||
|
|
||||||
|
// getAddressAndDialer returns the address parsed from the given endpoint and a context dialer.
|
||||||
|
func getAddressAndDialer(endpoint string) (string, func(ctx context.Context, addr string) (net.Conn, error), error) {
|
||||||
|
protocol, addr, err := parseEndpointWithFallbackProtocol(endpoint, unixProtocol)
|
||||||
|
if err != nil {
|
||||||
|
return "", nil, err
|
||||||
|
}
|
||||||
|
if protocol != unixProtocol {
|
||||||
|
return "", nil, fmt.Errorf("only support unix socket endpoint")
|
||||||
|
}
|
||||||
|
|
||||||
|
return addr, dial, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func getConnection(endPoint string) (*grpc.ClientConn, error) {
|
||||||
|
var conn *grpc.ClientConn
|
||||||
|
monitorLog.Debugf("connect using endpoint '%s' with '%s' timeout", endPoint, defaultTimeout)
|
||||||
|
addr, dialer, err := getAddressAndDialer(endPoint)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
|
||||||
|
defer cancel()
|
||||||
|
conn, err = grpc.DialContext(ctx, addr, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithContextDialer(dialer))
|
||||||
|
if err != nil {
|
||||||
|
errMsg := errors.Wrapf(err, "connect endpoint '%s', make sure you are running as root and the endpoint has been started", endPoint)
|
||||||
|
return nil, errMsg
|
||||||
|
}
|
||||||
|
monitorLog.Debugf("connected successfully using endpoint: %s", endPoint)
|
||||||
|
return conn, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func matchesRegex(pattern, target string) bool {
|
||||||
|
if pattern == "" {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
matched, err := regexp.MatchString(pattern, target)
|
||||||
|
if err != nil {
|
||||||
|
// Assume it's not a match if an error occurs.
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return matched
|
||||||
|
}
|
||||||
|
|
||||||
|
func closeConnection(conn *grpc.ClientConn) error {
|
||||||
|
if conn == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return conn.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func getRuntimeClient(runtimeEndpoint string) (pb.RuntimeServiceClient, *grpc.ClientConn, error) {
|
||||||
|
var (
|
||||||
|
conn *grpc.ClientConn
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
// Set up a connection to the server.
|
||||||
|
// If no EndPoint set then use the default endpoint types
|
||||||
|
conn, err = getConnection(runtimeEndpoint)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
runtimeClient := pb.NewRuntimeServiceClient(conn)
|
||||||
|
return runtimeClient, conn, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func dial(ctx context.Context, addr string) (net.Conn, error) {
|
||||||
|
return (&net.Dialer{}).DialContext(ctx, unixProtocol, addr)
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseEndpointWithFallbackProtocol(endpoint string, fallbackProtocol string) (protocol string, addr string, err error) {
|
||||||
|
if protocol, addr, err = parseEndpoint(endpoint); err != nil && protocol == "" {
|
||||||
|
fallbackEndpoint := fallbackProtocol + "://" + endpoint
|
||||||
|
protocol, addr, err = parseEndpoint(fallbackEndpoint)
|
||||||
|
if err == nil {
|
||||||
|
monitorLog.Warningf("Using %q as endpoint is deprecated, please consider using full url format %q.", endpoint, fallbackEndpoint)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseEndpoint(endpoint string) (string, string, error) {
|
||||||
|
u, err := url.Parse(endpoint)
|
||||||
|
if err != nil {
|
||||||
|
return "", "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
switch u.Scheme {
|
||||||
|
case "tcp":
|
||||||
|
return "tcp", u.Host, nil
|
||||||
|
|
||||||
|
case "unix":
|
||||||
|
return "unix", u.Path, nil
|
||||||
|
|
||||||
|
case "":
|
||||||
|
return "", "", fmt.Errorf("using %q as endpoint is deprecated, please consider using full url format", endpoint)
|
||||||
|
|
||||||
|
default:
|
||||||
|
return u.Scheme, "", fmt.Errorf("protocol %q not supported", u.Scheme)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// getSandboxes get kata sandbox from the container engine.
|
||||||
|
// this will be called only after monitor start.
|
||||||
|
func (km *KataMonitor) getSandboxes() (map[string]string, error) {
|
||||||
|
|
||||||
|
sandboxMap := map[string]string{}
|
||||||
|
runtimeClient, runtimeConn, err := getRuntimeClient(km.runtimeEndpoint)
|
||||||
|
if err != nil {
|
||||||
|
return sandboxMap, err
|
||||||
|
}
|
||||||
|
defer closeConnection(runtimeConn)
|
||||||
|
|
||||||
|
filter := &pb.PodSandboxFilter{
|
||||||
|
State: &pb.PodSandboxStateValue{
|
||||||
|
State: pb.PodSandboxState_SANDBOX_READY,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
request := &pb.ListPodSandboxRequest{
|
||||||
|
Filter: filter,
|
||||||
|
}
|
||||||
|
monitorLog.Debugf("ListPodSandboxRequest: %v", request)
|
||||||
|
r, err := runtimeClient.ListPodSandbox(context.Background(), request)
|
||||||
|
if err != nil {
|
||||||
|
return sandboxMap, err
|
||||||
|
}
|
||||||
|
monitorLog.Debugf("ListPodSandboxResponse: %v", r)
|
||||||
|
|
||||||
|
for _, pod := range r.Items {
|
||||||
|
request := &pb.PodSandboxStatusRequest{
|
||||||
|
PodSandboxId: pod.Id,
|
||||||
|
Verbose: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
r, err := runtimeClient.PodSandboxStatus(context.Background(), request)
|
||||||
|
if err != nil {
|
||||||
|
return sandboxMap, err
|
||||||
|
}
|
||||||
|
|
||||||
|
lowRuntime := ""
|
||||||
|
highRuntime := ""
|
||||||
|
var res map[string]interface{}
|
||||||
|
if err := json.Unmarshal([]byte(r.Info["info"]), &res); err != nil {
|
||||||
|
monitorLog.WithError(err).WithField("pod", r).Error("failed to Unmarshal pod info")
|
||||||
|
continue
|
||||||
|
} else {
|
||||||
|
monitorLog.WithField("pod info", res).Debug("")
|
||||||
|
// get high level container runtime
|
||||||
|
pointer, _ := gojsonpointer.NewJsonPointer("/runtimeSpec/annotations/io.container.manager")
|
||||||
|
rt, _, _ := pointer.Get(res)
|
||||||
|
if rt != nil {
|
||||||
|
if str, ok := rt.(string); ok {
|
||||||
|
if str == "cri-o" {
|
||||||
|
highRuntime = RuntimeCRIO
|
||||||
|
} else {
|
||||||
|
highRuntime = RuntimeContainerd
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// get low level container runtime
|
||||||
|
// containerd stores the pod runtime in "/runtimeType" while CRI-O stores it the
|
||||||
|
// io.kubernetes.cri-o.RuntimeHandler annotation: check for both.
|
||||||
|
keys := []string{"/runtimeType", "/runtimeSpec/annotations/io.kubernetes.cri-o.RuntimeHandler"}
|
||||||
|
for _, key := range keys {
|
||||||
|
pointer, _ := gojsonpointer.NewJsonPointer(key)
|
||||||
|
rt, _, _ := pointer.Get(res)
|
||||||
|
if rt != nil {
|
||||||
|
if str, ok := rt.(string); ok {
|
||||||
|
lowRuntime = str
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Filter by pod name/namespace regular expressions.
|
||||||
|
monitorLog.WithFields(logrus.Fields{
|
||||||
|
"low runtime": lowRuntime,
|
||||||
|
"high runtime": highRuntime,
|
||||||
|
}).Debug("")
|
||||||
|
if matchesRegex(types.KataRuntimeNameRegexp, lowRuntime) || matchesRegex("kata*", lowRuntime) {
|
||||||
|
sandboxMap[pod.Id] = highRuntime
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return sandboxMap, nil
|
||||||
|
}
|
@ -74,8 +74,14 @@ func registerMetrics() {
|
|||||||
|
|
||||||
// getMonitorAddress get metrics address for a sandbox, the abstract unix socket address is saved
|
// getMonitorAddress get metrics address for a sandbox, the abstract unix socket address is saved
|
||||||
// in `metrics_address` with the same place of `address`.
|
// in `metrics_address` with the same place of `address`.
|
||||||
func (km *KataMonitor) getMonitorAddress(sandboxID, namespace string) (string, error) {
|
func (km *KataMonitor) getMonitorAddress(sandboxID, runtime string) (string, error) {
|
||||||
path := filepath.Join(km.containerdStatePath, types.ContainerdRuntimeTaskPath, namespace, sandboxID, "monitor_address")
|
path := filepath.Join("/run/containerd", types.ContainerdRuntimeTaskPath, k8sContainerdNamespace, sandboxID, "monitor_address")
|
||||||
|
if runtime == RuntimeCRIO {
|
||||||
|
path = filepath.Join("/run/containers/storage/overlay-containers", sandboxID, "userdata", "monitor_address")
|
||||||
|
}
|
||||||
|
|
||||||
|
monitorLog.WithField("path", path).Debug("get monitor address")
|
||||||
|
|
||||||
data, err := ioutil.ReadFile(path)
|
data, err := ioutil.ReadFile(path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
@ -173,9 +179,9 @@ func (km *KataMonitor) aggregateSandboxMetrics(encoder expfmt.Encoder) error {
|
|||||||
monitorLog.WithField("sandbox_count", len(sandboxes)).Debugf("sandboxes count")
|
monitorLog.WithField("sandbox_count", len(sandboxes)).Debugf("sandboxes count")
|
||||||
|
|
||||||
// get metrics from sandbox's shim
|
// get metrics from sandbox's shim
|
||||||
for sandboxID, namespace := range sandboxes {
|
for sandboxID, runtime := range sandboxes {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(sandboxID, namespace string, results chan<- []*dto.MetricFamily) {
|
go func(sandboxID, runtime string, results chan<- []*dto.MetricFamily) {
|
||||||
sandboxMetrics, err := getParsedMetrics(sandboxID)
|
sandboxMetrics, err := getParsedMetrics(sandboxID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
monitorLog.WithError(err).WithField("sandbox_id", sandboxID).Errorf("failed to get metrics for sandbox")
|
monitorLog.WithError(err).WithField("sandbox_id", sandboxID).Errorf("failed to get metrics for sandbox")
|
||||||
@ -184,7 +190,7 @@ func (km *KataMonitor) aggregateSandboxMetrics(encoder expfmt.Encoder) error {
|
|||||||
results <- sandboxMetrics
|
results <- sandboxMetrics
|
||||||
wg.Done()
|
wg.Done()
|
||||||
monitorLog.WithField("sandbox_id", sandboxID).Debug("job finished")
|
monitorLog.WithField("sandbox_id", sandboxID).Debug("job finished")
|
||||||
}(sandboxID, namespace, results)
|
}(sandboxID, runtime, results)
|
||||||
|
|
||||||
monitorLog.WithField("sandbox_id", sandboxID).Debug("job started")
|
monitorLog.WithField("sandbox_id", sandboxID).Debug("job started")
|
||||||
}
|
}
|
||||||
@ -278,7 +284,7 @@ func parsePrometheusMetrics(sandboxID string, body []byte) ([]*dto.MetricFamily,
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Kata shim are using prometheus go client, add an prefix for metric name to avoid confusing
|
// Kata shim are using prometheus go client, add a prefix for metric name to avoid confusing
|
||||||
if mf.Name != nil && (strings.HasPrefix(*mf.Name, "go_") || strings.HasPrefix(*mf.Name, "process_")) {
|
if mf.Name != nil && (strings.HasPrefix(*mf.Name, "go_") || strings.HasPrefix(*mf.Name, "process_")) {
|
||||||
mf.Name = mutils.String2Pointer("kata_shim_" + *mf.Name)
|
mf.Name = mutils.String2Pointer("kata_shim_" + *mf.Name)
|
||||||
}
|
}
|
||||||
|
@ -6,21 +6,24 @@
|
|||||||
package katamonitor
|
package katamonitor
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/containerd/containerd/defaults"
|
|
||||||
srvconfig "github.com/containerd/containerd/services/server/config"
|
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
|
||||||
// register grpc event types
|
|
||||||
_ "github.com/containerd/containerd/api/events"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var monitorLog = logrus.WithField("source", "kata-monitor")
|
var monitorLog = logrus.WithField("source", "kata-monitor")
|
||||||
|
|
||||||
|
const (
|
||||||
|
RuntimeContainerd = "containerd"
|
||||||
|
RuntimeCRIO = "cri-o"
|
||||||
|
podCacheRefreshTimeSeconds = 15
|
||||||
|
)
|
||||||
|
|
||||||
// SetLogger sets the logger for katamonitor package.
|
// SetLogger sets the logger for katamonitor package.
|
||||||
func SetLogger(logger *logrus.Entry) {
|
func SetLogger(logger *logrus.Entry) {
|
||||||
fields := monitorLog.Data
|
fields := monitorLog.Data
|
||||||
@ -30,54 +33,47 @@ func SetLogger(logger *logrus.Entry) {
|
|||||||
// KataMonitor is monitor agent
|
// KataMonitor is monitor agent
|
||||||
type KataMonitor struct {
|
type KataMonitor struct {
|
||||||
sandboxCache *sandboxCache
|
sandboxCache *sandboxCache
|
||||||
containerdAddr string
|
runtimeEndpoint string
|
||||||
containerdConfigFile string
|
|
||||||
containerdStatePath string
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewKataMonitor create and return a new KataMonitor instance
|
// NewKataMonitor create and return a new KataMonitor instance
|
||||||
func NewKataMonitor(containerdAddr, containerdConfigFile string) (*KataMonitor, error) {
|
func NewKataMonitor(runtimeEndpoint string) (*KataMonitor, error) {
|
||||||
if containerdAddr == "" {
|
if runtimeEndpoint == "" {
|
||||||
return nil, fmt.Errorf("containerd serve address missing")
|
return nil, errors.New("runtime endpoint missing")
|
||||||
}
|
}
|
||||||
|
|
||||||
containerdConf := &srvconfig.Config{
|
if !strings.HasPrefix(runtimeEndpoint, "unix") {
|
||||||
State: defaults.DefaultStateDir,
|
runtimeEndpoint = "unix://" + runtimeEndpoint
|
||||||
}
|
|
||||||
|
|
||||||
if err := srvconfig.LoadConfig(containerdConfigFile, containerdConf); err != nil && !os.IsNotExist(err) {
|
|
||||||
return nil, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
km := &KataMonitor{
|
km := &KataMonitor{
|
||||||
containerdAddr: containerdAddr,
|
runtimeEndpoint: runtimeEndpoint,
|
||||||
containerdConfigFile: containerdConfigFile,
|
|
||||||
containerdStatePath: containerdConf.State,
|
|
||||||
sandboxCache: &sandboxCache{
|
sandboxCache: &sandboxCache{
|
||||||
Mutex: &sync.Mutex{},
|
Mutex: &sync.Mutex{},
|
||||||
sandboxes: make(map[string]string),
|
sandboxes: make(map[string]string),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := km.initSandboxCache(); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// register metrics
|
// register metrics
|
||||||
registerMetrics()
|
registerMetrics()
|
||||||
|
|
||||||
go km.sandboxCache.startEventsListener(km.containerdAddr)
|
go km.startPodCacheUpdater()
|
||||||
|
|
||||||
return km, nil
|
return km, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (km *KataMonitor) initSandboxCache() error {
|
// startPodCacheUpdater will boot a thread to manage sandbox cache
|
||||||
|
func (km *KataMonitor) startPodCacheUpdater() {
|
||||||
|
for {
|
||||||
|
time.Sleep(podCacheRefreshTimeSeconds * time.Second)
|
||||||
sandboxes, err := km.getSandboxes()
|
sandboxes, err := km.getSandboxes()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
monitorLog.WithError(err).Error("failed to get sandboxes")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
monitorLog.WithField("count", len(sandboxes)).Debug("update sandboxes list")
|
||||||
|
km.sandboxCache.set(sandboxes)
|
||||||
}
|
}
|
||||||
km.sandboxCache.init(sandboxes)
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetAgentURL returns agent URL
|
// GetAgentURL returns agent URL
|
||||||
@ -117,6 +113,6 @@ func (km *KataMonitor) getSandboxList() []string {
|
|||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
func (km *KataMonitor) getSandboxNamespace(sandbox string) (string, error) {
|
func (km *KataMonitor) getSandboxRuntime(sandbox string) (string, error) {
|
||||||
return km.sandboxCache.getSandboxNamespace(sandbox)
|
return km.sandboxCache.getSandboxRuntime(sandbox)
|
||||||
}
|
}
|
||||||
|
@ -27,12 +27,12 @@ func (km *KataMonitor) composeSocketAddress(r *http.Request) (string, error) {
|
|||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
namespace, err := km.getSandboxNamespace(sandbox)
|
runtime, err := km.getSandboxRuntime(sandbox)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
return km.getMonitorAddress(sandbox, namespace)
|
return km.getMonitorAddress(sandbox, runtime)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (km *KataMonitor) proxyRequest(w http.ResponseWriter, r *http.Request) {
|
func (km *KataMonitor) proxyRequest(w http.ResponseWriter, r *http.Request) {
|
||||||
|
@ -1,84 +0,0 @@
|
|||||||
// Copyright (c) 2020 Ant Financial
|
|
||||||
//
|
|
||||||
// SPDX-License-Identifier: Apache-2.0
|
|
||||||
//
|
|
||||||
|
|
||||||
package katamonitor
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"io/ioutil"
|
|
||||||
"net/http"
|
|
||||||
"os"
|
|
||||||
"path/filepath"
|
|
||||||
"sync"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestComposeSocketAddress(t *testing.T) {
|
|
||||||
assert := assert.New(t)
|
|
||||||
path := fmt.Sprintf("/tmp/TestComposeSocketAddress-%d", time.Now().Nanosecond())
|
|
||||||
statePath := filepath.Join(path, "io.containerd.runtime.v2.task")
|
|
||||||
|
|
||||||
sandboxes := map[string]string{"foo": "ns-foo", "bar": "ns-bar"}
|
|
||||||
defer func() {
|
|
||||||
os.RemoveAll(path)
|
|
||||||
}()
|
|
||||||
|
|
||||||
for sandbox, ns := range sandboxes {
|
|
||||||
err := os.MkdirAll(filepath.Join(statePath, ns, sandbox), 0755)
|
|
||||||
assert.Nil(err)
|
|
||||||
f := filepath.Join(statePath, ns, sandbox, "monitor_address")
|
|
||||||
err = ioutil.WriteFile(f, []byte(sandbox), 0644)
|
|
||||||
assert.Nil(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
km := &KataMonitor{
|
|
||||||
containerdStatePath: path,
|
|
||||||
sandboxCache: &sandboxCache{
|
|
||||||
Mutex: &sync.Mutex{},
|
|
||||||
sandboxes: sandboxes,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
// nolint: govet
|
|
||||||
testCases := []struct {
|
|
||||||
url string
|
|
||||||
err bool
|
|
||||||
addr string
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
url: "http://localhost:6060/debug/vars",
|
|
||||||
err: true,
|
|
||||||
addr: "",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
url: "http://localhost:6060/debug/vars?sandbox=abc",
|
|
||||||
err: true,
|
|
||||||
addr: "",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
url: "http://localhost:6060/debug/vars?sandbox=foo",
|
|
||||||
err: false,
|
|
||||||
addr: "foo",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
url: "http://localhost:6060/debug/vars?sandbox=bar",
|
|
||||||
err: false,
|
|
||||||
addr: "bar",
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, tc := range testCases {
|
|
||||||
r, err := http.NewRequest("GET", tc.url, nil)
|
|
||||||
assert.Nil(err)
|
|
||||||
|
|
||||||
addr, err := km.composeSocketAddress(r)
|
|
||||||
|
|
||||||
assert.Equal(tc.err, err != nil)
|
|
||||||
assert.Equal(tc.addr, addr)
|
|
||||||
}
|
|
||||||
}
|
|
@ -6,23 +6,8 @@
|
|||||||
package katamonitor
|
package katamonitor
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"regexp"
|
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/containerd/containerd"
|
|
||||||
"github.com/sirupsen/logrus"
|
|
||||||
|
|
||||||
"encoding/json"
|
|
||||||
|
|
||||||
eventstypes "github.com/containerd/containerd/api/events"
|
|
||||||
"github.com/containerd/containerd/events"
|
|
||||||
"github.com/containerd/typeurl"
|
|
||||||
"github.com/kata-containers/kata-containers/src/runtime/pkg/types"
|
|
||||||
|
|
||||||
// Register grpc event types
|
|
||||||
_ "github.com/containerd/containerd/api/events"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type sandboxCache struct {
|
type sandboxCache struct {
|
||||||
@ -36,17 +21,6 @@ func (sc *sandboxCache) getAllSandboxes() map[string]string {
|
|||||||
return sc.sandboxes
|
return sc.sandboxes
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sc *sandboxCache) getSandboxNamespace(sandbox string) (string, error) {
|
|
||||||
sc.Lock()
|
|
||||||
defer sc.Unlock()
|
|
||||||
|
|
||||||
if val, found := sc.sandboxes[sandbox]; found {
|
|
||||||
return val, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return "", fmt.Errorf("sandbox %s not in cache", sandbox)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (sc *sandboxCache) deleteIfExists(id string) (string, bool) {
|
func (sc *sandboxCache) deleteIfExists(id string) (string, bool) {
|
||||||
sc.Lock()
|
sc.Lock()
|
||||||
defer sc.Unlock()
|
defer sc.Unlock()
|
||||||
@ -73,116 +47,19 @@ func (sc *sandboxCache) putIfNotExists(id, value string) bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sc *sandboxCache) init(sandboxes map[string]string) {
|
func (sc *sandboxCache) set(sandboxes map[string]string) {
|
||||||
sc.Lock()
|
sc.Lock()
|
||||||
defer sc.Unlock()
|
defer sc.Unlock()
|
||||||
sc.sandboxes = sandboxes
|
sc.sandboxes = sandboxes
|
||||||
}
|
}
|
||||||
|
|
||||||
// startEventsListener will boot a thread to listen container events to manage sandbox cache
|
func (sc *sandboxCache) getSandboxRuntime(sandbox string) (string, error) {
|
||||||
func (sc *sandboxCache) startEventsListener(addr string) error {
|
sc.Lock()
|
||||||
client, err := containerd.New(addr)
|
defer sc.Unlock()
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer client.Close()
|
|
||||||
|
|
||||||
ctx := context.Background()
|
if val, found := sc.sandboxes[sandbox]; found {
|
||||||
|
return val, nil
|
||||||
eventsClient := client.EventService()
|
|
||||||
containerClient := client.ContainerService()
|
|
||||||
|
|
||||||
// only need create/delete events.
|
|
||||||
eventFilters := []string{
|
|
||||||
`topic=="/containers/create"`,
|
|
||||||
`topic=="/containers/delete"`,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
runtimeNameRegexp, err := regexp.Compile(types.KataRuntimeNameRegexp)
|
return "", fmt.Errorf("sandbox %s not in cache", sandbox)
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
eventsCh, errCh := eventsClient.Subscribe(ctx, eventFilters...)
|
|
||||||
for {
|
|
||||||
var e *events.Envelope
|
|
||||||
select {
|
|
||||||
case e = <-eventsCh:
|
|
||||||
case err = <-errCh:
|
|
||||||
monitorLog.WithError(err).Warn("get error from error chan")
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if e != nil {
|
|
||||||
var eventBody []byte
|
|
||||||
if e.Event != nil {
|
|
||||||
v, err := typeurl.UnmarshalAny(e.Event)
|
|
||||||
if err != nil {
|
|
||||||
monitorLog.WithError(err).Warn("cannot unmarshal an event from Any")
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
eventBody, err = json.Marshal(v)
|
|
||||||
if err != nil {
|
|
||||||
monitorLog.WithError(err).Warn("cannot marshal Any into JSON")
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if e.Topic == "/containers/create" {
|
|
||||||
// Namespace: k8s.io
|
|
||||||
// Topic: /containers/create
|
|
||||||
// Event: {
|
|
||||||
// "id":"6a2e22e6fffaf1dec63ddabf587ed56069b1809ba67a0d7872fc470528364e66",
|
|
||||||
// "image":"k8s.gcr.io/pause:3.1",
|
|
||||||
// "runtime":{"name":"io.containerd.kata.v2"}
|
|
||||||
// }
|
|
||||||
cc := eventstypes.ContainerCreate{}
|
|
||||||
err := json.Unmarshal(eventBody, &cc)
|
|
||||||
if err != nil {
|
|
||||||
monitorLog.WithError(err).WithField("body", string(eventBody)).Warn("unmarshal ContainerCreate failed")
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// skip non-kata contaienrs
|
|
||||||
if !runtimeNameRegexp.MatchString(cc.Runtime.Name) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
c, err := getContainer(containerClient, e.Namespace, cc.ID)
|
|
||||||
if err != nil {
|
|
||||||
monitorLog.WithError(err).WithField("container", cc.ID).Warn("failed to get container")
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// if the container is a sandbox container,
|
|
||||||
// means the VM is started, and can start to collect metrics from the VM.
|
|
||||||
if isSandboxContainer(&c) {
|
|
||||||
// we can simply put the contaienrid in sandboxes list if the container is a sandbox container
|
|
||||||
sc.putIfNotExists(cc.ID, e.Namespace)
|
|
||||||
monitorLog.WithField("container", cc.ID).Info("add sandbox to cache")
|
|
||||||
}
|
|
||||||
} else if e.Topic == "/containers/delete" {
|
|
||||||
// Namespace: k8s.io
|
|
||||||
// Topic: /containers/delete
|
|
||||||
// Event: {
|
|
||||||
// "id":"73ec10d2e38070f930310687ab46bbaa532c79d5680fd7f18fff99f759d9385e"
|
|
||||||
// }
|
|
||||||
cd := &eventstypes.ContainerDelete{}
|
|
||||||
err := json.Unmarshal(eventBody, &cd)
|
|
||||||
if err != nil {
|
|
||||||
monitorLog.WithError(err).WithField("body", string(eventBody)).Warn("unmarshal ContainerDelete failed")
|
|
||||||
}
|
|
||||||
|
|
||||||
// if container in sandboxes list, it must be the pause container in the sandbox,
|
|
||||||
// so the contaienr id is the sandbox id
|
|
||||||
// we can simply delete the contaienr from sandboxes list
|
|
||||||
// the last container in a sandbox is deleted, means the VM will stop.
|
|
||||||
_, deleted := sc.deleteIfExists(cd.ID)
|
|
||||||
monitorLog.WithFields(logrus.Fields{"container": cd.ID, "result": deleted}).Info("delete sandbox from cache")
|
|
||||||
} else {
|
|
||||||
monitorLog.WithFields(logrus.Fields{"Namespace": e.Namespace, "Topic": e.Topic, "Event": string(eventBody)}).Error("other events")
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -21,7 +21,7 @@ func TestSandboxCache(t *testing.T) {
|
|||||||
|
|
||||||
scMap := map[string]string{"111": "222"}
|
scMap := map[string]string{"111": "222"}
|
||||||
|
|
||||||
sc.init(scMap)
|
sc.set(scMap)
|
||||||
|
|
||||||
scMap = sc.getAllSandboxes()
|
scMap = sc.getAllSandboxes()
|
||||||
assert.Equal(1, len(scMap))
|
assert.Equal(1, len(scMap))
|
||||||
|
Loading…
Reference in New Issue
Block a user