Merge pull request #56206 from brancz/top-metrics-s

Automatic merge from submit-queue (batch tested with PRs 56206, 58525). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

kubectl: Use metrics-server for kubectl top commands

**What this PR does / why we need it**:

This PR implements support for the kubectl top commands to use the metrics-server as an aggregated API, instead of requesting the metrics from heapster directly. If the `metrics.k8s.io` API is not served by the apiserver, then this still falls back to the previous behavior.

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:

Fixes #55489

**Special notes for your reviewer**:

As utilizing heapster as well as the v1alpha1 version of the metrics API is discouraged, I intentionally implemented the support very separated, so that once it is decided, that support is entirely removed, this will make it easy.

**Release note**:

```release-note
Support metrics API in `kubectl top` commands.
```

/cc @kubernetes/sig-instrumentation-pr-reviews @DirectXMan12 @fgrzadkowski @piosz
This commit is contained in:
Kubernetes Submit Queue 2018-01-23 13:17:31 -08:00 committed by GitHub
commit 3cbb62b6bb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 819 additions and 60 deletions

View File

@ -140,6 +140,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/version:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/flag:go_default_library",
"//vendor/k8s.io/client-go/discovery:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/typed/rbac/v1:go_default_library",
@ -149,6 +150,9 @@ go_library(
"//vendor/k8s.io/client-go/tools/remotecommand:go_default_library",
"//vendor/k8s.io/client-go/transport/spdy:go_default_library",
"//vendor/k8s.io/kube-openapi/pkg/util/proto:go_default_library",
"//vendor/k8s.io/metrics/pkg/apis/metrics:go_default_library",
"//vendor/k8s.io/metrics/pkg/apis/metrics/v1beta1:go_default_library",
"//vendor/k8s.io/metrics/pkg/client/clientset_generated/clientset:go_default_library",
"//vendor/k8s.io/utils/exec:go_default_library",
],
)
@ -224,6 +228,7 @@ go_test(
"//pkg/printers:go_default_library",
"//pkg/printers/internalversion:go_default_library",
"//pkg/util/strings:go_default_library",
"//vendor/github.com/googleapis/gnostic/OpenAPIv2:go_default_library",
"//vendor/github.com/spf13/cobra:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/gopkg.in/yaml.v2:go_default_library",
@ -245,11 +250,15 @@ go_test(
"//vendor/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/strategicpatch/testing:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/version:go_default_library",
"//vendor/k8s.io/client-go/dynamic:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/rest/fake:go_default_library",
"//vendor/k8s.io/client-go/testing:go_default_library",
"//vendor/k8s.io/client-go/tools/remotecommand:go_default_library",
"//vendor/k8s.io/metrics/pkg/apis/metrics/v1alpha1:go_default_library",
"//vendor/k8s.io/metrics/pkg/apis/metrics/v1beta1:go_default_library",
"//vendor/k8s.io/metrics/pkg/client/clientset_generated/clientset/fake:go_default_library",
"//vendor/k8s.io/utils/exec:go_default_library",
],
)

View File

@ -40,6 +40,7 @@ go_library(
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/rest/fake:go_default_library",
"//vendor/k8s.io/metrics/pkg/client/clientset_generated/clientset:go_default_library",
],
)

View File

@ -50,6 +50,7 @@ import (
"k8s.io/kubernetes/pkg/kubectl/resource"
"k8s.io/kubernetes/pkg/kubectl/validation"
"k8s.io/kubernetes/pkg/printers"
metricsclientset "k8s.io/metrics/pkg/client/clientset_generated/clientset"
)
// +k8s:deepcopy-gen=true
@ -245,6 +246,7 @@ type TestFactory struct {
TmpDir string
CategoryExpander categories.CategoryExpander
SkipDiscovery bool
MetricsClientSet metricsclientset.Interface
ClientForMappingFunc func(mapping *meta.RESTMapping) (resource.RESTClient, error)
UnstructuredClientForMappingFunc func(mapping *meta.RESTMapping) (resource.RESTClient, error)
@ -315,6 +317,10 @@ func (f *FakeFactory) KubernetesClientSet() (*kubernetes.Clientset, error) {
return nil, nil
}
func (f *FakeFactory) MetricsClientSet() (metricsclientset.Interface, error) {
return f.tf.MetricsClientSet, f.tf.Err
}
func (f *FakeFactory) ClientSet() (internalclientset.Interface, error) {
return nil, nil
}
@ -675,6 +681,10 @@ func (f *fakeAPIFactory) KubernetesClientSet() (*kubernetes.Clientset, error) {
return clientset, f.tf.Err
}
func (f *fakeAPIFactory) MetricsClientSet() (metricsclientset.Interface, error) {
return f.tf.MetricsClientSet, f.tf.Err
}
func (f *fakeAPIFactory) ClientSet() (internalclientset.Interface, error) {
// Swap the HTTP client out of the REST client with the fake
// version.

View File

@ -19,8 +19,10 @@ package cmd
import (
"io"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
"k8s.io/kubernetes/pkg/kubectl/util/i18n"
metricsapi "k8s.io/metrics/pkg/apis/metrics"
"github.com/spf13/cobra"
"k8s.io/kubernetes/pkg/kubectl/cmd/templates"
@ -30,6 +32,9 @@ import (
type TopOptions struct{}
var (
supportedMetricsAPIVersions = []string{
"v1beta1",
}
topLong = templates.LongDesc(i18n.T(`
Display Resource (CPU/Memory/Storage) usage.
@ -51,3 +56,19 @@ func NewCmdTop(f cmdutil.Factory, out, errOut io.Writer) *cobra.Command {
cmd.AddCommand(NewCmdTopPod(f, nil, out))
return cmd
}
func SupportedMetricsAPIVersionAvailable(discoveredAPIGroups *metav1.APIGroupList) bool {
for _, discoveredAPIGroup := range discoveredAPIGroups.Groups {
if discoveredAPIGroup.Name != metricsapi.GroupName {
continue
}
for _, version := range discoveredAPIGroup.Versions {
for _, supportedVersion := range supportedMetricsAPIVersions {
if version.Version == supportedVersion {
return true
}
}
}
}
return false
}

View File

@ -25,11 +25,15 @@ import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/discovery"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/kubernetes/pkg/kubectl/cmd/templates"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
"k8s.io/kubernetes/pkg/kubectl/metricsutil"
"k8s.io/kubernetes/pkg/kubectl/util/i18n"
metricsapi "k8s.io/metrics/pkg/apis/metrics"
metricsV1beta1api "k8s.io/metrics/pkg/apis/metrics/v1beta1"
metricsclientset "k8s.io/metrics/pkg/client/clientset_generated/clientset"
)
// TopNodeOptions contains all the options for running the top-node cli command.
@ -40,6 +44,8 @@ type TopNodeOptions struct {
HeapsterOptions HeapsterTopOptions
Client *metricsutil.HeapsterMetricsClient
Printer *metricsutil.TopCmdPrinter
DiscoveryClient discovery.DiscoveryInterface
MetricsClient metricsclientset.Interface
}
type HeapsterTopOptions struct {
@ -123,8 +129,16 @@ func (o *TopNodeOptions) Complete(f cmdutil.Factory, cmd *cobra.Command, args []
if err != nil {
return err
}
o.DiscoveryClient = clientset.DiscoveryClient
o.MetricsClient, err = f.MetricsClientSet()
if err != nil {
return err
}
o.NodeClient = clientset.CoreV1()
o.Client = metricsutil.NewHeapsterMetricsClient(clientset.CoreV1(), o.HeapsterOptions.Namespace, o.HeapsterOptions.Scheme, o.HeapsterOptions.Service, o.HeapsterOptions.Port)
o.Printer = metricsutil.NewTopCmdPrinter(out)
return nil
}
@ -138,15 +152,35 @@ func (o *TopNodeOptions) Validate() error {
func (o TopNodeOptions) RunTopNode() error {
var err error
selector := labels.Everything().String()
selector := labels.Everything()
if len(o.Selector) > 0 {
selector = o.Selector
selector, err = labels.Parse(o.Selector)
if err != nil {
return err
}
}
metrics, err := o.Client.GetNodeMetrics(o.ResourceName, selector)
apiGroups, err := o.DiscoveryClient.ServerGroups()
if err != nil {
return err
}
if len(metrics) == 0 {
metricsAPIAvailable := SupportedMetricsAPIVersionAvailable(apiGroups)
metrics := &metricsapi.NodeMetricsList{}
if metricsAPIAvailable {
metrics, err = getNodeMetricsFromMetricsAPI(o.MetricsClient, o.ResourceName, selector)
if err != nil {
return err
}
} else {
metrics, err = o.Client.GetNodeMetrics(o.ResourceName, selector.String())
if err != nil {
return err
}
}
if len(metrics.Items) == 0 {
return errors.New("metrics not available yet")
}
@ -159,7 +193,7 @@ func (o TopNodeOptions) RunTopNode() error {
nodes = append(nodes, *node)
} else {
nodeList, err := o.NodeClient.Nodes().List(metav1.ListOptions{
LabelSelector: selector,
LabelSelector: selector.String(),
})
if err != nil {
return err
@ -173,5 +207,30 @@ func (o TopNodeOptions) RunTopNode() error {
allocatable[n.Name] = n.Status.Allocatable
}
return o.Printer.PrintNodeMetrics(metrics, allocatable)
return o.Printer.PrintNodeMetrics(metrics.Items, allocatable)
}
func getNodeMetricsFromMetricsAPI(metricsClient metricsclientset.Interface, resourceName string, selector labels.Selector) (*metricsapi.NodeMetricsList, error) {
var err error
versionedMetrics := &metricsV1beta1api.NodeMetricsList{}
mc := metricsClient.Metrics()
nm := mc.NodeMetricses()
if resourceName != "" {
m, err := nm.Get(resourceName, metav1.GetOptions{})
if err != nil {
return nil, err
}
versionedMetrics.Items = []metricsV1beta1api.NodeMetrics{*m}
} else {
versionedMetrics, err = nm.List(metav1.ListOptions{LabelSelector: selector.String()})
if err != nil {
return nil, err
}
}
metrics := &metricsapi.NodeMetricsList{}
err = metricsV1beta1api.Convert_v1beta1_NodeMetricsList_To_metrics_NodeMetricsList(versionedMetrics, metrics, nil)
if err != nil {
return nil, err
}
return metrics, nil
}

View File

@ -19,6 +19,7 @@ package cmd
import (
"bytes"
"fmt"
"io/ioutil"
"net/http"
"strings"
"testing"
@ -26,9 +27,13 @@ import (
"net/url"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest/fake"
core "k8s.io/client-go/testing"
cmdtesting "k8s.io/kubernetes/pkg/kubectl/cmd/testing"
"k8s.io/metrics/pkg/apis/metrics/v1alpha1"
metricsv1alpha1api "k8s.io/metrics/pkg/apis/metrics/v1alpha1"
metricsv1beta1api "k8s.io/metrics/pkg/apis/metrics/v1beta1"
metricsfake "k8s.io/metrics/pkg/client/clientset_generated/clientset/fake"
)
const (
@ -38,7 +43,7 @@ const (
func TestTopNodeAllMetrics(t *testing.T) {
initTestErrorHandler(t)
metrics, nodes := testNodeMetricsData()
metrics, nodes := testNodeV1alpha1MetricsData()
expectedMetricsPath := fmt.Sprintf("%s/%s/nodes", baseMetricsAddress, metricsApiVersion)
expectedNodePath := fmt.Sprintf("/%s/%s/nodes", apiPrefix, apiVersion)
@ -48,6 +53,10 @@ func TestTopNodeAllMetrics(t *testing.T) {
NegotiatedSerializer: ns,
Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
switch p, m := req.URL.Path, req.Method; {
case p == "/api":
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: ioutil.NopCloser(bytes.NewReader([]byte(apibody)))}, nil
case p == "/apis":
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: ioutil.NopCloser(bytes.NewReader([]byte(apisbody)))}, nil
case p == expectedMetricsPath && m == "GET":
body, err := marshallBody(metrics)
if err != nil {
@ -83,7 +92,7 @@ func TestTopNodeAllMetricsCustomDefaults(t *testing.T) {
customBaseMetricsAddress := customBaseHeapsterServiceAddress + "/apis/metrics"
initTestErrorHandler(t)
metrics, nodes := testNodeMetricsData()
metrics, nodes := testNodeV1alpha1MetricsData()
expectedMetricsPath := fmt.Sprintf("%s/%s/nodes", customBaseMetricsAddress, metricsApiVersion)
expectedNodePath := fmt.Sprintf("/%s/%s/nodes", apiPrefix, apiVersion)
@ -93,6 +102,10 @@ func TestTopNodeAllMetricsCustomDefaults(t *testing.T) {
NegotiatedSerializer: ns,
Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
switch p, m := req.URL.Path, req.Method; {
case p == "/api":
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: ioutil.NopCloser(bytes.NewReader([]byte(apibody)))}, nil
case p == "/apis":
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: ioutil.NopCloser(bytes.NewReader([]byte(apisbody)))}, nil
case p == expectedMetricsPath && m == "GET":
body, err := marshallBody(metrics)
if err != nil {
@ -132,10 +145,10 @@ func TestTopNodeAllMetricsCustomDefaults(t *testing.T) {
func TestTopNodeWithNameMetrics(t *testing.T) {
initTestErrorHandler(t)
metrics, nodes := testNodeMetricsData()
metrics, nodes := testNodeV1alpha1MetricsData()
expectedMetrics := metrics.Items[0]
expectedNode := nodes.Items[0]
nonExpectedMetrics := v1alpha1.NodeMetricsList{
nonExpectedMetrics := metricsv1alpha1api.NodeMetricsList{
ListMeta: metrics.ListMeta,
Items: metrics.Items[1:],
}
@ -148,6 +161,10 @@ func TestTopNodeWithNameMetrics(t *testing.T) {
NegotiatedSerializer: ns,
Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
switch p, m := req.URL.Path, req.Method; {
case p == "/api":
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: ioutil.NopCloser(bytes.NewReader([]byte(apibody)))}, nil
case p == "/apis":
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: ioutil.NopCloser(bytes.NewReader([]byte(apisbody)))}, nil
case p == expectedPath && m == "GET":
body, err := marshallBody(expectedMetrics)
if err != nil {
@ -183,8 +200,8 @@ func TestTopNodeWithNameMetrics(t *testing.T) {
func TestTopNodeWithLabelSelectorMetrics(t *testing.T) {
initTestErrorHandler(t)
metrics, nodes := testNodeMetricsData()
expectedMetrics := v1alpha1.NodeMetricsList{
metrics, nodes := testNodeV1alpha1MetricsData()
expectedMetrics := metricsv1alpha1api.NodeMetricsList{
ListMeta: metrics.ListMeta,
Items: metrics.Items[0:1],
}
@ -192,7 +209,7 @@ func TestTopNodeWithLabelSelectorMetrics(t *testing.T) {
ListMeta: nodes.ListMeta,
Items: nodes.Items[0:1],
}
nonExpectedMetrics := v1alpha1.NodeMetricsList{
nonExpectedMetrics := metricsv1alpha1api.NodeMetricsList{
ListMeta: metrics.ListMeta,
Items: metrics.Items[1:],
}
@ -207,6 +224,10 @@ func TestTopNodeWithLabelSelectorMetrics(t *testing.T) {
NegotiatedSerializer: ns,
Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
switch p, m, q := req.URL.Path, req.Method, req.URL.RawQuery; {
case p == "/api":
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: ioutil.NopCloser(bytes.NewReader([]byte(apibody)))}, nil
case p == "/apis":
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: ioutil.NopCloser(bytes.NewReader([]byte(apisbody)))}, nil
case p == expectedPath && m == "GET" && q == expectedQuery:
body, err := marshallBody(expectedMetrics)
if err != nil {
@ -242,3 +263,164 @@ func TestTopNodeWithLabelSelectorMetrics(t *testing.T) {
}
}
}
func TestTopNodeAllMetricsFromMetricsServer(t *testing.T) {
initTestErrorHandler(t)
expectedMetrics, nodes := testNodeV1beta1MetricsData()
expectedNodePath := fmt.Sprintf("/%s/%s/nodes", apiPrefix, apiVersion)
f, tf, codec, ns := cmdtesting.NewAPIFactory()
tf.Printer = &testPrinter{}
tf.Client = &fake.RESTClient{
NegotiatedSerializer: ns,
Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
switch p, m := req.URL.Path, req.Method; {
case p == "/api":
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: ioutil.NopCloser(bytes.NewReader([]byte(apibody)))}, nil
case p == "/apis":
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: ioutil.NopCloser(bytes.NewReader([]byte(apisbodyWithMetrics)))}, nil
case p == expectedNodePath && m == "GET":
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, nodes)}, nil
default:
t.Fatalf("unexpected request: %#v\nGot URL: %#v\n", req, req.URL)
return nil, nil
}
}),
}
fakemetricsClientset := &metricsfake.Clientset{}
fakemetricsClientset.AddReactor("list", "nodes", func(action core.Action) (handled bool, ret runtime.Object, err error) {
return true, expectedMetrics, nil
})
tf.MetricsClientSet = fakemetricsClientset
tf.Namespace = "test"
tf.ClientConfig = defaultClientConfig()
buf := bytes.NewBuffer([]byte{})
cmd := NewCmdTopNode(f, nil, buf)
cmd.Run(cmd, []string{})
// Check the presence of node names in the output.
result := buf.String()
for _, m := range expectedMetrics.Items {
if !strings.Contains(result, m.Name) {
t.Errorf("missing metrics for %s: \n%s", m.Name, result)
}
}
}
func TestTopNodeWithNameMetricsFromMetricsServer(t *testing.T) {
initTestErrorHandler(t)
metrics, nodes := testNodeV1beta1MetricsData()
expectedMetrics := metrics.Items[0]
expectedNode := nodes.Items[0]
nonExpectedMetrics := metricsv1beta1api.NodeMetricsList{
ListMeta: metrics.ListMeta,
Items: metrics.Items[1:],
}
expectedNodePath := fmt.Sprintf("/%s/%s/nodes/%s", apiPrefix, apiVersion, expectedMetrics.Name)
f, tf, codec, ns := cmdtesting.NewAPIFactory()
tf.Printer = &testPrinter{}
tf.Client = &fake.RESTClient{
NegotiatedSerializer: ns,
Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
switch p, m := req.URL.Path, req.Method; {
case p == "/api":
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: ioutil.NopCloser(bytes.NewReader([]byte(apibody)))}, nil
case p == "/apis":
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: ioutil.NopCloser(bytes.NewReader([]byte(apisbodyWithMetrics)))}, nil
case p == expectedNodePath && m == "GET":
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, &expectedNode)}, nil
default:
t.Fatalf("unexpected request: %#v\nGot URL: %#v\n", req, req.URL)
return nil, nil
}
}),
}
fakemetricsClientset := &metricsfake.Clientset{}
fakemetricsClientset.AddReactor("get", "nodes", func(action core.Action) (handled bool, ret runtime.Object, err error) {
return true, &expectedMetrics, nil
})
tf.MetricsClientSet = fakemetricsClientset
tf.Namespace = "test"
tf.ClientConfig = defaultClientConfig()
buf := bytes.NewBuffer([]byte{})
cmd := NewCmdTopNode(f, nil, buf)
cmd.Run(cmd, []string{expectedMetrics.Name})
// Check the presence of node names in the output.
result := buf.String()
if !strings.Contains(result, expectedMetrics.Name) {
t.Errorf("missing metrics for %s: \n%s", expectedMetrics.Name, result)
}
for _, m := range nonExpectedMetrics.Items {
if strings.Contains(result, m.Name) {
t.Errorf("unexpected metrics for %s: \n%s", m.Name, result)
}
}
}
func TestTopNodeWithLabelSelectorMetricsFromMetricsServer(t *testing.T) {
initTestErrorHandler(t)
metrics, nodes := testNodeV1beta1MetricsData()
expectedMetrics := &metricsv1beta1api.NodeMetricsList{
ListMeta: metrics.ListMeta,
Items: metrics.Items[0:1],
}
expectedNodes := v1.NodeList{
ListMeta: nodes.ListMeta,
Items: nodes.Items[0:1],
}
nonExpectedMetrics := &metricsv1beta1api.NodeMetricsList{
ListMeta: metrics.ListMeta,
Items: metrics.Items[1:],
}
label := "key=value"
expectedNodePath := fmt.Sprintf("/%s/%s/nodes", apiPrefix, apiVersion)
f, tf, codec, ns := cmdtesting.NewAPIFactory()
tf.Printer = &testPrinter{}
tf.Client = &fake.RESTClient{
NegotiatedSerializer: ns,
Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
switch p, m, _ := req.URL.Path, req.Method, req.URL.RawQuery; {
case p == "/api":
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: ioutil.NopCloser(bytes.NewReader([]byte(apibody)))}, nil
case p == "/apis":
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: ioutil.NopCloser(bytes.NewReader([]byte(apisbodyWithMetrics)))}, nil
case p == expectedNodePath && m == "GET":
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, &expectedNodes)}, nil
default:
t.Fatalf("unexpected request: %#v\nGot URL: %#v\n", req, req.URL)
return nil, nil
}
}),
}
fakemetricsClientset := &metricsfake.Clientset{}
fakemetricsClientset.AddReactor("list", "nodes", func(action core.Action) (handled bool, ret runtime.Object, err error) {
return true, expectedMetrics, nil
})
tf.MetricsClientSet = fakemetricsClientset
tf.Namespace = "test"
tf.ClientConfig = defaultClientConfig()
buf := bytes.NewBuffer([]byte{})
cmd := NewCmdTopNode(f, nil, buf)
cmd.Flags().Set("selector", label)
cmd.Run(cmd, []string{})
// Check the presence of node names in the output.
result := buf.String()
for _, m := range expectedMetrics.Items {
if !strings.Contains(result, m.Name) {
t.Errorf("missing metrics for %s: \n%s", m.Name, result)
}
}
for _, m := range nonExpectedMetrics.Items {
if strings.Contains(result, m.Name) {
t.Errorf("unexpected metrics for %s: \n%s", m.Name, result)
}
}
}

View File

@ -25,11 +25,15 @@ import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/discovery"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/kubernetes/pkg/kubectl/cmd/templates"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
"k8s.io/kubernetes/pkg/kubectl/metricsutil"
"k8s.io/kubernetes/pkg/kubectl/util/i18n"
metricsapi "k8s.io/metrics/pkg/apis/metrics"
metricsv1beta1api "k8s.io/metrics/pkg/apis/metrics/v1beta1"
metricsclientset "k8s.io/metrics/pkg/client/clientset_generated/clientset"
"github.com/golang/glog"
"github.com/spf13/cobra"
@ -45,6 +49,8 @@ type TopPodOptions struct {
HeapsterOptions HeapsterTopOptions
Client *metricsutil.HeapsterMetricsClient
Printer *metricsutil.TopCmdPrinter
DiscoveryClient discovery.DiscoveryInterface
MetricsClient metricsclientset.Interface
}
const metricsCreationDelay = 2 * time.Minute
@ -119,8 +125,16 @@ func (o *TopPodOptions) Complete(f cmdutil.Factory, cmd *cobra.Command, args []s
if err != nil {
return err
}
o.DiscoveryClient = clientset.DiscoveryClient
o.MetricsClient, err = f.MetricsClientSet()
if err != nil {
return err
}
o.PodClient = clientset.CoreV1()
o.Client = metricsutil.NewHeapsterMetricsClient(clientset.CoreV1(), o.HeapsterOptions.Namespace, o.HeapsterOptions.Scheme, o.HeapsterOptions.Service, o.HeapsterOptions.Port)
o.Printer = metricsutil.NewTopCmdPrinter(out)
return nil
}
@ -141,10 +155,30 @@ func (o TopPodOptions) RunTopPod() error {
return err
}
}
metrics, err := o.Client.GetPodMetrics(o.Namespace, o.ResourceName, o.AllNamespaces, selector)
apiGroups, err := o.DiscoveryClient.ServerGroups()
if err != nil {
return err
}
metricsAPIAvailable := SupportedMetricsAPIVersionAvailable(apiGroups)
metrics := &metricsapi.PodMetricsList{}
if metricsAPIAvailable {
metrics, err = getMetricsFromMetricsAPI(o.MetricsClient, o.Namespace, o.ResourceName, o.AllNamespaces, selector)
if err != nil {
return err
}
} else {
metrics, err = o.Client.GetPodMetrics(o.Namespace, o.ResourceName, o.AllNamespaces, selector)
if err != nil {
return err
}
}
// TODO: Refactor this once Heapster becomes the API server.
// First we check why no metrics have been received.
if len(metrics) == 0 {
if len(metrics.Items) == 0 {
// If the API server query is successful but all the pods are newly created,
// the metrics are probably not ready yet, so we return the error here in the first place.
e := verifyEmptyMetrics(o, selector)
@ -155,7 +189,35 @@ func (o TopPodOptions) RunTopPod() error {
if err != nil {
return err
}
return o.Printer.PrintPodMetrics(metrics, o.PrintContainers, o.AllNamespaces)
return o.Printer.PrintPodMetrics(metrics.Items, o.PrintContainers, o.AllNamespaces)
}
func getMetricsFromMetricsAPI(metricsClient metricsclientset.Interface, namespace, resourceName string, allNamespaces bool, selector labels.Selector) (*metricsapi.PodMetricsList, error) {
var err error
ns := metav1.NamespaceAll
if !allNamespaces {
ns = namespace
}
versionedMetrics := &metricsv1beta1api.PodMetricsList{}
if resourceName != "" {
m, err := metricsClient.Metrics().PodMetricses(ns).Get(resourceName, metav1.GetOptions{})
if err != nil {
return nil, err
}
versionedMetrics.Items = []metricsv1beta1api.PodMetrics{*m}
} else {
versionedMetrics, err = metricsClient.Metrics().PodMetricses(ns).List(metav1.ListOptions{LabelSelector: selector.String()})
if err != nil {
return nil, err
}
}
metrics := &metricsapi.PodMetricsList{}
err = metricsv1beta1api.Convert_v1beta1_PodMetricsList_To_metrics_PodMetricsList(versionedMetrics, metrics, nil)
if err != nil {
return nil, err
}
return metrics, nil
}
func verifyEmptyMetrics(o TopPodOptions, selector labels.Selector) error {

View File

@ -18,23 +18,71 @@ package cmd
import (
"bytes"
"io/ioutil"
"net/http"
"net/url"
"strings"
"testing"
"time"
"net/url"
"github.com/googleapis/gnostic/OpenAPIv2"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
apiversion "k8s.io/apimachinery/pkg/version"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/rest/fake"
core "k8s.io/client-go/testing"
cmdtesting "k8s.io/kubernetes/pkg/kubectl/cmd/testing"
metricsapi "k8s.io/metrics/pkg/apis/metrics/v1alpha1"
metricsv1alpha1api "k8s.io/metrics/pkg/apis/metrics/v1alpha1"
metricsv1beta1api "k8s.io/metrics/pkg/apis/metrics/v1beta1"
metricsfake "k8s.io/metrics/pkg/client/clientset_generated/clientset/fake"
)
const (
topPathPrefix = baseMetricsAddress + "/" + metricsApiVersion
topPathPrefix = baseMetricsAddress + "/" + metricsApiVersion
topMetricsAPIPathPrefix = "/apis/metrics.k8s.io/v1beta1"
apibody = `{
"kind": "APIVersions",
"versions": [
"v1"
],
"serverAddressByClientCIDRs": [
{
"clientCIDR": "0.0.0.0/0",
"serverAddress": "10.0.2.15:8443"
}
]
}`
// This is not the full output one would usually get, just a trimmed down version.
apisbody = `{
"kind": "APIGroupList",
"apiVersion": "v1",
"groups": [{}]
}`
apisbodyWithMetrics = `{
"kind": "APIGroupList",
"apiVersion": "v1",
"groups": [
{
"name":"metrics.k8s.io",
"versions":[
{
"groupVersion":"metrics.k8s.io/v1beta1",
"version":"v1beta1"
}
],
"preferredVersion":{
"groupVersion":"metrics.k8s.io/v1beta1",
"version":"v1beta1"
},
"serverAddressByClientCIDRs":null
}
]
}`
)
func TestTopPod(t *testing.T) {
@ -87,7 +135,7 @@ func TestTopPod(t *testing.T) {
for _, testCase := range testCases {
t.Logf("Running test case: %s", testCase.name)
metricsList := testPodMetricsData()
var expectedMetrics []metricsapi.PodMetrics
var expectedMetrics []metricsv1alpha1api.PodMetrics
var expectedContainerNames, nonExpectedMetricsNames []string
for n, m := range metricsList {
if n < len(testCase.namespaces) {
@ -105,7 +153,7 @@ func TestTopPod(t *testing.T) {
if len(expectedMetrics) == 1 {
response = expectedMetrics[0]
} else {
response = metricsapi.PodMetricsList{
response = metricsv1alpha1api.PodMetricsList{
ListMeta: metav1.ListMeta{
ResourceVersion: "2",
},
@ -119,6 +167,10 @@ func TestTopPod(t *testing.T) {
NegotiatedSerializer: ns,
Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
switch p, m, q := req.URL.Path, req.Method, req.URL.RawQuery; {
case p == "/api":
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: ioutil.NopCloser(bytes.NewReader([]byte(apibody)))}, nil
case p == "/apis":
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: ioutil.NopCloser(bytes.NewReader([]byte(apisbody)))}, nil
case p == testCase.expectedPath && m == "GET" && (testCase.expectedQuery == "" || q == testCase.expectedQuery):
body, err := marshallBody(response)
if err != nil {
@ -167,6 +219,188 @@ func TestTopPod(t *testing.T) {
}
}
func TestTopPodWithMetricsServer(t *testing.T) {
testNS := "testns"
testCases := []struct {
name string
namespace string
flags map[string]string
args []string
expectedPath string
expectedQuery string
namespaces []string
containers bool
listsNamespaces bool
}{
{
name: "all namespaces",
flags: map[string]string{"all-namespaces": "true"},
expectedPath: topMetricsAPIPathPrefix + "/pods",
namespaces: []string{testNS, "secondtestns", "thirdtestns"},
listsNamespaces: true,
},
{
name: "all in namespace",
expectedPath: topMetricsAPIPathPrefix + "/namespaces/" + testNS + "/pods",
namespaces: []string{testNS, testNS},
},
{
name: "pod with name",
args: []string{"pod1"},
expectedPath: topMetricsAPIPathPrefix + "/namespaces/" + testNS + "/pods/pod1",
namespaces: []string{testNS},
},
{
name: "pod with label selector",
flags: map[string]string{"selector": "key=value"},
expectedPath: topMetricsAPIPathPrefix + "/namespaces/" + testNS + "/pods",
expectedQuery: "labelSelector=" + url.QueryEscape("key=value"),
namespaces: []string{testNS, testNS},
},
{
name: "pod with container metrics",
flags: map[string]string{"containers": "true"},
args: []string{"pod1"},
expectedPath: topMetricsAPIPathPrefix + "/namespaces/" + testNS + "/pods/pod1",
namespaces: []string{testNS},
containers: true,
},
}
initTestErrorHandler(t)
for _, testCase := range testCases {
t.Logf("Running test case: %s", testCase.name)
metricsList := testV1beta1PodMetricsData()
var expectedMetrics []metricsv1beta1api.PodMetrics
var expectedContainerNames, nonExpectedMetricsNames []string
for n, m := range metricsList {
if n < len(testCase.namespaces) {
m.Namespace = testCase.namespaces[n]
expectedMetrics = append(expectedMetrics, m)
for _, c := range m.Containers {
expectedContainerNames = append(expectedContainerNames, c.Name)
}
} else {
nonExpectedMetricsNames = append(nonExpectedMetricsNames, m.Name)
}
}
fakemetricsClientset := &metricsfake.Clientset{}
if len(expectedMetrics) == 1 {
fakemetricsClientset.AddReactor("get", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) {
return true, &expectedMetrics[0], nil
})
} else {
fakemetricsClientset.AddReactor("list", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) {
res := &metricsv1beta1api.PodMetricsList{
ListMeta: metav1.ListMeta{
ResourceVersion: "2",
},
Items: expectedMetrics,
}
return true, res, nil
})
}
f, tf, _, ns := cmdtesting.NewAPIFactory()
tf.Printer = &testPrinter{}
tf.Client = &fake.RESTClient{
NegotiatedSerializer: ns,
Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
switch p := req.URL.Path; {
case p == "/api":
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: ioutil.NopCloser(bytes.NewReader([]byte(apibody)))}, nil
case p == "/apis":
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: ioutil.NopCloser(bytes.NewReader([]byte(apisbodyWithMetrics)))}, nil
default:
t.Fatalf("%s: unexpected request: %#v\nGot URL: %#v",
testCase.name, req, req.URL)
return nil, nil
}
}),
}
tf.MetricsClientSet = fakemetricsClientset
tf.Namespace = testNS
tf.ClientConfig = defaultClientConfig()
buf := bytes.NewBuffer([]byte{})
cmd := NewCmdTopPod(f, nil, buf)
for name, value := range testCase.flags {
cmd.Flags().Set(name, value)
}
cmd.Run(cmd, testCase.args)
// Check the presence of pod names&namespaces/container names in the output.
result := buf.String()
if testCase.containers {
for _, containerName := range expectedContainerNames {
if !strings.Contains(result, containerName) {
t.Errorf("%s: missing metrics for container %s: \n%s", testCase.name, containerName, result)
}
}
}
for _, m := range expectedMetrics {
if !strings.Contains(result, m.Name) {
t.Errorf("%s: missing metrics for %s: \n%s", testCase.name, m.Name, result)
}
if testCase.listsNamespaces && !strings.Contains(result, m.Namespace) {
t.Errorf("%s: missing metrics for %s/%s: \n%s", testCase.name, m.Namespace, m.Name, result)
}
}
for _, name := range nonExpectedMetricsNames {
if strings.Contains(result, name) {
t.Errorf("%s: unexpected metrics for %s: \n%s", testCase.name, name, result)
}
}
}
}
type fakeDiscovery struct{}
// ServerGroups returns the supported groups, with information like supported versions and the
// preferred version.
func (d *fakeDiscovery) ServerGroups() (*metav1.APIGroupList, error) {
return nil, nil
}
// ServerResourcesForGroupVersion returns the supported resources for a group and version.
func (d *fakeDiscovery) ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) {
return nil, nil
}
// ServerResources returns the supported resources for all groups and versions.
func (d *fakeDiscovery) ServerResources() ([]*metav1.APIResourceList, error) {
return nil, nil
}
// ServerPreferredResources returns the supported resources with the version preferred by the
// server.
func (d *fakeDiscovery) ServerPreferredResources() ([]*metav1.APIResourceList, error) {
return nil, nil
}
// ServerPreferredNamespacedResources returns the supported namespaced resources with the
// version preferred by the server.
func (d *fakeDiscovery) ServerPreferredNamespacedResources() ([]*metav1.APIResourceList, error) {
return nil, nil
}
// ServerVersion retrieves and parses the server's version (git version).
func (d *fakeDiscovery) ServerVersion() (*apiversion.Info, error) {
return nil, nil
}
// OpenAPISchema retrieves and parses the swagger API schema the server supports.
func (d *fakeDiscovery) OpenAPISchema() (*openapi_v2.Document, error) {
return nil, nil
}
// RESTClient returns a RESTClient that is used to communicate
// with API server by this client implementation.
func (d *fakeDiscovery) RESTClient() restclient.Interface {
return nil
}
func TestTopPodCustomDefaults(t *testing.T) {
customBaseHeapsterServiceAddress := "/api/v1/namespaces/custom-namespace/services/https:custom-heapster-service:/proxy"
customBaseMetricsAddress := customBaseHeapsterServiceAddress + "/apis/metrics"
@ -221,7 +455,7 @@ func TestTopPodCustomDefaults(t *testing.T) {
for _, testCase := range testCases {
t.Logf("Running test case: %s", testCase.name)
metricsList := testPodMetricsData()
var expectedMetrics []metricsapi.PodMetrics
var expectedMetrics []metricsv1alpha1api.PodMetrics
var expectedContainerNames, nonExpectedMetricsNames []string
for n, m := range metricsList {
if n < len(testCase.namespaces) {
@ -239,7 +473,7 @@ func TestTopPodCustomDefaults(t *testing.T) {
if len(expectedMetrics) == 1 {
response = expectedMetrics[0]
} else {
response = metricsapi.PodMetricsList{
response = metricsv1alpha1api.PodMetricsList{
ListMeta: metav1.ListMeta{
ResourceVersion: "2",
},
@ -253,6 +487,10 @@ func TestTopPodCustomDefaults(t *testing.T) {
NegotiatedSerializer: ns,
Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
switch p, m, q := req.URL.Path, req.Method, req.URL.RawQuery; {
case p == "/api":
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: ioutil.NopCloser(bytes.NewReader([]byte(apibody)))}, nil
case p == "/apis":
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: ioutil.NopCloser(bytes.NewReader([]byte(apisbody)))}, nil
case p == testCase.expectedPath && m == "GET" && (testCase.expectedQuery == "" || q == testCase.expectedQuery):
body, err := marshallBody(response)
if err != nil {
@ -276,6 +514,7 @@ func TestTopPodCustomDefaults(t *testing.T) {
Scheme: "https",
Service: "custom-heapster-service",
},
DiscoveryClient: &fakeDiscovery{},
}
cmd := NewCmdTopPod(f, opts, buf)
for name, value := range testCase.flags {
@ -308,12 +547,12 @@ func TestTopPodCustomDefaults(t *testing.T) {
}
}
func testPodMetricsData() []metricsapi.PodMetrics {
return []metricsapi.PodMetrics{
func testV1beta1PodMetricsData() []metricsv1beta1api.PodMetrics {
return []metricsv1beta1api.PodMetrics{
{
ObjectMeta: metav1.ObjectMeta{Name: "pod1", Namespace: "test", ResourceVersion: "10"},
ObjectMeta: metav1.ObjectMeta{Name: "pod1", Namespace: "test", ResourceVersion: "10", Labels: map[string]string{"key": "value"}},
Window: metav1.Duration{Duration: time.Minute},
Containers: []metricsapi.ContainerMetrics{
Containers: []metricsv1beta1api.ContainerMetrics{
{
Name: "container1-1",
Usage: v1.ResourceList{
@ -333,9 +572,9 @@ func testPodMetricsData() []metricsapi.PodMetrics {
},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "pod2", Namespace: "test", ResourceVersion: "11"},
ObjectMeta: metav1.ObjectMeta{Name: "pod2", Namespace: "test", ResourceVersion: "11", Labels: map[string]string{"key": "value"}},
Window: metav1.Duration{Duration: time.Minute},
Containers: []metricsapi.ContainerMetrics{
Containers: []metricsv1beta1api.ContainerMetrics{
{
Name: "container2-1",
Usage: v1.ResourceList{
@ -365,7 +604,78 @@ func testPodMetricsData() []metricsapi.PodMetrics {
{
ObjectMeta: metav1.ObjectMeta{Name: "pod3", Namespace: "test", ResourceVersion: "12"},
Window: metav1.Duration{Duration: time.Minute},
Containers: []metricsapi.ContainerMetrics{
Containers: []metricsv1beta1api.ContainerMetrics{
{
Name: "container3-1",
Usage: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(7, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(8*(1024*1024), resource.DecimalSI),
v1.ResourceStorage: *resource.NewQuantity(9*(1024*1024), resource.DecimalSI),
},
},
},
},
}
}
func testPodMetricsData() []metricsv1alpha1api.PodMetrics {
return []metricsv1alpha1api.PodMetrics{
{
ObjectMeta: metav1.ObjectMeta{Name: "pod1", Namespace: "test", ResourceVersion: "10"},
Window: metav1.Duration{Duration: time.Minute},
Containers: []metricsv1alpha1api.ContainerMetrics{
{
Name: "container1-1",
Usage: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(1, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(2*(1024*1024), resource.DecimalSI),
v1.ResourceStorage: *resource.NewQuantity(3*(1024*1024), resource.DecimalSI),
},
},
{
Name: "container1-2",
Usage: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(4, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(5*(1024*1024), resource.DecimalSI),
v1.ResourceStorage: *resource.NewQuantity(6*(1024*1024), resource.DecimalSI),
},
},
},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "pod2", Namespace: "test", ResourceVersion: "11"},
Window: metav1.Duration{Duration: time.Minute},
Containers: []metricsv1alpha1api.ContainerMetrics{
{
Name: "container2-1",
Usage: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(7, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(8*(1024*1024), resource.DecimalSI),
v1.ResourceStorage: *resource.NewQuantity(9*(1024*1024), resource.DecimalSI),
},
},
{
Name: "container2-2",
Usage: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(10, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(11*(1024*1024), resource.DecimalSI),
v1.ResourceStorage: *resource.NewQuantity(12*(1024*1024), resource.DecimalSI),
},
},
{
Name: "container2-3",
Usage: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(13, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(14*(1024*1024), resource.DecimalSI),
v1.ResourceStorage: *resource.NewQuantity(15*(1024*1024), resource.DecimalSI),
},
},
},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "pod3", Namespace: "test", ResourceVersion: "12"},
Window: metav1.Duration{Duration: time.Minute},
Containers: []metricsv1alpha1api.ContainerMetrics{
{
Name: "container3-1",
Usage: v1.ResourceList{

View File

@ -29,12 +29,14 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
cmdtesting "k8s.io/kubernetes/pkg/kubectl/cmd/testing"
metricsapi "k8s.io/metrics/pkg/apis/metrics/v1alpha1"
metricsv1alpha1api "k8s.io/metrics/pkg/apis/metrics/v1alpha1"
metricsv1beta1api "k8s.io/metrics/pkg/apis/metrics/v1beta1"
)
const (
baseHeapsterServiceAddress = "/api/v1/namespaces/kube-system/services/http:heapster:/proxy"
baseMetricsAddress = baseHeapsterServiceAddress + "/apis/metrics"
baseMetricsServerAddress = "/apis/metrics.k8s.io/v1beta1"
metricsApiVersion = "v1alpha1"
)
@ -58,12 +60,12 @@ func marshallBody(metrics interface{}) (io.ReadCloser, error) {
return ioutil.NopCloser(bytes.NewReader(result)), nil
}
func testNodeMetricsData() (*metricsapi.NodeMetricsList, *v1.NodeList) {
metrics := &metricsapi.NodeMetricsList{
func testNodeV1alpha1MetricsData() (*metricsv1alpha1api.NodeMetricsList, *v1.NodeList) {
metrics := &metricsv1alpha1api.NodeMetricsList{
ListMeta: metav1.ListMeta{
ResourceVersion: "1",
},
Items: []metricsapi.NodeMetrics{
Items: []metricsv1alpha1api.NodeMetrics{
{
ObjectMeta: metav1.ObjectMeta{Name: "node1", ResourceVersion: "10"},
Window: metav1.Duration{Duration: time.Minute},
@ -113,3 +115,59 @@ func testNodeMetricsData() (*metricsapi.NodeMetricsList, *v1.NodeList) {
}
return metrics, nodes
}
func testNodeV1beta1MetricsData() (*metricsv1beta1api.NodeMetricsList, *v1.NodeList) {
metrics := &metricsv1beta1api.NodeMetricsList{
ListMeta: metav1.ListMeta{
ResourceVersion: "1",
},
Items: []metricsv1beta1api.NodeMetrics{
{
ObjectMeta: metav1.ObjectMeta{Name: "node1", ResourceVersion: "10", Labels: map[string]string{"key": "value"}},
Window: metav1.Duration{Duration: time.Minute},
Usage: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(1, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(2*(1024*1024), resource.DecimalSI),
v1.ResourceStorage: *resource.NewQuantity(3*(1024*1024), resource.DecimalSI),
},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "node2", ResourceVersion: "11"},
Window: metav1.Duration{Duration: time.Minute},
Usage: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(5, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(6*(1024*1024), resource.DecimalSI),
v1.ResourceStorage: *resource.NewQuantity(7*(1024*1024), resource.DecimalSI),
},
},
},
}
nodes := &v1.NodeList{
ListMeta: metav1.ListMeta{
ResourceVersion: "15",
},
Items: []v1.Node{
{
ObjectMeta: metav1.ObjectMeta{Name: "node1", ResourceVersion: "10"},
Status: v1.NodeStatus{
Allocatable: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(10, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(20*(1024*1024), resource.DecimalSI),
v1.ResourceStorage: *resource.NewQuantity(30*(1024*1024), resource.DecimalSI),
},
},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "node2", ResourceVersion: "11"},
Status: v1.NodeStatus{
Allocatable: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(50, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(60*(1024*1024), resource.DecimalSI),
v1.ResourceStorage: *resource.NewQuantity(70*(1024*1024), resource.DecimalSI),
},
},
},
},
}
return metrics, nodes
}

View File

@ -81,6 +81,7 @@ go_library(
"//vendor/k8s.io/client-go/scale:go_default_library",
"//vendor/k8s.io/client-go/tools/clientcmd:go_default_library",
"//vendor/k8s.io/client-go/util/homedir:go_default_library",
"//vendor/k8s.io/metrics/pkg/client/clientset_generated/clientset:go_default_library",
"//vendor/k8s.io/utils/exec:go_default_library",
],
)

View File

@ -27,6 +27,7 @@ import (
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
oldclient "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/version"
metricsclientset "k8s.io/metrics/pkg/client/clientset_generated/clientset"
)
func NewClientCache(loader clientcmd.ClientConfig, discoveryClientFactory DiscoveryClientFactory) *ClientCache {
@ -58,6 +59,7 @@ type ClientCache struct {
discoveryClient discovery.DiscoveryInterface
kubernetesClientCache kubernetesClientCache
metricsClientCache metricsClientCache
}
// kubernetesClientCache creates a new kubernetes.Clientset one time
@ -71,6 +73,17 @@ type kubernetesClientCache struct {
err error
}
// metricsClientCache creates a new metricsclientset.Clientset one time
// and then returns the result for all future requests
type metricsClientCache struct {
// once makes sure the client is only initialized once
once sync.Once
// client is the cached client value
client *metricsclientset.Clientset
// err is the cached error value
err error
}
// KubernetesClientSetForVersion returns a new kubernetes.Clientset. It will cache the value
// the first time it is called and return the cached value on subsequent calls.
// If an error is encountered the first time KubernetesClientSetForVersion is called,
@ -87,6 +100,22 @@ func (c *ClientCache) KubernetesClientSetForVersion(requiredVersion *schema.Grou
return c.kubernetesClientCache.client, c.kubernetesClientCache.err
}
// MetricsClientSetForVersion returns a new kubernetes.Clientset. It will cache the value
// the first time it is called and return the cached value on subsequent calls.
// If an error is encountered the first time MetircsClientSetForVersion is called,
// the error will be cached.
func (c *ClientCache) MetricsClientSetForVersion(requiredVersion *schema.GroupVersion) (*metricsclientset.Clientset, error) {
c.metricsClientCache.once.Do(func() {
config, err := c.ClientConfigForVersion(requiredVersion)
if err != nil {
c.kubernetesClientCache.err = err
return
}
c.metricsClientCache.client, c.metricsClientCache.err = metricsclientset.NewForConfig(config)
})
return c.metricsClientCache.client, c.metricsClientCache.err
}
// also looks up the discovery client. We can't do this during init because the flags won't have been set
// because this is constructed pre-command execution before the command tree is
// even set up. Requires the lock to already be acquired

View File

@ -48,6 +48,7 @@ import (
"k8s.io/kubernetes/pkg/kubectl/resource"
"k8s.io/kubernetes/pkg/kubectl/validation"
"k8s.io/kubernetes/pkg/printers"
metricsclientset "k8s.io/metrics/pkg/client/clientset_generated/clientset"
)
const (
@ -95,6 +96,9 @@ type ClientAccessFactory interface {
// KubernetesClientSet gives you back an external clientset
KubernetesClientSet() (*kubernetes.Clientset, error)
// MetricsClientSet gives you back an external clientset for the metrics API
MetricsClientSet() (metricsclientset.Interface, error)
// Returns a RESTClient for accessing Kubernetes resources or an error.
RESTClient() (*restclient.RESTClient, error)
// Returns a client.Config for accessing the Kubernetes server.

View File

@ -31,6 +31,7 @@ import (
"time"
"k8s.io/api/core/v1"
metricsclientset "k8s.io/metrics/pkg/client/clientset_generated/clientset"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
@ -201,6 +202,10 @@ func (f *ring0Factory) KubernetesClientSet() (*kubernetes.Clientset, error) {
return f.clientCache.KubernetesClientSetForVersion(nil)
}
func (f *ring0Factory) MetricsClientSet() (metricsclientset.Interface, error) {
return f.clientCache.MetricsClientSetForVersion(nil)
}
func (f *ring0Factory) ClientSet() (internalclientset.Interface, error) {
return f.clientCache.ClientSetForVersion(nil)
}

View File

@ -23,6 +23,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//vendor/k8s.io/metrics/pkg/apis/metrics:go_default_library",
"//vendor/k8s.io/metrics/pkg/apis/metrics/v1alpha1:go_default_library",
],
)

View File

@ -26,7 +26,8 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/kubernetes/pkg/apis/core/validation"
metricsapi "k8s.io/metrics/pkg/apis/metrics/v1alpha1"
metricsapi "k8s.io/metrics/pkg/apis/metrics"
metricsv1alpha1api "k8s.io/metrics/pkg/apis/metrics/v1alpha1"
)
const (
@ -97,67 +98,73 @@ func nodeMetricsUrl(name string) (string, error) {
return fmt.Sprintf("%s/nodes/%s", metricsRoot, name), nil
}
func (cli *HeapsterMetricsClient) GetNodeMetrics(nodeName string, selector string) ([]metricsapi.NodeMetrics, error) {
func (cli *HeapsterMetricsClient) GetNodeMetrics(nodeName string, selector string) (*metricsapi.NodeMetricsList, error) {
params := map[string]string{"labelSelector": selector}
path, err := nodeMetricsUrl(nodeName)
if err != nil {
return []metricsapi.NodeMetrics{}, err
return nil, err
}
resultRaw, err := GetHeapsterMetrics(cli, path, params)
if err != nil {
return []metricsapi.NodeMetrics{}, err
return nil, err
}
metrics := make([]metricsapi.NodeMetrics, 0)
versionedMetrics := metricsv1alpha1api.NodeMetricsList{}
if len(nodeName) == 0 {
metricsList := metricsapi.NodeMetricsList{}
err = json.Unmarshal(resultRaw, &metricsList)
err = json.Unmarshal(resultRaw, &versionedMetrics)
if err != nil {
return []metricsapi.NodeMetrics{}, fmt.Errorf("failed to unmarshall heapster response: %v", err)
return nil, fmt.Errorf("failed to unmarshall heapster response: %v", err)
}
metrics = append(metrics, metricsList.Items...)
} else {
var singleMetric metricsapi.NodeMetrics
var singleMetric metricsv1alpha1api.NodeMetrics
err = json.Unmarshal(resultRaw, &singleMetric)
if err != nil {
return []metricsapi.NodeMetrics{}, fmt.Errorf("failed to unmarshall heapster response: %v", err)
return nil, fmt.Errorf("failed to unmarshall heapster response: %v", err)
}
metrics = append(metrics, singleMetric)
versionedMetrics.Items = []metricsv1alpha1api.NodeMetrics{singleMetric}
}
metrics := &metricsapi.NodeMetricsList{}
err = metricsv1alpha1api.Convert_v1alpha1_NodeMetricsList_To_metrics_NodeMetricsList(&versionedMetrics, metrics, nil)
if err != nil {
return nil, err
}
return metrics, nil
}
func (cli *HeapsterMetricsClient) GetPodMetrics(namespace string, podName string, allNamespaces bool, selector labels.Selector) ([]metricsapi.PodMetrics, error) {
func (cli *HeapsterMetricsClient) GetPodMetrics(namespace string, podName string, allNamespaces bool, selector labels.Selector) (*metricsapi.PodMetricsList, error) {
if allNamespaces {
namespace = metav1.NamespaceAll
}
path, err := podMetricsUrl(namespace, podName)
if err != nil {
return []metricsapi.PodMetrics{}, err
return nil, err
}
params := map[string]string{"labelSelector": selector.String()}
allMetrics := make([]metricsapi.PodMetrics, 0)
versionedMetrics := metricsv1alpha1api.PodMetricsList{}
resultRaw, err := GetHeapsterMetrics(cli, path, params)
if err != nil {
return []metricsapi.PodMetrics{}, err
return nil, err
}
if len(podName) == 0 {
metrics := metricsapi.PodMetricsList{}
err = json.Unmarshal(resultRaw, &metrics)
err = json.Unmarshal(resultRaw, &versionedMetrics)
if err != nil {
return []metricsapi.PodMetrics{}, fmt.Errorf("failed to unmarshall heapster response: %v", err)
return nil, fmt.Errorf("failed to unmarshall heapster response: %v", err)
}
allMetrics = append(allMetrics, metrics.Items...)
} else {
var singleMetric metricsapi.PodMetrics
var singleMetric metricsv1alpha1api.PodMetrics
err = json.Unmarshal(resultRaw, &singleMetric)
if err != nil {
return []metricsapi.PodMetrics{}, fmt.Errorf("failed to unmarshall heapster response: %v", err)
return nil, fmt.Errorf("failed to unmarshall heapster response: %v", err)
}
allMetrics = append(allMetrics, singleMetric)
versionedMetrics.Items = []metricsv1alpha1api.PodMetrics{singleMetric}
}
return allMetrics, nil
metrics := &metricsapi.PodMetricsList{}
err = metricsv1alpha1api.Convert_v1alpha1_PodMetricsList_To_metrics_PodMetricsList(&versionedMetrics, metrics, nil)
if err != nil {
return nil, err
}
return metrics, nil
}
func GetHeapsterMetrics(cli *HeapsterMetricsClient, path string, params map[string]string) ([]byte, error) {

View File

@ -25,7 +25,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/printers"
metricsapi "k8s.io/metrics/pkg/apis/metrics/v1alpha1"
metricsapi "k8s.io/metrics/pkg/apis/metrics"
)
var (