mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-21 10:51:29 +00:00
Use initTest for integration to start scheduler
Fix import path Fix configurator name
This commit is contained in:
parent
782a092e2e
commit
187c5bc91e
@ -31,18 +31,11 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/api/resource"
|
"k8s.io/apimachinery/pkg/api/resource"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/client-go/informers"
|
|
||||||
clientset "k8s.io/client-go/kubernetes"
|
clientset "k8s.io/client-go/kubernetes"
|
||||||
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
|
|
||||||
restclient "k8s.io/client-go/rest"
|
|
||||||
"k8s.io/client-go/tools/record"
|
|
||||||
"k8s.io/kubernetes/pkg/api/legacyscheme"
|
|
||||||
"k8s.io/kubernetes/pkg/api/testapi"
|
"k8s.io/kubernetes/pkg/api/testapi"
|
||||||
"k8s.io/kubernetes/pkg/scheduler"
|
|
||||||
_ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
|
_ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
|
||||||
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
|
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
|
||||||
e2e "k8s.io/kubernetes/test/e2e/framework"
|
e2e "k8s.io/kubernetes/test/e2e/framework"
|
||||||
"k8s.io/kubernetes/test/integration/framework"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -287,13 +280,8 @@ func machine_3_Prioritizer(pod *v1.Pod, nodes *v1.NodeList) (*schedulerapi.HostP
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestSchedulerExtender(t *testing.T) {
|
func TestSchedulerExtender(t *testing.T) {
|
||||||
_, s, closeFn := framework.RunAMaster(nil)
|
context := initTestMaster(t, "scheduler-extender", nil)
|
||||||
defer closeFn()
|
clientSet := context.clientSet
|
||||||
|
|
||||||
ns := framework.CreateTestingNamespace("scheduler-extender", s, t)
|
|
||||||
defer framework.DeleteTestingNamespace(ns, s, t)
|
|
||||||
|
|
||||||
clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Groups[v1.GroupName].GroupVersion()}})
|
|
||||||
|
|
||||||
extender1 := &Extender{
|
extender1 := &Extender{
|
||||||
name: "extender1",
|
name: "extender1",
|
||||||
@ -362,22 +350,10 @@ func TestSchedulerExtender(t *testing.T) {
|
|||||||
}
|
}
|
||||||
policy.APIVersion = testapi.Groups[v1.GroupName].GroupVersion().String()
|
policy.APIVersion = testapi.Groups[v1.GroupName].GroupVersion().String()
|
||||||
|
|
||||||
informerFactory := informers.NewSharedInformerFactory(clientSet, 0)
|
context = initTestScheduler(t, context, nil, false, &policy)
|
||||||
schedulerConfigFactory := CreateConfigurator(clientSet, informerFactory)
|
defer cleanupTest(t, context)
|
||||||
schedulerConfig, err := schedulerConfigFactory.CreateFromConfig(policy)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Couldn't create scheduler config: %v", err)
|
|
||||||
}
|
|
||||||
eventBroadcaster := record.NewBroadcaster()
|
|
||||||
schedulerConfig.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: v1.DefaultSchedulerName})
|
|
||||||
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(clientSet.CoreV1().RESTClient()).Events("")})
|
|
||||||
scheduler, _ := scheduler.NewFromConfigurator(&scheduler.FakeConfigurator{Config: schedulerConfig}, nil...)
|
|
||||||
informerFactory.Start(schedulerConfig.StopEverything)
|
|
||||||
scheduler.Run()
|
|
||||||
|
|
||||||
defer close(schedulerConfig.StopEverything)
|
DoTestPodScheduling(context.ns, t, clientSet)
|
||||||
|
|
||||||
DoTestPodScheduling(ns, t, clientSet)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func DoTestPodScheduling(ns *v1.Namespace, t *testing.T, cs clientset.Interface) {
|
func DoTestPodScheduling(ns *v1.Namespace, t *testing.T, cs clientset.Interface) {
|
||||||
|
@ -517,7 +517,7 @@ func TestMultiScheduler(t *testing.T) {
|
|||||||
informerFactory2 := informers.NewSharedInformerFactory(context.clientSet, 0)
|
informerFactory2 := informers.NewSharedInformerFactory(context.clientSet, 0)
|
||||||
podInformer2 := factory.NewPodInformer(context.clientSet, 0, fooScheduler)
|
podInformer2 := factory.NewPodInformer(context.clientSet, 0, fooScheduler)
|
||||||
|
|
||||||
schedulerConfigFactory2 := CreateConfiguratorWithPodInformer(fooScheduler, clientSet2, podInformer2, informerFactory2)
|
schedulerConfigFactory2 := createConfiguratorWithPodInformer(fooScheduler, clientSet2, podInformer2, informerFactory2)
|
||||||
schedulerConfig2, err := schedulerConfigFactory2.Create()
|
schedulerConfig2, err := schedulerConfigFactory2.Create()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Couldn't create scheduler config: %v", err)
|
t.Errorf("Couldn't create scheduler config: %v", err)
|
||||||
|
@ -19,8 +19,6 @@ package scheduler
|
|||||||
// This file tests the Taint feature.
|
// This file tests the Taint feature.
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"net/http"
|
|
||||||
"net/http/httptest"
|
|
||||||
"reflect"
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
@ -29,22 +27,17 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/api/resource"
|
"k8s.io/apimachinery/pkg/api/resource"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||||
"k8s.io/client-go/informers"
|
|
||||||
clientset "k8s.io/client-go/kubernetes"
|
|
||||||
restclient "k8s.io/client-go/rest"
|
restclient "k8s.io/client-go/rest"
|
||||||
"k8s.io/client-go/tools/cache"
|
"k8s.io/client-go/tools/cache"
|
||||||
"k8s.io/client-go/tools/record"
|
|
||||||
"k8s.io/kubernetes/pkg/api/testapi"
|
"k8s.io/kubernetes/pkg/api/testapi"
|
||||||
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
||||||
internalinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion"
|
internalinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion"
|
||||||
"k8s.io/kubernetes/pkg/controller/nodelifecycle"
|
"k8s.io/kubernetes/pkg/controller/nodelifecycle"
|
||||||
kubeadmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
|
kubeadmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
|
||||||
"k8s.io/kubernetes/pkg/scheduler"
|
|
||||||
"k8s.io/kubernetes/pkg/scheduler/algorithm"
|
"k8s.io/kubernetes/pkg/scheduler/algorithm"
|
||||||
"k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
|
"k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
|
||||||
"k8s.io/kubernetes/plugin/pkg/admission/podtolerationrestriction"
|
"k8s.io/kubernetes/plugin/pkg/admission/podtolerationrestriction"
|
||||||
pluginapi "k8s.io/kubernetes/plugin/pkg/admission/podtolerationrestriction/apis/podtolerationrestriction"
|
pluginapi "k8s.io/kubernetes/plugin/pkg/admission/podtolerationrestriction/apis/podtolerationrestriction"
|
||||||
"k8s.io/kubernetes/test/integration/framework"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// TestTaintNodeByCondition verifies:
|
// TestTaintNodeByCondition verifies:
|
||||||
@ -52,37 +45,35 @@ import (
|
|||||||
// 2. NodeController taints nodes by node condition
|
// 2. NodeController taints nodes by node condition
|
||||||
// 3. Scheduler allows pod to tolerate node condition taints, e.g. network unavailable
|
// 3. Scheduler allows pod to tolerate node condition taints, e.g. network unavailable
|
||||||
func TestTaintNodeByCondition(t *testing.T) {
|
func TestTaintNodeByCondition(t *testing.T) {
|
||||||
h := &framework.MasterHolder{Initialized: make(chan struct{})}
|
|
||||||
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
|
||||||
<-h.Initialized
|
|
||||||
h.M.GenericAPIServer.Handler.ServeHTTP(w, req)
|
|
||||||
}))
|
|
||||||
|
|
||||||
// Enable TaintNodeByCondition
|
// Enable TaintNodeByCondition
|
||||||
utilfeature.DefaultFeatureGate.Set("TaintNodesByCondition=True")
|
utilfeature.DefaultFeatureGate.Set("TaintNodesByCondition=True")
|
||||||
|
|
||||||
// Build clientset and informers for controllers.
|
|
||||||
internalClientset := internalclientset.NewForConfigOrDie(&restclient.Config{QPS: -1, Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Groups[v1.GroupName].GroupVersion()}})
|
|
||||||
internalInformers := internalinformers.NewSharedInformerFactory(internalClientset, time.Second)
|
|
||||||
|
|
||||||
clientset := clientset.NewForConfigOrDie(&restclient.Config{QPS: -1, Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Groups[v1.GroupName].GroupVersion()}})
|
|
||||||
informers := informers.NewSharedInformerFactory(clientset, time.Second)
|
|
||||||
|
|
||||||
// Build PodToleration Admission.
|
// Build PodToleration Admission.
|
||||||
admission := podtolerationrestriction.NewPodTolerationsPlugin(&pluginapi.Configuration{})
|
admission := podtolerationrestriction.NewPodTolerationsPlugin(&pluginapi.Configuration{})
|
||||||
|
|
||||||
|
context := initTestMaster(t, "default", admission)
|
||||||
|
|
||||||
|
// Build clientset and informers for controllers.
|
||||||
|
internalClientset := internalclientset.NewForConfigOrDie(&restclient.Config{
|
||||||
|
QPS: -1,
|
||||||
|
Host: context.httpServer.URL,
|
||||||
|
ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Groups[v1.GroupName].GroupVersion()}})
|
||||||
|
internalInformers := internalinformers.NewSharedInformerFactory(internalClientset, time.Second)
|
||||||
|
|
||||||
kubeadmission.WantsInternalKubeClientSet(admission).SetInternalKubeClientSet(internalClientset)
|
kubeadmission.WantsInternalKubeClientSet(admission).SetInternalKubeClientSet(internalClientset)
|
||||||
kubeadmission.WantsInternalKubeInformerFactory(admission).SetInternalKubeInformerFactory(internalInformers)
|
kubeadmission.WantsInternalKubeInformerFactory(admission).SetInternalKubeInformerFactory(internalInformers)
|
||||||
|
|
||||||
// Start master with admission.
|
|
||||||
masterConfig := framework.NewIntegrationTestMasterConfig()
|
|
||||||
masterConfig.GenericConfig.AdmissionControl = admission
|
|
||||||
_, _, closeFn := framework.RunAMasterUsingServer(masterConfig, s, h)
|
|
||||||
defer closeFn()
|
|
||||||
|
|
||||||
nsName := "default"
|
|
||||||
controllerCh := make(chan struct{})
|
controllerCh := make(chan struct{})
|
||||||
defer close(controllerCh)
|
defer close(controllerCh)
|
||||||
|
|
||||||
|
// Apply feature gates to enable TaintNodesByCondition
|
||||||
|
algorithmprovider.ApplyFeatureGates()
|
||||||
|
|
||||||
|
context = initTestScheduler(t, context, controllerCh, false, nil)
|
||||||
|
clientset := context.clientSet
|
||||||
|
informers := context.informerFactory
|
||||||
|
nsName := context.ns.Name
|
||||||
|
|
||||||
// Start NodeLifecycleController for taint.
|
// Start NodeLifecycleController for taint.
|
||||||
nc, err := nodelifecycle.NewNodeLifecycleController(
|
nc, err := nodelifecycle.NewNodeLifecycleController(
|
||||||
informers.Core().V1().Pods(),
|
informers.Core().V1().Pods(),
|
||||||
@ -108,27 +99,8 @@ func TestTaintNodeByCondition(t *testing.T) {
|
|||||||
}
|
}
|
||||||
go nc.Run(controllerCh)
|
go nc.Run(controllerCh)
|
||||||
|
|
||||||
// Apply feature gates to enable TaintNodesByCondition
|
|
||||||
algorithmprovider.ApplyFeatureGates()
|
|
||||||
|
|
||||||
// Start scheduler
|
|
||||||
configurator := CreateConfigurator(clientset, informers)
|
|
||||||
|
|
||||||
sched, err := scheduler.NewFromConfigurator(configurator, func(cfg *scheduler.Config) {
|
|
||||||
cfg.StopEverything = controllerCh
|
|
||||||
cfg.Recorder = &record.FakeRecorder{}
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("Failed to create scheduler: %v.", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
go sched.Run()
|
|
||||||
|
|
||||||
// Waiting for all controller sync.
|
// Waiting for all controller sync.
|
||||||
informers.Start(controllerCh)
|
|
||||||
internalInformers.Start(controllerCh)
|
internalInformers.Start(controllerCh)
|
||||||
|
|
||||||
informers.WaitForCacheSync(controllerCh)
|
|
||||||
internalInformers.WaitForCacheSync(controllerCh)
|
internalInformers.WaitForCacheSync(controllerCh)
|
||||||
|
|
||||||
// -------------------------------------------
|
// -------------------------------------------
|
||||||
|
@ -18,6 +18,7 @@ package scheduler
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net/http"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -27,7 +28,9 @@ import (
|
|||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/util/uuid"
|
"k8s.io/apimachinery/pkg/util/uuid"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
|
"k8s.io/apiserver/pkg/admission"
|
||||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||||
|
utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing"
|
||||||
"k8s.io/client-go/informers"
|
"k8s.io/client-go/informers"
|
||||||
coreinformers "k8s.io/client-go/informers/core/v1"
|
coreinformers "k8s.io/client-go/informers/core/v1"
|
||||||
clientset "k8s.io/client-go/kubernetes"
|
clientset "k8s.io/client-go/kubernetes"
|
||||||
@ -36,10 +39,12 @@ import (
|
|||||||
restclient "k8s.io/client-go/rest"
|
restclient "k8s.io/client-go/rest"
|
||||||
"k8s.io/client-go/tools/record"
|
"k8s.io/client-go/tools/record"
|
||||||
"k8s.io/kubernetes/pkg/api/legacyscheme"
|
"k8s.io/kubernetes/pkg/api/legacyscheme"
|
||||||
|
"k8s.io/kubernetes/pkg/api/testapi"
|
||||||
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
|
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
|
||||||
"k8s.io/kubernetes/pkg/features"
|
"k8s.io/kubernetes/pkg/features"
|
||||||
"k8s.io/kubernetes/pkg/scheduler"
|
"k8s.io/kubernetes/pkg/scheduler"
|
||||||
_ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
|
_ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
|
||||||
|
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
|
||||||
"k8s.io/kubernetes/pkg/scheduler/factory"
|
"k8s.io/kubernetes/pkg/scheduler/factory"
|
||||||
"k8s.io/kubernetes/test/integration/framework"
|
"k8s.io/kubernetes/test/integration/framework"
|
||||||
|
|
||||||
@ -57,42 +62,13 @@ type TestContext struct {
|
|||||||
scheduler *scheduler.Scheduler
|
scheduler *scheduler.Scheduler
|
||||||
}
|
}
|
||||||
|
|
||||||
// createConfigurator create a configurator for scheduler with given informer factory and default name.
|
// createConfiguratorWithPodInformer create a configurator for scheduler with given informer factory, custom name and pod informer.
|
||||||
func CreateConfigurator(
|
func createConfiguratorWithPodInformer(
|
||||||
clientSet clientset.Interface,
|
|
||||||
informerFactory informers.SharedInformerFactory,
|
|
||||||
) scheduler.Configurator {
|
|
||||||
// Enable EnableEquivalenceClassCache for all integration tests.
|
|
||||||
utilfeature.DefaultFeatureGate.Set("EnableEquivalenceClassCache=true")
|
|
||||||
|
|
||||||
return factory.NewConfigFactory(
|
|
||||||
v1.DefaultSchedulerName,
|
|
||||||
clientSet,
|
|
||||||
informerFactory.Core().V1().Nodes(),
|
|
||||||
informerFactory.Core().V1().Pods(),
|
|
||||||
informerFactory.Core().V1().PersistentVolumes(),
|
|
||||||
informerFactory.Core().V1().PersistentVolumeClaims(),
|
|
||||||
informerFactory.Core().V1().ReplicationControllers(),
|
|
||||||
informerFactory.Extensions().V1beta1().ReplicaSets(),
|
|
||||||
informerFactory.Apps().V1beta1().StatefulSets(),
|
|
||||||
informerFactory.Core().V1().Services(),
|
|
||||||
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
|
|
||||||
informerFactory.Storage().V1().StorageClasses(),
|
|
||||||
v1.DefaultHardPodAffinitySymmetricWeight,
|
|
||||||
utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
// CreateConfiguratorWithPodInformer create a configurator for scheduler with given informer factory, custom name and pod informer.
|
|
||||||
func CreateConfiguratorWithPodInformer(
|
|
||||||
schedulerName string,
|
schedulerName string,
|
||||||
clientSet clientset.Interface,
|
clientSet clientset.Interface,
|
||||||
podInformer coreinformers.PodInformer,
|
podInformer coreinformers.PodInformer,
|
||||||
informerFactory informers.SharedInformerFactory,
|
informerFactory informers.SharedInformerFactory,
|
||||||
) scheduler.Configurator {
|
) scheduler.Configurator {
|
||||||
// Enable EnableEquivalenceClassCache for all integration tests.
|
|
||||||
utilfeature.DefaultFeatureGate.Set("EnableEquivalenceClassCache=true")
|
|
||||||
|
|
||||||
return factory.NewConfigFactory(
|
return factory.NewConfigFactory(
|
||||||
schedulerName,
|
schedulerName,
|
||||||
clientSet,
|
clientSet,
|
||||||
@ -111,35 +87,108 @@ func CreateConfiguratorWithPodInformer(
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
// initTest initializes a test environment and creates a scheduler with default
|
// initTestMasterAndScheduler initializes a test environment and creates a master with default
|
||||||
// configuration.
|
// configuration.
|
||||||
func initTest(t *testing.T, nsPrefix string) *TestContext {
|
func initTestMaster(t *testing.T, nsPrefix string, admission admission.Interface) *TestContext {
|
||||||
var context TestContext
|
var context TestContext
|
||||||
|
|
||||||
|
// 1. Create master
|
||||||
|
h := &framework.MasterHolder{Initialized: make(chan struct{})}
|
||||||
|
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||||
|
<-h.Initialized
|
||||||
|
h.M.GenericAPIServer.Handler.ServeHTTP(w, req)
|
||||||
|
}))
|
||||||
|
|
||||||
masterConfig := framework.NewIntegrationTestMasterConfig()
|
masterConfig := framework.NewIntegrationTestMasterConfig()
|
||||||
_, context.httpServer, context.closeFn = framework.RunAMaster(masterConfig)
|
|
||||||
|
|
||||||
context.ns = framework.CreateTestingNamespace(nsPrefix+string(uuid.NewUUID()), context.httpServer, t)
|
if admission != nil {
|
||||||
|
masterConfig.GenericConfig.AdmissionControl = admission
|
||||||
|
}
|
||||||
|
|
||||||
|
_, context.httpServer, context.closeFn = framework.RunAMasterUsingServer(masterConfig, s, h)
|
||||||
|
|
||||||
|
if nsPrefix != "default" {
|
||||||
|
context.ns = framework.CreateTestingNamespace(nsPrefix+string(uuid.NewUUID()), s, t)
|
||||||
|
} else {
|
||||||
|
context.ns = framework.CreateTestingNamespace("default", s, t)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. Create kubeclient
|
||||||
|
context.clientSet = clientset.NewForConfigOrDie(&restclient.Config{QPS: -1, Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Groups[v1.GroupName].GroupVersion()}})
|
||||||
|
return &context
|
||||||
|
}
|
||||||
|
|
||||||
|
// initTestScheduler initializes a test environment and creates a scheduler with default
|
||||||
|
// configuration.
|
||||||
|
func initTestScheduler(
|
||||||
|
t *testing.T,
|
||||||
|
context *TestContext,
|
||||||
|
controllerCh chan struct{},
|
||||||
|
setPodInformer bool,
|
||||||
|
policy *schedulerapi.Policy,
|
||||||
|
) *TestContext {
|
||||||
|
// Enable EnableEquivalenceClassCache for all integration tests.
|
||||||
|
defer utilfeaturetesting.SetFeatureGateDuringTest(
|
||||||
|
t,
|
||||||
|
utilfeature.DefaultFeatureGate,
|
||||||
|
features.EnableEquivalenceClassCache, true)()
|
||||||
|
|
||||||
|
// 1. Create scheduler
|
||||||
|
context.informerFactory = informers.NewSharedInformerFactory(context.clientSet, time.Second)
|
||||||
|
|
||||||
|
var podInformer coreinformers.PodInformer
|
||||||
|
|
||||||
|
// create independent pod informer if required
|
||||||
|
if setPodInformer {
|
||||||
|
podInformer = factory.NewPodInformer(context.clientSet, 12*time.Hour, v1.DefaultSchedulerName)
|
||||||
|
} else {
|
||||||
|
podInformer = context.informerFactory.Core().V1().Pods()
|
||||||
|
}
|
||||||
|
|
||||||
|
context.schedulerConfigFactory = createConfiguratorWithPodInformer(
|
||||||
|
v1.DefaultSchedulerName, context.clientSet, podInformer, context.informerFactory)
|
||||||
|
|
||||||
context.clientSet = clientset.NewForConfigOrDie(&restclient.Config{Host: context.httpServer.URL})
|
|
||||||
context.informerFactory = informers.NewSharedInformerFactory(context.clientSet, 0)
|
|
||||||
podInformer := factory.NewPodInformer(context.clientSet, 12*time.Hour, v1.DefaultSchedulerName)
|
|
||||||
context.schedulerConfigFactory = CreateConfiguratorWithPodInformer(v1.DefaultSchedulerName, context.clientSet, podInformer, context.informerFactory)
|
|
||||||
var err error
|
var err error
|
||||||
context.schedulerConfig, err = context.schedulerConfigFactory.Create()
|
|
||||||
|
if policy != nil {
|
||||||
|
context.schedulerConfig, err = context.schedulerConfigFactory.CreateFromConfig(*policy)
|
||||||
|
} else {
|
||||||
|
context.schedulerConfig, err = context.schedulerConfigFactory.Create()
|
||||||
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Couldn't create scheduler config: %v", err)
|
t.Fatalf("Couldn't create scheduler config: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// set controllerCh if provided.
|
||||||
|
if controllerCh != nil {
|
||||||
|
context.schedulerConfig.StopEverything = controllerCh
|
||||||
|
}
|
||||||
|
|
||||||
|
// set setPodInformer if provided.
|
||||||
|
if setPodInformer {
|
||||||
|
go podInformer.Informer().Run(context.schedulerConfig.StopEverything)
|
||||||
|
}
|
||||||
|
|
||||||
eventBroadcaster := record.NewBroadcaster()
|
eventBroadcaster := record.NewBroadcaster()
|
||||||
context.schedulerConfig.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: v1.DefaultSchedulerName})
|
context.schedulerConfig.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: v1.DefaultSchedulerName})
|
||||||
eventBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: clientv1core.New(context.clientSet.CoreV1().RESTClient()).Events("")})
|
eventBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{Interface: clientv1core.New(context.clientSet.CoreV1().RESTClient()).Events("")})
|
||||||
go podInformer.Informer().Run(context.schedulerConfig.StopEverything)
|
|
||||||
context.informerFactory.Start(context.schedulerConfig.StopEverything)
|
context.informerFactory.Start(context.schedulerConfig.StopEverything)
|
||||||
|
context.informerFactory.WaitForCacheSync(context.schedulerConfig.StopEverything)
|
||||||
|
|
||||||
context.scheduler, err = scheduler.NewFromConfigurator(&scheduler.FakeConfigurator{Config: context.schedulerConfig}, nil...)
|
context.scheduler, err = scheduler.NewFromConfigurator(&scheduler.FakeConfigurator{Config: context.schedulerConfig}, nil...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Couldn't create scheduler: %v", err)
|
t.Fatalf("Couldn't create scheduler: %v", err)
|
||||||
}
|
}
|
||||||
context.scheduler.Run()
|
context.scheduler.Run()
|
||||||
return &context
|
return context
|
||||||
|
}
|
||||||
|
|
||||||
|
// initTest initializes a test environment and creates master and scheduler with default
|
||||||
|
// configuration.
|
||||||
|
func initTest(t *testing.T, nsPrefix string) *TestContext {
|
||||||
|
return initTestScheduler(t, initTestMaster(t, nsPrefix, nil), nil, true, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// cleanupTest deletes the scheduler and the test namespace. It should be called
|
// cleanupTest deletes the scheduler and the test namespace. It should be called
|
||||||
|
@ -20,8 +20,6 @@ package scheduler
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
|
||||||
"net/http/httptest"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
@ -35,17 +33,8 @@ import (
|
|||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||||
"k8s.io/client-go/informers"
|
|
||||||
clientset "k8s.io/client-go/kubernetes"
|
clientset "k8s.io/client-go/kubernetes"
|
||||||
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
|
|
||||||
restclient "k8s.io/client-go/rest"
|
|
||||||
"k8s.io/client-go/tools/record"
|
|
||||||
"k8s.io/kubernetes/pkg/api/legacyscheme"
|
|
||||||
"k8s.io/kubernetes/pkg/api/testapi"
|
|
||||||
"k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
|
"k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
|
||||||
"k8s.io/kubernetes/pkg/scheduler"
|
|
||||||
"k8s.io/kubernetes/pkg/scheduler/factory"
|
|
||||||
"k8s.io/kubernetes/test/integration/framework"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type testConfig struct {
|
type testConfig struct {
|
||||||
@ -347,27 +336,17 @@ func TestPVAffinityConflict(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func setupCluster(t *testing.T, nsName string, numberOfNodes int) *testConfig {
|
func setupCluster(t *testing.T, nsName string, numberOfNodes int) *testConfig {
|
||||||
h := &framework.MasterHolder{Initialized: make(chan struct{})}
|
|
||||||
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
|
||||||
<-h.Initialized
|
|
||||||
h.M.GenericAPIServer.Handler.ServeHTTP(w, req)
|
|
||||||
}))
|
|
||||||
|
|
||||||
// Enable feature gates
|
// Enable feature gates
|
||||||
utilfeature.DefaultFeatureGate.Set("VolumeScheduling=true,PersistentLocalVolumes=true")
|
utilfeature.DefaultFeatureGate.Set("VolumeScheduling=true,PersistentLocalVolumes=true")
|
||||||
|
|
||||||
// Build clientset and informers for controllers.
|
|
||||||
clientset := clientset.NewForConfigOrDie(&restclient.Config{QPS: -1, Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Groups[v1.GroupName].GroupVersion()}})
|
|
||||||
informers := informers.NewSharedInformerFactory(clientset, time.Second)
|
|
||||||
|
|
||||||
// Start master
|
|
||||||
masterConfig := framework.NewIntegrationTestMasterConfig()
|
|
||||||
|
|
||||||
_, _, closeFn := framework.RunAMasterUsingServer(masterConfig, s, h)
|
|
||||||
ns := framework.CreateTestingNamespace(nsName, s, t).Name
|
|
||||||
|
|
||||||
controllerCh := make(chan struct{})
|
controllerCh := make(chan struct{})
|
||||||
|
|
||||||
|
context := initTestScheduler(t, initTestMaster(t, nsName, nil), controllerCh, false, nil)
|
||||||
|
|
||||||
|
clientset := context.clientSet
|
||||||
|
ns := context.ns.Name
|
||||||
|
informers := context.informerFactory
|
||||||
|
|
||||||
// Start PV controller for volume binding.
|
// Start PV controller for volume binding.
|
||||||
params := persistentvolume.ControllerParameters{
|
params := persistentvolume.ControllerParameters{
|
||||||
KubeClient: clientset,
|
KubeClient: clientset,
|
||||||
@ -388,40 +367,6 @@ func setupCluster(t *testing.T, nsName string, numberOfNodes int) *testConfig {
|
|||||||
}
|
}
|
||||||
go ctrl.Run(controllerCh)
|
go ctrl.Run(controllerCh)
|
||||||
|
|
||||||
// Start scheduler
|
|
||||||
configurator := factory.NewConfigFactory(
|
|
||||||
v1.DefaultSchedulerName,
|
|
||||||
clientset,
|
|
||||||
informers.Core().V1().Nodes(),
|
|
||||||
informers.Core().V1().Pods(),
|
|
||||||
informers.Core().V1().PersistentVolumes(),
|
|
||||||
informers.Core().V1().PersistentVolumeClaims(),
|
|
||||||
informers.Core().V1().ReplicationControllers(),
|
|
||||||
informers.Extensions().V1beta1().ReplicaSets(),
|
|
||||||
informers.Apps().V1beta1().StatefulSets(),
|
|
||||||
informers.Core().V1().Services(),
|
|
||||||
informers.Policy().V1beta1().PodDisruptionBudgets(),
|
|
||||||
informers.Storage().V1().StorageClasses(),
|
|
||||||
v1.DefaultHardPodAffinitySymmetricWeight,
|
|
||||||
true, // Enable EqualCache by default.
|
|
||||||
)
|
|
||||||
|
|
||||||
eventBroadcaster := record.NewBroadcaster()
|
|
||||||
sched, err := scheduler.NewFromConfigurator(configurator, func(cfg *scheduler.Config) {
|
|
||||||
cfg.StopEverything = controllerCh
|
|
||||||
cfg.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: v1.DefaultSchedulerName})
|
|
||||||
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(clientset.CoreV1().RESTClient()).Events("")})
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to create scheduler: %v.", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
go sched.Run()
|
|
||||||
|
|
||||||
// Waiting for all controller sync.
|
|
||||||
informers.Start(controllerCh)
|
|
||||||
informers.WaitForCacheSync(controllerCh)
|
|
||||||
|
|
||||||
// Create shared objects
|
// Create shared objects
|
||||||
// Create nodes
|
// Create nodes
|
||||||
for i := 0; i < numberOfNodes; i++ {
|
for i := 0; i < numberOfNodes; i++ {
|
||||||
@ -470,9 +415,7 @@ func setupCluster(t *testing.T, nsName string, numberOfNodes int) *testConfig {
|
|||||||
clientset.CoreV1().PersistentVolumeClaims(ns).DeleteCollection(nil, metav1.ListOptions{})
|
clientset.CoreV1().PersistentVolumeClaims(ns).DeleteCollection(nil, metav1.ListOptions{})
|
||||||
clientset.CoreV1().PersistentVolumes().DeleteCollection(nil, metav1.ListOptions{})
|
clientset.CoreV1().PersistentVolumes().DeleteCollection(nil, metav1.ListOptions{})
|
||||||
clientset.StorageV1().StorageClasses().DeleteCollection(nil, metav1.ListOptions{})
|
clientset.StorageV1().StorageClasses().DeleteCollection(nil, metav1.ListOptions{})
|
||||||
clientset.CoreV1().Nodes().DeleteCollection(nil, metav1.ListOptions{})
|
cleanupTest(t, context)
|
||||||
close(controllerCh)
|
|
||||||
closeFn()
|
|
||||||
utilfeature.DefaultFeatureGate.Set("VolumeScheduling=false,LocalPersistentVolumes=false")
|
utilfeature.DefaultFeatureGate.Set("VolumeScheduling=false,LocalPersistentVolumes=false")
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -22,14 +22,16 @@ import (
|
|||||||
|
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
"k8s.io/api/core/v1"
|
"k8s.io/api/core/v1"
|
||||||
|
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||||
"k8s.io/client-go/informers"
|
"k8s.io/client-go/informers"
|
||||||
clientset "k8s.io/client-go/kubernetes"
|
clientset "k8s.io/client-go/kubernetes"
|
||||||
clientv1core "k8s.io/client-go/kubernetes/typed/core/v1"
|
clientv1core "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||||
"k8s.io/client-go/tools/record"
|
"k8s.io/client-go/tools/record"
|
||||||
"k8s.io/kubernetes/pkg/api/legacyscheme"
|
"k8s.io/kubernetes/pkg/api/legacyscheme"
|
||||||
|
"k8s.io/kubernetes/pkg/features"
|
||||||
"k8s.io/kubernetes/pkg/scheduler"
|
"k8s.io/kubernetes/pkg/scheduler"
|
||||||
|
"k8s.io/kubernetes/pkg/scheduler/factory"
|
||||||
"k8s.io/kubernetes/test/integration/framework"
|
"k8s.io/kubernetes/test/integration/framework"
|
||||||
utilscheduler "k8s.io/kubernetes/test/integration/scheduler"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// ShutdownFunc represents the function handle to be called, typically in a defer handler, to shutdown a running module
|
// ShutdownFunc represents the function handle to be called, typically in a defer handler, to shutdown a running module
|
||||||
@ -63,7 +65,7 @@ func StartScheduler(clientSet clientset.Interface, enableEquivalenceCache bool)
|
|||||||
evtWatch := evtBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{
|
evtWatch := evtBroadcaster.StartRecordingToSink(&clientv1core.EventSinkImpl{
|
||||||
Interface: clientv1core.New(clientSet.CoreV1().RESTClient()).Events("")})
|
Interface: clientv1core.New(clientSet.CoreV1().RESTClient()).Events("")})
|
||||||
|
|
||||||
schedulerConfigurator := utilscheduler.CreateConfigurator(clientSet, informerFactory)
|
schedulerConfigurator := createSchedulerConfigurator(clientSet, informerFactory)
|
||||||
|
|
||||||
sched, err := scheduler.NewFromConfigurator(schedulerConfigurator, func(conf *scheduler.Config) {
|
sched, err := scheduler.NewFromConfigurator(schedulerConfigurator, func(conf *scheduler.Config) {
|
||||||
conf.Recorder = evtBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "scheduler"})
|
conf.Recorder = evtBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "scheduler"})
|
||||||
@ -86,3 +88,29 @@ func StartScheduler(clientSet clientset.Interface, enableEquivalenceCache bool)
|
|||||||
}
|
}
|
||||||
return schedulerConfigurator, shutdownFunc
|
return schedulerConfigurator, shutdownFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// createSchedulerConfigurator create a configurator for scheduler with given informer factory and default name.
|
||||||
|
func createSchedulerConfigurator(
|
||||||
|
clientSet clientset.Interface,
|
||||||
|
informerFactory informers.SharedInformerFactory,
|
||||||
|
) scheduler.Configurator {
|
||||||
|
// Enable EnableEquivalenceClassCache for all integration tests.
|
||||||
|
utilfeature.DefaultFeatureGate.Set("EnableEquivalenceClassCache=true")
|
||||||
|
|
||||||
|
return factory.NewConfigFactory(
|
||||||
|
v1.DefaultSchedulerName,
|
||||||
|
clientSet,
|
||||||
|
informerFactory.Core().V1().Nodes(),
|
||||||
|
informerFactory.Core().V1().Pods(),
|
||||||
|
informerFactory.Core().V1().PersistentVolumes(),
|
||||||
|
informerFactory.Core().V1().PersistentVolumeClaims(),
|
||||||
|
informerFactory.Core().V1().ReplicationControllers(),
|
||||||
|
informerFactory.Extensions().V1beta1().ReplicaSets(),
|
||||||
|
informerFactory.Apps().V1beta1().StatefulSets(),
|
||||||
|
informerFactory.Core().V1().Services(),
|
||||||
|
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
|
||||||
|
informerFactory.Storage().V1().StorageClasses(),
|
||||||
|
v1.DefaultHardPodAffinitySymmetricWeight,
|
||||||
|
utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user