diff --git a/hack/.golint_failures b/hack/.golint_failures index 93cc24fffe5..6ee5e0194ea 100644 --- a/hack/.golint_failures +++ b/hack/.golint_failures @@ -108,6 +108,8 @@ pkg/controller/volume/expand pkg/controller/volume/persistentvolume pkg/controller/volume/persistentvolume/config/v1alpha1 pkg/controller/volume/persistentvolume/options +pkg/controller/volume/persistentvolume/testing +pkg/controller/volume/scheduling pkg/credentialprovider pkg/credentialprovider/gcp pkg/features diff --git a/pkg/controller/BUILD b/pkg/controller/BUILD index f16eeee6238..09ecdb16032 100644 --- a/pkg/controller/BUILD +++ b/pkg/controller/BUILD @@ -146,6 +146,7 @@ filegroup( "//pkg/controller/volume/protectionutil:all-srcs", "//pkg/controller/volume/pvcprotection:all-srcs", "//pkg/controller/volume/pvprotection:all-srcs", + "//pkg/controller/volume/scheduling:all-srcs", ], tags = ["automanaged"], ) diff --git a/pkg/controller/volume/persistentvolume/BUILD b/pkg/controller/volume/persistentvolume/BUILD index 09e6481cf33..828147da3ad 100644 --- a/pkg/controller/volume/persistentvolume/BUILD +++ b/pkg/controller/volume/persistentvolume/BUILD @@ -12,12 +12,6 @@ go_library( "index.go", "pv_controller.go", "pv_controller_base.go", - "scheduler_assume_cache.go", - "scheduler_bind_cache_metrics.go", - "scheduler_binder.go", - "scheduler_binder_cache.go", - "scheduler_binder_fake.go", - "util.go", "volume_host.go", ], importpath = "k8s.io/kubernetes/pkg/controller/volume/persistentvolume", @@ -26,6 +20,7 @@ go_library( "//pkg/controller:go_default_library", "//pkg/controller/volume/events:go_default_library", "//pkg/controller/volume/persistentvolume/metrics:go_default_library", + "//pkg/controller/volume/persistentvolume/util:go_default_library", "//pkg/features:go_default_library", "//pkg/util/goroutinemap:go_default_library", "//pkg/util/goroutinemap/exponentialbackoff:go_default_library", @@ -39,14 +34,12 @@ go_library( "//staging/src/k8s.io/api/storage/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/storage/etcd:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library", @@ -62,7 +55,6 @@ go_library( "//staging/src/k8s.io/cloud-provider:go_default_library", "//staging/src/k8s.io/cloud-provider/volume/errors:go_default_library", "//staging/src/k8s.io/csi-translation-lib:go_default_library", - "//vendor/github.com/prometheus/client_golang/prometheus:go_default_library", "//vendor/k8s.io/klog:go_default_library", ], ) @@ -77,15 +69,14 @@ go_test( "provision_test.go", "pv_controller_test.go", "recycle_test.go", - "scheduler_assume_cache_test.go", - "scheduler_binder_cache_test.go", - "scheduler_binder_test.go", ], embed = [":go_default_library"], deps = [ "//pkg/api/testapi:go_default_library", "//pkg/apis/core:go_default_library", "//pkg/controller:go_default_library", + "//pkg/controller/volume/persistentvolume/testing:go_default_library", + "//pkg/controller/volume/persistentvolume/util:go_default_library", "//pkg/features:go_default_library", "//pkg/volume:go_default_library", "//pkg/volume/util:go_default_library", @@ -95,16 +86,12 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature/testing:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", - "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library", @@ -132,6 +119,8 @@ filegroup( "//pkg/controller/volume/persistentvolume/config:all-srcs", "//pkg/controller/volume/persistentvolume/metrics:all-srcs", "//pkg/controller/volume/persistentvolume/options:all-srcs", + "//pkg/controller/volume/persistentvolume/testing:all-srcs", + "//pkg/controller/volume/persistentvolume/util:all-srcs", ], tags = ["automanaged"], ) diff --git a/pkg/controller/volume/persistentvolume/index.go b/pkg/controller/volume/persistentvolume/index.go index 175b0e174e3..dff4cb24e7d 100644 --- a/pkg/controller/volume/persistentvolume/index.go +++ b/pkg/controller/volume/persistentvolume/index.go @@ -21,13 +21,9 @@ import ( "sort" "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/resource" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/cache" v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" - "k8s.io/kubernetes/pkg/features" + pvutil "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util" volumeutil "k8s.io/kubernetes/pkg/volume/util" ) @@ -96,7 +92,7 @@ func (pvIndex *persistentVolumeOrderedIndex) findByClaim(claim *v1.PersistentVol return nil, err } - bestVol, err := findMatchingVolume(claim, volumes, nil /* node for topology binding*/, nil /* exclusion map */, delayBinding) + bestVol, err := pvutil.FindMatchingVolume(claim, volumes, nil /* node for topology binding*/, nil /* exclusion map */, delayBinding) if err != nil { return nil, err } @@ -108,176 +104,6 @@ func (pvIndex *persistentVolumeOrderedIndex) findByClaim(claim *v1.PersistentVol return nil, nil } -// findMatchingVolume goes through the list of volumes to find the best matching volume -// for the claim. -// -// This function is used by both the PV controller and scheduler. -// -// delayBinding is true only in the PV controller path. When set, prebound PVs are still returned -// as a match for the claim, but unbound PVs are skipped. -// -// node is set only in the scheduler path. When set, the PV node affinity is checked against -// the node's labels. -// -// excludedVolumes is only used in the scheduler path, and is needed for evaluating multiple -// unbound PVCs for a single Pod at one time. As each PVC finds a matching PV, the chosen -// PV needs to be excluded from future matching. -func findMatchingVolume( - claim *v1.PersistentVolumeClaim, - volumes []*v1.PersistentVolume, - node *v1.Node, - excludedVolumes map[string]*v1.PersistentVolume, - delayBinding bool) (*v1.PersistentVolume, error) { - - var smallestVolume *v1.PersistentVolume - var smallestVolumeQty resource.Quantity - requestedQty := claim.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)] - requestedClass := v1helper.GetPersistentVolumeClaimClass(claim) - - var selector labels.Selector - if claim.Spec.Selector != nil { - internalSelector, err := metav1.LabelSelectorAsSelector(claim.Spec.Selector) - if err != nil { - // should be unreachable code due to validation - return nil, fmt.Errorf("error creating internal label selector for claim: %v: %v", claimToClaimKey(claim), err) - } - selector = internalSelector - } - - // Go through all available volumes with two goals: - // - find a volume that is either pre-bound by user or dynamically - // provisioned for this claim. Because of this we need to loop through - // all volumes. - // - find the smallest matching one if there is no volume pre-bound to - // the claim. - for _, volume := range volumes { - if _, ok := excludedVolumes[volume.Name]; ok { - // Skip volumes in the excluded list - continue - } - - volumeQty := volume.Spec.Capacity[v1.ResourceStorage] - - // check if volumeModes do not match (feature gate protected) - isMismatch, err := checkVolumeModeMismatches(&claim.Spec, &volume.Spec) - if err != nil { - return nil, fmt.Errorf("error checking if volumeMode was a mismatch: %v", err) - } - // filter out mismatching volumeModes - if isMismatch { - continue - } - - // check if PV's DeletionTimeStamp is set, if so, skip this volume. - if utilfeature.DefaultFeatureGate.Enabled(features.StorageObjectInUseProtection) { - if volume.ObjectMeta.DeletionTimestamp != nil { - continue - } - } - - nodeAffinityValid := true - if node != nil { - // Scheduler path, check that the PV NodeAffinity - // is satisfied by the node - err := volumeutil.CheckNodeAffinity(volume, node.Labels) - if err != nil { - nodeAffinityValid = false - } - } - - if IsVolumeBoundToClaim(volume, claim) { - // this claim and volume are pre-bound; return - // the volume if the size request is satisfied, - // otherwise continue searching for a match - if volumeQty.Cmp(requestedQty) < 0 { - continue - } - - // If PV node affinity is invalid, return no match. - // This means the prebound PV (and therefore PVC) - // is not suitable for this node. - if !nodeAffinityValid { - return nil, nil - } - - return volume, nil - } - - if node == nil && delayBinding { - // PV controller does not bind this claim. - // Scheduler will handle binding unbound volumes - // Scheduler path will have node != nil - continue - } - - // filter out: - // - volumes in non-available phase - // - volumes bound to another claim - // - volumes whose labels don't match the claim's selector, if specified - // - volumes in Class that is not requested - // - volumes whose NodeAffinity does not match the node - if volume.Status.Phase != v1.VolumeAvailable { - // We ignore volumes in non-available phase, because volumes that - // satisfies matching criteria will be updated to available, binding - // them now has high chance of encountering unnecessary failures - // due to API conflicts. - continue - } else if volume.Spec.ClaimRef != nil { - continue - } else if selector != nil && !selector.Matches(labels.Set(volume.Labels)) { - continue - } - if v1helper.GetPersistentVolumeClass(volume) != requestedClass { - continue - } - if !nodeAffinityValid { - continue - } - - if node != nil { - // Scheduler path - // Check that the access modes match - if !checkAccessModes(claim, volume) { - continue - } - } - - if volumeQty.Cmp(requestedQty) >= 0 { - if smallestVolume == nil || smallestVolumeQty.Cmp(volumeQty) > 0 { - smallestVolume = volume - smallestVolumeQty = volumeQty - } - } - } - - if smallestVolume != nil { - // Found a matching volume - return smallestVolume, nil - } - - return nil, nil -} - -// checkVolumeModeMismatches is a convenience method that checks volumeMode for PersistentVolume -// and PersistentVolumeClaims -func checkVolumeModeMismatches(pvcSpec *v1.PersistentVolumeClaimSpec, pvSpec *v1.PersistentVolumeSpec) (bool, error) { - if !utilfeature.DefaultFeatureGate.Enabled(features.BlockVolume) { - return false, nil - } - - // In HA upgrades, we cannot guarantee that the apiserver is on a version >= controller-manager. - // So we default a nil volumeMode to filesystem - requestedVolumeMode := v1.PersistentVolumeFilesystem - if pvcSpec.VolumeMode != nil { - requestedVolumeMode = *pvcSpec.VolumeMode - } - pvVolumeMode := v1.PersistentVolumeFilesystem - if pvSpec.VolumeMode != nil { - pvVolumeMode = *pvSpec.VolumeMode - } - return requestedVolumeMode != pvVolumeMode, nil -} - // findBestMatchForClaim is a convenience method that finds a volume by the claim's AccessModes and requests for Storage func (pvIndex *persistentVolumeOrderedIndex) findBestMatchForClaim(claim *v1.PersistentVolumeClaim, delayBinding bool) (*v1.PersistentVolume, error) { return pvIndex.findByClaim(claim, delayBinding) @@ -362,19 +188,3 @@ func claimToClaimKey(claim *v1.PersistentVolumeClaim) string { func claimrefToClaimKey(claimref *v1.ObjectReference) string { return fmt.Sprintf("%s/%s", claimref.Namespace, claimref.Name) } - -// Returns true if PV satisfies all the PVC's requested AccessModes -func checkAccessModes(claim *v1.PersistentVolumeClaim, volume *v1.PersistentVolume) bool { - pvModesMap := map[v1.PersistentVolumeAccessMode]bool{} - for _, mode := range volume.Spec.AccessModes { - pvModesMap[mode] = true - } - - for _, mode := range claim.Spec.AccessModes { - _, ok := pvModesMap[mode] - if !ok { - return false - } - } - return true -} diff --git a/pkg/controller/volume/persistentvolume/index_test.go b/pkg/controller/volume/persistentvolume/index_test.go index 8955e850768..b322f7cd922 100644 --- a/pkg/controller/volume/persistentvolume/index_test.go +++ b/pkg/controller/volume/persistentvolume/index_test.go @@ -28,6 +28,7 @@ import ( "k8s.io/client-go/kubernetes/scheme" ref "k8s.io/client-go/tools/reference" "k8s.io/kubernetes/pkg/api/testapi" + pvutil "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/volume/util" ) @@ -773,7 +774,7 @@ func createTestVolumes() []*v1.PersistentVolume { v1.ReadOnlyMany, }, StorageClassName: classWait, - NodeAffinity: getVolumeNodeAffinity("key1", "value1"), + NodeAffinity: pvutil.GetVolumeNodeAffinity("key1", "value1"), VolumeMode: &fs, }, Status: v1.PersistentVolumeStatus{ @@ -797,7 +798,7 @@ func createTestVolumes() []*v1.PersistentVolume { v1.ReadOnlyMany, }, StorageClassName: classWait, - NodeAffinity: getVolumeNodeAffinity("key1", "value1"), + NodeAffinity: pvutil.GetVolumeNodeAffinity("key1", "value1"), VolumeMode: &fs, }, Status: v1.PersistentVolumeStatus{ @@ -822,7 +823,7 @@ func createTestVolumes() []*v1.PersistentVolume { }, StorageClassName: classWait, ClaimRef: &v1.ObjectReference{Name: "claim02", Namespace: "myns"}, - NodeAffinity: getVolumeNodeAffinity("key1", "value1"), + NodeAffinity: pvutil.GetVolumeNodeAffinity("key1", "value1"), VolumeMode: &fs, }, Status: v1.PersistentVolumeStatus{ @@ -846,7 +847,7 @@ func createTestVolumes() []*v1.PersistentVolume { v1.ReadOnlyMany, }, StorageClassName: classWait, - NodeAffinity: getVolumeNodeAffinity("key1", "value3"), + NodeAffinity: pvutil.GetVolumeNodeAffinity("key1", "value3"), VolumeMode: &fs, }, Status: v1.PersistentVolumeStatus{ @@ -870,7 +871,7 @@ func createTestVolumes() []*v1.PersistentVolume { v1.ReadOnlyMany, }, StorageClassName: classWait, - NodeAffinity: getVolumeNodeAffinity("key1", "value4"), + NodeAffinity: pvutil.GetVolumeNodeAffinity("key1", "value4"), VolumeMode: &fs, }, Status: v1.PersistentVolumeStatus{ @@ -894,7 +895,7 @@ func createTestVolumes() []*v1.PersistentVolume { v1.ReadOnlyMany, }, StorageClassName: classWait, - NodeAffinity: getVolumeNodeAffinity("key1", "value4"), + NodeAffinity: pvutil.GetVolumeNodeAffinity("key1", "value4"), VolumeMode: &fs, }, Status: v1.PersistentVolumeStatus{ @@ -918,7 +919,7 @@ func createTestVolumes() []*v1.PersistentVolume { v1.ReadOnlyMany, }, StorageClassName: classWait, - NodeAffinity: getVolumeNodeAffinity("key1", "value4"), + NodeAffinity: pvutil.GetVolumeNodeAffinity("key1", "value4"), VolumeMode: &fs, }, Status: v1.PersistentVolumeStatus{ @@ -942,7 +943,7 @@ func createTestVolumes() []*v1.PersistentVolume { v1.ReadOnlyMany, }, StorageClassName: classWait, - NodeAffinity: getVolumeNodeAffinity("key1", "value4"), + NodeAffinity: pvutil.GetVolumeNodeAffinity("key1", "value4"), VolumeMode: &fs, }, }, @@ -968,24 +969,6 @@ func testVolume(name, size string) *v1.PersistentVolume { } } -func getVolumeNodeAffinity(key string, value string) *v1.VolumeNodeAffinity { - return &v1.VolumeNodeAffinity{ - Required: &v1.NodeSelector{ - NodeSelectorTerms: []v1.NodeSelectorTerm{ - { - MatchExpressions: []v1.NodeSelectorRequirement{ - { - Key: key, - Operator: v1.NodeSelectorOpIn, - Values: []string{value}, - }, - }, - }, - }, - }, - } -} - func createVolumeModeBlockTestVolume() *v1.PersistentVolume { blockMode := v1.PersistentVolumeBlock @@ -1163,7 +1146,7 @@ func TestVolumeModeCheck(t *testing.T) { for name, scenario := range scenarios { t.Run(name, func(t *testing.T) { defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BlockVolume, scenario.enableBlock)() - expectedMismatch, err := checkVolumeModeMismatches(&scenario.pvc.Spec, &scenario.vol.Spec) + expectedMismatch, err := pvutil.CheckVolumeModeMismatches(&scenario.pvc.Spec, &scenario.vol.Spec) if err != nil { t.Errorf("Unexpected failure for checkVolumeModeMismatches: %v", err) } @@ -1608,7 +1591,7 @@ func TestFindMatchVolumeWithNode(t *testing.T) { } for name, scenario := range scenarios { - volume, err := findMatchingVolume(scenario.claim, volumes, scenario.node, scenario.excludedVolumes, true) + volume, err := pvutil.FindMatchingVolume(scenario.claim, volumes, scenario.node, scenario.excludedVolumes, true) if err != nil { t.Errorf("Unexpected error matching volume by claim: %v", err) } @@ -1662,7 +1645,7 @@ func TestCheckAccessModes(t *testing.T) { } for name, scenario := range scenarios { - result := checkAccessModes(scenario.claim, volume) + result := pvutil.CheckAccessModes(scenario.claim, volume) if result != scenario.shouldSucceed { t.Errorf("Test %q failed: Expected %v, got %v", name, scenario.shouldSucceed, result) } diff --git a/pkg/controller/volume/persistentvolume/provision_test.go b/pkg/controller/volume/persistentvolume/provision_test.go index 97cf29cfbff..8fbb8a2e681 100644 --- a/pkg/controller/volume/persistentvolume/provision_test.go +++ b/pkg/controller/volume/persistentvolume/provision_test.go @@ -213,7 +213,7 @@ func TestProvisionSync(t *testing.T) { // Inject error to the first // kubeclient.PersistentVolumes.Create() call. All other calls // will succeed. - {"create", "persistentvolumes", errors.New("Mock creation error")}, + {Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error")}, }, wrapTestWithProvisionCalls([]provisionCall{provision1Success}, testSyncClaim), }, @@ -229,11 +229,11 @@ func TestProvisionSync(t *testing.T) { []pvtesting.ReactorError{ // Inject error to five kubeclient.PersistentVolumes.Create() // calls - {"create", "persistentvolumes", errors.New("Mock creation error1")}, - {"create", "persistentvolumes", errors.New("Mock creation error2")}, - {"create", "persistentvolumes", errors.New("Mock creation error3")}, - {"create", "persistentvolumes", errors.New("Mock creation error4")}, - {"create", "persistentvolumes", errors.New("Mock creation error5")}, + {Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error1")}, + {Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error2")}, + {Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error3")}, + {Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error4")}, + {Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error5")}, }, wrapTestWithPluginCalls( nil, // recycle calls @@ -254,11 +254,11 @@ func TestProvisionSync(t *testing.T) { []pvtesting.ReactorError{ // Inject error to five kubeclient.PersistentVolumes.Create() // calls - {"create", "persistentvolumes", errors.New("Mock creation error1")}, - {"create", "persistentvolumes", errors.New("Mock creation error2")}, - {"create", "persistentvolumes", errors.New("Mock creation error3")}, - {"create", "persistentvolumes", errors.New("Mock creation error4")}, - {"create", "persistentvolumes", errors.New("Mock creation error5")}, + {Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error1")}, + {Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error2")}, + {Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error3")}, + {Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error4")}, + {Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error5")}, }, // No deleteCalls are configured, which results into no deleter plugin available for the volume wrapTestWithProvisionCalls([]provisionCall{provision1Success}, testSyncClaim), @@ -275,11 +275,11 @@ func TestProvisionSync(t *testing.T) { []pvtesting.ReactorError{ // Inject error to five kubeclient.PersistentVolumes.Create() // calls - {"create", "persistentvolumes", errors.New("Mock creation error1")}, - {"create", "persistentvolumes", errors.New("Mock creation error2")}, - {"create", "persistentvolumes", errors.New("Mock creation error3")}, - {"create", "persistentvolumes", errors.New("Mock creation error4")}, - {"create", "persistentvolumes", errors.New("Mock creation error5")}, + {Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error1")}, + {Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error2")}, + {Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error3")}, + {Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error4")}, + {Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error5")}, }, wrapTestWithPluginCalls( nil, // recycle calls @@ -305,11 +305,11 @@ func TestProvisionSync(t *testing.T) { []pvtesting.ReactorError{ // Inject error to five kubeclient.PersistentVolumes.Create() // calls - {"create", "persistentvolumes", errors.New("Mock creation error1")}, - {"create", "persistentvolumes", errors.New("Mock creation error2")}, - {"create", "persistentvolumes", errors.New("Mock creation error3")}, - {"create", "persistentvolumes", errors.New("Mock creation error4")}, - {"create", "persistentvolumes", errors.New("Mock creation error5")}, + {Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error1")}, + {Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error2")}, + {Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error3")}, + {Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error4")}, + {Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error5")}, }, wrapTestWithPluginCalls( nil, // recycle calls @@ -399,8 +399,8 @@ func TestProvisionSync(t *testing.T) { []pvtesting.ReactorError{ // Inject errors to simulate crashed API server during // kubeclient.PersistentVolumes.Create() - {"create", "persistentvolumes", errors.New("Mock creation error1")}, - {"create", "persistentvolumes", apierrs.NewAlreadyExists(api.Resource("persistentvolumes"), "")}, + {Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error1")}, + {Verb: "create", Resource: "persistentvolumes", Error: apierrs.NewAlreadyExists(api.Resource("persistentvolumes"), "")}, }, wrapTestWithPluginCalls( nil, // recycle calls diff --git a/pkg/controller/volume/persistentvolume/pv_controller.go b/pkg/controller/volume/persistentvolume/pv_controller.go index e8f0e75f191..64265e67cad 100644 --- a/pkg/controller/volume/persistentvolume/pv_controller.go +++ b/pkg/controller/volume/persistentvolume/pv_controller.go @@ -43,6 +43,7 @@ import ( v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" "k8s.io/kubernetes/pkg/controller/volume/events" "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/metrics" + pvutil "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/util/goroutinemap" "k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff" @@ -276,7 +277,7 @@ func checkVolumeSatisfyClaim(volume *v1.PersistentVolume, claim *v1.PersistentVo return fmt.Errorf("storageClassName does not match") } - isMismatch, err := checkVolumeModeMismatches(&claim.Spec, &volume.Spec) + isMismatch, err := pvutil.CheckVolumeModeMismatches(&claim.Spec, &volume.Spec) if err != nil { return fmt.Errorf("error checking volumeMode: %v", err) } @@ -284,7 +285,7 @@ func checkVolumeSatisfyClaim(volume *v1.PersistentVolume, claim *v1.PersistentVo return fmt.Errorf("incompatible volumeMode") } - if !checkAccessModes(claim, volume) { + if !pvutil.CheckAccessModes(claim, volume) { return fmt.Errorf("incompatible accessMode") } @@ -309,7 +310,7 @@ func (ctrl *PersistentVolumeController) shouldDelayBinding(claim *v1.PersistentV } // If claim is in delay binding mode. - return IsDelayBindingMode(claim, ctrl.classLister) + return pvutil.IsDelayBindingMode(claim, ctrl.classLister) } // syncUnboundClaim is the main controller method to decide what to do with an @@ -407,7 +408,7 @@ func (ctrl *PersistentVolumeController) syncUnboundClaim(claim *v1.PersistentVol } // OBSERVATION: pvc is "Bound", pv is "Bound" return nil - } else if IsVolumeBoundToClaim(volume, claim) { + } else if pvutil.IsVolumeBoundToClaim(volume, claim) { // User asked for a PV that is claimed by this PVC // OBSERVATION: pvc is "Pending", pv is "Bound" klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound, finishing the binding", claimToClaimKey(claim)) @@ -610,7 +611,7 @@ func (ctrl *PersistentVolumeController) syncVolume(volume *v1.PersistentVolume) } return nil } else if claim.Spec.VolumeName == "" { - if isMismatch, err := checkVolumeModeMismatches(&claim.Spec, &volume.Spec); err != nil || isMismatch { + if isMismatch, err := pvutil.CheckVolumeModeMismatches(&claim.Spec, &volume.Spec); err != nil || isMismatch { // Binding for the volume won't be called in syncUnboundClaim, // because findBestMatchForClaim won't return the volume due to volumeMode mismatch. volumeMsg := fmt.Sprintf("Cannot bind PersistentVolume to requested PersistentVolumeClaim %q due to incompatible volumeMode.", claim.Name) @@ -851,7 +852,7 @@ func (ctrl *PersistentVolumeController) updateVolumePhaseWithEvent(volume *v1.Pe func (ctrl *PersistentVolumeController) bindVolumeToClaim(volume *v1.PersistentVolume, claim *v1.PersistentVolumeClaim) (*v1.PersistentVolume, error) { klog.V(4).Infof("updating PersistentVolume[%s]: binding to %q", volume.Name, claimToClaimKey(claim)) - volumeClone, dirty, err := GetBindVolumeToClaim(volume, claim) + volumeClone, dirty, err := pvutil.GetBindVolumeToClaim(volume, claim) if err != nil { return nil, err } diff --git a/pkg/controller/volume/persistentvolume/testing/BUILD b/pkg/controller/volume/persistentvolume/testing/BUILD new file mode 100644 index 00000000000..683e56eaf47 --- /dev/null +++ b/pkg/controller/volume/persistentvolume/testing/BUILD @@ -0,0 +1,36 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = ["testing.go"], + importpath = "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/testing", + visibility = ["//visibility:public"], + deps = [ + "//pkg/features:go_default_library", + "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", + "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", + "//staging/src/k8s.io/client-go/testing:go_default_library", + "//vendor/k8s.io/klog:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/pkg/controller/volume/persistentvolume/util.go b/pkg/controller/volume/persistentvolume/util.go deleted file mode 100644 index 47e27ab6478..00000000000 --- a/pkg/controller/volume/persistentvolume/util.go +++ /dev/null @@ -1,103 +0,0 @@ -/* -Copyright 2019 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package persistentvolume - -import ( - "fmt" - - "k8s.io/api/core/v1" - storage "k8s.io/api/storage/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes/scheme" - storagelisters "k8s.io/client-go/listers/storage/v1" - "k8s.io/client-go/tools/reference" - v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" -) - -// IsDelayBindingMode checks if claim is in delay binding mode. -func IsDelayBindingMode(claim *v1.PersistentVolumeClaim, classLister storagelisters.StorageClassLister) (bool, error) { - className := v1helper.GetPersistentVolumeClaimClass(claim) - if className == "" { - return false, nil - } - - class, err := classLister.Get(className) - if err != nil { - return false, nil - } - - if class.VolumeBindingMode == nil { - return false, fmt.Errorf("VolumeBindingMode not set for StorageClass %q", className) - } - - return *class.VolumeBindingMode == storage.VolumeBindingWaitForFirstConsumer, nil -} - -// GetBindVolumeToClaim returns a new volume which is bound to given claim. In -// addition, it returns a bool which indicates whether we made modification on -// original volume. -func GetBindVolumeToClaim(volume *v1.PersistentVolume, claim *v1.PersistentVolumeClaim) (*v1.PersistentVolume, bool, error) { - dirty := false - - // Check if the volume was already bound (either by user or by controller) - shouldSetBoundByController := false - if !IsVolumeBoundToClaim(volume, claim) { - shouldSetBoundByController = true - } - - // The volume from method args can be pointing to watcher cache. We must not - // modify these, therefore create a copy. - volumeClone := volume.DeepCopy() - - // Bind the volume to the claim if it is not bound yet - if volume.Spec.ClaimRef == nil || - volume.Spec.ClaimRef.Name != claim.Name || - volume.Spec.ClaimRef.Namespace != claim.Namespace || - volume.Spec.ClaimRef.UID != claim.UID { - - claimRef, err := reference.GetReference(scheme.Scheme, claim) - if err != nil { - return nil, false, fmt.Errorf("Unexpected error getting claim reference: %v", err) - } - volumeClone.Spec.ClaimRef = claimRef - dirty = true - } - - // Set annBoundByController if it is not set yet - if shouldSetBoundByController && !metav1.HasAnnotation(volumeClone.ObjectMeta, annBoundByController) { - metav1.SetMetaDataAnnotation(&volumeClone.ObjectMeta, annBoundByController, "yes") - dirty = true - } - - return volumeClone, dirty, nil -} - -// IsVolumeBoundToClaim returns true, if given volume is pre-bound or bound -// to specific claim. Both claim.Name and claim.Namespace must be equal. -// If claim.UID is present in volume.Spec.ClaimRef, it must be equal too. -func IsVolumeBoundToClaim(volume *v1.PersistentVolume, claim *v1.PersistentVolumeClaim) bool { - if volume.Spec.ClaimRef == nil { - return false - } - if claim.Name != volume.Spec.ClaimRef.Name || claim.Namespace != volume.Spec.ClaimRef.Namespace { - return false - } - if volume.Spec.ClaimRef.UID != "" && claim.UID != volume.Spec.ClaimRef.UID { - return false - } - return true -} diff --git a/pkg/controller/volume/persistentvolume/util/BUILD b/pkg/controller/volume/persistentvolume/util/BUILD new file mode 100644 index 00000000000..cc104a345ba --- /dev/null +++ b/pkg/controller/volume/persistentvolume/util/BUILD @@ -0,0 +1,36 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = ["util.go"], + importpath = "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util", + visibility = ["//visibility:public"], + deps = [ + "//pkg/apis/core/v1/helper:go_default_library", + "//pkg/features:go_default_library", + "//pkg/volume/util:go_default_library", + "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/api/storage/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", + "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library", + "//staging/src/k8s.io/client-go/listers/storage/v1:go_default_library", + "//staging/src/k8s.io/client-go/tools/reference:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/pkg/controller/volume/persistentvolume/util/util.go b/pkg/controller/volume/persistentvolume/util/util.go new file mode 100644 index 00000000000..048e8c9f85d --- /dev/null +++ b/pkg/controller/volume/persistentvolume/util/util.go @@ -0,0 +1,342 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package persistentvolume + +import ( + "fmt" + + "k8s.io/api/core/v1" + storage "k8s.io/api/storage/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/client-go/kubernetes/scheme" + storagelisters "k8s.io/client-go/listers/storage/v1" + "k8s.io/client-go/tools/reference" + v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" + "k8s.io/kubernetes/pkg/features" + volumeutil "k8s.io/kubernetes/pkg/volume/util" +) + +const ( + // AnnBindCompleted Annotation applies to PVCs. It indicates that the lifecycle + // of the PVC has passed through the initial setup. This information changes how + // we interpret some observations of the state of the objects. Value of this + // Annotation does not matter. + AnnBindCompleted = "pv.kubernetes.io/bind-completed" + + // AnnBoundByController annotation applies to PVs and PVCs. It indicates that + // the binding (PV->PVC or PVC->PV) was installed by the controller. The + // absence of this annotation means the binding was done by the user (i.e. + // pre-bound). Value of this annotation does not matter. + // External PV binders must bind PV the same way as PV controller, otherwise PV + // controller may not handle it correctly. + AnnBoundByController = "pv.kubernetes.io/bound-by-controller" + + // AnnSelectedNode annotation is added to a PVC that has been triggered by scheduler to + // be dynamically provisioned. Its value is the name of the selected node. + AnnSelectedNode = "volume.kubernetes.io/selected-node" + + // NotSupportedProvisioner is a special provisioner name which can be set + // in storage class to indicate dynamic provisioning is not supported by + // the storage. + NotSupportedProvisioner = "kubernetes.io/no-provisioner" +) + +// IsDelayBindingMode checks if claim is in delay binding mode. +func IsDelayBindingMode(claim *v1.PersistentVolumeClaim, classLister storagelisters.StorageClassLister) (bool, error) { + className := v1helper.GetPersistentVolumeClaimClass(claim) + if className == "" { + return false, nil + } + + class, err := classLister.Get(className) + if err != nil { + return false, nil + } + + if class.VolumeBindingMode == nil { + return false, fmt.Errorf("VolumeBindingMode not set for StorageClass %q", className) + } + + return *class.VolumeBindingMode == storage.VolumeBindingWaitForFirstConsumer, nil +} + +// GetBindVolumeToClaim returns a new volume which is bound to given claim. In +// addition, it returns a bool which indicates whether we made modification on +// original volume. +func GetBindVolumeToClaim(volume *v1.PersistentVolume, claim *v1.PersistentVolumeClaim) (*v1.PersistentVolume, bool, error) { + dirty := false + + // Check if the volume was already bound (either by user or by controller) + shouldSetBoundByController := false + if !IsVolumeBoundToClaim(volume, claim) { + shouldSetBoundByController = true + } + + // The volume from method args can be pointing to watcher cache. We must not + // modify these, therefore create a copy. + volumeClone := volume.DeepCopy() + + // Bind the volume to the claim if it is not bound yet + if volume.Spec.ClaimRef == nil || + volume.Spec.ClaimRef.Name != claim.Name || + volume.Spec.ClaimRef.Namespace != claim.Namespace || + volume.Spec.ClaimRef.UID != claim.UID { + + claimRef, err := reference.GetReference(scheme.Scheme, claim) + if err != nil { + return nil, false, fmt.Errorf("Unexpected error getting claim reference: %v", err) + } + volumeClone.Spec.ClaimRef = claimRef + dirty = true + } + + // Set AnnBoundByController if it is not set yet + if shouldSetBoundByController && !metav1.HasAnnotation(volumeClone.ObjectMeta, AnnBoundByController) { + metav1.SetMetaDataAnnotation(&volumeClone.ObjectMeta, AnnBoundByController, "yes") + dirty = true + } + + return volumeClone, dirty, nil +} + +// IsVolumeBoundToClaim returns true, if given volume is pre-bound or bound +// to specific claim. Both claim.Name and claim.Namespace must be equal. +// If claim.UID is present in volume.Spec.ClaimRef, it must be equal too. +func IsVolumeBoundToClaim(volume *v1.PersistentVolume, claim *v1.PersistentVolumeClaim) bool { + if volume.Spec.ClaimRef == nil { + return false + } + if claim.Name != volume.Spec.ClaimRef.Name || claim.Namespace != volume.Spec.ClaimRef.Namespace { + return false + } + if volume.Spec.ClaimRef.UID != "" && claim.UID != volume.Spec.ClaimRef.UID { + return false + } + return true +} + +// FindMatchingVolume goes through the list of volumes to find the best matching volume +// for the claim. +// +// This function is used by both the PV controller and scheduler. +// +// delayBinding is true only in the PV controller path. When set, prebound PVs are still returned +// as a match for the claim, but unbound PVs are skipped. +// +// node is set only in the scheduler path. When set, the PV node affinity is checked against +// the node's labels. +// +// excludedVolumes is only used in the scheduler path, and is needed for evaluating multiple +// unbound PVCs for a single Pod at one time. As each PVC finds a matching PV, the chosen +// PV needs to be excluded from future matching. +func FindMatchingVolume( + claim *v1.PersistentVolumeClaim, + volumes []*v1.PersistentVolume, + node *v1.Node, + excludedVolumes map[string]*v1.PersistentVolume, + delayBinding bool) (*v1.PersistentVolume, error) { + + var smallestVolume *v1.PersistentVolume + var smallestVolumeQty resource.Quantity + requestedQty := claim.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)] + requestedClass := v1helper.GetPersistentVolumeClaimClass(claim) + + var selector labels.Selector + if claim.Spec.Selector != nil { + internalSelector, err := metav1.LabelSelectorAsSelector(claim.Spec.Selector) + if err != nil { + // should be unreachable code due to validation + return nil, fmt.Errorf("error creating internal label selector for claim: %v: %v", claimToClaimKey(claim), err) + } + selector = internalSelector + } + + // Go through all available volumes with two goals: + // - find a volume that is either pre-bound by user or dynamically + // provisioned for this claim. Because of this we need to loop through + // all volumes. + // - find the smallest matching one if there is no volume pre-bound to + // the claim. + for _, volume := range volumes { + if _, ok := excludedVolumes[volume.Name]; ok { + // Skip volumes in the excluded list + continue + } + + volumeQty := volume.Spec.Capacity[v1.ResourceStorage] + + // check if volumeModes do not match (feature gate protected) + isMismatch, err := CheckVolumeModeMismatches(&claim.Spec, &volume.Spec) + if err != nil { + return nil, fmt.Errorf("error checking if volumeMode was a mismatch: %v", err) + } + // filter out mismatching volumeModes + if isMismatch { + continue + } + + // check if PV's DeletionTimeStamp is set, if so, skip this volume. + if utilfeature.DefaultFeatureGate.Enabled(features.StorageObjectInUseProtection) { + if volume.ObjectMeta.DeletionTimestamp != nil { + continue + } + } + + nodeAffinityValid := true + if node != nil { + // Scheduler path, check that the PV NodeAffinity + // is satisfied by the node + err := volumeutil.CheckNodeAffinity(volume, node.Labels) + if err != nil { + nodeAffinityValid = false + } + } + + if IsVolumeBoundToClaim(volume, claim) { + // this claim and volume are pre-bound; return + // the volume if the size request is satisfied, + // otherwise continue searching for a match + if volumeQty.Cmp(requestedQty) < 0 { + continue + } + + // If PV node affinity is invalid, return no match. + // This means the prebound PV (and therefore PVC) + // is not suitable for this node. + if !nodeAffinityValid { + return nil, nil + } + + return volume, nil + } + + if node == nil && delayBinding { + // PV controller does not bind this claim. + // Scheduler will handle binding unbound volumes + // Scheduler path will have node != nil + continue + } + + // filter out: + // - volumes in non-available phase + // - volumes bound to another claim + // - volumes whose labels don't match the claim's selector, if specified + // - volumes in Class that is not requested + // - volumes whose NodeAffinity does not match the node + if volume.Status.Phase != v1.VolumeAvailable { + // We ignore volumes in non-available phase, because volumes that + // satisfies matching criteria will be updated to available, binding + // them now has high chance of encountering unnecessary failures + // due to API conflicts. + continue + } else if volume.Spec.ClaimRef != nil { + continue + } else if selector != nil && !selector.Matches(labels.Set(volume.Labels)) { + continue + } + if v1helper.GetPersistentVolumeClass(volume) != requestedClass { + continue + } + if !nodeAffinityValid { + continue + } + + if node != nil { + // Scheduler path + // Check that the access modes match + if !CheckAccessModes(claim, volume) { + continue + } + } + + if volumeQty.Cmp(requestedQty) >= 0 { + if smallestVolume == nil || smallestVolumeQty.Cmp(volumeQty) > 0 { + smallestVolume = volume + smallestVolumeQty = volumeQty + } + } + } + + if smallestVolume != nil { + // Found a matching volume + return smallestVolume, nil + } + + return nil, nil +} + +// CheckVolumeModeMismatches is a convenience method that checks volumeMode for PersistentVolume +// and PersistentVolumeClaims +func CheckVolumeModeMismatches(pvcSpec *v1.PersistentVolumeClaimSpec, pvSpec *v1.PersistentVolumeSpec) (bool, error) { + if !utilfeature.DefaultFeatureGate.Enabled(features.BlockVolume) { + return false, nil + } + + // In HA upgrades, we cannot guarantee that the apiserver is on a version >= controller-manager. + // So we default a nil volumeMode to filesystem + requestedVolumeMode := v1.PersistentVolumeFilesystem + if pvcSpec.VolumeMode != nil { + requestedVolumeMode = *pvcSpec.VolumeMode + } + pvVolumeMode := v1.PersistentVolumeFilesystem + if pvSpec.VolumeMode != nil { + pvVolumeMode = *pvSpec.VolumeMode + } + return requestedVolumeMode != pvVolumeMode, nil +} + +// CheckAccessModes returns true if PV satisfies all the PVC's requested AccessModes +func CheckAccessModes(claim *v1.PersistentVolumeClaim, volume *v1.PersistentVolume) bool { + pvModesMap := map[v1.PersistentVolumeAccessMode]bool{} + for _, mode := range volume.Spec.AccessModes { + pvModesMap[mode] = true + } + + for _, mode := range claim.Spec.AccessModes { + _, ok := pvModesMap[mode] + if !ok { + return false + } + } + return true +} + +func claimToClaimKey(claim *v1.PersistentVolumeClaim) string { + return fmt.Sprintf("%s/%s", claim.Namespace, claim.Name) +} + +// GetVolumeNodeAffinity returns a VolumeNodeAffinity for given key and value. +func GetVolumeNodeAffinity(key string, value string) *v1.VolumeNodeAffinity { + return &v1.VolumeNodeAffinity{ + Required: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: key, + Operator: v1.NodeSelectorOpIn, + Values: []string{value}, + }, + }, + }, + }, + }, + } +} diff --git a/pkg/controller/volume/scheduling/BUILD b/pkg/controller/volume/scheduling/BUILD new file mode 100644 index 00000000000..357f0fa2501 --- /dev/null +++ b/pkg/controller/volume/scheduling/BUILD @@ -0,0 +1,76 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = [ + "scheduler_assume_cache.go", + "scheduler_bind_cache_metrics.go", + "scheduler_binder.go", + "scheduler_binder_cache.go", + "scheduler_binder_fake.go", + ], + importpath = "k8s.io/kubernetes/pkg/controller/volume/scheduling", + visibility = ["//visibility:public"], + deps = [ + "//pkg/apis/core/v1/helper:go_default_library", + "//pkg/controller/volume/persistentvolume/util:go_default_library", + "//pkg/volume/util:go_default_library", + "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/storage/etcd:go_default_library", + "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", + "//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library", + "//staging/src/k8s.io/client-go/kubernetes:go_default_library", + "//staging/src/k8s.io/client-go/listers/storage/v1:go_default_library", + "//staging/src/k8s.io/client-go/tools/cache:go_default_library", + "//vendor/github.com/prometheus/client_golang/prometheus:go_default_library", + "//vendor/k8s.io/klog:go_default_library", + ], +) + +go_test( + name = "go_default_test", + srcs = [ + "scheduler_assume_cache_test.go", + "scheduler_binder_cache_test.go", + "scheduler_binder_test.go", + ], + embed = [":go_default_library"], + deps = [ + "//pkg/api/testapi:go_default_library", + "//pkg/controller:go_default_library", + "//pkg/controller/volume/persistentvolume/testing:go_default_library", + "//pkg/controller/volume/persistentvolume/util:go_default_library", + "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/api/storage/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", + "//staging/src/k8s.io/client-go/informers:go_default_library", + "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", + "//staging/src/k8s.io/client-go/kubernetes:go_default_library", + "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", + "//staging/src/k8s.io/client-go/testing:go_default_library", + "//vendor/k8s.io/klog:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/pkg/controller/volume/persistentvolume/scheduler_assume_cache.go b/pkg/controller/volume/scheduling/scheduler_assume_cache.go similarity index 99% rename from pkg/controller/volume/persistentvolume/scheduler_assume_cache.go rename to pkg/controller/volume/scheduling/scheduler_assume_cache.go index cd4bc88d7ca..442432eb6f8 100644 --- a/pkg/controller/volume/persistentvolume/scheduler_assume_cache.go +++ b/pkg/controller/volume/scheduling/scheduler_assume_cache.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package persistentvolume +package scheduling import ( "fmt" diff --git a/pkg/controller/volume/persistentvolume/scheduler_assume_cache_test.go b/pkg/controller/volume/scheduling/scheduler_assume_cache_test.go similarity index 98% rename from pkg/controller/volume/persistentvolume/scheduler_assume_cache_test.go rename to pkg/controller/volume/scheduling/scheduler_assume_cache_test.go index 40f0ff88738..e3b7f7810de 100644 --- a/pkg/controller/volume/persistentvolume/scheduler_assume_cache_test.go +++ b/pkg/controller/volume/scheduling/scheduler_assume_cache_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package persistentvolume +package scheduling import ( "fmt" @@ -22,6 +22,7 @@ import ( "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + pvutil "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util" ) func makePV(name, version, storageClass string) *v1.PersistentVolume { @@ -456,7 +457,7 @@ func TestAssumeUpdatePVCCache(t *testing.T) { // Assume PVC newPVC := pvc.DeepCopy() - newPVC.Annotations[annSelectedNode] = "test-node" + newPVC.Annotations[pvutil.AnnSelectedNode] = "test-node" if err := cache.Assume(newPVC); err != nil { t.Fatalf("failed to assume PVC: %v", err) } diff --git a/pkg/controller/volume/persistentvolume/scheduler_bind_cache_metrics.go b/pkg/controller/volume/scheduling/scheduler_bind_cache_metrics.go similarity index 98% rename from pkg/controller/volume/persistentvolume/scheduler_bind_cache_metrics.go rename to pkg/controller/volume/scheduling/scheduler_bind_cache_metrics.go index 9a56c2422b7..79b2611e26f 100644 --- a/pkg/controller/volume/persistentvolume/scheduler_bind_cache_metrics.go +++ b/pkg/controller/volume/scheduling/scheduler_bind_cache_metrics.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package persistentvolume +package scheduling import ( "github.com/prometheus/client_golang/prometheus" diff --git a/pkg/controller/volume/persistentvolume/scheduler_binder.go b/pkg/controller/volume/scheduling/scheduler_binder.go similarity index 97% rename from pkg/controller/volume/persistentvolume/scheduler_binder.go rename to pkg/controller/volume/scheduling/scheduler_binder.go index 4ee2b93320a..2efd668c672 100644 --- a/pkg/controller/volume/persistentvolume/scheduler_binder.go +++ b/pkg/controller/volume/scheduling/scheduler_binder.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package persistentvolume +package scheduling import ( "fmt" @@ -32,6 +32,7 @@ import ( storagelisters "k8s.io/client-go/listers/storage/v1" "k8s.io/klog" v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" + pvutil "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util" volumeutil "k8s.io/kubernetes/pkg/volume/util" ) @@ -211,7 +212,7 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolume // Filter out claims to provision for _, claim := range claimsToBind { - if selectedNode, ok := claim.Annotations[annSelectedNode]; ok { + if selectedNode, ok := claim.Annotations[pvutil.AnnSelectedNode]; ok { if selectedNode != node.Name { // Fast path, skip unmatched node return false, boundVolumesSatisfied, nil @@ -274,7 +275,7 @@ func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (al // Assume PV newBindings := []*bindingInfo{} for _, binding := range claimsToBind { - newPV, dirty, err := GetBindVolumeToClaim(binding.pv, binding.pvc) + newPV, dirty, err := pvutil.GetBindVolumeToClaim(binding.pv, binding.pvc) klog.V(5).Infof("AssumePodVolumes: GetBindVolumeToClaim for pod %q, PV %q, PVC %q. newPV %p, dirty %v, err: %v", podName, binding.pv.Name, @@ -303,7 +304,7 @@ func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (al // The claims from method args can be pointing to watcher cache. We must not // modify these, therefore create a copy. claimClone := claim.DeepCopy() - metav1.SetMetaDataAnnotation(&claimClone.ObjectMeta, annSelectedNode, nodeName) + metav1.SetMetaDataAnnotation(&claimClone.ObjectMeta, pvutil.AnnSelectedNode, nodeName) err = b.pvcCache.Assume(claimClone) if err != nil { b.revertAssumedPVs(newBindings) @@ -511,7 +512,7 @@ func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*bindingInfo, claim if pvc.Annotations == nil { return false, fmt.Errorf("selectedNode annotation reset for PVC %q", pvc.Name) } - selectedNode := pvc.Annotations[annSelectedNode] + selectedNode := pvc.Annotations[pvutil.AnnSelectedNode] if selectedNode != pod.Spec.NodeName { return false, fmt.Errorf("selectedNode annotation value %q not set to scheduled node %q", selectedNode, pod.Spec.NodeName) } @@ -582,7 +583,7 @@ func (b *volumeBinder) isPVCBound(namespace, pvcName string) (bool, *v1.Persiste } func (b *volumeBinder) isPVCFullyBound(pvc *v1.PersistentVolumeClaim) bool { - return pvc.Spec.VolumeName != "" && metav1.HasAnnotation(pvc.ObjectMeta, annBindCompleted) + return pvc.Spec.VolumeName != "" && metav1.HasAnnotation(pvc.ObjectMeta, pvutil.AnnBindCompleted) } // arePodVolumesBound returns true if all volumes are fully bound @@ -614,7 +615,7 @@ func (b *volumeBinder) getPodVolumes(pod *v1.Pod) (boundClaims []*v1.PersistentV if volumeBound { boundClaims = append(boundClaims, pvc) } else { - delayBindingMode, err := IsDelayBindingMode(pvc, b.classLister) + delayBindingMode, err := pvutil.IsDelayBindingMode(pvc, b.classLister) if err != nil { return nil, nil, nil, err } @@ -674,7 +675,7 @@ func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*v1.Persi pvcName := getPVCName(pvc) // Find a matching PV - pv, err := findMatchingVolume(pvc, allPVs, node, chosenPVs, true) + pv, err := pvutil.FindMatchingVolume(pvc, allPVs, node, chosenPVs, true) if err != nil { return false, nil, nil, err } @@ -717,7 +718,7 @@ func (b *volumeBinder) checkVolumeProvisions(pod *v1.Pod, claimsToProvision []*v return false, nil, fmt.Errorf("failed to find storage class %q", className) } provisioner := class.Provisioner - if provisioner == "" || provisioner == notSupportedProvisioner { + if provisioner == "" || provisioner == pvutil.NotSupportedProvisioner { klog.V(4).Infof("storage class %q of claim %q does not support dynamic provisioning", className, pvcName) return false, nil, nil } @@ -775,3 +776,7 @@ func (a byPVCSize) Less(i, j int) bool { // return true if iSize is less than jSize return iSize.Cmp(jSize) == -1 } + +func claimToClaimKey(claim *v1.PersistentVolumeClaim) string { + return fmt.Sprintf("%s/%s", claim.Namespace, claim.Name) +} diff --git a/pkg/controller/volume/persistentvolume/scheduler_binder_cache.go b/pkg/controller/volume/scheduling/scheduler_binder_cache.go similarity index 99% rename from pkg/controller/volume/persistentvolume/scheduler_binder_cache.go rename to pkg/controller/volume/scheduling/scheduler_binder_cache.go index 96b573afa24..91cfeda2dde 100644 --- a/pkg/controller/volume/persistentvolume/scheduler_binder_cache.go +++ b/pkg/controller/volume/scheduling/scheduler_binder_cache.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package persistentvolume +package scheduling import ( "sync" diff --git a/pkg/controller/volume/persistentvolume/scheduler_binder_cache_test.go b/pkg/controller/volume/scheduling/scheduler_binder_cache_test.go similarity index 99% rename from pkg/controller/volume/persistentvolume/scheduler_binder_cache_test.go rename to pkg/controller/volume/scheduling/scheduler_binder_cache_test.go index f8499e752da..54eb05678d4 100644 --- a/pkg/controller/volume/persistentvolume/scheduler_binder_cache_test.go +++ b/pkg/controller/volume/scheduling/scheduler_binder_cache_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package persistentvolume +package scheduling import ( "reflect" diff --git a/pkg/controller/volume/persistentvolume/scheduler_binder_fake.go b/pkg/controller/volume/scheduling/scheduler_binder_fake.go similarity index 98% rename from pkg/controller/volume/persistentvolume/scheduler_binder_fake.go rename to pkg/controller/volume/scheduling/scheduler_binder_fake.go index 46e8f200e28..5376cec2ca5 100644 --- a/pkg/controller/volume/persistentvolume/scheduler_binder_fake.go +++ b/pkg/controller/volume/scheduling/scheduler_binder_fake.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package persistentvolume +package scheduling import "k8s.io/api/core/v1" diff --git a/pkg/controller/volume/persistentvolume/scheduler_binder_test.go b/pkg/controller/volume/scheduling/scheduler_binder_test.go similarity index 97% rename from pkg/controller/volume/persistentvolume/scheduler_binder_test.go rename to pkg/controller/volume/scheduling/scheduler_binder_test.go index b481b7ee13a..54433369da4 100644 --- a/pkg/controller/volume/persistentvolume/scheduler_binder_test.go +++ b/pkg/controller/volume/scheduling/scheduler_binder_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package persistentvolume +package scheduling import ( "context" @@ -39,6 +39,8 @@ import ( "k8s.io/klog" "k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/controller" + pvtesting "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/testing" + pvutil "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util" ) var ( @@ -97,7 +99,7 @@ var ( type testEnv struct { client clientset.Interface - reactor *volumeReactor + reactor *pvtesting.VolumeReactor binder SchedulerVolumeBinder internalBinder *volumeBinder internalNodeInformer coreinformers.NodeInformer @@ -107,7 +109,7 @@ type testEnv struct { func newTestBinder(t *testing.T, stopCh <-chan struct{}) *testEnv { client := &fake.Clientset{} - reactor := newVolumeReactor(client, nil, nil, nil, nil) + reactor := pvtesting.NewVolumeReactor(client, nil, nil, nil) // TODO refactor all tests to use real watch mechanism, see #72327 client.AddWatchReactor("*", func(action k8stesting.Action) (handled bool, ret watch.Interface, err error) { gvr := action.GetResource() @@ -238,11 +240,11 @@ func (env *testEnv) initClaims(cachedPVCs []*v1.PersistentVolumeClaim, apiPVCs [ for _, pvc := range cachedPVCs { internalPVCCache.add(pvc) if apiPVCs == nil { - env.reactor.claims[pvc.Name] = pvc + env.reactor.AddClaim(pvc) } } for _, pvc := range apiPVCs { - env.reactor.claims[pvc.Name] = pvc + env.reactor.AddClaim(pvc) } } @@ -251,11 +253,11 @@ func (env *testEnv) initVolumes(cachedPVs []*v1.PersistentVolume, apiPVs []*v1.P for _, pv := range cachedPVs { internalPVCache.add(pv) if apiPVs == nil { - env.reactor.volumes[pv.Name] = pv + env.reactor.AddVolume(pv) } } for _, pv := range apiPVs { - env.reactor.volumes[pv.Name] = pv + env.reactor.AddVolume(pv) } } @@ -421,8 +423,8 @@ func (env *testEnv) validateAssume(t *testing.T, name string, pod *v1.Pod, bindi t.Errorf("Test %q failed: GetPVC %q returned error: %v", name, pvcKey, err) continue } - if pvc.Annotations[annSelectedNode] != nodeLabelValue { - t.Errorf("Test %q failed: expected annSelectedNode of pvc %q to be %q, but got %q", name, pvcKey, nodeLabelValue, pvc.Annotations[annSelectedNode]) + if pvc.Annotations[pvutil.AnnSelectedNode] != nodeLabelValue { + t.Errorf("Test %q failed: expected pvutil.AnnSelectedNode of pvc %q to be %q, but got %q", name, pvcKey, nodeLabelValue, pvc.Annotations[pvutil.AnnSelectedNode]) } } } @@ -447,8 +449,8 @@ func (env *testEnv) validateFailedAssume(t *testing.T, name string, pod *v1.Pod, t.Errorf("Test %q failed: GetPVC %q returned error: %v", name, pvcKey, err) continue } - if pvc.Annotations[annSelectedNode] != "" { - t.Errorf("Test %q failed: expected annSelectedNode of pvc %q empty, but got %q", name, pvcKey, pvc.Annotations[annSelectedNode]) + if pvc.Annotations[pvutil.AnnSelectedNode] != "" { + t.Errorf("Test %q failed: expected pvutil.AnnSelectedNode of pvc %q empty, but got %q", name, pvcKey, pvc.Annotations[pvutil.AnnSelectedNode]) } } } @@ -476,7 +478,7 @@ func (env *testEnv) validateBind( } // Check reactor for API updates - if err := env.reactor.checkVolumes(expectedAPIPVs); err != nil { + if err := env.reactor.CheckVolumes(expectedAPIPVs); err != nil { t.Errorf("Test %q failed: API reactor validation failed: %v", name, err) } } @@ -504,7 +506,7 @@ func (env *testEnv) validateProvision( } // Check reactor for API updates - if err := env.reactor.checkClaims(expectedAPIPVCs); err != nil { + if err := env.reactor.CheckClaims(expectedAPIPVCs); err != nil { t.Errorf("Test %q failed: API reactor validation failed: %v", name, err) } } @@ -543,10 +545,10 @@ func makeTestPVC(name, size, node string, pvcBoundState int, pvName, resourceVer switch pvcBoundState { case pvcSelectedNode: - metav1.SetMetaDataAnnotation(&pvc.ObjectMeta, annSelectedNode, node) + metav1.SetMetaDataAnnotation(&pvc.ObjectMeta, pvutil.AnnSelectedNode, node) // don't fallthrough case pvcBound: - metav1.SetMetaDataAnnotation(&pvc.ObjectMeta, annBindCompleted, "yes") + metav1.SetMetaDataAnnotation(&pvc.ObjectMeta, pvutil.AnnBindCompleted, "yes") fallthrough case pvcPrebound: pvc.Spec.VolumeName = pvName @@ -595,7 +597,7 @@ func makeTestPV(name, node, capacity, version string, boundToPVC *v1.PersistentV }, } if node != "" { - pv.Spec.NodeAffinity = getVolumeNodeAffinity(nodeLabelKey, node) + pv.Spec.NodeAffinity = pvutil.GetVolumeNodeAffinity(nodeLabelKey, node) } if boundToPVC != nil { @@ -607,7 +609,7 @@ func makeTestPV(name, node, capacity, version string, boundToPVC *v1.PersistentV Namespace: boundToPVC.Namespace, UID: boundToPVC.UID, } - metav1.SetMetaDataAnnotation(&pv.ObjectMeta, annBoundByController, "yes") + metav1.SetMetaDataAnnotation(&pv.ObjectMeta, pvutil.AnnBoundByController, "yes") } return pv @@ -615,7 +617,7 @@ func makeTestPV(name, node, capacity, version string, boundToPVC *v1.PersistentV func pvcSetSelectedNode(pvc *v1.PersistentVolumeClaim, node string) *v1.PersistentVolumeClaim { newPVC := pvc.DeepCopy() - metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, annSelectedNode, node) + metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, pvutil.AnnSelectedNode, node) return newPVC } @@ -691,7 +693,7 @@ func makeBinding(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) *bindin func addProvisionAnn(pvc *v1.PersistentVolumeClaim) *v1.PersistentVolumeClaim { res := pvc.DeepCopy() // Add provision related annotations - metav1.SetMetaDataAnnotation(&res.ObjectMeta, annSelectedNode, nodeLabelValue) + metav1.SetMetaDataAnnotation(&res.ObjectMeta, pvutil.AnnSelectedNode, nodeLabelValue) return res } @@ -1488,7 +1490,7 @@ func TestBindPodVolumes(t *testing.T) { // Update PVC to be fully bound to PV newPVC := pvc.DeepCopy() newPVC.Spec.VolumeName = pv.Name - metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, annBindCompleted, "yes") + metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, pvutil.AnnBindCompleted, "yes") if _, err := testEnv.client.CoreV1().PersistentVolumeClaims(newPVC.Namespace).Update(newPVC); err != nil { t.Errorf("failed to update PVC %q: %v", newPVC.Name, err) } @@ -1512,7 +1514,7 @@ func TestBindPodVolumes(t *testing.T) { return } newPVC.Spec.VolumeName = dynamicPV.Name - metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, annBindCompleted, "yes") + metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, pvutil.AnnBindCompleted, "yes") if _, err := testEnv.client.CoreV1().PersistentVolumeClaims(newPVC.Namespace).Update(newPVC); err != nil { t.Errorf("failed to update PVC %q: %v", newPVC.Name, err) } @@ -1588,7 +1590,7 @@ func TestBindPodVolumes(t *testing.T) { // Update PVC to be fully bound to a PV with a different node newPVC := pvcs[0].DeepCopy() newPVC.Spec.VolumeName = pvNode2.Name - metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, annBindCompleted, "yes") + metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, pvutil.AnnBindCompleted, "yes") if _, err := testEnv.client.CoreV1().PersistentVolumeClaims(newPVC.Namespace).Update(newPVC); err != nil { t.Errorf("failed to update PVC %q: %v", newPVC.Name, err) }