diff --git a/pkg/controller/.import-restrictions b/pkg/controller/.import-restrictions index a47e51a2b16..62d91266fd7 100644 --- a/pkg/controller/.import-restrictions +++ b/pkg/controller/.import-restrictions @@ -104,6 +104,7 @@ "github.com/robfig/cron", "github.com/spf13/pflag", "github.com/stretchr/testify/assert", + "github.com/stretchr/testify/mock", "github.com/stretchr/testify/require", "github.com/google/gofuzz", "github.com/golang/protobuf/ptypes/wrappers", @@ -162,6 +163,7 @@ "k8s.io/client-go/util/flowcontrol", "k8s.io/client-go/util/retry", "k8s.io/client-go/util/workqueue", + "k8s.io/client-go/util/testing", "k8s.io/client-go/transport" ] }, diff --git a/pkg/volume/csi/BUILD b/pkg/volume/csi/BUILD index a6c59dac5e6..80beee758e6 100644 --- a/pkg/volume/csi/BUILD +++ b/pkg/volume/csi/BUILD @@ -99,6 +99,7 @@ filegroup( "//pkg/volume/csi/csiv0:all-srcs", "//pkg/volume/csi/fake:all-srcs", "//pkg/volume/csi/nodeinfomanager:all-srcs", + "//pkg/volume/csi/testing:all-srcs", ], tags = ["automanaged"], visibility = ["//visibility:public"], diff --git a/pkg/volume/csi/csi_attacher_test.go b/pkg/volume/csi/csi_attacher_test.go index bfbf0779057..c280fec73b3 100644 --- a/pkg/volume/csi/csi_attacher_test.go +++ b/pkg/volume/csi/csi_attacher_test.go @@ -1447,7 +1447,7 @@ func newTestWatchPlugin(t *testing.T, fakeClient *fakeclient.Clientset) (*csiPlu fakeClient.Fake.PrependWatchReactor("volumeattachments", core.DefaultWatchReactor(fakeWatcher, nil)) // Start informer for CSIDrivers. - factory := informers.NewSharedInformerFactory(fakeClient, csiResyncPeriod) + factory := informers.NewSharedInformerFactory(fakeClient, CsiResyncPeriod) csiDriverInformer := factory.Storage().V1beta1().CSIDrivers() csiDriverLister := csiDriverInformer.Lister() factory.Start(wait.NeverStop) @@ -1474,7 +1474,7 @@ func newTestWatchPlugin(t *testing.T, fakeClient *fakeclient.Clientset) (*csiPlu if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) { // Wait until the informer in CSI volume plugin has all CSIDrivers. - wait.PollImmediate(testInformerSyncPeriod, testInformerSyncTimeout, func() (bool, error) { + wait.PollImmediate(TestInformerSyncPeriod, TestInformerSyncTimeout, func() (bool, error) { return csiDriverInformer.Informer().HasSynced(), nil }) } diff --git a/pkg/volume/csi/csi_plugin.go b/pkg/volume/csi/csi_plugin.go index 6be657873f4..3ae2df60f2c 100644 --- a/pkg/volume/csi/csi_plugin.go +++ b/pkg/volume/csi/csi_plugin.go @@ -59,8 +59,9 @@ const ( volDataFileName = "vol_data.json" fsTypeBlockName = "block" + // CsiResyncPeriod is default resync period duration // TODO: increase to something useful - csiResyncPeriod = time.Minute + CsiResyncPeriod = time.Minute ) var deprecatedSocketDirVersions = []string{"0.1.0", "0.2.0", "0.3.0", "0.4.0"} diff --git a/pkg/volume/csi/csi_plugin_test.go b/pkg/volume/csi/csi_plugin_test.go index 97fecb5ef93..e4ec6ae674a 100644 --- a/pkg/volume/csi/csi_plugin_test.go +++ b/pkg/volume/csi/csi_plugin_test.go @@ -50,7 +50,7 @@ func newTestPlugin(t *testing.T, client *fakeclient.Clientset) (*csiPlugin, stri } // Start informer for CSIDrivers. - factory := informers.NewSharedInformerFactory(client, csiResyncPeriod) + factory := informers.NewSharedInformerFactory(client, CsiResyncPeriod) csiDriverInformer := factory.Storage().V1beta1().CSIDrivers() csiDriverLister := csiDriverInformer.Lister() go factory.Start(wait.NeverStop) @@ -77,7 +77,7 @@ func newTestPlugin(t *testing.T, client *fakeclient.Clientset) (*csiPlugin, stri if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) { // Wait until the informer in CSI volume plugin has all CSIDrivers. - wait.PollImmediate(testInformerSyncPeriod, testInformerSyncTimeout, func() (bool, error) { + wait.PollImmediate(TestInformerSyncPeriod, TestInformerSyncTimeout, func() (bool, error) { return csiDriverInformer.Informer().HasSynced(), nil }) } @@ -935,7 +935,7 @@ func TestPluginFindAttachablePlugin(t *testing.T) { defer os.RemoveAll(tmpDir) client := fakeclient.NewSimpleClientset(getTestCSIDriver(test.driverName, nil, &test.canAttach)) - factory := informers.NewSharedInformerFactory(client, csiResyncPeriod) + factory := informers.NewSharedInformerFactory(client, CsiResyncPeriod) host := volumetest.NewFakeVolumeHostWithCSINodeName( tmpDir, client, diff --git a/pkg/volume/csi/csi_test.go b/pkg/volume/csi/csi_test.go index 8dd147fa846..b2865453e93 100644 --- a/pkg/volume/csi/csi_test.go +++ b/pkg/volume/csi/csi_test.go @@ -121,7 +121,7 @@ func TestCSI_VolumeAll(t *testing.T) { client := fakeclient.NewSimpleClientset() fakeWatcher := watch.NewRaceFreeFake() - factory := informers.NewSharedInformerFactory(client, csiResyncPeriod) + factory := informers.NewSharedInformerFactory(client, CsiResyncPeriod) factory.Start(wait.NeverStop) host := volumetest.NewFakeVolumeHostWithCSINodeName( diff --git a/pkg/volume/csi/csi_util.go b/pkg/volume/csi/csi_util.go index 0b8233be809..eb643de4b72 100644 --- a/pkg/volume/csi/csi_util.go +++ b/pkg/volume/csi/csi_util.go @@ -34,8 +34,10 @@ import ( ) const ( - testInformerSyncPeriod = 100 * time.Millisecond - testInformerSyncTimeout = 30 * time.Second + // TestInformerSyncPeriod is informer sync period duration for testing + TestInformerSyncPeriod = 100 * time.Millisecond + // TestInformerSyncTimeout is informer timeout duration for testing + TestInformerSyncTimeout = 30 * time.Second ) func getCredentialsFromSecret(k8s kubernetes.Interface, secretRef *api.SecretReference) (map[string]string, error) { diff --git a/pkg/volume/csi/testing/BUILD b/pkg/volume/csi/testing/BUILD new file mode 100644 index 00000000000..78cddd23181 --- /dev/null +++ b/pkg/volume/csi/testing/BUILD @@ -0,0 +1,33 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = ["testing.go"], + importpath = "k8s.io/kubernetes/pkg/volume/csi/testing", + visibility = ["//visibility:public"], + deps = [ + "//pkg/features:go_default_library", + "//pkg/volume:go_default_library", + "//pkg/volume/csi:go_default_library", + "//pkg/volume/testing:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", + "//staging/src/k8s.io/client-go/informers:go_default_library", + "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", + "//staging/src/k8s.io/client-go/util/testing:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/pkg/volume/csi/testing/testing.go b/pkg/volume/csi/testing/testing.go new file mode 100644 index 00000000000..f4d4d9a4cba --- /dev/null +++ b/pkg/volume/csi/testing/testing.go @@ -0,0 +1,72 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/client-go/informers" + fakeclient "k8s.io/client-go/kubernetes/fake" + utiltesting "k8s.io/client-go/util/testing" + "k8s.io/kubernetes/pkg/features" + "k8s.io/kubernetes/pkg/volume" + "k8s.io/kubernetes/pkg/volume/csi" + volumetest "k8s.io/kubernetes/pkg/volume/testing" + "testing" +) + +// NewTestPlugin creates a plugin mgr to load plugins and setup a fake client +func NewTestPlugin(t *testing.T, client *fakeclient.Clientset) (*volume.VolumePluginMgr, *volume.VolumePlugin, string) { + tmpDir, err := utiltesting.MkTmpdir("csi-test") + if err != nil { + t.Fatalf("can't create temp dir: %v", err) + } + + if client == nil { + client = fakeclient.NewSimpleClientset() + } + + // Start informer for CSIDrivers. + factory := informers.NewSharedInformerFactory(client, csi.CsiResyncPeriod) + csiDriverInformer := factory.Storage().V1beta1().CSIDrivers() + csiDriverLister := csiDriverInformer.Lister() + go factory.Start(wait.NeverStop) + + host := volumetest.NewFakeVolumeHostWithCSINodeName( + tmpDir, + client, + nil, + "fakeNode", + csiDriverLister, + ) + plugMgr := &volume.VolumePluginMgr{} + plugMgr.InitPlugins(csi.ProbeVolumePlugins(), nil /* prober */, host) + + plug, err := plugMgr.FindPluginByName(csi.CSIPluginName) + if err != nil { + t.Fatalf("can't find plugin %v", csi.CSIPluginName) + } + + if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) { + // Wait until the informer in CSI volume plugin has all CSIDrivers. + wait.PollImmediate(csi.TestInformerSyncPeriod, csi.TestInformerSyncTimeout, func() (bool, error) { + return csiDriverInformer.Informer().HasSynced(), nil + }) + } + + return plugMgr, &plug, tmpDir +} diff --git a/pkg/volume/util/operationexecutor/BUILD b/pkg/volume/util/operationexecutor/BUILD index 1a4aacaa68b..b1c2efe37e5 100644 --- a/pkg/volume/util/operationexecutor/BUILD +++ b/pkg/volume/util/operationexecutor/BUILD @@ -41,16 +41,34 @@ go_library( go_test( name = "go_default_test", - srcs = ["operation_executor_test.go"], + srcs = [ + "operation_executor_test.go", + "operation_generator_test.go", + ], embed = [":go_default_library"], deps = [ + "//pkg/features:go_default_library", "//pkg/util/mount:go_default_library", "//pkg/volume:go_default_library", + "//pkg/volume/awsebs:go_default_library", + "//pkg/volume/csi:go_default_library", + "//pkg/volume/csi/testing:go_default_library", + "//pkg/volume/gcepd:go_default_library", + "//pkg/volume/testing:go_default_library", "//pkg/volume/util/types:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", + "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", + "//staging/src/k8s.io/client-go/tools/record:go_default_library", + "//staging/src/k8s.io/component-base/featuregate:go_default_library", + "//staging/src/k8s.io/component-base/featuregate/testing:go_default_library", + "//staging/src/k8s.io/csi-translation-lib/plugins:go_default_library", + "//vendor/github.com/prometheus/client_golang/prometheus:go_default_library", + "//vendor/github.com/prometheus/client_model/go:go_default_library", + "//vendor/github.com/stretchr/testify/assert:go_default_library", ], ) diff --git a/pkg/volume/util/operationexecutor/operation_generator_test.go b/pkg/volume/util/operationexecutor/operation_generator_test.go new file mode 100644 index 00000000000..3f038e30b5b --- /dev/null +++ b/pkg/volume/util/operationexecutor/operation_generator_test.go @@ -0,0 +1,256 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package operationexecutor + +import ( + "fmt" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/assert" + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/uuid" + utilfeature "k8s.io/apiserver/pkg/util/feature" + fakeclient "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/record" + "k8s.io/component-base/featuregate" + featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/csi-translation-lib/plugins" + "k8s.io/kubernetes/pkg/features" + "k8s.io/kubernetes/pkg/volume" + "k8s.io/kubernetes/pkg/volume/awsebs" + "k8s.io/kubernetes/pkg/volume/csi" + csitesting "k8s.io/kubernetes/pkg/volume/csi/testing" + "k8s.io/kubernetes/pkg/volume/gcepd" + volumetesting "k8s.io/kubernetes/pkg/volume/testing" + "os" + "testing" +) + +// this method just tests the volume plugin name that's used in CompleteFunc, the same plugin is also used inside the +// generated func so there is no need to test the plugin name that's used inside generated function +func TestOperationGenerator_GenerateUnmapVolumeFunc_PluginName(t *testing.T) { + type testcase struct { + name string + isCsiMigrationEnabled bool + pluginName string + csiDriverName string + csiMigrationFeature featuregate.Feature + pvSpec v1.PersistentVolumeSpec + probVolumePlugins []volume.VolumePlugin + } + + testcases := []testcase{ + { + name: "gce pd plugin: csi migration disabled", + isCsiMigrationEnabled: false, + pluginName: plugins.GCEPDInTreePluginName, + csiMigrationFeature: features.CSIMigrationGCE, + pvSpec: v1.PersistentVolumeSpec{ + PersistentVolumeSource: v1.PersistentVolumeSource{ + GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{}, + }}, + probVolumePlugins: gcepd.ProbeVolumePlugins(), + }, + { + name: "gce pd plugin: csi migration enabled", + isCsiMigrationEnabled: true, + pluginName: plugins.GCEPDInTreePluginName, + csiDriverName: plugins.GCEPDDriverName, + csiMigrationFeature: features.CSIMigrationGCE, + pvSpec: v1.PersistentVolumeSpec{ + PersistentVolumeSource: v1.PersistentVolumeSource{ + GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{}, + }}, + probVolumePlugins: gcepd.ProbeVolumePlugins(), + }, + { + name: "aws ebs plugin: csi migration disabled", + isCsiMigrationEnabled: false, + pluginName: plugins.AWSEBSInTreePluginName, + csiMigrationFeature: features.CSIMigrationAWS, + pvSpec: v1.PersistentVolumeSpec{ + PersistentVolumeSource: v1.PersistentVolumeSource{ + AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{}, + }}, + probVolumePlugins: awsebs.ProbeVolumePlugins(), + }, + { + name: "aws ebs plugin: csi migration enabled", + isCsiMigrationEnabled: true, + pluginName: plugins.AWSEBSInTreePluginName, + csiDriverName: plugins.AWSEBSDriverName, + csiMigrationFeature: features.CSIMigrationAWS, + pvSpec: v1.PersistentVolumeSpec{ + PersistentVolumeSource: v1.PersistentVolumeSource{ + AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{}, + }}, + probVolumePlugins: awsebs.ProbeVolumePlugins(), + }, + } + + for _, tc := range testcases { + expectedPluginName := tc.pluginName + if tc.isCsiMigrationEnabled { + expectedPluginName = fmt.Sprintf("%s:%s", csi.CSIPluginName, tc.csiDriverName) + } + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIMigration, tc.isCsiMigrationEnabled)() + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, tc.csiMigrationFeature, tc.isCsiMigrationEnabled)() + + volumePluginMgr, plugin, tmpDir := initTestPlugins(t, tc.probVolumePlugins, tc.pluginName) + defer os.RemoveAll(tmpDir) + + operationGenerator := getTestOperationGenerator(volumePluginMgr) + + pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: types.UID(string(uuid.NewUUID()))}} + volumeToUnmount := getTestVolumeToUnmount(pod, tc.pvSpec, tc.pluginName) + + if tc.isCsiMigrationEnabled { + // GenerateUnmapVolumeFunc call blockVolumePlugin.NewBlockVolumeUnmapper and when the plugin is csi, + // csi plugin looks a file that contains some information about the volume, + // and GenerateUnmapVolumeFuncfails if csi plugin can't find that file. + // So the reason for calling plugin.NewBlockVolumeMapper for csi enabled case is creating that file. + csiSpec, err := translateSpec(volumeToUnmount.VolumeSpec) + if err != nil { + t.Fatalf("Can't translate volume to CSI") + } + _, mapperError := (*plugin).(volume.BlockVolumePlugin).NewBlockVolumeMapper(csiSpec, pod, volume.VolumeOptions{}) + if mapperError != nil { + t.Fatalf("mapper error: %v\n", mapperError) + } + } + + unmapVolumeFunc, e := operationGenerator.GenerateUnmapVolumeFunc(volumeToUnmount, nil) + if e != nil { + t.Fatalf("Error occurred while generating unmapVolumeFunc: %v", e) + } + + metricFamilyName := "storage_operation_status_count" + labelFilter := map[string]string{ + "status": "success", + "operation_name": "unmap_volume", + "volume_plugin": expectedPluginName, + } + // compare the relative change of the metric because of the global state of the prometheus.DefaultGatherer.Gather() + storageOperationStatusCountMetricBefore := findMetricWithNameAndLabels(metricFamilyName, labelFilter) + + var ee error + unmapVolumeFunc.CompleteFunc(&ee) + + storageOperationStatusCountMetricAfter := findMetricWithNameAndLabels(metricFamilyName, labelFilter) + if storageOperationStatusCountMetricAfter == nil { + t.Fatalf("Couldn't find the metric with name(%s) and labels(%v)", metricFamilyName, labelFilter) + } + + if storageOperationStatusCountMetricBefore == nil { + assert.Equal(t, float64(1), *storageOperationStatusCountMetricAfter.Counter.Value, tc.name) + } else { + metricValueDiff := *storageOperationStatusCountMetricAfter.Counter.Value - *storageOperationStatusCountMetricBefore.Counter.Value + assert.Equal(t, float64(1), metricValueDiff, tc.name) + } + } +} + +func findMetricWithNameAndLabels(metricFamilyName string, labelFilter map[string]string) *io_prometheus_client.Metric { + metricFamily := getMetricFamily(metricFamilyName) + if metricFamily == nil { + return nil + } + + for _, metric := range metricFamily.GetMetric() { + if isLabelsMatchWithMetric(labelFilter, metric) { + return metric + } + } + + return nil +} + +func isLabelsMatchWithMetric(labelFilter map[string]string, metric *io_prometheus_client.Metric) bool { + if len(labelFilter) != len(metric.Label) { + return false + } + for labelName, labelValue := range labelFilter { + labelFound := false + for _, labelPair := range metric.Label { + if labelName == *labelPair.Name && labelValue == *labelPair.Value { + labelFound = true + break + } + } + if !labelFound { + return false + } + } + return true +} + +func getTestOperationGenerator(volumePluginMgr *volume.VolumePluginMgr) OperationGenerator { + fakeKubeClient := fakeclient.NewSimpleClientset() + fakeRecorder := &record.FakeRecorder{} + fakeHandler := volumetesting.NewBlockVolumePathHandler() + operationGenerator := NewOperationGenerator( + fakeKubeClient, + volumePluginMgr, + fakeRecorder, + false, + fakeHandler) + return operationGenerator +} + +func getTestVolumeToUnmount(pod *v1.Pod, pvSpec v1.PersistentVolumeSpec, pluginName string) MountedVolume { + volumeSpec := &volume.Spec{ + PersistentVolume: &v1.PersistentVolume{ + Spec: pvSpec, + }, + } + volumeToUnmount := MountedVolume{ + VolumeName: v1.UniqueVolumeName("pd-volume"), + PodUID: pod.UID, + PluginName: pluginName, + VolumeSpec: volumeSpec, + } + return volumeToUnmount +} + +func getMetricFamily(metricFamilyName string) *io_prometheus_client.MetricFamily { + metricFamilies, _ := prometheus.DefaultGatherer.Gather() + for _, mf := range metricFamilies { + if *mf.Name == metricFamilyName { + return mf + } + } + return nil +} + +func initTestPlugins(t *testing.T, plugs []volume.VolumePlugin, pluginName string) (*volume.VolumePluginMgr, *volume.VolumePlugin, string) { + client := fakeclient.NewSimpleClientset() + pluginMgr, csiPlugin, tmpDir := csitesting.NewTestPlugin(t, client) + + err := pluginMgr.InitPlugins(plugs, nil, pluginMgr.Host) + if err != nil { + t.Fatalf("Can't init volume plugins: %v", err) + } + + _, e := pluginMgr.FindPluginByName(pluginName) + if e != nil { + t.Fatalf("Can't find the plugin by name: %s", pluginName) + } + + return pluginMgr, csiPlugin, tmpDir +}