use scheduler.New in createConfiguratorArgsWithPodInformer

This commit is contained in:
Ahmad Diaa 2019-08-25 18:14:57 +02:00
parent 08ef34a2b0
commit 995e741ef9
3 changed files with 93 additions and 91 deletions

View File

@ -93,6 +93,7 @@ go_library(
"//pkg/scheduler:go_default_library",
"//pkg/scheduler/algorithmprovider:go_default_library",
"//pkg/scheduler/api:go_default_library",
"//pkg/scheduler/api/latest:go_default_library",
"//pkg/scheduler/apis/config:go_default_library",
"//pkg/scheduler/factory:go_default_library",
"//pkg/scheduler/framework/plugins:go_default_library",
@ -104,6 +105,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",

View File

@ -41,6 +41,7 @@ import (
_ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
schedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/factory"
schedulerplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
@ -352,7 +353,7 @@ func TestUnschedulableNodes(t *testing.T) {
context := initTest(t, "unschedulable-nodes")
defer cleanupTest(t, context)
nodeLister := context.schedulerConfigArgs.NodeInformer.Lister()
nodeLister := context.informerFactory.Core().V1().Nodes().Lister()
// NOTE: This test cannot run in parallel, because it is creating and deleting
// non-namespaced objects (Nodes).
defer context.clientSet.CoreV1().Nodes().DeleteCollection(nil, metav1.ListOptions{})
@ -603,28 +604,39 @@ func TestMultiScheduler(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
schedulerConfigFactory2 := factory.NewConfigFactory(createConfiguratorArgsWithPodInformer(fooScheduler, clientSet2, podInformer2, informerFactory2, schedulerplugins.NewDefaultRegistry(),
nil, []kubeschedulerconfig.PluginConfig{}, stopCh))
schedulerConfig2, err := schedulerConfigFactory2.Create()
eventBroadcaster2 := events.NewBroadcaster(&events.EventSinkImpl{Interface: clientSet2.EventsV1beta1().Events("")})
recorder := eventBroadcaster2.NewRecorder(legacyscheme.Scheme, "k8s.io/"+fooScheduler)
provider := schedulerconfig.SchedulerDefaultProviderName
algorithmSrc := schedulerconfig.SchedulerAlgorithmSource{
Provider: &provider,
}
sched2, err := scheduler.New(
clientSet2,
informerFactory2.Core().V1().Nodes(),
podInformer2,
informerFactory2.Core().V1().PersistentVolumes(),
informerFactory2.Core().V1().PersistentVolumeClaims(),
informerFactory2.Core().V1().ReplicationControllers(),
informerFactory2.Apps().V1().ReplicaSets(),
informerFactory2.Apps().V1().StatefulSets(),
informerFactory2.Core().V1().Services(),
informerFactory2.Policy().V1beta1().PodDisruptionBudgets(),
informerFactory2.Storage().V1().StorageClasses(),
informerFactory2.Storage().V1beta1().CSINodes(),
recorder,
algorithmSrc,
stopCh,
schedulerplugins.NewDefaultRegistry(),
nil,
[]kubeschedulerconfig.PluginConfig{},
scheduler.WithName(fooScheduler),
scheduler.WithBindTimeoutSeconds(600),
)
if err != nil {
t.Errorf("Couldn't create scheduler config: %v", err)
}
eventBroadcaster2 := events.NewBroadcaster(&events.EventSinkImpl{Interface: clientSet2.EventsV1beta1().Events("")})
schedulerConfig2.Recorder = eventBroadcaster2.NewRecorder(legacyscheme.Scheme, "k8s.io/"+fooScheduler)
eventBroadcaster2.StartRecordingToSink(stopCh)
sched2 := scheduler.NewFromConfig(schedulerConfig2)
scheduler.AddAllEventHandlers(sched2,
fooScheduler,
context.informerFactory.Core().V1().Nodes(),
podInformer2,
context.informerFactory.Core().V1().PersistentVolumes(),
context.informerFactory.Core().V1().PersistentVolumeClaims(),
context.informerFactory.Core().V1().Services(),
context.informerFactory.Storage().V1().StorageClasses(),
context.informerFactory.Storage().V1beta1().CSINodes(),
)
go podInformer2.Informer().Run(stopCh)
informerFactory2.Start(stopCh)
sched2.Run()

View File

@ -29,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
@ -48,6 +49,7 @@ import (
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller/disruption"
"k8s.io/kubernetes/pkg/scheduler"
latestschedulerapi "k8s.io/kubernetes/pkg/scheduler/api/latest"
schedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
// Register defaults in pkg/scheduler/algorithmprovider.
@ -62,49 +64,32 @@ import (
)
type testContext struct {
closeFn framework.CloseFunc
httpServer *httptest.Server
ns *v1.Namespace
clientSet *clientset.Clientset
informerFactory informers.SharedInformerFactory
schedulerConfigArgs *factory.ConfigFactoryArgs
schedulerConfig *factory.Config
scheduler *scheduler.Scheduler
stopCh chan struct{}
closeFn framework.CloseFunc
httpServer *httptest.Server
ns *v1.Namespace
clientSet *clientset.Clientset
informerFactory informers.SharedInformerFactory
scheduler *scheduler.Scheduler
stopCh chan struct{}
}
// createConfiguratorWithPodInformer creates a configurator for scheduler.
func createConfiguratorArgsWithPodInformer(
schedulerName string,
clientSet clientset.Interface,
podInformer coreinformers.PodInformer,
informerFactory informers.SharedInformerFactory,
pluginRegistry schedulerframework.Registry,
plugins *schedulerconfig.Plugins,
pluginConfig []schedulerconfig.PluginConfig,
stopCh <-chan struct{},
) *factory.ConfigFactoryArgs {
return &factory.ConfigFactoryArgs{
Client: clientSet,
NodeInformer: informerFactory.Core().V1().Nodes(),
PodInformer: podInformer,
PvInformer: informerFactory.Core().V1().PersistentVolumes(),
PvcInformer: informerFactory.Core().V1().PersistentVolumeClaims(),
ReplicationControllerInformer: informerFactory.Core().V1().ReplicationControllers(),
ReplicaSetInformer: informerFactory.Apps().V1().ReplicaSets(),
StatefulSetInformer: informerFactory.Apps().V1().StatefulSets(),
ServiceInformer: informerFactory.Core().V1().Services(),
PdbInformer: informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
StorageClassInformer: informerFactory.Storage().V1().StorageClasses(),
CSINodeInformer: informerFactory.Storage().V1beta1().CSINodes(),
Registry: pluginRegistry,
Plugins: plugins,
PluginConfig: pluginConfig,
HardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight,
DisablePreemption: false,
PercentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
BindTimeoutSeconds: 600,
StopCh: stopCh,
func createAlgorithmSourceFromPolicy(policy *schedulerapi.Policy, clientSet clientset.Interface) schedulerconfig.SchedulerAlgorithmSource {
policyString := runtime.EncodeOrDie(latestschedulerapi.Codec, policy)
configPolicyName := "scheduler-custom-policy-config"
policyConfigMap := v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{Namespace: metav1.NamespaceSystem, Name: configPolicyName},
Data: map[string]string{schedulerconfig.SchedulerPolicyConfigMapKey: policyString},
}
policyConfigMap.APIVersion = "v1"
clientSet.CoreV1().ConfigMaps(metav1.NamespaceSystem).Create(&policyConfigMap)
return schedulerconfig.SchedulerAlgorithmSource{
Policy: &schedulerconfig.SchedulerPolicySource{
ConfigMap: &schedulerconfig.SchedulerPolicyConfigMapSource{
Namespace: policyConfigMap.Namespace,
Name: policyConfigMap.Name,
},
},
}
}
@ -185,58 +170,61 @@ func initTestSchedulerWithOptions(
} else {
podInformer = context.informerFactory.Core().V1().Pods()
}
context.schedulerConfigArgs = createConfiguratorArgsWithPodInformer(
v1.DefaultSchedulerName, context.clientSet, podInformer, context.informerFactory, pluginRegistry, plugins,
pluginConfig, context.stopCh)
configFactory := factory.NewConfigFactory(context.schedulerConfigArgs)
var err error
if policy != nil {
context.schedulerConfig, err = configFactory.CreateFromConfig(*policy)
} else {
context.schedulerConfig, err = configFactory.Create()
}
if err != nil {
t.Fatalf("Couldn't create scheduler config: %v", err)
}
// set DisablePreemption option
context.schedulerConfig.DisablePreemption = disablePreemption
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{
Interface: context.clientSet.EventsV1beta1().Events(""),
})
context.schedulerConfig.Recorder = eventBroadcaster.NewRecorder(
recorder := eventBroadcaster.NewRecorder(
legacyscheme.Scheme,
v1.DefaultSchedulerName,
)
context.scheduler = scheduler.NewFromConfig(context.schedulerConfig)
scheduler.AddAllEventHandlers(context.scheduler,
v1.DefaultSchedulerName,
var algorithmSrc schedulerconfig.SchedulerAlgorithmSource
if policy != nil {
algorithmSrc = createAlgorithmSourceFromPolicy(policy, context.clientSet)
} else {
provider := schedulerconfig.SchedulerDefaultProviderName
algorithmSrc = schedulerconfig.SchedulerAlgorithmSource{
Provider: &provider,
}
}
context.scheduler, err = scheduler.New(
context.clientSet,
context.informerFactory.Core().V1().Nodes(),
podInformer,
context.informerFactory.Core().V1().PersistentVolumes(),
context.informerFactory.Core().V1().PersistentVolumeClaims(),
context.informerFactory.Core().V1().ReplicationControllers(),
context.informerFactory.Apps().V1().ReplicaSets(),
context.informerFactory.Apps().V1().StatefulSets(),
context.informerFactory.Core().V1().Services(),
context.informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
context.informerFactory.Storage().V1().StorageClasses(),
context.informerFactory.Storage().V1beta1().CSINodes(),
recorder,
algorithmSrc,
context.stopCh,
pluginRegistry,
plugins,
pluginConfig,
scheduler.WithPreemptionDisabled(disablePreemption),
scheduler.WithBindTimeoutSeconds(600),
)
if err != nil {
t.Fatalf("Couldn't create scheduler: %v", err)
}
// set setPodInformer if provided.
if setPodInformer {
go podInformer.Informer().Run(context.schedulerConfig.StopEverything)
cache.WaitForNamedCacheSync("scheduler", context.schedulerConfig.StopEverything, podInformer.Informer().HasSynced)
go podInformer.Informer().Run(context.scheduler.StopEverything)
cache.WaitForNamedCacheSync("scheduler", context.scheduler.StopEverything, podInformer.Informer().HasSynced)
}
stopCh := make(chan struct{})
eventBroadcaster.StartRecordingToSink(stopCh)
context.informerFactory.Start(context.schedulerConfig.StopEverything)
context.informerFactory.WaitForCacheSync(context.schedulerConfig.StopEverything)
context.informerFactory.Start(context.scheduler.StopEverything)
context.informerFactory.WaitForCacheSync(context.scheduler.StopEverything)
context.scheduler.Run()
return context
@ -268,9 +256,9 @@ func initDisruptionController(t *testing.T, context *testContext) *disruption.Di
mapper,
scaleClient)
informers.Start(context.schedulerConfig.StopEverything)
informers.WaitForCacheSync(context.schedulerConfig.StopEverything)
go dc.Run(context.schedulerConfig.StopEverything)
informers.Start(context.scheduler.StopEverything)
informers.WaitForCacheSync(context.scheduler.StopEverything)
go dc.Run(context.scheduler.StopEverything)
return dc
}