Merge pull request #101394 from Huang-Wei/dynamic-events-handler

sched: dynamic event handlers registration
Kubernetes Prow Robot 2021-05-24 16:18:40 -07:00 committed by GitHub
commit 943e0c2f91
6 changed files with 314 additions and 89 deletions
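In short, this PR replaces the scheduler's hard-coded informer event handlers (onPvAdd, onPvcUpdate, onServiceAdd, onCSINodeAdd, and friends) with handlers that are built dynamically from the cluster events each enabled plugin declares via EventsToRegister. The per-plugin registrations are collected in a clusterEventMap, unioned into a GVK -> ActionType map, and wired up by addAllEventHandlers; a GVK the scheduler does not recognize as a built-in resource (e.g. a custom resource named <plural>.<version>.<group>) falls back to a dynamic shared informer.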


@@ -19,7 +19,10 @@ package scheduler
import (
"fmt"
"reflect"
"strings"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/klog/v2"
v1 "k8s.io/api/core/v1"
@@ -39,32 +42,6 @@ import (
"k8s.io/kubernetes/pkg/scheduler/profile"
)
func (sched *Scheduler) onPvAdd(obj interface{}) {
// Pods created when there are no PVs available will be stuck in
// unschedulable queue. But unbound PVs created for static provisioning and
// delay binding storage class are skipped in PV controller dynamic
// provisioning and binding process, will not trigger events to schedule pod
// again. So we need to move pods to active queue on PV add for this
// scenario.
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvAdd, nil)
}
func (sched *Scheduler) onPvUpdate(old, new interface{}) {
// Scheduler.bindVolumesWorker may fail to update assumed pod volume
// bindings due to conflicts if PVs are updated by PV controller or other
// parties, then scheduler will add pod back to unschedulable queue. We
// need to move pods to active queue on PV update for this scenario.
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvUpdate, nil)
}
func (sched *Scheduler) onPvcAdd(obj interface{}) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvcAdd, nil)
}
func (sched *Scheduler) onPvcUpdate(old, new interface{}) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvcUpdate, nil)
}
func (sched *Scheduler) onStorageClassAdd(obj interface{}) {
sc, ok := obj.(*storagev1.StorageClass)
if !ok {
@@ -83,18 +60,6 @@ func (sched *Scheduler) onStorageClassAdd(obj interface{}) {
}
}
func (sched *Scheduler) onServiceAdd(obj interface{}) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.ServiceAdd, nil)
}
func (sched *Scheduler) onServiceUpdate(oldObj interface{}, newObj interface{}) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.ServiceUpdate, nil)
}
func (sched *Scheduler) onServiceDelete(obj interface{}) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.ServiceDelete, nil)
}
func (sched *Scheduler) addNodeToCache(obj interface{}) {
node, ok := obj.(*v1.Node)
if !ok {
@@ -153,14 +118,6 @@ func (sched *Scheduler) deleteNodeFromCache(obj interface{}) {
}
}
func (sched *Scheduler) onCSINodeAdd(obj interface{}) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.CSINodeAdd, nil)
}
func (sched *Scheduler) onCSINodeUpdate(oldObj, newObj interface{}) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.CSINodeUpdate, nil)
}
func (sched *Scheduler) addPodToSchedulingQueue(obj interface{}) {
pod := obj.(*v1.Pod)
klog.V(3).InfoS("Add event for unscheduled pod", "pod", klog.KObj(pod))
@@ -312,6 +269,8 @@ func responsibleForPod(pod *v1.Pod, profiles profile.Map) bool {
func addAllEventHandlers(
sched *Scheduler,
informerFactory informers.SharedInformerFactory,
dynInformerFactory dynamicinformer.DynamicSharedInformerFactory,
gvkMap map[framework.GVK]framework.ActionType,
) {
// scheduled pod cache
informerFactory.Core().V1().Pods().Informer().AddEventHandler(
@@ -372,46 +331,100 @@ func addAllEventHandlers(
},
)
buildEvtResHandler := func(at framework.ActionType, gvk framework.GVK, shortGVK string) cache.ResourceEventHandlerFuncs {
funcs := cache.ResourceEventHandlerFuncs{}
if at&framework.Add != 0 {
evt := framework.ClusterEvent{Resource: gvk, ActionType: framework.Add, Label: fmt.Sprintf("%vAdd", shortGVK)}
funcs.AddFunc = func(_ interface{}) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(evt, nil)
}
}
if at&framework.Update != 0 {
evt := framework.ClusterEvent{Resource: gvk, ActionType: framework.Update, Label: fmt.Sprintf("%vUpdate", shortGVK)}
funcs.UpdateFunc = func(_, _ interface{}) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(evt, nil)
}
}
if at&framework.Delete != 0 {
evt := framework.ClusterEvent{Resource: gvk, ActionType: framework.Delete, Label: fmt.Sprintf("%vDelete", shortGVK)}
funcs.DeleteFunc = func(_ interface{}) {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(evt, nil)
}
}
return funcs
}
for gvk, at := range gvkMap {
switch gvk {
case framework.Node, framework.Pod:
// Do nothing.
case framework.CSINode:
informerFactory.Storage().V1().CSINodes().Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: sched.onCSINodeAdd,
UpdateFunc: sched.onCSINodeUpdate,
},
buildEvtResHandler(at, framework.CSINode, "CSINode"),
)
// On add and update of PVs.
informerFactory.Core().V1().PersistentVolumes().Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
case framework.PersistentVolume:
// MaxPDVolumeCountPredicate: since it relies on the counts of PV.
AddFunc: sched.onPvAdd,
UpdateFunc: sched.onPvUpdate,
},
//
// PvAdd: Pods created when there are no PVs available will be stuck in
// unschedulable queue. But unbound PVs created for static provisioning and
// delay binding storage class are skipped in PV controller dynamic
// provisioning and binding process, will not trigger events to schedule pod
// again. So we need to move pods to active queue on PV add for this
// scenario.
//
// PvUpdate: Scheduler.bindVolumesWorker may fail to update assumed pod volume
// bindings due to conflicts if PVs are updated by PV controller or other
// parties, then scheduler will add pod back to unschedulable queue. We
// need to move pods to active queue on PV update for this scenario.
informerFactory.Core().V1().PersistentVolumes().Informer().AddEventHandler(
buildEvtResHandler(at, framework.PersistentVolume, "Pv"),
)
// This is for MaxPDVolumeCountPredicate: add/update PVC will affect counts of PV when it is bound.
case framework.PersistentVolumeClaim:
// MaxPDVolumeCountPredicate: add/update PVC will affect counts of PV when it is bound.
informerFactory.Core().V1().PersistentVolumeClaims().Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: sched.onPvcAdd,
UpdateFunc: sched.onPvcUpdate,
},
buildEvtResHandler(at, framework.PersistentVolumeClaim, "Pvc"),
)
// This is for ServiceAffinity: affected by the selector of the service is updated.
// Also, if new service is added, equivalence cache will also become invalid since
// existing pods may be "captured" by this service and change this predicate result.
informerFactory.Core().V1().Services().Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: sched.onServiceAdd,
UpdateFunc: sched.onServiceUpdate,
DeleteFunc: sched.onServiceDelete,
},
)
case framework.StorageClass:
if at&framework.Add != 0 {
informerFactory.Storage().V1().StorageClasses().Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: sched.onStorageClassAdd,
},
)
}
case framework.Service:
// ServiceAffinity: affected by the selector of the service is updated.
// Also, if new service is added, equivalence cache will also become invalid since
// existing pods may be "captured" by this service and change this predicate result.
informerFactory.Core().V1().Services().Informer().AddEventHandler(
buildEvtResHandler(at, framework.Service, "Service"),
)
default:
// Tests may not instantiate dynInformerFactory.
if dynInformerFactory == nil {
continue
}
// GVK is expected to be at least 3-folded, separated by dots.
// <kind in plural>.<version>.<group>
// Valid examples:
// - foos.v1.example.com
// - bars.v1beta1.a.b.c
// Invalid examples:
// - foos.v1 (2 sections)
// - foo.v1.example.com (the first section should be plural)
if strings.Count(string(gvk), ".") < 2 {
klog.ErrorS(nil, "incorrect event registration", "gvk", gvk)
continue
}
// Fall back to try dynamic informers.
gvr, _ := schema.ParseResourceArg(string(gvk))
dynInformer := dynInformerFactory.ForResource(*gvr).Informer()
dynInformer.AddEventHandler(
buildEvtResHandler(at, gvk, strings.Title(gvr.Resource)),
)
go dynInformer.Run(sched.StopEverything)
}
}
}
func nodeSchedulingPropertiesChange(newNode *v1.Node, oldNode *v1.Node) *framework.ClusterEvent {
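The default branch above is the interesting new path: a plugin-supplied GVK string that is not a built-in resource is parsed into a GroupVersionResource and served by a dynamic informer. Below is a minimal sketch of that parsing step, using the same schema.ParseResourceArg helper; the "foos.v1.example.com" resource is the hypothetical example from the code comment above.

```go
package main

import (
	"fmt"
	"strings"

	"k8s.io/apimachinery/pkg/runtime/schema"
)

func main() {
	gvk := "foos.v1.example.com" // "<plural>.<version>.<group>", as required above

	// addAllEventHandlers only attempts the fallback when the string has at
	// least two dots; otherwise it logs an error and skips the registration.
	if strings.Count(gvk, ".") < 2 {
		fmt.Println("incorrect event registration:", gvk)
		return
	}

	// ParseResourceArg splits on the first two dots:
	// resource="foos", version="v1", group="example.com" (the group may itself
	// contain dots, e.g. "bars.v1beta1.a.b.c").
	gvr, _ := schema.ParseResourceArg(gvk)
	fmt.Printf("group=%s version=%s resource=%s\n", gvr.Group, gvr.Version, gvr.Resource)

	// The scheduler then calls dynInformerFactory.ForResource(*gvr).Informer()
	// and attaches the handlers produced by buildEvtResHandler.
}
```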


@@ -86,6 +86,8 @@ type Configurator struct {
extenders []schedulerapi.Extender
frameworkCapturer FrameworkCapturer
parallellism int32
// A "cluster event" -> "plugin names" map.
clusterEventMap map[framework.ClusterEvent]sets.String
}
// create a scheduler from a set of registered plugins.
@@ -135,8 +137,6 @@ func (c *Configurator) create() (*Scheduler, error) {
// The nominator will be passed all the way to framework instantiation.
nominator := internalqueue.NewPodNominator()
// It's a "cluster event" -> "plugin names" map.
clusterEventMap := make(map[framework.ClusterEvent]sets.String)
profiles, err := profile.NewMap(c.profiles, c.registry, c.recorderFactory,
frameworkruntime.WithClientSet(c.client),
frameworkruntime.WithKubeConfig(c.kubeConfig),
@@ -145,7 +145,7 @@ func (c *Configurator) create() (*Scheduler, error) {
frameworkruntime.WithRunAllFilters(c.alwaysCheckAllPredicates),
frameworkruntime.WithPodNominator(nominator),
frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(c.frameworkCapturer)),
frameworkruntime.WithClusterEventMap(clusterEventMap),
frameworkruntime.WithClusterEventMap(c.clusterEventMap),
frameworkruntime.WithParallelism(int(c.parallellism)),
)
if err != nil {
@@ -162,7 +162,7 @@ func (c *Configurator) create() (*Scheduler, error) {
internalqueue.WithPodInitialBackoffDuration(time.Duration(c.podInitialBackoffSeconds)*time.Second),
internalqueue.WithPodMaxBackoffDuration(time.Duration(c.podMaxBackoffSeconds)*time.Second),
internalqueue.WithPodNominator(nominator),
internalqueue.WithClusterEventMap(clusterEventMap),
internalqueue.WithClusterEventMap(c.clusterEventMap),
)
// Setup cache debugger.


@@ -648,6 +648,7 @@ func newConfigFactoryWithFrameworkRegistry(
},
recorderFactory: recorderFactory,
nodeInfoSnapshot: snapshot,
clusterEventMap: make(map[framework.ClusterEvent]sets.String),
}
}


@@ -29,7 +29,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
@@ -221,6 +224,7 @@ func New(client clientset.Interface,
}
snapshot := internalcache.NewEmptySnapshot()
clusterEventMap := make(map[framework.ClusterEvent]sets.String)
configurator := &Configurator{
client: client,
@@ -238,6 +242,7 @@ func New(client clientset.Interface,
extenders: options.extenders,
frameworkCapturer: options.frameworkCapturer,
parallellism: options.parallelism,
clusterEventMap: clusterEventMap,
}
metrics.Register()
@@ -281,10 +286,30 @@ func New(client clientset.Interface,
sched.StopEverything = stopEverything
sched.client = client
addAllEventHandlers(sched, informerFactory)
// Build dynamic client and dynamic informer factory
var dynInformerFactory dynamicinformer.DynamicSharedInformerFactory
// options.kubeConfig can be nil in tests.
if options.kubeConfig != nil {
dynClient := dynamic.NewForConfigOrDie(options.kubeConfig)
dynInformerFactory = dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynClient, 0, v1.NamespaceAll, nil)
}
addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(clusterEventMap))
return sched, nil
}
func unionedGVKs(m map[framework.ClusterEvent]sets.String) map[framework.GVK]framework.ActionType {
gvkMap := make(map[framework.GVK]framework.ActionType)
for evt := range m {
if _, ok := gvkMap[evt.Resource]; ok {
gvkMap[evt.Resource] |= evt.ActionType
} else {
gvkMap[evt.Resource] = evt.ActionType
}
}
return gvkMap
}
// initPolicyFromFile initialize policy from file
func initPolicyFromFile(policyFile string, policy *schedulerapi.Policy) error {
// Use a policy serialized in a file.
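unionedGVKs is the glue between the per-plugin registrations captured in clusterEventMap and the informer wiring in addAllEventHandlers: it ORs the ActionType bits of every event that targets the same resource. Here is a small self-contained sketch of that merge; the plugin names and events are illustrative only.

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

func main() {
	// Two plugins registered different actions for the same resource.
	m := map[framework.ClusterEvent]sets.String{
		{Resource: framework.PersistentVolume, ActionType: framework.Add}:    sets.NewString("PluginA"),
		{Resource: framework.PersistentVolume, ActionType: framework.Update}: sets.NewString("PluginB"),
	}

	// Equivalent to unionedGVKs: merge the ActionType bits per resource.
	gvkMap := make(map[framework.GVK]framework.ActionType)
	for evt := range m {
		gvkMap[evt.Resource] |= evt.ActionType
	}

	// buildEvtResHandler will therefore populate both AddFunc and UpdateFunc
	// on the single PersistentVolume informer handler.
	fmt.Println(gvkMap[framework.PersistentVolume] == (framework.Add | framework.Update)) // true
}
```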


@@ -17,18 +17,31 @@ limitations under the License.
package scheduler
import (
"context"
"fmt"
"testing"
"time"
v1 "k8s.io/api/core/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
apiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/pkg/scheduler"
schedapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/serviceaffinity"
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
st "k8s.io/kubernetes/pkg/scheduler/testing"
testfwk "k8s.io/kubernetes/test/integration/framework"
testutils "k8s.io/kubernetes/test/integration/util"
imageutils "k8s.io/kubernetes/test/utils/image"
)
@@ -148,3 +161,174 @@ func TestServiceAffinityEnqueue(t *testing.T) {
t.Errorf("Expected the Pod to be attempted 2 times, but got %v", podInfo.Attempts)
}
}
var _ framework.FilterPlugin = &fakeCRPlugin{}
var _ framework.EnqueueExtensions = &fakeCRPlugin{}
type fakeCRPlugin struct{}
func (f *fakeCRPlugin) Name() string {
return "fakeCRPlugin"
}
func (f *fakeCRPlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status {
return framework.NewStatus(framework.Unschedulable, "always fail")
}
// EventsToRegister returns the possible events that may make a Pod
// failed by this plugin schedulable.
func (f *fakeCRPlugin) EventsToRegister() []framework.ClusterEvent {
return []framework.ClusterEvent{
{Resource: "foos.v1.example.com", ActionType: framework.All},
}
}
// TestCustomResourceEnqueue constructs a fake plugin that registers custom resources
// to verify Pods failed by this plugin can be moved properly upon CR events.
func TestCustomResourceEnqueue(t *testing.T) {
// Start API Server with apiextensions supported.
server := apiservertesting.StartTestServerOrDie(
t, apiservertesting.NewDefaultTestServerOptions(),
[]string{"--disable-admission-plugins=ServiceAccount,TaintNodesByCondition", "--runtime-config=api/all=true"},
testfwk.SharedEtcd(),
)
testCtx := &testutils.TestContext{}
testCtx.Ctx, testCtx.CancelFn = context.WithCancel(context.Background())
testCtx.CloseFn = func() { server.TearDownFn() }
apiExtensionClient := apiextensionsclient.NewForConfigOrDie(server.ClientConfig)
dynamicClient := dynamic.NewForConfigOrDie(server.ClientConfig)
// Create a Foo CRD.
fooCRD := &apiextensionsv1.CustomResourceDefinition{
ObjectMeta: metav1.ObjectMeta{
Name: "foos.example.com",
},
Spec: apiextensionsv1.CustomResourceDefinitionSpec{
Group: "example.com",
Scope: apiextensionsv1.NamespaceScoped,
Names: apiextensionsv1.CustomResourceDefinitionNames{
Plural: "foos",
Kind: "Foo",
},
Versions: []apiextensionsv1.CustomResourceDefinitionVersion{
{
Name: "v1",
Served: true,
Storage: true,
Schema: &apiextensionsv1.CustomResourceValidation{
OpenAPIV3Schema: &apiextensionsv1.JSONSchemaProps{
Type: "object",
Properties: map[string]apiextensionsv1.JSONSchemaProps{
"field": {Type: "string"},
},
},
},
},
},
},
}
var err error
fooCRD, err = apiExtensionClient.ApiextensionsV1().CustomResourceDefinitions().Create(testCtx.Ctx, fooCRD, metav1.CreateOptions{})
if err != nil {
t.Fatal(err)
}
registry := frameworkruntime.Registry{
"fakeCRPlugin": func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) {
return &fakeCRPlugin{}, nil
},
}
profile := schedapi.KubeSchedulerProfile{
SchedulerName: v1.DefaultSchedulerName,
Plugins: &schedapi.Plugins{
Filter: schedapi.PluginSet{
Enabled: []schedapi.Plugin{
{Name: "fakeCRPlugin"},
},
},
},
}
testCtx.KubeConfig = server.ClientConfig
testCtx.ClientSet = kubernetes.NewForConfigOrDie(server.ClientConfig)
testCtx.NS, err = testCtx.ClientSet.CoreV1().Namespaces().Create(testCtx.Ctx, &v1.Namespace{
ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("cr-enqueue-%v", string(uuid.NewUUID()))}}, metav1.CreateOptions{})
if err != nil && !errors.IsAlreadyExists(err) {
t.Fatalf("Failed to integration test ns: %v", err)
}
// Use zero backoff seconds to bypass backoffQ.
testCtx = testutils.InitTestSchedulerWithOptions(
t,
testCtx,
nil,
scheduler.WithProfiles(profile),
scheduler.WithFrameworkOutOfTreeRegistry(registry),
scheduler.WithPodInitialBackoffSeconds(0),
scheduler.WithPodMaxBackoffSeconds(0),
)
testutils.SyncInformerFactory(testCtx)
// It's intended to not start the scheduler's queue, and hence to
// not start any flushing logic. We will pop and schedule the Pods manually later.
defer testutils.CleanupTest(t, testCtx)
cs, ns, ctx := testCtx.ClientSet, testCtx.NS.Name, testCtx.Ctx
// Create one Node.
node := st.MakeNode().Name("fake-node").Obj()
if _, err := cs.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create Node %q: %v", node.Name, err)
}
// Create a testing Pod.
pause := imageutils.GetPauseImageName()
pod := st.MakePod().Namespace(ns).Name("fake-pod").Container(pause).Obj()
if _, err := cs.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create Pod %q: %v", pod.Name, err)
}
// Wait for the testing Pod to be present in the scheduling queue.
if err := wait.Poll(time.Millisecond*200, wait.ForeverTestTimeout, func() (bool, error) {
return len(testCtx.Scheduler.SchedulingQueue.PendingPods()) == 1, nil
}); err != nil {
t.Fatal(err)
}
// Pop fake-pod out. It should be unschedulable.
podInfo := testCtx.Scheduler.NextPod()
fwk, ok := testCtx.Scheduler.Profiles[podInfo.Pod.Spec.SchedulerName]
if !ok {
t.Fatalf("Cannot find the profile for Pod %v", podInfo.Pod.Name)
}
// Schedule the Pod manually.
_, fitError := testCtx.Scheduler.Algorithm.Schedule(ctx, fwk, framework.NewCycleState(), podInfo.Pod)
// The fitError is expected to be non-nil as it failed the fakeCRPlugin plugin.
if fitError == nil {
t.Fatalf("Expect Pod %v to fail at scheduling.", podInfo.Pod.Name)
}
testCtx.Scheduler.Error(podInfo, fitError)
// Scheduling cycle is incremented from 0 to 1 after NextPod() is called, so
// pass a number larger than 1 to move Pod to unschedulableQ.
testCtx.Scheduler.SchedulingQueue.AddUnschedulableIfNotPresent(podInfo, 10)
// Trigger a Custom Resource event.
// We expect this event to trigger moving the test Pod from unschedulableQ to activeQ.
crdGVR := schema.GroupVersionResource{Group: fooCRD.Spec.Group, Version: fooCRD.Spec.Versions[0].Name, Resource: "foos"}
crClient := dynamicClient.Resource(crdGVR).Namespace(ns)
if _, err := crClient.Create(ctx, &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "example.com/v1",
"kind": "Foo",
"metadata": map[string]interface{}{"name": "foo1"},
},
}, metav1.CreateOptions{}); err != nil {
t.Fatalf("Unable to create cr: %v", err)
}
// Now we should be able to pop the Pod from activeQ again.
podInfo = testCtx.Scheduler.NextPod()
if podInfo.Attempts != 2 {
t.Errorf("Expected the Pod to be attempted 2 times, but got %v", podInfo.Attempts)
}
}
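Taken end to end, this test exercises the whole new registration path: fakeCRPlugin's EventsToRegister entry for foos.v1.example.com lands in the clusterEventMap, New() passes the unioned GVK map to addAllEventHandlers, the default branch starts a dynamic informer for the Foo resource, and creating foo1 fires that informer's AddFunc, which moves the previously unschedulable Pod back to the active queue for its second attempt.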

vendor/modules.txt

@@ -1593,6 +1593,8 @@ k8s.io/client-go/discovery/cached/disk
k8s.io/client-go/discovery/cached/memory
k8s.io/client-go/discovery/fake
k8s.io/client-go/dynamic
k8s.io/client-go/dynamic/dynamicinformer
k8s.io/client-go/dynamic/dynamiclister
k8s.io/client-go/dynamic/fake
k8s.io/client-go/informers
k8s.io/client-go/informers/admissionregistration