Refactor PV scheduling library into separate package

This commit is contained in:
Yecheng Fu 2019-03-18 13:54:04 +08:00
parent 0b6c028c8a
commit 842fed658c
20 changed files with 588 additions and 407 deletions

View File

@ -108,6 +108,8 @@ pkg/controller/volume/expand
pkg/controller/volume/persistentvolume
pkg/controller/volume/persistentvolume/config/v1alpha1
pkg/controller/volume/persistentvolume/options
pkg/controller/volume/persistentvolume/testing
pkg/controller/volume/scheduling
pkg/credentialprovider
pkg/credentialprovider/gcp
pkg/features

View File

@ -146,6 +146,7 @@ filegroup(
"//pkg/controller/volume/protectionutil:all-srcs",
"//pkg/controller/volume/pvcprotection:all-srcs",
"//pkg/controller/volume/pvprotection:all-srcs",
"//pkg/controller/volume/scheduling:all-srcs",
],
tags = ["automanaged"],
)

View File

@ -12,12 +12,6 @@ go_library(
"index.go",
"pv_controller.go",
"pv_controller_base.go",
"scheduler_assume_cache.go",
"scheduler_bind_cache_metrics.go",
"scheduler_binder.go",
"scheduler_binder_cache.go",
"scheduler_binder_fake.go",
"util.go",
"volume_host.go",
],
importpath = "k8s.io/kubernetes/pkg/controller/volume/persistentvolume",
@ -26,6 +20,7 @@ go_library(
"//pkg/controller:go_default_library",
"//pkg/controller/volume/events:go_default_library",
"//pkg/controller/volume/persistentvolume/metrics:go_default_library",
"//pkg/controller/volume/persistentvolume/util:go_default_library",
"//pkg/features:go_default_library",
"//pkg/util/goroutinemap:go_default_library",
"//pkg/util/goroutinemap/exponentialbackoff:go_default_library",
@ -39,14 +34,12 @@ go_library(
"//staging/src/k8s.io/api/storage/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library",
@ -62,7 +55,6 @@ go_library(
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//staging/src/k8s.io/cloud-provider/volume/errors:go_default_library",
"//staging/src/k8s.io/csi-translation-lib:go_default_library",
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)
@ -77,15 +69,14 @@ go_test(
"provision_test.go",
"pv_controller_test.go",
"recycle_test.go",
"scheduler_assume_cache_test.go",
"scheduler_binder_cache_test.go",
"scheduler_binder_test.go",
],
embed = [":go_default_library"],
deps = [
"//pkg/api/testapi:go_default_library",
"//pkg/apis/core:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/volume/persistentvolume/testing:go_default_library",
"//pkg/controller/volume/persistentvolume/util:go_default_library",
"//pkg/features:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/util:go_default_library",
@ -95,16 +86,12 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature/testing:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
@ -132,6 +119,8 @@ filegroup(
"//pkg/controller/volume/persistentvolume/config:all-srcs",
"//pkg/controller/volume/persistentvolume/metrics:all-srcs",
"//pkg/controller/volume/persistentvolume/options:all-srcs",
"//pkg/controller/volume/persistentvolume/testing:all-srcs",
"//pkg/controller/volume/persistentvolume/util:all-srcs",
],
tags = ["automanaged"],
)

View File

@ -21,13 +21,9 @@ import (
"sort"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/cache"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/features"
pvutil "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
)
@ -96,7 +92,7 @@ func (pvIndex *persistentVolumeOrderedIndex) findByClaim(claim *v1.PersistentVol
return nil, err
}
bestVol, err := findMatchingVolume(claim, volumes, nil /* node for topology binding*/, nil /* exclusion map */, delayBinding)
bestVol, err := pvutil.FindMatchingVolume(claim, volumes, nil /* node for topology binding*/, nil /* exclusion map */, delayBinding)
if err != nil {
return nil, err
}
@ -108,176 +104,6 @@ func (pvIndex *persistentVolumeOrderedIndex) findByClaim(claim *v1.PersistentVol
return nil, nil
}
// findMatchingVolume goes through the list of volumes to find the best matching volume
// for the claim.
//
// This function is used by both the PV controller and scheduler.
//
// delayBinding is true only in the PV controller path. When set, prebound PVs are still returned
// as a match for the claim, but unbound PVs are skipped.
//
// node is set only in the scheduler path. When set, the PV node affinity is checked against
// the node's labels.
//
// excludedVolumes is only used in the scheduler path, and is needed for evaluating multiple
// unbound PVCs for a single Pod at one time. As each PVC finds a matching PV, the chosen
// PV needs to be excluded from future matching.
func findMatchingVolume(
claim *v1.PersistentVolumeClaim,
volumes []*v1.PersistentVolume,
node *v1.Node,
excludedVolumes map[string]*v1.PersistentVolume,
delayBinding bool) (*v1.PersistentVolume, error) {
var smallestVolume *v1.PersistentVolume
var smallestVolumeQty resource.Quantity
requestedQty := claim.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)]
requestedClass := v1helper.GetPersistentVolumeClaimClass(claim)
var selector labels.Selector
if claim.Spec.Selector != nil {
internalSelector, err := metav1.LabelSelectorAsSelector(claim.Spec.Selector)
if err != nil {
// should be unreachable code due to validation
return nil, fmt.Errorf("error creating internal label selector for claim: %v: %v", claimToClaimKey(claim), err)
}
selector = internalSelector
}
// Go through all available volumes with two goals:
// - find a volume that is either pre-bound by user or dynamically
// provisioned for this claim. Because of this we need to loop through
// all volumes.
// - find the smallest matching one if there is no volume pre-bound to
// the claim.
for _, volume := range volumes {
if _, ok := excludedVolumes[volume.Name]; ok {
// Skip volumes in the excluded list
continue
}
volumeQty := volume.Spec.Capacity[v1.ResourceStorage]
// check if volumeModes do not match (feature gate protected)
isMismatch, err := checkVolumeModeMismatches(&claim.Spec, &volume.Spec)
if err != nil {
return nil, fmt.Errorf("error checking if volumeMode was a mismatch: %v", err)
}
// filter out mismatching volumeModes
if isMismatch {
continue
}
// check if PV's DeletionTimeStamp is set, if so, skip this volume.
if utilfeature.DefaultFeatureGate.Enabled(features.StorageObjectInUseProtection) {
if volume.ObjectMeta.DeletionTimestamp != nil {
continue
}
}
nodeAffinityValid := true
if node != nil {
// Scheduler path, check that the PV NodeAffinity
// is satisfied by the node
err := volumeutil.CheckNodeAffinity(volume, node.Labels)
if err != nil {
nodeAffinityValid = false
}
}
if IsVolumeBoundToClaim(volume, claim) {
// this claim and volume are pre-bound; return
// the volume if the size request is satisfied,
// otherwise continue searching for a match
if volumeQty.Cmp(requestedQty) < 0 {
continue
}
// If PV node affinity is invalid, return no match.
// This means the prebound PV (and therefore PVC)
// is not suitable for this node.
if !nodeAffinityValid {
return nil, nil
}
return volume, nil
}
if node == nil && delayBinding {
// PV controller does not bind this claim.
// Scheduler will handle binding unbound volumes
// Scheduler path will have node != nil
continue
}
// filter out:
// - volumes in non-available phase
// - volumes bound to another claim
// - volumes whose labels don't match the claim's selector, if specified
// - volumes in Class that is not requested
// - volumes whose NodeAffinity does not match the node
if volume.Status.Phase != v1.VolumeAvailable {
// We ignore volumes in non-available phase, because volumes that
// satisfies matching criteria will be updated to available, binding
// them now has high chance of encountering unnecessary failures
// due to API conflicts.
continue
} else if volume.Spec.ClaimRef != nil {
continue
} else if selector != nil && !selector.Matches(labels.Set(volume.Labels)) {
continue
}
if v1helper.GetPersistentVolumeClass(volume) != requestedClass {
continue
}
if !nodeAffinityValid {
continue
}
if node != nil {
// Scheduler path
// Check that the access modes match
if !checkAccessModes(claim, volume) {
continue
}
}
if volumeQty.Cmp(requestedQty) >= 0 {
if smallestVolume == nil || smallestVolumeQty.Cmp(volumeQty) > 0 {
smallestVolume = volume
smallestVolumeQty = volumeQty
}
}
}
if smallestVolume != nil {
// Found a matching volume
return smallestVolume, nil
}
return nil, nil
}
// checkVolumeModeMismatches is a convenience method that checks volumeMode for PersistentVolume
// and PersistentVolumeClaims
func checkVolumeModeMismatches(pvcSpec *v1.PersistentVolumeClaimSpec, pvSpec *v1.PersistentVolumeSpec) (bool, error) {
if !utilfeature.DefaultFeatureGate.Enabled(features.BlockVolume) {
return false, nil
}
// In HA upgrades, we cannot guarantee that the apiserver is on a version >= controller-manager.
// So we default a nil volumeMode to filesystem
requestedVolumeMode := v1.PersistentVolumeFilesystem
if pvcSpec.VolumeMode != nil {
requestedVolumeMode = *pvcSpec.VolumeMode
}
pvVolumeMode := v1.PersistentVolumeFilesystem
if pvSpec.VolumeMode != nil {
pvVolumeMode = *pvSpec.VolumeMode
}
return requestedVolumeMode != pvVolumeMode, nil
}
// findBestMatchForClaim is a convenience method that finds a volume by the claim's AccessModes and requests for Storage
func (pvIndex *persistentVolumeOrderedIndex) findBestMatchForClaim(claim *v1.PersistentVolumeClaim, delayBinding bool) (*v1.PersistentVolume, error) {
return pvIndex.findByClaim(claim, delayBinding)
@ -362,19 +188,3 @@ func claimToClaimKey(claim *v1.PersistentVolumeClaim) string {
func claimrefToClaimKey(claimref *v1.ObjectReference) string {
return fmt.Sprintf("%s/%s", claimref.Namespace, claimref.Name)
}
// Returns true if PV satisfies all the PVC's requested AccessModes
func checkAccessModes(claim *v1.PersistentVolumeClaim, volume *v1.PersistentVolume) bool {
pvModesMap := map[v1.PersistentVolumeAccessMode]bool{}
for _, mode := range volume.Spec.AccessModes {
pvModesMap[mode] = true
}
for _, mode := range claim.Spec.AccessModes {
_, ok := pvModesMap[mode]
if !ok {
return false
}
}
return true
}

View File

@ -28,6 +28,7 @@ import (
"k8s.io/client-go/kubernetes/scheme"
ref "k8s.io/client-go/tools/reference"
"k8s.io/kubernetes/pkg/api/testapi"
pvutil "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume/util"
)
@ -773,7 +774,7 @@ func createTestVolumes() []*v1.PersistentVolume {
v1.ReadOnlyMany,
},
StorageClassName: classWait,
NodeAffinity: getVolumeNodeAffinity("key1", "value1"),
NodeAffinity: pvutil.GetVolumeNodeAffinity("key1", "value1"),
VolumeMode: &fs,
},
Status: v1.PersistentVolumeStatus{
@ -797,7 +798,7 @@ func createTestVolumes() []*v1.PersistentVolume {
v1.ReadOnlyMany,
},
StorageClassName: classWait,
NodeAffinity: getVolumeNodeAffinity("key1", "value1"),
NodeAffinity: pvutil.GetVolumeNodeAffinity("key1", "value1"),
VolumeMode: &fs,
},
Status: v1.PersistentVolumeStatus{
@ -822,7 +823,7 @@ func createTestVolumes() []*v1.PersistentVolume {
},
StorageClassName: classWait,
ClaimRef: &v1.ObjectReference{Name: "claim02", Namespace: "myns"},
NodeAffinity: getVolumeNodeAffinity("key1", "value1"),
NodeAffinity: pvutil.GetVolumeNodeAffinity("key1", "value1"),
VolumeMode: &fs,
},
Status: v1.PersistentVolumeStatus{
@ -846,7 +847,7 @@ func createTestVolumes() []*v1.PersistentVolume {
v1.ReadOnlyMany,
},
StorageClassName: classWait,
NodeAffinity: getVolumeNodeAffinity("key1", "value3"),
NodeAffinity: pvutil.GetVolumeNodeAffinity("key1", "value3"),
VolumeMode: &fs,
},
Status: v1.PersistentVolumeStatus{
@ -870,7 +871,7 @@ func createTestVolumes() []*v1.PersistentVolume {
v1.ReadOnlyMany,
},
StorageClassName: classWait,
NodeAffinity: getVolumeNodeAffinity("key1", "value4"),
NodeAffinity: pvutil.GetVolumeNodeAffinity("key1", "value4"),
VolumeMode: &fs,
},
Status: v1.PersistentVolumeStatus{
@ -894,7 +895,7 @@ func createTestVolumes() []*v1.PersistentVolume {
v1.ReadOnlyMany,
},
StorageClassName: classWait,
NodeAffinity: getVolumeNodeAffinity("key1", "value4"),
NodeAffinity: pvutil.GetVolumeNodeAffinity("key1", "value4"),
VolumeMode: &fs,
},
Status: v1.PersistentVolumeStatus{
@ -918,7 +919,7 @@ func createTestVolumes() []*v1.PersistentVolume {
v1.ReadOnlyMany,
},
StorageClassName: classWait,
NodeAffinity: getVolumeNodeAffinity("key1", "value4"),
NodeAffinity: pvutil.GetVolumeNodeAffinity("key1", "value4"),
VolumeMode: &fs,
},
Status: v1.PersistentVolumeStatus{
@ -942,7 +943,7 @@ func createTestVolumes() []*v1.PersistentVolume {
v1.ReadOnlyMany,
},
StorageClassName: classWait,
NodeAffinity: getVolumeNodeAffinity("key1", "value4"),
NodeAffinity: pvutil.GetVolumeNodeAffinity("key1", "value4"),
VolumeMode: &fs,
},
},
@ -968,24 +969,6 @@ func testVolume(name, size string) *v1.PersistentVolume {
}
}
func getVolumeNodeAffinity(key string, value string) *v1.VolumeNodeAffinity {
return &v1.VolumeNodeAffinity{
Required: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: key,
Operator: v1.NodeSelectorOpIn,
Values: []string{value},
},
},
},
},
},
}
}
func createVolumeModeBlockTestVolume() *v1.PersistentVolume {
blockMode := v1.PersistentVolumeBlock
@ -1163,7 +1146,7 @@ func TestVolumeModeCheck(t *testing.T) {
for name, scenario := range scenarios {
t.Run(name, func(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BlockVolume, scenario.enableBlock)()
expectedMismatch, err := checkVolumeModeMismatches(&scenario.pvc.Spec, &scenario.vol.Spec)
expectedMismatch, err := pvutil.CheckVolumeModeMismatches(&scenario.pvc.Spec, &scenario.vol.Spec)
if err != nil {
t.Errorf("Unexpected failure for checkVolumeModeMismatches: %v", err)
}
@ -1608,7 +1591,7 @@ func TestFindMatchVolumeWithNode(t *testing.T) {
}
for name, scenario := range scenarios {
volume, err := findMatchingVolume(scenario.claim, volumes, scenario.node, scenario.excludedVolumes, true)
volume, err := pvutil.FindMatchingVolume(scenario.claim, volumes, scenario.node, scenario.excludedVolumes, true)
if err != nil {
t.Errorf("Unexpected error matching volume by claim: %v", err)
}
@ -1662,7 +1645,7 @@ func TestCheckAccessModes(t *testing.T) {
}
for name, scenario := range scenarios {
result := checkAccessModes(scenario.claim, volume)
result := pvutil.CheckAccessModes(scenario.claim, volume)
if result != scenario.shouldSucceed {
t.Errorf("Test %q failed: Expected %v, got %v", name, scenario.shouldSucceed, result)
}

View File

@ -213,7 +213,7 @@ func TestProvisionSync(t *testing.T) {
// Inject error to the first
// kubeclient.PersistentVolumes.Create() call. All other calls
// will succeed.
{"create", "persistentvolumes", errors.New("Mock creation error")},
{Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error")},
},
wrapTestWithProvisionCalls([]provisionCall{provision1Success}, testSyncClaim),
},
@ -229,11 +229,11 @@ func TestProvisionSync(t *testing.T) {
[]pvtesting.ReactorError{
// Inject error to five kubeclient.PersistentVolumes.Create()
// calls
{"create", "persistentvolumes", errors.New("Mock creation error1")},
{"create", "persistentvolumes", errors.New("Mock creation error2")},
{"create", "persistentvolumes", errors.New("Mock creation error3")},
{"create", "persistentvolumes", errors.New("Mock creation error4")},
{"create", "persistentvolumes", errors.New("Mock creation error5")},
{Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error1")},
{Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error2")},
{Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error3")},
{Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error4")},
{Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error5")},
},
wrapTestWithPluginCalls(
nil, // recycle calls
@ -254,11 +254,11 @@ func TestProvisionSync(t *testing.T) {
[]pvtesting.ReactorError{
// Inject error to five kubeclient.PersistentVolumes.Create()
// calls
{"create", "persistentvolumes", errors.New("Mock creation error1")},
{"create", "persistentvolumes", errors.New("Mock creation error2")},
{"create", "persistentvolumes", errors.New("Mock creation error3")},
{"create", "persistentvolumes", errors.New("Mock creation error4")},
{"create", "persistentvolumes", errors.New("Mock creation error5")},
{Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error1")},
{Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error2")},
{Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error3")},
{Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error4")},
{Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error5")},
},
// No deleteCalls are configured, which results into no deleter plugin available for the volume
wrapTestWithProvisionCalls([]provisionCall{provision1Success}, testSyncClaim),
@ -275,11 +275,11 @@ func TestProvisionSync(t *testing.T) {
[]pvtesting.ReactorError{
// Inject error to five kubeclient.PersistentVolumes.Create()
// calls
{"create", "persistentvolumes", errors.New("Mock creation error1")},
{"create", "persistentvolumes", errors.New("Mock creation error2")},
{"create", "persistentvolumes", errors.New("Mock creation error3")},
{"create", "persistentvolumes", errors.New("Mock creation error4")},
{"create", "persistentvolumes", errors.New("Mock creation error5")},
{Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error1")},
{Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error2")},
{Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error3")},
{Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error4")},
{Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error5")},
},
wrapTestWithPluginCalls(
nil, // recycle calls
@ -305,11 +305,11 @@ func TestProvisionSync(t *testing.T) {
[]pvtesting.ReactorError{
// Inject error to five kubeclient.PersistentVolumes.Create()
// calls
{"create", "persistentvolumes", errors.New("Mock creation error1")},
{"create", "persistentvolumes", errors.New("Mock creation error2")},
{"create", "persistentvolumes", errors.New("Mock creation error3")},
{"create", "persistentvolumes", errors.New("Mock creation error4")},
{"create", "persistentvolumes", errors.New("Mock creation error5")},
{Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error1")},
{Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error2")},
{Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error3")},
{Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error4")},
{Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error5")},
},
wrapTestWithPluginCalls(
nil, // recycle calls
@ -399,8 +399,8 @@ func TestProvisionSync(t *testing.T) {
[]pvtesting.ReactorError{
// Inject errors to simulate crashed API server during
// kubeclient.PersistentVolumes.Create()
{"create", "persistentvolumes", errors.New("Mock creation error1")},
{"create", "persistentvolumes", apierrs.NewAlreadyExists(api.Resource("persistentvolumes"), "")},
{Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error1")},
{Verb: "create", Resource: "persistentvolumes", Error: apierrs.NewAlreadyExists(api.Resource("persistentvolumes"), "")},
},
wrapTestWithPluginCalls(
nil, // recycle calls

View File

@ -43,6 +43,7 @@ import (
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/controller/volume/events"
"k8s.io/kubernetes/pkg/controller/volume/persistentvolume/metrics"
pvutil "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/util/goroutinemap"
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
@ -276,7 +277,7 @@ func checkVolumeSatisfyClaim(volume *v1.PersistentVolume, claim *v1.PersistentVo
return fmt.Errorf("storageClassName does not match")
}
isMismatch, err := checkVolumeModeMismatches(&claim.Spec, &volume.Spec)
isMismatch, err := pvutil.CheckVolumeModeMismatches(&claim.Spec, &volume.Spec)
if err != nil {
return fmt.Errorf("error checking volumeMode: %v", err)
}
@ -284,7 +285,7 @@ func checkVolumeSatisfyClaim(volume *v1.PersistentVolume, claim *v1.PersistentVo
return fmt.Errorf("incompatible volumeMode")
}
if !checkAccessModes(claim, volume) {
if !pvutil.CheckAccessModes(claim, volume) {
return fmt.Errorf("incompatible accessMode")
}
@ -309,7 +310,7 @@ func (ctrl *PersistentVolumeController) shouldDelayBinding(claim *v1.PersistentV
}
// If claim is in delay binding mode.
return IsDelayBindingMode(claim, ctrl.classLister)
return pvutil.IsDelayBindingMode(claim, ctrl.classLister)
}
// syncUnboundClaim is the main controller method to decide what to do with an
@ -407,7 +408,7 @@ func (ctrl *PersistentVolumeController) syncUnboundClaim(claim *v1.PersistentVol
}
// OBSERVATION: pvc is "Bound", pv is "Bound"
return nil
} else if IsVolumeBoundToClaim(volume, claim) {
} else if pvutil.IsVolumeBoundToClaim(volume, claim) {
// User asked for a PV that is claimed by this PVC
// OBSERVATION: pvc is "Pending", pv is "Bound"
klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound, finishing the binding", claimToClaimKey(claim))
@ -610,7 +611,7 @@ func (ctrl *PersistentVolumeController) syncVolume(volume *v1.PersistentVolume)
}
return nil
} else if claim.Spec.VolumeName == "" {
if isMismatch, err := checkVolumeModeMismatches(&claim.Spec, &volume.Spec); err != nil || isMismatch {
if isMismatch, err := pvutil.CheckVolumeModeMismatches(&claim.Spec, &volume.Spec); err != nil || isMismatch {
// Binding for the volume won't be called in syncUnboundClaim,
// because findBestMatchForClaim won't return the volume due to volumeMode mismatch.
volumeMsg := fmt.Sprintf("Cannot bind PersistentVolume to requested PersistentVolumeClaim %q due to incompatible volumeMode.", claim.Name)
@ -851,7 +852,7 @@ func (ctrl *PersistentVolumeController) updateVolumePhaseWithEvent(volume *v1.Pe
func (ctrl *PersistentVolumeController) bindVolumeToClaim(volume *v1.PersistentVolume, claim *v1.PersistentVolumeClaim) (*v1.PersistentVolume, error) {
klog.V(4).Infof("updating PersistentVolume[%s]: binding to %q", volume.Name, claimToClaimKey(claim))
volumeClone, dirty, err := GetBindVolumeToClaim(volume, claim)
volumeClone, dirty, err := pvutil.GetBindVolumeToClaim(volume, claim)
if err != nil {
return nil, err
}

View File

@ -0,0 +1,36 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["testing.go"],
importpath = "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/testing",
visibility = ["//visibility:public"],
deps = [
"//pkg/features:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/testing:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -1,103 +0,0 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package persistentvolume
import (
"fmt"
"k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/scheme"
storagelisters "k8s.io/client-go/listers/storage/v1"
"k8s.io/client-go/tools/reference"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
)
// IsDelayBindingMode checks if claim is in delay binding mode.
func IsDelayBindingMode(claim *v1.PersistentVolumeClaim, classLister storagelisters.StorageClassLister) (bool, error) {
className := v1helper.GetPersistentVolumeClaimClass(claim)
if className == "" {
return false, nil
}
class, err := classLister.Get(className)
if err != nil {
return false, nil
}
if class.VolumeBindingMode == nil {
return false, fmt.Errorf("VolumeBindingMode not set for StorageClass %q", className)
}
return *class.VolumeBindingMode == storage.VolumeBindingWaitForFirstConsumer, nil
}
// GetBindVolumeToClaim returns a new volume which is bound to given claim. In
// addition, it returns a bool which indicates whether we made modification on
// original volume.
func GetBindVolumeToClaim(volume *v1.PersistentVolume, claim *v1.PersistentVolumeClaim) (*v1.PersistentVolume, bool, error) {
dirty := false
// Check if the volume was already bound (either by user or by controller)
shouldSetBoundByController := false
if !IsVolumeBoundToClaim(volume, claim) {
shouldSetBoundByController = true
}
// The volume from method args can be pointing to watcher cache. We must not
// modify these, therefore create a copy.
volumeClone := volume.DeepCopy()
// Bind the volume to the claim if it is not bound yet
if volume.Spec.ClaimRef == nil ||
volume.Spec.ClaimRef.Name != claim.Name ||
volume.Spec.ClaimRef.Namespace != claim.Namespace ||
volume.Spec.ClaimRef.UID != claim.UID {
claimRef, err := reference.GetReference(scheme.Scheme, claim)
if err != nil {
return nil, false, fmt.Errorf("Unexpected error getting claim reference: %v", err)
}
volumeClone.Spec.ClaimRef = claimRef
dirty = true
}
// Set annBoundByController if it is not set yet
if shouldSetBoundByController && !metav1.HasAnnotation(volumeClone.ObjectMeta, annBoundByController) {
metav1.SetMetaDataAnnotation(&volumeClone.ObjectMeta, annBoundByController, "yes")
dirty = true
}
return volumeClone, dirty, nil
}
// IsVolumeBoundToClaim returns true, if given volume is pre-bound or bound
// to specific claim. Both claim.Name and claim.Namespace must be equal.
// If claim.UID is present in volume.Spec.ClaimRef, it must be equal too.
func IsVolumeBoundToClaim(volume *v1.PersistentVolume, claim *v1.PersistentVolumeClaim) bool {
if volume.Spec.ClaimRef == nil {
return false
}
if claim.Name != volume.Spec.ClaimRef.Name || claim.Namespace != volume.Spec.ClaimRef.Namespace {
return false
}
if volume.Spec.ClaimRef.UID != "" && claim.UID != volume.Spec.ClaimRef.UID {
return false
}
return true
}

View File

@ -0,0 +1,36 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["util.go"],
importpath = "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util",
visibility = ["//visibility:public"],
deps = [
"//pkg/apis/core/v1/helper:go_default_library",
"//pkg/features:go_default_library",
"//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//staging/src/k8s.io/client-go/listers/storage/v1:go_default_library",
"//staging/src/k8s.io/client-go/tools/reference:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,342 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package persistentvolume
import (
"fmt"
"k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes/scheme"
storagelisters "k8s.io/client-go/listers/storage/v1"
"k8s.io/client-go/tools/reference"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/features"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
)
const (
// AnnBindCompleted Annotation applies to PVCs. It indicates that the lifecycle
// of the PVC has passed through the initial setup. This information changes how
// we interpret some observations of the state of the objects. Value of this
// Annotation does not matter.
AnnBindCompleted = "pv.kubernetes.io/bind-completed"
// AnnBoundByController annotation applies to PVs and PVCs. It indicates that
// the binding (PV->PVC or PVC->PV) was installed by the controller. The
// absence of this annotation means the binding was done by the user (i.e.
// pre-bound). Value of this annotation does not matter.
// External PV binders must bind PV the same way as PV controller, otherwise PV
// controller may not handle it correctly.
AnnBoundByController = "pv.kubernetes.io/bound-by-controller"
// AnnSelectedNode annotation is added to a PVC that has been triggered by scheduler to
// be dynamically provisioned. Its value is the name of the selected node.
AnnSelectedNode = "volume.kubernetes.io/selected-node"
// NotSupportedProvisioner is a special provisioner name which can be set
// in storage class to indicate dynamic provisioning is not supported by
// the storage.
NotSupportedProvisioner = "kubernetes.io/no-provisioner"
)
// IsDelayBindingMode checks if claim is in delay binding mode.
func IsDelayBindingMode(claim *v1.PersistentVolumeClaim, classLister storagelisters.StorageClassLister) (bool, error) {
className := v1helper.GetPersistentVolumeClaimClass(claim)
if className == "" {
return false, nil
}
class, err := classLister.Get(className)
if err != nil {
return false, nil
}
if class.VolumeBindingMode == nil {
return false, fmt.Errorf("VolumeBindingMode not set for StorageClass %q", className)
}
return *class.VolumeBindingMode == storage.VolumeBindingWaitForFirstConsumer, nil
}
// GetBindVolumeToClaim returns a new volume which is bound to given claim. In
// addition, it returns a bool which indicates whether we made modification on
// original volume.
func GetBindVolumeToClaim(volume *v1.PersistentVolume, claim *v1.PersistentVolumeClaim) (*v1.PersistentVolume, bool, error) {
dirty := false
// Check if the volume was already bound (either by user or by controller)
shouldSetBoundByController := false
if !IsVolumeBoundToClaim(volume, claim) {
shouldSetBoundByController = true
}
// The volume from method args can be pointing to watcher cache. We must not
// modify these, therefore create a copy.
volumeClone := volume.DeepCopy()
// Bind the volume to the claim if it is not bound yet
if volume.Spec.ClaimRef == nil ||
volume.Spec.ClaimRef.Name != claim.Name ||
volume.Spec.ClaimRef.Namespace != claim.Namespace ||
volume.Spec.ClaimRef.UID != claim.UID {
claimRef, err := reference.GetReference(scheme.Scheme, claim)
if err != nil {
return nil, false, fmt.Errorf("Unexpected error getting claim reference: %v", err)
}
volumeClone.Spec.ClaimRef = claimRef
dirty = true
}
// Set AnnBoundByController if it is not set yet
if shouldSetBoundByController && !metav1.HasAnnotation(volumeClone.ObjectMeta, AnnBoundByController) {
metav1.SetMetaDataAnnotation(&volumeClone.ObjectMeta, AnnBoundByController, "yes")
dirty = true
}
return volumeClone, dirty, nil
}
// IsVolumeBoundToClaim returns true, if given volume is pre-bound or bound
// to specific claim. Both claim.Name and claim.Namespace must be equal.
// If claim.UID is present in volume.Spec.ClaimRef, it must be equal too.
func IsVolumeBoundToClaim(volume *v1.PersistentVolume, claim *v1.PersistentVolumeClaim) bool {
if volume.Spec.ClaimRef == nil {
return false
}
if claim.Name != volume.Spec.ClaimRef.Name || claim.Namespace != volume.Spec.ClaimRef.Namespace {
return false
}
if volume.Spec.ClaimRef.UID != "" && claim.UID != volume.Spec.ClaimRef.UID {
return false
}
return true
}
// FindMatchingVolume goes through the list of volumes to find the best matching volume
// for the claim.
//
// This function is used by both the PV controller and scheduler.
//
// delayBinding is true only in the PV controller path. When set, prebound PVs are still returned
// as a match for the claim, but unbound PVs are skipped.
//
// node is set only in the scheduler path. When set, the PV node affinity is checked against
// the node's labels.
//
// excludedVolumes is only used in the scheduler path, and is needed for evaluating multiple
// unbound PVCs for a single Pod at one time. As each PVC finds a matching PV, the chosen
// PV needs to be excluded from future matching.
func FindMatchingVolume(
claim *v1.PersistentVolumeClaim,
volumes []*v1.PersistentVolume,
node *v1.Node,
excludedVolumes map[string]*v1.PersistentVolume,
delayBinding bool) (*v1.PersistentVolume, error) {
var smallestVolume *v1.PersistentVolume
var smallestVolumeQty resource.Quantity
requestedQty := claim.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)]
requestedClass := v1helper.GetPersistentVolumeClaimClass(claim)
var selector labels.Selector
if claim.Spec.Selector != nil {
internalSelector, err := metav1.LabelSelectorAsSelector(claim.Spec.Selector)
if err != nil {
// should be unreachable code due to validation
return nil, fmt.Errorf("error creating internal label selector for claim: %v: %v", claimToClaimKey(claim), err)
}
selector = internalSelector
}
// Go through all available volumes with two goals:
// - find a volume that is either pre-bound by user or dynamically
// provisioned for this claim. Because of this we need to loop through
// all volumes.
// - find the smallest matching one if there is no volume pre-bound to
// the claim.
for _, volume := range volumes {
if _, ok := excludedVolumes[volume.Name]; ok {
// Skip volumes in the excluded list
continue
}
volumeQty := volume.Spec.Capacity[v1.ResourceStorage]
// check if volumeModes do not match (feature gate protected)
isMismatch, err := CheckVolumeModeMismatches(&claim.Spec, &volume.Spec)
if err != nil {
return nil, fmt.Errorf("error checking if volumeMode was a mismatch: %v", err)
}
// filter out mismatching volumeModes
if isMismatch {
continue
}
// check if PV's DeletionTimeStamp is set, if so, skip this volume.
if utilfeature.DefaultFeatureGate.Enabled(features.StorageObjectInUseProtection) {
if volume.ObjectMeta.DeletionTimestamp != nil {
continue
}
}
nodeAffinityValid := true
if node != nil {
// Scheduler path, check that the PV NodeAffinity
// is satisfied by the node
err := volumeutil.CheckNodeAffinity(volume, node.Labels)
if err != nil {
nodeAffinityValid = false
}
}
if IsVolumeBoundToClaim(volume, claim) {
// this claim and volume are pre-bound; return
// the volume if the size request is satisfied,
// otherwise continue searching for a match
if volumeQty.Cmp(requestedQty) < 0 {
continue
}
// If PV node affinity is invalid, return no match.
// This means the prebound PV (and therefore PVC)
// is not suitable for this node.
if !nodeAffinityValid {
return nil, nil
}
return volume, nil
}
if node == nil && delayBinding {
// PV controller does not bind this claim.
// Scheduler will handle binding unbound volumes
// Scheduler path will have node != nil
continue
}
// filter out:
// - volumes in non-available phase
// - volumes bound to another claim
// - volumes whose labels don't match the claim's selector, if specified
// - volumes in Class that is not requested
// - volumes whose NodeAffinity does not match the node
if volume.Status.Phase != v1.VolumeAvailable {
// We ignore volumes in non-available phase, because volumes that
// satisfies matching criteria will be updated to available, binding
// them now has high chance of encountering unnecessary failures
// due to API conflicts.
continue
} else if volume.Spec.ClaimRef != nil {
continue
} else if selector != nil && !selector.Matches(labels.Set(volume.Labels)) {
continue
}
if v1helper.GetPersistentVolumeClass(volume) != requestedClass {
continue
}
if !nodeAffinityValid {
continue
}
if node != nil {
// Scheduler path
// Check that the access modes match
if !CheckAccessModes(claim, volume) {
continue
}
}
if volumeQty.Cmp(requestedQty) >= 0 {
if smallestVolume == nil || smallestVolumeQty.Cmp(volumeQty) > 0 {
smallestVolume = volume
smallestVolumeQty = volumeQty
}
}
}
if smallestVolume != nil {
// Found a matching volume
return smallestVolume, nil
}
return nil, nil
}
// CheckVolumeModeMismatches is a convenience method that checks volumeMode for PersistentVolume
// and PersistentVolumeClaims
func CheckVolumeModeMismatches(pvcSpec *v1.PersistentVolumeClaimSpec, pvSpec *v1.PersistentVolumeSpec) (bool, error) {
if !utilfeature.DefaultFeatureGate.Enabled(features.BlockVolume) {
return false, nil
}
// In HA upgrades, we cannot guarantee that the apiserver is on a version >= controller-manager.
// So we default a nil volumeMode to filesystem
requestedVolumeMode := v1.PersistentVolumeFilesystem
if pvcSpec.VolumeMode != nil {
requestedVolumeMode = *pvcSpec.VolumeMode
}
pvVolumeMode := v1.PersistentVolumeFilesystem
if pvSpec.VolumeMode != nil {
pvVolumeMode = *pvSpec.VolumeMode
}
return requestedVolumeMode != pvVolumeMode, nil
}
// CheckAccessModes returns true if PV satisfies all the PVC's requested AccessModes
func CheckAccessModes(claim *v1.PersistentVolumeClaim, volume *v1.PersistentVolume) bool {
pvModesMap := map[v1.PersistentVolumeAccessMode]bool{}
for _, mode := range volume.Spec.AccessModes {
pvModesMap[mode] = true
}
for _, mode := range claim.Spec.AccessModes {
_, ok := pvModesMap[mode]
if !ok {
return false
}
}
return true
}
func claimToClaimKey(claim *v1.PersistentVolumeClaim) string {
return fmt.Sprintf("%s/%s", claim.Namespace, claim.Name)
}
// GetVolumeNodeAffinity returns a VolumeNodeAffinity for given key and value.
func GetVolumeNodeAffinity(key string, value string) *v1.VolumeNodeAffinity {
return &v1.VolumeNodeAffinity{
Required: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: key,
Operator: v1.NodeSelectorOpIn,
Values: []string{value},
},
},
},
},
},
}
}

View File

@ -0,0 +1,76 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"scheduler_assume_cache.go",
"scheduler_bind_cache_metrics.go",
"scheduler_binder.go",
"scheduler_binder_cache.go",
"scheduler_binder_fake.go",
],
importpath = "k8s.io/kubernetes/pkg/controller/volume/scheduling",
visibility = ["//visibility:public"],
deps = [
"//pkg/apis/core/v1/helper:go_default_library",
"//pkg/controller/volume/persistentvolume/util:go_default_library",
"//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/listers/storage/v1:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = [
"scheduler_assume_cache_test.go",
"scheduler_binder_cache_test.go",
"scheduler_binder_test.go",
],
embed = [":go_default_library"],
deps = [
"//pkg/api/testapi:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/volume/persistentvolume/testing:go_default_library",
"//pkg/controller/volume/persistentvolume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/testing:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package persistentvolume
package scheduling
import (
"fmt"

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package persistentvolume
package scheduling
import (
"fmt"
@ -22,6 +22,7 @@ import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
pvutil "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util"
)
func makePV(name, version, storageClass string) *v1.PersistentVolume {
@ -456,7 +457,7 @@ func TestAssumeUpdatePVCCache(t *testing.T) {
// Assume PVC
newPVC := pvc.DeepCopy()
newPVC.Annotations[annSelectedNode] = "test-node"
newPVC.Annotations[pvutil.AnnSelectedNode] = "test-node"
if err := cache.Assume(newPVC); err != nil {
t.Fatalf("failed to assume PVC: %v", err)
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package persistentvolume
package scheduling
import (
"github.com/prometheus/client_golang/prometheus"

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package persistentvolume
package scheduling
import (
"fmt"
@ -32,6 +32,7 @@ import (
storagelisters "k8s.io/client-go/listers/storage/v1"
"k8s.io/klog"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
pvutil "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
)
@ -211,7 +212,7 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolume
// Filter out claims to provision
for _, claim := range claimsToBind {
if selectedNode, ok := claim.Annotations[annSelectedNode]; ok {
if selectedNode, ok := claim.Annotations[pvutil.AnnSelectedNode]; ok {
if selectedNode != node.Name {
// Fast path, skip unmatched node
return false, boundVolumesSatisfied, nil
@ -274,7 +275,7 @@ func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (al
// Assume PV
newBindings := []*bindingInfo{}
for _, binding := range claimsToBind {
newPV, dirty, err := GetBindVolumeToClaim(binding.pv, binding.pvc)
newPV, dirty, err := pvutil.GetBindVolumeToClaim(binding.pv, binding.pvc)
klog.V(5).Infof("AssumePodVolumes: GetBindVolumeToClaim for pod %q, PV %q, PVC %q. newPV %p, dirty %v, err: %v",
podName,
binding.pv.Name,
@ -303,7 +304,7 @@ func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (al
// The claims from method args can be pointing to watcher cache. We must not
// modify these, therefore create a copy.
claimClone := claim.DeepCopy()
metav1.SetMetaDataAnnotation(&claimClone.ObjectMeta, annSelectedNode, nodeName)
metav1.SetMetaDataAnnotation(&claimClone.ObjectMeta, pvutil.AnnSelectedNode, nodeName)
err = b.pvcCache.Assume(claimClone)
if err != nil {
b.revertAssumedPVs(newBindings)
@ -511,7 +512,7 @@ func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*bindingInfo, claim
if pvc.Annotations == nil {
return false, fmt.Errorf("selectedNode annotation reset for PVC %q", pvc.Name)
}
selectedNode := pvc.Annotations[annSelectedNode]
selectedNode := pvc.Annotations[pvutil.AnnSelectedNode]
if selectedNode != pod.Spec.NodeName {
return false, fmt.Errorf("selectedNode annotation value %q not set to scheduled node %q", selectedNode, pod.Spec.NodeName)
}
@ -582,7 +583,7 @@ func (b *volumeBinder) isPVCBound(namespace, pvcName string) (bool, *v1.Persiste
}
func (b *volumeBinder) isPVCFullyBound(pvc *v1.PersistentVolumeClaim) bool {
return pvc.Spec.VolumeName != "" && metav1.HasAnnotation(pvc.ObjectMeta, annBindCompleted)
return pvc.Spec.VolumeName != "" && metav1.HasAnnotation(pvc.ObjectMeta, pvutil.AnnBindCompleted)
}
// arePodVolumesBound returns true if all volumes are fully bound
@ -614,7 +615,7 @@ func (b *volumeBinder) getPodVolumes(pod *v1.Pod) (boundClaims []*v1.PersistentV
if volumeBound {
boundClaims = append(boundClaims, pvc)
} else {
delayBindingMode, err := IsDelayBindingMode(pvc, b.classLister)
delayBindingMode, err := pvutil.IsDelayBindingMode(pvc, b.classLister)
if err != nil {
return nil, nil, nil, err
}
@ -674,7 +675,7 @@ func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*v1.Persi
pvcName := getPVCName(pvc)
// Find a matching PV
pv, err := findMatchingVolume(pvc, allPVs, node, chosenPVs, true)
pv, err := pvutil.FindMatchingVolume(pvc, allPVs, node, chosenPVs, true)
if err != nil {
return false, nil, nil, err
}
@ -717,7 +718,7 @@ func (b *volumeBinder) checkVolumeProvisions(pod *v1.Pod, claimsToProvision []*v
return false, nil, fmt.Errorf("failed to find storage class %q", className)
}
provisioner := class.Provisioner
if provisioner == "" || provisioner == notSupportedProvisioner {
if provisioner == "" || provisioner == pvutil.NotSupportedProvisioner {
klog.V(4).Infof("storage class %q of claim %q does not support dynamic provisioning", className, pvcName)
return false, nil, nil
}
@ -775,3 +776,7 @@ func (a byPVCSize) Less(i, j int) bool {
// return true if iSize is less than jSize
return iSize.Cmp(jSize) == -1
}
func claimToClaimKey(claim *v1.PersistentVolumeClaim) string {
return fmt.Sprintf("%s/%s", claim.Namespace, claim.Name)
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package persistentvolume
package scheduling
import (
"sync"

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package persistentvolume
package scheduling
import (
"reflect"

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package persistentvolume
package scheduling
import "k8s.io/api/core/v1"

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package persistentvolume
package scheduling
import (
"context"
@ -39,6 +39,8 @@ import (
"k8s.io/klog"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/controller"
pvtesting "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/testing"
pvutil "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util"
)
var (
@ -97,7 +99,7 @@ var (
type testEnv struct {
client clientset.Interface
reactor *volumeReactor
reactor *pvtesting.VolumeReactor
binder SchedulerVolumeBinder
internalBinder *volumeBinder
internalNodeInformer coreinformers.NodeInformer
@ -107,7 +109,7 @@ type testEnv struct {
func newTestBinder(t *testing.T, stopCh <-chan struct{}) *testEnv {
client := &fake.Clientset{}
reactor := newVolumeReactor(client, nil, nil, nil, nil)
reactor := pvtesting.NewVolumeReactor(client, nil, nil, nil)
// TODO refactor all tests to use real watch mechanism, see #72327
client.AddWatchReactor("*", func(action k8stesting.Action) (handled bool, ret watch.Interface, err error) {
gvr := action.GetResource()
@ -238,11 +240,11 @@ func (env *testEnv) initClaims(cachedPVCs []*v1.PersistentVolumeClaim, apiPVCs [
for _, pvc := range cachedPVCs {
internalPVCCache.add(pvc)
if apiPVCs == nil {
env.reactor.claims[pvc.Name] = pvc
env.reactor.AddClaim(pvc)
}
}
for _, pvc := range apiPVCs {
env.reactor.claims[pvc.Name] = pvc
env.reactor.AddClaim(pvc)
}
}
@ -251,11 +253,11 @@ func (env *testEnv) initVolumes(cachedPVs []*v1.PersistentVolume, apiPVs []*v1.P
for _, pv := range cachedPVs {
internalPVCache.add(pv)
if apiPVs == nil {
env.reactor.volumes[pv.Name] = pv
env.reactor.AddVolume(pv)
}
}
for _, pv := range apiPVs {
env.reactor.volumes[pv.Name] = pv
env.reactor.AddVolume(pv)
}
}
@ -421,8 +423,8 @@ func (env *testEnv) validateAssume(t *testing.T, name string, pod *v1.Pod, bindi
t.Errorf("Test %q failed: GetPVC %q returned error: %v", name, pvcKey, err)
continue
}
if pvc.Annotations[annSelectedNode] != nodeLabelValue {
t.Errorf("Test %q failed: expected annSelectedNode of pvc %q to be %q, but got %q", name, pvcKey, nodeLabelValue, pvc.Annotations[annSelectedNode])
if pvc.Annotations[pvutil.AnnSelectedNode] != nodeLabelValue {
t.Errorf("Test %q failed: expected pvutil.AnnSelectedNode of pvc %q to be %q, but got %q", name, pvcKey, nodeLabelValue, pvc.Annotations[pvutil.AnnSelectedNode])
}
}
}
@ -447,8 +449,8 @@ func (env *testEnv) validateFailedAssume(t *testing.T, name string, pod *v1.Pod,
t.Errorf("Test %q failed: GetPVC %q returned error: %v", name, pvcKey, err)
continue
}
if pvc.Annotations[annSelectedNode] != "" {
t.Errorf("Test %q failed: expected annSelectedNode of pvc %q empty, but got %q", name, pvcKey, pvc.Annotations[annSelectedNode])
if pvc.Annotations[pvutil.AnnSelectedNode] != "" {
t.Errorf("Test %q failed: expected pvutil.AnnSelectedNode of pvc %q empty, but got %q", name, pvcKey, pvc.Annotations[pvutil.AnnSelectedNode])
}
}
}
@ -476,7 +478,7 @@ func (env *testEnv) validateBind(
}
// Check reactor for API updates
if err := env.reactor.checkVolumes(expectedAPIPVs); err != nil {
if err := env.reactor.CheckVolumes(expectedAPIPVs); err != nil {
t.Errorf("Test %q failed: API reactor validation failed: %v", name, err)
}
}
@ -504,7 +506,7 @@ func (env *testEnv) validateProvision(
}
// Check reactor for API updates
if err := env.reactor.checkClaims(expectedAPIPVCs); err != nil {
if err := env.reactor.CheckClaims(expectedAPIPVCs); err != nil {
t.Errorf("Test %q failed: API reactor validation failed: %v", name, err)
}
}
@ -543,10 +545,10 @@ func makeTestPVC(name, size, node string, pvcBoundState int, pvName, resourceVer
switch pvcBoundState {
case pvcSelectedNode:
metav1.SetMetaDataAnnotation(&pvc.ObjectMeta, annSelectedNode, node)
metav1.SetMetaDataAnnotation(&pvc.ObjectMeta, pvutil.AnnSelectedNode, node)
// don't fallthrough
case pvcBound:
metav1.SetMetaDataAnnotation(&pvc.ObjectMeta, annBindCompleted, "yes")
metav1.SetMetaDataAnnotation(&pvc.ObjectMeta, pvutil.AnnBindCompleted, "yes")
fallthrough
case pvcPrebound:
pvc.Spec.VolumeName = pvName
@ -595,7 +597,7 @@ func makeTestPV(name, node, capacity, version string, boundToPVC *v1.PersistentV
},
}
if node != "" {
pv.Spec.NodeAffinity = getVolumeNodeAffinity(nodeLabelKey, node)
pv.Spec.NodeAffinity = pvutil.GetVolumeNodeAffinity(nodeLabelKey, node)
}
if boundToPVC != nil {
@ -607,7 +609,7 @@ func makeTestPV(name, node, capacity, version string, boundToPVC *v1.PersistentV
Namespace: boundToPVC.Namespace,
UID: boundToPVC.UID,
}
metav1.SetMetaDataAnnotation(&pv.ObjectMeta, annBoundByController, "yes")
metav1.SetMetaDataAnnotation(&pv.ObjectMeta, pvutil.AnnBoundByController, "yes")
}
return pv
@ -615,7 +617,7 @@ func makeTestPV(name, node, capacity, version string, boundToPVC *v1.PersistentV
func pvcSetSelectedNode(pvc *v1.PersistentVolumeClaim, node string) *v1.PersistentVolumeClaim {
newPVC := pvc.DeepCopy()
metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, annSelectedNode, node)
metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, pvutil.AnnSelectedNode, node)
return newPVC
}
@ -691,7 +693,7 @@ func makeBinding(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) *bindin
func addProvisionAnn(pvc *v1.PersistentVolumeClaim) *v1.PersistentVolumeClaim {
res := pvc.DeepCopy()
// Add provision related annotations
metav1.SetMetaDataAnnotation(&res.ObjectMeta, annSelectedNode, nodeLabelValue)
metav1.SetMetaDataAnnotation(&res.ObjectMeta, pvutil.AnnSelectedNode, nodeLabelValue)
return res
}
@ -1488,7 +1490,7 @@ func TestBindPodVolumes(t *testing.T) {
// Update PVC to be fully bound to PV
newPVC := pvc.DeepCopy()
newPVC.Spec.VolumeName = pv.Name
metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, annBindCompleted, "yes")
metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, pvutil.AnnBindCompleted, "yes")
if _, err := testEnv.client.CoreV1().PersistentVolumeClaims(newPVC.Namespace).Update(newPVC); err != nil {
t.Errorf("failed to update PVC %q: %v", newPVC.Name, err)
}
@ -1512,7 +1514,7 @@ func TestBindPodVolumes(t *testing.T) {
return
}
newPVC.Spec.VolumeName = dynamicPV.Name
metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, annBindCompleted, "yes")
metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, pvutil.AnnBindCompleted, "yes")
if _, err := testEnv.client.CoreV1().PersistentVolumeClaims(newPVC.Namespace).Update(newPVC); err != nil {
t.Errorf("failed to update PVC %q: %v", newPVC.Name, err)
}
@ -1588,7 +1590,7 @@ func TestBindPodVolumes(t *testing.T) {
// Update PVC to be fully bound to a PV with a different node
newPVC := pvcs[0].DeepCopy()
newPVC.Spec.VolumeName = pvNode2.Name
metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, annBindCompleted, "yes")
metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, pvutil.AnnBindCompleted, "yes")
if _, err := testEnv.client.CoreV1().PersistentVolumeClaims(newPVC.Namespace).Update(newPVC); err != nil {
t.Errorf("failed to update PVC %q: %v", newPVC.Name, err)
}