Merge pull request #110264 from wojtek-t/fix_leaking_goroutines_4

Fix leaking goroutines in multiple integration tests
This commit is contained in:
Kubernetes Prow Robot 2022-06-02 03:59:47 -07:00 committed by GitHub
commit 03d0e2c338
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 311 additions and 551 deletions

View File

@ -73,8 +73,6 @@ const (
func NewEndpointController(podInformer coreinformers.PodInformer, serviceInformer coreinformers.ServiceInformer,
endpointsInformer coreinformers.EndpointsInformer, client clientset.Interface, endpointUpdatesBatchPeriod time.Duration) *Controller {
broadcaster := record.NewBroadcaster()
broadcaster.StartStructuredLogging(0)
broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "endpoint-controller"})
if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
@ -173,6 +171,12 @@ type Controller struct {
// endpoints will be handled in parallel.
func (e *Controller) Run(ctx context.Context, workers int) {
defer utilruntime.HandleCrash()
// Start events processing pipeline.
e.eventBroadcaster.StartStructuredLogging(0)
e.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: e.client.CoreV1().Events("")})
defer e.eventBroadcaster.Shutdown()
defer e.queue.ShutDown()
klog.Infof("Starting endpoint controller")

View File

@ -86,8 +86,6 @@ func NewController(podInformer coreinformers.PodInformer,
endpointUpdatesBatchPeriod time.Duration,
) *Controller {
broadcaster := record.NewBroadcaster()
broadcaster.StartStructuredLogging(0)
broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "endpoint-slice-controller"})
if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
@ -252,6 +250,12 @@ type Controller struct {
// Run will not return until stopCh is closed.
func (c *Controller) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
// Start events processing pipeline.
c.eventBroadcaster.StartLogging(klog.Infof)
c.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.client.CoreV1().Events("")})
defer c.eventBroadcaster.Shutdown()
defer c.queue.ShutDown()
klog.Infof("Starting endpoint slice controller")

View File

@ -75,8 +75,6 @@ func NewController(endpointsInformer coreinformers.EndpointsInformer,
endpointUpdatesBatchPeriod time.Duration,
) *Controller {
broadcaster := record.NewBroadcaster()
broadcaster.StartLogging(klog.Infof)
broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "endpoint-slice-mirroring-controller"})
if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
@ -207,6 +205,12 @@ type Controller struct {
// Run will not return until stopCh is closed.
func (c *Controller) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
// Start events processing pipeline.
c.eventBroadcaster.StartLogging(klog.Infof)
c.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.client.CoreV1().Events("")})
defer c.eventBroadcaster.Shutdown()
defer c.queue.ShutDown()
klog.Infof("Starting EndpointSliceMirroring controller")

View File

@ -28,9 +28,11 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller"
)
@ -58,6 +60,11 @@ func newController(batchPeriod time.Duration) (*fake.Clientset, *endpointSliceMi
client,
batchPeriod)
// The event processing pipeline is normally started via Run() method.
// However, since we don't start it in unit tests, we explicitly start it here.
esController.eventBroadcaster.StartLogging(klog.Infof)
esController.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
esController.endpointsSynced = alwaysReady
esController.endpointSlicesSynced = alwaysReady
esController.servicesSynced = alwaysReady

View File

@ -34,7 +34,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
yamlutil "k8s.io/apimachinery/pkg/util/yaml"
@ -44,28 +44,23 @@ import (
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/controlplane"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/test/integration/framework"
)
// TODO(wojtek-t): Migrate to use testing.TestServer instead.
func setup(t testing.TB, groupVersions ...schema.GroupVersion) (clientset.Interface, framework.CloseFunc) {
opts := framework.ControlPlaneConfigOptions{EtcdOptions: framework.DefaultEtcdOptions()}
opts.EtcdOptions.DefaultStorageMediaType = "application/vnd.kubernetes.protobuf"
controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfigWithOptions(&opts)
if len(groupVersions) > 0 {
resourceConfig := controlplane.DefaultAPIResourceConfigSource()
resourceConfig.EnableVersions(groupVersions...)
controlPlaneConfig.ExtraConfig.APIResourceConfigSource = resourceConfig
}
controlPlaneConfig.GenericConfig.OpenAPIConfig = framework.DefaultOpenAPIConfig()
_, s, closeFn := framework.RunAnAPIServer(controlPlaneConfig)
func setup(t testing.TB) (clientset.Interface, kubeapiservertesting.TearDownFunc) {
// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd())
clientSet, err := clientset.NewForConfig(&restclient.Config{Host: s.URL, QPS: -1})
config := restclient.CopyConfig(server.ClientConfig)
// There are some tests (in scale_test.go) that rely on the response to be returned in JSON.
// So we overwrite it here.
config.ContentType = runtime.ContentTypeJSON
clientSet, err := clientset.NewForConfig(config)
if err != nil {
t.Fatalf("Error in create clientset: %v", err)
}
return clientSet, closeFn
return clientSet, server.TearDownFn
}
// TestApplyAlsoCreates makes sure that PATCH requests with the apply content type
@ -2812,15 +2807,13 @@ spec:
}
func TestStopTrackingManagedFieldsOnFeatureDisabled(t *testing.T) {
sharedEtcd := framework.DefaultEtcdOptions()
controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfigWithOptions(&framework.ControlPlaneConfigOptions{
EtcdOptions: sharedEtcd,
})
controlPlaneConfig.GenericConfig.OpenAPIConfig = framework.DefaultOpenAPIConfig()
sharedEtcd := framework.SharedEtcd()
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.ServerSideApply, true)()
_, instanceConfig, closeFn := framework.RunAnAPIServer(controlPlaneConfig)
client, err := clientset.NewForConfig(&restclient.Config{Host: instanceConfig.URL, QPS: -1})
// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, sharedEtcd)
client, err := clientset.NewForConfig(server.ClientConfig)
if err != nil {
t.Fatalf("Error in create clientset: %v", err)
}
@ -2870,14 +2863,15 @@ spec:
}
// Restart server with server-side apply disabled
closeFn()
server.TearDownFn()
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.ServerSideApply, false)()
_, instanceConfig, closeFn = framework.RunAnAPIServer(controlPlaneConfig)
client, err = clientset.NewForConfig(&restclient.Config{Host: instanceConfig.URL, QPS: -1})
server = kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, sharedEtcd)
defer server.TearDownFn()
client, err = clientset.NewForConfig(server.ClientConfig)
if err != nil {
t.Fatalf("Error in create clientset: %v", err)
}
defer closeFn()
_, err = client.CoreV1().RESTClient().Patch(types.ApplyPatchType).
AbsPath("/apis/apps/v1").

View File

@ -155,12 +155,12 @@ func TestCronJobLaunchesPodAndCleansUp(t *testing.T) {
cjClient := clientSet.BatchV1().CronJobs(ns.Name)
stopCh := make(chan struct{})
defer close(stopCh)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
informerSet.Start(stopCh)
go cjc.Run(context.TODO(), 1)
go jc.Run(context.TODO(), 1)
informerSet.Start(ctx.Done())
go cjc.Run(ctx, 1)
go jc.Run(ctx, 1)
_, err := cjClient.Create(context.TODO(), newCronJob(cronJobName, ns.Name, "* * * * ?"), metav1.CreateOptions{})
if err != nil {

View File

@ -51,7 +51,7 @@ import (
var zero = int64(0)
func setup(t *testing.T) (kubeapiservertesting.TearDownFunc, *daemon.DaemonSetsController, informers.SharedInformerFactory, clientset.Interface) {
func setup(t *testing.T) (context.Context, kubeapiservertesting.TearDownFunc, *daemon.DaemonSetsController, informers.SharedInformerFactory, clientset.Interface) {
// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount,TaintNodesByCondition"}, framework.SharedEtcd())
@ -74,22 +74,15 @@ func setup(t *testing.T) (kubeapiservertesting.TearDownFunc, *daemon.DaemonSetsC
t.Fatalf("error creating DaemonSets controller: %v", err)
}
return server.TearDownFn, dc, informers, clientSet
}
ctx, cancel := context.WithCancel(context.Background())
func setupScheduler(
ctx context.Context,
t *testing.T,
cs clientset.Interface,
informerFactory informers.SharedInformerFactory,
) {
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{
Interface: cs.EventsV1(),
Interface: clientSet.EventsV1(),
})
sched, err := scheduler.New(
cs,
informerFactory,
clientSet,
informers,
nil,
profile.NewRecorderFactory(eventBroadcaster),
ctx.Done(),
@ -99,8 +92,15 @@ func setupScheduler(
}
eventBroadcaster.StartRecordingToSink(ctx.Done())
go sched.Run(ctx)
tearDownFn := func() {
cancel()
server.TearDownFn()
eventBroadcaster.Shutdown()
}
return ctx, tearDownFn, dc, informers, clientSet
}
func testLabels() map[string]string {
@ -421,7 +421,7 @@ func forEachStrategy(t *testing.T, tf func(t *testing.T, strategy *apps.DaemonSe
func TestOneNodeDaemonLaunchesPod(t *testing.T) {
forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
closeFn, dc, informers, clientset := setup(t)
ctx, closeFn, dc, informers, clientset := setup(t)
defer closeFn()
ns := framework.CreateNamespaceOrDie(clientset, "one-node-daemonset-test", t)
defer framework.DeleteNamespaceOrDie(clientset, ns, t)
@ -431,12 +431,6 @@ func TestOneNodeDaemonLaunchesPod(t *testing.T) {
nodeClient := clientset.CoreV1().Nodes()
podInformer := informers.Core().V1().Pods().Informer()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Start Scheduler
setupScheduler(ctx, t, clientset, informers)
informers.Start(ctx.Done())
go dc.Run(ctx, 2)
@ -460,7 +454,7 @@ func TestOneNodeDaemonLaunchesPod(t *testing.T) {
func TestSimpleDaemonSetLaunchesPods(t *testing.T) {
forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
closeFn, dc, informers, clientset := setup(t)
ctx, closeFn, dc, informers, clientset := setup(t)
defer closeFn()
ns := framework.CreateNamespaceOrDie(clientset, "simple-daemonset-test", t)
defer framework.DeleteNamespaceOrDie(clientset, ns, t)
@ -470,15 +464,9 @@ func TestSimpleDaemonSetLaunchesPods(t *testing.T) {
nodeClient := clientset.CoreV1().Nodes()
podInformer := informers.Core().V1().Pods().Informer()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
informers.Start(ctx.Done())
go dc.Run(ctx, 2)
// Start Scheduler
setupScheduler(ctx, t, clientset, informers)
ds := newDaemonSet("foo", ns.Name)
ds.Spec.UpdateStrategy = *strategy
_, err := dsClient.Create(context.TODO(), ds, metav1.CreateOptions{})
@ -496,7 +484,7 @@ func TestSimpleDaemonSetLaunchesPods(t *testing.T) {
func TestDaemonSetWithNodeSelectorLaunchesPods(t *testing.T) {
forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
closeFn, dc, informers, clientset := setup(t)
ctx, closeFn, dc, informers, clientset := setup(t)
defer closeFn()
ns := framework.CreateNamespaceOrDie(clientset, "simple-daemonset-test", t)
defer framework.DeleteNamespaceOrDie(clientset, ns, t)
@ -506,15 +494,9 @@ func TestDaemonSetWithNodeSelectorLaunchesPods(t *testing.T) {
nodeClient := clientset.CoreV1().Nodes()
podInformer := informers.Core().V1().Pods().Informer()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
informers.Start(ctx.Done())
go dc.Run(ctx, 2)
// Start Scheduler
setupScheduler(ctx, t, clientset, informers)
ds := newDaemonSet("foo", ns.Name)
ds.Spec.UpdateStrategy = *strategy
@ -565,7 +547,7 @@ func TestDaemonSetWithNodeSelectorLaunchesPods(t *testing.T) {
func TestNotReadyNodeDaemonDoesLaunchPod(t *testing.T) {
forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
closeFn, dc, informers, clientset := setup(t)
ctx, closeFn, dc, informers, clientset := setup(t)
defer closeFn()
ns := framework.CreateNamespaceOrDie(clientset, "simple-daemonset-test", t)
defer framework.DeleteNamespaceOrDie(clientset, ns, t)
@ -575,15 +557,9 @@ func TestNotReadyNodeDaemonDoesLaunchPod(t *testing.T) {
nodeClient := clientset.CoreV1().Nodes()
podInformer := informers.Core().V1().Pods().Informer()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
informers.Start(ctx.Done())
go dc.Run(ctx, 2)
// Start Scheduler
setupScheduler(ctx, t, clientset, informers)
ds := newDaemonSet("foo", ns.Name)
ds.Spec.UpdateStrategy = *strategy
_, err := dsClient.Create(context.TODO(), ds, metav1.CreateOptions{})
@ -612,7 +588,7 @@ func TestNotReadyNodeDaemonDoesLaunchPod(t *testing.T) {
// not schedule Pods onto the nodes with insufficient resource.
func TestInsufficientCapacityNode(t *testing.T) {
forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
closeFn, dc, informers, clientset := setup(t)
ctx, closeFn, dc, informers, clientset := setup(t)
defer closeFn()
ns := framework.CreateNamespaceOrDie(clientset, "insufficient-capacity", t)
defer framework.DeleteNamespaceOrDie(clientset, ns, t)
@ -622,15 +598,9 @@ func TestInsufficientCapacityNode(t *testing.T) {
podInformer := informers.Core().V1().Pods().Informer()
nodeClient := clientset.CoreV1().Nodes()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
informers.Start(ctx.Done())
go dc.Run(ctx, 2)
// Start Scheduler
setupScheduler(ctx, t, clientset, informers)
ds := newDaemonSet("foo", ns.Name)
ds.Spec.Template.Spec = resourcePodSpec("", "120M", "75m")
ds.Spec.UpdateStrategy = *strategy
@ -676,7 +646,7 @@ func TestInsufficientCapacityNode(t *testing.T) {
// TestLaunchWithHashCollision tests that a DaemonSet can be updated even if there is a
// hash collision with an existing ControllerRevision
func TestLaunchWithHashCollision(t *testing.T) {
closeFn, dc, informers, clientset := setup(t)
ctx, closeFn, dc, informers, clientset := setup(t)
defer closeFn()
ns := framework.CreateNamespaceOrDie(clientset, "one-node-daemonset-test", t)
defer framework.DeleteNamespaceOrDie(clientset, ns, t)
@ -685,15 +655,9 @@ func TestLaunchWithHashCollision(t *testing.T) {
podInformer := informers.Core().V1().Pods().Informer()
nodeClient := clientset.CoreV1().Nodes()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
informers.Start(ctx.Done())
go dc.Run(ctx, 2)
// Start Scheduler
setupScheduler(ctx, t, clientset, informers)
// Create single node
_, err := nodeClient.Create(context.TODO(), newNode("single-node", nil), metav1.CreateOptions{})
if err != nil {
@ -787,7 +751,7 @@ func TestLaunchWithHashCollision(t *testing.T) {
// 2. Add a node to ensure the controller sync
// 3. The dsc is expected to "PATCH" the existing pod label with new hash and deletes the old controllerrevision once finishes the update
func TestDSCUpdatesPodLabelAfterDedupCurHistories(t *testing.T) {
closeFn, dc, informers, clientset := setup(t)
ctx, closeFn, dc, informers, clientset := setup(t)
defer closeFn()
ns := framework.CreateNamespaceOrDie(clientset, "one-node-daemonset-test", t)
defer framework.DeleteNamespaceOrDie(clientset, ns, t)
@ -796,15 +760,9 @@ func TestDSCUpdatesPodLabelAfterDedupCurHistories(t *testing.T) {
podInformer := informers.Core().V1().Pods().Informer()
nodeClient := clientset.CoreV1().Nodes()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
informers.Start(ctx.Done())
go dc.Run(ctx, 2)
// Start Scheduler
setupScheduler(ctx, t, clientset, informers)
// Create single node
_, err := nodeClient.Create(context.TODO(), newNode("single-node", nil), metav1.CreateOptions{})
if err != nil {
@ -915,7 +873,7 @@ func TestDSCUpdatesPodLabelAfterDedupCurHistories(t *testing.T) {
// TestTaintedNode tests tainted node isn't expected to have pod scheduled
func TestTaintedNode(t *testing.T) {
forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
closeFn, dc, informers, clientset := setup(t)
ctx, closeFn, dc, informers, clientset := setup(t)
defer closeFn()
ns := framework.CreateNamespaceOrDie(clientset, "tainted-node", t)
defer framework.DeleteNamespaceOrDie(clientset, ns, t)
@ -925,15 +883,9 @@ func TestTaintedNode(t *testing.T) {
podInformer := informers.Core().V1().Pods().Informer()
nodeClient := clientset.CoreV1().Nodes()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
informers.Start(ctx.Done())
go dc.Run(ctx, 2)
// Start Scheduler
setupScheduler(ctx, t, clientset, informers)
ds := newDaemonSet("foo", ns.Name)
ds.Spec.UpdateStrategy = *strategy
ds, err := dsClient.Create(context.TODO(), ds, metav1.CreateOptions{})
@ -980,7 +932,7 @@ func TestTaintedNode(t *testing.T) {
// to the Unschedulable nodes.
func TestUnschedulableNodeDaemonDoesLaunchPod(t *testing.T) {
forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
closeFn, dc, informers, clientset := setup(t)
ctx, closeFn, dc, informers, clientset := setup(t)
defer closeFn()
ns := framework.CreateNamespaceOrDie(clientset, "daemonset-unschedulable-test", t)
defer framework.DeleteNamespaceOrDie(clientset, ns, t)
@ -990,15 +942,9 @@ func TestUnschedulableNodeDaemonDoesLaunchPod(t *testing.T) {
nodeClient := clientset.CoreV1().Nodes()
podInformer := informers.Core().V1().Pods().Informer()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
informers.Start(ctx.Done())
go dc.Run(ctx, 2)
// Start Scheduler
setupScheduler(ctx, t, clientset, informers)
ds := newDaemonSet("foo", ns.Name)
ds.Spec.UpdateStrategy = *strategy
ds.Spec.Template.Spec.HostNetwork = true

View File

@ -22,25 +22,23 @@ import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/kubernetes/pkg/apis/core/helper"
"k8s.io/kubernetes/pkg/controlplane"
"k8s.io/kubernetes/plugin/pkg/admission/defaulttolerationseconds"
"k8s.io/kubernetes/test/integration/framework"
)
func TestAdmission(t *testing.T) {
controlPlaneConfig := framework.NewControlPlaneConfig()
controlPlaneConfig.GenericConfig.EnableProfiling = true
controlPlaneConfig.GenericConfig.AdmissionControl = defaulttolerationseconds.NewDefaultTolerationSeconds()
_, s, closeFn := framework.RunAnAPIServer(controlPlaneConfig)
defer closeFn()
client, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{
ModifyServerConfig: func(cfg *controlplane.Config) {
cfg.GenericConfig.EnableProfiling = true
cfg.GenericConfig.AdmissionControl = defaulttolerationseconds.NewDefaultTolerationSeconds()
},
})
defer tearDownFn()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
ns := framework.CreateTestingNamespace("default-toleration-seconds", t)
defer framework.DeleteTestingNamespace(ns, t)
ns := framework.CreateNamespaceOrDie(client, "default-toleration-seconds", t)
defer framework.DeleteNamespaceOrDie(client, ns, t)
pod := v1.Pod{
ObjectMeta: metav1.ObjectMeta{

View File

@ -29,42 +29,31 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
"k8s.io/kubernetes/pkg/controller/endpoint"
"k8s.io/kubernetes/pkg/controller/endpointslice"
"k8s.io/kubernetes/test/integration/framework"
netutils "k8s.io/utils/net"
)
func TestDualStackEndpoints(t *testing.T) {
// Create an IPv4IPv6 dual stack control-plane
serviceCIDR := "10.0.0.0/16"
secondaryServiceCIDR := "2001:db8:1::/48"
secondaryServiceCIDR := "2001:db8:1::/112"
labelMap := func() map[string]string {
return map[string]string{"foo": "bar"}
}
cfg := framework.NewIntegrationTestControlPlaneConfig()
_, cidr, err := netutils.ParseCIDRSloppy(serviceCIDR)
if err != nil {
t.Fatalf("Bad cidr: %v", err)
}
cfg.ExtraConfig.ServiceIPRange = *cidr
_, secCidr, err := netutils.ParseCIDRSloppy(secondaryServiceCIDR)
if err != nil {
t.Fatalf("Bad cidr: %v", err)
}
cfg.ExtraConfig.SecondaryServiceIPRange = *secCidr
_, s, closeFn := framework.RunAnAPIServer(cfg)
defer closeFn()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL})
client, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
opts.ServiceClusterIPRanges = fmt.Sprintf("%s,%s", serviceCIDR, secondaryServiceCIDR)
// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
opts.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount"}
},
})
defer tearDownFn()
// Wait until the default "kubernetes" service is created.
if err = wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) {
if err := wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) {
_, err := client.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), "kubernetes", metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return false, err
@ -156,8 +145,8 @@ func TestDualStackEndpoints(t *testing.T) {
for i, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
ns := framework.CreateTestingNamespace(fmt.Sprintf("test-endpointslice-dualstack-%d", i), t)
defer framework.DeleteTestingNamespace(ns, t)
ns := framework.CreateNamespaceOrDie(client, fmt.Sprintf("test-endpointslice-dualstack-%d", i), t)
defer framework.DeleteNamespaceOrDie(client, ns, t)
// Create a pod with labels
pod := &v1.Pod{

View File

@ -20,7 +20,6 @@ import (
"context"
"encoding/json"
"fmt"
"net"
"reflect"
"strings"
"testing"
@ -36,9 +35,7 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
"k8s.io/kubernetes/test/integration/framework"
netutils "k8s.io/utils/net"
)
@ -48,19 +45,15 @@ func TestCreateServiceSingleStackIPv4(t *testing.T) {
// Create an IPv4 single stack control-plane
serviceCIDR := "10.0.0.0/16"
cfg := framework.NewIntegrationTestControlPlaneConfig()
_, cidr, err := netutils.ParseCIDRSloppy(serviceCIDR)
if err != nil {
t.Fatalf("bad cidr: %v", err)
}
cfg.ExtraConfig.ServiceIPRange = *cidr
_, s, closeFn := framework.RunAnAPIServer(cfg)
defer closeFn()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL})
client, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
opts.ServiceClusterIPRanges = serviceCIDR
},
})
defer tearDownFn()
// Wait until the default "kubernetes" service is created.
if err = wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) {
if err := wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) {
_, err := client.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), "kubernetes", metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return false, err
@ -239,7 +232,7 @@ func TestCreateServiceSingleStackIPv4(t *testing.T) {
}
// create the service
_, err = client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{})
_, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{})
if (err != nil) != tc.expectError {
t.Errorf("Test failed expected result: %v received %v ", tc.expectError, err)
}
@ -262,23 +255,18 @@ func TestCreateServiceSingleStackIPv4(t *testing.T) {
// TestCreateServiceDualStackIPv6 test the Service dualstackness in an IPv6 only DualStack cluster
func TestCreateServiceDualStackIPv6(t *testing.T) {
// Create an IPv6 only dual stack control-plane
serviceCIDR := "2001:db8:1::/48"
serviceCIDR := "2001:db8:1::/112"
cfg := framework.NewIntegrationTestControlPlaneConfig()
_, cidr, err := netutils.ParseCIDRSloppy(serviceCIDR)
if err != nil {
t.Fatalf("bad cidr: %v", err)
}
cfg.ExtraConfig.ServiceIPRange = *cidr
cfg.GenericConfig.PublicAddress = netutils.ParseIPSloppy("2001:db8::10")
_, s, closeFn := framework.RunAnAPIServer(cfg)
defer closeFn()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL})
client, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
opts.ServiceClusterIPRanges = serviceCIDR
opts.GenericServerRunOptions.AdvertiseAddress = netutils.ParseIPSloppy("2001:db8::10")
},
})
defer tearDownFn()
// Wait until the default "kubernetes" service is created.
if err = wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) {
if err := wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) {
_, err := client.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), "kubernetes", metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return false, err
@ -459,7 +447,7 @@ func TestCreateServiceDualStackIPv6(t *testing.T) {
}
// create the service
_, err = client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{})
_, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{})
if (err != nil) != tc.expectError {
t.Errorf("Test failed expected result: %v received %v ", tc.expectError, err)
}
@ -483,28 +471,17 @@ func TestCreateServiceDualStackIPv6(t *testing.T) {
func TestCreateServiceDualStackIPv4IPv6(t *testing.T) {
// Create an IPv4IPv6 dual stack control-plane
serviceCIDR := "10.0.0.0/16"
secondaryServiceCIDR := "2001:db8:1::/48"
secondaryServiceCIDR := "2001:db8:1::/112"
cfg := framework.NewIntegrationTestControlPlaneConfig()
_, cidr, err := netutils.ParseCIDRSloppy(serviceCIDR)
if err != nil {
t.Fatalf("bad cidr: %v", err)
}
cfg.ExtraConfig.ServiceIPRange = *cidr
_, secCidr, err := netutils.ParseCIDRSloppy(secondaryServiceCIDR)
if err != nil {
t.Fatalf("bad cidr: %v", err)
}
cfg.ExtraConfig.SecondaryServiceIPRange = *secCidr
_, s, closeFn := framework.RunAnAPIServer(cfg)
defer closeFn()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL})
client, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
opts.ServiceClusterIPRanges = fmt.Sprintf("%s,%s", serviceCIDR, secondaryServiceCIDR)
},
})
defer tearDownFn()
// Wait until the default "kubernetes" service is created.
if err = wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) {
if err := wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) {
_, err := client.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), "kubernetes", metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return false, err
@ -684,7 +661,7 @@ func TestCreateServiceDualStackIPv4IPv6(t *testing.T) {
}
// create a service
_, err = client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{})
_, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{})
if (err != nil) != tc.expectError {
t.Errorf("Test failed expected result: %v received %v ", tc.expectError, err)
}
@ -708,30 +685,19 @@ func TestCreateServiceDualStackIPv4IPv6(t *testing.T) {
// TestCreateServiceDualStackIPv6IPv4 test the Service dualstackness in a IPv6IPv4 DualStack cluster
func TestCreateServiceDualStackIPv6IPv4(t *testing.T) {
// Create an IPv6IPv4 dual stack control-plane
serviceCIDR := "2001:db8:1::/48"
serviceCIDR := "2001:db8:1::/112"
secondaryServiceCIDR := "10.0.0.0/16"
cfg := framework.NewIntegrationTestControlPlaneConfig()
_, cidr, err := netutils.ParseCIDRSloppy(serviceCIDR)
if err != nil {
t.Fatalf("bad cidr: %v", err)
}
cfg.ExtraConfig.ServiceIPRange = *cidr
cfg.GenericConfig.PublicAddress = netutils.ParseIPSloppy("2001:db8::10")
_, secCidr, err := netutils.ParseCIDRSloppy(secondaryServiceCIDR)
if err != nil {
t.Fatalf("bad cidr: %v", err)
}
cfg.ExtraConfig.SecondaryServiceIPRange = *secCidr
_, s, closeFn := framework.RunAnAPIServer(cfg)
defer closeFn()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL})
client, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
opts.ServiceClusterIPRanges = fmt.Sprintf("%s,%s", serviceCIDR, secondaryServiceCIDR)
opts.GenericServerRunOptions.AdvertiseAddress = netutils.ParseIPSloppy("2001:db8::10")
},
})
defer tearDownFn()
// Wait until the default "kubernetes" service is created.
if err = wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) {
if err := wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) {
_, err := client.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), "kubernetes", metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return false, err
@ -743,7 +709,7 @@ func TestCreateServiceDualStackIPv6IPv4(t *testing.T) {
// verify client is working
if err := wait.PollImmediate(5*time.Second, 2*time.Minute, func() (bool, error) {
_, err = client.CoreV1().Endpoints("default").Get(context.TODO(), "kubernetes", metav1.GetOptions{})
_, err := client.CoreV1().Endpoints("default").Get(context.TODO(), "kubernetes", metav1.GetOptions{})
if err != nil {
t.Logf("error fetching endpoints: %v", err)
return false, nil
@ -914,7 +880,7 @@ func TestCreateServiceDualStackIPv6IPv4(t *testing.T) {
}
// create a service
_, err = client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{})
_, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{})
if (err != nil) != tc.expectError {
t.Errorf("Test failed expected result: %v received %v ", tc.expectError, err)
}
@ -939,28 +905,17 @@ func TestCreateServiceDualStackIPv6IPv4(t *testing.T) {
func TestUpgradeDowngrade(t *testing.T) {
// Create an IPv4IPv6 dual stack control-plane
serviceCIDR := "10.0.0.0/16"
secondaryServiceCIDR := "2001:db8:1::/48"
secondaryServiceCIDR := "2001:db8:1::/112"
cfg := framework.NewIntegrationTestControlPlaneConfig()
_, cidr, err := netutils.ParseCIDRSloppy(serviceCIDR)
if err != nil {
t.Fatalf("bad cidr: %v", err)
}
cfg.ExtraConfig.ServiceIPRange = *cidr
_, secCidr, err := netutils.ParseCIDRSloppy(secondaryServiceCIDR)
if err != nil {
t.Fatalf("bad cidr: %v", err)
}
cfg.ExtraConfig.SecondaryServiceIPRange = *secCidr
_, s, closeFn := framework.RunAnAPIServer(cfg)
defer closeFn()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL})
client, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
opts.ServiceClusterIPRanges = fmt.Sprintf("%s,%s", serviceCIDR, secondaryServiceCIDR)
},
})
defer tearDownFn()
// Wait until the default "kubernetes" service is created.
if err = wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) {
if err := wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) {
_, err := client.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), "kubernetes", metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return false, err
@ -988,7 +943,7 @@ func TestUpgradeDowngrade(t *testing.T) {
}
// create a service
_, err = client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{})
_, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{})
if err != nil {
t.Fatalf("unexpected error while creating service:%v", err)
}
@ -1053,28 +1008,17 @@ func TestUpgradeDowngrade(t *testing.T) {
func TestConvertToFromExternalName(t *testing.T) {
// Create an IPv4IPv6 dual stack control-plane
serviceCIDR := "10.0.0.0/16"
secondaryServiceCIDR := "2001:db8:1::/48"
secondaryServiceCIDR := "2001:db8:1::/112"
cfg := framework.NewIntegrationTestControlPlaneConfig()
_, cidr, err := netutils.ParseCIDRSloppy(serviceCIDR)
if err != nil {
t.Fatalf("bad cidr: %v", err)
}
cfg.ExtraConfig.ServiceIPRange = *cidr
_, secCidr, err := netutils.ParseCIDRSloppy(secondaryServiceCIDR)
if err != nil {
t.Fatalf("bad cidr: %v", err)
}
cfg.ExtraConfig.SecondaryServiceIPRange = *secCidr
_, s, closeFn := framework.RunAnAPIServer(cfg)
defer closeFn()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL})
client, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
opts.ServiceClusterIPRanges = fmt.Sprintf("%s,%s", serviceCIDR, secondaryServiceCIDR)
},
})
defer tearDownFn()
// Wait until the default "kubernetes" service is created.
if err = wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) {
if err := wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) {
_, err := client.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), "kubernetes", metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return false, err
@ -1101,7 +1045,7 @@ func TestConvertToFromExternalName(t *testing.T) {
}
// create a service
_, err = client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{})
_, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{})
if err != nil {
t.Fatalf("unexpected error while creating service:%v", err)
}
@ -1145,28 +1089,17 @@ func TestConvertToFromExternalName(t *testing.T) {
func TestPreferDualStack(t *testing.T) {
// Create an IPv4IPv6 dual stack control-plane
serviceCIDR := "10.0.0.0/16"
secondaryServiceCIDR := "2001:db8:1::/48"
secondaryServiceCIDR := "2001:db8:1::/112"
cfg := framework.NewIntegrationTestControlPlaneConfig()
_, cidr, err := netutils.ParseCIDRSloppy(serviceCIDR)
if err != nil {
t.Fatalf("bad cidr: %v", err)
}
cfg.ExtraConfig.ServiceIPRange = *cidr
_, secCidr, err := netutils.ParseCIDRSloppy(secondaryServiceCIDR)
if err != nil {
t.Fatalf("bad cidr: %v", err)
}
cfg.ExtraConfig.SecondaryServiceIPRange = *secCidr
_, s, closeFn := framework.RunAnAPIServer(cfg)
defer closeFn()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL})
client, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
opts.ServiceClusterIPRanges = fmt.Sprintf("%s,%s", serviceCIDR, secondaryServiceCIDR)
},
})
defer tearDownFn()
// Wait until the default "kubernetes" service is created.
if err = wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) {
if err := wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) {
_, err := client.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), "kubernetes", metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return false, err
@ -1197,7 +1130,7 @@ func TestPreferDualStack(t *testing.T) {
}
// create a service
_, err = client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{})
_, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{})
if err != nil {
t.Fatalf("unexpected error while creating service:%v", err)
}
@ -1231,19 +1164,15 @@ func TestServiceUpdate(t *testing.T) {
// Create an IPv4 single stack control-plane
serviceCIDR := "10.0.0.0/16"
cfg := framework.NewIntegrationTestControlPlaneConfig()
_, cidr, err := netutils.ParseCIDRSloppy(serviceCIDR)
if err != nil {
t.Fatalf("bad cidr: %v", err)
}
cfg.ExtraConfig.ServiceIPRange = *cidr
_, s, closeFn := framework.RunAnAPIServer(cfg)
defer closeFn()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL})
client, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
opts.ServiceClusterIPRanges = serviceCIDR
},
})
defer tearDownFn()
// Wait until the default "kubernetes" service is created.
if err = wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) {
if err := wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) {
_, err := client.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), "kubernetes", metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return false, err
@ -1270,7 +1199,7 @@ func TestServiceUpdate(t *testing.T) {
}
// create the service
_, err = client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{})
_, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{})
// if no error was expected validate the service otherwise return
if err != nil {
t.Errorf("unexpected error creating service:%v", err)
@ -1392,21 +1321,20 @@ func validateServiceAndClusterIPFamily(svc *v1.Service, expectedIPFamilies []v1.
}
func TestUpgradeServicePreferToDualStack(t *testing.T) {
sharedEtcd := framework.SharedEtcd()
// Create an IPv4 only dual stack control-plane
serviceCIDR := "192.168.0.0/24"
cfg := framework.NewIntegrationTestControlPlaneConfig()
_, cidr, err := netutils.ParseCIDRSloppy(serviceCIDR)
if err != nil {
t.Fatalf("bad cidr: %v", err)
}
cfg.ExtraConfig.ServiceIPRange = *cidr
_, s, closeFn := framework.RunAnAPIServer(cfg)
client := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL})
client, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
opts.Etcd.StorageConfig = *sharedEtcd
opts.ServiceClusterIPRanges = serviceCIDR
},
})
// Wait until the default "kubernetes" service is created.
if err = wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) {
if err := wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) {
_, err := client.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), "kubernetes", metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return false, err
@ -1438,7 +1366,7 @@ func TestUpgradeServicePreferToDualStack(t *testing.T) {
}
// create the service
_, err = client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{})
_, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@ -1452,18 +1380,17 @@ func TestUpgradeServicePreferToDualStack(t *testing.T) {
}
// reconfigure the apiserver to be dual-stack
closeFn()
tearDownFn()
secondaryServiceCIDR := "2001:db8:1::/48"
_, secCidr, err := netutils.ParseCIDRSloppy(secondaryServiceCIDR)
if err != nil {
t.Fatalf("bad cidr: %v", err)
}
cfg.ExtraConfig.SecondaryServiceIPRange = *secCidr
_, s, closeFn = framework.RunAnAPIServer(cfg)
defer closeFn()
secondaryServiceCIDR := "2001:db8:1::/112"
client = clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL})
client, _, tearDownFn = framework.StartTestServer(t, framework.TestServerSetup{
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
opts.Etcd.StorageConfig = *sharedEtcd
opts.ServiceClusterIPRanges = fmt.Sprintf("%s,%s", serviceCIDR, secondaryServiceCIDR)
},
})
defer tearDownFn()
// Wait until the default "kubernetes" service is created.
if err = wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) {
@ -1487,25 +1414,21 @@ func TestUpgradeServicePreferToDualStack(t *testing.T) {
}
func TestDowngradeServicePreferToDualStack(t *testing.T) {
sharedEtcd := framework.SharedEtcd()
// Create a dual stack control-plane
serviceCIDR := "192.168.0.0/24"
secondaryServiceCIDR := "2001:db8:1::/48"
secondaryServiceCIDR := "2001:db8:1::/112"
client, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
opts.Etcd.StorageConfig = *sharedEtcd
opts.ServiceClusterIPRanges = fmt.Sprintf("%s,%s", serviceCIDR, secondaryServiceCIDR)
},
})
dualStackCfg := framework.NewIntegrationTestControlPlaneConfig()
_, cidr, err := netutils.ParseCIDRSloppy(serviceCIDR)
if err != nil {
t.Fatalf("bad cidr: %v", err)
}
dualStackCfg.ExtraConfig.ServiceIPRange = *cidr
_, secCidr, err := netutils.ParseCIDRSloppy(secondaryServiceCIDR)
if err != nil {
t.Fatalf("bad cidr: %v", err)
}
dualStackCfg.ExtraConfig.SecondaryServiceIPRange = *secCidr
_, s, closeFn := framework.RunAnAPIServer(dualStackCfg)
client := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL})
// Wait until the default "kubernetes" service is created.
if err = wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) {
if err := wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) {
_, err := client.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), "kubernetes", metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return false, err
@ -1535,7 +1458,7 @@ func TestDowngradeServicePreferToDualStack(t *testing.T) {
},
}
// create the service
_, err = client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{})
_, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@ -1548,14 +1471,17 @@ func TestDowngradeServicePreferToDualStack(t *testing.T) {
t.Fatalf("Unexpected error validating the service %s %v", svc.Name, err)
}
// reconfigure the apiserver to be sinlge stack
closeFn()
// reset secondary
var emptyCidr net.IPNet
dualStackCfg.ExtraConfig.SecondaryServiceIPRange = emptyCidr
tearDownFn()
// reset secondary
client, _, tearDownFn = framework.StartTestServer(t, framework.TestServerSetup{
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
opts.Etcd.StorageConfig = *sharedEtcd
opts.ServiceClusterIPRanges = serviceCIDR
},
})
defer tearDownFn()
_, s, closeFn = framework.RunAnAPIServer(dualStackCfg)
defer closeFn()
client = clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL})
// Wait until the default "kubernetes" service is created.
if err = wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) {
_, err := client.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), "kubernetes", metav1.GetOptions{})
@ -1587,18 +1513,11 @@ type specMergePatch struct {
// tests success when converting ClusterIP:Headless service to ExternalName
func Test_ServiceChangeTypeHeadlessToExternalNameWithPatch(t *testing.T) {
controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig()
_, server, closeFn := framework.RunAnAPIServer(controlPlaneConfig)
defer closeFn()
client, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{})
defer tearDownFn()
config := restclient.Config{Host: server.URL}
client, err := clientset.NewForConfig(&config)
if err != nil {
t.Fatalf("Error creating clientset: %v", err)
}
ns := framework.CreateTestingNamespace("test-service-allocate-node-ports", t)
defer framework.DeleteTestingNamespace(ns, t)
ns := framework.CreateNamespaceOrDie(client, "test-service-allocate-node-ports", t)
defer framework.DeleteNamespaceOrDie(client, ns, t)
service := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
@ -1611,6 +1530,7 @@ func Test_ServiceChangeTypeHeadlessToExternalNameWithPatch(t *testing.T) {
},
}
var err error
service, err = client.CoreV1().Services(ns.Name).Create(context.TODO(), service, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Error creating test service: %v", err)

View File

@ -47,6 +47,7 @@ import (
"k8s.io/client-go/util/retry"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/controller-manager/pkg/informerfactory"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller/garbagecollector"
jobcontroller "k8s.io/kubernetes/pkg/controller/job"
@ -586,9 +587,7 @@ func TestFinalizersClearedWhenBackoffLimitExceeded(t *testing.T) {
closeFn, restConfig, clientSet, ns := setup(t, "simple")
defer closeFn()
ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
defer func() {
cancel()
}()
defer cancel()
// Job tracking with finalizers requires less calls in Indexed mode,
// so it's more likely to process all finalizers before all the pods
@ -785,9 +784,7 @@ func TestSuspendJobControllerRestart(t *testing.T) {
closeFn, restConfig, clientSet, ns := setup(t, "suspend")
defer closeFn()
ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
defer func() {
cancel()
}()
defer cancel()
job, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
Spec: batchv1.JobSpec{
@ -1162,24 +1159,23 @@ func createJobWithDefaults(ctx context.Context, clientSet clientset.Interface, n
}
func setup(t *testing.T, nsBaseName string) (framework.CloseFunc, *restclient.Config, clientset.Interface, *v1.Namespace) {
controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig()
_, server, apiServerCloseFn := framework.RunAnAPIServer(controlPlaneConfig)
// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd())
config := restclient.Config{
Host: server.URL,
QPS: 200.0,
Burst: 200,
}
clientSet, err := clientset.NewForConfig(&config)
config := restclient.CopyConfig(server.ClientConfig)
config.QPS = 200
config.Burst = 200
clientSet, err := clientset.NewForConfig(config)
if err != nil {
t.Fatalf("Error creating clientset: %v", err)
}
ns := framework.CreateTestingNamespace(nsBaseName, t)
ns := framework.CreateNamespaceOrDie(clientSet, nsBaseName, t)
closeFn := func() {
framework.DeleteTestingNamespace(ns, t)
apiServerCloseFn()
framework.DeleteNamespaceOrDie(clientSet, ns, t)
server.TearDownFn()
}
return closeFn, &config, clientSet, ns
return closeFn, config, clientSet, ns
}
func startJobControllerAndWaitForCaches(restConfig *restclient.Config) (context.Context, context.CancelFunc) {

View File

@ -353,7 +353,7 @@ func TestSchedulerExtender(t *testing.T) {
},
}
testCtx = testutils.InitTestSchedulerWithOptions(t, testCtx, scheduler.WithExtenders(extenders...))
testCtx = testutils.InitTestSchedulerWithOptions(t, testCtx, 0, scheduler.WithExtenders(extenders...))
testutils.SyncInformerFactory(testCtx)
go testCtx.Scheduler.Run(testCtx.Ctx)
defer testutils.CleanupTest(t, testCtx)

View File

@ -1489,7 +1489,7 @@ func TestBindPlugin(t *testing.T) {
})
// Create the scheduler with the test plugin set.
testCtx := testutils.InitTestSchedulerWithOptions(t, testContext,
testCtx := testutils.InitTestSchedulerWithOptions(t, testContext, 0,
scheduler.WithProfiles(cfg.Profiles...),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
testutils.SyncInformerFactory(testCtx)
@ -2421,7 +2421,7 @@ func TestActivatePods(t *testing.T) {
}
func initTestSchedulerForFrameworkTest(t *testing.T, testCtx *testutils.TestContext, nodeCount int, opts ...scheduler.Option) *testutils.TestContext {
testCtx = testutils.InitTestSchedulerWithOptions(t, testCtx, opts...)
testCtx = testutils.InitTestSchedulerWithOptions(t, testCtx, 0, opts...)
testutils.SyncInformerFactory(testCtx)
go testCtx.Scheduler.Run(testCtx.Ctx)

View File

@ -171,6 +171,7 @@ func TestPreemption(t *testing.T) {
testCtx := testutils.InitTestSchedulerWithOptions(t,
testutils.InitTestAPIServer(t, "preemption", nil),
0,
scheduler.WithProfiles(cfg.Profiles...),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
testutils.SyncInformerFactory(testCtx)
@ -1436,7 +1437,7 @@ func TestPDBInPreemption(t *testing.T) {
}
func initTestPreferNominatedNode(t *testing.T, nsPrefix string, opts ...scheduler.Option) *testutils.TestContext {
testCtx := testutils.InitTestSchedulerWithOptions(t, testutils.InitTestAPIServer(t, nsPrefix, nil), opts...)
testCtx := testutils.InitTestSchedulerWithOptions(t, testutils.InitTestAPIServer(t, nsPrefix, nil), 0, opts...)
testutils.SyncInformerFactory(testCtx)
// wraps the NextPod() method to make it appear the preemption has been done already and the nominated node has been set.
f := testCtx.Scheduler.NextPod

View File

@ -56,6 +56,7 @@ func TestCoreResourceEnqueue(t *testing.T) {
testCtx := testutils.InitTestSchedulerWithOptions(
t,
testutils.InitTestAPIServer(t, "core-res-enqueue", nil),
0,
scheduler.WithPodInitialBackoffSeconds(0),
scheduler.WithPodMaxBackoffSeconds(0),
)
@ -236,6 +237,7 @@ func TestCustomResourceEnqueue(t *testing.T) {
testCtx = testutils.InitTestSchedulerWithOptions(
t,
testCtx,
0,
scheduler.WithProfiles(cfg.Profiles...),
scheduler.WithFrameworkOutOfTreeRegistry(registry),
scheduler.WithPodInitialBackoffSeconds(0),

View File

@ -267,7 +267,7 @@ func TestMultipleSchedulers(t *testing.T) {
}},
},
})
testCtx = testutils.InitTestSchedulerWithOptions(t, testCtx, scheduler.WithProfiles(cfg.Profiles...))
testCtx = testutils.InitTestSchedulerWithOptions(t, testCtx, 0, scheduler.WithProfiles(cfg.Profiles...))
testutils.SyncInformerFactory(testCtx)
go testCtx.Scheduler.Run(testCtx.Ctx)

View File

@ -92,6 +92,7 @@ func initTestSchedulerForPriorityTest(t *testing.T, scorePluginName string) *tes
testCtx := testutils.InitTestSchedulerWithOptions(
t,
testutils.InitTestAPIServer(t, strings.ToLower(scorePluginName), nil),
0,
scheduler.WithProfiles(cfg.Profiles...),
)
testutils.SyncInformerFactory(testCtx)
@ -127,6 +128,7 @@ func initTestSchedulerForNodeResourcesTest(t *testing.T) *testutils.TestContext
testCtx := testutils.InitTestSchedulerWithOptions(
t,
testutils.InitTestAPIServer(t, strings.ToLower(noderesources.Name), nil),
0,
scheduler.WithProfiles(cfg.Profiles...),
)
testutils.SyncInformerFactory(testCtx)

View File

@ -37,6 +37,7 @@ import (
apiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller/statefulset"
"k8s.io/kubernetes/pkg/controlplane"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/test/integration/framework"
)
@ -49,10 +50,7 @@ const (
// TestVolumeTemplateNoopUpdate ensures embedded StatefulSet objects with embedded PersistentVolumes can be updated
func TestVolumeTemplateNoopUpdate(t *testing.T) {
// Start the server with default storage setup
server, err := apiservertesting.StartTestServer(t, apiservertesting.NewDefaultTestServerOptions(), nil, framework.SharedEtcd())
if err != nil {
t.Fatal(err)
}
server := apiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
defer server.TearDownFn()
c, err := dynamic.NewForConfig(server.ClientConfig)
@ -125,8 +123,8 @@ func TestVolumeTemplateNoopUpdate(t *testing.T) {
func TestSpecReplicasChange(t *testing.T) {
closeFn, rm, informers, c := scSetup(t)
defer closeFn()
ns := framework.CreateTestingNamespace("test-spec-replicas-change", t)
defer framework.DeleteTestingNamespace(ns, t)
ns := framework.CreateNamespaceOrDie(c, "test-spec-replicas-change", t)
defer framework.DeleteNamespaceOrDie(c, ns, t)
cancel := runControllerAndInformers(rm, informers)
defer cancel()
@ -168,8 +166,8 @@ func TestSpecReplicasChange(t *testing.T) {
func TestDeletingAndFailedPods(t *testing.T) {
closeFn, rm, informers, c := scSetup(t)
defer closeFn()
ns := framework.CreateTestingNamespace("test-deleting-and-failed-pods", t)
defer framework.DeleteTestingNamespace(ns, t)
ns := framework.CreateNamespaceOrDie(c, "test-deleting-and-failed-pods", t)
defer framework.DeleteNamespaceOrDie(c, ns, t)
cancel := runControllerAndInformers(rm, informers)
defer cancel()
@ -269,8 +267,8 @@ func TestStatefulSetAvailable(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetMinReadySeconds, test.enabled)()
closeFn, rm, informers, c := scSetup(t)
defer closeFn()
ns := framework.CreateTestingNamespace("test-available-pods", t)
defer framework.DeleteTestingNamespace(ns, t)
ns := framework.CreateNamespaceOrDie(c, "test-available-pods", t)
defer framework.DeleteNamespaceOrDie(c, ns, t)
cancel := runControllerAndInformers(rm, informers)
defer cancel()
@ -358,31 +356,28 @@ func setPodsReadyCondition(t *testing.T, clientSet clientset.Interface, pods *v1
// add for issue: https://github.com/kubernetes/kubernetes/issues/108837
func TestStatefulSetStatusWithPodFail(t *testing.T) {
limitedPodNumber := 2
controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig()
controlPlaneConfig.GenericConfig.AdmissionControl = &fakePodFailAdmission{
limitedPodNumber: limitedPodNumber,
}
_, s, closeFn := framework.RunAnAPIServer(controlPlaneConfig)
c, config, closeFn := framework.StartTestServer(t, framework.TestServerSetup{
ModifyServerConfig: func(config *controlplane.Config) {
config.GenericConfig.AdmissionControl = &fakePodFailAdmission{
limitedPodNumber: limitedPodNumber,
}
},
})
defer closeFn()
config := restclient.Config{Host: s.URL}
c, err := clientset.NewForConfig(&config)
if err != nil {
t.Fatalf("Could not create clientset: %v", err)
}
resyncPeriod := 12 * time.Hour
informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "statefulset-informers")), resyncPeriod)
informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(config, "statefulset-informers")), resyncPeriod)
ssc := statefulset.NewStatefulSetController(
informers.Core().V1().Pods(),
informers.Apps().V1().StatefulSets(),
informers.Core().V1().PersistentVolumeClaims(),
informers.Apps().V1().ControllerRevisions(),
clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "statefulset-controller")),
clientset.NewForConfigOrDie(restclient.AddUserAgent(config, "statefulset-controller")),
)
ns := framework.CreateTestingNamespace("test-pod-fail", t)
defer framework.DeleteTestingNamespace(ns, t)
ns := framework.CreateNamespaceOrDie(c, "test-pod-fail", t)
defer framework.DeleteNamespaceOrDie(c, ns, t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -390,7 +385,7 @@ func TestStatefulSetStatusWithPodFail(t *testing.T) {
go ssc.Run(ctx, 5)
sts := newSTS("sts", ns.Name, 4)
_, err = c.AppsV1().StatefulSets(sts.Namespace).Create(context.TODO(), sts, metav1.CreateOptions{})
_, err := c.AppsV1().StatefulSets(sts.Namespace).Create(context.TODO(), sts, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Could not create statefuleSet %s: %v", sts.Name, err)
}

View File

@ -35,6 +35,7 @@ import (
typedv1 "k8s.io/client-go/kubernetes/typed/core/v1"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/util/retry"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
api "k8s.io/kubernetes/pkg/apis/core"
//svc "k8s.io/kubernetes/pkg/api/v1/service"
@ -159,27 +160,27 @@ func newStatefulSetPVC(name string) v1.PersistentVolumeClaim {
}
// scSetup sets up necessities for Statefulset integration test, including control plane, apiserver, informers, and clientset
func scSetup(t *testing.T) (framework.CloseFunc, *statefulset.StatefulSetController, informers.SharedInformerFactory, clientset.Interface) {
controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig()
_, s, closeFn := framework.RunAnAPIServer(controlPlaneConfig)
func scSetup(t *testing.T) (kubeapiservertesting.TearDownFunc, *statefulset.StatefulSetController, informers.SharedInformerFactory, clientset.Interface) {
// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd())
config := restclient.Config{Host: s.URL}
clientSet, err := clientset.NewForConfig(&config)
config := restclient.CopyConfig(server.ClientConfig)
clientSet, err := clientset.NewForConfig(config)
if err != nil {
t.Fatalf("error in create clientset: %v", err)
}
resyncPeriod := 12 * time.Hour
informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "statefulset-informers")), resyncPeriod)
informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(config, "statefulset-informers")), resyncPeriod)
sc := statefulset.NewStatefulSetController(
informers.Core().V1().Pods(),
informers.Apps().V1().StatefulSets(),
informers.Core().V1().PersistentVolumeClaims(),
informers.Apps().V1().ControllerRevisions(),
clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "statefulset-controller")),
clientset.NewForConfigOrDie(restclient.AddUserAgent(config, "statefulset-controller")),
)
return closeFn, sc, informers, clientSet
return server.TearDownFn, sc, informers, clientSet
}
// Run STS controller and informers

View File

@ -342,7 +342,7 @@ func InitTestAPIServer(t *testing.T, nsPrefix string, admission admission.Interf
testCtx.ClientSet, testCtx.KubeConfig, testCtx.CloseFn = framework.StartTestServer(t, framework.TestServerSetup{
ModifyServerRunOptions: func(options *options.ServerRunOptions) {
options.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount", "TaintNodesByCondition", "Priority"}
options.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount", "TaintNodesByCondition", "Priority", "StorageObjectInUseProtection"}
},
ModifyServerConfig: func(config *controlplane.Config) {
if admission != nil {
@ -380,7 +380,7 @@ func InitTestScheduler(
testCtx *TestContext,
) *TestContext {
// Pod preemption is enabled by default scheduler configuration.
return InitTestSchedulerWithOptions(t, testCtx)
return InitTestSchedulerWithOptions(t, testCtx, 0)
}
// InitTestSchedulerWithOptions initializes a test environment and creates a scheduler with default
@ -388,10 +388,11 @@ func InitTestScheduler(
func InitTestSchedulerWithOptions(
t *testing.T,
testCtx *TestContext,
resyncPeriod time.Duration,
opts ...scheduler.Option,
) *TestContext {
// 1. Create scheduler
testCtx.InformerFactory = scheduler.NewInformerFactory(testCtx.ClientSet, 0)
testCtx.InformerFactory = scheduler.NewInformerFactory(testCtx.ClientSet, resyncPeriod)
if testCtx.KubeConfig != nil {
dynClient := dynamic.NewForConfigOrDie(testCtx.KubeConfig)
testCtx.DynInformerFactory = dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynClient, 0, v1.NamespaceAll, nil)
@ -489,7 +490,7 @@ func InitDisruptionController(t *testing.T, testCtx *TestContext) *disruption.Di
// InitTestSchedulerWithNS initializes a test environment and creates API server and scheduler with default
// configuration.
func InitTestSchedulerWithNS(t *testing.T, nsPrefix string, opts ...scheduler.Option) *TestContext {
testCtx := InitTestSchedulerWithOptions(t, InitTestAPIServer(t, nsPrefix, nil), opts...)
testCtx := InitTestSchedulerWithOptions(t, InitTestAPIServer(t, nsPrefix, nil), 0, opts...)
SyncInformerFactory(testCtx)
go testCtx.Scheduler.Run(testCtx.Ctx)
return testCtx
@ -512,6 +513,7 @@ func InitTestDisablePreemption(t *testing.T, nsPrefix string) *TestContext {
})
testCtx := InitTestSchedulerWithOptions(
t, InitTestAPIServer(t, nsPrefix, nil),
0,
scheduler.WithProfiles(cfg.Profiles...))
SyncInformerFactory(testCtx)
go testCtx.Scheduler.Run(testCtx.Ctx)

View File

@ -18,125 +18,15 @@ package volumescheduling
import (
"context"
"net/http/httptest"
"testing"
"time"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/admission"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/events"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controlplane"
"k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/pkg/scheduler/profile"
"k8s.io/kubernetes/test/integration/framework"
)
type testContext struct {
closeFn framework.CloseFunc
httpServer *httptest.Server
ns *v1.Namespace
clientSet *clientset.Clientset
informerFactory informers.SharedInformerFactory
scheduler *scheduler.Scheduler
ctx context.Context
cancelFn context.CancelFunc
}
// initTestAPIServer initializes a test environment and creates an API server with default
// configuration. Alpha resources are enabled automatically if the corresponding feature
// is enabled.
func initTestAPIServer(t *testing.T, nsPrefix string, admission admission.Interface) *testContext {
ctx, cancelFunc := context.WithCancel(context.Background())
testCtx := testContext{
ctx: ctx,
cancelFn: cancelFunc,
}
// 1. Create API server
controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig()
resourceConfig := controlplane.DefaultAPIResourceConfigSource()
controlPlaneConfig.ExtraConfig.APIResourceConfigSource = resourceConfig
if admission != nil {
controlPlaneConfig.GenericConfig.AdmissionControl = admission
}
_, testCtx.httpServer, testCtx.closeFn = framework.RunAnAPIServer(controlPlaneConfig)
s := testCtx.httpServer
if nsPrefix != "default" {
testCtx.ns = framework.CreateTestingNamespace(nsPrefix+string(uuid.NewUUID()), t)
} else {
testCtx.ns = framework.CreateTestingNamespace("default", t)
}
// 2. Create kubeclient
testCtx.clientSet = clientset.NewForConfigOrDie(
&restclient.Config{
QPS: -1, Host: s.URL,
ContentConfig: restclient.ContentConfig{
GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"},
},
},
)
return &testCtx
}
// initTestSchedulerWithOptions initializes a test environment and creates a scheduler with default
// configuration and other options.
func initTestSchedulerWithOptions(
t *testing.T,
testCtx *testContext,
resyncPeriod time.Duration,
) *testContext {
// 1. Create scheduler
testCtx.informerFactory = informers.NewSharedInformerFactory(testCtx.clientSet, resyncPeriod)
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{
Interface: testCtx.clientSet.EventsV1(),
})
var err error
testCtx.scheduler, err = scheduler.New(
testCtx.clientSet,
testCtx.informerFactory,
nil,
profile.NewRecorderFactory(eventBroadcaster),
testCtx.ctx.Done())
if err != nil {
t.Fatalf("Couldn't create scheduler: %v", err)
}
eventBroadcaster.StartRecordingToSink(testCtx.ctx.Done())
testCtx.informerFactory.Start(testCtx.scheduler.StopEverything)
testCtx.informerFactory.WaitForCacheSync(testCtx.scheduler.StopEverything)
go testCtx.scheduler.Run(testCtx.ctx)
return testCtx
}
// cleanupTest deletes the scheduler and the test namespace. It should be called
// at the end of a test.
func cleanupTest(t *testing.T, testCtx *testContext) {
// Kill the scheduler.
testCtx.cancelFn()
// Cleanup nodes.
testCtx.clientSet.CoreV1().Nodes().DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{})
framework.DeleteTestingNamespace(testCtx.ns, t)
testCtx.closeFn()
}
// waitForPodToScheduleWithTimeout waits for a pod to get scheduled and returns
// an error if it does not scheduled within the given timeout.
func waitForPodToScheduleWithTimeout(cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error {

View File

@ -43,6 +43,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodevolumelimits"
"k8s.io/kubernetes/pkg/volume"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
testutil "k8s.io/kubernetes/test/integration/util"
imageutils "k8s.io/kubernetes/test/utils/image"
)
@ -990,19 +991,16 @@ func TestCapacity(t *testing.T) {
// selectedNode annotation from a claim to reschedule volume provision
// on provision failure.
func TestRescheduleProvisioning(t *testing.T) {
// Set feature gates
ctx, cancel := context.WithCancel(context.Background())
testCtx := testutil.InitTestAPIServer(t, "reschedule-volume-provision", nil)
testCtx := initTestAPIServer(t, "reschedule-volume-provision", nil)
clientset := testCtx.clientSet
ns := testCtx.ns.Name
clientset := testCtx.ClientSet
ns := testCtx.NS.Name
defer func() {
cancel()
testCtx.CancelFn()
deleteTestObjects(clientset, ns, metav1.DeleteOptions{})
testCtx.clientSet.CoreV1().Nodes().DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{})
testCtx.closeFn()
testCtx.ClientSet.CoreV1().Nodes().DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{})
testCtx.CloseFn()
}()
ctrl, informerFactory, err := initPVController(t, testCtx, 0)
@ -1038,9 +1036,9 @@ func TestRescheduleProvisioning(t *testing.T) {
}
// Start controller.
go ctrl.Run(ctx)
informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(ctx.Done())
go ctrl.Run(testCtx.Ctx)
informerFactory.Start(testCtx.Ctx.Done())
informerFactory.WaitForCacheSync(testCtx.Ctx.Done())
// Validate that the annotation is removed by controller for provision reschedule.
if err := waitForProvisionAnn(clientset, pvc, false); err != nil {
@ -1049,18 +1047,21 @@ func TestRescheduleProvisioning(t *testing.T) {
}
func setupCluster(t *testing.T, nsName string, numberOfNodes int, resyncPeriod time.Duration, provisionDelaySeconds int) *testConfig {
testCtx := initTestSchedulerWithOptions(t, initTestAPIServer(t, nsName, nil), resyncPeriod)
clientset := testCtx.clientSet
ns := testCtx.ns.Name
testCtx := testutil.InitTestSchedulerWithOptions(t, testutil.InitTestAPIServer(t, nsName, nil), resyncPeriod)
testutil.SyncInformerFactory(testCtx)
go testCtx.Scheduler.Run(testCtx.Ctx)
clientset := testCtx.ClientSet
ns := testCtx.NS.Name
ctrl, informerFactory, err := initPVController(t, testCtx, provisionDelaySeconds)
if err != nil {
t.Fatalf("Failed to create PV controller: %v", err)
}
go ctrl.Run(testCtx.ctx)
go ctrl.Run(testCtx.Ctx)
// Start informer factory after all controllers are configured and running.
informerFactory.Start(testCtx.ctx.Done())
informerFactory.WaitForCacheSync(testCtx.ctx.Done())
informerFactory.Start(testCtx.Ctx.Done())
informerFactory.WaitForCacheSync(testCtx.Ctx.Done())
// Create shared objects
// Create nodes
@ -1081,17 +1082,17 @@ func setupCluster(t *testing.T, nsName string, numberOfNodes int, resyncPeriod t
return &testConfig{
client: clientset,
ns: ns,
stop: testCtx.ctx.Done(),
stop: testCtx.Ctx.Done(),
teardown: func() {
klog.Infof("test cluster %q start to tear down", ns)
deleteTestObjects(clientset, ns, metav1.DeleteOptions{})
cleanupTest(t, testCtx)
testutil.CleanupTest(t, testCtx)
},
}
}
func initPVController(t *testing.T, testCtx *testContext, provisionDelaySeconds int) (*persistentvolume.PersistentVolumeController, informers.SharedInformerFactory, error) {
clientset := testCtx.clientSet
func initPVController(t *testing.T, testCtx *testutil.TestContext, provisionDelaySeconds int) (*persistentvolume.PersistentVolumeController, informers.SharedInformerFactory, error) {
clientset := testCtx.ClientSet
// Informers factory for controllers
informerFactory := informers.NewSharedInformerFactory(clientset, 0)

View File

@ -31,6 +31,7 @@ import (
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
testutil "k8s.io/kubernetes/test/integration/util"
)
var (
@ -46,28 +47,31 @@ func mergeNodeLabels(node *v1.Node, labels map[string]string) *v1.Node {
}
func setupClusterForVolumeCapacityPriority(t *testing.T, nsName string, resyncPeriod time.Duration, provisionDelaySeconds int) *testConfig {
textCtx := initTestSchedulerWithOptions(t, initTestAPIServer(t, nsName, nil), resyncPeriod)
clientset := textCtx.clientSet
ns := textCtx.ns.Name
testCtx := testutil.InitTestSchedulerWithOptions(t, testutil.InitTestAPIServer(t, nsName, nil), resyncPeriod)
testutil.SyncInformerFactory(testCtx)
go testCtx.Scheduler.Run(testCtx.Ctx)
ctrl, informerFactory, err := initPVController(t, textCtx, provisionDelaySeconds)
clientset := testCtx.ClientSet
ns := testCtx.NS.Name
ctrl, informerFactory, err := initPVController(t, testCtx, provisionDelaySeconds)
if err != nil {
t.Fatalf("Failed to create PV controller: %v", err)
}
go ctrl.Run(textCtx.ctx)
go ctrl.Run(testCtx.Ctx)
// Start informer factory after all controllers are configured and running.
informerFactory.Start(textCtx.ctx.Done())
informerFactory.WaitForCacheSync(textCtx.ctx.Done())
informerFactory.Start(testCtx.Ctx.Done())
informerFactory.WaitForCacheSync(testCtx.Ctx.Done())
return &testConfig{
client: clientset,
ns: ns,
stop: textCtx.ctx.Done(),
stop: testCtx.Ctx.Done(),
teardown: func() {
klog.Infof("test cluster %q start to tear down", ns)
deleteTestObjects(clientset, ns, metav1.DeleteOptions{})
cleanupTest(t, textCtx)
testutil.CleanupTest(t, testCtx)
},
}
}