From b1448adb59f063e9e5b3be8904b3dbd1269865f1 Mon Sep 17 00:00:00 2001 From: Frederic Branczyk Date: Wed, 15 Nov 2017 20:41:04 +0100 Subject: [PATCH] kubectl: Use metrics-server for kubectl top commands --- pkg/kubectl/cmd/BUILD | 9 + pkg/kubectl/cmd/testing/BUILD | 1 + pkg/kubectl/cmd/testing/fake.go | 10 + pkg/kubectl/cmd/top.go | 21 ++ pkg/kubectl/cmd/top_node.go | 71 +++- pkg/kubectl/cmd/top_node_test.go | 198 +++++++++- pkg/kubectl/cmd/top_pod.go | 68 +++- pkg/kubectl/cmd/top_pod_test.go | 338 +++++++++++++++++- pkg/kubectl/cmd/top_test.go | 66 +++- pkg/kubectl/cmd/util/BUILD | 1 + pkg/kubectl/cmd/util/clientcache.go | 29 ++ pkg/kubectl/cmd/util/factory.go | 4 + pkg/kubectl/cmd/util/factory_client_access.go | 5 + pkg/kubectl/metricsutil/BUILD | 1 + pkg/kubectl/metricsutil/metrics_client.go | 55 +-- pkg/kubectl/metricsutil/metrics_printer.go | 2 +- 16 files changed, 819 insertions(+), 60 deletions(-) diff --git a/pkg/kubectl/cmd/BUILD b/pkg/kubectl/cmd/BUILD index d6cba425fec..e8601c2e38e 100644 --- a/pkg/kubectl/cmd/BUILD +++ b/pkg/kubectl/cmd/BUILD @@ -140,6 +140,7 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/version:go_default_library", "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", "//vendor/k8s.io/apiserver/pkg/util/flag:go_default_library", + "//vendor/k8s.io/client-go/discovery:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", "//vendor/k8s.io/client-go/kubernetes/typed/rbac/v1:go_default_library", @@ -149,6 +150,9 @@ go_library( "//vendor/k8s.io/client-go/tools/remotecommand:go_default_library", "//vendor/k8s.io/client-go/transport/spdy:go_default_library", "//vendor/k8s.io/kube-openapi/pkg/util/proto:go_default_library", + "//vendor/k8s.io/metrics/pkg/apis/metrics:go_default_library", + "//vendor/k8s.io/metrics/pkg/apis/metrics/v1beta1:go_default_library", + "//vendor/k8s.io/metrics/pkg/client/clientset_generated/clientset:go_default_library", "//vendor/k8s.io/utils/exec:go_default_library", ], ) @@ -224,6 +228,7 @@ go_test( "//pkg/printers:go_default_library", "//pkg/printers/internalversion:go_default_library", "//pkg/util/strings:go_default_library", + "//vendor/github.com/googleapis/gnostic/OpenAPIv2:go_default_library", "//vendor/github.com/spf13/cobra:go_default_library", "//vendor/github.com/stretchr/testify/assert:go_default_library", "//vendor/gopkg.in/yaml.v2:go_default_library", @@ -245,11 +250,15 @@ go_test( "//vendor/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/strategicpatch/testing:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/version:go_default_library", "//vendor/k8s.io/client-go/dynamic:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", "//vendor/k8s.io/client-go/rest/fake:go_default_library", + "//vendor/k8s.io/client-go/testing:go_default_library", "//vendor/k8s.io/client-go/tools/remotecommand:go_default_library", "//vendor/k8s.io/metrics/pkg/apis/metrics/v1alpha1:go_default_library", + "//vendor/k8s.io/metrics/pkg/apis/metrics/v1beta1:go_default_library", + "//vendor/k8s.io/metrics/pkg/client/clientset_generated/clientset/fake:go_default_library", "//vendor/k8s.io/utils/exec:go_default_library", ], ) diff --git a/pkg/kubectl/cmd/testing/BUILD b/pkg/kubectl/cmd/testing/BUILD index 90fe0a0872b..ef13e0525f9 100644 --- a/pkg/kubectl/cmd/testing/BUILD +++ b/pkg/kubectl/cmd/testing/BUILD @@ -40,6 +40,7 @@ go_library( "//vendor/k8s.io/client-go/kubernetes:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library", "//vendor/k8s.io/client-go/rest/fake:go_default_library", + "//vendor/k8s.io/metrics/pkg/client/clientset_generated/clientset:go_default_library", ], ) diff --git a/pkg/kubectl/cmd/testing/fake.go b/pkg/kubectl/cmd/testing/fake.go index d57706d4c1b..09737c5653e 100644 --- a/pkg/kubectl/cmd/testing/fake.go +++ b/pkg/kubectl/cmd/testing/fake.go @@ -50,6 +50,7 @@ import ( "k8s.io/kubernetes/pkg/kubectl/resource" "k8s.io/kubernetes/pkg/kubectl/validation" "k8s.io/kubernetes/pkg/printers" + metricsclientset "k8s.io/metrics/pkg/client/clientset_generated/clientset" ) // +k8s:deepcopy-gen=true @@ -245,6 +246,7 @@ type TestFactory struct { TmpDir string CategoryExpander categories.CategoryExpander SkipDiscovery bool + MetricsClientSet metricsclientset.Interface ClientForMappingFunc func(mapping *meta.RESTMapping) (resource.RESTClient, error) UnstructuredClientForMappingFunc func(mapping *meta.RESTMapping) (resource.RESTClient, error) @@ -315,6 +317,10 @@ func (f *FakeFactory) KubernetesClientSet() (*kubernetes.Clientset, error) { return nil, nil } +func (f *FakeFactory) MetricsClientSet() (metricsclientset.Interface, error) { + return f.tf.MetricsClientSet, f.tf.Err +} + func (f *FakeFactory) ClientSet() (internalclientset.Interface, error) { return nil, nil } @@ -675,6 +681,10 @@ func (f *fakeAPIFactory) KubernetesClientSet() (*kubernetes.Clientset, error) { return clientset, f.tf.Err } +func (f *fakeAPIFactory) MetricsClientSet() (metricsclientset.Interface, error) { + return f.tf.MetricsClientSet, f.tf.Err +} + func (f *fakeAPIFactory) ClientSet() (internalclientset.Interface, error) { // Swap the HTTP client out of the REST client with the fake // version. diff --git a/pkg/kubectl/cmd/top.go b/pkg/kubectl/cmd/top.go index 47c1968517d..bc58e2b86ed 100644 --- a/pkg/kubectl/cmd/top.go +++ b/pkg/kubectl/cmd/top.go @@ -19,8 +19,10 @@ package cmd import ( "io" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" "k8s.io/kubernetes/pkg/kubectl/util/i18n" + metricsapi "k8s.io/metrics/pkg/apis/metrics" "github.com/spf13/cobra" "k8s.io/kubernetes/pkg/kubectl/cmd/templates" @@ -30,6 +32,9 @@ import ( type TopOptions struct{} var ( + supportedMetricsAPIVersions = []string{ + "v1beta1", + } topLong = templates.LongDesc(i18n.T(` Display Resource (CPU/Memory/Storage) usage. @@ -51,3 +56,19 @@ func NewCmdTop(f cmdutil.Factory, out, errOut io.Writer) *cobra.Command { cmd.AddCommand(NewCmdTopPod(f, nil, out)) return cmd } + +func SupportedMetricsAPIVersionAvailable(discoveredAPIGroups *metav1.APIGroupList) bool { + for _, discoveredAPIGroup := range discoveredAPIGroups.Groups { + if discoveredAPIGroup.Name != metricsapi.GroupName { + continue + } + for _, version := range discoveredAPIGroup.Versions { + for _, supportedVersion := range supportedMetricsAPIVersions { + if version.Version == supportedVersion { + return true + } + } + } + } + return false +} diff --git a/pkg/kubectl/cmd/top_node.go b/pkg/kubectl/cmd/top_node.go index 610a542b7f3..f4eac86442c 100644 --- a/pkg/kubectl/cmd/top_node.go +++ b/pkg/kubectl/cmd/top_node.go @@ -25,11 +25,15 @@ import ( "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/discovery" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/kubernetes/pkg/kubectl/cmd/templates" cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" "k8s.io/kubernetes/pkg/kubectl/metricsutil" "k8s.io/kubernetes/pkg/kubectl/util/i18n" + metricsapi "k8s.io/metrics/pkg/apis/metrics" + metricsV1beta1api "k8s.io/metrics/pkg/apis/metrics/v1beta1" + metricsclientset "k8s.io/metrics/pkg/client/clientset_generated/clientset" ) // TopNodeOptions contains all the options for running the top-node cli command. @@ -40,6 +44,8 @@ type TopNodeOptions struct { HeapsterOptions HeapsterTopOptions Client *metricsutil.HeapsterMetricsClient Printer *metricsutil.TopCmdPrinter + DiscoveryClient discovery.DiscoveryInterface + MetricsClient metricsclientset.Interface } type HeapsterTopOptions struct { @@ -123,8 +129,16 @@ func (o *TopNodeOptions) Complete(f cmdutil.Factory, cmd *cobra.Command, args [] if err != nil { return err } + + o.DiscoveryClient = clientset.DiscoveryClient + o.MetricsClient, err = f.MetricsClientSet() + if err != nil { + return err + } + o.NodeClient = clientset.CoreV1() o.Client = metricsutil.NewHeapsterMetricsClient(clientset.CoreV1(), o.HeapsterOptions.Namespace, o.HeapsterOptions.Scheme, o.HeapsterOptions.Service, o.HeapsterOptions.Port) + o.Printer = metricsutil.NewTopCmdPrinter(out) return nil } @@ -138,15 +152,35 @@ func (o *TopNodeOptions) Validate() error { func (o TopNodeOptions) RunTopNode() error { var err error - selector := labels.Everything().String() + selector := labels.Everything() if len(o.Selector) > 0 { - selector = o.Selector + selector, err = labels.Parse(o.Selector) + if err != nil { + return err + } } - metrics, err := o.Client.GetNodeMetrics(o.ResourceName, selector) + + apiGroups, err := o.DiscoveryClient.ServerGroups() if err != nil { return err } - if len(metrics) == 0 { + + metricsAPIAvailable := SupportedMetricsAPIVersionAvailable(apiGroups) + + metrics := &metricsapi.NodeMetricsList{} + if metricsAPIAvailable { + metrics, err = getNodeMetricsFromMetricsAPI(o.MetricsClient, o.ResourceName, selector) + if err != nil { + return err + } + } else { + metrics, err = o.Client.GetNodeMetrics(o.ResourceName, selector.String()) + if err != nil { + return err + } + } + + if len(metrics.Items) == 0 { return errors.New("metrics not available yet") } @@ -159,7 +193,7 @@ func (o TopNodeOptions) RunTopNode() error { nodes = append(nodes, *node) } else { nodeList, err := o.NodeClient.Nodes().List(metav1.ListOptions{ - LabelSelector: selector, + LabelSelector: selector.String(), }) if err != nil { return err @@ -173,5 +207,30 @@ func (o TopNodeOptions) RunTopNode() error { allocatable[n.Name] = n.Status.Allocatable } - return o.Printer.PrintNodeMetrics(metrics, allocatable) + return o.Printer.PrintNodeMetrics(metrics.Items, allocatable) +} + +func getNodeMetricsFromMetricsAPI(metricsClient metricsclientset.Interface, resourceName string, selector labels.Selector) (*metricsapi.NodeMetricsList, error) { + var err error + versionedMetrics := &metricsV1beta1api.NodeMetricsList{} + mc := metricsClient.Metrics() + nm := mc.NodeMetricses() + if resourceName != "" { + m, err := nm.Get(resourceName, metav1.GetOptions{}) + if err != nil { + return nil, err + } + versionedMetrics.Items = []metricsV1beta1api.NodeMetrics{*m} + } else { + versionedMetrics, err = nm.List(metav1.ListOptions{LabelSelector: selector.String()}) + if err != nil { + return nil, err + } + } + metrics := &metricsapi.NodeMetricsList{} + err = metricsV1beta1api.Convert_v1beta1_NodeMetricsList_To_metrics_NodeMetricsList(versionedMetrics, metrics, nil) + if err != nil { + return nil, err + } + return metrics, nil } diff --git a/pkg/kubectl/cmd/top_node_test.go b/pkg/kubectl/cmd/top_node_test.go index 7afc783da11..4ad3ad593b6 100644 --- a/pkg/kubectl/cmd/top_node_test.go +++ b/pkg/kubectl/cmd/top_node_test.go @@ -19,6 +19,7 @@ package cmd import ( "bytes" "fmt" + "io/ioutil" "net/http" "strings" "testing" @@ -26,9 +27,13 @@ import ( "net/url" "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/rest/fake" + core "k8s.io/client-go/testing" cmdtesting "k8s.io/kubernetes/pkg/kubectl/cmd/testing" - "k8s.io/metrics/pkg/apis/metrics/v1alpha1" + metricsv1alpha1api "k8s.io/metrics/pkg/apis/metrics/v1alpha1" + metricsv1beta1api "k8s.io/metrics/pkg/apis/metrics/v1beta1" + metricsfake "k8s.io/metrics/pkg/client/clientset_generated/clientset/fake" ) const ( @@ -38,7 +43,7 @@ const ( func TestTopNodeAllMetrics(t *testing.T) { initTestErrorHandler(t) - metrics, nodes := testNodeMetricsData() + metrics, nodes := testNodeV1alpha1MetricsData() expectedMetricsPath := fmt.Sprintf("%s/%s/nodes", baseMetricsAddress, metricsApiVersion) expectedNodePath := fmt.Sprintf("/%s/%s/nodes", apiPrefix, apiVersion) @@ -48,6 +53,10 @@ func TestTopNodeAllMetrics(t *testing.T) { NegotiatedSerializer: ns, Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) { switch p, m := req.URL.Path, req.Method; { + case p == "/api": + return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: ioutil.NopCloser(bytes.NewReader([]byte(apibody)))}, nil + case p == "/apis": + return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: ioutil.NopCloser(bytes.NewReader([]byte(apisbody)))}, nil case p == expectedMetricsPath && m == "GET": body, err := marshallBody(metrics) if err != nil { @@ -83,7 +92,7 @@ func TestTopNodeAllMetricsCustomDefaults(t *testing.T) { customBaseMetricsAddress := customBaseHeapsterServiceAddress + "/apis/metrics" initTestErrorHandler(t) - metrics, nodes := testNodeMetricsData() + metrics, nodes := testNodeV1alpha1MetricsData() expectedMetricsPath := fmt.Sprintf("%s/%s/nodes", customBaseMetricsAddress, metricsApiVersion) expectedNodePath := fmt.Sprintf("/%s/%s/nodes", apiPrefix, apiVersion) @@ -93,6 +102,10 @@ func TestTopNodeAllMetricsCustomDefaults(t *testing.T) { NegotiatedSerializer: ns, Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) { switch p, m := req.URL.Path, req.Method; { + case p == "/api": + return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: ioutil.NopCloser(bytes.NewReader([]byte(apibody)))}, nil + case p == "/apis": + return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: ioutil.NopCloser(bytes.NewReader([]byte(apisbody)))}, nil case p == expectedMetricsPath && m == "GET": body, err := marshallBody(metrics) if err != nil { @@ -132,10 +145,10 @@ func TestTopNodeAllMetricsCustomDefaults(t *testing.T) { func TestTopNodeWithNameMetrics(t *testing.T) { initTestErrorHandler(t) - metrics, nodes := testNodeMetricsData() + metrics, nodes := testNodeV1alpha1MetricsData() expectedMetrics := metrics.Items[0] expectedNode := nodes.Items[0] - nonExpectedMetrics := v1alpha1.NodeMetricsList{ + nonExpectedMetrics := metricsv1alpha1api.NodeMetricsList{ ListMeta: metrics.ListMeta, Items: metrics.Items[1:], } @@ -148,6 +161,10 @@ func TestTopNodeWithNameMetrics(t *testing.T) { NegotiatedSerializer: ns, Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) { switch p, m := req.URL.Path, req.Method; { + case p == "/api": + return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: ioutil.NopCloser(bytes.NewReader([]byte(apibody)))}, nil + case p == "/apis": + return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: ioutil.NopCloser(bytes.NewReader([]byte(apisbody)))}, nil case p == expectedPath && m == "GET": body, err := marshallBody(expectedMetrics) if err != nil { @@ -183,8 +200,8 @@ func TestTopNodeWithNameMetrics(t *testing.T) { func TestTopNodeWithLabelSelectorMetrics(t *testing.T) { initTestErrorHandler(t) - metrics, nodes := testNodeMetricsData() - expectedMetrics := v1alpha1.NodeMetricsList{ + metrics, nodes := testNodeV1alpha1MetricsData() + expectedMetrics := metricsv1alpha1api.NodeMetricsList{ ListMeta: metrics.ListMeta, Items: metrics.Items[0:1], } @@ -192,7 +209,7 @@ func TestTopNodeWithLabelSelectorMetrics(t *testing.T) { ListMeta: nodes.ListMeta, Items: nodes.Items[0:1], } - nonExpectedMetrics := v1alpha1.NodeMetricsList{ + nonExpectedMetrics := metricsv1alpha1api.NodeMetricsList{ ListMeta: metrics.ListMeta, Items: metrics.Items[1:], } @@ -207,6 +224,10 @@ func TestTopNodeWithLabelSelectorMetrics(t *testing.T) { NegotiatedSerializer: ns, Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) { switch p, m, q := req.URL.Path, req.Method, req.URL.RawQuery; { + case p == "/api": + return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: ioutil.NopCloser(bytes.NewReader([]byte(apibody)))}, nil + case p == "/apis": + return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: ioutil.NopCloser(bytes.NewReader([]byte(apisbody)))}, nil case p == expectedPath && m == "GET" && q == expectedQuery: body, err := marshallBody(expectedMetrics) if err != nil { @@ -242,3 +263,164 @@ func TestTopNodeWithLabelSelectorMetrics(t *testing.T) { } } } + +func TestTopNodeAllMetricsFromMetricsServer(t *testing.T) { + initTestErrorHandler(t) + expectedMetrics, nodes := testNodeV1beta1MetricsData() + expectedNodePath := fmt.Sprintf("/%s/%s/nodes", apiPrefix, apiVersion) + + f, tf, codec, ns := cmdtesting.NewAPIFactory() + tf.Printer = &testPrinter{} + tf.Client = &fake.RESTClient{ + NegotiatedSerializer: ns, + Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) { + switch p, m := req.URL.Path, req.Method; { + case p == "/api": + return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: ioutil.NopCloser(bytes.NewReader([]byte(apibody)))}, nil + case p == "/apis": + return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: ioutil.NopCloser(bytes.NewReader([]byte(apisbodyWithMetrics)))}, nil + case p == expectedNodePath && m == "GET": + return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, nodes)}, nil + default: + t.Fatalf("unexpected request: %#v\nGot URL: %#v\n", req, req.URL) + return nil, nil + } + }), + } + fakemetricsClientset := &metricsfake.Clientset{} + fakemetricsClientset.AddReactor("list", "nodes", func(action core.Action) (handled bool, ret runtime.Object, err error) { + return true, expectedMetrics, nil + }) + tf.MetricsClientSet = fakemetricsClientset + tf.Namespace = "test" + tf.ClientConfig = defaultClientConfig() + buf := bytes.NewBuffer([]byte{}) + + cmd := NewCmdTopNode(f, nil, buf) + cmd.Run(cmd, []string{}) + + // Check the presence of node names in the output. + result := buf.String() + for _, m := range expectedMetrics.Items { + if !strings.Contains(result, m.Name) { + t.Errorf("missing metrics for %s: \n%s", m.Name, result) + } + } +} + +func TestTopNodeWithNameMetricsFromMetricsServer(t *testing.T) { + initTestErrorHandler(t) + metrics, nodes := testNodeV1beta1MetricsData() + expectedMetrics := metrics.Items[0] + expectedNode := nodes.Items[0] + nonExpectedMetrics := metricsv1beta1api.NodeMetricsList{ + ListMeta: metrics.ListMeta, + Items: metrics.Items[1:], + } + expectedNodePath := fmt.Sprintf("/%s/%s/nodes/%s", apiPrefix, apiVersion, expectedMetrics.Name) + + f, tf, codec, ns := cmdtesting.NewAPIFactory() + tf.Printer = &testPrinter{} + tf.Client = &fake.RESTClient{ + NegotiatedSerializer: ns, + Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) { + switch p, m := req.URL.Path, req.Method; { + case p == "/api": + return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: ioutil.NopCloser(bytes.NewReader([]byte(apibody)))}, nil + case p == "/apis": + return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: ioutil.NopCloser(bytes.NewReader([]byte(apisbodyWithMetrics)))}, nil + case p == expectedNodePath && m == "GET": + return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, &expectedNode)}, nil + default: + t.Fatalf("unexpected request: %#v\nGot URL: %#v\n", req, req.URL) + return nil, nil + } + }), + } + fakemetricsClientset := &metricsfake.Clientset{} + fakemetricsClientset.AddReactor("get", "nodes", func(action core.Action) (handled bool, ret runtime.Object, err error) { + return true, &expectedMetrics, nil + }) + tf.MetricsClientSet = fakemetricsClientset + tf.Namespace = "test" + tf.ClientConfig = defaultClientConfig() + buf := bytes.NewBuffer([]byte{}) + + cmd := NewCmdTopNode(f, nil, buf) + cmd.Run(cmd, []string{expectedMetrics.Name}) + + // Check the presence of node names in the output. + result := buf.String() + if !strings.Contains(result, expectedMetrics.Name) { + t.Errorf("missing metrics for %s: \n%s", expectedMetrics.Name, result) + } + for _, m := range nonExpectedMetrics.Items { + if strings.Contains(result, m.Name) { + t.Errorf("unexpected metrics for %s: \n%s", m.Name, result) + } + } +} + +func TestTopNodeWithLabelSelectorMetricsFromMetricsServer(t *testing.T) { + initTestErrorHandler(t) + metrics, nodes := testNodeV1beta1MetricsData() + expectedMetrics := &metricsv1beta1api.NodeMetricsList{ + ListMeta: metrics.ListMeta, + Items: metrics.Items[0:1], + } + expectedNodes := v1.NodeList{ + ListMeta: nodes.ListMeta, + Items: nodes.Items[0:1], + } + nonExpectedMetrics := &metricsv1beta1api.NodeMetricsList{ + ListMeta: metrics.ListMeta, + Items: metrics.Items[1:], + } + label := "key=value" + expectedNodePath := fmt.Sprintf("/%s/%s/nodes", apiPrefix, apiVersion) + + f, tf, codec, ns := cmdtesting.NewAPIFactory() + tf.Printer = &testPrinter{} + tf.Client = &fake.RESTClient{ + NegotiatedSerializer: ns, + Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) { + switch p, m, _ := req.URL.Path, req.Method, req.URL.RawQuery; { + case p == "/api": + return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: ioutil.NopCloser(bytes.NewReader([]byte(apibody)))}, nil + case p == "/apis": + return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: ioutil.NopCloser(bytes.NewReader([]byte(apisbodyWithMetrics)))}, nil + case p == expectedNodePath && m == "GET": + return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, &expectedNodes)}, nil + default: + t.Fatalf("unexpected request: %#v\nGot URL: %#v\n", req, req.URL) + return nil, nil + } + }), + } + + fakemetricsClientset := &metricsfake.Clientset{} + fakemetricsClientset.AddReactor("list", "nodes", func(action core.Action) (handled bool, ret runtime.Object, err error) { + return true, expectedMetrics, nil + }) + tf.MetricsClientSet = fakemetricsClientset + tf.Namespace = "test" + tf.ClientConfig = defaultClientConfig() + buf := bytes.NewBuffer([]byte{}) + + cmd := NewCmdTopNode(f, nil, buf) + cmd.Flags().Set("selector", label) + cmd.Run(cmd, []string{}) + + // Check the presence of node names in the output. + result := buf.String() + for _, m := range expectedMetrics.Items { + if !strings.Contains(result, m.Name) { + t.Errorf("missing metrics for %s: \n%s", m.Name, result) + } + } + for _, m := range nonExpectedMetrics.Items { + if strings.Contains(result, m.Name) { + t.Errorf("unexpected metrics for %s: \n%s", m.Name, result) + } + } +} diff --git a/pkg/kubectl/cmd/top_pod.go b/pkg/kubectl/cmd/top_pod.go index 669f95cd3cf..f358dcdb648 100644 --- a/pkg/kubectl/cmd/top_pod.go +++ b/pkg/kubectl/cmd/top_pod.go @@ -25,11 +25,15 @@ import ( "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/discovery" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/kubernetes/pkg/kubectl/cmd/templates" cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" "k8s.io/kubernetes/pkg/kubectl/metricsutil" "k8s.io/kubernetes/pkg/kubectl/util/i18n" + metricsapi "k8s.io/metrics/pkg/apis/metrics" + metricsv1beta1api "k8s.io/metrics/pkg/apis/metrics/v1beta1" + metricsclientset "k8s.io/metrics/pkg/client/clientset_generated/clientset" "github.com/golang/glog" "github.com/spf13/cobra" @@ -45,6 +49,8 @@ type TopPodOptions struct { HeapsterOptions HeapsterTopOptions Client *metricsutil.HeapsterMetricsClient Printer *metricsutil.TopCmdPrinter + DiscoveryClient discovery.DiscoveryInterface + MetricsClient metricsclientset.Interface } const metricsCreationDelay = 2 * time.Minute @@ -119,8 +125,16 @@ func (o *TopPodOptions) Complete(f cmdutil.Factory, cmd *cobra.Command, args []s if err != nil { return err } + + o.DiscoveryClient = clientset.DiscoveryClient + o.MetricsClient, err = f.MetricsClientSet() + if err != nil { + return err + } + o.PodClient = clientset.CoreV1() o.Client = metricsutil.NewHeapsterMetricsClient(clientset.CoreV1(), o.HeapsterOptions.Namespace, o.HeapsterOptions.Scheme, o.HeapsterOptions.Service, o.HeapsterOptions.Port) + o.Printer = metricsutil.NewTopCmdPrinter(out) return nil } @@ -141,10 +155,30 @@ func (o TopPodOptions) RunTopPod() error { return err } } - metrics, err := o.Client.GetPodMetrics(o.Namespace, o.ResourceName, o.AllNamespaces, selector) + + apiGroups, err := o.DiscoveryClient.ServerGroups() + if err != nil { + return err + } + + metricsAPIAvailable := SupportedMetricsAPIVersionAvailable(apiGroups) + + metrics := &metricsapi.PodMetricsList{} + if metricsAPIAvailable { + metrics, err = getMetricsFromMetricsAPI(o.MetricsClient, o.Namespace, o.ResourceName, o.AllNamespaces, selector) + if err != nil { + return err + } + } else { + metrics, err = o.Client.GetPodMetrics(o.Namespace, o.ResourceName, o.AllNamespaces, selector) + if err != nil { + return err + } + } + // TODO: Refactor this once Heapster becomes the API server. // First we check why no metrics have been received. - if len(metrics) == 0 { + if len(metrics.Items) == 0 { // If the API server query is successful but all the pods are newly created, // the metrics are probably not ready yet, so we return the error here in the first place. e := verifyEmptyMetrics(o, selector) @@ -155,7 +189,35 @@ func (o TopPodOptions) RunTopPod() error { if err != nil { return err } - return o.Printer.PrintPodMetrics(metrics, o.PrintContainers, o.AllNamespaces) + + return o.Printer.PrintPodMetrics(metrics.Items, o.PrintContainers, o.AllNamespaces) +} + +func getMetricsFromMetricsAPI(metricsClient metricsclientset.Interface, namespace, resourceName string, allNamespaces bool, selector labels.Selector) (*metricsapi.PodMetricsList, error) { + var err error + ns := metav1.NamespaceAll + if !allNamespaces { + ns = namespace + } + versionedMetrics := &metricsv1beta1api.PodMetricsList{} + if resourceName != "" { + m, err := metricsClient.Metrics().PodMetricses(ns).Get(resourceName, metav1.GetOptions{}) + if err != nil { + return nil, err + } + versionedMetrics.Items = []metricsv1beta1api.PodMetrics{*m} + } else { + versionedMetrics, err = metricsClient.Metrics().PodMetricses(ns).List(metav1.ListOptions{LabelSelector: selector.String()}) + if err != nil { + return nil, err + } + } + metrics := &metricsapi.PodMetricsList{} + err = metricsv1beta1api.Convert_v1beta1_PodMetricsList_To_metrics_PodMetricsList(versionedMetrics, metrics, nil) + if err != nil { + return nil, err + } + return metrics, nil } func verifyEmptyMetrics(o TopPodOptions, selector labels.Selector) error { diff --git a/pkg/kubectl/cmd/top_pod_test.go b/pkg/kubectl/cmd/top_pod_test.go index a839454e2f1..c762766d8df 100644 --- a/pkg/kubectl/cmd/top_pod_test.go +++ b/pkg/kubectl/cmd/top_pod_test.go @@ -18,23 +18,71 @@ package cmd import ( "bytes" + "io/ioutil" "net/http" + "net/url" "strings" "testing" "time" - "net/url" + "github.com/googleapis/gnostic/OpenAPIv2" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + apiversion "k8s.io/apimachinery/pkg/version" + restclient "k8s.io/client-go/rest" "k8s.io/client-go/rest/fake" + core "k8s.io/client-go/testing" cmdtesting "k8s.io/kubernetes/pkg/kubectl/cmd/testing" - metricsapi "k8s.io/metrics/pkg/apis/metrics/v1alpha1" + metricsv1alpha1api "k8s.io/metrics/pkg/apis/metrics/v1alpha1" + metricsv1beta1api "k8s.io/metrics/pkg/apis/metrics/v1beta1" + metricsfake "k8s.io/metrics/pkg/client/clientset_generated/clientset/fake" ) const ( - topPathPrefix = baseMetricsAddress + "/" + metricsApiVersion + topPathPrefix = baseMetricsAddress + "/" + metricsApiVersion + topMetricsAPIPathPrefix = "/apis/metrics.k8s.io/v1beta1" + apibody = `{ + "kind": "APIVersions", + "versions": [ + "v1" + ], + "serverAddressByClientCIDRs": [ + { + "clientCIDR": "0.0.0.0/0", + "serverAddress": "10.0.2.15:8443" + } + ] +}` + // This is not the full output one would usually get, just a trimmed down version. + apisbody = `{ + "kind": "APIGroupList", + "apiVersion": "v1", + "groups": [{}] +}` + + apisbodyWithMetrics = `{ + "kind": "APIGroupList", + "apiVersion": "v1", + "groups": [ + { + "name":"metrics.k8s.io", + "versions":[ + { + "groupVersion":"metrics.k8s.io/v1beta1", + "version":"v1beta1" + } + ], + "preferredVersion":{ + "groupVersion":"metrics.k8s.io/v1beta1", + "version":"v1beta1" + }, + "serverAddressByClientCIDRs":null + } + ] +}` ) func TestTopPod(t *testing.T) { @@ -87,7 +135,7 @@ func TestTopPod(t *testing.T) { for _, testCase := range testCases { t.Logf("Running test case: %s", testCase.name) metricsList := testPodMetricsData() - var expectedMetrics []metricsapi.PodMetrics + var expectedMetrics []metricsv1alpha1api.PodMetrics var expectedContainerNames, nonExpectedMetricsNames []string for n, m := range metricsList { if n < len(testCase.namespaces) { @@ -105,7 +153,7 @@ func TestTopPod(t *testing.T) { if len(expectedMetrics) == 1 { response = expectedMetrics[0] } else { - response = metricsapi.PodMetricsList{ + response = metricsv1alpha1api.PodMetricsList{ ListMeta: metav1.ListMeta{ ResourceVersion: "2", }, @@ -119,6 +167,10 @@ func TestTopPod(t *testing.T) { NegotiatedSerializer: ns, Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) { switch p, m, q := req.URL.Path, req.Method, req.URL.RawQuery; { + case p == "/api": + return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: ioutil.NopCloser(bytes.NewReader([]byte(apibody)))}, nil + case p == "/apis": + return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: ioutil.NopCloser(bytes.NewReader([]byte(apisbody)))}, nil case p == testCase.expectedPath && m == "GET" && (testCase.expectedQuery == "" || q == testCase.expectedQuery): body, err := marshallBody(response) if err != nil { @@ -167,6 +219,188 @@ func TestTopPod(t *testing.T) { } } +func TestTopPodWithMetricsServer(t *testing.T) { + testNS := "testns" + testCases := []struct { + name string + namespace string + flags map[string]string + args []string + expectedPath string + expectedQuery string + namespaces []string + containers bool + listsNamespaces bool + }{ + { + name: "all namespaces", + flags: map[string]string{"all-namespaces": "true"}, + expectedPath: topMetricsAPIPathPrefix + "/pods", + namespaces: []string{testNS, "secondtestns", "thirdtestns"}, + listsNamespaces: true, + }, + { + name: "all in namespace", + expectedPath: topMetricsAPIPathPrefix + "/namespaces/" + testNS + "/pods", + namespaces: []string{testNS, testNS}, + }, + { + name: "pod with name", + args: []string{"pod1"}, + expectedPath: topMetricsAPIPathPrefix + "/namespaces/" + testNS + "/pods/pod1", + namespaces: []string{testNS}, + }, + { + name: "pod with label selector", + flags: map[string]string{"selector": "key=value"}, + expectedPath: topMetricsAPIPathPrefix + "/namespaces/" + testNS + "/pods", + expectedQuery: "labelSelector=" + url.QueryEscape("key=value"), + namespaces: []string{testNS, testNS}, + }, + { + name: "pod with container metrics", + flags: map[string]string{"containers": "true"}, + args: []string{"pod1"}, + expectedPath: topMetricsAPIPathPrefix + "/namespaces/" + testNS + "/pods/pod1", + namespaces: []string{testNS}, + containers: true, + }, + } + initTestErrorHandler(t) + for _, testCase := range testCases { + t.Logf("Running test case: %s", testCase.name) + metricsList := testV1beta1PodMetricsData() + var expectedMetrics []metricsv1beta1api.PodMetrics + var expectedContainerNames, nonExpectedMetricsNames []string + for n, m := range metricsList { + if n < len(testCase.namespaces) { + m.Namespace = testCase.namespaces[n] + expectedMetrics = append(expectedMetrics, m) + for _, c := range m.Containers { + expectedContainerNames = append(expectedContainerNames, c.Name) + } + } else { + nonExpectedMetricsNames = append(nonExpectedMetricsNames, m.Name) + } + } + + fakemetricsClientset := &metricsfake.Clientset{} + + if len(expectedMetrics) == 1 { + fakemetricsClientset.AddReactor("get", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) { + return true, &expectedMetrics[0], nil + }) + } else { + fakemetricsClientset.AddReactor("list", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) { + res := &metricsv1beta1api.PodMetricsList{ + ListMeta: metav1.ListMeta{ + ResourceVersion: "2", + }, + Items: expectedMetrics, + } + return true, res, nil + }) + } + + f, tf, _, ns := cmdtesting.NewAPIFactory() + tf.Printer = &testPrinter{} + tf.Client = &fake.RESTClient{ + NegotiatedSerializer: ns, + Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) { + switch p := req.URL.Path; { + case p == "/api": + return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: ioutil.NopCloser(bytes.NewReader([]byte(apibody)))}, nil + case p == "/apis": + return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: ioutil.NopCloser(bytes.NewReader([]byte(apisbodyWithMetrics)))}, nil + default: + t.Fatalf("%s: unexpected request: %#v\nGot URL: %#v", + testCase.name, req, req.URL) + return nil, nil + } + }), + } + tf.MetricsClientSet = fakemetricsClientset + tf.Namespace = testNS + tf.ClientConfig = defaultClientConfig() + buf := bytes.NewBuffer([]byte{}) + + cmd := NewCmdTopPod(f, nil, buf) + for name, value := range testCase.flags { + cmd.Flags().Set(name, value) + } + cmd.Run(cmd, testCase.args) + + // Check the presence of pod names&namespaces/container names in the output. + result := buf.String() + if testCase.containers { + for _, containerName := range expectedContainerNames { + if !strings.Contains(result, containerName) { + t.Errorf("%s: missing metrics for container %s: \n%s", testCase.name, containerName, result) + } + } + } + for _, m := range expectedMetrics { + if !strings.Contains(result, m.Name) { + t.Errorf("%s: missing metrics for %s: \n%s", testCase.name, m.Name, result) + } + if testCase.listsNamespaces && !strings.Contains(result, m.Namespace) { + t.Errorf("%s: missing metrics for %s/%s: \n%s", testCase.name, m.Namespace, m.Name, result) + } + } + for _, name := range nonExpectedMetricsNames { + if strings.Contains(result, name) { + t.Errorf("%s: unexpected metrics for %s: \n%s", testCase.name, name, result) + } + } + } +} + +type fakeDiscovery struct{} + +// ServerGroups returns the supported groups, with information like supported versions and the +// preferred version. +func (d *fakeDiscovery) ServerGroups() (*metav1.APIGroupList, error) { + return nil, nil +} + +// ServerResourcesForGroupVersion returns the supported resources for a group and version. +func (d *fakeDiscovery) ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) { + return nil, nil +} + +// ServerResources returns the supported resources for all groups and versions. +func (d *fakeDiscovery) ServerResources() ([]*metav1.APIResourceList, error) { + return nil, nil +} + +// ServerPreferredResources returns the supported resources with the version preferred by the +// server. +func (d *fakeDiscovery) ServerPreferredResources() ([]*metav1.APIResourceList, error) { + return nil, nil +} + +// ServerPreferredNamespacedResources returns the supported namespaced resources with the +// version preferred by the server. +func (d *fakeDiscovery) ServerPreferredNamespacedResources() ([]*metav1.APIResourceList, error) { + return nil, nil +} + +// ServerVersion retrieves and parses the server's version (git version). +func (d *fakeDiscovery) ServerVersion() (*apiversion.Info, error) { + return nil, nil +} + +// OpenAPISchema retrieves and parses the swagger API schema the server supports. +func (d *fakeDiscovery) OpenAPISchema() (*openapi_v2.Document, error) { + return nil, nil +} + +// RESTClient returns a RESTClient that is used to communicate +// with API server by this client implementation. +func (d *fakeDiscovery) RESTClient() restclient.Interface { + return nil +} + func TestTopPodCustomDefaults(t *testing.T) { customBaseHeapsterServiceAddress := "/api/v1/namespaces/custom-namespace/services/https:custom-heapster-service:/proxy" customBaseMetricsAddress := customBaseHeapsterServiceAddress + "/apis/metrics" @@ -221,7 +455,7 @@ func TestTopPodCustomDefaults(t *testing.T) { for _, testCase := range testCases { t.Logf("Running test case: %s", testCase.name) metricsList := testPodMetricsData() - var expectedMetrics []metricsapi.PodMetrics + var expectedMetrics []metricsv1alpha1api.PodMetrics var expectedContainerNames, nonExpectedMetricsNames []string for n, m := range metricsList { if n < len(testCase.namespaces) { @@ -239,7 +473,7 @@ func TestTopPodCustomDefaults(t *testing.T) { if len(expectedMetrics) == 1 { response = expectedMetrics[0] } else { - response = metricsapi.PodMetricsList{ + response = metricsv1alpha1api.PodMetricsList{ ListMeta: metav1.ListMeta{ ResourceVersion: "2", }, @@ -253,6 +487,10 @@ func TestTopPodCustomDefaults(t *testing.T) { NegotiatedSerializer: ns, Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) { switch p, m, q := req.URL.Path, req.Method, req.URL.RawQuery; { + case p == "/api": + return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: ioutil.NopCloser(bytes.NewReader([]byte(apibody)))}, nil + case p == "/apis": + return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: ioutil.NopCloser(bytes.NewReader([]byte(apisbody)))}, nil case p == testCase.expectedPath && m == "GET" && (testCase.expectedQuery == "" || q == testCase.expectedQuery): body, err := marshallBody(response) if err != nil { @@ -276,6 +514,7 @@ func TestTopPodCustomDefaults(t *testing.T) { Scheme: "https", Service: "custom-heapster-service", }, + DiscoveryClient: &fakeDiscovery{}, } cmd := NewCmdTopPod(f, opts, buf) for name, value := range testCase.flags { @@ -308,12 +547,12 @@ func TestTopPodCustomDefaults(t *testing.T) { } } -func testPodMetricsData() []metricsapi.PodMetrics { - return []metricsapi.PodMetrics{ +func testV1beta1PodMetricsData() []metricsv1beta1api.PodMetrics { + return []metricsv1beta1api.PodMetrics{ { - ObjectMeta: metav1.ObjectMeta{Name: "pod1", Namespace: "test", ResourceVersion: "10"}, + ObjectMeta: metav1.ObjectMeta{Name: "pod1", Namespace: "test", ResourceVersion: "10", Labels: map[string]string{"key": "value"}}, Window: metav1.Duration{Duration: time.Minute}, - Containers: []metricsapi.ContainerMetrics{ + Containers: []metricsv1beta1api.ContainerMetrics{ { Name: "container1-1", Usage: v1.ResourceList{ @@ -333,9 +572,9 @@ func testPodMetricsData() []metricsapi.PodMetrics { }, }, { - ObjectMeta: metav1.ObjectMeta{Name: "pod2", Namespace: "test", ResourceVersion: "11"}, + ObjectMeta: metav1.ObjectMeta{Name: "pod2", Namespace: "test", ResourceVersion: "11", Labels: map[string]string{"key": "value"}}, Window: metav1.Duration{Duration: time.Minute}, - Containers: []metricsapi.ContainerMetrics{ + Containers: []metricsv1beta1api.ContainerMetrics{ { Name: "container2-1", Usage: v1.ResourceList{ @@ -365,7 +604,78 @@ func testPodMetricsData() []metricsapi.PodMetrics { { ObjectMeta: metav1.ObjectMeta{Name: "pod3", Namespace: "test", ResourceVersion: "12"}, Window: metav1.Duration{Duration: time.Minute}, - Containers: []metricsapi.ContainerMetrics{ + Containers: []metricsv1beta1api.ContainerMetrics{ + { + Name: "container3-1", + Usage: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(7, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(8*(1024*1024), resource.DecimalSI), + v1.ResourceStorage: *resource.NewQuantity(9*(1024*1024), resource.DecimalSI), + }, + }, + }, + }, + } +} + +func testPodMetricsData() []metricsv1alpha1api.PodMetrics { + return []metricsv1alpha1api.PodMetrics{ + { + ObjectMeta: metav1.ObjectMeta{Name: "pod1", Namespace: "test", ResourceVersion: "10"}, + Window: metav1.Duration{Duration: time.Minute}, + Containers: []metricsv1alpha1api.ContainerMetrics{ + { + Name: "container1-1", + Usage: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(1, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(2*(1024*1024), resource.DecimalSI), + v1.ResourceStorage: *resource.NewQuantity(3*(1024*1024), resource.DecimalSI), + }, + }, + { + Name: "container1-2", + Usage: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(4, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(5*(1024*1024), resource.DecimalSI), + v1.ResourceStorage: *resource.NewQuantity(6*(1024*1024), resource.DecimalSI), + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "pod2", Namespace: "test", ResourceVersion: "11"}, + Window: metav1.Duration{Duration: time.Minute}, + Containers: []metricsv1alpha1api.ContainerMetrics{ + { + Name: "container2-1", + Usage: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(7, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(8*(1024*1024), resource.DecimalSI), + v1.ResourceStorage: *resource.NewQuantity(9*(1024*1024), resource.DecimalSI), + }, + }, + { + Name: "container2-2", + Usage: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(10, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(11*(1024*1024), resource.DecimalSI), + v1.ResourceStorage: *resource.NewQuantity(12*(1024*1024), resource.DecimalSI), + }, + }, + { + Name: "container2-3", + Usage: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(13, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(14*(1024*1024), resource.DecimalSI), + v1.ResourceStorage: *resource.NewQuantity(15*(1024*1024), resource.DecimalSI), + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "pod3", Namespace: "test", ResourceVersion: "12"}, + Window: metav1.Duration{Duration: time.Minute}, + Containers: []metricsv1alpha1api.ContainerMetrics{ { Name: "container3-1", Usage: v1.ResourceList{ diff --git a/pkg/kubectl/cmd/top_test.go b/pkg/kubectl/cmd/top_test.go index ce168b649d0..da57d99e46b 100644 --- a/pkg/kubectl/cmd/top_test.go +++ b/pkg/kubectl/cmd/top_test.go @@ -29,12 +29,14 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" cmdtesting "k8s.io/kubernetes/pkg/kubectl/cmd/testing" - metricsapi "k8s.io/metrics/pkg/apis/metrics/v1alpha1" + metricsv1alpha1api "k8s.io/metrics/pkg/apis/metrics/v1alpha1" + metricsv1beta1api "k8s.io/metrics/pkg/apis/metrics/v1beta1" ) const ( baseHeapsterServiceAddress = "/api/v1/namespaces/kube-system/services/http:heapster:/proxy" baseMetricsAddress = baseHeapsterServiceAddress + "/apis/metrics" + baseMetricsServerAddress = "/apis/metrics.k8s.io/v1beta1" metricsApiVersion = "v1alpha1" ) @@ -58,12 +60,12 @@ func marshallBody(metrics interface{}) (io.ReadCloser, error) { return ioutil.NopCloser(bytes.NewReader(result)), nil } -func testNodeMetricsData() (*metricsapi.NodeMetricsList, *v1.NodeList) { - metrics := &metricsapi.NodeMetricsList{ +func testNodeV1alpha1MetricsData() (*metricsv1alpha1api.NodeMetricsList, *v1.NodeList) { + metrics := &metricsv1alpha1api.NodeMetricsList{ ListMeta: metav1.ListMeta{ ResourceVersion: "1", }, - Items: []metricsapi.NodeMetrics{ + Items: []metricsv1alpha1api.NodeMetrics{ { ObjectMeta: metav1.ObjectMeta{Name: "node1", ResourceVersion: "10"}, Window: metav1.Duration{Duration: time.Minute}, @@ -113,3 +115,59 @@ func testNodeMetricsData() (*metricsapi.NodeMetricsList, *v1.NodeList) { } return metrics, nodes } + +func testNodeV1beta1MetricsData() (*metricsv1beta1api.NodeMetricsList, *v1.NodeList) { + metrics := &metricsv1beta1api.NodeMetricsList{ + ListMeta: metav1.ListMeta{ + ResourceVersion: "1", + }, + Items: []metricsv1beta1api.NodeMetrics{ + { + ObjectMeta: metav1.ObjectMeta{Name: "node1", ResourceVersion: "10", Labels: map[string]string{"key": "value"}}, + Window: metav1.Duration{Duration: time.Minute}, + Usage: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(1, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(2*(1024*1024), resource.DecimalSI), + v1.ResourceStorage: *resource.NewQuantity(3*(1024*1024), resource.DecimalSI), + }, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "node2", ResourceVersion: "11"}, + Window: metav1.Duration{Duration: time.Minute}, + Usage: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(5, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(6*(1024*1024), resource.DecimalSI), + v1.ResourceStorage: *resource.NewQuantity(7*(1024*1024), resource.DecimalSI), + }, + }, + }, + } + nodes := &v1.NodeList{ + ListMeta: metav1.ListMeta{ + ResourceVersion: "15", + }, + Items: []v1.Node{ + { + ObjectMeta: metav1.ObjectMeta{Name: "node1", ResourceVersion: "10"}, + Status: v1.NodeStatus{ + Allocatable: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(10, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(20*(1024*1024), resource.DecimalSI), + v1.ResourceStorage: *resource.NewQuantity(30*(1024*1024), resource.DecimalSI), + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "node2", ResourceVersion: "11"}, + Status: v1.NodeStatus{ + Allocatable: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(50, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(60*(1024*1024), resource.DecimalSI), + v1.ResourceStorage: *resource.NewQuantity(70*(1024*1024), resource.DecimalSI), + }, + }, + }, + }, + } + return metrics, nodes +} diff --git a/pkg/kubectl/cmd/util/BUILD b/pkg/kubectl/cmd/util/BUILD index ed3d59475c3..cc4a42a17a4 100644 --- a/pkg/kubectl/cmd/util/BUILD +++ b/pkg/kubectl/cmd/util/BUILD @@ -81,6 +81,7 @@ go_library( "//vendor/k8s.io/client-go/scale:go_default_library", "//vendor/k8s.io/client-go/tools/clientcmd:go_default_library", "//vendor/k8s.io/client-go/util/homedir:go_default_library", + "//vendor/k8s.io/metrics/pkg/client/clientset_generated/clientset:go_default_library", "//vendor/k8s.io/utils/exec:go_default_library", ], ) diff --git a/pkg/kubectl/cmd/util/clientcache.go b/pkg/kubectl/cmd/util/clientcache.go index 124216d2e8f..3aca7400772 100644 --- a/pkg/kubectl/cmd/util/clientcache.go +++ b/pkg/kubectl/cmd/util/clientcache.go @@ -27,6 +27,7 @@ import ( "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" oldclient "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/version" + metricsclientset "k8s.io/metrics/pkg/client/clientset_generated/clientset" ) func NewClientCache(loader clientcmd.ClientConfig, discoveryClientFactory DiscoveryClientFactory) *ClientCache { @@ -58,6 +59,7 @@ type ClientCache struct { discoveryClient discovery.DiscoveryInterface kubernetesClientCache kubernetesClientCache + metricsClientCache metricsClientCache } // kubernetesClientCache creates a new kubernetes.Clientset one time @@ -71,6 +73,17 @@ type kubernetesClientCache struct { err error } +// metricsClientCache creates a new metricsclientset.Clientset one time +// and then returns the result for all future requests +type metricsClientCache struct { + // once makes sure the client is only initialized once + once sync.Once + // client is the cached client value + client *metricsclientset.Clientset + // err is the cached error value + err error +} + // KubernetesClientSetForVersion returns a new kubernetes.Clientset. It will cache the value // the first time it is called and return the cached value on subsequent calls. // If an error is encountered the first time KubernetesClientSetForVersion is called, @@ -87,6 +100,22 @@ func (c *ClientCache) KubernetesClientSetForVersion(requiredVersion *schema.Grou return c.kubernetesClientCache.client, c.kubernetesClientCache.err } +// MetricsClientSetForVersion returns a new kubernetes.Clientset. It will cache the value +// the first time it is called and return the cached value on subsequent calls. +// If an error is encountered the first time MetircsClientSetForVersion is called, +// the error will be cached. +func (c *ClientCache) MetricsClientSetForVersion(requiredVersion *schema.GroupVersion) (*metricsclientset.Clientset, error) { + c.metricsClientCache.once.Do(func() { + config, err := c.ClientConfigForVersion(requiredVersion) + if err != nil { + c.kubernetesClientCache.err = err + return + } + c.metricsClientCache.client, c.metricsClientCache.err = metricsclientset.NewForConfig(config) + }) + return c.metricsClientCache.client, c.metricsClientCache.err +} + // also looks up the discovery client. We can't do this during init because the flags won't have been set // because this is constructed pre-command execution before the command tree is // even set up. Requires the lock to already be acquired diff --git a/pkg/kubectl/cmd/util/factory.go b/pkg/kubectl/cmd/util/factory.go index 82a28ae6426..9a2fc6f6cc0 100644 --- a/pkg/kubectl/cmd/util/factory.go +++ b/pkg/kubectl/cmd/util/factory.go @@ -48,6 +48,7 @@ import ( "k8s.io/kubernetes/pkg/kubectl/resource" "k8s.io/kubernetes/pkg/kubectl/validation" "k8s.io/kubernetes/pkg/printers" + metricsclientset "k8s.io/metrics/pkg/client/clientset_generated/clientset" ) const ( @@ -95,6 +96,9 @@ type ClientAccessFactory interface { // KubernetesClientSet gives you back an external clientset KubernetesClientSet() (*kubernetes.Clientset, error) + // MetricsClientSet gives you back an external clientset for the metrics API + MetricsClientSet() (metricsclientset.Interface, error) + // Returns a RESTClient for accessing Kubernetes resources or an error. RESTClient() (*restclient.RESTClient, error) // Returns a client.Config for accessing the Kubernetes server. diff --git a/pkg/kubectl/cmd/util/factory_client_access.go b/pkg/kubectl/cmd/util/factory_client_access.go index 391aa33a384..47cd38e2e22 100644 --- a/pkg/kubectl/cmd/util/factory_client_access.go +++ b/pkg/kubectl/cmd/util/factory_client_access.go @@ -31,6 +31,7 @@ import ( "time" "k8s.io/api/core/v1" + metricsclientset "k8s.io/metrics/pkg/client/clientset_generated/clientset" "github.com/spf13/cobra" "github.com/spf13/pflag" @@ -201,6 +202,10 @@ func (f *ring0Factory) KubernetesClientSet() (*kubernetes.Clientset, error) { return f.clientCache.KubernetesClientSetForVersion(nil) } +func (f *ring0Factory) MetricsClientSet() (metricsclientset.Interface, error) { + return f.clientCache.MetricsClientSetForVersion(nil) +} + func (f *ring0Factory) ClientSet() (internalclientset.Interface, error) { return f.clientCache.ClientSetForVersion(nil) } diff --git a/pkg/kubectl/metricsutil/BUILD b/pkg/kubectl/metricsutil/BUILD index 84fc55a01d7..a3cfa89ff26 100644 --- a/pkg/kubectl/metricsutil/BUILD +++ b/pkg/kubectl/metricsutil/BUILD @@ -23,6 +23,7 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", + "//vendor/k8s.io/metrics/pkg/apis/metrics:go_default_library", "//vendor/k8s.io/metrics/pkg/apis/metrics/v1alpha1:go_default_library", ], ) diff --git a/pkg/kubectl/metricsutil/metrics_client.go b/pkg/kubectl/metricsutil/metrics_client.go index 909f91f67e8..e8d48182d02 100644 --- a/pkg/kubectl/metricsutil/metrics_client.go +++ b/pkg/kubectl/metricsutil/metrics_client.go @@ -26,7 +26,8 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/kubernetes/pkg/apis/core/validation" - metricsapi "k8s.io/metrics/pkg/apis/metrics/v1alpha1" + metricsapi "k8s.io/metrics/pkg/apis/metrics" + metricsv1alpha1api "k8s.io/metrics/pkg/apis/metrics/v1alpha1" ) const ( @@ -97,67 +98,73 @@ func nodeMetricsUrl(name string) (string, error) { return fmt.Sprintf("%s/nodes/%s", metricsRoot, name), nil } -func (cli *HeapsterMetricsClient) GetNodeMetrics(nodeName string, selector string) ([]metricsapi.NodeMetrics, error) { +func (cli *HeapsterMetricsClient) GetNodeMetrics(nodeName string, selector string) (*metricsapi.NodeMetricsList, error) { params := map[string]string{"labelSelector": selector} path, err := nodeMetricsUrl(nodeName) if err != nil { - return []metricsapi.NodeMetrics{}, err + return nil, err } resultRaw, err := GetHeapsterMetrics(cli, path, params) if err != nil { - return []metricsapi.NodeMetrics{}, err + return nil, err } - metrics := make([]metricsapi.NodeMetrics, 0) + versionedMetrics := metricsv1alpha1api.NodeMetricsList{} if len(nodeName) == 0 { - metricsList := metricsapi.NodeMetricsList{} - err = json.Unmarshal(resultRaw, &metricsList) + err = json.Unmarshal(resultRaw, &versionedMetrics) if err != nil { - return []metricsapi.NodeMetrics{}, fmt.Errorf("failed to unmarshall heapster response: %v", err) + return nil, fmt.Errorf("failed to unmarshall heapster response: %v", err) } - metrics = append(metrics, metricsList.Items...) } else { - var singleMetric metricsapi.NodeMetrics + var singleMetric metricsv1alpha1api.NodeMetrics err = json.Unmarshal(resultRaw, &singleMetric) if err != nil { - return []metricsapi.NodeMetrics{}, fmt.Errorf("failed to unmarshall heapster response: %v", err) + return nil, fmt.Errorf("failed to unmarshall heapster response: %v", err) } - metrics = append(metrics, singleMetric) + versionedMetrics.Items = []metricsv1alpha1api.NodeMetrics{singleMetric} + } + metrics := &metricsapi.NodeMetricsList{} + err = metricsv1alpha1api.Convert_v1alpha1_NodeMetricsList_To_metrics_NodeMetricsList(&versionedMetrics, metrics, nil) + if err != nil { + return nil, err } return metrics, nil } -func (cli *HeapsterMetricsClient) GetPodMetrics(namespace string, podName string, allNamespaces bool, selector labels.Selector) ([]metricsapi.PodMetrics, error) { +func (cli *HeapsterMetricsClient) GetPodMetrics(namespace string, podName string, allNamespaces bool, selector labels.Selector) (*metricsapi.PodMetricsList, error) { if allNamespaces { namespace = metav1.NamespaceAll } path, err := podMetricsUrl(namespace, podName) if err != nil { - return []metricsapi.PodMetrics{}, err + return nil, err } params := map[string]string{"labelSelector": selector.String()} - allMetrics := make([]metricsapi.PodMetrics, 0) + versionedMetrics := metricsv1alpha1api.PodMetricsList{} resultRaw, err := GetHeapsterMetrics(cli, path, params) if err != nil { - return []metricsapi.PodMetrics{}, err + return nil, err } if len(podName) == 0 { - metrics := metricsapi.PodMetricsList{} - err = json.Unmarshal(resultRaw, &metrics) + err = json.Unmarshal(resultRaw, &versionedMetrics) if err != nil { - return []metricsapi.PodMetrics{}, fmt.Errorf("failed to unmarshall heapster response: %v", err) + return nil, fmt.Errorf("failed to unmarshall heapster response: %v", err) } - allMetrics = append(allMetrics, metrics.Items...) } else { - var singleMetric metricsapi.PodMetrics + var singleMetric metricsv1alpha1api.PodMetrics err = json.Unmarshal(resultRaw, &singleMetric) if err != nil { - return []metricsapi.PodMetrics{}, fmt.Errorf("failed to unmarshall heapster response: %v", err) + return nil, fmt.Errorf("failed to unmarshall heapster response: %v", err) } - allMetrics = append(allMetrics, singleMetric) + versionedMetrics.Items = []metricsv1alpha1api.PodMetrics{singleMetric} } - return allMetrics, nil + metrics := &metricsapi.PodMetricsList{} + err = metricsv1alpha1api.Convert_v1alpha1_PodMetricsList_To_metrics_PodMetricsList(&versionedMetrics, metrics, nil) + if err != nil { + return nil, err + } + return metrics, nil } func GetHeapsterMetrics(cli *HeapsterMetricsClient, path string, params map[string]string) ([]byte, error) { diff --git a/pkg/kubectl/metricsutil/metrics_printer.go b/pkg/kubectl/metricsutil/metrics_printer.go index b1bd5451bfe..ff92b055f01 100644 --- a/pkg/kubectl/metricsutil/metrics_printer.go +++ b/pkg/kubectl/metricsutil/metrics_printer.go @@ -25,7 +25,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" "k8s.io/kubernetes/pkg/api/legacyscheme" "k8s.io/kubernetes/pkg/printers" - metricsapi "k8s.io/metrics/pkg/apis/metrics/v1alpha1" + metricsapi "k8s.io/metrics/pkg/apis/metrics" ) var (