Merge pull request #92846 from Huang-Wei/rm-pvcLister

Remove pvcLister from genericScheduler
Commit b14fcc544b by Kubernetes Prow Robot, 2020-09-15 07:54:07 -07:00, committed by GitHub
9 changed files with 89 additions and 87 deletions
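
In short: the PVC sanity checks (claim exists, is not being deleted, and, for generic ephemeral volumes, is owned by the pod) move out of genericScheduler.Schedule and into the volumebinding plugin's PreFilter, so genericScheduler no longer needs a PersistentVolumeClaimLister. A hedged sketch of the constructor call after this change (identifiers here are illustrative; the real call sites are in the hunks below):

```go
// Illustrative, not a compilable excerpt: NewGenericScheduler after this
// commit drops the pvcLister argument. PVC lookups now happen inside the
// volumebinding plugin, fed by the informer factory wired into the framework
// via frameworkruntime.WithInformerFactory.
sched := NewGenericScheduler(
	schedulerCache,   // internalcache.Cache
	nodeInfoSnapshot, // *internalcache.Snapshot
	extenders,        // []framework.Extender
	schedulerapi.DefaultPercentageOfNodesToScore,
)
```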

View File

@@ -10,7 +10,6 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/api/v1/pod:go_default_library",
"//pkg/features:go_default_library",
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/framework/runtime:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
@@ -20,11 +19,8 @@ go_library(
"//pkg/scheduler/profile:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/kube-scheduler/extender/v1:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
@@ -40,15 +36,16 @@ go_test(
],
embed = [":go_default_library"],
deps = [
"//pkg/controller/volume/persistentvolume/util:go_default_library",
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/framework/plugins/defaultbinder:go_default_library",
"//pkg/scheduler/framework/plugins/noderesources:go_default_library",
"//pkg/scheduler/framework/plugins/podtopologyspread:go_default_library",
"//pkg/scheduler/framework/plugins/queuesort:go_default_library",
"//pkg/scheduler/framework/plugins/selectorspread:go_default_library",
"//pkg/scheduler/framework/plugins/volumebinding:go_default_library",
"//pkg/scheduler/framework/runtime:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/framework/v1alpha1/fake:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//pkg/scheduler/internal/queue:go_default_library",
"//pkg/scheduler/profile:go_default_library",

View File

@@ -272,6 +272,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
fwk, err := st.NewFramework(
test.registerPlugins,
runtime.WithClientSet(client),
runtime.WithInformerFactory(informerFactory),
runtime.WithPodNominator(internalqueue.NewPodNominator()),
)
if err != nil {
@@ -285,7 +286,6 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
cache,
emptySnapshot,
extenders,
informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
schedulerapi.DefaultPercentageOfNodesToScore)
podIgnored := &v1.Pod{}
result, err := scheduler.Schedule(context.Background(), prof, framework.NewCycleState(), podIgnored)

View File

@@ -29,12 +29,8 @@ import (
"k8s.io/klog/v2"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
corelisters "k8s.io/client-go/listers/core/v1"
extenderv1 "k8s.io/kube-scheduler/extender/v1"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
@@ -119,7 +115,6 @@ type genericScheduler struct {
cache internalcache.Cache
extenders []framework.Extender
nodeInfoSnapshot *internalcache.Snapshot
pvcLister corelisters.PersistentVolumeClaimLister
percentageOfNodesToScore int32
nextStartNodeIndex int
}
@@ -138,11 +133,6 @@ func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile,
trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
defer trace.LogIfLong(100 * time.Millisecond)
if err := podPassesBasicChecks(pod, g.pvcLister); err != nil {
return result, err
}
trace.Step("Basic checks done")
if err := g.snapshot(); err != nil {
return result, err
}
@@ -273,7 +263,6 @@ func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, prof *profil
filteredNodesStatuses[n.Node().Name] = s
}
return nil, filteredNodesStatuses, nil
}
feasibleNodes, err := g.findNodesThatPassFilters(ctx, prof, state, pod, filteredNodesStatuses)
@@ -584,57 +573,16 @@ func (g *genericScheduler) prioritizeNodes(
return result, nil
}
// podPassesBasicChecks makes sanity checks on the pod if it can be scheduled.
func podPassesBasicChecks(pod *v1.Pod, pvcLister corelisters.PersistentVolumeClaimLister) error {
// Check PVCs used by the pod
namespace := pod.Namespace
manifest := &(pod.Spec)
for i := range manifest.Volumes {
volume := &manifest.Volumes[i]
var pvcName string
ephemeral := false
switch {
case volume.PersistentVolumeClaim != nil:
pvcName = volume.PersistentVolumeClaim.ClaimName
case volume.Ephemeral != nil &&
utilfeature.DefaultFeatureGate.Enabled(features.GenericEphemeralVolume):
pvcName = pod.Name + "-" + volume.Name
ephemeral = true
default:
// Volume is not using a PVC, ignore
continue
}
pvc, err := pvcLister.PersistentVolumeClaims(namespace).Get(pvcName)
if err != nil {
// The error already has enough context ("persistentvolumeclaim "myclaim" not found")
return err
}
if pvc.DeletionTimestamp != nil {
return fmt.Errorf("persistentvolumeclaim %q is being deleted", pvc.Name)
}
if ephemeral &&
!metav1.IsControlledBy(pvc, pod) {
return fmt.Errorf("persistentvolumeclaim %q was not created for the pod", pvc.Name)
}
}
return nil
}
// NewGenericScheduler creates a genericScheduler object.
func NewGenericScheduler(
cache internalcache.Cache,
nodeInfoSnapshot *internalcache.Snapshot,
extenders []framework.Extender,
pvcLister corelisters.PersistentVolumeClaimLister,
percentageOfNodesToScore int32) ScheduleAlgorithm {
return &genericScheduler{
cache: cache,
extenders: extenders,
nodeInfoSnapshot: nodeInfoSnapshot,
pvcLister: pvcLister,
percentageOfNodesToScore: percentageOfNodesToScore,
}
}
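
For reference, a minimal standalone sketch of the check that Schedule no longer performs (simplified: the GenericEphemeralVolume branch and feature gate are omitted, and the function name is invented for illustration). The equivalent logic reappears in the volumebinding plugin's podHasPVCs further down.

```go
package sketch

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	corelisters "k8s.io/client-go/listers/core/v1"
)

// checkPodPVCs rejects a pod whose PVC-backed volumes reference a claim that
// does not exist or is being deleted, mirroring the removed basic check.
func checkPodPVCs(pod *v1.Pod, pvcLister corelisters.PersistentVolumeClaimLister) error {
	for i := range pod.Spec.Volumes {
		vol := &pod.Spec.Volumes[i]
		if vol.PersistentVolumeClaim == nil {
			// Volume is not backed by a PVC, ignore.
			continue
		}
		pvc, err := pvcLister.PersistentVolumeClaims(pod.Namespace).Get(vol.PersistentVolumeClaim.ClaimName)
		if err != nil {
			// The lister error already names the missing claim.
			return err
		}
		if pvc.DeletionTimestamp != nil {
			return fmt.Errorf("persistentvolumeclaim %q is being deleted", pvc.Name)
		}
	}
	return nil
}
```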

View File

@@ -20,8 +20,8 @@ import (
"context"
"fmt"
"math"
"reflect"
"strconv"
"strings"
"testing"
"time"
@@ -34,15 +34,16 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
clientsetfake "k8s.io/client-go/kubernetes/fake"
pvutil "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/selectorspread"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
fakeframework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1/fake"
internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
"k8s.io/kubernetes/pkg/scheduler/profile"
@@ -418,13 +419,19 @@ func TestGenericScheduler(t *testing.T) {
// Pod with existing PVC
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterPreFilterPlugin(volumebinding.Name, volumebinding.New),
st.RegisterFilterPlugin("TrueFilter", st.NewTrueFilterPlugin),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
nodes: []string{"machine1", "machine2"},
pvcs: []v1.PersistentVolumeClaim{{ObjectMeta: metav1.ObjectMeta{Name: "existingPVC"}}},
pvcs: []v1.PersistentVolumeClaim{
{
ObjectMeta: metav1.ObjectMeta{Name: "existingPVC", UID: types.UID("existingPVC"), Namespace: v1.NamespaceDefault},
Spec: v1.PersistentVolumeClaimSpec{VolumeName: "existingPV"},
},
},
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "ignore", UID: types.UID("ignore")},
ObjectMeta: metav1.ObjectMeta{Name: "ignore", UID: types.UID("ignore"), Namespace: v1.NamespaceDefault},
Spec: v1.PodSpec{
Volumes: []v1.Volume{
{
@@ -445,6 +452,7 @@ func TestGenericScheduler(t *testing.T) {
// Pod with non-existent PVC
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterPreFilterPlugin(volumebinding.Name, volumebinding.New),
st.RegisterFilterPlugin("TrueFilter", st.NewTrueFilterPlugin),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
@@ -470,13 +478,14 @@ func TestGenericScheduler(t *testing.T) {
// Pod with deleting PVC
registerPlugins: []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterPreFilterPlugin(volumebinding.Name, volumebinding.New),
st.RegisterFilterPlugin("TrueFilter", st.NewTrueFilterPlugin),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
nodes: []string{"machine1", "machine2"},
pvcs: []v1.PersistentVolumeClaim{{ObjectMeta: metav1.ObjectMeta{Name: "existingPVC", DeletionTimestamp: &metav1.Time{}}}},
pvcs: []v1.PersistentVolumeClaim{{ObjectMeta: metav1.ObjectMeta{Name: "existingPVC", UID: types.UID("existingPVC"), Namespace: v1.NamespaceDefault, DeletionTimestamp: &metav1.Time{}}}},
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "ignore", UID: types.UID("ignore")},
ObjectMeta: metav1.ObjectMeta{Name: "ignore", UID: types.UID("ignore"), Namespace: v1.NamespaceDefault},
Spec: v1.PodSpec{
Volumes: []v1.Volume{
{
@@ -728,10 +737,22 @@ func TestGenericScheduler(t *testing.T) {
cache.AddNode(node)
}
ctx := context.Background()
cs := clientsetfake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(cs, 0)
for _, pvc := range test.pvcs {
metav1.SetMetaDataAnnotation(&pvc.ObjectMeta, pvutil.AnnBindCompleted, "true")
cs.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(ctx, &pvc, metav1.CreateOptions{})
if pvName := pvc.Spec.VolumeName; pvName != "" {
pv := v1.PersistentVolume{ObjectMeta: metav1.ObjectMeta{Name: pvName}}
cs.CoreV1().PersistentVolumes().Create(ctx, &pv, metav1.CreateOptions{})
}
}
snapshot := internalcache.NewSnapshot(test.pods, nodes)
fwk, err := st.NewFramework(
test.registerPlugins,
frameworkruntime.WithSnapshotSharedLister(snapshot),
frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()),
)
if err != nil {
@@ -741,19 +762,18 @@ func TestGenericScheduler(t *testing.T) {
Framework: fwk,
}
var pvcs []v1.PersistentVolumeClaim
pvcs = append(pvcs, test.pvcs...)
pvcLister := fakeframework.PersistentVolumeClaimLister(pvcs)
scheduler := NewGenericScheduler(
cache,
snapshot,
[]framework.Extender{},
pvcLister,
schedulerapi.DefaultPercentageOfNodesToScore)
result, err := scheduler.Schedule(context.Background(), prof, framework.NewCycleState(), test.pod)
if !reflect.DeepEqual(err, test.wErr) {
t.Errorf("want: %v, got: %v", test.wErr, err)
schedulerapi.DefaultPercentageOfNodesToScore,
)
informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(ctx.Done())
result, err := scheduler.Schedule(ctx, prof, framework.NewCycleState(), test.pod)
if err != test.wErr && !strings.Contains(err.Error(), test.wErr.Error()) {
t.Errorf("Unexpected error: %v, expected: %v", err.Error(), test.wErr)
}
if test.expectedHosts != nil && !test.expectedHosts.Has(result.SuggestedHost) {
t.Errorf("Expected: %s, got: %s", test.expectedHosts, result.SuggestedHost)
@@ -775,7 +795,7 @@ func makeScheduler(nodes []*v1.Node) *genericScheduler {
s := NewGenericScheduler(
cache,
emptySnapshot,
nil, nil,
nil,
schedulerapi.DefaultPercentageOfNodesToScore)
cache.UpdateSnapshot(s.(*genericScheduler).nodeInfoSnapshot)
return s.(*genericScheduler)
@@ -1069,7 +1089,6 @@ func TestZeroRequest(t *testing.T) {
nil,
emptySnapshot,
[]framework.Extender{},
nil,
schedulerapi.DefaultPercentageOfNodesToScore).(*genericScheduler)
scheduler.nodeInfoSnapshot = snapshot
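
The updated test wiring follows the standard client-go pattern: seed a fake clientset, request the PVC lister from a shared informer factory (which registers the informer), then start the factory and wait for the cache to sync. A runnable standalone sketch of that pattern, with illustrative object names:

```go
package main

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	clientsetfake "k8s.io/client-go/kubernetes/fake"
)

func main() {
	ctx := context.Background()
	cs := clientsetfake.NewSimpleClientset()
	pvc := &v1.PersistentVolumeClaim{
		ObjectMeta: metav1.ObjectMeta{Name: "existingPVC", Namespace: v1.NamespaceDefault},
	}
	if _, err := cs.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(ctx, pvc, metav1.CreateOptions{}); err != nil {
		panic(err)
	}

	factory := informers.NewSharedInformerFactory(cs, 0)
	// Requesting the lister registers the PVC informer with the factory;
	// this must happen before Start for the cache to be populated.
	lister := factory.Core().V1().PersistentVolumeClaims().Lister()
	factory.Start(ctx.Done())
	factory.WaitForCacheSync(ctx.Done())

	got, err := lister.PersistentVolumeClaims(v1.NamespaceDefault).Get("existingPVC")
	if err != nil {
		panic(err)
	}
	fmt.Println(got.Name) // existingPVC
}
```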

View File

@@ -180,7 +180,6 @@ func (c *Configurator) create() (*Scheduler, error) {
c.schedulerCache,
c.nodeInfoSnapshot,
extenders,
c.informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
c.percentageOfNodesToScore,
)

View File

@@ -11,8 +11,10 @@ go_library(
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
],
)

View File

@@ -23,8 +23,10 @@ import (
"time"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilfeature "k8s.io/apiserver/pkg/util/feature"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller/volume/scheduling"
"k8s.io/kubernetes/pkg/features"
@@ -62,6 +64,7 @@ func (d *stateData) Clone() framework.StateData {
// Reserve and PreBind phases.
type VolumeBinding struct {
Binder scheduling.SchedulerVolumeBinder
PVCLister corelisters.PersistentVolumeClaimLister
GenericEphemeralVolumeFeatureEnabled bool
}
@@ -78,14 +81,40 @@ func (pl *VolumeBinding) Name() string {
return Name
}
func (pl *VolumeBinding) podHasPVCs(pod *v1.Pod) bool {
// podHasPVCs returns 2 values:
// - the first one to denote if the given "pod" has any PVC defined.
// - the second one to return any error if the requested PVC is illegal.
func (pl *VolumeBinding) podHasPVCs(pod *v1.Pod) (bool, error) {
hasPVC := false
for _, vol := range pod.Spec.Volumes {
if vol.PersistentVolumeClaim != nil ||
pl.GenericEphemeralVolumeFeatureEnabled && vol.Ephemeral != nil {
return true
var pvcName string
ephemeral := false
switch {
case vol.PersistentVolumeClaim != nil:
pvcName = vol.PersistentVolumeClaim.ClaimName
case vol.Ephemeral != nil && pl.GenericEphemeralVolumeFeatureEnabled:
pvcName = pod.Name + "-" + vol.Name
ephemeral = true
default:
// Volume is not using a PVC, ignore
continue
}
hasPVC = true
pvc, err := pl.PVCLister.PersistentVolumeClaims(pod.Namespace).Get(pvcName)
if err != nil {
// The error already has enough context ("persistentvolumeclaim "myclaim" not found")
return hasPVC, err
}
if pvc.DeletionTimestamp != nil {
return hasPVC, fmt.Errorf("persistentvolumeclaim %q is being deleted", pvc.Name)
}
if ephemeral && !metav1.IsControlledBy(pvc, pod) {
return hasPVC, fmt.Errorf("persistentvolumeclaim %q was not created for the pod", pvc.Name)
}
}
return false
return hasPVC, nil
}
// PreFilter invoked at the prefilter extension point to check if pod has all
@@ -93,7 +122,9 @@ func (pl *VolumeBinding) podHasPVCs(pod *v1.Pod) bool {
// UnschedulableAndUnresolvable is returned.
func (pl *VolumeBinding) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) *framework.Status {
// If pod does not reference any PVC, we don't need to do anything.
if !pl.podHasPVCs(pod) {
if hasPVC, err := pl.podHasPVCs(pod); err != nil {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error())
} else if !hasPVC {
state.Write(stateKey, &stateData{skip: true})
return nil
}
@@ -271,6 +302,7 @@ func New(plArgs runtime.Object, fh framework.FrameworkHandle) (framework.Plugin,
binder := scheduling.NewVolumeBinder(fh.ClientSet(), podInformer, nodeInformer, csiNodeInformer, pvcInformer, pvInformer, storageClassInformer, capacityCheck, time.Duration(args.BindTimeoutSeconds)*time.Second)
return &VolumeBinding{
Binder: binder,
PVCLister: pvcInformer.Lister(),
GenericEphemeralVolumeFeatureEnabled: utilfeature.DefaultFeatureGate.Enabled(features.GenericEphemeralVolume),
}, nil
}
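
One subtlety in podHasPVCs: for generic ephemeral volumes the claim name is derived rather than declared, and the metav1.IsControlledBy check guards against a pre-existing, unrelated claim that happens to collide with that name. A tiny standalone sketch of the naming rule (the helper is invented for illustration):

```go
package main

import "fmt"

// ephemeralPVCName mirrors the rule in podHasPVCs: the PVC backing a generic
// ephemeral volume is named "<pod name>-<volume name>".
func ephemeralPVCName(podName, volumeName string) string {
	return podName + "-" + volumeName
}

func main() {
	fmt.Println(ephemeralPVCName("pod-a", "scratch")) // pod-a-scratch
}
```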

View File

@@ -209,7 +209,7 @@ func TestVolumeBinding(t *testing.T) {
name: "pvc not found",
pod: makePod("pod-a", []string{"pvc-a"}),
node: &v1.Node{},
wantPreFilterStatus: framework.NewStatus(framework.Error, `error getting PVC "default/pvc-a": could not find v1.PersistentVolumeClaim "default/pvc-a"`),
wantPreFilterStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, `persistentvolumeclaim "pvc-a" not found`),
wantFilterStatus: nil,
},
{
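
The expected status changes from Error to UnschedulableAndUnresolvable because a missing claim is now detected in PreFilter as a condition that neither retrying nor preemption can resolve. A hedged sketch of what the new expectation encodes (framework is the scheduler's v1alpha1 framework package):

```go
// Illustrative only: the status the plugin now returns for a missing claim.
status := framework.NewStatus(framework.UnschedulableAndUnresolvable, `persistentvolumeclaim "pvc-a" not found`)
fmt.Println(status.Code(), status.Message())
```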

View File

@@ -807,7 +807,12 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
return true, b, nil
})
fwk, _ := st.NewFramework(fns, frameworkruntime.WithClientSet(client), frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()))
fwk, _ := st.NewFramework(
fns,
frameworkruntime.WithClientSet(client),
frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator()),
)
prof := &profile.Profile{
Framework: fwk,
Recorder: &events.FakeRecorder{},
@@ -824,7 +829,6 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
scache,
internalcache.NewEmptySnapshot(),
[]framework.Extender{},
informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
schedulerapi.DefaultPercentageOfNodesToScore,
)
@@ -858,12 +862,14 @@ func setupTestSchedulerWithVolumeBinding(volumeBinder scheduling.SchedulerVolume
testPVC := v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{Name: "testPVC", Namespace: pod.Namespace, UID: types.UID("testPVC")}}
client := clientsetfake.NewSimpleClientset(&testNode, &testPVC)
informerFactory := informers.NewSharedInformerFactory(client, 0)
pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims()
pvcInformer.Informer().GetStore().Add(&testPVC)
fns := []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
st.RegisterPluginAsExtensions(volumebinding.Name, func(plArgs runtime.Object, handle framework.FrameworkHandle) (framework.Plugin, error) {
return &volumebinding.VolumeBinding{Binder: volumeBinder}, nil
return &volumebinding.VolumeBinding{Binder: volumeBinder, PVCLister: pvcInformer.Lister()}, nil
}, "PreFilter", "Filter", "Reserve", "PreBind"),
}
s, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, broadcaster, fns...)
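
Note the shortcut in setupTestSchedulerWithVolumeBinding above: rather than starting the informer factory, the test injects the PVC directly into the informer's store, so the lister can serve it synchronously. A short sketch of that trick (illustrative variable names, assuming an existing informerFactory and testPVC):

```go
// GetStore().Add bypasses the apiserver/reflector path entirely; the lister
// reads from the same underlying indexer, so the object is visible at once.
pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims()
_ = pvcInformer.Informer().GetStore().Add(&testPVC)
pvc, err := pvcInformer.Lister().PersistentVolumeClaims(testPVC.Namespace).Get(testPVC.Name)
_, _ = pvc, err
```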
@@ -1172,7 +1178,6 @@ func TestSchedulerBinding(t *testing.T) {
scache,
nil,
test.extenders,
nil,
0,
)
sched := Scheduler{