refactor volume binder

This commit is contained in:
louisgong 2020-02-21 16:35:58 +08:00
parent a54e1a8a04
commit c6b94e4606
14 changed files with 71 additions and 157 deletions

View File

@ -131,6 +131,9 @@ type SchedulerVolumeBinder interface {
// GetBindingsCache returns the cache used (if any) to store volume binding decisions. // GetBindingsCache returns the cache used (if any) to store volume binding decisions.
GetBindingsCache() PodBindingCache GetBindingsCache() PodBindingCache
// DeletePodBindings will delete pod's bindingDecisions in podBindingCache.
DeletePodBindings(pod *v1.Pod)
} }
type volumeBinder struct { type volumeBinder struct {
@ -181,6 +184,14 @@ func (b *volumeBinder) GetBindingsCache() PodBindingCache {
return b.podBindingCache return b.podBindingCache
} }
// DeletePodBindings will delete pod's bindingDecisions in podBindingCache.
func (b *volumeBinder) DeletePodBindings(pod *v1.Pod) {
cache := b.podBindingCache
if pod != nil {
cache.DeleteBindings(pod)
}
}
// FindPodVolumes caches the matching PVs and PVCs to provision per node in podBindingCache. // FindPodVolumes caches the matching PVs and PVCs to provision per node in podBindingCache.
// This method intentionally takes in a *v1.Node object instead of using volumebinder.nodeInformer. // This method intentionally takes in a *v1.Node object instead of using volumebinder.nodeInformer.
// That's necessary because some operations will need to pass in to the predicate fake node objects. // That's necessary because some operations will need to pass in to the predicate fake node objects.

View File

@ -63,3 +63,6 @@ func (b *FakeVolumeBinder) BindPodVolumes(assumedPod *v1.Pod) error {
func (b *FakeVolumeBinder) GetBindingsCache() PodBindingCache { func (b *FakeVolumeBinder) GetBindingsCache() PodBindingCache {
return nil return nil
} }
// DeletePodBindings implements SchedulerVolumeBinder.DeletePodBindings.
func (b *FakeVolumeBinder) DeletePodBindings(pod *v1.Pod) {}

View File

@ -11,6 +11,7 @@ go_library(
visibility = ["//visibility:public"], visibility = ["//visibility:public"],
deps = [ deps = [
"//pkg/api/v1/pod:go_default_library", "//pkg/api/v1/pod:go_default_library",
"//pkg/controller/volume/scheduling:go_default_library",
"//pkg/features:go_default_library", "//pkg/features:go_default_library",
"//pkg/scheduler/algorithmprovider:go_default_library", "//pkg/scheduler/algorithmprovider:go_default_library",
"//pkg/scheduler/apis/config:go_default_library", "//pkg/scheduler/apis/config:go_default_library",
@ -28,7 +29,6 @@ go_library(
"//pkg/scheduler/internal/queue:go_default_library", "//pkg/scheduler/internal/queue:go_default_library",
"//pkg/scheduler/metrics:go_default_library", "//pkg/scheduler/metrics:go_default_library",
"//pkg/scheduler/profile:go_default_library", "//pkg/scheduler/profile:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1:go_default_library", "//staging/src/k8s.io/api/storage/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
@ -82,7 +82,6 @@ go_test(
"//pkg/scheduler/nodeinfo:go_default_library", "//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/profile:go_default_library", "//pkg/scheduler/profile:go_default_library",
"//pkg/scheduler/testing:go_default_library", "//pkg/scheduler/testing:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/events/v1beta1:go_default_library", "//staging/src/k8s.io/api/events/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
@ -130,7 +129,6 @@ filegroup(
"//pkg/scheduler/profile:all-srcs", "//pkg/scheduler/profile:all-srcs",
"//pkg/scheduler/testing:all-srcs", "//pkg/scheduler/testing:all-srcs",
"//pkg/scheduler/util:all-srcs", "//pkg/scheduler/util:all-srcs",
"//pkg/scheduler/volumebinder:all-srcs",
], ],
tags = ["automanaged"], tags = ["automanaged"],
visibility = ["//visibility:public"], visibility = ["//visibility:public"],

View File

@ -38,6 +38,7 @@ import (
policylisters "k8s.io/client-go/listers/policy/v1beta1" policylisters "k8s.io/client-go/listers/policy/v1beta1"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/klog" "k8s.io/klog"
"k8s.io/kubernetes/pkg/controller/volume/scheduling"
kubefeatures "k8s.io/kubernetes/pkg/features" kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/algorithmprovider" "k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config" schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
@ -53,7 +54,6 @@ import (
cachedebugger "k8s.io/kubernetes/pkg/scheduler/internal/cache/debugger" cachedebugger "k8s.io/kubernetes/pkg/scheduler/internal/cache/debugger"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
"k8s.io/kubernetes/pkg/scheduler/profile" "k8s.io/kubernetes/pkg/scheduler/profile"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
) )
const ( const (
@ -83,7 +83,7 @@ type Configurator struct {
schedulerCache internalcache.Cache schedulerCache internalcache.Cache
// Handles volume binding decisions // Handles volume binding decisions
volumeBinder *volumebinder.VolumeBinder volumeBinder scheduling.SchedulerVolumeBinder
// Disable pod preemption or not. // Disable pod preemption or not.
disablePreemption bool disablePreemption bool

View File

@ -6,9 +6,9 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding", importpath = "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding",
visibility = ["//visibility:public"], visibility = ["//visibility:public"],
deps = [ deps = [
"//pkg/controller/volume/scheduling:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library", "//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
], ],
@ -36,7 +36,6 @@ go_test(
"//pkg/controller/volume/scheduling:go_default_library", "//pkg/controller/volume/scheduling:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library", "//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library", "//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",
], ],
) )

View File

@ -21,14 +21,14 @@ import (
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/kubernetes/pkg/controller/volume/scheduling"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
) )
// VolumeBinding is a plugin that binds pod volumes in scheduling. // VolumeBinding is a plugin that binds pod volumes in scheduling.
type VolumeBinding struct { type VolumeBinding struct {
binder *volumebinder.VolumeBinder binder scheduling.SchedulerVolumeBinder
} }
var _ framework.FilterPlugin = &VolumeBinding{} var _ framework.FilterPlugin = &VolumeBinding{}
@ -72,7 +72,7 @@ func (pl *VolumeBinding) Filter(ctx context.Context, cs *framework.CycleState, p
return nil return nil
} }
reasons, err := pl.binder.Binder.FindPodVolumes(pod, node) reasons, err := pl.binder.FindPodVolumes(pod, node)
if err != nil { if err != nil {
return framework.NewStatus(framework.Error, err.Error()) return framework.NewStatus(framework.Error, err.Error())

View File

@ -23,10 +23,9 @@ import (
"testing" "testing"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
volumescheduling "k8s.io/kubernetes/pkg/controller/volume/scheduling" "k8s.io/kubernetes/pkg/controller/volume/scheduling"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
) )
func TestVolumeBinding(t *testing.T) { func TestVolumeBinding(t *testing.T) {
@ -44,7 +43,7 @@ func TestVolumeBinding(t *testing.T) {
name string name string
pod *v1.Pod pod *v1.Pod
node *v1.Node node *v1.Node
volumeBinderConfig *volumescheduling.FakeVolumeBinderConfig volumeBinderConfig *scheduling.FakeVolumeBinderConfig
wantStatus *framework.Status wantStatus *framework.Status
}{ }{
{ {
@ -57,7 +56,7 @@ func TestVolumeBinding(t *testing.T) {
name: "all bound", name: "all bound",
pod: &v1.Pod{Spec: volState}, pod: &v1.Pod{Spec: volState},
node: &v1.Node{}, node: &v1.Node{},
volumeBinderConfig: &volumescheduling.FakeVolumeBinderConfig{ volumeBinderConfig: &scheduling.FakeVolumeBinderConfig{
AllBound: true, AllBound: true,
}, },
wantStatus: nil, wantStatus: nil,
@ -66,32 +65,32 @@ func TestVolumeBinding(t *testing.T) {
name: "unbound/no matches", name: "unbound/no matches",
pod: &v1.Pod{Spec: volState}, pod: &v1.Pod{Spec: volState},
node: &v1.Node{}, node: &v1.Node{},
volumeBinderConfig: &volumescheduling.FakeVolumeBinderConfig{ volumeBinderConfig: &scheduling.FakeVolumeBinderConfig{
FindReasons: []volumescheduling.ConflictReason{volumescheduling.ErrReasonBindConflict}, FindReasons: []scheduling.ConflictReason{scheduling.ErrReasonBindConflict},
}, },
wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, string(volumescheduling.ErrReasonBindConflict)), wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, string(scheduling.ErrReasonBindConflict)),
}, },
{ {
name: "bound and unbound unsatisfied", name: "bound and unbound unsatisfied",
pod: &v1.Pod{Spec: volState}, pod: &v1.Pod{Spec: volState},
node: &v1.Node{}, node: &v1.Node{},
volumeBinderConfig: &volumescheduling.FakeVolumeBinderConfig{ volumeBinderConfig: &scheduling.FakeVolumeBinderConfig{
FindReasons: []volumescheduling.ConflictReason{volumescheduling.ErrReasonBindConflict, volumescheduling.ErrReasonNodeConflict}, FindReasons: []scheduling.ConflictReason{scheduling.ErrReasonBindConflict, scheduling.ErrReasonNodeConflict},
}, },
wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, string(volumescheduling.ErrReasonBindConflict), string(volumescheduling.ErrReasonNodeConflict)), wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, string(scheduling.ErrReasonBindConflict), string(scheduling.ErrReasonNodeConflict)),
}, },
{ {
name: "unbound/found matches/bind succeeds", name: "unbound/found matches/bind succeeds",
pod: &v1.Pod{Spec: volState}, pod: &v1.Pod{Spec: volState},
node: &v1.Node{}, node: &v1.Node{},
volumeBinderConfig: &volumescheduling.FakeVolumeBinderConfig{}, volumeBinderConfig: &scheduling.FakeVolumeBinderConfig{},
wantStatus: nil, wantStatus: nil,
}, },
{ {
name: "predicate error", name: "predicate error",
pod: &v1.Pod{Spec: volState}, pod: &v1.Pod{Spec: volState},
node: &v1.Node{}, node: &v1.Node{},
volumeBinderConfig: &volumescheduling.FakeVolumeBinderConfig{ volumeBinderConfig: &scheduling.FakeVolumeBinderConfig{
FindErr: findErr, FindErr: findErr,
}, },
wantStatus: framework.NewStatus(framework.Error, findErr.Error()), wantStatus: framework.NewStatus(framework.Error, findErr.Error()),
@ -102,7 +101,7 @@ func TestVolumeBinding(t *testing.T) {
t.Run(item.name, func(t *testing.T) { t.Run(item.name, func(t *testing.T) {
nodeInfo := schedulernodeinfo.NewNodeInfo() nodeInfo := schedulernodeinfo.NewNodeInfo()
nodeInfo.SetNode(item.node) nodeInfo.SetNode(item.node)
fakeVolumeBinder := volumebinder.NewFakeVolumeBinder(item.volumeBinderConfig) fakeVolumeBinder := scheduling.NewFakeVolumeBinder(item.volumeBinderConfig)
p := &VolumeBinding{ p := &VolumeBinding{
binder: fakeVolumeBinder, binder: fakeVolumeBinder,
} }

View File

@ -13,12 +13,12 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1", importpath = "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1",
visibility = ["//visibility:public"], visibility = ["//visibility:public"],
deps = [ deps = [
"//pkg/controller/volume/scheduling:go_default_library",
"//pkg/scheduler/apis/config:go_default_library", "//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/listers:go_default_library", "//pkg/scheduler/listers:go_default_library",
"//pkg/scheduler/metrics:go_default_library", "//pkg/scheduler/metrics:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library", "//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/util:go_default_library", "//pkg/scheduler/util:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",

View File

@ -30,12 +30,12 @@ import (
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
"k8s.io/klog" "k8s.io/klog"
"k8s.io/kubernetes/pkg/controller/volume/scheduling"
"k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/apis/config"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers" schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
"k8s.io/kubernetes/pkg/scheduler/metrics" "k8s.io/kubernetes/pkg/scheduler/metrics"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
schedutil "k8s.io/kubernetes/pkg/scheduler/util" schedutil "k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
) )
const ( const (
@ -78,7 +78,7 @@ type framework struct {
clientSet clientset.Interface clientSet clientset.Interface
informerFactory informers.SharedInformerFactory informerFactory informers.SharedInformerFactory
volumeBinder *volumebinder.VolumeBinder volumeBinder scheduling.SchedulerVolumeBinder
metricsRecorder *metricsRecorder metricsRecorder *metricsRecorder
@ -119,7 +119,7 @@ type frameworkOptions struct {
informerFactory informers.SharedInformerFactory informerFactory informers.SharedInformerFactory
snapshotSharedLister schedulerlisters.SharedLister snapshotSharedLister schedulerlisters.SharedLister
metricsRecorder *metricsRecorder metricsRecorder *metricsRecorder
volumeBinder *volumebinder.VolumeBinder volumeBinder scheduling.SchedulerVolumeBinder
runAllFilters bool runAllFilters bool
} }
@ -163,7 +163,7 @@ func withMetricsRecorder(recorder *metricsRecorder) Option {
} }
// WithVolumeBinder sets volume binder for the scheduling framework. // WithVolumeBinder sets volume binder for the scheduling framework.
func WithVolumeBinder(binder *volumebinder.VolumeBinder) Option { func WithVolumeBinder(binder scheduling.SchedulerVolumeBinder) Option {
return func(o *frameworkOptions) { return func(o *frameworkOptions) {
o.volumeBinder = binder o.volumeBinder = binder
} }
@ -888,7 +888,7 @@ func (f *framework) SharedInformerFactory() informers.SharedInformerFactory {
} }
// VolumeBinder returns the volume binder used by scheduler. // VolumeBinder returns the volume binder used by scheduler.
func (f *framework) VolumeBinder() *volumebinder.VolumeBinder { func (f *framework) VolumeBinder() scheduling.SchedulerVolumeBinder {
return f.volumeBinder return f.volumeBinder
} }

View File

@ -29,10 +29,10 @@ import (
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/controller/volume/scheduling"
"k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/apis/config"
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers" schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
) )
// NodeScoreList declares a list of nodes and their scores. // NodeScoreList declares a list of nodes and their scores.
@ -521,5 +521,5 @@ type FrameworkHandle interface {
SharedInformerFactory() informers.SharedInformerFactory SharedInformerFactory() informers.SharedInformerFactory
// VolumeBinder returns the volume binder used by scheduler. // VolumeBinder returns the volume binder used by scheduler.
VolumeBinder() *volumebinder.VolumeBinder VolumeBinder() scheduling.SchedulerVolumeBinder
} }

View File

@ -35,6 +35,7 @@ import (
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/klog" "k8s.io/klog"
podutil "k8s.io/kubernetes/pkg/api/v1/pod" podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller/volume/scheduling"
kubefeatures "k8s.io/kubernetes/pkg/features" kubefeatures "k8s.io/kubernetes/pkg/features"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config" schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/apis/config/scheme" "k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
@ -45,7 +46,6 @@ import (
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
"k8s.io/kubernetes/pkg/scheduler/metrics" "k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/scheduler/profile" "k8s.io/kubernetes/pkg/scheduler/profile"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
) )
const ( const (
@ -104,7 +104,7 @@ type Scheduler struct {
StopEverything <-chan struct{} StopEverything <-chan struct{}
// VolumeBinder handles PVC/PV binding for the pod. // VolumeBinder handles PVC/PV binding for the pod.
VolumeBinder *volumebinder.VolumeBinder VolumeBinder scheduling.SchedulerVolumeBinder
// Disable pod preemption or not. // Disable pod preemption or not.
DisablePreemption bool DisablePreemption bool
@ -230,7 +230,7 @@ func New(client clientset.Interface,
} }
schedulerCache := internalcache.New(30*time.Second, stopEverything) schedulerCache := internalcache.New(30*time.Second, stopEverything)
volumeBinder := volumebinder.NewVolumeBinder( volumeBinder := scheduling.NewVolumeBinder(
client, client,
informerFactory.Core().V1().Nodes(), informerFactory.Core().V1().Nodes(),
informerFactory.Storage().V1().CSINodes(), informerFactory.Storage().V1().CSINodes(),
@ -440,7 +440,7 @@ func (sched *Scheduler) preempt(ctx context.Context, prof *profile.Profile, stat
// retry scheduling. // retry scheduling.
func (sched *Scheduler) bindVolumes(assumed *v1.Pod) error { func (sched *Scheduler) bindVolumes(assumed *v1.Pod) error {
klog.V(5).Infof("Trying to bind volumes for pod \"%v/%v\"", assumed.Namespace, assumed.Name) klog.V(5).Infof("Trying to bind volumes for pod \"%v/%v\"", assumed.Namespace, assumed.Name)
err := sched.VolumeBinder.Binder.BindPodVolumes(assumed) err := sched.VolumeBinder.BindPodVolumes(assumed)
if err != nil { if err != nil {
klog.V(1).Infof("Failed to bind volumes for pod \"%v/%v\": %v", assumed.Namespace, assumed.Name, err) klog.V(1).Infof("Failed to bind volumes for pod \"%v/%v\": %v", assumed.Namespace, assumed.Name, err)
@ -599,7 +599,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
// Otherwise, binding of volumes is started after the pod is assumed, but before pod binding. // Otherwise, binding of volumes is started after the pod is assumed, but before pod binding.
// //
// This function modifies 'assumedPod' if volume binding is required. // This function modifies 'assumedPod' if volume binding is required.
allBound, err := sched.VolumeBinder.Binder.AssumePodVolumes(assumedPod, scheduleResult.SuggestedHost) allBound, err := sched.VolumeBinder.AssumePodVolumes(assumedPod, scheduleResult.SuggestedHost)
if err != nil { if err != nil {
sched.recordSchedulingFailure(prof, assumedPodInfo, err, SchedulerError, sched.recordSchedulingFailure(prof, assumedPodInfo, err, SchedulerError,
fmt.Sprintf("AssumePodVolumes failed: %v", err)) fmt.Sprintf("AssumePodVolumes failed: %v", err))

View File

@ -46,7 +46,7 @@ import (
clienttesting "k8s.io/client-go/testing" clienttesting "k8s.io/client-go/testing"
clientcache "k8s.io/client-go/tools/cache" clientcache "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/events" "k8s.io/client-go/tools/events"
volumescheduling "k8s.io/kubernetes/pkg/controller/volume/scheduling" "k8s.io/kubernetes/pkg/controller/volume/scheduling"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config" schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/core" "k8s.io/kubernetes/pkg/scheduler/core"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
@ -61,7 +61,6 @@ import (
"k8s.io/kubernetes/pkg/scheduler/nodeinfo" "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
"k8s.io/kubernetes/pkg/scheduler/profile" "k8s.io/kubernetes/pkg/scheduler/profile"
st "k8s.io/kubernetes/pkg/scheduler/testing" st "k8s.io/kubernetes/pkg/scheduler/testing"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"
) )
type fakePodConditionUpdater struct{} type fakePodConditionUpdater struct{}
@ -350,7 +349,7 @@ func TestSchedulerScheduleOne(t *testing.T) {
Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName), Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName),
}, },
}, },
VolumeBinder: volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}), VolumeBinder: scheduling.NewFakeVolumeBinder(&scheduling.FakeVolumeBinderConfig{AllBound: true}),
} }
called := make(chan struct{}) called := make(chan struct{})
stopFunc := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) { stopFunc := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) {
@ -785,10 +784,10 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) {
// queuedPodStore: pods queued before processing. // queuedPodStore: pods queued before processing.
// scache: scheduler cache that might contain assumed pods. // scache: scheduler cache that might contain assumed pods.
func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, broadcaster events.EventBroadcaster, fakeVolumeBinder *volumebinder.VolumeBinder, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding, chan error) { func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.Cache, informerFactory informers.SharedInformerFactory, broadcaster events.EventBroadcaster, volumeBinder scheduling.SchedulerVolumeBinder, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding, chan error) {
if fakeVolumeBinder == nil { if volumeBinder == nil {
// Create default volume binder if it didn't set. // Create default volume binder if it didn't set.
fakeVolumeBinder = volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}) volumeBinder = scheduling.NewFakeVolumeBinder(&scheduling.FakeVolumeBinderConfig{AllBound: true})
} }
bindingChan := make(chan *v1.Binding, 1) bindingChan := make(chan *v1.Binding, 1)
client := clientsetfake.NewSimpleClientset() client := clientsetfake.NewSimpleClientset()
@ -801,7 +800,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
return true, b, nil return true, b, nil
}) })
fwk, _ := st.NewFramework(fns, framework.WithClientSet(client), framework.WithVolumeBinder(fakeVolumeBinder)) fwk, _ := st.NewFramework(fns, framework.WithClientSet(client), framework.WithVolumeBinder(volumeBinder))
prof := &profile.Profile{ prof := &profile.Profile{
Framework: fwk, Framework: fwk,
Recorder: &events.FakeRecorder{}, Recorder: &events.FakeRecorder{},
@ -838,13 +837,13 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
Profiles: profiles, Profiles: profiles,
podConditionUpdater: fakePodConditionUpdater{}, podConditionUpdater: fakePodConditionUpdater{},
podPreemptor: fakePodPreemptor{}, podPreemptor: fakePodPreemptor{},
VolumeBinder: fakeVolumeBinder, VolumeBinder: volumeBinder,
} }
return sched, bindingChan, errChan return sched, bindingChan, errChan
} }
func setupTestSchedulerWithVolumeBinding(fakeVolumeBinder *volumebinder.VolumeBinder, stop <-chan struct{}, broadcaster events.EventBroadcaster) (*Scheduler, chan *v1.Binding, chan error) { func setupTestSchedulerWithVolumeBinding(volumeBinder scheduling.SchedulerVolumeBinder, stop <-chan struct{}, broadcaster events.EventBroadcaster) (*Scheduler, chan *v1.Binding, chan error) {
testNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}} testNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}}
queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc) queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
pod := podWithID("foo", "") pod := podWithID("foo", "")
@ -863,10 +862,10 @@ func setupTestSchedulerWithVolumeBinding(fakeVolumeBinder *volumebinder.VolumeBi
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
st.RegisterFilterPlugin(volumebinding.Name, volumebinding.New), st.RegisterFilterPlugin(volumebinding.Name, volumebinding.New),
} }
s, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, broadcaster, fakeVolumeBinder, fns...) s, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, broadcaster, volumeBinder, fns...)
informerFactory.Start(stop) informerFactory.Start(stop)
informerFactory.WaitForCacheSync(stop) informerFactory.WaitForCacheSync(stop)
s.VolumeBinder = fakeVolumeBinder s.VolumeBinder = volumeBinder
return s, bindingChan, errChan return s, bindingChan, errChan
} }
@ -896,11 +895,11 @@ func TestSchedulerWithVolumeBinding(t *testing.T) {
expectAssumeCalled bool expectAssumeCalled bool
expectBindCalled bool expectBindCalled bool
eventReason string eventReason string
volumeBinderConfig *volumescheduling.FakeVolumeBinderConfig volumeBinderConfig *scheduling.FakeVolumeBinderConfig
}{ }{
{ {
name: "all bound", name: "all bound",
volumeBinderConfig: &volumescheduling.FakeVolumeBinderConfig{ volumeBinderConfig: &scheduling.FakeVolumeBinderConfig{
AllBound: true, AllBound: true,
}, },
expectAssumeCalled: true, expectAssumeCalled: true,
@ -909,32 +908,32 @@ func TestSchedulerWithVolumeBinding(t *testing.T) {
}, },
{ {
name: "bound/invalid pv affinity", name: "bound/invalid pv affinity",
volumeBinderConfig: &volumescheduling.FakeVolumeBinderConfig{ volumeBinderConfig: &scheduling.FakeVolumeBinderConfig{
AllBound: true, AllBound: true,
FindReasons: volumescheduling.ConflictReasons{volumescheduling.ErrReasonNodeConflict}, FindReasons: scheduling.ConflictReasons{scheduling.ErrReasonNodeConflict},
}, },
eventReason: "FailedScheduling", eventReason: "FailedScheduling",
expectError: makePredicateError("1 node(s) had volume node affinity conflict"), expectError: makePredicateError("1 node(s) had volume node affinity conflict"),
}, },
{ {
name: "unbound/no matches", name: "unbound/no matches",
volumeBinderConfig: &volumescheduling.FakeVolumeBinderConfig{ volumeBinderConfig: &scheduling.FakeVolumeBinderConfig{
FindReasons: volumescheduling.ConflictReasons{volumescheduling.ErrReasonBindConflict}, FindReasons: scheduling.ConflictReasons{scheduling.ErrReasonBindConflict},
}, },
eventReason: "FailedScheduling", eventReason: "FailedScheduling",
expectError: makePredicateError("1 node(s) didn't find available persistent volumes to bind"), expectError: makePredicateError("1 node(s) didn't find available persistent volumes to bind"),
}, },
{ {
name: "bound and unbound unsatisfied", name: "bound and unbound unsatisfied",
volumeBinderConfig: &volumescheduling.FakeVolumeBinderConfig{ volumeBinderConfig: &scheduling.FakeVolumeBinderConfig{
FindReasons: volumescheduling.ConflictReasons{volumescheduling.ErrReasonBindConflict, volumescheduling.ErrReasonNodeConflict}, FindReasons: scheduling.ConflictReasons{scheduling.ErrReasonBindConflict, scheduling.ErrReasonNodeConflict},
}, },
eventReason: "FailedScheduling", eventReason: "FailedScheduling",
expectError: makePredicateError("1 node(s) didn't find available persistent volumes to bind, 1 node(s) had volume node affinity conflict"), expectError: makePredicateError("1 node(s) didn't find available persistent volumes to bind, 1 node(s) had volume node affinity conflict"),
}, },
{ {
name: "unbound/found matches/bind succeeds", name: "unbound/found matches/bind succeeds",
volumeBinderConfig: &volumescheduling.FakeVolumeBinderConfig{}, volumeBinderConfig: &scheduling.FakeVolumeBinderConfig{},
expectAssumeCalled: true, expectAssumeCalled: true,
expectBindCalled: true, expectBindCalled: true,
expectPodBind: &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "foo-ns", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: "machine1"}}, expectPodBind: &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "foo-ns", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: "machine1"}},
@ -942,7 +941,7 @@ func TestSchedulerWithVolumeBinding(t *testing.T) {
}, },
{ {
name: "predicate error", name: "predicate error",
volumeBinderConfig: &volumescheduling.FakeVolumeBinderConfig{ volumeBinderConfig: &scheduling.FakeVolumeBinderConfig{
FindErr: findErr, FindErr: findErr,
}, },
eventReason: "FailedScheduling", eventReason: "FailedScheduling",
@ -950,7 +949,7 @@ func TestSchedulerWithVolumeBinding(t *testing.T) {
}, },
{ {
name: "assume error", name: "assume error",
volumeBinderConfig: &volumescheduling.FakeVolumeBinderConfig{ volumeBinderConfig: &scheduling.FakeVolumeBinderConfig{
AssumeErr: assumeErr, AssumeErr: assumeErr,
}, },
expectAssumeCalled: true, expectAssumeCalled: true,
@ -959,7 +958,7 @@ func TestSchedulerWithVolumeBinding(t *testing.T) {
}, },
{ {
name: "bind error", name: "bind error",
volumeBinderConfig: &volumescheduling.FakeVolumeBinderConfig{ volumeBinderConfig: &scheduling.FakeVolumeBinderConfig{
BindErr: bindErr, BindErr: bindErr,
}, },
expectAssumeCalled: true, expectAssumeCalled: true,
@ -972,11 +971,7 @@ func TestSchedulerWithVolumeBinding(t *testing.T) {
for _, item := range table { for _, item := range table {
t.Run(item.name, func(t *testing.T) { t.Run(item.name, func(t *testing.T) {
stop := make(chan struct{}) stop := make(chan struct{})
fakeVolumeBinder := volumebinder.NewFakeVolumeBinder(item.volumeBinderConfig) fakeVolumeBinder := scheduling.NewFakeVolumeBinder(item.volumeBinderConfig)
internalBinder, ok := fakeVolumeBinder.Binder.(*volumescheduling.FakeVolumeBinder)
if !ok {
t.Fatalf("Failed to get fake volume binder")
}
s, bindingChan, errChan := setupTestSchedulerWithVolumeBinding(fakeVolumeBinder, stop, eventBroadcaster) s, bindingChan, errChan := setupTestSchedulerWithVolumeBinding(fakeVolumeBinder, stop, eventBroadcaster)
eventChan := make(chan struct{}) eventChan := make(chan struct{})
stopFunc := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) { stopFunc := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) {
@ -1018,11 +1013,11 @@ func TestSchedulerWithVolumeBinding(t *testing.T) {
} }
} }
if item.expectAssumeCalled != internalBinder.AssumeCalled { if item.expectAssumeCalled != fakeVolumeBinder.AssumeCalled {
t.Errorf("expectedAssumeCall %v", item.expectAssumeCalled) t.Errorf("expectedAssumeCall %v", item.expectAssumeCalled)
} }
if item.expectBindCalled != internalBinder.BindCalled { if item.expectBindCalled != fakeVolumeBinder.BindCalled {
t.Errorf("expectedBindCall %v", item.expectBindCalled) t.Errorf("expectedBindCall %v", item.expectBindCalled)
} }

View File

@ -1,29 +0,0 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["volume_binder.go"],
importpath = "k8s.io/kubernetes/pkg/scheduler/volumebinder",
visibility = ["//visibility:public"],
deps = [
"//pkg/controller/volume/scheduling:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -1,62 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package volumebinder
import (
"time"
v1 "k8s.io/api/core/v1"
coreinformers "k8s.io/client-go/informers/core/v1"
storageinformers "k8s.io/client-go/informers/storage/v1"
clientset "k8s.io/client-go/kubernetes"
volumescheduling "k8s.io/kubernetes/pkg/controller/volume/scheduling"
)
// VolumeBinder sets up the volume binding library
type VolumeBinder struct {
Binder volumescheduling.SchedulerVolumeBinder
}
// NewVolumeBinder sets up the volume binding library and binding queue
func NewVolumeBinder(
client clientset.Interface,
nodeInformer coreinformers.NodeInformer,
csiNodeInformer storageinformers.CSINodeInformer,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
pvInformer coreinformers.PersistentVolumeInformer,
storageClassInformer storageinformers.StorageClassInformer,
bindTimeout time.Duration) *VolumeBinder {
return &VolumeBinder{
Binder: volumescheduling.NewVolumeBinder(client, nodeInformer, csiNodeInformer, pvcInformer, pvInformer, storageClassInformer, bindTimeout),
}
}
// NewFakeVolumeBinder sets up a fake volume binder and binding queue
func NewFakeVolumeBinder(config *volumescheduling.FakeVolumeBinderConfig) *VolumeBinder {
return &VolumeBinder{
Binder: volumescheduling.NewFakeVolumeBinder(config),
}
}
// DeletePodBindings will delete the cached volume bindings for the given pod.
func (b *VolumeBinder) DeletePodBindings(pod *v1.Pod) {
cache := b.Binder.GetBindingsCache()
if cache != nil && pod != nil {
cache.DeleteBindings(pod)
}
}