diff --git a/pkg/kubelet/volumemanager/BUILD b/pkg/kubelet/volumemanager/BUILD index c20a4b707b2..855d1b65700 100644 --- a/pkg/kubelet/volumemanager/BUILD +++ b/pkg/kubelet/volumemanager/BUILD @@ -17,6 +17,7 @@ go_library( "//pkg/kubelet/status:go_default_library", "//pkg/kubelet/util/format:go_default_library", "//pkg/kubelet/volumemanager/cache:go_default_library", + "//pkg/kubelet/volumemanager/metrics:go_default_library", "//pkg/kubelet/volumemanager/populator:go_default_library", "//pkg/kubelet/volumemanager/reconciler:go_default_library", "//pkg/util/mount:go_default_library", @@ -76,6 +77,7 @@ filegroup( srcs = [ ":package-srcs", "//pkg/kubelet/volumemanager/cache:all-srcs", + "//pkg/kubelet/volumemanager/metrics:all-srcs", "//pkg/kubelet/volumemanager/populator:all-srcs", "//pkg/kubelet/volumemanager/reconciler:all-srcs", ], diff --git a/pkg/kubelet/volumemanager/metrics/BUILD b/pkg/kubelet/volumemanager/metrics/BUILD new file mode 100644 index 00000000000..6d44fabf88b --- /dev/null +++ b/pkg/kubelet/volumemanager/metrics/BUILD @@ -0,0 +1,43 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = ["metrics.go"], + importpath = "k8s.io/kubernetes/pkg/kubelet/volumemanager/metrics", + visibility = ["//visibility:public"], + deps = [ + "//pkg/kubelet/volumemanager/cache:go_default_library", + "//pkg/volume:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/github.com/prometheus/client_golang/prometheus:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) + +go_test( + name = "go_default_test", + srcs = ["metrics_test.go"], + embed = [":go_default_library"], + deps = [ + "//pkg/kubelet/volumemanager/cache:go_default_library", + "//pkg/volume:go_default_library", + "//pkg/volume/testing:go_default_library", + "//pkg/volume/util:go_default_library", + "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", + ], +) diff --git a/pkg/kubelet/volumemanager/metrics/metrics.go b/pkg/kubelet/volumemanager/metrics/metrics.go new file mode 100644 index 00000000000..c428401e08f --- /dev/null +++ b/pkg/kubelet/volumemanager/metrics/metrics.go @@ -0,0 +1,113 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "sync" + + "github.com/golang/glog" + "github.com/prometheus/client_golang/prometheus" + "k8s.io/kubernetes/pkg/kubelet/volumemanager/cache" + "k8s.io/kubernetes/pkg/volume" +) + +const ( + pluginNameNotAvailable = "N/A" + + // Metric keys for Volume Manager. + volumeManagerTotalVolumes = "volume_manager_total_volumes" +) + +var ( + registerMetrics sync.Once + + totalVolumesDesc = prometheus.NewDesc( + volumeManagerTotalVolumes, + "Number of volumes in Volume Manager", + []string{"plugin_name", "state"}, + nil, + ) +) + +// volumeCount is a map of maps used as a counter. +type volumeCount map[string]map[string]int64 + +func (v volumeCount) add(state, plugin string) { + count, ok := v[state] + if !ok { + count = map[string]int64{} + } + count[plugin]++ + v[state] = count +} + +// Register registers Volume Manager metrics. +func Register(asw cache.ActualStateOfWorld, dsw cache.DesiredStateOfWorld, pluginMgr *volume.VolumePluginMgr) { + registerMetrics.Do(func() { + prometheus.MustRegister(&totalVolumesCollector{asw, dsw, pluginMgr}) + }) +} + +type totalVolumesCollector struct { + asw cache.ActualStateOfWorld + dsw cache.DesiredStateOfWorld + pluginMgr *volume.VolumePluginMgr +} + +var _ prometheus.Collector = &totalVolumesCollector{} + +// Describe implements the prometheus.Collector interface. +func (c *totalVolumesCollector) Describe(ch chan<- *prometheus.Desc) { + ch <- totalVolumesDesc +} + +// Collect implements the prometheus.Collector interface. +func (c *totalVolumesCollector) Collect(ch chan<- prometheus.Metric) { + for stateName, pluginCount := range c.getVolumeCount() { + for pluginName, count := range pluginCount { + metric, err := prometheus.NewConstMetric(totalVolumesDesc, + prometheus.GaugeValue, + float64(count), + pluginName, + stateName) + if err != nil { + glog.Warningf("Failed to create metric : %v", err) + } + ch <- metric + } + } +} + +func (c *totalVolumesCollector) getVolumeCount() volumeCount { + counter := make(volumeCount) + for _, mountedVolume := range c.asw.GetMountedVolumes() { + pluginName := mountedVolume.PluginName + if pluginName == "" { + pluginName = pluginNameNotAvailable + } + counter.add("actual_state_of_world", pluginName) + } + + for _, volumeToMount := range c.dsw.GetVolumesToMount() { + pluginName := pluginNameNotAvailable + if plugin, err := c.pluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec); err == nil { + pluginName = plugin.GetPluginName() + } + counter.add("desired_state_of_world", pluginName) + } + return counter +} diff --git a/pkg/kubelet/volumemanager/metrics/metrics_test.go b/pkg/kubelet/volumemanager/metrics/metrics_test.go new file mode 100644 index 00000000000..45e57dedf7d --- /dev/null +++ b/pkg/kubelet/volumemanager/metrics/metrics_test.go @@ -0,0 +1,115 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "testing" + + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8stypes "k8s.io/apimachinery/pkg/types" + "k8s.io/kubernetes/pkg/kubelet/volumemanager/cache" + "k8s.io/kubernetes/pkg/volume" + + volumetesting "k8s.io/kubernetes/pkg/volume/testing" + "k8s.io/kubernetes/pkg/volume/util" +) + +func TestMetricCollection(t *testing.T) { + volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t) + dsw := cache.NewDesiredStateOfWorld(volumePluginMgr) + asw := cache.NewActualStateOfWorld(k8stypes.NodeName("node-name"), volumePluginMgr) + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + UID: "pod1uid", + }, + Spec: v1.PodSpec{ + Volumes: []v1.Volume{ + { + Name: "volume-name", + VolumeSource: v1.VolumeSource{ + GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{ + PDName: "fake-device1", + }, + }, + }, + }, + }, + } + volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]} + podName := util.GetUniquePodName(pod) + + // Add one volume to DesiredStateOfWorld + generatedVolumeName, err := dsw.AddPodToVolume(podName, pod, volumeSpec, volumeSpec.Name(), "") + if err != nil { + t.Fatalf("AddPodToVolume failed. Expected: Actual: <%v>", err) + } + + mounter, err := fakePlugin.NewMounter(volumeSpec, pod, volume.VolumeOptions{}) + if err != nil { + t.Fatalf("NewMounter failed. Expected: Actual: <%v>", err) + } + + mapper, err := fakePlugin.NewBlockVolumeMapper(volumeSpec, pod, volume.VolumeOptions{}) + if err != nil { + t.Fatalf("NewBlockVolumeMapper failed. Expected: Actual: <%v>", err) + } + + // Add one volume to ActualStateOfWorld + devicePath := "fake/device/path" + err = asw.MarkVolumeAsAttached("", volumeSpec, "", devicePath) + if err != nil { + t.Fatalf("MarkVolumeAsAttached failed. Expected: Actual: <%v>", err) + } + + err = asw.AddPodToVolume( + podName, pod.UID, generatedVolumeName, mounter, mapper, volumeSpec.Name(), "", volumeSpec) + if err != nil { + t.Fatalf("AddPodToVolume failed. Expected: Actual: <%v>", err) + } + + metricCollector := &totalVolumesCollector{asw, dsw, volumePluginMgr} + + // Check if getVolumeCount returns correct data + count := metricCollector.getVolumeCount() + if len(count) != 2 { + t.Errorf("getVolumeCount failed. Expected <2> states, got <%d>", len(count)) + } + + dswCount, ok := count["desired_state_of_world"] + if !ok { + t.Errorf("getVolumeCount failed. Expected , got nothing") + } + + fakePluginCount := dswCount["fake-plugin"] + if fakePluginCount != 1 { + t.Errorf("getVolumeCount failed. Expected <1> fake-plugin volume in DesiredStateOfWorld, got <%d>", + fakePluginCount) + } + + aswCount, ok := count["actual_state_of_world"] + if !ok { + t.Errorf("getVolumeCount failed. Expected , got nothing") + } + + fakePluginCount = aswCount["fake-plugin"] + if fakePluginCount != 1 { + t.Errorf("getVolumeCount failed. Expected <1> fake-plugin volume in ActualStateOfWorld, got <%d>", + fakePluginCount) + } +} diff --git a/pkg/kubelet/volumemanager/volume_manager.go b/pkg/kubelet/volumemanager/volume_manager.go index d3f9800ebde..9e04b89b9b3 100644 --- a/pkg/kubelet/volumemanager/volume_manager.go +++ b/pkg/kubelet/volumemanager/volume_manager.go @@ -36,6 +36,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/status" "k8s.io/kubernetes/pkg/kubelet/util/format" "k8s.io/kubernetes/pkg/kubelet/volumemanager/cache" + "k8s.io/kubernetes/pkg/kubelet/volumemanager/metrics" "k8s.io/kubernetes/pkg/kubelet/volumemanager/populator" "k8s.io/kubernetes/pkg/kubelet/volumemanager/reconciler" "k8s.io/kubernetes/pkg/util/mount" @@ -247,6 +248,8 @@ func (vm *volumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan str glog.Infof("Starting Kubelet Volume Manager") go vm.reconciler.Run(stopCh) + metrics.Register(vm.actualStateOfWorld, vm.desiredStateOfWorld, vm.volumePluginMgr) + <-stopCh glog.Infof("Shutting down Kubelet Volume Manager") } diff --git a/test/e2e/storage/volume_metrics.go b/test/e2e/storage/volume_metrics.go index 2bf9db33d8b..8a6ef0a8740 100644 --- a/test/e2e/storage/volume_metrics.go +++ b/test/e2e/storage/volume_metrics.go @@ -198,6 +198,36 @@ var _ = utils.SIGDescribe("[Serial] Volume metrics", func() { framework.ExpectNoError(framework.DeletePodWithWait(f, c, pod)) }) + It("should create volume metrics in Volume Manager", func() { + var err error + pvc, err = c.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(pvc) + Expect(err).NotTo(HaveOccurred()) + Expect(pvc).ToNot(Equal(nil)) + + claims := []*v1.PersistentVolumeClaim{pvc} + pod := framework.MakePod(ns, nil, claims, false, "") + pod, err = c.CoreV1().Pods(ns).Create(pod) + Expect(err).NotTo(HaveOccurred()) + + err = framework.WaitForPodRunningInNamespace(c, pod) + framework.ExpectNoError(framework.WaitForPodRunningInNamespace(c, pod), "Error starting pod ", pod.Name) + + pod, err = c.CoreV1().Pods(ns).Get(pod.Name, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + + kubeMetrics, err := metricsGrabber.GrabFromKubelet(pod.Spec.NodeName) + Expect(err).NotTo(HaveOccurred()) + + // Metrics should have dimensions plugin_name and state available + totalVolumesKey := "volume_manager_total_volumes" + dimensions := []string{"state", "plugin_name"} + valid := hasValidMetrics(metrics.Metrics(kubeMetrics), totalVolumesKey, dimensions...) + Expect(valid).To(BeTrue(), "Invalid metric in Volume Manager metrics: %q", totalVolumesKey) + + framework.Logf("Deleting pod %q/%q", pod.Namespace, pod.Name) + framework.ExpectNoError(framework.DeletePodWithWait(f, c, pod)) + }) + // Test for pv controller metrics, concretely: bound/unbound pv/pvc count. Describe("PVController", func() { const (