Operation generator migration metric fixes and test metrics retrieval code

This commit is contained in:
David Zhu 2019-04-25 16:20:03 -07:00
parent dc071b8e81
commit 213cc99ead
3 changed files with 224 additions and 5 deletions

View File

@ -295,16 +295,18 @@ func (og *operationGenerator) GenerateBulkVolumeVerifyFunc(
func (og *operationGenerator) GenerateAttachVolumeFunc(
volumeToAttach VolumeToAttach,
actualStateOfWorld ActualStateOfWorldAttacherUpdater) volumetypes.GeneratedOperations {
originalSpec := volumeToAttach.VolumeSpec
attachVolumeFunc := func() (error, error) {
var attachableVolumePlugin volume.AttachableVolumePlugin
originalSpec := volumeToAttach.VolumeSpec
nu, err := nodeUsingCSIPlugin(og, volumeToAttach.VolumeSpec, volumeToAttach.NodeName)
if err != nil {
return volumeToAttach.GenerateError("AttachVolume.NodeUsingCSIPlugin failed", err)
}
// useCSIPlugin will check both CSIMigration and the plugin specific feature gate
if useCSIPlugin(og.volumePluginMgr, volumeToAttach.VolumeSpec) && nu {
// useCSIPlugin will check both CSIMigration and the plugin specific feature gates
ucp := useCSIPlugin(og.volumePluginMgr, volumeToAttach.VolumeSpec)
if ucp && nu {
// The volume represented by this spec is CSI and thus should be migrated
attachableVolumePlugin, err = og.volumePluginMgr.FindAttachablePluginByName(csi.CSIPluginName)
if err != nil || attachableVolumePlugin == nil {
@ -382,8 +384,29 @@ func (og *operationGenerator) GenerateAttachVolumeFunc(
}
}
// Get attacher plugin
attachableVolumePluginName := unknownAttachableVolumePlugin
// TODO(dyzz) Ignoring this error means that if the plugin is migrated and
// any transient error is encountered (API unavailable, driver not installed)
// the operation will have it's metric registered with the in-tree plugin instead
// of the CSI Driver we migrated to. Fixing this requires a larger refactor that
// involves determining the plugin_name for the metric generating "CompleteFunc"
// during the actual "OperationFunc" and not during this generation function
nu, _ := nodeUsingCSIPlugin(og, volumeToAttach.VolumeSpec, volumeToAttach.NodeName)
ucp := useCSIPlugin(og.volumePluginMgr, volumeToAttach.VolumeSpec)
// Need to translate the spec here if the plugin is migrated so that the metrics
// emitted show the correct (migrated) plugin
if ucp && nu {
csiSpec, err := translateSpec(volumeToAttach.VolumeSpec)
if err == nil {
volumeToAttach.VolumeSpec = csiSpec
}
// If we have an error here we ignore it, the metric emitted will then be for the
// in-tree plugin. This error case(skipped one) will also trigger an error
// while the generated function is executed. And those errors will be handled during the execution of the generated
// function with a back off policy.
}
// Get attacher plugin
attachableVolumePlugin, err :=
og.volumePluginMgr.FindAttachablePluginBySpec(volumeToAttach.VolumeSpec)
// It's ok to ignore the error, returning error is not expected from this function.
@ -528,7 +551,20 @@ func (og *operationGenerator) GenerateMountVolumeFunc(
actualStateOfWorld ActualStateOfWorldMounterUpdater,
isRemount bool) volumetypes.GeneratedOperations {
// Get mounter plugin
originalSpec := volumeToMount.VolumeSpec
volumePluginName := unknownVolumePlugin
// Need to translate the spec here if the plugin is migrated so that the metrics
// emitted show the correct (migrated) plugin
if useCSIPlugin(og.volumePluginMgr, volumeToMount.VolumeSpec) {
csiSpec, err := translateSpec(volumeToMount.VolumeSpec)
if err == nil {
volumeToMount.VolumeSpec = csiSpec
}
// If we have an error here we ignore it, the metric emitted will then be for the
// in-tree plugin. This error case(skipped one) will also trigger an error
// while the generated function is executed. And those errors will be handled during the execution of the generated
// function with a back off policy.
}
volumePlugin, err :=
og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec)
if err == nil && volumePlugin != nil {
@ -536,7 +572,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc(
}
mountVolumeFunc := func() (error, error) {
originalSpec := volumeToMount.VolumeSpec
// Get mounter plugin
if useCSIPlugin(og.volumePluginMgr, volumeToMount.VolumeSpec) {
csiSpec, err := translateSpec(volumeToMount.VolumeSpec)

View File

@ -88,6 +88,7 @@ func InitNFSDriver() testsuites.TestDriver {
return &nfsDriver{
driverInfo: testsuites.DriverInfo{
Name: "nfs",
PluginName: "kubernetes.io/nfs",
MaxFileSize: testpatterns.FileSizeLarge,
SupportedFsType: sets.NewString(
"", // Default fsType
@ -229,6 +230,7 @@ func InitGlusterFSDriver() testsuites.TestDriver {
return &glusterFSDriver{
driverInfo: testsuites.DriverInfo{
Name: "gluster",
PluginName: "kubernetes.io/glusterfs",
MaxFileSize: testpatterns.FileSizeMedium,
SupportedFsType: sets.NewString(
"", // Default fsType
@ -346,6 +348,7 @@ func InitISCSIDriver() testsuites.TestDriver {
return &iSCSIDriver{
driverInfo: testsuites.DriverInfo{
Name: "iscsi",
PluginName: "kubernetes.io/iscsi",
FeatureTag: "[Feature:Volumes]",
MaxFileSize: testpatterns.FileSizeMedium,
SupportedFsType: sets.NewString(
@ -458,6 +461,7 @@ func InitRbdDriver() testsuites.TestDriver {
return &rbdDriver{
driverInfo: testsuites.DriverInfo{
Name: "rbd",
PluginName: "kubernetes.io/rbd",
FeatureTag: "[Feature:Volumes]",
MaxFileSize: testpatterns.FileSizeMedium,
SupportedFsType: sets.NewString(
@ -585,6 +589,7 @@ func InitCephFSDriver() testsuites.TestDriver {
return &cephFSDriver{
driverInfo: testsuites.DriverInfo{
Name: "ceph",
PluginName: "kubernetes.io/cephfs",
FeatureTag: "[Feature:Volumes]",
MaxFileSize: testpatterns.FileSizeMedium,
SupportedFsType: sets.NewString(
@ -684,6 +689,7 @@ func InitHostPathDriver() testsuites.TestDriver {
return &hostPathDriver{
driverInfo: testsuites.DriverInfo{
Name: "hostPath",
PluginName: "kubernetes.io/host-path",
MaxFileSize: testpatterns.FileSizeMedium,
SupportedFsType: sets.NewString(
"", // Default fsType
@ -756,6 +762,7 @@ func InitHostPathSymlinkDriver() testsuites.TestDriver {
return &hostPathSymlinkDriver{
driverInfo: testsuites.DriverInfo{
Name: "hostPathSymlink",
PluginName: "kubernetes.io/host-path",
MaxFileSize: testpatterns.FileSizeMedium,
SupportedFsType: sets.NewString(
"", // Default fsType
@ -896,6 +903,7 @@ func InitEmptydirDriver() testsuites.TestDriver {
return &emptydirDriver{
driverInfo: testsuites.DriverInfo{
Name: "emptydir",
PluginName: "kubernetes.io/empty-dir",
MaxFileSize: testpatterns.FileSizeMedium,
SupportedFsType: sets.NewString(
"", // Default fsType
@ -961,6 +969,7 @@ func InitCinderDriver() testsuites.TestDriver {
return &cinderDriver{
driverInfo: testsuites.DriverInfo{
Name: "cinder",
PluginName: "kubernetes.io/cinder",
MaxFileSize: testpatterns.FileSizeMedium,
SupportedFsType: sets.NewString(
"", // Default fsType
@ -1129,6 +1138,7 @@ func InitGcePdDriver() testsuites.TestDriver {
return &gcePdDriver{
driverInfo: testsuites.DriverInfo{
Name: "gcepd",
PluginName: "kubernetes.io/gce-pd",
MaxFileSize: testpatterns.FileSizeMedium,
SupportedFsType: supportedTypes,
SupportedMountOption: sets.NewString("debug", "nouid32"),
@ -1256,6 +1266,7 @@ func InitVSphereDriver() testsuites.TestDriver {
return &vSphereDriver{
driverInfo: testsuites.DriverInfo{
Name: "vSphere",
PluginName: "kubernetes.io/vsphere-volume",
MaxFileSize: testpatterns.FileSizeMedium,
SupportedFsType: sets.NewString(
"", // Default fsType
@ -1377,6 +1388,7 @@ func InitAzureDriver() testsuites.TestDriver {
return &azureDriver{
driverInfo: testsuites.DriverInfo{
Name: "azure",
PluginName: "kubernetes.io/azure-file",
MaxFileSize: testpatterns.FileSizeMedium,
SupportedFsType: sets.NewString(
"", // Default fsType
@ -1495,6 +1507,7 @@ func InitAwsDriver() testsuites.TestDriver {
return &awsDriver{
driverInfo: testsuites.DriverInfo{
Name: "aws",
PluginName: "kubernetes.io/aws-ebs",
MaxFileSize: testpatterns.FileSizeMedium,
SupportedFsType: sets.NewString(
"", // Default fsType
@ -1661,6 +1674,7 @@ func InitLocalDriverWithVolumeType(volumeType utils.LocalVolumeType) func() test
return &localDriver{
driverInfo: testsuites.DriverInfo{
Name: "local",
PluginName: "kubernetes.io/local-volume",
FeatureTag: featureTag,
MaxFileSize: maxFileSize,
SupportedFsType: supportedFsTypes,

View File

@ -18,8 +18,10 @@ package testsuites
import (
"context"
"flag"
"fmt"
"regexp"
"strings"
"time"
. "github.com/onsi/ginkgo"
@ -31,13 +33,26 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
clientset "k8s.io/client-go/kubernetes"
csilib "k8s.io/csi-translation-lib"
"k8s.io/kubernetes/test/e2e/framework"
"k8s.io/kubernetes/test/e2e/framework/metrics"
"k8s.io/kubernetes/test/e2e/framework/podlogs"
"k8s.io/kubernetes/test/e2e/framework/volume"
"k8s.io/kubernetes/test/e2e/storage/testpatterns"
)
var (
migratedPlugins *string
)
func init() {
migratedPlugins = flag.String("storage.migratedPlugins", "", "comma seperated list of in-tree plugin names of form 'kubernetes.io/{pluginName}' migrated to CSI")
}
type opCounts map[string]int64
// TestSuite represents an interface for a set of tests which works with TestDriver
type TestSuite interface {
// getTestSuiteInfo returns the TestSuiteInfo for this TestSuite
@ -465,3 +480,157 @@ func StartPodLogs(f *framework.Framework) func() {
return cancel
}
func getVolumeOpsFromMetricsForPlugin(ms metrics.ControllerManagerMetrics, pluginName string) opCounts {
totOps := opCounts{}
for method, samples := range ms {
switch method {
case "storage_operation_status_count":
for _, sample := range samples {
plugin := string(sample.Metric["volume_plugin"])
if pluginName != plugin {
continue
}
opName := string(sample.Metric["operation_name"])
if opName == "verify_controller_attached_volume" {
// We ignore verify_controller_attached_volume because it does not call into
// the plugin. It only watches Node API and updates Actual State of World cache
continue
}
totOps[opName] = totOps[opName] + int64(sample.Value)
}
}
}
return totOps
}
func getVolumeOpsFromKubeletMetricsForPlugin(ms metrics.KubeletMetrics, pluginName string) opCounts {
totOps := opCounts{}
for method, samples := range ms {
switch method {
case "storage_operation_status_count":
for _, sample := range samples {
plugin := string(sample.Metric["volume_plugin"])
if pluginName != plugin {
continue
}
opName := string(sample.Metric["operation_name"])
if opName == "verify_controller_attached_volume" {
// We ignore verify_controller_attached_volume because it does not call into
// the plugin. It only watches Node API and updates Actual State of World cache
continue
}
totOps[opName] = totOps[opName] + int64(sample.Value)
}
}
}
return totOps
}
func getVolumeOpCounts(c clientset.Interface, pluginName string) opCounts {
metricsGrabber, err := metrics.NewMetricsGrabber(c, nil, true, false, true, false, false)
if err != nil {
framework.Failf("Error creating metrics grabber : %v", err)
}
if !metricsGrabber.HasRegisteredMaster() {
framework.Skipf("Environment does not support getting controller-manager metrics - skipping")
}
controllerMetrics, err := metricsGrabber.GrabFromControllerManager()
framework.ExpectNoError(err, "Error getting c-m metrics : %v", err)
totOps := getVolumeOpsFromMetricsForPlugin(controllerMetrics, pluginName)
nodes, err := c.CoreV1().Nodes().List(metav1.ListOptions{})
framework.ExpectNoError(err, "Error listing nodes: %v", err)
for _, node := range nodes.Items {
nodeMetrics, err := metricsGrabber.GrabFromKubelet(node.GetName())
framework.ExpectNoError(err, "Error getting Kubelet %v metrics: %v", node.GetName(), err)
totOps = addOpCounts(totOps, getVolumeOpsFromKubeletMetricsForPlugin(nodeMetrics, pluginName))
}
return totOps
}
func addOpCounts(o1 opCounts, o2 opCounts) opCounts {
totOps := opCounts{}
seen := sets.NewString()
for op, count := range o1 {
seen.Insert(op)
totOps[op] = totOps[op] + count
totOps[op] = totOps[op] + o2[op]
}
for op, count := range o2 {
if !seen.Has(op) {
totOps[op] = totOps[op] + count
}
}
return totOps
}
func getMigrationVolumeOpCounts(cs clientset.Interface, pluginName string) (opCounts, opCounts) {
if len(pluginName) > 0 {
var migratedOps opCounts
csiName, err := csilib.GetCSINameFromInTreeName(pluginName)
if err != nil {
framework.Logf("Could not find CSI Name for in-tree plugin %v", pluginName)
migratedOps = opCounts{}
} else {
csiName = "kubernetes.io/csi:" + csiName
migratedOps = getVolumeOpCounts(cs, csiName)
}
return getVolumeOpCounts(cs, pluginName), migratedOps
} else {
// Not an in-tree driver
framework.Logf("Test running for native CSI Driver, not checking metrics")
return opCounts{}, opCounts{}
}
}
func getTotOps(ops opCounts) int64 {
var tot int64 = 0
for _, count := range ops {
tot += count
}
return tot
}
func validateMigrationVolumeOpCounts(cs clientset.Interface, pluginName string, oldInTreeOps, oldMigratedOps opCounts) {
if len(pluginName) == 0 {
// This is a native CSI Driver and we don't check ops
return
}
newInTreeOps, newMigratedOps := getMigrationVolumeOpCounts(cs, pluginName)
if sets.NewString(strings.Split(*migratedPlugins, ",")...).Has(pluginName) {
// If this plugin is migrated based on the test flag storage.migratedPlugins
for op, count := range newInTreeOps {
if count != oldInTreeOps[op] {
framework.Failf("In-tree plugin %v migrated to CSI Driver, however found %v %v metrics for in-tree plugin", pluginName, count-oldInTreeOps[op], op)
}
}
totMigrated := getTotOps(newMigratedOps)
oldTotMigrated := getTotOps(oldMigratedOps)
if totMigrated-oldTotMigrated <= 0 {
framework.Failf("In-tree plugin %v migrated to CSI Driver, however found %v metrics for migrated plugin", pluginName, totMigrated-oldTotMigrated)
}
} else {
// In-tree plugin is not migrated
totInTree := getTotOps(newInTreeOps)
oldTotInTree := getTotOps(oldInTreeOps)
if totInTree == oldTotInTree {
framework.Failf("In-tree plugin %v NOT migrated to CSI Driver, however found did not find any operation metrics for in-tree plugin", pluginName)
}
// We don't check counts for the Migrated version of the driver because if tests are running in parallel a test could be using
// the CSI Driver natively and increase the metrics count
// TODO(dyzz): Add a dimension to OperationGenerator metrics for "migrated"->true/false so that we can disambiguate migrated metrics
// and native CSI Driver metrics. This way we can check the counts for migrated version of the driver for stronger negative test case
// guarantees (as well as more informative metrics).
}
}