diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller.go b/pkg/controller/volume/attachdetach/attach_detach_controller.go
index b63458c81ff..537d7b3be7e 100644
--- a/pkg/controller/volume/attachdetach/attach_detach_controller.go
+++ b/pkg/controller/volume/attachdetach/attach_detach_controller.go
@@ -323,7 +323,12 @@ func (adc *attachDetachController) Run(stopCh <-chan struct{}) {
 	go adc.reconciler.Run(stopCh)
 	go adc.desiredStateOfWorldPopulator.Run(stopCh)
 	go wait.Until(adc.pvcWorker, time.Second, stopCh)
-	metrics.Register(adc.pvcLister, adc.pvLister, adc.podLister, &adc.volumePluginMgr)
+	metrics.Register(adc.pvcLister,
+		adc.pvLister,
+		adc.podLister,
+		adc.actualStateOfWorld,
+		adc.desiredStateOfWorld,
+		&adc.volumePluginMgr)
 
 	<-stopCh
 }
diff --git a/pkg/controller/volume/attachdetach/metrics/BUILD b/pkg/controller/volume/attachdetach/metrics/BUILD
index 4a57a114119..d07ac9b9576 100644
--- a/pkg/controller/volume/attachdetach/metrics/BUILD
+++ b/pkg/controller/volume/attachdetach/metrics/BUILD
@@ -6,10 +6,10 @@ go_library(
     importpath = "k8s.io/kubernetes/pkg/controller/volume/attachdetach/metrics",
     visibility = ["//visibility:public"],
     deps = [
+        "//pkg/controller/volume/attachdetach/cache:go_default_library",
         "//pkg/controller/volume/attachdetach/util:go_default_library",
         "//pkg/volume:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
-        "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
         "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
         "//vendor/github.com/golang/glog:go_default_library",
         "//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
@@ -22,10 +22,14 @@ go_test(
     embed = [":go_default_library"],
     deps = [
         "//pkg/controller:go_default_library",
+        "//pkg/controller/volume/attachdetach/cache:go_default_library",
+        "//pkg/controller/volume/attachdetach/testing:go_default_library",
         "//pkg/volume/testing:go_default_library",
+        "//pkg/volume/util/types:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
         "//staging/src/k8s.io/client-go/informers:go_default_library",
         "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
     ],
diff --git a/pkg/controller/volume/attachdetach/metrics/metrics.go b/pkg/controller/volume/attachdetach/metrics/metrics.go
index 2ed0a0947a3..8167dba5f10 100644
--- a/pkg/controller/volume/attachdetach/metrics/metrics.go
+++ b/pkg/controller/volume/attachdetach/metrics/metrics.go
@@ -22,69 +22,94 @@ import (
 	"github.com/golang/glog"
 	"github.com/prometheus/client_golang/prometheus"
 	"k8s.io/apimachinery/pkg/labels"
-	"k8s.io/apimachinery/pkg/types"
 	corelisters "k8s.io/client-go/listers/core/v1"
+	"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
 	"k8s.io/kubernetes/pkg/controller/volume/attachdetach/util"
 	"k8s.io/kubernetes/pkg/volume"
 )
 
+const pluginNameNotAvailable = "N/A"
+
 var (
 	inUseVolumeMetricDesc = prometheus.NewDesc(
 		prometheus.BuildFQName("", "storage_count", "attachable_volumes_in_use"),
 		"Measure number of volumes in use",
 		[]string{"node", "volume_plugin"}, nil)
+
+	totalVolumesMetricDesc = prometheus.NewDesc(
+		prometheus.BuildFQName("", "attachdetach_controller", "total_volumes"),
+		"Number of volumes in A/D Controller",
+		[]string{"plugin_name", "state"}, nil)
+
+	forcedDetachMetricCounter = prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Name: "attachdetach_controller_forced_detaches",
+			Help: "Number of times the A/D Controller performed a forced detach"})
 )
 
 var registerMetrics sync.Once
 
-type volumeInUseCollector struct {
-	pvcLister       corelisters.PersistentVolumeClaimLister
-	podLister       corelisters.PodLister
-	pvLister        corelisters.PersistentVolumeLister
-	volumePluginMgr *volume.VolumePluginMgr
-}
-
-// nodeVolumeCount contains map of {"nodeName": {"pluginName": volume_count }}
-// For example :
-// node 172.168.1.100.ec2.internal has 10 EBS and 3 glusterfs PVC in use
-// {"172.168.1.100.ec2.internal": {"aws-ebs": 10, "glusterfs": 3}}
-type nodeVolumeCount map[types.NodeName]map[string]int
-
-// Register registers pvc's in-use metrics
+// Register registers metrics in A/D Controller.
 func Register(pvcLister corelisters.PersistentVolumeClaimLister,
 	pvLister corelisters.PersistentVolumeLister,
 	podLister corelisters.PodLister,
+	asw cache.ActualStateOfWorld,
+	dsw cache.DesiredStateOfWorld,
 	pluginMgr *volume.VolumePluginMgr) {
 	registerMetrics.Do(func() {
-		prometheus.MustRegister(newVolumeInUseCollector(pvcLister, podLister, pvLister, pluginMgr))
+		prometheus.MustRegister(newAttachDetachStateCollector(pvcLister,
+			podLister,
+			pvLister,
+			asw,
+			dsw,
+			pluginMgr))
+		prometheus.MustRegister(forcedDetachMetricCounter)
 	})
-
 }
 
-func (volumeInUse nodeVolumeCount) add(nodeName types.NodeName, pluginName string) {
-	nodeCount, ok := volumeInUse[nodeName]
+type attachDetachStateCollector struct {
+	pvcLister       corelisters.PersistentVolumeClaimLister
+	podLister       corelisters.PodLister
+	pvLister        corelisters.PersistentVolumeLister
+	asw             cache.ActualStateOfWorld
+	dsw             cache.DesiredStateOfWorld
+	volumePluginMgr *volume.VolumePluginMgr
+}
+
+// volumeCount is a map of maps used as a counter, e.g.:
+// node 172.168.1.100.ec2.internal has 10 EBS and 3 glusterfs PVC in use:
+// {"172.168.1.100.ec2.internal": {"aws-ebs": 10, "glusterfs": 3}}
+// state actual_state_of_world contains a total of 10 EBS volumes:
+// {"actual_state_of_world": {"aws-ebs": 10}}
+type volumeCount map[string]map[string]int64
+
+func (v volumeCount) add(typeKey, counterKey string) {
+	count, ok := v[typeKey]
 	if !ok {
-		nodeCount = map[string]int{}
+		count = map[string]int64{}
 	}
-	nodeCount[pluginName]++
-	volumeInUse[nodeName] = nodeCount
+	count[counterKey]++
+	v[typeKey] = count
 }
 
-func newVolumeInUseCollector(
+func newAttachDetachStateCollector(
 	pvcLister corelisters.PersistentVolumeClaimLister,
 	podLister corelisters.PodLister,
 	pvLister corelisters.PersistentVolumeLister,
-	pluginMgr *volume.VolumePluginMgr) *volumeInUseCollector {
-	return &volumeInUseCollector{pvcLister, podLister, pvLister, pluginMgr}
+	asw cache.ActualStateOfWorld,
+	dsw cache.DesiredStateOfWorld,
+	pluginMgr *volume.VolumePluginMgr) *attachDetachStateCollector {
+	return &attachDetachStateCollector{pvcLister, podLister, pvLister, asw, dsw, pluginMgr}
 }
 
 // Check if our collector implements necessary collector interface
-var _ prometheus.Collector = &volumeInUseCollector{}
+var _ prometheus.Collector = &attachDetachStateCollector{}
 
-func (collector *volumeInUseCollector) Describe(ch chan<- *prometheus.Desc) {
+func (collector *attachDetachStateCollector) Describe(ch chan<- *prometheus.Desc) {
 	ch <- inUseVolumeMetricDesc
+	ch <- totalVolumesMetricDesc
 }
 
-func (collector *volumeInUseCollector) Collect(ch chan<- prometheus.Metric) {
+func (collector *attachDetachStateCollector) Collect(ch chan<- prometheus.Metric) {
 	nodeVolumeMap := collector.getVolumeInUseCount()
 	for nodeName, pluginCount := range nodeVolumeMap {
 		for pluginName, count := range pluginCount {
@@ -99,23 +124,37 @@ func (collector *volumeInUseCollector) Collect(ch chan<- prometheus.Metric) {
 			ch <- metric
 		}
 	}
+
+	stateVolumeMap := collector.getTotalVolumesCount()
+	for stateName, pluginCount := range stateVolumeMap {
+		for pluginName, count := range pluginCount {
+			metric, err := prometheus.NewConstMetric(totalVolumesMetricDesc,
+				prometheus.GaugeValue,
+				float64(count),
+				pluginName,
+				string(stateName))
+			if err != nil {
+				glog.Warningf("Failed to create metric : %v", err)
+			}
+			ch <- metric
+		}
+	}
 }
 
-func (collector *volumeInUseCollector) getVolumeInUseCount() nodeVolumeCount {
+func (collector *attachDetachStateCollector) getVolumeInUseCount() volumeCount {
 	pods, err := collector.podLister.List(labels.Everything())
 	if err != nil {
 		glog.Errorf("Error getting pod list")
 		return nil
 	}
 
-	nodeVolumeMap := make(nodeVolumeCount)
+	nodeVolumeMap := make(volumeCount)
 	for _, pod := range pods {
 		if len(pod.Spec.Volumes) <= 0 {
 			continue
 		}
-		nodeName := types.NodeName(pod.Spec.NodeName)
-		if nodeName == "" {
+		if pod.Spec.NodeName == "" {
 			continue
 		}
 		for _, podVolume := range pod.Spec.Volumes {
@@ -127,8 +166,36 @@ func (collector *volumeInUseCollector) getVolumeInUseCount() nodeVolumeCount {
 			if err != nil {
 				continue
 			}
-			nodeVolumeMap.add(nodeName, volumePlugin.GetPluginName())
+			nodeVolumeMap.add(pod.Spec.NodeName, volumePlugin.GetPluginName())
 		}
 	}
 	return nodeVolumeMap
 }
+
+func (collector *attachDetachStateCollector) getTotalVolumesCount() volumeCount {
+	stateVolumeMap := make(volumeCount)
+	for _, v := range collector.dsw.GetVolumesToAttach() {
+		if plugin, err := collector.volumePluginMgr.FindPluginBySpec(v.VolumeSpec); err == nil {
+			pluginName := pluginNameNotAvailable
+			if plugin != nil {
+				pluginName = plugin.GetPluginName()
+			}
+			stateVolumeMap.add("desired_state_of_world", pluginName)
+		}
+	}
+	for _, v := range collector.asw.GetAttachedVolumes() {
+		if plugin, err := collector.volumePluginMgr.FindPluginBySpec(v.VolumeSpec); err == nil {
+			pluginName := pluginNameNotAvailable
+			if plugin != nil {
+				pluginName = plugin.GetPluginName()
+			}
+			stateVolumeMap.add("actual_state_of_world", pluginName)
+		}
+	}
+	return stateVolumeMap
+}
+
+// RecordForcedDetachMetric register a forced detach metric.
+func RecordForcedDetachMetric() {
+	forcedDetachMetricCounter.Inc()
+}
diff --git a/pkg/controller/volume/attachdetach/metrics/metrics_test.go b/pkg/controller/volume/attachdetach/metrics/metrics_test.go
index 4916e2e5dbb..00efe32c1aa 100644
--- a/pkg/controller/volume/attachdetach/metrics/metrics_test.go
+++ b/pkg/controller/volume/attachdetach/metrics/metrics_test.go
@@ -22,13 +22,17 @@ import (
 	"k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	k8stypes "k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes/fake"
 	"k8s.io/kubernetes/pkg/controller"
+	"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
+	controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing"
 	volumetesting "k8s.io/kubernetes/pkg/volume/testing"
+	"k8s.io/kubernetes/pkg/volume/util/types"
 )
 
-func TestMetricCollection(t *testing.T) {
+func TestVolumesInUseMetricCollection(t *testing.T) {
 	fakeVolumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
 	fakeClient := &fake.Clientset{}
 
@@ -103,7 +107,13 @@ func TestMetricCollection(t *testing.T) {
 	pvcLister := pvcInformer.Lister()
 	pvLister := pvInformer.Lister()
 
-	metricCollector := newVolumeInUseCollector(pvcLister, fakePodInformer.Lister(), pvLister, fakeVolumePluginMgr)
+	metricCollector := newAttachDetachStateCollector(
+		pvcLister,
+		fakePodInformer.Lister(),
+		pvLister,
+		nil,
+		nil,
+		fakeVolumePluginMgr)
 	nodeUseMap := metricCollector.getVolumeInUseCount()
 	if len(nodeUseMap) < 1 {
 		t.Errorf("Expected one volume in use got %d", len(nodeUseMap))
@@ -117,5 +127,54 @@ func TestMetricCollection(t *testing.T) {
 	if pluginUseCount < 1 {
 		t.Errorf("Expected at least in-use volume metric got %d", pluginUseCount)
 	}
-
+}
+
+func TestTotalVolumesMetricCollection(t *testing.T) {
+	fakeVolumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
+	dsw := cache.NewDesiredStateOfWorld(fakeVolumePluginMgr)
+	asw := cache.NewActualStateOfWorld(fakeVolumePluginMgr)
+	podName := "pod-uid"
+	volumeName := v1.UniqueVolumeName("volume-name")
+	volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
+	nodeName := k8stypes.NodeName("node-name")
+
+	dsw.AddNode(nodeName, false)
+	_, err := dsw.AddPod(types.UniquePodName(podName), controllervolumetesting.NewPod(podName, podName), volumeSpec, nodeName)
+	if err != nil {
+		t.Fatalf("Expected no error, got %v", err)
+	}
+	asw.AddVolumeNode(volumeName, volumeSpec, nodeName, "")
+
+	metricCollector := newAttachDetachStateCollector(
+		nil,
+		nil,
+		nil,
+		asw,
+		dsw,
+		fakeVolumePluginMgr)
+
+	totalVolumesMap := metricCollector.getTotalVolumesCount()
+	if len(totalVolumesMap) != 2 {
+		t.Errorf("Expected 2 states, got %d", len(totalVolumesMap))
+	}
+
+	dswCount, ok := totalVolumesMap["desired_state_of_world"]
+	if !ok {
+		t.Errorf("Expected desired_state_of_world, got nothing")
+	}
+
+	fakePluginCount := dswCount["fake-plugin"]
+	if fakePluginCount != 1 {
+		t.Errorf("Expected 1 fake-plugin volume in DesiredStateOfWorld, got %d", fakePluginCount)
+	}
+
+	aswCount, ok := totalVolumesMap["actual_state_of_world"]
+	if !ok {
+		t.Errorf("Expected actual_state_of_world, got nothing")
+	}
+
+	fakePluginCount = aswCount["fake-plugin"]
+	if fakePluginCount != 1 {
+		t.Errorf("Expected 1 fake-plugin volume in ActualStateOfWorld, got %d", fakePluginCount)
+	}
 }
diff --git a/pkg/controller/volume/attachdetach/reconciler/BUILD b/pkg/controller/volume/attachdetach/reconciler/BUILD
index 0ccd942aeaa..dcdbf66d312 100644
--- a/pkg/controller/volume/attachdetach/reconciler/BUILD
+++ b/pkg/controller/volume/attachdetach/reconciler/BUILD
@@ -12,6 +12,7 @@ go_library(
    importpath = "k8s.io/kubernetes/pkg/controller/volume/attachdetach/reconciler",
     deps = [
         "//pkg/controller/volume/attachdetach/cache:go_default_library",
+        "//pkg/controller/volume/attachdetach/metrics:go_default_library",
         "//pkg/controller/volume/attachdetach/statusupdater:go_default_library",
         "//pkg/kubelet/events:go_default_library",
         "//pkg/util/goroutinemap/exponentialbackoff:go_default_library",
diff --git a/pkg/controller/volume/attachdetach/reconciler/reconciler.go b/pkg/controller/volume/attachdetach/reconciler/reconciler.go
index 5c57b2093ee..f81595a60a0 100644
--- a/pkg/controller/volume/attachdetach/reconciler/reconciler.go
+++ b/pkg/controller/volume/attachdetach/reconciler/reconciler.go
@@ -30,6 +30,7 @@ import (
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/tools/record"
 	"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
+	"k8s.io/kubernetes/pkg/controller/volume/attachdetach/metrics"
 	"k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater"
 	kevents "k8s.io/kubernetes/pkg/kubelet/events"
 	"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
@@ -232,6 +233,7 @@ func (rc *reconciler) reconcile() {
 				if !timeout {
 					glog.Infof(attachedVolume.GenerateMsgDetailed("attacherDetacher.DetachVolume started", ""))
 				} else {
+					metrics.RecordForcedDetachMetric()
 					glog.Warningf(attachedVolume.GenerateMsgDetailed("attacherDetacher.DetachVolume started", fmt.Sprintf("This volume is not safe to detach, but maxWaitForUnmountDuration %v expired, force detaching", rc.maxWaitForUnmountDuration)))
 				}
 			}
diff --git a/test/e2e/storage/volume_metrics.go b/test/e2e/storage/volume_metrics.go
index 8a6ef0a8740..f6e5d130dd5 100644
--- a/test/e2e/storage/volume_metrics.go
+++ b/test/e2e/storage/volume_metrics.go
@@ -228,6 +228,66 @@ var _ = utils.SIGDescribe("[Serial] Volume metrics", func() {
 		framework.ExpectNoError(framework.DeletePodWithWait(f, c, pod))
 	})
 
+	It("should create metrics for total number of volumes in A/D Controller", func() {
+		var err error
+		pvc, err = c.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(pvc)
+		Expect(err).NotTo(HaveOccurred())
+		Expect(pvc).ToNot(Equal(nil))
+
+		claims := []*v1.PersistentVolumeClaim{pvc}
+		pod := framework.MakePod(ns, nil, claims, false, "")
+
+		// Get metrics
+		controllerMetrics, err := metricsGrabber.GrabFromControllerManager()
+		if err != nil {
+			framework.Skipf("Could not get controller-manager metrics - skipping")
+		}
+
+		// Create pod
+		pod, err = c.CoreV1().Pods(ns).Create(pod)
+		Expect(err).NotTo(HaveOccurred())
+		err = framework.WaitForPodRunningInNamespace(c, pod)
+		framework.ExpectNoError(framework.WaitForPodRunningInNamespace(c, pod), "Error starting pod ", pod.Name)
+		pod, err = c.CoreV1().Pods(ns).Get(pod.Name, metav1.GetOptions{})
+		Expect(err).NotTo(HaveOccurred())
+
+		// Get updated metrics
+		updatedControllerMetrics, err := metricsGrabber.GrabFromControllerManager()
+		if err != nil {
+			framework.Skipf("Could not get controller-manager metrics - skipping")
+		}
+
+		// Forced detach metric should be present
+		forceDetachKey := "attachdetach_controller_forced_detaches"
+		_, ok := updatedControllerMetrics[forceDetachKey]
+		Expect(ok).To(BeTrue(), "Key %q not found in A/D Controller metrics", forceDetachKey)
+
+		// Wait and validate
+		totalVolumesKey := "attachdetach_controller_total_volumes"
+		states := []string{"actual_state_of_world", "desired_state_of_world"}
+		dimensions := []string{"state", "plugin_name"}
+		waitForADControllerStatesMetrics(metricsGrabber, totalVolumesKey, dimensions, states)
+
+		// Total number of volumes in both ActualStateofWorld and DesiredStateOfWorld
+		// states should be higher or equal than it used to be
+		oldStates := getStatesMetrics(totalVolumesKey, metrics.Metrics(controllerMetrics))
+		updatedStates := getStatesMetrics(totalVolumesKey, metrics.Metrics(updatedControllerMetrics))
+		for _, stateName := range states {
+			if _, ok := oldStates[stateName]; !ok {
+				continue
+			}
+			for pluginName, numVolumes := range updatedStates[stateName] {
+				oldNumVolumes := oldStates[stateName][pluginName]
+				Expect(numVolumes).To(BeNumerically(">=", oldNumVolumes),
+					"Wrong number of volumes in state %q, plugin %q: wanted >=%d, got %d",
+					stateName, pluginName, oldNumVolumes, numVolumes)
+			}
+		}
+
+		framework.Logf("Deleting pod %q/%q", pod.Namespace, pod.Name)
+		framework.ExpectNoError(framework.DeletePodWithWait(f, c, pod))
+	})
+
 	// Test for pv controller metrics, concretely: bound/unbound pv/pvc count.
 	Describe("PVController", func() {
 		const (
@@ -535,3 +595,41 @@ func hasValidMetrics(metrics metrics.Metrics, metricKey string, dimensions ...st
 	}
 	return errCount == 0
 }
+
+func getStatesMetrics(metricKey string, givenMetrics metrics.Metrics) map[string]map[string]int64 {
+	states := make(map[string]map[string]int64)
+	for _, sample := range givenMetrics[metricKey] {
+		framework.Logf("Found sample %q", sample.String())
+		state := string(sample.Metric["state"])
+		pluginName := string(sample.Metric["plugin_name"])
+		states[state] = map[string]int64{pluginName: int64(sample.Value)}
+	}
+	return states
+}
+
+func waitForADControllerStatesMetrics(metricsGrabber *metrics.MetricsGrabber, metricName string, dimensions []string, stateNames []string) {
+	backoff := wait.Backoff{
+		Duration: 10 * time.Second,
+		Factor:   1.2,
+		Steps:    21,
+	}
+	verifyMetricFunc := func() (bool, error) {
+		updatedMetrics, err := metricsGrabber.GrabFromControllerManager()
+		if err != nil {
+			framework.Skipf("Could not get controller-manager metrics - skipping")
+			return false, err
+		}
+		if !hasValidMetrics(metrics.Metrics(updatedMetrics), metricName, dimensions...) {
+			return false, fmt.Errorf("could not get valid metrics for %q", metricName)
+		}
+		states := getStatesMetrics(metricName, metrics.Metrics(updatedMetrics))
+		for _, name := range stateNames {
+			if _, ok := states[name]; !ok {
+				return false, fmt.Errorf("could not get state %q from A/D Controller metrics", name)
+			}
+		}
+		return true, nil
+	}
+	waitErr := wait.ExponentialBackoff(backoff, verifyMetricFunc)
+	Expect(waitErr).NotTo(HaveOccurred(), "Timeout error fetching A/D controller metrics : %v", waitErr)
+}