From 213cc99ead9a3fd03c3b382c6ac249fe1b9dbabd Mon Sep 17 00:00:00 2001 From: David Zhu Date: Thu, 25 Apr 2019 16:20:03 -0700 Subject: [PATCH] Operation generator migration metric fixes and test metrics retrieval code --- .../operationexecutor/operation_generator.go | 46 ++++- test/e2e/storage/drivers/in_tree.go | 14 ++ test/e2e/storage/testsuites/base.go | 169 ++++++++++++++++++ 3 files changed, 224 insertions(+), 5 deletions(-) diff --git a/pkg/volume/util/operationexecutor/operation_generator.go b/pkg/volume/util/operationexecutor/operation_generator.go index 6fa0a684bbc..4599a487b4c 100644 --- a/pkg/volume/util/operationexecutor/operation_generator.go +++ b/pkg/volume/util/operationexecutor/operation_generator.go @@ -295,16 +295,18 @@ func (og *operationGenerator) GenerateBulkVolumeVerifyFunc( func (og *operationGenerator) GenerateAttachVolumeFunc( volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) volumetypes.GeneratedOperations { + originalSpec := volumeToAttach.VolumeSpec attachVolumeFunc := func() (error, error) { var attachableVolumePlugin volume.AttachableVolumePlugin - originalSpec := volumeToAttach.VolumeSpec + nu, err := nodeUsingCSIPlugin(og, volumeToAttach.VolumeSpec, volumeToAttach.NodeName) if err != nil { return volumeToAttach.GenerateError("AttachVolume.NodeUsingCSIPlugin failed", err) } - // useCSIPlugin will check both CSIMigration and the plugin specific feature gate - if useCSIPlugin(og.volumePluginMgr, volumeToAttach.VolumeSpec) && nu { + // useCSIPlugin will check both CSIMigration and the plugin specific feature gates + ucp := useCSIPlugin(og.volumePluginMgr, volumeToAttach.VolumeSpec) + if ucp && nu { // The volume represented by this spec is CSI and thus should be migrated attachableVolumePlugin, err = og.volumePluginMgr.FindAttachablePluginByName(csi.CSIPluginName) if err != nil || attachableVolumePlugin == nil { @@ -382,8 +384,29 @@ func (og *operationGenerator) GenerateAttachVolumeFunc( } } - // Get attacher plugin attachableVolumePluginName := unknownAttachableVolumePlugin + // TODO(dyzz) Ignoring this error means that if the plugin is migrated and + // any transient error is encountered (API unavailable, driver not installed) + // the operation will have it's metric registered with the in-tree plugin instead + // of the CSI Driver we migrated to. Fixing this requires a larger refactor that + // involves determining the plugin_name for the metric generating "CompleteFunc" + // during the actual "OperationFunc" and not during this generation function + nu, _ := nodeUsingCSIPlugin(og, volumeToAttach.VolumeSpec, volumeToAttach.NodeName) + ucp := useCSIPlugin(og.volumePluginMgr, volumeToAttach.VolumeSpec) + + // Need to translate the spec here if the plugin is migrated so that the metrics + // emitted show the correct (migrated) plugin + if ucp && nu { + csiSpec, err := translateSpec(volumeToAttach.VolumeSpec) + if err == nil { + volumeToAttach.VolumeSpec = csiSpec + } + // If we have an error here we ignore it, the metric emitted will then be for the + // in-tree plugin. This error case(skipped one) will also trigger an error + // while the generated function is executed. And those errors will be handled during the execution of the generated + // function with a back off policy. + } + // Get attacher plugin attachableVolumePlugin, err := og.volumePluginMgr.FindAttachablePluginBySpec(volumeToAttach.VolumeSpec) // It's ok to ignore the error, returning error is not expected from this function. @@ -528,7 +551,20 @@ func (og *operationGenerator) GenerateMountVolumeFunc( actualStateOfWorld ActualStateOfWorldMounterUpdater, isRemount bool) volumetypes.GeneratedOperations { // Get mounter plugin + originalSpec := volumeToMount.VolumeSpec volumePluginName := unknownVolumePlugin + // Need to translate the spec here if the plugin is migrated so that the metrics + // emitted show the correct (migrated) plugin + if useCSIPlugin(og.volumePluginMgr, volumeToMount.VolumeSpec) { + csiSpec, err := translateSpec(volumeToMount.VolumeSpec) + if err == nil { + volumeToMount.VolumeSpec = csiSpec + } + // If we have an error here we ignore it, the metric emitted will then be for the + // in-tree plugin. This error case(skipped one) will also trigger an error + // while the generated function is executed. And those errors will be handled during the execution of the generated + // function with a back off policy. + } volumePlugin, err := og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec) if err == nil && volumePlugin != nil { @@ -536,7 +572,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc( } mountVolumeFunc := func() (error, error) { - originalSpec := volumeToMount.VolumeSpec + // Get mounter plugin if useCSIPlugin(og.volumePluginMgr, volumeToMount.VolumeSpec) { csiSpec, err := translateSpec(volumeToMount.VolumeSpec) diff --git a/test/e2e/storage/drivers/in_tree.go b/test/e2e/storage/drivers/in_tree.go index e397921f0e4..68e813c83c3 100644 --- a/test/e2e/storage/drivers/in_tree.go +++ b/test/e2e/storage/drivers/in_tree.go @@ -88,6 +88,7 @@ func InitNFSDriver() testsuites.TestDriver { return &nfsDriver{ driverInfo: testsuites.DriverInfo{ Name: "nfs", + PluginName: "kubernetes.io/nfs", MaxFileSize: testpatterns.FileSizeLarge, SupportedFsType: sets.NewString( "", // Default fsType @@ -229,6 +230,7 @@ func InitGlusterFSDriver() testsuites.TestDriver { return &glusterFSDriver{ driverInfo: testsuites.DriverInfo{ Name: "gluster", + PluginName: "kubernetes.io/glusterfs", MaxFileSize: testpatterns.FileSizeMedium, SupportedFsType: sets.NewString( "", // Default fsType @@ -346,6 +348,7 @@ func InitISCSIDriver() testsuites.TestDriver { return &iSCSIDriver{ driverInfo: testsuites.DriverInfo{ Name: "iscsi", + PluginName: "kubernetes.io/iscsi", FeatureTag: "[Feature:Volumes]", MaxFileSize: testpatterns.FileSizeMedium, SupportedFsType: sets.NewString( @@ -458,6 +461,7 @@ func InitRbdDriver() testsuites.TestDriver { return &rbdDriver{ driverInfo: testsuites.DriverInfo{ Name: "rbd", + PluginName: "kubernetes.io/rbd", FeatureTag: "[Feature:Volumes]", MaxFileSize: testpatterns.FileSizeMedium, SupportedFsType: sets.NewString( @@ -585,6 +589,7 @@ func InitCephFSDriver() testsuites.TestDriver { return &cephFSDriver{ driverInfo: testsuites.DriverInfo{ Name: "ceph", + PluginName: "kubernetes.io/cephfs", FeatureTag: "[Feature:Volumes]", MaxFileSize: testpatterns.FileSizeMedium, SupportedFsType: sets.NewString( @@ -684,6 +689,7 @@ func InitHostPathDriver() testsuites.TestDriver { return &hostPathDriver{ driverInfo: testsuites.DriverInfo{ Name: "hostPath", + PluginName: "kubernetes.io/host-path", MaxFileSize: testpatterns.FileSizeMedium, SupportedFsType: sets.NewString( "", // Default fsType @@ -756,6 +762,7 @@ func InitHostPathSymlinkDriver() testsuites.TestDriver { return &hostPathSymlinkDriver{ driverInfo: testsuites.DriverInfo{ Name: "hostPathSymlink", + PluginName: "kubernetes.io/host-path", MaxFileSize: testpatterns.FileSizeMedium, SupportedFsType: sets.NewString( "", // Default fsType @@ -896,6 +903,7 @@ func InitEmptydirDriver() testsuites.TestDriver { return &emptydirDriver{ driverInfo: testsuites.DriverInfo{ Name: "emptydir", + PluginName: "kubernetes.io/empty-dir", MaxFileSize: testpatterns.FileSizeMedium, SupportedFsType: sets.NewString( "", // Default fsType @@ -961,6 +969,7 @@ func InitCinderDriver() testsuites.TestDriver { return &cinderDriver{ driverInfo: testsuites.DriverInfo{ Name: "cinder", + PluginName: "kubernetes.io/cinder", MaxFileSize: testpatterns.FileSizeMedium, SupportedFsType: sets.NewString( "", // Default fsType @@ -1129,6 +1138,7 @@ func InitGcePdDriver() testsuites.TestDriver { return &gcePdDriver{ driverInfo: testsuites.DriverInfo{ Name: "gcepd", + PluginName: "kubernetes.io/gce-pd", MaxFileSize: testpatterns.FileSizeMedium, SupportedFsType: supportedTypes, SupportedMountOption: sets.NewString("debug", "nouid32"), @@ -1256,6 +1266,7 @@ func InitVSphereDriver() testsuites.TestDriver { return &vSphereDriver{ driverInfo: testsuites.DriverInfo{ Name: "vSphere", + PluginName: "kubernetes.io/vsphere-volume", MaxFileSize: testpatterns.FileSizeMedium, SupportedFsType: sets.NewString( "", // Default fsType @@ -1377,6 +1388,7 @@ func InitAzureDriver() testsuites.TestDriver { return &azureDriver{ driverInfo: testsuites.DriverInfo{ Name: "azure", + PluginName: "kubernetes.io/azure-file", MaxFileSize: testpatterns.FileSizeMedium, SupportedFsType: sets.NewString( "", // Default fsType @@ -1495,6 +1507,7 @@ func InitAwsDriver() testsuites.TestDriver { return &awsDriver{ driverInfo: testsuites.DriverInfo{ Name: "aws", + PluginName: "kubernetes.io/aws-ebs", MaxFileSize: testpatterns.FileSizeMedium, SupportedFsType: sets.NewString( "", // Default fsType @@ -1661,6 +1674,7 @@ func InitLocalDriverWithVolumeType(volumeType utils.LocalVolumeType) func() test return &localDriver{ driverInfo: testsuites.DriverInfo{ Name: "local", + PluginName: "kubernetes.io/local-volume", FeatureTag: featureTag, MaxFileSize: maxFileSize, SupportedFsType: supportedFsTypes, diff --git a/test/e2e/storage/testsuites/base.go b/test/e2e/storage/testsuites/base.go index 7cc57d28e6a..9be9a8944fb 100644 --- a/test/e2e/storage/testsuites/base.go +++ b/test/e2e/storage/testsuites/base.go @@ -18,8 +18,10 @@ package testsuites import ( "context" + "flag" "fmt" "regexp" + "strings" "time" . "github.com/onsi/ginkgo" @@ -31,13 +33,26 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/util/sets" clientset "k8s.io/client-go/kubernetes" + csilib "k8s.io/csi-translation-lib" "k8s.io/kubernetes/test/e2e/framework" + "k8s.io/kubernetes/test/e2e/framework/metrics" "k8s.io/kubernetes/test/e2e/framework/podlogs" "k8s.io/kubernetes/test/e2e/framework/volume" "k8s.io/kubernetes/test/e2e/storage/testpatterns" ) +var ( + migratedPlugins *string +) + +func init() { + migratedPlugins = flag.String("storage.migratedPlugins", "", "comma seperated list of in-tree plugin names of form 'kubernetes.io/{pluginName}' migrated to CSI") +} + +type opCounts map[string]int64 + // TestSuite represents an interface for a set of tests which works with TestDriver type TestSuite interface { // getTestSuiteInfo returns the TestSuiteInfo for this TestSuite @@ -465,3 +480,157 @@ func StartPodLogs(f *framework.Framework) func() { return cancel } + +func getVolumeOpsFromMetricsForPlugin(ms metrics.ControllerManagerMetrics, pluginName string) opCounts { + totOps := opCounts{} + + for method, samples := range ms { + switch method { + case "storage_operation_status_count": + for _, sample := range samples { + plugin := string(sample.Metric["volume_plugin"]) + if pluginName != plugin { + continue + } + opName := string(sample.Metric["operation_name"]) + if opName == "verify_controller_attached_volume" { + // We ignore verify_controller_attached_volume because it does not call into + // the plugin. It only watches Node API and updates Actual State of World cache + continue + } + totOps[opName] = totOps[opName] + int64(sample.Value) + } + } + } + return totOps +} + +func getVolumeOpsFromKubeletMetricsForPlugin(ms metrics.KubeletMetrics, pluginName string) opCounts { + totOps := opCounts{} + + for method, samples := range ms { + switch method { + case "storage_operation_status_count": + for _, sample := range samples { + plugin := string(sample.Metric["volume_plugin"]) + if pluginName != plugin { + continue + } + opName := string(sample.Metric["operation_name"]) + if opName == "verify_controller_attached_volume" { + // We ignore verify_controller_attached_volume because it does not call into + // the plugin. It only watches Node API and updates Actual State of World cache + continue + } + totOps[opName] = totOps[opName] + int64(sample.Value) + } + } + } + return totOps +} + +func getVolumeOpCounts(c clientset.Interface, pluginName string) opCounts { + metricsGrabber, err := metrics.NewMetricsGrabber(c, nil, true, false, true, false, false) + + if err != nil { + framework.Failf("Error creating metrics grabber : %v", err) + } + + if !metricsGrabber.HasRegisteredMaster() { + framework.Skipf("Environment does not support getting controller-manager metrics - skipping") + } + + controllerMetrics, err := metricsGrabber.GrabFromControllerManager() + framework.ExpectNoError(err, "Error getting c-m metrics : %v", err) + totOps := getVolumeOpsFromMetricsForPlugin(controllerMetrics, pluginName) + + nodes, err := c.CoreV1().Nodes().List(metav1.ListOptions{}) + framework.ExpectNoError(err, "Error listing nodes: %v", err) + for _, node := range nodes.Items { + nodeMetrics, err := metricsGrabber.GrabFromKubelet(node.GetName()) + framework.ExpectNoError(err, "Error getting Kubelet %v metrics: %v", node.GetName(), err) + totOps = addOpCounts(totOps, getVolumeOpsFromKubeletMetricsForPlugin(nodeMetrics, pluginName)) + } + + return totOps +} + +func addOpCounts(o1 opCounts, o2 opCounts) opCounts { + totOps := opCounts{} + seen := sets.NewString() + for op, count := range o1 { + seen.Insert(op) + totOps[op] = totOps[op] + count + totOps[op] = totOps[op] + o2[op] + } + for op, count := range o2 { + if !seen.Has(op) { + totOps[op] = totOps[op] + count + } + } + return totOps +} + +func getMigrationVolumeOpCounts(cs clientset.Interface, pluginName string) (opCounts, opCounts) { + if len(pluginName) > 0 { + var migratedOps opCounts + csiName, err := csilib.GetCSINameFromInTreeName(pluginName) + if err != nil { + framework.Logf("Could not find CSI Name for in-tree plugin %v", pluginName) + migratedOps = opCounts{} + } else { + csiName = "kubernetes.io/csi:" + csiName + migratedOps = getVolumeOpCounts(cs, csiName) + } + return getVolumeOpCounts(cs, pluginName), migratedOps + } else { + // Not an in-tree driver + framework.Logf("Test running for native CSI Driver, not checking metrics") + return opCounts{}, opCounts{} + } +} + +func getTotOps(ops opCounts) int64 { + var tot int64 = 0 + for _, count := range ops { + tot += count + } + return tot +} + +func validateMigrationVolumeOpCounts(cs clientset.Interface, pluginName string, oldInTreeOps, oldMigratedOps opCounts) { + if len(pluginName) == 0 { + // This is a native CSI Driver and we don't check ops + return + } + + newInTreeOps, newMigratedOps := getMigrationVolumeOpCounts(cs, pluginName) + + if sets.NewString(strings.Split(*migratedPlugins, ",")...).Has(pluginName) { + // If this plugin is migrated based on the test flag storage.migratedPlugins + for op, count := range newInTreeOps { + if count != oldInTreeOps[op] { + framework.Failf("In-tree plugin %v migrated to CSI Driver, however found %v %v metrics for in-tree plugin", pluginName, count-oldInTreeOps[op], op) + } + } + totMigrated := getTotOps(newMigratedOps) + oldTotMigrated := getTotOps(oldMigratedOps) + if totMigrated-oldTotMigrated <= 0 { + framework.Failf("In-tree plugin %v migrated to CSI Driver, however found %v metrics for migrated plugin", pluginName, totMigrated-oldTotMigrated) + + } + } else { + // In-tree plugin is not migrated + totInTree := getTotOps(newInTreeOps) + oldTotInTree := getTotOps(oldInTreeOps) + if totInTree == oldTotInTree { + framework.Failf("In-tree plugin %v NOT migrated to CSI Driver, however found did not find any operation metrics for in-tree plugin", pluginName) + } + // We don't check counts for the Migrated version of the driver because if tests are running in parallel a test could be using + // the CSI Driver natively and increase the metrics count + + // TODO(dyzz): Add a dimension to OperationGenerator metrics for "migrated"->true/false so that we can disambiguate migrated metrics + // and native CSI Driver metrics. This way we can check the counts for migrated version of the driver for stronger negative test case + // guarantees (as well as more informative metrics). + } +}