runtime: shim: dedup client, socket addr code

(1) Add an accessor function, SocketAddress, to the shim-v2 code for
determining the shim's abstract domain socket address, given the sandbox
ID.

(2) In kata monitor, create a function, BuildShimClient, for obtaining the appropriate
http.Client for communicating with the shim's monitoring endpoint.

(3) Update the kata CLI and kata-monitor code to make use of these.

(4) Migrate some kata monitor methods to be functions, in order to ease
future reuse.

(5) drop unused namespace from functions where it is no longer needed.

Signed-off-by: Eric Ernst <eric_ernst@apple.com>
This commit is contained in:
Eric Ernst 2021-05-06 17:09:47 -07:00
parent 4bc006c8a4
commit 3caed6f88d
5 changed files with 20 additions and 43 deletions

View File

@ -14,7 +14,6 @@ import (
"net/http" "net/http"
"net/url" "net/url"
"os" "os"
"path/filepath"
"strings" "strings"
"sync" "sync"
@ -26,7 +25,6 @@ import (
clientUtils "github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/agent/protocols/client" clientUtils "github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/agent/protocols/client"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/urfave/cli" "github.com/urfave/cli"
"go.opentelemetry.io/otel/label"
) )
const ( const (
@ -38,10 +36,8 @@ const (
subCommandName = "exec" subCommandName = "exec"
// command-line parameters name // command-line parameters name
paramRuntimeNamespace = "runtime-namespace"
paramDebugConsolePort = "kata-debug-port" paramDebugConsolePort = "kata-debug-port"
defaultKernelParamDebugConsoleVPortValue = 1026 defaultKernelParamDebugConsoleVPortValue = 1026
defaultRuntimeNamespace = "k8s.io"
) )
var ( var (
@ -57,21 +53,12 @@ var kataExecCLICommand = cli.Command{
Name: subCommandName, Name: subCommandName,
Usage: "Enter into guest by debug console", Usage: "Enter into guest by debug console",
Flags: []cli.Flag{ Flags: []cli.Flag{
cli.StringFlag{
Name: paramRuntimeNamespace,
Usage: "Namespace that containerd or CRI-O are using for containers. (Default: k8s.io, only works for containerd)",
},
cli.Uint64Flag{ cli.Uint64Flag{
Name: paramDebugConsolePort, Name: paramDebugConsolePort,
Usage: "Port that debug console is listening on. (Default: 1026)", Usage: "Port that debug console is listening on. (Default: 1026)",
}, },
}, },
Action: func(context *cli.Context) error { Action: func(context *cli.Context) error {
namespace := context.String(paramRuntimeNamespace)
if namespace == "" {
namespace = defaultRuntimeNamespace
}
port := context.Uint64(paramDebugConsolePort) port := context.Uint64(paramDebugConsolePort)
if port == 0 { if port == 0 {
port = defaultKernelParamDebugConsoleVPortValue port = defaultKernelParamDebugConsoleVPortValue
@ -83,7 +70,6 @@ var kataExecCLICommand = cli.Command{
return err return err
} }
span.SetAttributes(label.Key("sandbox").String(sandboxID))
conn, err := getConn(sandboxID, port) conn, err := getConn(sandboxID, port)
if err != nil { if err != nil {
@ -169,8 +155,7 @@ func (s *iostream) Read(data []byte) (n int, err error) {
} }
func getConn(sandboxID string, port uint64) (net.Conn, error) { func getConn(sandboxID string, port uint64) (net.Conn, error) {
socketAddr := filepath.Join(string(filepath.Separator), "run", "vc", sandboxID, "shim-monitor") client, err := kataMonitor.BuildShimClient(sandboxID, defaultTimeout)
client, err := kataMonitor.BuildUnixSocketClient(socketAddr, defaultTimeout)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -181,7 +166,7 @@ func getConn(sandboxID string, port uint64) (net.Conn, error) {
} }
if resp.StatusCode != http.StatusOK { if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("Failed to get %s: %d", socketAddr, resp.StatusCode) return nil, fmt.Errorf("Failure from %s shim-monitor: %d", sandboxID, resp.StatusCode)
} }
defer resp.Body.Close() defer resp.Body.Close()

View File

@ -128,11 +128,7 @@ func decodeAgentMetrics(body string) []*dto.MetricFamily {
func (s *service) startManagementServer(ctx context.Context, ociSpec *specs.Spec) { func (s *service) startManagementServer(ctx context.Context, ociSpec *specs.Spec) {
// metrics socket will under sandbox's bundle path // metrics socket will under sandbox's bundle path
metricsAddress, err := socketAddress(ctx, s.id) metricsAddress := SocketAddress(s.id)
if err != nil {
shimMgtLog.WithError(err).Error("failed to create socket address")
return
}
listener, err := cdshim.NewSocket(metricsAddress) listener, err := cdshim.NewSocket(metricsAddress)
if err != nil { if err != nil {
@ -187,6 +183,8 @@ func (s *service) mountPprofHandle(m *http.ServeMux, ociSpec *specs.Spec) {
m.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace)) m.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace))
} }
func socketAddress(ctx context.Context, id string) (string, error) { // SocketAddress returns the address of the abstract domain socket for communicating with the
return filepath.Join(string(filepath.Separator), "run", "vc", id, "shim-monitor"), nil // shim management endpoint
func SocketAddress(id string) string {
return filepath.Join(string(filepath.Separator), "run", "vc", id, "shim-monitor")
} }

View File

@ -176,7 +176,7 @@ func (km *KataMonitor) aggregateSandboxMetrics(encoder expfmt.Encoder) error {
for sandboxID, namespace := range sandboxes { for sandboxID, namespace := range sandboxes {
wg.Add(1) wg.Add(1)
go func(sandboxID, namespace string, results chan<- []*dto.MetricFamily) { go func(sandboxID, namespace string, results chan<- []*dto.MetricFamily) {
sandboxMetrics, err := km.getSandboxMetrics(sandboxID, namespace) sandboxMetrics, err := getSandboxMetrics(sandboxID)
if err != nil { if err != nil {
monitorLog.WithError(err).WithField("sandbox_id", sandboxID).Errorf("failed to get metrics for sandbox") monitorLog.WithError(err).WithField("sandbox_id", sandboxID).Errorf("failed to get metrics for sandbox")
} }
@ -234,8 +234,8 @@ func (km *KataMonitor) aggregateSandboxMetrics(encoder expfmt.Encoder) error {
} }
// getSandboxMetrics will get sandbox's metrics from shim // getSandboxMetrics will get sandbox's metrics from shim
func (km *KataMonitor) getSandboxMetrics(sandboxID, namespace string) ([]*dto.MetricFamily, error) { func getSandboxMetrics(sandboxID string) ([]*dto.MetricFamily, error) {
body, err := km.doGet(sandboxID, namespace, defaultTimeout, "metrics") body, err := doGet(sandboxID, defaultTimeout, "metrics")
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -87,13 +87,8 @@ func (km *KataMonitor) GetAgentURL(w http.ResponseWriter, r *http.Request) {
commonServeError(w, http.StatusBadRequest, err) commonServeError(w, http.StatusBadRequest, err)
return return
} }
namespace, err := km.getSandboxNamespace(sandboxID)
if err != nil {
commonServeError(w, http.StatusBadRequest, err)
return
}
data, err := km.doGet(sandboxID, namespace, defaultTimeout, "agent-url") data, err := doGet(sandboxID, defaultTimeout, "agent-url")
if err != nil { if err != nil {
commonServeError(w, http.StatusBadRequest, err) commonServeError(w, http.StatusBadRequest, err)
return return

View File

@ -11,6 +11,8 @@ import (
"net" "net"
"net/http" "net/http"
"time" "time"
shim "github.com/kata-containers/kata-containers/src/runtime/containerd-shim-v2"
) )
const ( const (
@ -33,16 +35,13 @@ func getSandboxIDFromReq(r *http.Request) (string, error) {
return "", fmt.Errorf("sandbox not found in %+v", r.URL.Query()) return "", fmt.Errorf("sandbox not found in %+v", r.URL.Query())
} }
func (km *KataMonitor) buildShimClient(sandboxID, namespace string, timeout time.Duration) (*http.Client, error) { // BuildShimClient builds and returns an http client for communicating with the provided sandbox
socketAddr, err := km.getMonitorAddress(sandboxID, namespace) func BuildShimClient(sandboxID string, timeout time.Duration) (*http.Client, error) {
if err != nil { return buildUnixSocketClient(shim.SocketAddress(sandboxID), timeout)
return nil, err
}
return BuildUnixSocketClient(socketAddr, timeout)
} }
// BuildUnixSocketClient build http client for Unix socket // buildUnixSocketClient build http client for Unix socket
func BuildUnixSocketClient(socketAddr string, timeout time.Duration) (*http.Client, error) { func buildUnixSocketClient(socketAddr string, timeout time.Duration) (*http.Client, error) {
transport := &http.Transport{ transport := &http.Transport{
DisableKeepAlives: true, DisableKeepAlives: true,
Dial: func(proto, addr string) (conn net.Conn, err error) { Dial: func(proto, addr string) (conn net.Conn, err error) {
@ -61,8 +60,8 @@ func BuildUnixSocketClient(socketAddr string, timeout time.Duration) (*http.Clie
return client, nil return client, nil
} }
func (km *KataMonitor) doGet(sandboxID, namespace string, timeoutInSeconds time.Duration, urlPath string) ([]byte, error) { func doGet(sandboxID string, timeoutInSeconds time.Duration, urlPath string) ([]byte, error) {
client, err := km.buildShimClient(sandboxID, namespace, timeoutInSeconds) client, err := BuildShimClient(sandboxID, timeoutInSeconds)
if err != nil { if err != nil {
return nil, err return nil, err
} }