From 96885115950c6024fddb93d72ef115fec8d99814 Mon Sep 17 00:00:00 2001
From: xiangqian
Date: Fri, 17 May 2019 17:15:38 -0700
Subject: [PATCH] record provision and deletion latency metrics; label them with the plugin name first instead of taking the provisioner from the storage class directly

---
 .../volume/persistentvolume/framework_test.go |  20 ++
 .../volume/persistentvolume/metrics/BUILD     |   1 +
 .../persistentvolume/metrics/metrics.go       |  87 ++++++--
 .../volume/persistentvolume/provision_test.go |  75 ++++++-
 .../volume/persistentvolume/pv_controller.go  | 207 +++++++++++++-----
 .../persistentvolume/pv_controller_base.go    |  19 +-
 .../persistentvolume/pv_controller_test.go    | 138 ++++++++++++
 pkg/volume/util/metrics.go                    |  16 ++
 8 files changed, 495 insertions(+), 68 deletions(-)

diff --git a/pkg/controller/volume/persistentvolume/framework_test.go b/pkg/controller/volume/persistentvolume/framework_test.go
index 7fc7f385130..cc550647e17 100644
--- a/pkg/controller/volume/persistentvolume/framework_test.go
+++ b/pkg/controller/volume/persistentvolume/framework_test.go
@@ -419,6 +419,26 @@ func claimWithAnnotation(name, value string, claims []*v1.PersistentVolumeClaim)
 	return claims
 }
 
+// volumeWithAnnotation saves given annotation into given volume.
+// Meant to be used to compose a volume specified inline in a test.
+func volumeWithAnnotation(name, value string, volume *v1.PersistentVolume) *v1.PersistentVolume {
+	if volume.Annotations == nil {
+		volume.Annotations = map[string]string{name: value}
+	} else {
+		volume.Annotations[name] = value
+	}
+	return volume
+}
+
+// volumesWithAnnotation saves given annotation into given volumes.
+// Meant to be used to compose volumes specified inline in a test.
+func volumesWithAnnotation(name, value string, volumes []*v1.PersistentVolume) []*v1.PersistentVolume {
+	for _, volume := range volumes {
+		volumeWithAnnotation(name, value, volume)
+	}
+	return volumes
+}
+
 // claimWithAccessMode saves given access into given claims.
 // Meant to be used to compose claims specified inline in a test.
func claimWithAccessMode(modes []v1.PersistentVolumeAccessMode, claims []*v1.PersistentVolumeClaim) []*v1.PersistentVolumeClaim { diff --git a/pkg/controller/volume/persistentvolume/metrics/BUILD b/pkg/controller/volume/persistentvolume/metrics/BUILD index d686e4df42f..54c222ec798 100644 --- a/pkg/controller/volume/persistentvolume/metrics/BUILD +++ b/pkg/controller/volume/persistentvolume/metrics/BUILD @@ -6,6 +6,7 @@ go_library( importpath = "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/metrics", visibility = ["//visibility:public"], deps = [ + "//pkg/volume/util:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//vendor/github.com/prometheus/client_golang/prometheus:go_default_library", "//vendor/k8s.io/klog:go_default_library", diff --git a/pkg/controller/volume/persistentvolume/metrics/metrics.go b/pkg/controller/volume/persistentvolume/metrics/metrics.go index 1184378777c..bb66fe22422 100644 --- a/pkg/controller/volume/persistentvolume/metrics/metrics.go +++ b/pkg/controller/volume/persistentvolume/metrics/metrics.go @@ -18,11 +18,12 @@ package metrics import ( "sync" - - "k8s.io/api/core/v1" + "time" "github.com/prometheus/client_golang/prometheus" + v1 "k8s.io/api/core/v1" "k8s.io/klog" + metricutil "k8s.io/kubernetes/pkg/volume/util" ) const ( @@ -56,7 +57,6 @@ type PVCLister interface { func Register(pvLister PVLister, pvcLister PVCLister) { registerMetrics.Do(func() { prometheus.MustRegister(newPVAndPVCCountCollector(pvLister, pvcLister)) - prometheus.MustRegister(volumeOperationMetric) prometheus.MustRegister(volumeOperationErrorsMetric) }) } @@ -92,12 +92,6 @@ var ( "Gauge measuring number of persistent volume claim currently unbound", []string{namespaceLabel}, nil) - volumeOperationMetric = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Name: "volume_operation_total_seconds", - Help: "Total volume operation time", - }, - []string{"plugin_name", "operation_name"}) volumeOperationErrorsMetric = prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "volume_operation_total_errors", @@ -198,14 +192,79 @@ func (collector *pvAndPVCCountCollector) pvcCollect(ch chan<- prometheus.Metric) } } -// RecordVolumeOperationMetric records the latency and errors of volume operations. 
-func RecordVolumeOperationMetric(pluginName, opName string, timeTaken float64, err error) {
+// RecordVolumeOperationErrorMetric records error count into metric
+// volume_operation_total_errors for provisioning/deletion operations
+func RecordVolumeOperationErrorMetric(pluginName, opName string) {
 	if pluginName == "" {
 		pluginName = "N/A"
 	}
-	if err != nil {
-		volumeOperationErrorsMetric.WithLabelValues(pluginName, opName).Inc()
+	volumeOperationErrorsMetric.WithLabelValues(pluginName, opName).Inc()
+}
+
+// operationTimestamp stores the start time of an operation by a plugin
+type operationTimestamp struct {
+	pluginName string
+	operation  string
+	startTs    time.Time
+}
+
+func newOperationTimestamp(pluginName, operationName string) *operationTimestamp {
+	return &operationTimestamp{
+		pluginName: pluginName,
+		operation:  operationName,
+		startTs:    time.Now(),
+	}
+}
+
+// OperationStartTimeCache is a concurrency-safe cache for operation start timestamps
+type OperationStartTimeCache struct {
+	cache sync.Map // [string]operationTimestamp
+}
+
+// NewOperationStartTimeCache creates an operation timestamp cache
+func NewOperationStartTimeCache() OperationStartTimeCache {
+	return OperationStartTimeCache{
+		cache: sync.Map{}, // [string]operationTimestamp{}
+	}
+}
+
+// AddIfNotExist returns directly if an entry already exists for the key. Otherwise, it
+// creates a new operation timestamp from operationName, pluginName, and the current time
+// and stores it under the key
+func (c *OperationStartTimeCache) AddIfNotExist(key, pluginName, operationName string) {
+	ts := newOperationTimestamp(pluginName, operationName)
+	c.cache.LoadOrStore(key, ts)
+}
+
+// Delete deletes a value for a key.
+func (c *OperationStartTimeCache) Delete(key string) {
+	c.cache.Delete(key)
+}
+
+// Has returns a bool value indicating whether a key exists in the cache
+func (c *OperationStartTimeCache) Has(key string) bool {
+	_, exists := c.cache.Load(key)
+	return exists
+}
+
+// RecordMetric records either an error count metric or a latency metric if there
+// exists a start timestamp entry in the cache.
For a successful operation, i.e., +// err == nil, the corresponding timestamp entry will be removed from cache +func RecordMetric(key string, c *OperationStartTimeCache, err error) { + obj, exists := c.cache.Load(key) + if !exists { return } - volumeOperationMetric.WithLabelValues(pluginName, opName).Observe(timeTaken) + ts, ok := obj.(*operationTimestamp) + if !ok { + return + } + if err != nil { + RecordVolumeOperationErrorMetric(ts.pluginName, ts.operation) + } else { + timeTaken := time.Since(ts.startTs).Seconds() + metricutil.RecordOperationLatencyMetric(ts.pluginName, ts.operation, timeTaken) + // end of this operation, remove the timestamp entry from cache + c.Delete(key) + } } diff --git a/pkg/controller/volume/persistentvolume/provision_test.go b/pkg/controller/volume/persistentvolume/provision_test.go index 3d646137e9d..865a87fd7fb 100644 --- a/pkg/controller/volume/persistentvolume/provision_test.go +++ b/pkg/controller/volume/persistentvolume/provision_test.go @@ -20,7 +20,7 @@ import ( "errors" "testing" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" storage "k8s.io/api/storage/v1" apierrs "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -432,6 +432,17 @@ func TestProvisionSync(t *testing.T) { []string{"Normal ExternalProvisioning"}, noerrors, wrapTestWithCSIMigrationProvisionCalls(testSyncClaim), }, + { + // volume provisioned and available + // in this case, NO normal event with external provisioner should be issued + "11-22 - external provisioner with volume available", + newVolumeArray("volume11-22", "1Gi", "", "", v1.VolumeAvailable, v1.PersistentVolumeReclaimRetain, classExternal), + newVolumeArray("volume11-22", "1Gi", "uid11-22", "claim11-22", v1.VolumeBound, v1.PersistentVolumeReclaimRetain, classExternal, pvutil.AnnBoundByController), + newClaimArray("claim11-22", "uid11-22", "1Gi", "", v1.ClaimPending, &classExternal), + newClaimArray("claim11-22", "uid11-22", "1Gi", "volume11-22", v1.ClaimBound, &classExternal, pvutil.AnnBoundByController, pvutil.AnnBindCompleted), + noevents, + noerrors, wrapTestWithProvisionCalls([]provisionCall{}, testSyncClaim), + }, } runSyncTests(t, tests, storageClasses, []*v1.Pod{}) } @@ -461,6 +472,68 @@ func TestProvisionMultiSync(t *testing.T) { newClaimArray("claim12-1", "uid12-1", "1Gi", "pvc-uid12-1", v1.ClaimBound, &classGold, pvutil.AnnBoundByController, pvutil.AnnBindCompleted, pvutil.AnnStorageProvisioner), noevents, noerrors, wrapTestWithProvisionCalls([]provisionCall{provision1Success}, testSyncClaim), }, + { + // provision a volume (external provisioner) and binding + normal event with external provisioner + "12-2 - external provisioner with volume provisioned success", + novolumes, + newVolumeArray("pvc-uid12-2", "1Gi", "uid12-2", "claim12-2", v1.VolumeBound, v1.PersistentVolumeReclaimRetain, classExternal, pvutil.AnnBoundByController), + newClaimArray("claim12-2", "uid12-2", "1Gi", "", v1.ClaimPending, &classExternal), + claimWithAnnotation(pvutil.AnnStorageProvisioner, "vendor.com/my-volume", + newClaimArray("claim12-2", "uid12-2", "1Gi", "pvc-uid12-2", v1.ClaimBound, &classExternal, pvutil.AnnBoundByController, pvutil.AnnBindCompleted)), + []string{"Normal ExternalProvisioning"}, + noerrors, + wrapTestWithInjectedOperation(wrapTestWithProvisionCalls([]provisionCall{}, testSyncClaim), func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor) { + // Create a volume before syncClaim tries to bind a PV to PVC + // This simulates external provisioner creating a volume while 
the controller
+				// is waiting for a volume to bind to the existing claim
+				// the external provisioner workflow implemented in "provisionClaimOperationExternal"
+				// should issue an ExternalProvisioning event to signal that some external provisioner
+				// is working on provisioning the PV, also add the operation start timestamp into local cache
+				// operationTimestamps. Rely on the existence of the start timestamp to create a PV for binding
+				if ctrl.operationTimestamps.Has("default/claim12-2") {
+					volume := newVolume("pvc-uid12-2", "1Gi", "", "", v1.VolumeAvailable, v1.PersistentVolumeReclaimRetain, classExternal)
+					ctrl.volumes.store.Add(volume) // add the volume to controller
+					reactor.AddVolume(volume)
+				}
+			}),
+		},
+		{
+			// provision a volume (external provisioner) but binding will not happen + normal event with external provisioner
+			"12-3 - external provisioner with volume to be provisioned",
+			novolumes,
+			novolumes,
+			newClaimArray("claim12-3", "uid12-3", "1Gi", "", v1.ClaimPending, &classExternal),
+			claimWithAnnotation(pvutil.AnnStorageProvisioner, "vendor.com/my-volume",
+				newClaimArray("claim12-3", "uid12-3", "1Gi", "", v1.ClaimPending, &classExternal)),
+			[]string{"Normal ExternalProvisioning"},
+			noerrors,
+			wrapTestWithProvisionCalls([]provisionCall{provision1Success}, testSyncClaim),
+		},
+		{
+			// provision a volume (external provisioner) and binding + normal event with external provisioner
+			"12-4 - external provisioner with volume provisioned/bound success",
+			novolumes,
+			newVolumeArray("pvc-uid12-4", "1Gi", "uid12-4", "claim12-4", v1.VolumeBound, v1.PersistentVolumeReclaimRetain, classExternal, pvutil.AnnBoundByController),
+			newClaimArray("claim12-4", "uid12-4", "1Gi", "", v1.ClaimPending, &classExternal),
+			claimWithAnnotation(pvutil.AnnStorageProvisioner, "vendor.com/my-volume",
+				newClaimArray("claim12-4", "uid12-4", "1Gi", "pvc-uid12-4", v1.ClaimBound, &classExternal, pvutil.AnnBoundByController, pvutil.AnnBindCompleted)),
+			[]string{"Normal ExternalProvisioning"},
+			noerrors,
+			wrapTestWithInjectedOperation(wrapTestWithProvisionCalls([]provisionCall{}, testSyncClaim), func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor) {
+				// Create a volume before syncClaim tries to bind a PV to PVC
+				// This simulates external provisioner creating a volume while the controller
+				// is waiting for a volume to bind to the existing claim
+				// the external provisioner workflow implemented in "provisionClaimOperationExternal"
+				// should issue an ExternalProvisioning event to signal that some external provisioner
+				// is working on provisioning the PV, also add the operation start timestamp into local cache
+				// operationTimestamps.
+				// Rely on the existence of the start timestamp to create a PV for binding
+				if ctrl.operationTimestamps.Has("default/claim12-4") {
+					volume := newVolume("pvc-uid12-4", "1Gi", "uid12-4", "claim12-4", v1.VolumeBound, v1.PersistentVolumeReclaimRetain, classExternal, pvutil.AnnBoundByController)
+					ctrl.volumes.store.Add(volume) // add the volume to controller
+					reactor.AddVolume(volume)
+				}
+			}),
+		},
 	}
 	runMultisyncTests(t, tests, storageClasses, storageClasses[0].Name)

diff --git a/pkg/controller/volume/persistentvolume/pv_controller.go b/pkg/controller/volume/persistentvolume/pv_controller.go
index 6ba7e7115fa..a630b052c59 100644
--- a/pkg/controller/volume/persistentvolume/pv_controller.go
+++ b/pkg/controller/volume/persistentvolume/pv_controller.go
@@ -22,7 +22,7 @@ import (
 	"strings"
 	"time"
 
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	storage "k8s.io/api/storage/v1"
 	apierrs "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -203,6 +203,28 @@ type PersistentVolumeController struct {
 	// For testing only: hook to intercept CSI driver name <=> Intree plugin name mapping
 	// Not used when set to nil
 	csiNameFromIntreeNameHook func(pluginName string) (string, error)
+
+	// operationTimestamps caches start timestamp of operations
+	// (currently provision + binding/deletion) for metric recording.
+	// Detailed lifecycle/key for each operation
+	// 1. provision + binding
+	//    key: claimKey
+	//    start time: user has NOT provided any volume ref in the claim AND
+	//                there is no existing volume found for the claim,
+	//                "provisionClaim" is called with a valid plugin/external provisioner
+	//                to provision a volume
+	//    end time:   after a volume has been provisioned and bound to the claim successfully
+	//                the corresponding timestamp entry will be deleted from cache
+	//    abort:      claim has not been bound to a volume yet but a claim deleted event
+	//                has been received from API server
+	// 2. deletion
+	//    key: volumeName
+	//    start time: when "reclaimVolume" processes a volume with reclaim policy
+	//                set to be "PersistentVolumeReclaimDelete"
+	//    end time:   after a volume deleted event has been received from API server
+	//                the corresponding timestamp entry will be deleted from cache
+	//    abort:      N.A.
+	operationTimestamps metrics.OperationStartTimeCache
 }
 
 // syncClaim is the main controller method to decide what to do with a claim.
@@ -323,13 +345,21 @@ func (ctrl *PersistentVolumeController) syncUnboundClaim(claim *v1.PersistentVol
 	} else /* pv != nil */ {
 		// Found a PV for this claim
 		// OBSERVATION: pvc is "Pending", pv is "Available"
-		klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q found: %s", claimToClaimKey(claim), volume.Name, getVolumeStatusForLogging(volume))
+		claimKey := claimToClaimKey(claim)
+		klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q found: %s", claimKey, volume.Name, getVolumeStatusForLogging(volume))
 		if err = ctrl.bind(volume, claim); err != nil {
 			// On any error saving the volume or the claim, subsequent
 			// syncClaim will finish the binding.
+			// record an error count metric for provision if a start timestamp entry exists
+			// the timestamp entry will remain in cache until a successful binding has happened
+			metrics.RecordMetric(claimKey, &ctrl.operationTimestamps, err)
 			return err
 		}
 		// OBSERVATION: claim is "Bound", pv is "Bound"
+		// if a timestamp entry exists in the cache, record end to end provision latency and clean up the cache
+		// End of the provision + binding operation lifecycle, the cache entry will be cleaned up by "RecordMetric"
+		// [Unit test 12-1, 12-2, 12-4]
+		metrics.RecordMetric(claimKey, &ctrl.operationTimestamps, nil)
 		return nil
 	}
 } else /* pvc.Spec.VolumeName != nil */ {
@@ -1011,11 +1041,17 @@ func (ctrl *PersistentVolumeController) reclaimVolume(volume *v1.PersistentVolum
 	case v1.PersistentVolumeReclaimDelete:
 		klog.V(4).Infof("reclaimVolume[%s]: policy is Delete", volume.Name)
 		opName := fmt.Sprintf("delete-%s[%s]", volume.Name, string(volume.UID))
-		startTime := time.Now()
+		// create a start timestamp entry in cache for deletion operation if none exists with
+		// key = volume.Name, pluginName = provisionerName, operation = "delete"
+		ctrl.operationTimestamps.AddIfNotExist(volume.Name, ctrl.getProvisionerNameFromVolume(volume), "delete")
 		ctrl.scheduleOperation(opName, func() error {
-			pluginName, err := ctrl.deleteVolumeOperation(volume)
-			timeTaken := time.Since(startTime).Seconds()
-			metrics.RecordVolumeOperationMetric(pluginName, "delete", timeTaken, err)
+			_, err := ctrl.deleteVolumeOperation(volume)
+			if err != nil {
+				// only report error count to "volume_operation_total_errors"
+				// latency reporting will happen when the volume finally gets
+				// deleted and a volume deleted event is captured
+				metrics.RecordMetric(volume.Name, &ctrl.operationTimestamps, err)
+			}
 			return err
 		})
 
@@ -1309,11 +1345,31 @@ func (ctrl *PersistentVolumeController) provisionClaim(claim *v1.PersistentVolum
 	}
 	klog.V(4).Infof("provisionClaim[%s]: started", claimToClaimKey(claim))
 	opName := fmt.Sprintf("provision-%s[%s]", claimToClaimKey(claim), string(claim.UID))
-	startTime := time.Now()
+	plugin, storageClass, err := ctrl.findProvisionablePlugin(claim)
+	if err != nil {
+		ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, err.Error())
+		klog.Errorf("error finding provisioning plugin for claim %s: %v", claimToClaimKey(claim), err)
+		// failed to find the requested provisioning plugin, so give up on this attempt; the
+ // controller will retry the provisioning in every syncUnboundClaim() call + // retain the original behavior of returning nil from provisionClaim call + return nil + } ctrl.scheduleOperation(opName, func() error { - pluginName, err := ctrl.provisionClaimOperation(claim) - timeTaken := time.Since(startTime).Seconds() - metrics.RecordVolumeOperationMetric(pluginName, "provision", timeTaken, err) + // create a start timestamp entry in cache for provision operation if no one exists with + // key = claimKey, pluginName = provisionerName, operation = "provision" + claimKey := claimToClaimKey(claim) + ctrl.operationTimestamps.AddIfNotExist(claimKey, ctrl.getProvisionerName(plugin, storageClass), "provision") + var err error + if plugin == nil || plugin.IsMigratedToCSI() { + _, err = ctrl.provisionClaimOperationExternal(claim, plugin, storageClass) + } else { + _, err = ctrl.provisionClaimOperation(claim, plugin, storageClass) + } + // if error happened, record an error count metric + // timestamp entry will remain in cache until a success binding has happened + if err != nil { + metrics.RecordMetric(claimKey, &ctrl.operationTimestamps, err) + } return err }) return nil @@ -1328,39 +1384,20 @@ func (ctrl *PersistentVolumeController) getCSINameFromIntreeName(pluginName stri // provisionClaimOperation provisions a volume. This method is running in // standalone goroutine and already has all necessary locks. -func (ctrl *PersistentVolumeController) provisionClaimOperation(claim *v1.PersistentVolumeClaim) (string, error) { +func (ctrl *PersistentVolumeController) provisionClaimOperation( + claim *v1.PersistentVolumeClaim, + plugin vol.ProvisionableVolumePlugin, + storageClass *storage.StorageClass) (string, error) { claimClass := v1helper.GetPersistentVolumeClaimClass(claim) klog.V(4).Infof("provisionClaimOperation [%s] started, class: %q", claimToClaimKey(claim), claimClass) - plugin, storageClass, err := ctrl.findProvisionablePlugin(claim) - if err != nil { - ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, err.Error()) - klog.V(2).Infof("error finding provisioning plugin for claim %s: %v", claimToClaimKey(claim), err) - // The controller will retry provisioning the volume in every - // syncVolume() call. 
-		return "", err
-	}
-
-	var pluginName string
+	// called from provisionClaim(), in this case, plugin MUST NOT be nil and
+	// plugin.IsMigratedToCSI() MUST return FALSE
+	// NOTE: checks on plugin/storageClass have already been done by the caller, so they are not repeated here
+	pluginName := plugin.GetPluginName()
 	provisionerName := storageClass.Provisioner
-	if plugin != nil {
-		if plugin.IsMigratedToCSI() {
-			// pluginName is not set here to align with existing behavior
-			// of not setting pluginName for external provisioners (including CSI)
-			// Set provisionerName to CSI plugin name for setClaimProvisioner
-			provisionerName, err = ctrl.getCSINameFromIntreeName(storageClass.Provisioner)
-			if err != nil {
-				strerr := fmt.Sprintf("error getting CSI name for In tree plugin %s: %v", storageClass.Provisioner, err)
-				klog.V(2).Infof("%s", strerr)
-				ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
-				return "", err
-			}
-		} else {
-			pluginName = plugin.GetPluginName()
-		}
-	}
-
-	// Add provisioner annotation so external provisioners know when to start
+	// Add provisioner annotation to be consistent with external provisioner workflow
 	newClaim, err := ctrl.setClaimProvisioner(claim, provisionerName)
 	if err != nil {
 		// Save failed, the controller will retry in the next sync
@@ -1369,19 +1406,9 @@ func (ctrl *PersistentVolumeController) provisionClaimOperation(claim *v1.Persis
 		return "", err
 	}
 	claim = newClaim
-	if plugin == nil || plugin.IsMigratedToCSI() {
-		// findProvisionablePlugin returned no error nor plugin.
-		// This means that an unknown provisioner is requested. Report an event
-		// and wait for the external provisioner
-		msg := fmt.Sprintf("waiting for a volume to be created, either by external provisioner %q or manually created by system administrator", storageClass.Provisioner)
-		ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.ExternalProvisioning, msg)
-		klog.V(3).Infof("provisioning claim %q: %s", claimToClaimKey(claim), msg)
-		return pluginName, nil
-	}
-
-	// internal provisioning
-	// A previous doProvisionClaim may just have finished while we were waiting for
+	// A previous provisionClaimOperation may just have finished while we were waiting for
 	// the locks. Check that PV (with deterministic name) hasn't been provisioned
 	// yet.
@@ -1547,6 +1574,44 @@ func (ctrl *PersistentVolumeController) provisionClaimOperation(claim *v1.Persis return pluginName, nil } +// provisionClaimOperationExternal provisions a volume using external provisioner async-ly +// This method will be running in a standalone go-routine scheduled in "provisionClaim" +func (ctrl *PersistentVolumeController) provisionClaimOperationExternal( + claim *v1.PersistentVolumeClaim, + plugin vol.ProvisionableVolumePlugin, + storageClass *storage.StorageClass) (string, error) { + claimClass := v1helper.GetPersistentVolumeClaimClass(claim) + klog.V(4).Infof("provisionClaimOperationExternal [%s] started, class: %q", claimToClaimKey(claim), claimClass) + // Set provisionerName to external provisioner name by setClaimProvisioner + var err error + provisionerName := storageClass.Provisioner + if plugin != nil { + // update the provisioner name to use the CSI in-tree name + provisionerName, err = ctrl.getCSINameFromIntreeName(storageClass.Provisioner) + if err != nil { + strerr := fmt.Sprintf("error getting CSI name for In tree plugin %s: %v", storageClass.Provisioner, err) + klog.V(2).Infof("%s", strerr) + ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr) + return provisionerName, err + } + } + // Add provisioner annotation so external provisioners know when to start + newClaim, err := ctrl.setClaimProvisioner(claim, provisionerName) + if err != nil { + // Save failed, the controller will retry in the next sync + klog.V(2).Infof("error saving claim %s: %v", claimToClaimKey(claim), err) + return provisionerName, err + } + claim = newClaim + msg := fmt.Sprintf("waiting for a volume to be created, either by external provisioner %q or manually created by system administrator", provisionerName) + // External provisioner has been requested for provisioning the volume + // Report an event and wait for external provisioner to finish + ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.ExternalProvisioning, msg) + klog.V(3).Infof("provisionClaimOperationExternal provisioning claim %q: %s", claimToClaimKey(claim), msg) + // return provisioner name here for metric reporting + return provisionerName, nil +} + // rescheduleProvisioning signal back to the scheduler to retry dynamic provisioning // by removing the AnnSelectedNode annotation func (ctrl *PersistentVolumeController) rescheduleProvisioning(claim *v1.PersistentVolumeClaim) { @@ -1661,3 +1726,47 @@ func (ctrl *PersistentVolumeController) findDeletablePlugin(volume *v1.Persisten } return plugin, nil } + +// obtain provisioner/deleter name for a volume +func (ctrl *PersistentVolumeController) getProvisionerNameFromVolume(volume *v1.PersistentVolume) string { + plugin, err := ctrl.findDeletablePlugin(volume) + if err != nil { + return "N/A" + } + // if external provisioner was used for provisioning, + // the volume MUST have annotation of AnnDynamicallyProvisioned, use the value + // as the provisioner name + if plugin == nil { + return volume.Annotations[pvutil.AnnDynamicallyProvisioned] + } else if plugin.IsMigratedToCSI() { + // in case where a plugin has been migrated to CSI, + // use the CSI name instead of in-tree name + storageClass := v1helper.GetPersistentVolumeClass(volume) + class, err := ctrl.classLister.Get(storageClass) + if err != nil { + return "N/A" + } + provisionerName, err := ctrl.getCSINameFromIntreeName(class.Provisioner) + if err != nil { + return "N/A" + } + return provisionerName + } + return plugin.GetPluginName() +} + +// obtain plugin/external 
provisioner name from plugin and storage class +func (ctrl *PersistentVolumeController) getProvisionerName(plugin vol.ProvisionableVolumePlugin, storageClass *storage.StorageClass) string { + // intree plugin, returns the plugin's name + if plugin != nil && !plugin.IsMigratedToCSI() { + return plugin.GetPluginName() + } else if plugin != nil { + // get the CSI in-tree name from storage class provisioner name + provisionerName, err := ctrl.getCSINameFromIntreeName(storageClass.Provisioner) + if err != nil { + return "N/A" + } + return provisionerName + } + return storageClass.Provisioner +} diff --git a/pkg/controller/volume/persistentvolume/pv_controller_base.go b/pkg/controller/volume/persistentvolume/pv_controller_base.go index e43863c3193..adf53d8be83 100644 --- a/pkg/controller/volume/persistentvolume/pv_controller_base.go +++ b/pkg/controller/volume/persistentvolume/pv_controller_base.go @@ -21,7 +21,7 @@ import ( "strconv" "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -92,6 +92,7 @@ func NewController(p ControllerParameters) (*PersistentVolumeController, error) claimQueue: workqueue.NewNamed("claims"), volumeQueue: workqueue.NewNamed("volumes"), resyncPeriod: p.SyncPeriod, + operationTimestamps: metrics.NewOperationStartTimeCache(), } // Prober is nil because PV is not aware of Flexvolume. @@ -209,6 +210,10 @@ func (ctrl *PersistentVolumeController) updateVolume(volume *v1.PersistentVolume func (ctrl *PersistentVolumeController) deleteVolume(volume *v1.PersistentVolume) { _ = ctrl.volumes.store.Delete(volume) klog.V(4).Infof("volume %q deleted", volume.Name) + // record deletion metric if a deletion start timestamp is in the cache + // the following calls will be a no-op if there is nothing for this volume in the cache + // end of timestamp cache entry lifecycle, "RecordMetric" will do the clean + metrics.RecordMetric(volume.Name, &ctrl.operationTimestamps, nil) if volume.Spec.ClaimRef == nil { return @@ -245,20 +250,26 @@ func (ctrl *PersistentVolumeController) updateClaim(claim *v1.PersistentVolumeCl } } +// Unit test [5-5] [5-6] [5-7] // deleteClaim runs in worker thread and handles "claim deleted" event. func (ctrl *PersistentVolumeController) deleteClaim(claim *v1.PersistentVolumeClaim) { _ = ctrl.claims.Delete(claim) - klog.V(4).Infof("claim %q deleted", claimToClaimKey(claim)) + claimKey := claimToClaimKey(claim) + klog.V(4).Infof("claim %q deleted", claimKey) + // clean any possible unfinished provision start timestamp from cache + // Unit test [5-8] [5-9] + ctrl.operationTimestamps.Delete(claimKey) volumeName := claim.Spec.VolumeName if volumeName == "" { - klog.V(5).Infof("deleteClaim[%q]: volume not bound", claimToClaimKey(claim)) + klog.V(5).Infof("deleteClaim[%q]: volume not bound", claimKey) return } + // sync the volume when its claim is deleted. Explicitly sync'ing the // volume here in response to claim deletion prevents the volume from // waiting until the next sync period for its Release. 
- klog.V(5).Infof("deleteClaim[%q]: scheduling sync of volume %s", claimToClaimKey(claim), volumeName) + klog.V(5).Infof("deleteClaim[%q]: scheduling sync of volume %s", claimKey, volumeName) ctrl.volumeQueue.Add(volumeName) } diff --git a/pkg/controller/volume/persistentvolume/pv_controller_test.go b/pkg/controller/volume/persistentvolume/pv_controller_test.go index 4f1281adc2b..7f61f409860 100644 --- a/pkg/controller/volume/persistentvolume/pv_controller_test.go +++ b/pkg/controller/volume/persistentvolume/pv_controller_test.go @@ -17,6 +17,7 @@ limitations under the License. package persistentvolume import ( + "errors" "testing" "time" @@ -26,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" + storagelisters "k8s.io/client-go/listers/storage/v1" core "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" "k8s.io/klog" @@ -100,6 +102,130 @@ func TestControllerSync(t *testing.T) { return nil }, }, + { + // deleteClaim with a bound claim makes bound volume released with external deleter. + // delete the corresponding volume from apiserver, and report latency metric + "5-5 - delete claim and delete volume report metric", + volumesWithAnnotation(pvutil.AnnDynamicallyProvisioned, "gcr.io/vendor-csi", + newVolumeArray("volume5-6", "10Gi", "uid5-6", "claim5-6", v1.VolumeBound, v1.PersistentVolumeReclaimDelete, classExternal, pvutil.AnnBoundByController)), + novolumes, + claimWithAnnotation(pvutil.AnnStorageProvisioner, "gcr.io/vendor-csi", + newClaimArray("claim5-5", "uid5-5", "1Gi", "volume5-5", v1.ClaimBound, &classExternal, pvutil.AnnBoundByController, pvutil.AnnBindCompleted)), + noclaims, + noevents, noerrors, + // Custom test function that generates a delete claim event which should have been caught by + // "deleteClaim" to remove the claim from controller's cache, after that, a volume deleted + // event will be generated to trigger "deleteVolume" call for metric reporting + func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error { + test.initialVolumes[0].Annotations[pvutil.AnnDynamicallyProvisioned] = "gcr.io/vendor-csi" + obj := ctrl.claims.List()[0] + claim := obj.(*v1.PersistentVolumeClaim) + reactor.DeleteClaimEvent(claim) + for len(ctrl.claims.ListKeys()) > 0 { + time.Sleep(10 * time.Millisecond) + } + // claim has been removed from controller's cache, generate a volume deleted event + volume := ctrl.volumes.store.List()[0].(*v1.PersistentVolume) + reactor.DeleteVolumeEvent(volume) + return nil + }, + }, + { + // deleteClaim with a bound claim makes bound volume released with external deleter pending + // there should be an entry in operation timestamps cache in controller + "5-6 - delete claim and waiting for external volume deletion", + volumesWithAnnotation(pvutil.AnnDynamicallyProvisioned, "gcr.io/vendor-csi", + newVolumeArray("volume5-6", "10Gi", "uid5-6", "claim5-6", v1.VolumeBound, v1.PersistentVolumeReclaimDelete, classExternal, pvutil.AnnBoundByController)), + volumesWithAnnotation(pvutil.AnnDynamicallyProvisioned, "gcr.io/vendor-csi", + newVolumeArray("volume5-6", "10Gi", "uid5-6", "claim5-6", v1.VolumeReleased, v1.PersistentVolumeReclaimDelete, classExternal, pvutil.AnnBoundByController)), + claimWithAnnotation(pvutil.AnnStorageProvisioner, "gcr.io/vendor-csi", + newClaimArray("claim5-6", "uid5-6", "1Gi", "volume5-6", v1.ClaimBound, &classExternal, pvutil.AnnBoundByController, pvutil.AnnBindCompleted)), + noclaims, + noevents, noerrors, + // Custom test 
function that generates a delete claim event which should have been caught by + // "deleteClaim" to remove the claim from controller's cache and mark bound volume to be released + func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error { + // should have been provisioned by external provisioner + obj := ctrl.claims.List()[0] + claim := obj.(*v1.PersistentVolumeClaim) + reactor.DeleteClaimEvent(claim) + // wait until claim is cleared from cache, i.e., deleteClaim is called + for len(ctrl.claims.ListKeys()) > 0 { + time.Sleep(10 * time.Millisecond) + } + // make sure the operation timestamp cache is NOT empty + if !ctrl.operationTimestamps.Has("volume5-6") { + return errors.New("failed checking timestamp cache: should not be empty") + } + return nil + }, + }, + { + // deleteVolume event issued before deleteClaim, no metric should have been reported + // and no delete operation start timestamp should be inserted into controller.operationTimestamps cache + "5-7 - delete volume event makes claim lost, delete claim event will not report metric", + newVolumeArray("volume5-7", "10Gi", "uid5-7", "claim5-7", v1.VolumeBound, v1.PersistentVolumeReclaimDelete, classExternal, pvutil.AnnBoundByController, pvutil.AnnDynamicallyProvisioned), + novolumes, + claimWithAnnotation(pvutil.AnnStorageProvisioner, "gcr.io/vendor-csi", + newClaimArray("claim5-7", "uid5-7", "1Gi", "volume5-7", v1.ClaimBound, &classExternal, pvutil.AnnBoundByController, pvutil.AnnBindCompleted)), + noclaims, + []string{"Warning ClaimLost"}, + noerrors, + // Custom test function that generates a delete claim event which should have been caught by + // "deleteClaim" to remove the claim from controller's cache and mark bound volume to be released + func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error { + volume := ctrl.volumes.store.List()[0].(*v1.PersistentVolume) + reactor.DeleteVolumeEvent(volume) + for len(ctrl.volumes.store.ListKeys()) > 0 { + time.Sleep(10 * time.Millisecond) + } + // trying to remove the claim as well + obj := ctrl.claims.List()[0] + claim := obj.(*v1.PersistentVolumeClaim) + reactor.DeleteClaimEvent(claim) + // wait until claim is cleared from cache, i.e., deleteClaim is called + for len(ctrl.claims.ListKeys()) > 0 { + time.Sleep(10 * time.Millisecond) + } + // make sure operation timestamp cache is empty + if ctrl.operationTimestamps.Has("volume5-7") { + return errors.New("failed checking timestamp cache") + } + return nil + }, + }, + { + // delete a claim waiting for being bound cleans up provision(volume ref == "") entry from timestamp cache + "5-8 - delete claim cleans up operation timestamp cache for provision", + novolumes, + novolumes, + claimWithAnnotation(pvutil.AnnStorageProvisioner, "gcr.io/vendor-csi", + newClaimArray("claim5-8", "uid5-8", "1Gi", "", v1.ClaimPending, &classExternal)), + noclaims, + []string{"Normal ExternalProvisioning"}, + noerrors, + // Custom test function that generates a delete claim event which should have been caught by + // "deleteClaim" to remove the claim from controller's cache and mark bound volume to be released + func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error { + // wait until the provision timestamp has been inserted + for !ctrl.operationTimestamps.Has("default/claim5-8") { + time.Sleep(10 * time.Millisecond) + } + // delete the claim + obj := ctrl.claims.List()[0] + claim := obj.(*v1.PersistentVolumeClaim) + 
reactor.DeleteClaimEvent(claim) + // wait until claim is cleared from cache, i.e., deleteClaim is called + for len(ctrl.claims.ListKeys()) > 0 { + time.Sleep(10 * time.Millisecond) + } + // make sure operation timestamp cache is empty + if ctrl.operationTimestamps.Has("default/claim5-8") { + return errors.New("failed checking timestamp cache") + } + return nil + }, + }, } for _, test := range tests { @@ -120,6 +246,18 @@ func TestControllerSync(t *testing.T) { t.Fatalf("Test %q construct persistent volume failed: %v", test.name, err) } + // Inject storage classes into controller via a custom lister for test [5-5] + storageClasses := []*storagev1.StorageClass{ + makeStorageClass(classExternal, &modeImmediate), + } + + storageClasses[0].Provisioner = "gcr.io/vendor-csi" + indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}) + for _, class := range storageClasses { + indexer.Add(class) + } + ctrl.classLister = storagelisters.NewStorageClassLister(indexer) + reactor := newVolumeReactor(client, ctrl, fakeVolumeWatch, fakeClaimWatch, test.errors) for _, claim := range test.initialClaims { reactor.AddClaim(claim) diff --git a/pkg/volume/util/metrics.go b/pkg/volume/util/metrics.go index ee20c0e10be..5dc94049ff9 100644 --- a/pkg/volume/util/metrics.go +++ b/pkg/volume/util/metrics.go @@ -54,6 +54,15 @@ var storageOperationStatusMetric = prometheus.NewCounterVec( []string{"volume_plugin", "operation_name", "status"}, ) +var storageOperationEndToEndLatencyMetric = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "volume_operation_total_seconds", + Help: "Storage operation end to end duration in seconds", + Buckets: []float64{.1, .25, .5, 1, 2.5, 5, 10, 15, 25, 50, 120, 300, 600}, + }, + []string{"volume_plugin", "operation_name"}, +) + func init() { registerMetrics() } @@ -62,6 +71,7 @@ func registerMetrics() { prometheus.MustRegister(storageOperationMetric) prometheus.MustRegister(storageOperationErrorMetric) prometheus.MustRegister(storageOperationStatusMetric) + prometheus.MustRegister(storageOperationEndToEndLatencyMetric) } // OperationCompleteHook returns a hook to call when an operation is completed @@ -95,3 +105,9 @@ func GetFullQualifiedPluginNameForVolume(pluginName string, spec *volume.Spec) s } return pluginName } + +// RecordOperationLatencyMetric records the end to end latency for certain operation +// into metric volume_operation_total_seconds +func RecordOperationLatencyMetric(plugin, operationName string, secondsTaken float64) { + storageOperationEndToEndLatencyMetric.WithLabelValues(plugin, operationName).Observe(secondsTaken) +}
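
Illustrative sketch (not part of the patch): the provision lifecycle documented on the operationTimestamps field in pv_controller.go can be exercised with just the helpers this patch adds (NewOperationStartTimeCache, AddIfNotExist, RecordMetric). The claim key and plugin name below are hypothetical example values, and importing the in-tree metrics package from a standalone program is only for demonstration.

package main

import (
	"errors"

	"k8s.io/kubernetes/pkg/controller/volume/persistentvolume/metrics"
)

func main() {
	// provisionClaim: start the clock once per claim; AddIfNotExist is a no-op
	// if an entry for the key is already present.
	cache := metrics.NewOperationStartTimeCache()
	cache.AddIfNotExist("default/claim-1", "example.com/hypothetical-provisioner", "provision")

	// A failed attempt only increments volume_operation_total_errors; the start
	// timestamp stays cached so a later retry is still measured end to end.
	metrics.RecordMetric("default/claim-1", &cache, errors.New("transient provision failure"))

	// A successful binding observes volume_operation_total_seconds and removes the entry.
	metrics.RecordMetric("default/claim-1", &cache, nil)
}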