From 506899008f888df9d862abee93fde5488972c0ff Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Thu, 18 Feb 2016 14:50:43 +0100 Subject: [PATCH] Parallelization of namespace deletion --- cluster/gce/config-test.sh | 3 +- cmd/kube-apiserver/app/options/options.go | 29 ++++---- cmd/kube-apiserver/app/server.go | 7 +- docs/admin/kube-apiserver.md | 3 +- hack/jenkins/job-configs/kubernetes-e2e.yaml | 2 + hack/verify-flags/known-flags.txt | 1 + pkg/controller/serviceaccount/tokengetter.go | 4 +- pkg/master/master.go | 42 ++++++----- pkg/registry/configmap/etcd/etcd.go | 2 + pkg/registry/configmap/etcd/etcd_test.go | 2 +- pkg/registry/controller/etcd/etcd.go | 2 + pkg/registry/controller/etcd/etcd_test.go | 2 +- pkg/registry/daemonset/etcd/etcd.go | 3 +- pkg/registry/daemonset/etcd/etcd_test.go | 2 +- pkg/registry/deployment/etcd/etcd.go | 3 +- pkg/registry/deployment/etcd/etcd_test.go | 2 +- pkg/registry/endpoint/etcd/etcd.go | 3 +- pkg/registry/endpoint/etcd/etcd_test.go | 2 +- pkg/registry/event/etcd/etcd.go | 3 +- pkg/registry/event/etcd/etcd_test.go | 2 +- .../experimental/controller/etcd/etcd_test.go | 2 +- pkg/registry/generic/etcd/etcd.go | 71 ++++++++++++++++--- pkg/registry/generic/options.go | 5 +- .../horizontalpodautoscaler/etcd/etcd.go | 3 +- .../horizontalpodautoscaler/etcd/etcd_test.go | 2 +- pkg/registry/ingress/etcd/etcd.go | 3 +- pkg/registry/ingress/etcd/etcd_test.go | 2 +- pkg/registry/job/etcd/etcd.go | 3 +- pkg/registry/job/etcd/etcd_test.go | 2 +- pkg/registry/limitrange/etcd/etcd.go | 3 +- pkg/registry/limitrange/etcd/etcd_test.go | 2 +- pkg/registry/namespace/etcd/etcd.go | 3 +- pkg/registry/namespace/etcd/etcd_test.go | 2 +- pkg/registry/node/etcd/etcd.go | 5 +- pkg/registry/node/etcd/etcd_test.go | 2 +- pkg/registry/persistentvolume/etcd/etcd.go | 3 +- .../persistentvolume/etcd/etcd_test.go | 2 +- .../persistentvolumeclaim/etcd/etcd.go | 3 +- .../persistentvolumeclaim/etcd/etcd_test.go | 2 +- pkg/registry/pod/etcd/etcd.go | 3 +- pkg/registry/pod/etcd/etcd_test.go | 2 +- pkg/registry/podsecuritypolicy/etcd/etcd.go | 3 +- .../podsecuritypolicy/etcd/etcd_test.go | 2 +- pkg/registry/podtemplate/etcd/etcd.go | 3 +- pkg/registry/podtemplate/etcd/etcd_test.go | 2 +- pkg/registry/replicaset/etcd/etcd.go | 3 +- pkg/registry/replicaset/etcd/etcd_test.go | 2 +- pkg/registry/resourcequota/etcd/etcd.go | 3 +- pkg/registry/resourcequota/etcd/etcd_test.go | 2 +- pkg/registry/secret/etcd/etcd.go | 3 +- pkg/registry/secret/etcd/etcd_test.go | 2 +- pkg/registry/service/etcd/etcd.go | 3 +- pkg/registry/service/etcd/etcd_test.go | 2 +- pkg/registry/serviceaccount/etcd/etcd.go | 3 +- pkg/registry/serviceaccount/etcd/etcd_test.go | 2 +- pkg/registry/thirdpartyresource/etcd/etcd.go | 7 +- .../thirdpartyresource/etcd/etcd_test.go | 2 +- .../thirdpartyresourcedata/etcd/etcd.go | 7 +- .../thirdpartyresourcedata/etcd/etcd_test.go | 2 +- 59 files changed, 197 insertions(+), 100 deletions(-) diff --git a/cluster/gce/config-test.sh b/cluster/gce/config-test.sh index 958a000e1df..cedc301c4ba 100755 --- a/cluster/gce/config-test.sh +++ b/cluster/gce/config-test.sh @@ -73,10 +73,11 @@ CONTROLLER_MANAGER_TEST_LOG_LEVEL="${CONTROLLER_MANAGER_TEST_LOG_LEVEL:-$TEST_CL SCHEDULER_TEST_LOG_LEVEL="${SCHEDULER_TEST_LOG_LEVEL:-$TEST_CLUSTER_LOG_LEVEL}" KUBEPROXY_TEST_LOG_LEVEL="${KUBEPROXY_TEST_LOG_LEVEL:-$TEST_CLUSTER_LOG_LEVEL}" +TEST_CLUSTER_DELETE_COLLECTION_WORKERS="${TEST_CLUSTER_DELETE_COLLECTION_WORKERS:---delete-collection-workers=1}" TEST_CLUSTER_RESYNC_PERIOD="${TEST_CLUSTER_RESYNC_PERIOD:---min-resync-period=3m}" KUBELET_TEST_ARGS="--max-pods=110 --serialize-image-pulls=false --outofdisk-transition-frequency=0" -APISERVER_TEST_ARGS="--runtime-config=extensions/v1beta1" +APISERVER_TEST_ARGS="--runtime-config=extensions/v1beta1 ${TEST_CLUSTER_DELETE_COLLECTION_WORKERS}" CONTROLLER_MANAGER_TEST_ARGS="${TEST_CLUSTER_RESYNC_PERIOD}" SCHEDULER_TEST_ARGS="" KUBEPROXY_TEST_ARGS="" diff --git a/cmd/kube-apiserver/app/options/options.go b/cmd/kube-apiserver/app/options/options.go index 5b4b0e9dede..b011e700f99 100644 --- a/cmd/kube-apiserver/app/options/options.go +++ b/cmd/kube-apiserver/app/options/options.go @@ -52,6 +52,7 @@ type APIServer struct { CloudConfigFile string CloudProvider string CorsAllowedOriginList []string + DeleteCollectionWorkers int DeprecatedStorageVersion string EnableLogsSupport bool EnableProfiling bool @@ -92,19 +93,20 @@ type APIServer struct { // NewAPIServer creates a new APIServer object with default parameters func NewAPIServer() *APIServer { s := APIServer{ - ServerRunOptions: genericapiserver.NewServerRunOptions(), - APIGroupPrefix: "/apis", - APIPrefix: "/api", - AdmissionControl: "AlwaysAdmit", - AuthorizationMode: "AlwaysAllow", - EnableLogsSupport: true, - EtcdPathPrefix: genericapiserver.DefaultEtcdPathPrefix, - EventTTL: 1 * time.Hour, - MasterCount: 1, - MasterServiceNamespace: api.NamespaceDefault, - RuntimeConfig: make(util.ConfigurationMap), - StorageVersions: registered.AllPreferredGroupVersions(), - DefaultStorageVersions: registered.AllPreferredGroupVersions(), + ServerRunOptions: genericapiserver.NewServerRunOptions(), + APIGroupPrefix: "/apis", + APIPrefix: "/api", + AdmissionControl: "AlwaysAdmit", + AuthorizationMode: "AlwaysAllow", + DeleteCollectionWorkers: 1, + EnableLogsSupport: true, + EtcdPathPrefix: genericapiserver.DefaultEtcdPathPrefix, + EventTTL: 1 * time.Hour, + MasterCount: 1, + MasterServiceNamespace: api.NamespaceDefault, + RuntimeConfig: make(util.ConfigurationMap), + StorageVersions: registered.AllPreferredGroupVersions(), + DefaultStorageVersions: registered.AllPreferredGroupVersions(), KubeletConfig: kubeletclient.KubeletClientConfig{ Port: ports.KubeletPort, EnableHttps: true, @@ -231,6 +233,7 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) { fs.MarkDeprecated("service-node-ports", "see --service-node-port-range instead.") fs.StringVar(&s.MasterServiceNamespace, "master-service-namespace", s.MasterServiceNamespace, "The namespace from which the kubernetes master services should be injected into pods") fs.IntVar(&s.MasterCount, "apiserver-count", s.MasterCount, "The number of apiservers running in the cluster") + fs.IntVar(&s.DeleteCollectionWorkers, "delete-collection-workers", s.DeleteCollectionWorkers, "Number of workers spawned for DeleteCollection call. These are used to speed up namespace cleanup.") fs.Var(&s.RuntimeConfig, "runtime-config", "A set of key=value pairs that describe runtime configuration that may be passed to apiserver. apis/ key can be used to turn on/off specific api versions. apis// can be used to turn on/off specific resources. api/all and api/legacy are special keys to control all and legacy api versions respectively.") fs.BoolVar(&s.EnableProfiling, "profiling", true, "Enable profiling via web interface host:port/debug/pprof/") // TODO: enable cache in integration tests. diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index 3f6ee3a703e..806f5598a37 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -469,9 +469,10 @@ func Run(s *options.APIServer) error { KubernetesServiceNodePort: s.KubernetesServiceNodePort, Serializer: api.Codecs, }, - EnableCoreControllers: true, - EventTTL: s.EventTTL, - KubeletClient: kubeletClient, + EnableCoreControllers: true, + DeleteCollectionWorkers: s.DeleteCollectionWorkers, + EventTTL: s.EventTTL, + KubeletClient: kubeletClient, Tunneler: tunneler, } diff --git a/docs/admin/kube-apiserver.md b/docs/admin/kube-apiserver.md index 9a9f3197490..5bb652641b5 100644 --- a/docs/admin/kube-apiserver.md +++ b/docs/admin/kube-apiserver.md @@ -65,6 +65,7 @@ kube-apiserver --cloud-config="": The path to the cloud provider configuration file. Empty string for no configuration file. --cloud-provider="": The provider for cloud services. Empty string for no provider. --cors-allowed-origins=[]: List of allowed origins for CORS, comma separated. An allowed origin can be a regular expression to support subdomain matching. If this list is empty CORS will not be enabled. + --delete-collection-workers=1: Number of workers spawned for DeleteCollection call. These are used to speed up namespace cleanup. --etcd-prefix="/registry": The prefix for all resource paths in etcd. --etcd-quorum-read[=false]: If true, enable quorum read --etcd-servers=[]: List of etcd servers to watch (http://ip:port), comma separated. Mutually exclusive with -etcd-config @@ -110,7 +111,7 @@ kube-apiserver --watch-cache-sizes=[]: List of watch cache sizes for every resource (pods, nodes, etc.), comma separated. The individual override format: resource#size, where size is a number. It takes effect when watch-cache is enabled. ``` -###### Auto generated by spf13/cobra on 18-Feb-2016 +###### Auto generated by spf13/cobra on 24-Feb-2016 diff --git a/hack/jenkins/job-configs/kubernetes-e2e.yaml b/hack/jenkins/job-configs/kubernetes-e2e.yaml index 85890ed73a6..5f3120e7e2a 100644 --- a/hack/jenkins/job-configs/kubernetes-e2e.yaml +++ b/hack/jenkins/job-configs/kubernetes-e2e.yaml @@ -142,6 +142,8 @@ export KUBELET_TEST_LOG_LEVEL="--v=4" # Increase resync period to simulate production export TEST_CLUSTER_RESYNC_PERIOD="--min-resync-period=12h" + # Increase delete collection parallelism + export TEST_CLUSTER_DELETE_COLLECTION_WORKERS="--delete-collection-workers=16" - 'gce-examples': description: 'Run E2E examples test on GCE.' timeout: 90 diff --git a/hack/verify-flags/known-flags.txt b/hack/verify-flags/known-flags.txt index 33d5e1ddfc7..12b9171ce61 100644 --- a/hack/verify-flags/known-flags.txt +++ b/hack/verify-flags/known-flags.txt @@ -67,6 +67,7 @@ current-replicas default-container-cpu-limit default-container-mem-limit delay-shutdown +delete-collection-workers delete-namespace deleting-pods-burst deleting-pods-qps diff --git a/pkg/controller/serviceaccount/tokengetter.go b/pkg/controller/serviceaccount/tokengetter.go index c1d17f48954..bd7fc827b25 100644 --- a/pkg/controller/serviceaccount/tokengetter.go +++ b/pkg/controller/serviceaccount/tokengetter.go @@ -71,7 +71,7 @@ func (r *registryGetter) GetSecret(namespace, name string) (*api.Secret, error) // uses the specified storage to retrieve service accounts and secrets. func NewGetterFromStorageInterface(s storage.Interface) serviceaccount.ServiceAccountTokenGetter { return NewGetterFromRegistries( - serviceaccountregistry.NewRegistry(serviceaccountetcd.NewREST(generic.RESTOptions{s, generic.UndecoratedStorage})), - secret.NewRegistry(secretetcd.NewREST(generic.RESTOptions{s, generic.UndecoratedStorage})), + serviceaccountregistry.NewRegistry(serviceaccountetcd.NewREST(generic.RESTOptions{Storage: s, Decorator: generic.UndecoratedStorage})), + secret.NewRegistry(secretetcd.NewREST(generic.RESTOptions{Storage: s, Decorator: generic.UndecoratedStorage})), ) } diff --git a/pkg/master/master.go b/pkg/master/master.go index 1f4d7ae24b7..3851c3f1615 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -91,9 +91,10 @@ import ( type Config struct { *genericapiserver.Config - EnableCoreControllers bool - EventTTL time.Duration - KubeletClient kubeletclient.KubeletClient + EnableCoreControllers bool + DeleteCollectionWorkers int + EventTTL time.Duration + KubeletClient kubeletclient.KubeletClient // Used to start and monitor tunneling Tunneler Tunneler } @@ -105,7 +106,8 @@ type Master struct { // Map of v1 resources to their REST storages. v1ResourcesStorage map[string]rest.Storage - enableCoreControllers bool + enableCoreControllers bool + deleteCollectionWorkers int // registries are internal client APIs for accessing the storage layer // TODO: define the internal typed interface in a way that clients can // also be replaced @@ -149,9 +151,10 @@ func New(c *Config) (*Master, error) { } m := &Master{ - GenericAPIServer: s, - enableCoreControllers: c.EnableCoreControllers, - tunneler: c.Tunneler, + GenericAPIServer: s, + enableCoreControllers: c.EnableCoreControllers, + deleteCollectionWorkers: c.DeleteCollectionWorkers, + tunneler: c.Tunneler, } m.InstallAPIs(c) @@ -327,8 +330,9 @@ func (m *Master) initV1ResourcesStorage(c *Config) { dbClient := func(resource string) storage.Interface { return c.StorageDestinations.Get("", resource) } restOptions := func(resource string) generic.RESTOptions { return generic.RESTOptions{ - Storage: dbClient(resource), - Decorator: m.StorageDecorator(), + Storage: dbClient(resource), + Decorator: m.StorageDecorator(), + DeleteCollectionWorkers: m.deleteCollectionWorkers, } } @@ -612,7 +616,8 @@ func (m *Master) InstallThirdPartyResource(rsrc *extensions.ThirdPartyResource) } func (m *Master) thirdpartyapi(group, kind, version string) *apiserver.APIGroupVersion { - resourceStorage := thirdpartyresourcedataetcd.NewREST(generic.RESTOptions{m.thirdPartyStorage, generic.UndecoratedStorage}, group, kind) + resourceStorage := thirdpartyresourcedataetcd.NewREST( + generic.RESTOptions{m.thirdPartyStorage, generic.UndecoratedStorage, m.deleteCollectionWorkers}, group, kind) apiRoot := makeThirdPartyPath("") @@ -662,8 +667,9 @@ func (m *Master) getExtensionResources(c *Config) map[string]rest.Storage { } restOptions := func(resource string) generic.RESTOptions { return generic.RESTOptions{ - Storage: c.StorageDestinations.Get(extensions.GroupName, resource), - Decorator: m.StorageDecorator(), + Storage: c.StorageDestinations.Get(extensions.GroupName, resource), + Decorator: m.StorageDecorator(), + DeleteCollectionWorkers: m.deleteCollectionWorkers, } } @@ -671,7 +677,7 @@ func (m *Master) getExtensionResources(c *Config) map[string]rest.Storage { if isEnabled("horizontalpodautoscalers") { m.constructHPAResources(c, storage) controllerStorage := expcontrolleretcd.NewStorage( - generic.RESTOptions{c.StorageDestinations.Get("", "replicationControllers"), m.StorageDecorator()}) + generic.RESTOptions{c.StorageDestinations.Get("", "replicationControllers"), m.StorageDecorator(), m.deleteCollectionWorkers}) storage["replicationcontrollers"] = controllerStorage.ReplicationController storage["replicationcontrollers/scale"] = controllerStorage.Scale } @@ -735,8 +741,9 @@ func (m *Master) constructHPAResources(c *Config, restStorage map[string]rest.St // matter where they're accessed from. restOptions := func(resource string) generic.RESTOptions { return generic.RESTOptions{ - Storage: c.StorageDestinations.Search([]string{autoscaling.GroupName, extensions.GroupName}, resource), - Decorator: m.StorageDecorator(), + Storage: c.StorageDestinations.Search([]string{autoscaling.GroupName, extensions.GroupName}, resource), + Decorator: m.StorageDecorator(), + DeleteCollectionWorkers: m.deleteCollectionWorkers, } } autoscalerStorage, autoscalerStatusStorage := horizontalpodautoscaleretcd.NewREST(restOptions("horizontalpodautoscalers")) @@ -771,8 +778,9 @@ func (m *Master) constructJobResources(c *Config, restStorage map[string]rest.St // matter where they're accessed from. restOptions := func(resource string) generic.RESTOptions { return generic.RESTOptions{ - Storage: c.StorageDestinations.Search([]string{batch.GroupName, extensions.GroupName}, resource), - Decorator: m.StorageDecorator(), + Storage: c.StorageDestinations.Search([]string{batch.GroupName, extensions.GroupName}, resource), + Decorator: m.StorageDecorator(), + DeleteCollectionWorkers: m.deleteCollectionWorkers, } } jobStorage, jobStatusStorage := jobetcd.NewREST(restOptions("jobs")) diff --git a/pkg/registry/configmap/etcd/etcd.go b/pkg/registry/configmap/etcd/etcd.go index 0b81f778a28..1e47491c4ca 100644 --- a/pkg/registry/configmap/etcd/etcd.go +++ b/pkg/registry/configmap/etcd/etcd.go @@ -68,6 +68,8 @@ func NewREST(opts generic.RESTOptions) *REST { QualifiedResource: api.Resource("configmaps"), + DeleteCollectionWorkers: opts.DeleteCollectionWorkers, + CreateStrategy: configmap.Strategy, UpdateStrategy: configmap.Strategy, diff --git a/pkg/registry/configmap/etcd/etcd_test.go b/pkg/registry/configmap/etcd/etcd_test.go index ad2daa78418..81baa07b531 100644 --- a/pkg/registry/configmap/etcd/etcd_test.go +++ b/pkg/registry/configmap/etcd/etcd_test.go @@ -30,7 +30,7 @@ import ( func newStorage(t *testing.T) (*REST, *etcdtesting.EtcdTestServer) { etcdStorage, server := registrytest.NewEtcdStorage(t, "") - restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage} + restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage, 1} return NewREST(restOptions), server } diff --git a/pkg/registry/controller/etcd/etcd.go b/pkg/registry/controller/etcd/etcd.go index a03bf6a0988..9d401685376 100644 --- a/pkg/registry/controller/etcd/etcd.go +++ b/pkg/registry/controller/etcd/etcd.go @@ -66,6 +66,8 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) { }, QualifiedResource: api.Resource("replicationcontrollers"), + DeleteCollectionWorkers: opts.DeleteCollectionWorkers, + // Used to validate controller creation CreateStrategy: controller.Strategy, diff --git a/pkg/registry/controller/etcd/etcd_test.go b/pkg/registry/controller/etcd/etcd_test.go index 33d2d8becd6..6189469f275 100644 --- a/pkg/registry/controller/etcd/etcd_test.go +++ b/pkg/registry/controller/etcd/etcd_test.go @@ -30,7 +30,7 @@ import ( func newStorage(t *testing.T) (*REST, *StatusREST, *etcdtesting.EtcdTestServer) { etcdStorage, server := registrytest.NewEtcdStorage(t, "") - restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage} + restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage, 1} controllerStorage, statusStorage := NewREST(restOptions) return controllerStorage, statusStorage, server } diff --git a/pkg/registry/daemonset/etcd/etcd.go b/pkg/registry/daemonset/etcd/etcd.go index 61fcbcb4b3c..2d17c527b23 100644 --- a/pkg/registry/daemonset/etcd/etcd.go +++ b/pkg/registry/daemonset/etcd/etcd.go @@ -64,7 +64,8 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) { PredicateFunc: func(label labels.Selector, field fields.Selector) generic.Matcher { return daemonset.MatchDaemonSet(label, field) }, - QualifiedResource: extensions.Resource("daemonsets"), + QualifiedResource: extensions.Resource("daemonsets"), + DeleteCollectionWorkers: opts.DeleteCollectionWorkers, // Used to validate daemon set creation CreateStrategy: daemonset.Strategy, diff --git a/pkg/registry/daemonset/etcd/etcd_test.go b/pkg/registry/daemonset/etcd/etcd_test.go index fdb8850951a..d2b0c42360f 100755 --- a/pkg/registry/daemonset/etcd/etcd_test.go +++ b/pkg/registry/daemonset/etcd/etcd_test.go @@ -32,7 +32,7 @@ import ( func newStorage(t *testing.T) (*REST, *StatusREST, *etcdtesting.EtcdTestServer) { etcdStorage, server := registrytest.NewEtcdStorage(t, extensions.GroupName) - restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage} + restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage, 1} daemonSetStorage, statusStorage := NewREST(restOptions) return daemonSetStorage, statusStorage, server } diff --git a/pkg/registry/deployment/etcd/etcd.go b/pkg/registry/deployment/etcd/etcd.go index 0b7bf6fbdb9..37df4e52c03 100644 --- a/pkg/registry/deployment/etcd/etcd.go +++ b/pkg/registry/deployment/etcd/etcd.go @@ -89,7 +89,8 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST, *RollbackREST) { PredicateFunc: func(label labels.Selector, field fields.Selector) generic.Matcher { return deployment.MatchDeployment(label, field) }, - QualifiedResource: extensions.Resource("deployments"), + QualifiedResource: extensions.Resource("deployments"), + DeleteCollectionWorkers: opts.DeleteCollectionWorkers, // Used to validate deployment creation. CreateStrategy: deployment.Strategy, diff --git a/pkg/registry/deployment/etcd/etcd_test.go b/pkg/registry/deployment/etcd/etcd_test.go index fe2d1f2a665..7d554f2e55a 100644 --- a/pkg/registry/deployment/etcd/etcd_test.go +++ b/pkg/registry/deployment/etcd/etcd_test.go @@ -36,7 +36,7 @@ import ( func newStorage(t *testing.T) (*DeploymentStorage, *etcdtesting.EtcdTestServer) { etcdStorage, server := registrytest.NewEtcdStorage(t, extensions.GroupName) - restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage} + restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage, 1} deploymentStorage := NewStorage(restOptions) return &deploymentStorage, server } diff --git a/pkg/registry/endpoint/etcd/etcd.go b/pkg/registry/endpoint/etcd/etcd.go index 3facdee6509..09799b95103 100644 --- a/pkg/registry/endpoint/etcd/etcd.go +++ b/pkg/registry/endpoint/etcd/etcd.go @@ -54,7 +54,8 @@ func NewREST(opts generic.RESTOptions) *REST { PredicateFunc: func(label labels.Selector, field fields.Selector) generic.Matcher { return endpoint.MatchEndpoints(label, field) }, - QualifiedResource: api.Resource("endpoints"), + QualifiedResource: api.Resource("endpoints"), + DeleteCollectionWorkers: opts.DeleteCollectionWorkers, CreateStrategy: endpoint.Strategy, UpdateStrategy: endpoint.Strategy, diff --git a/pkg/registry/endpoint/etcd/etcd_test.go b/pkg/registry/endpoint/etcd/etcd_test.go index 7f52665c9ca..d55896315d8 100644 --- a/pkg/registry/endpoint/etcd/etcd_test.go +++ b/pkg/registry/endpoint/etcd/etcd_test.go @@ -30,7 +30,7 @@ import ( func newStorage(t *testing.T) (*REST, *etcdtesting.EtcdTestServer) { etcdStorage, server := registrytest.NewEtcdStorage(t, "") - restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage} + restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage, 1} return NewREST(restOptions), server } diff --git a/pkg/registry/event/etcd/etcd.go b/pkg/registry/event/etcd/etcd.go index db324a80f53..19b1df70539 100644 --- a/pkg/registry/event/etcd/etcd.go +++ b/pkg/registry/event/etcd/etcd.go @@ -56,7 +56,8 @@ func NewREST(opts generic.RESTOptions, ttl uint64) *REST { TTLFunc: func(runtime.Object, uint64, bool) (uint64, error) { return ttl, nil }, - QualifiedResource: api.Resource("events"), + QualifiedResource: api.Resource("events"), + DeleteCollectionWorkers: opts.DeleteCollectionWorkers, CreateStrategy: event.Strategy, UpdateStrategy: event.Strategy, diff --git a/pkg/registry/event/etcd/etcd_test.go b/pkg/registry/event/etcd/etcd_test.go index 0a0d17e5c16..d5247d599ee 100644 --- a/pkg/registry/event/etcd/etcd_test.go +++ b/pkg/registry/event/etcd/etcd_test.go @@ -30,7 +30,7 @@ var testTTL uint64 = 60 func newStorage(t *testing.T) (*REST, *etcdtesting.EtcdTestServer) { etcdStorage, server := registrytest.NewEtcdStorage(t, "") - restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage} + restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage, 1} return NewREST(restOptions, testTTL), server } diff --git a/pkg/registry/experimental/controller/etcd/etcd_test.go b/pkg/registry/experimental/controller/etcd/etcd_test.go index 1ee7e5749f4..6727f371e9a 100644 --- a/pkg/registry/experimental/controller/etcd/etcd_test.go +++ b/pkg/registry/experimental/controller/etcd/etcd_test.go @@ -30,7 +30,7 @@ import ( func newStorage(t *testing.T) (*ScaleREST, *etcdtesting.EtcdTestServer, storage.Interface) { etcdStorage, server := registrytest.NewEtcdStorage(t, "") - restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage} + restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage, 1} return NewStorage(restOptions).Scale, server, etcdStorage } diff --git a/pkg/registry/generic/etcd/etcd.go b/pkg/registry/generic/etcd/etcd.go index 2c09a34ad33..5e9ba144a15 100644 --- a/pkg/registry/generic/etcd/etcd.go +++ b/pkg/registry/generic/etcd/etcd.go @@ -19,6 +19,7 @@ package etcd import ( "fmt" "reflect" + "sync" "k8s.io/kubernetes/pkg/api" kubeerr "k8s.io/kubernetes/pkg/api/errors" @@ -32,6 +33,7 @@ import ( "k8s.io/kubernetes/pkg/registry/generic" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/storage" + utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/validation/field" "k8s.io/kubernetes/pkg/watch" @@ -84,6 +86,10 @@ type Etcd struct { // Returns a matcher corresponding to the provided labels and fields. PredicateFunc func(label labels.Selector, field fields.Selector) generic.Matcher + // DeleteCollectionWorkers is the maximum number of workers in a single + // DeleteCollection call. + DeleteCollectionWorkers int + // Called on all objects returned from the underlying store, after // the exit hooks are invoked. Decorators are intended for integrations // that are above etcd and should only be used for specific cases where @@ -459,16 +465,63 @@ func (e *Etcd) DeleteCollection(ctx api.Context, options *api.DeleteOptions, lis if err != nil { return nil, err } - for _, item := range items { - accessor, err := meta.Accessor(item) - if err != nil { - return nil, err - } - if _, err := e.Delete(ctx, accessor.GetName(), options); err != nil && !kubeerr.IsNotFound(err) { - return nil, err - } + // Spawn a number of goroutines, so that we can issue requests to etcd + // in parallel to speed up deletion. + // TODO: Make this proportional to the number of items to delete, up to + // DeleteCollectionWorkers (it doesn't make much sense to spawn 16 + // workers to delete 10 items). + workersNumber := e.DeleteCollectionWorkers + if workersNumber < 1 { + workersNumber = 1 + } + wg := sync.WaitGroup{} + toProcess := make(chan int, 2*workersNumber) + errs := make(chan error, workersNumber+1) + + go func() { + defer utilruntime.HandleCrash(func(panicReason interface{}) { + errs <- fmt.Errorf("DeleteCollection distributor panicked: %v", panicReason) + }) + for i := 0; i < len(items); i++ { + toProcess <- i + } + close(toProcess) + }() + + wg.Add(workersNumber) + for i := 0; i < workersNumber; i++ { + go func() { + // panics don't cross goroutine boundaries + defer utilruntime.HandleCrash(func(panicReason interface{}) { + errs <- fmt.Errorf("DeleteCollection goroutine panicked: %v", panicReason) + }) + defer wg.Done() + + for { + index, ok := <-toProcess + if !ok { + return + } + accessor, err := meta.Accessor(items[index]) + if err != nil { + errs <- err + return + } + if _, err := e.Delete(ctx, accessor.GetName(), options); err != nil && !kubeerr.IsNotFound(err) { + glog.V(4).Infof("Delete %s in DeleteCollection failed: %v", accessor.GetName(), err) + errs <- err + return + } + } + }() + } + wg.Wait() + select { + case err := <-errs: + return nil, err + default: + return listObj, nil } - return listObj, nil } func (e *Etcd) finalizeDelete(obj runtime.Object, runHooks bool) (runtime.Object, error) { diff --git a/pkg/registry/generic/options.go b/pkg/registry/generic/options.go index ad9300cdf68..eea52c995b1 100644 --- a/pkg/registry/generic/options.go +++ b/pkg/registry/generic/options.go @@ -22,6 +22,7 @@ import ( // RESTOptions is set of configuration options to generic registries. type RESTOptions struct { - Storage pkgstorage.Interface - Decorator StorageDecorator + Storage pkgstorage.Interface + Decorator StorageDecorator + DeleteCollectionWorkers int } diff --git a/pkg/registry/horizontalpodautoscaler/etcd/etcd.go b/pkg/registry/horizontalpodautoscaler/etcd/etcd.go index 39a3772b70b..27393142a70 100644 --- a/pkg/registry/horizontalpodautoscaler/etcd/etcd.go +++ b/pkg/registry/horizontalpodautoscaler/etcd/etcd.go @@ -62,7 +62,8 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) { PredicateFunc: func(label labels.Selector, field fields.Selector) generic.Matcher { return horizontalpodautoscaler.MatchAutoscaler(label, field) }, - QualifiedResource: extensions.Resource("horizontalpodautoscalers"), + QualifiedResource: extensions.Resource("horizontalpodautoscalers"), + DeleteCollectionWorkers: opts.DeleteCollectionWorkers, // Used to validate autoscaler creation CreateStrategy: horizontalpodautoscaler.Strategy, diff --git a/pkg/registry/horizontalpodautoscaler/etcd/etcd_test.go b/pkg/registry/horizontalpodautoscaler/etcd/etcd_test.go index 37f3f30408d..f291b27c2ca 100644 --- a/pkg/registry/horizontalpodautoscaler/etcd/etcd_test.go +++ b/pkg/registry/horizontalpodautoscaler/etcd/etcd_test.go @@ -33,7 +33,7 @@ import ( func newStorage(t *testing.T) (*REST, *StatusREST, *etcdtesting.EtcdTestServer) { etcdStorage, server := registrytest.NewEtcdStorage(t, extensions.GroupName) - restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage} + restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage, 1} horizontalPodAutoscalerStorage, statusStorage := NewREST(restOptions) return horizontalPodAutoscalerStorage, statusStorage, server } diff --git a/pkg/registry/ingress/etcd/etcd.go b/pkg/registry/ingress/etcd/etcd.go index b5eff10a6b1..c99847cbcb4 100644 --- a/pkg/registry/ingress/etcd/etcd.go +++ b/pkg/registry/ingress/etcd/etcd.go @@ -64,7 +64,8 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) { PredicateFunc: func(label labels.Selector, field fields.Selector) generic.Matcher { return ingress.MatchIngress(label, field) }, - QualifiedResource: extensions.Resource("ingresses"), + QualifiedResource: extensions.Resource("ingresses"), + DeleteCollectionWorkers: opts.DeleteCollectionWorkers, // Used to validate controller creation CreateStrategy: ingress.Strategy, diff --git a/pkg/registry/ingress/etcd/etcd_test.go b/pkg/registry/ingress/etcd/etcd_test.go index b2b8fe2ecaf..7d33e93a2c7 100755 --- a/pkg/registry/ingress/etcd/etcd_test.go +++ b/pkg/registry/ingress/etcd/etcd_test.go @@ -32,7 +32,7 @@ import ( func newStorage(t *testing.T) (*REST, *StatusREST, *etcdtesting.EtcdTestServer) { etcdStorage, server := registrytest.NewEtcdStorage(t, extensions.GroupName) - restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage} + restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage, 1} ingressStorage, statusStorage := NewREST(restOptions) return ingressStorage, statusStorage, server } diff --git a/pkg/registry/job/etcd/etcd.go b/pkg/registry/job/etcd/etcd.go index 9f2a40777c2..c819ed7824d 100644 --- a/pkg/registry/job/etcd/etcd.go +++ b/pkg/registry/job/etcd/etcd.go @@ -64,7 +64,8 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) { PredicateFunc: func(label labels.Selector, field fields.Selector) generic.Matcher { return job.MatchJob(label, field) }, - QualifiedResource: extensions.Resource("jobs"), + QualifiedResource: extensions.Resource("jobs"), + DeleteCollectionWorkers: opts.DeleteCollectionWorkers, // Used to validate job creation CreateStrategy: job.Strategy, diff --git a/pkg/registry/job/etcd/etcd_test.go b/pkg/registry/job/etcd/etcd_test.go index 623125e8a66..8999d56e52f 100644 --- a/pkg/registry/job/etcd/etcd_test.go +++ b/pkg/registry/job/etcd/etcd_test.go @@ -34,7 +34,7 @@ import ( func newStorage(t *testing.T) (*REST, *StatusREST, *etcdtesting.EtcdTestServer) { etcdStorage, server := registrytest.NewEtcdStorage(t, extensions.GroupName) - restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage} + restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage, 1} jobStorage, statusStorage := NewREST(restOptions) return jobStorage, statusStorage, server } diff --git a/pkg/registry/limitrange/etcd/etcd.go b/pkg/registry/limitrange/etcd/etcd.go index 32bc6d64549..a1a5166437e 100644 --- a/pkg/registry/limitrange/etcd/etcd.go +++ b/pkg/registry/limitrange/etcd/etcd.go @@ -54,7 +54,8 @@ func NewREST(opts generic.RESTOptions) *REST { PredicateFunc: func(label labels.Selector, field fields.Selector) generic.Matcher { return limitrange.MatchLimitRange(label, field) }, - QualifiedResource: api.Resource("limitranges"), + QualifiedResource: api.Resource("limitranges"), + DeleteCollectionWorkers: opts.DeleteCollectionWorkers, CreateStrategy: limitrange.Strategy, UpdateStrategy: limitrange.Strategy, diff --git a/pkg/registry/limitrange/etcd/etcd_test.go b/pkg/registry/limitrange/etcd/etcd_test.go index 6c5b6e60a88..5c991229cac 100644 --- a/pkg/registry/limitrange/etcd/etcd_test.go +++ b/pkg/registry/limitrange/etcd/etcd_test.go @@ -31,7 +31,7 @@ import ( func newStorage(t *testing.T) (*REST, *etcdtesting.EtcdTestServer) { etcdStorage, server := registrytest.NewEtcdStorage(t, "") - restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage} + restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage, 1} return NewREST(restOptions), server } diff --git a/pkg/registry/namespace/etcd/etcd.go b/pkg/registry/namespace/etcd/etcd.go index c09048a29b9..f76e288b727 100644 --- a/pkg/registry/namespace/etcd/etcd.go +++ b/pkg/registry/namespace/etcd/etcd.go @@ -70,7 +70,8 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST, *FinalizeREST) { PredicateFunc: func(label labels.Selector, field fields.Selector) generic.Matcher { return namespace.MatchNamespace(label, field) }, - QualifiedResource: api.Resource("namespaces"), + QualifiedResource: api.Resource("namespaces"), + DeleteCollectionWorkers: opts.DeleteCollectionWorkers, CreateStrategy: namespace.Strategy, UpdateStrategy: namespace.Strategy, diff --git a/pkg/registry/namespace/etcd/etcd_test.go b/pkg/registry/namespace/etcd/etcd_test.go index afd8c6b761d..ff2f70e357b 100644 --- a/pkg/registry/namespace/etcd/etcd_test.go +++ b/pkg/registry/namespace/etcd/etcd_test.go @@ -31,7 +31,7 @@ import ( func newStorage(t *testing.T) (*REST, *etcdtesting.EtcdTestServer) { etcdStorage, server := registrytest.NewEtcdStorage(t, "") - restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage} + restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage, 1} namespaceStorage, _, _ := NewREST(restOptions) return namespaceStorage, server } diff --git a/pkg/registry/node/etcd/etcd.go b/pkg/registry/node/etcd/etcd.go index e0ee832b9fe..2fe177b7307 100644 --- a/pkg/registry/node/etcd/etcd.go +++ b/pkg/registry/node/etcd/etcd.go @@ -79,8 +79,9 @@ func NewStorage(opts generic.RESTOptions, connection client.ConnectionInfoGetter ObjectNameFunc: func(obj runtime.Object) (string, error) { return obj.(*api.Node).Name, nil }, - PredicateFunc: node.MatchNode, - QualifiedResource: api.Resource("nodes"), + PredicateFunc: node.MatchNode, + QualifiedResource: api.Resource("nodes"), + DeleteCollectionWorkers: opts.DeleteCollectionWorkers, CreateStrategy: node.Strategy, UpdateStrategy: node.Strategy, diff --git a/pkg/registry/node/etcd/etcd_test.go b/pkg/registry/node/etcd/etcd_test.go index 7dae78b4799..d0882492aec 100644 --- a/pkg/registry/node/etcd/etcd_test.go +++ b/pkg/registry/node/etcd/etcd_test.go @@ -39,7 +39,7 @@ func (fakeConnectionInfoGetter) GetConnectionInfo(ctx api.Context, nodeName stri func newStorage(t *testing.T) (*REST, *etcdtesting.EtcdTestServer) { etcdStorage, server := registrytest.NewEtcdStorage(t, "") - restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage} + restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage, 1} storage := NewStorage(restOptions, fakeConnectionInfoGetter{}, nil) return storage.Node, server } diff --git a/pkg/registry/persistentvolume/etcd/etcd.go b/pkg/registry/persistentvolume/etcd/etcd.go index d0a51c4d7fb..877f3039d0b 100644 --- a/pkg/registry/persistentvolume/etcd/etcd.go +++ b/pkg/registry/persistentvolume/etcd/etcd.go @@ -54,7 +54,8 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) { PredicateFunc: func(label labels.Selector, field fields.Selector) generic.Matcher { return persistentvolume.MatchPersistentVolumes(label, field) }, - QualifiedResource: api.Resource("persistentvolumes"), + QualifiedResource: api.Resource("persistentvolumes"), + DeleteCollectionWorkers: opts.DeleteCollectionWorkers, CreateStrategy: persistentvolume.Strategy, UpdateStrategy: persistentvolume.Strategy, diff --git a/pkg/registry/persistentvolume/etcd/etcd_test.go b/pkg/registry/persistentvolume/etcd/etcd_test.go index f0ef32418d8..74d26ed0ecb 100644 --- a/pkg/registry/persistentvolume/etcd/etcd_test.go +++ b/pkg/registry/persistentvolume/etcd/etcd_test.go @@ -33,7 +33,7 @@ import ( func newStorage(t *testing.T) (*REST, *StatusREST, *etcdtesting.EtcdTestServer) { etcdStorage, server := registrytest.NewEtcdStorage(t, "") - restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage} + restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage, 1} persistentVolumeStorage, statusStorage := NewREST(restOptions) return persistentVolumeStorage, statusStorage, server } diff --git a/pkg/registry/persistentvolumeclaim/etcd/etcd.go b/pkg/registry/persistentvolumeclaim/etcd/etcd.go index e218a9bee8e..8a77b0316b1 100644 --- a/pkg/registry/persistentvolumeclaim/etcd/etcd.go +++ b/pkg/registry/persistentvolumeclaim/etcd/etcd.go @@ -54,7 +54,8 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) { PredicateFunc: func(label labels.Selector, field fields.Selector) generic.Matcher { return persistentvolumeclaim.MatchPersistentVolumeClaim(label, field) }, - QualifiedResource: api.Resource("persistentvolumeclaims"), + QualifiedResource: api.Resource("persistentvolumeclaims"), + DeleteCollectionWorkers: opts.DeleteCollectionWorkers, CreateStrategy: persistentvolumeclaim.Strategy, UpdateStrategy: persistentvolumeclaim.Strategy, diff --git a/pkg/registry/persistentvolumeclaim/etcd/etcd_test.go b/pkg/registry/persistentvolumeclaim/etcd/etcd_test.go index 499fa2951ec..b51342fdb2e 100644 --- a/pkg/registry/persistentvolumeclaim/etcd/etcd_test.go +++ b/pkg/registry/persistentvolumeclaim/etcd/etcd_test.go @@ -33,7 +33,7 @@ import ( func newStorage(t *testing.T) (*REST, *StatusREST, *etcdtesting.EtcdTestServer) { etcdStorage, server := registrytest.NewEtcdStorage(t, "") - restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage} + restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage, 1} persistentVolumeClaimStorage, statusStorage := NewREST(restOptions) return persistentVolumeClaimStorage, statusStorage, server } diff --git a/pkg/registry/pod/etcd/etcd.go b/pkg/registry/pod/etcd/etcd.go index 5db0667fc3c..b401bfea98e 100644 --- a/pkg/registry/pod/etcd/etcd.go +++ b/pkg/registry/pod/etcd/etcd.go @@ -80,7 +80,8 @@ func NewStorage(opts generic.RESTOptions, k client.ConnectionInfoGetter, proxyTr PredicateFunc: func(label labels.Selector, field fields.Selector) generic.Matcher { return pod.MatchPod(label, field) }, - QualifiedResource: api.Resource("pods"), + QualifiedResource: api.Resource("pods"), + DeleteCollectionWorkers: opts.DeleteCollectionWorkers, CreateStrategy: pod.Strategy, UpdateStrategy: pod.Strategy, diff --git a/pkg/registry/pod/etcd/etcd_test.go b/pkg/registry/pod/etcd/etcd_test.go index 6ccb245361e..0ef38978a15 100644 --- a/pkg/registry/pod/etcd/etcd_test.go +++ b/pkg/registry/pod/etcd/etcd_test.go @@ -37,7 +37,7 @@ import ( func newStorage(t *testing.T) (*REST, *BindingREST, *StatusREST, *etcdtesting.EtcdTestServer) { etcdStorage, server := registrytest.NewEtcdStorage(t, "") - restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage} + restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage, 3} storage := NewStorage(restOptions, nil, nil) return storage.Pod, storage.Binding, storage.Status, server } diff --git a/pkg/registry/podsecuritypolicy/etcd/etcd.go b/pkg/registry/podsecuritypolicy/etcd/etcd.go index 644a10f618d..80bf0e09977 100644 --- a/pkg/registry/podsecuritypolicy/etcd/etcd.go +++ b/pkg/registry/podsecuritypolicy/etcd/etcd.go @@ -55,7 +55,8 @@ func NewREST(opts generic.RESTOptions) *REST { PredicateFunc: func(label labels.Selector, field fields.Selector) generic.Matcher { return podsecuritypolicy.MatchPodSecurityPolicy(label, field) }, - QualifiedResource: extensions.Resource("podsecuritypolicies"), + QualifiedResource: extensions.Resource("podsecuritypolicies"), + DeleteCollectionWorkers: opts.DeleteCollectionWorkers, CreateStrategy: podsecuritypolicy.Strategy, UpdateStrategy: podsecuritypolicy.Strategy, diff --git a/pkg/registry/podsecuritypolicy/etcd/etcd_test.go b/pkg/registry/podsecuritypolicy/etcd/etcd_test.go index 9e1071045d3..e2f0b373a81 100644 --- a/pkg/registry/podsecuritypolicy/etcd/etcd_test.go +++ b/pkg/registry/podsecuritypolicy/etcd/etcd_test.go @@ -33,7 +33,7 @@ import ( func newStorage(t *testing.T) (*REST, *etcdtesting.EtcdTestServer) { etcdStorage, server := registrytest.NewEtcdStorage(t, "extensions") - restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage} + restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage, 1} return NewREST(restOptions), server } diff --git a/pkg/registry/podtemplate/etcd/etcd.go b/pkg/registry/podtemplate/etcd/etcd.go index 73a23773278..93e8cfde752 100644 --- a/pkg/registry/podtemplate/etcd/etcd.go +++ b/pkg/registry/podtemplate/etcd/etcd.go @@ -54,7 +54,8 @@ func NewREST(opts generic.RESTOptions) *REST { PredicateFunc: func(label labels.Selector, field fields.Selector) generic.Matcher { return podtemplate.MatchPodTemplate(label, field) }, - QualifiedResource: api.Resource("podtemplates"), + QualifiedResource: api.Resource("podtemplates"), + DeleteCollectionWorkers: opts.DeleteCollectionWorkers, CreateStrategy: podtemplate.Strategy, UpdateStrategy: podtemplate.Strategy, diff --git a/pkg/registry/podtemplate/etcd/etcd_test.go b/pkg/registry/podtemplate/etcd/etcd_test.go index c6c10dd18b5..d3f628ed2d7 100644 --- a/pkg/registry/podtemplate/etcd/etcd_test.go +++ b/pkg/registry/podtemplate/etcd/etcd_test.go @@ -30,7 +30,7 @@ import ( func newStorage(t *testing.T) (*REST, *etcdtesting.EtcdTestServer) { etcdStorage, server := registrytest.NewEtcdStorage(t, "") - restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage} + restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage, 1} return NewREST(restOptions), server } diff --git a/pkg/registry/replicaset/etcd/etcd.go b/pkg/registry/replicaset/etcd/etcd.go index ce0f4f4f469..f6beb65d954 100644 --- a/pkg/registry/replicaset/etcd/etcd.go +++ b/pkg/registry/replicaset/etcd/etcd.go @@ -80,7 +80,8 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) { PredicateFunc: func(label labels.Selector, field fields.Selector) generic.Matcher { return replicaset.MatchReplicaSet(label, field) }, - QualifiedResource: api.Resource("replicasets"), + QualifiedResource: api.Resource("replicasets"), + DeleteCollectionWorkers: opts.DeleteCollectionWorkers, // Used to validate ReplicaSet creation CreateStrategy: replicaset.Strategy, diff --git a/pkg/registry/replicaset/etcd/etcd_test.go b/pkg/registry/replicaset/etcd/etcd_test.go index a6c2836a2e5..48ea865808a 100644 --- a/pkg/registry/replicaset/etcd/etcd_test.go +++ b/pkg/registry/replicaset/etcd/etcd_test.go @@ -32,7 +32,7 @@ import ( func newStorage(t *testing.T) (*ReplicaSetStorage, *etcdtesting.EtcdTestServer) { etcdStorage, server := registrytest.NewEtcdStorage(t, "extensions") - restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage} + restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage, 1} replicaSetStorage := NewStorage(restOptions) return &replicaSetStorage, server } diff --git a/pkg/registry/resourcequota/etcd/etcd.go b/pkg/registry/resourcequota/etcd/etcd.go index f6421451e73..1cffeeebdaa 100644 --- a/pkg/registry/resourcequota/etcd/etcd.go +++ b/pkg/registry/resourcequota/etcd/etcd.go @@ -54,7 +54,8 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) { PredicateFunc: func(label labels.Selector, field fields.Selector) generic.Matcher { return resourcequota.MatchResourceQuota(label, field) }, - QualifiedResource: api.Resource("resourcequotas"), + QualifiedResource: api.Resource("resourcequotas"), + DeleteCollectionWorkers: opts.DeleteCollectionWorkers, CreateStrategy: resourcequota.Strategy, UpdateStrategy: resourcequota.Strategy, diff --git a/pkg/registry/resourcequota/etcd/etcd_test.go b/pkg/registry/resourcequota/etcd/etcd_test.go index 1423151a71a..3197f83e08c 100644 --- a/pkg/registry/resourcequota/etcd/etcd_test.go +++ b/pkg/registry/resourcequota/etcd/etcd_test.go @@ -32,7 +32,7 @@ import ( func newStorage(t *testing.T) (*REST, *StatusREST, *etcdtesting.EtcdTestServer) { etcdStorage, server := registrytest.NewEtcdStorage(t, "") - restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage} + restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage, 1} resourceQuotaStorage, statusStorage := NewREST(restOptions) return resourceQuotaStorage, statusStorage, server } diff --git a/pkg/registry/secret/etcd/etcd.go b/pkg/registry/secret/etcd/etcd.go index d8fc89b1dff..3aaea7009ea 100644 --- a/pkg/registry/secret/etcd/etcd.go +++ b/pkg/registry/secret/etcd/etcd.go @@ -54,7 +54,8 @@ func NewREST(opts generic.RESTOptions) *REST { PredicateFunc: func(label labels.Selector, field fields.Selector) generic.Matcher { return secret.Matcher(label, field) }, - QualifiedResource: api.Resource("secrets"), + QualifiedResource: api.Resource("secrets"), + DeleteCollectionWorkers: opts.DeleteCollectionWorkers, CreateStrategy: secret.Strategy, UpdateStrategy: secret.Strategy, diff --git a/pkg/registry/secret/etcd/etcd_test.go b/pkg/registry/secret/etcd/etcd_test.go index 626e6434ac9..99b0edb1cce 100644 --- a/pkg/registry/secret/etcd/etcd_test.go +++ b/pkg/registry/secret/etcd/etcd_test.go @@ -30,7 +30,7 @@ import ( func newStorage(t *testing.T) (*REST, *etcdtesting.EtcdTestServer) { etcdStorage, server := registrytest.NewEtcdStorage(t, "") - restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage} + restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage, 1} return NewREST(restOptions), server } diff --git a/pkg/registry/service/etcd/etcd.go b/pkg/registry/service/etcd/etcd.go index 9aa3ddfdcdf..28505d2c87e 100644 --- a/pkg/registry/service/etcd/etcd.go +++ b/pkg/registry/service/etcd/etcd.go @@ -54,7 +54,8 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) { PredicateFunc: func(label labels.Selector, field fields.Selector) generic.Matcher { return service.MatchServices(label, field) }, - QualifiedResource: api.Resource("services"), + QualifiedResource: api.Resource("services"), + DeleteCollectionWorkers: opts.DeleteCollectionWorkers, CreateStrategy: service.Strategy, UpdateStrategy: service.Strategy, diff --git a/pkg/registry/service/etcd/etcd_test.go b/pkg/registry/service/etcd/etcd_test.go index 6a78d29b1d7..b68ac3603a7 100644 --- a/pkg/registry/service/etcd/etcd_test.go +++ b/pkg/registry/service/etcd/etcd_test.go @@ -31,7 +31,7 @@ import ( func newStorage(t *testing.T) (*REST, *StatusREST, *etcdtesting.EtcdTestServer) { etcdStorage, server := registrytest.NewEtcdStorage(t, "") - restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage} + restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage, 1} serviceStorage, statusStorage := NewREST(restOptions) return serviceStorage, statusStorage, server } diff --git a/pkg/registry/serviceaccount/etcd/etcd.go b/pkg/registry/serviceaccount/etcd/etcd.go index d41d7773fdb..81925bd723e 100644 --- a/pkg/registry/serviceaccount/etcd/etcd.go +++ b/pkg/registry/serviceaccount/etcd/etcd.go @@ -54,7 +54,8 @@ func NewREST(opts generic.RESTOptions) *REST { PredicateFunc: func(label labels.Selector, field fields.Selector) generic.Matcher { return serviceaccount.Matcher(label, field) }, - QualifiedResource: api.Resource("serviceaccounts"), + QualifiedResource: api.Resource("serviceaccounts"), + DeleteCollectionWorkers: opts.DeleteCollectionWorkers, CreateStrategy: serviceaccount.Strategy, UpdateStrategy: serviceaccount.Strategy, diff --git a/pkg/registry/serviceaccount/etcd/etcd_test.go b/pkg/registry/serviceaccount/etcd/etcd_test.go index 8704ae8db79..9700e4a5e1d 100644 --- a/pkg/registry/serviceaccount/etcd/etcd_test.go +++ b/pkg/registry/serviceaccount/etcd/etcd_test.go @@ -30,7 +30,7 @@ import ( func newStorage(t *testing.T) (*REST, *etcdtesting.EtcdTestServer) { etcdStorage, server := registrytest.NewEtcdStorage(t, "") - restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage} + restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage, 1} return NewREST(restOptions), server } diff --git a/pkg/registry/thirdpartyresource/etcd/etcd.go b/pkg/registry/thirdpartyresource/etcd/etcd.go index 0a624865585..95fc9fb5a4a 100644 --- a/pkg/registry/thirdpartyresource/etcd/etcd.go +++ b/pkg/registry/thirdpartyresource/etcd/etcd.go @@ -54,9 +54,10 @@ func NewREST(opts generic.RESTOptions) *REST { PredicateFunc: func(label labels.Selector, field fields.Selector) generic.Matcher { return thirdpartyresource.Matcher(label, field) }, - QualifiedResource: extensions.Resource("thirdpartyresources"), - CreateStrategy: thirdpartyresource.Strategy, - UpdateStrategy: thirdpartyresource.Strategy, + QualifiedResource: extensions.Resource("thirdpartyresources"), + DeleteCollectionWorkers: opts.DeleteCollectionWorkers, + CreateStrategy: thirdpartyresource.Strategy, + UpdateStrategy: thirdpartyresource.Strategy, Storage: storageInterface, } diff --git a/pkg/registry/thirdpartyresource/etcd/etcd_test.go b/pkg/registry/thirdpartyresource/etcd/etcd_test.go index dc9cad39605..2d6b4562994 100644 --- a/pkg/registry/thirdpartyresource/etcd/etcd_test.go +++ b/pkg/registry/thirdpartyresource/etcd/etcd_test.go @@ -33,7 +33,7 @@ import ( func newStorage(t *testing.T) (*REST, *etcdtesting.EtcdTestServer) { etcdStorage, server := registrytest.NewEtcdStorage(t, extensions.GroupName) - restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage} + restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage, 1} return NewREST(restOptions), server } diff --git a/pkg/registry/thirdpartyresourcedata/etcd/etcd.go b/pkg/registry/thirdpartyresourcedata/etcd/etcd.go index d0689faec75..8b5e0bc16eb 100644 --- a/pkg/registry/thirdpartyresourcedata/etcd/etcd.go +++ b/pkg/registry/thirdpartyresourcedata/etcd/etcd.go @@ -57,9 +57,10 @@ func NewREST(opts generic.RESTOptions, group, kind string) *REST { PredicateFunc: func(label labels.Selector, field fields.Selector) generic.Matcher { return thirdpartyresourcedata.Matcher(label, field) }, - QualifiedResource: extensions.Resource("thirdpartyresourcedatas"), - CreateStrategy: thirdpartyresourcedata.Strategy, - UpdateStrategy: thirdpartyresourcedata.Strategy, + QualifiedResource: extensions.Resource("thirdpartyresourcedatas"), + DeleteCollectionWorkers: opts.DeleteCollectionWorkers, + CreateStrategy: thirdpartyresourcedata.Strategy, + UpdateStrategy: thirdpartyresourcedata.Strategy, Storage: storageInterface, } diff --git a/pkg/registry/thirdpartyresourcedata/etcd/etcd_test.go b/pkg/registry/thirdpartyresourcedata/etcd/etcd_test.go index 4f70e6aa63a..eafaffa2cfd 100644 --- a/pkg/registry/thirdpartyresourcedata/etcd/etcd_test.go +++ b/pkg/registry/thirdpartyresourcedata/etcd/etcd_test.go @@ -33,7 +33,7 @@ import ( func newStorage(t *testing.T) (*REST, *etcdtesting.EtcdTestServer) { etcdStorage, server := registrytest.NewEtcdStorage(t, extensions.GroupName) - restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage} + restOptions := generic.RESTOptions{etcdStorage, generic.UndecoratedStorage, 1} return NewREST(restOptions, "foo", "bar"), server }