Merge pull request #1811 from egernst/monitor-cleanup

Monitor cleanup
This commit is contained in:
Eric Ernst 2021-05-07 21:03:34 -07:00 committed by GitHub
commit 12a04cb0ba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 23 additions and 48 deletions

View File

@ -14,7 +14,6 @@ import (
"net/http" "net/http"
"net/url" "net/url"
"os" "os"
"path/filepath"
"strings" "strings"
"sync" "sync"
@ -37,10 +36,8 @@ const (
subCommandName = "exec" subCommandName = "exec"
// command-line parameters name // command-line parameters name
paramRuntimeNamespace = "runtime-namespace"
paramDebugConsolePort = "kata-debug-port" paramDebugConsolePort = "kata-debug-port"
defaultKernelParamDebugConsoleVPortValue = 1026 defaultKernelParamDebugConsoleVPortValue = 1026
defaultRuntimeNamespace = "k8s.io"
) )
var ( var (
@ -56,21 +53,12 @@ var kataExecCLICommand = cli.Command{
Name: subCommandName, Name: subCommandName,
Usage: "Enter into guest by debug console", Usage: "Enter into guest by debug console",
Flags: []cli.Flag{ Flags: []cli.Flag{
cli.StringFlag{
Name: paramRuntimeNamespace,
Usage: "Namespace that containerd or CRI-O are using for containers. (Default: k8s.io, only works for containerd)",
},
cli.Uint64Flag{ cli.Uint64Flag{
Name: paramDebugConsolePort, Name: paramDebugConsolePort,
Usage: "Port that debug console is listening on. (Default: 1026)", Usage: "Port that debug console is listening on. (Default: 1026)",
}, },
}, },
Action: func(context *cli.Context) error { Action: func(context *cli.Context) error {
namespace := context.String(paramRuntimeNamespace)
if namespace == "" {
namespace = defaultRuntimeNamespace
}
port := context.Uint64(paramDebugConsolePort) port := context.Uint64(paramDebugConsolePort)
if port == 0 { if port == 0 {
port = defaultKernelParamDebugConsoleVPortValue port = defaultKernelParamDebugConsoleVPortValue
@ -82,7 +70,8 @@ var kataExecCLICommand = cli.Command{
return err return err
} }
conn, err := getConn(namespace, sandboxID, port) conn, err := getConn(sandboxID, port)
if err != nil { if err != nil {
return err return err
} }
@ -165,9 +154,8 @@ func (s *iostream) Read(data []byte) (n int, err error) {
return s.conn.Read(data) return s.conn.Read(data)
} }
func getConn(namespace, sandboxID string, port uint64) (net.Conn, error) { func getConn(sandboxID string, port uint64) (net.Conn, error) {
socketAddr := filepath.Join(string(filepath.Separator), "containerd-shim", namespace, sandboxID, "shim-monitor.sock") client, err := kataMonitor.BuildShimClient(sandboxID, defaultTimeout)
client, err := kataMonitor.BuildUnixSocketClient(socketAddr, defaultTimeout)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -178,7 +166,7 @@ func getConn(namespace, sandboxID string, port uint64) (net.Conn, error) {
} }
if resp.StatusCode != http.StatusOK { if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("Failed to get %s: %d", socketAddr, resp.StatusCode) return nil, fmt.Errorf("Failure from %s shim-monitor: %d", sandboxID, resp.StatusCode)
} }
defer resp.Body.Close() defer resp.Body.Close()

View File

@ -16,7 +16,6 @@ import (
"strconv" "strconv"
"strings" "strings"
"github.com/containerd/containerd/namespaces"
cdshim "github.com/containerd/containerd/runtime/v2/shim" cdshim "github.com/containerd/containerd/runtime/v2/shim"
vc "github.com/kata-containers/kata-containers/src/runtime/virtcontainers" vc "github.com/kata-containers/kata-containers/src/runtime/virtcontainers"
vcAnnotations "github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/annotations" vcAnnotations "github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/annotations"
@ -129,11 +128,7 @@ func decodeAgentMetrics(body string) []*dto.MetricFamily {
func (s *service) startManagementServer(ctx context.Context, ociSpec *specs.Spec) { func (s *service) startManagementServer(ctx context.Context, ociSpec *specs.Spec) {
// metrics socket will under sandbox's bundle path // metrics socket will under sandbox's bundle path
metricsAddress, err := socketAddress(ctx, s.id) metricsAddress := SocketAddress(s.id)
if err != nil {
shimMgtLog.WithError(err).Error("failed to create socket address")
return
}
listener, err := cdshim.NewSocket(metricsAddress) listener, err := cdshim.NewSocket(metricsAddress)
if err != nil { if err != nil {
@ -188,10 +183,8 @@ func (s *service) mountPprofHandle(m *http.ServeMux, ociSpec *specs.Spec) {
m.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace)) m.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace))
} }
func socketAddress(ctx context.Context, id string) (string, error) { // SocketAddress returns the address of the abstract domain socket for communicating with the
ns, err := namespaces.NamespaceRequired(ctx) // shim management endpoint
if err != nil { func SocketAddress(id string) string {
return "", err return filepath.Join(string(filepath.Separator), "run", "vc", id, "shim-monitor")
}
return filepath.Join(string(filepath.Separator), "containerd-shim", ns, id, "shim-monitor.sock"), nil
} }

View File

@ -176,7 +176,7 @@ func (km *KataMonitor) aggregateSandboxMetrics(encoder expfmt.Encoder) error {
for sandboxID, namespace := range sandboxes { for sandboxID, namespace := range sandboxes {
wg.Add(1) wg.Add(1)
go func(sandboxID, namespace string, results chan<- []*dto.MetricFamily) { go func(sandboxID, namespace string, results chan<- []*dto.MetricFamily) {
sandboxMetrics, err := km.getSandboxMetrics(sandboxID, namespace) sandboxMetrics, err := getSandboxMetrics(sandboxID)
if err != nil { if err != nil {
monitorLog.WithError(err).WithField("sandbox_id", sandboxID).Errorf("failed to get metrics for sandbox") monitorLog.WithError(err).WithField("sandbox_id", sandboxID).Errorf("failed to get metrics for sandbox")
} }
@ -234,8 +234,8 @@ func (km *KataMonitor) aggregateSandboxMetrics(encoder expfmt.Encoder) error {
} }
// getSandboxMetrics will get sandbox's metrics from shim // getSandboxMetrics will get sandbox's metrics from shim
func (km *KataMonitor) getSandboxMetrics(sandboxID, namespace string) ([]*dto.MetricFamily, error) { func getSandboxMetrics(sandboxID string) ([]*dto.MetricFamily, error) {
body, err := km.doGet(sandboxID, namespace, defaultTimeout, "metrics") body, err := doGet(sandboxID, defaultTimeout, "metrics")
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -87,13 +87,8 @@ func (km *KataMonitor) GetAgentURL(w http.ResponseWriter, r *http.Request) {
commonServeError(w, http.StatusBadRequest, err) commonServeError(w, http.StatusBadRequest, err)
return return
} }
namespace, err := km.getSandboxNamespace(sandboxID)
if err != nil {
commonServeError(w, http.StatusBadRequest, err)
return
}
data, err := km.doGet(sandboxID, namespace, defaultTimeout, "agent-url") data, err := doGet(sandboxID, defaultTimeout, "agent-url")
if err != nil { if err != nil {
commonServeError(w, http.StatusBadRequest, err) commonServeError(w, http.StatusBadRequest, err)
return return

View File

@ -11,6 +11,8 @@ import (
"net" "net"
"net/http" "net/http"
"time" "time"
shim "github.com/kata-containers/kata-containers/src/runtime/containerd-shim-v2"
) )
const ( const (
@ -33,16 +35,13 @@ func getSandboxIDFromReq(r *http.Request) (string, error) {
return "", fmt.Errorf("sandbox not found in %+v", r.URL.Query()) return "", fmt.Errorf("sandbox not found in %+v", r.URL.Query())
} }
func (km *KataMonitor) buildShimClient(sandboxID, namespace string, timeout time.Duration) (*http.Client, error) { // BuildShimClient builds and returns an http client for communicating with the provided sandbox
socketAddr, err := km.getMonitorAddress(sandboxID, namespace) func BuildShimClient(sandboxID string, timeout time.Duration) (*http.Client, error) {
if err != nil { return buildUnixSocketClient(shim.SocketAddress(sandboxID), timeout)
return nil, err
}
return BuildUnixSocketClient(socketAddr, timeout)
} }
// BuildUnixSocketClient build http client for Unix socket // buildUnixSocketClient build http client for Unix socket
func BuildUnixSocketClient(socketAddr string, timeout time.Duration) (*http.Client, error) { func buildUnixSocketClient(socketAddr string, timeout time.Duration) (*http.Client, error) {
transport := &http.Transport{ transport := &http.Transport{
DisableKeepAlives: true, DisableKeepAlives: true,
Dial: func(proto, addr string) (conn net.Conn, err error) { Dial: func(proto, addr string) (conn net.Conn, err error) {
@ -61,8 +60,8 @@ func BuildUnixSocketClient(socketAddr string, timeout time.Duration) (*http.Clie
return client, nil return client, nil
} }
func (km *KataMonitor) doGet(sandboxID, namespace string, timeoutInSeconds time.Duration, urlPath string) ([]byte, error) { func doGet(sandboxID string, timeoutInSeconds time.Duration, urlPath string) ([]byte, error) {
client, err := km.buildShimClient(sandboxID, namespace, timeoutInSeconds) client, err := BuildShimClient(sandboxID, timeoutInSeconds)
if err != nil { if err != nil {
return nil, err return nil, err
} }