scheduler_perf: fix namespace deletion
Merely deleting the namespace is not enough:
- Workloads might rely on the garbage collector to get rid of obsolete objects, so we should run it to be on the safe side.
- Pods must be force-deleted because kubelet is not running.
- Finally, the namespace controller is needed to get rid of deleted namespaces.
This commit is contained in:
parent d9c16a1ced
commit dfd646e0a8
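The diff below implements that sequence in cleanupWorkload. As a minimal standalone sketch of the same idea (the package and function names are illustrative, not part of the commit): pods are force-deleted with a zero grace period because no kubelet is running to finish graceful termination, the namespace is deleted afterwards, and the garbage collector and namespace controller must be running for the namespace to actually disappear.

package cleanupsketch

import (
    "context"
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    clientset "k8s.io/client-go/kubernetes"
)

// forceCleanupNamespace is a hypothetical helper mirroring the commit's approach:
// force-delete the pods first (no kubelet will ever confirm their termination),
// then delete the namespace; the GC and namespace controllers do the rest.
func forceCleanupNamespace(ctx context.Context, client clientset.Interface, namespace string) error {
    deleteNow := *metav1.NewDeleteOptions(0) // grace period 0 = force deletion
    if err := client.CoreV1().Pods(namespace).DeleteCollection(ctx, deleteNow, metav1.ListOptions{}); err != nil {
        return fmt.Errorf("force-delete pods in %q: %w", namespace, err)
    }
    if err := client.CoreV1().Namespaces().Delete(ctx, namespace, deleteNow); err != nil {
        return fmt.Errorf("delete namespace %q: %w", namespace, err)
    }
    return nil
}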
@@ -41,27 +41,22 @@ import (
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/apimachinery/pkg/watch"
    "k8s.io/apiserver/pkg/util/feature"
    cacheddiscovery "k8s.io/client-go/discovery/cached/memory"
    "k8s.io/client-go/informers"
    clientset "k8s.io/client-go/kubernetes"
    typedv1 "k8s.io/client-go/kubernetes/typed/batch/v1"
    "k8s.io/client-go/metadata"
    "k8s.io/client-go/metadata/metadatainformer"
    restclient "k8s.io/client-go/rest"
    "k8s.io/client-go/restmapper"
    "k8s.io/client-go/util/retry"
    featuregatetesting "k8s.io/component-base/featuregate/testing"
    basemetrics "k8s.io/component-base/metrics"
    "k8s.io/component-base/metrics/testutil"
    "k8s.io/controller-manager/pkg/informerfactory"
    "k8s.io/klog/v2"
    kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
    podutil "k8s.io/kubernetes/pkg/api/v1/pod"
    "k8s.io/kubernetes/pkg/controller/garbagecollector"
    jobcontroller "k8s.io/kubernetes/pkg/controller/job"
    "k8s.io/kubernetes/pkg/controller/job/metrics"
    "k8s.io/kubernetes/pkg/features"
    "k8s.io/kubernetes/test/integration/framework"
    "k8s.io/kubernetes/test/integration/util"
    "k8s.io/utils/pointer"
)

@@ -1313,7 +1308,7 @@ func TestOrphanPodsFinalizersClearedWithGC(t *testing.T) {
    defer cancel()
    restConfig.QPS = 200
    restConfig.Burst = 200
    runGC := createGC(ctx, t, restConfig, informerSet)
    runGC := util.CreateGCController(ctx, t, *restConfig, informerSet)
    informerSet.Start(ctx.Done())
    go jc.Run(ctx, 1)
    runGC()

@@ -2092,40 +2087,6 @@ func createJobControllerWithSharedInformers(restConfig *restclient.Config, infor
    return jc, ctx, cancel
}

func createGC(ctx context.Context, t *testing.T, restConfig *restclient.Config, informerSet informers.SharedInformerFactory) func() {
    restConfig = restclient.AddUserAgent(restConfig, "gc-controller")
    clientSet := clientset.NewForConfigOrDie(restConfig)
    metadataClient, err := metadata.NewForConfig(restConfig)
    if err != nil {
        t.Fatalf("Failed to create metadataClient: %v", err)
    }
    restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheddiscovery.NewMemCacheClient(clientSet.Discovery()))
    restMapper.Reset()
    metadataInformers := metadatainformer.NewSharedInformerFactory(metadataClient, 0)
    alwaysStarted := make(chan struct{})
    close(alwaysStarted)
    gc, err := garbagecollector.NewGarbageCollector(
        clientSet,
        metadataClient,
        restMapper,
        garbagecollector.DefaultIgnoredResources(),
        informerfactory.NewInformerFactory(informerSet, metadataInformers),
        alwaysStarted,
    )
    if err != nil {
        t.Fatalf("Failed creating garbage collector")
    }
    startGC := func() {
        syncPeriod := 5 * time.Second
        go wait.Until(func() {
            restMapper.Reset()
        }, syncPeriod, ctx.Done())
        go gc.Run(ctx, 1)
        go gc.Sync(ctx, clientSet.Discovery(), syncPeriod)
    }
    return startGC
}

func hasJobTrackingFinalizer(obj metav1.Object) bool {
    for _, fin := range obj.GetFinalizers() {
        if fin == batchv1.JobTrackingFinalizer {
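Note how the call site above now passes *restConfig by value to util.CreateGCController, whereas the removed createGC took a *restclient.Config. A small sketch of why that matters (the helper name is illustrative, not from the commit): restclient.AddUserAgent mutates the config it receives, so working on a copy keeps the test's own config untouched.

package configsketch

import (
    restclient "k8s.io/client-go/rest"
)

// gcClientConfig takes restclient.Config by value, so AddUserAgent
// only modifies this local copy, never the caller's shared config.
func gcClientConfig(restConfig restclient.Config) *restclient.Config {
    return restclient.AddUserAgent(&restConfig, "gc-controller")
}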
@@ -30,6 +30,8 @@ import (
    "testing"
    "time"

    "github.com/onsi/gomega"

    v1 "k8s.io/api/core/v1"
    resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
    apierrors "k8s.io/apimachinery/pkg/api/errors"

@@ -47,6 +49,7 @@ import (
    "k8s.io/component-base/featuregate"
    featuregatetesting "k8s.io/component-base/featuregate/testing"
    "k8s.io/component-base/metrics/legacyregistry"
    "k8s.io/kubernetes/pkg/features"
    "k8s.io/kubernetes/pkg/scheduler/apis/config"
    "k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
    "k8s.io/kubernetes/pkg/scheduler/apis/config/validation"

@@ -794,11 +797,11 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload, c
            b.Fatalf("validate scheduler config file failed: %v", err)
        }
    }
    informerFactory, client, dynClient := mustSetupScheduler(ctx, b, cfg, tc.FeatureGates)
    informerFactory, client, dynClient := mustSetupCluster(ctx, b, cfg, tc.FeatureGates)

    // Additional informers needed for testing. The pod informer was
    // already created before (scheduler.NewInformerFactory) and the
    // factory was started for it (mustSetupScheduler), therefore we don't
    // factory was started for it (mustSetupCluster), therefore we don't
    // need to start again.
    podInformer := informerFactory.Core().V1().Pods()

@@ -816,13 +819,8 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload, c
    numPodsScheduledPerNamespace := make(map[string]int)

    if cleanup {
        b.Cleanup(func() {
            for namespace := range numPodsScheduledPerNamespace {
                if err := client.CoreV1().Namespaces().Delete(ctx, namespace, metav1.DeleteOptions{}); err != nil {
                    b.Errorf("Deleting Namespace in numPodsScheduledPerNamespace: %v", err)
                }
            }
        })
        // This must run before controllers get shut down.
        defer cleanupWorkload(ctx, b, tc, client, numPodsScheduledPerNamespace)
    }

    for opIndex, op := range unrollWorkloadTemplate(b, tc.WorkloadTemplate, w) {

@@ -1089,6 +1087,70 @@ func runWorkload(ctx context.Context, b *testing.B, tc *testCase, w *workload, c
    return dataItems
}

// cleanupWorkload ensures that everything is removed from the API server that
// might have been created by runWorkload. This must be done before starting
// the next workload because otherwise it might stumble over previously created
// objects. For example, the namespaces are the same in different workloads, so
// not deleting them would cause the next one to fail with "cannot create
// namespace: already exists".
//
// Calling cleanupWorkload can be skipped if it is known that the next workload
// will run with a fresh etcd instance.
func cleanupWorkload(ctx context.Context, tb testing.TB, tc *testCase, client clientset.Interface, numPodsScheduledPerNamespace map[string]int) {
    deleteNow := *metav1.NewDeleteOptions(0)
    for namespace := range numPodsScheduledPerNamespace {
        // Pods have to be deleted explicitly, with no grace period. Normally
        // kubelet will set the DeletionGracePeriodSeconds to zero when it's okay
        // to remove a deleted pod, but we don't run kubelet...
        if err := client.CoreV1().Pods(namespace).DeleteCollection(ctx, deleteNow, metav1.ListOptions{}); err != nil {
            tb.Fatalf("failed to delete pods in namespace %q: %v", namespace, err)
        }
        if err := client.CoreV1().Namespaces().Delete(ctx, namespace, deleteNow); err != nil {
            tb.Fatalf("Deleting Namespace %q in numPodsScheduledPerNamespace: %v", namespace, err)
        }
    }

    // We need to wait here because even with deletion timestamp set,
    // actually removing a namespace can take some time (garbage collecting
    // other generated object like secrets, etc.) and we don't want to
    // start the next workloads while that cleanup is still going on.
    gomega.NewGomegaWithT(tb).Eventually(ctx, func(ctx context.Context) ([]interface{}, error) {
        var objects []interface{}
        namespaces, err := client.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
        if err != nil {
            return nil, err
        }
        // Collecting several objects of interest (pods, claims) is done to
        // provide a more informative failure message when a namespace doesn't
        // disappear quickly enough.
        for _, namespace := range namespaces.Items {
            if _, ok := numPodsScheduledPerNamespace[namespace.Name]; !ok {
                // Not a namespace created by the workload.
                continue
            }
            pods, err := client.CoreV1().Pods(namespace.Name).List(ctx, metav1.ListOptions{})
            if err != nil {
                return nil, err
            }
            if len(pods.Items) > 0 {
                // Record one pod per namespace - that's usually enough information.
                objects = append(objects, pods.Items[0])
            }
            if tc.FeatureGates[features.DynamicResourceAllocation] {
                claims, err := client.ResourceV1alpha2().ResourceClaims(namespace.Name).List(ctx, metav1.ListOptions{})
                if err != nil {
                    return nil, err
                }
                if len(claims.Items) > 0 {
                    objects = append(objects, claims.Items[0])
                }
            }
            objects = append(objects, namespace)
        }
        return objects, nil
    }).WithTimeout(5*time.Minute).Should(gomega.BeEmpty(), "deleting namespaces")
}

func createNamespaceIfNotPresent(ctx context.Context, b *testing.B, client clientset.Interface, namespace string, podsPerNamespace *map[string]int) {
    if _, ok := (*podsPerNamespace)[namespace]; !ok {
        // The namespace has not created yet.
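The gomega.Eventually block above waits for the workload namespaces to disappear and records leftover pods and claims so a timeout produces an informative failure message. For comparison only, here is a plain polling sketch of the same wait using the apimachinery wait package; the function and variable names are illustrative, not part of the commit, and this version deliberately drops the richer failure reporting.

package waitsketch

import (
    "context"
    "time"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/util/wait"
    clientset "k8s.io/client-go/kubernetes"
)

// waitForNamespacesGone polls until none of the workload-owned namespaces
// are listed anymore, or the five-minute timeout expires.
func waitForNamespacesGone(ctx context.Context, client clientset.Interface, owned map[string]int) error {
    return wait.PollUntilContextTimeout(ctx, time.Second, 5*time.Minute, true,
        func(ctx context.Context) (bool, error) {
            namespaces, err := client.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
            if err != nil {
                return false, err
            }
            for _, namespace := range namespaces.Items {
                if _, ok := owned[namespace.Name]; ok {
                    // A workload namespace is still terminating.
                    return false, nil
                }
            }
            return true, nil
        })
}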
@@ -76,11 +76,13 @@ func newDefaultComponentConfig() (*config.KubeSchedulerConfiguration, error) {
// mustSetupScheduler starts the following components:
// - k8s api server
// - scheduler
// - some of the kube-controller-manager controllers
//
// It returns regular and dynamic clients, and destroyFunc which should be used to
// remove resources after finished.
// Notes on rate limiter:
// - client rate limit is set to 5000.
func mustSetupScheduler(ctx context.Context, b *testing.B, config *config.KubeSchedulerConfiguration, enabledFeatures map[featuregate.Feature]bool) (informers.SharedInformerFactory, clientset.Interface, dynamic.Interface) {
func mustSetupCluster(ctx context.Context, b *testing.B, config *config.KubeSchedulerConfiguration, enabledFeatures map[featuregate.Feature]bool) (informers.SharedInformerFactory, clientset.Interface, dynamic.Interface) {
    // Run API server with minimimal logging by default. Can be raised with -v.
    framework.MinVerbosity = 0

@@ -126,6 +128,8 @@ func mustSetupScheduler(ctx context.Context, b *testing.B, config *config.KubeSc
    // be applied to start a scheduler, most of them are defined in `scheduler.schedulerOptions`.
    _, informerFactory := util.StartScheduler(ctx, client, cfg, config)
    util.StartFakePVController(ctx, client, informerFactory)
    runGC := util.CreateGCController(ctx, b, *cfg, informerFactory)
    runNS := util.CreateNamespaceController(ctx, b, *cfg, informerFactory)

    runResourceClaimController := func() {}
    if enabledFeatures[features.DynamicResourceAllocation] {

@@ -136,6 +140,8 @@ func mustSetupScheduler(ctx context.Context, b *testing.B, config *config.KubeSc

    informerFactory.Start(ctx.Done())
    informerFactory.WaitForCacheSync(ctx.Done())
    go runGC()
    go runNS()
    go runResourceClaimController()

    return informerFactory, client, dynClient
@@ -36,17 +36,22 @@ import (
    "k8s.io/client-go/informers"
    clientset "k8s.io/client-go/kubernetes"
    corelisters "k8s.io/client-go/listers/core/v1"
    "k8s.io/client-go/metadata"
    "k8s.io/client-go/metadata/metadatainformer"
    restclient "k8s.io/client-go/rest"
    "k8s.io/client-go/restmapper"
    "k8s.io/client-go/scale"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/events"
    pvutil "k8s.io/component-helpers/storage/volume"
    "k8s.io/controller-manager/pkg/informerfactory"
    "k8s.io/klog/v2"
    "k8s.io/kube-scheduler/config/v1beta3"
    "k8s.io/kubernetes/cmd/kube-apiserver/app/options"
    podutil "k8s.io/kubernetes/pkg/api/v1/pod"
    "k8s.io/kubernetes/pkg/controller/disruption"
    "k8s.io/kubernetes/pkg/controller/garbagecollector"
    "k8s.io/kubernetes/pkg/controller/namespace"
    "k8s.io/kubernetes/pkg/controller/resourceclaim"
    "k8s.io/kubernetes/pkg/controlplane"
    "k8s.io/kubernetes/pkg/scheduler"

@@ -162,6 +167,67 @@ func StartFakePVController(ctx context.Context, clientSet clientset.Interface, i
    })
}

// CreateGCController creates a garbage controller and returns a run function
// for it. The informer factory needs to be started before invoking that
// function.
func CreateGCController(ctx context.Context, tb testing.TB, restConfig restclient.Config, informerSet informers.SharedInformerFactory) func() {
    restclient.AddUserAgent(&restConfig, "gc-controller")
    clientSet := clientset.NewForConfigOrDie(&restConfig)
    metadataClient, err := metadata.NewForConfig(&restConfig)
    if err != nil {
        tb.Fatalf("Failed to create metadataClient: %v", err)
    }
    restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheddiscovery.NewMemCacheClient(clientSet.Discovery()))
    restMapper.Reset()
    metadataInformers := metadatainformer.NewSharedInformerFactory(metadataClient, 0)
    alwaysStarted := make(chan struct{})
    close(alwaysStarted)
    gc, err := garbagecollector.NewGarbageCollector(
        clientSet,
        metadataClient,
        restMapper,
        garbagecollector.DefaultIgnoredResources(),
        informerfactory.NewInformerFactory(informerSet, metadataInformers),
        alwaysStarted,
    )
    if err != nil {
        tb.Fatalf("Failed creating garbage collector")
    }
    startGC := func() {
        syncPeriod := 5 * time.Second
        go wait.Until(func() {
            restMapper.Reset()
        }, syncPeriod, ctx.Done())
        go gc.Run(ctx, 1)
        go gc.Sync(ctx, clientSet.Discovery(), syncPeriod)
    }
    return startGC
}

// CreateNamespaceController creates a namespace controller and returns a run
// function for it. The informer factory needs to be started before invoking
// that function.
func CreateNamespaceController(ctx context.Context, tb testing.TB, restConfig restclient.Config, informerSet informers.SharedInformerFactory) func() {
    restclient.AddUserAgent(&restConfig, "namespace-controller")
    clientSet := clientset.NewForConfigOrDie(&restConfig)
    metadataClient, err := metadata.NewForConfig(&restConfig)
    if err != nil {
        tb.Fatalf("Failed to create metadataClient: %v", err)
    }
    discoverResourcesFn := clientSet.Discovery().ServerPreferredNamespacedResources
    controller := namespace.NewNamespaceController(
        ctx,
        clientSet,
        metadataClient,
        discoverResourcesFn,
        informerSet.Core().V1().Namespaces(),
        10*time.Hour,
        v1.FinalizerKubernetes)
    return func() {
        go controller.Run(ctx, 5)
    }
}

// TestContext store necessary context info
type TestContext struct {
    NS *v1.Namespace
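Taken together, the new util helpers are meant to be used the way mustSetupCluster wires them up above: construct the controllers before starting the informer factory (so their informers get registered), then start and sync the factory, then launch the returned run functions. A condensed, hypothetical sketch of that ordering (the surrounding API server and factory setup is assumed to exist):

package usagesketch

import (
    "context"
    "testing"

    "k8s.io/client-go/informers"
    restclient "k8s.io/client-go/rest"
    "k8s.io/kubernetes/test/integration/util"
)

// startCleanupControllers is an illustrative helper, not part of the commit.
func startCleanupControllers(ctx context.Context, tb testing.TB, restConfig *restclient.Config, informerFactory informers.SharedInformerFactory) {
    // Registering the controllers adds their informers to the factory,
    // so this has to happen before Start/WaitForCacheSync.
    runGC := util.CreateGCController(ctx, tb, *restConfig, informerFactory)
    runNS := util.CreateNamespaceController(ctx, tb, *restConfig, informerFactory)

    informerFactory.Start(ctx.Done())
    informerFactory.WaitForCacheSync(ctx.Done())

    go runGC()
    go runNS()
}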