From 28358e4775e16333967c1302d8ccc542a3572317 Mon Sep 17 00:00:00 2001 From: deads2k Date: Fri, 30 Sep 2016 15:07:27 -0400 Subject: [PATCH] fix pod eviction storage --- pkg/master/master.go | 15 +-- pkg/master/master_test.go | 2 + pkg/registry/core/pod/etcd/etcd.go | 156 +----------------------- pkg/registry/core/pod/etcd/etcd_test.go | 4 +- pkg/registry/core/pod/etcd/eviction.go | 153 +++++++++++++++++++++++ pkg/registry/core/rest/storage_core.go | 13 ++ 6 files changed, 174 insertions(+), 169 deletions(-) create mode 100644 pkg/registry/core/pod/etcd/eviction.go diff --git a/pkg/master/master.go b/pkg/master/master.go index f1737e22066..ada9f0e1c02 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -81,7 +81,6 @@ import ( storagerest "k8s.io/kubernetes/pkg/registry/storage/rest" // direct etcd registry dependencies - podetcd "k8s.io/kubernetes/pkg/registry/core/pod/etcd" "k8s.io/kubernetes/pkg/registry/extensions/thirdpartyresourcedata" thirdpartyresourcedataetcd "k8s.io/kubernetes/pkg/registry/extensions/thirdpartyresourcedata/etcd" ) @@ -220,6 +219,7 @@ func (c completedConfig) New() (*Master, error) { ServiceClusterIPRange: c.GenericConfig.ServiceClusterIPRange, ServiceNodePortRange: c.GenericConfig.ServiceNodePortRange, ComponentStatusServerFunc: func() map[string]apiserver.Server { return getServersToValidate(c.StorageFactory) }, + LoopbackClientConfig: c.GenericConfig.LoopbackClientConfig, }, } @@ -328,19 +328,6 @@ func (m *Master) InstallAPIs(c *Config) { } } - // This is here so that, if the policy group is present, the eviction - // subresource handler wil be able to find poddisruptionbudgets - // TODO(lavalamp) find a better way for groups to discover and interact - // with each other - if group == "policy" { - storage := apiGroupsInfo[0].VersionedResourcesStorageMap["v1"]["pods/eviction"] - evictionStorage := storage.(*podetcd.EvictionREST) - - storage = apiGroupInfo.VersionedResourcesStorageMap["v1alpha1"]["poddisruptionbudgets"] - 
evictionStorage.PodDisruptionBudgetLister = storage.(rest.Lister) - evictionStorage.PodDisruptionBudgetUpdater = storage.(rest.Updater) - } - apiGroupsInfo = append(apiGroupsInfo, apiGroupInfo) } diff --git a/pkg/master/master_test.go b/pkg/master/master_test.go index 763ff82660b..a83c96c6627 100644 --- a/pkg/master/master_test.go +++ b/pkg/master/master_test.go @@ -44,6 +44,7 @@ import ( "k8s.io/kubernetes/pkg/apis/extensions" extensionsapiv1beta1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" "k8s.io/kubernetes/pkg/apis/rbac" + "k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/generated/openapi" "k8s.io/kubernetes/pkg/genericapiserver" "k8s.io/kubernetes/pkg/kubelet/client" @@ -95,6 +96,7 @@ func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert. config.GenericConfig.ProxyTLSClientConfig = &tls.Config{} config.GenericConfig.RequestContextMapper = api.NewRequestContextMapper() config.GenericConfig.EnableVersion = true + config.GenericConfig.LoopbackClientConfig = &restclient.Config{APIPath: "/api", ContentConfig: restclient.ContentConfig{NegotiatedSerializer: api.Codecs}} config.EnableCoreControllers = false // TODO: this is kind of hacky. 
The trouble is that the sync loop diff --git a/pkg/registry/core/pod/etcd/etcd.go b/pkg/registry/core/pod/etcd/etcd.go index b207dbca65d..189457694ea 100644 --- a/pkg/registry/core/pod/etcd/etcd.go +++ b/pkg/registry/core/pod/etcd/etcd.go @@ -27,9 +27,8 @@ import ( "k8s.io/kubernetes/pkg/api/rest" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/validation" - "k8s.io/kubernetes/pkg/apis/policy" + policyclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/policy/unversioned" "k8s.io/kubernetes/pkg/kubelet/client" - "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/registry/cachesize" "k8s.io/kubernetes/pkg/registry/core/pod" podrest "k8s.io/kubernetes/pkg/registry/core/pod/rest" @@ -59,7 +58,7 @@ type REST struct { } // NewStorage returns a RESTStorage object that will work against pods. -func NewStorage(opts generic.RESTOptions, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper) PodStorage { +func NewStorage(opts generic.RESTOptions, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper, podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter) PodStorage { prefix := "/" + opts.ResourcePrefix newListFunc := func() runtime.Object { return &api.PodList{} } @@ -105,7 +104,7 @@ func NewStorage(opts generic.RESTOptions, k client.ConnectionInfoGetter, proxyTr return PodStorage{ Pod: &REST{store, proxyTransport}, Binding: &BindingREST{store: store}, - Eviction: &EvictionREST{store: store}, + Eviction: newEvictionStorage(store, podDisruptionBudgetClient), Status: &StatusREST{store: &statusStore}, Log: &podrest.LogREST{Store: store, KubeletConn: k}, Proxy: &podrest.ProxyREST{Store: store, ProxyTransport: proxyTransport}, @@ -123,155 +122,6 @@ func (r *REST) ResourceLocation(ctx api.Context, name string) (*url.URL, http.Ro return pod.ResourceLocation(r, r.proxyTransport, ctx, name) } -// EvictionREST implements the REST endpoint for evicting pods from nodes when etcd is in use. 
-type EvictionREST struct { - store *registry.Store - PodDisruptionBudgetLister rest.Lister - PodDisruptionBudgetUpdater rest.Updater -} - -var _ = rest.Creater(&EvictionREST{}) - -// New creates a new eviction resource -func (r *EvictionREST) New() runtime.Object { - return &policy.Eviction{} -} - -// Create attempts to create a new eviction. That is, it tries to evict a pod. -func (r *EvictionREST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) { - eviction := obj.(*policy.Eviction) - - obj, err := r.store.Get(ctx, eviction.Name) - if err != nil { - return nil, err - } - pod := obj.(*api.Pod) - pdbs, err := r.getPodDisruptionBudgets(ctx, pod) - if err != nil { - return nil, err - } - - if len(pdbs) > 1 { - return &unversioned.Status{ - Status: unversioned.StatusFailure, - Message: "This pod has more than one PodDisruptionBudget, which the eviction subresource does not support.", - Code: 500, - }, nil - } else if len(pdbs) == 1 { - pdb := pdbs[0] - // Try to verify-and-decrement - - // If it was false already, or if it becomes false during the course of our retries, - // raise an error marked as a 429. - ok, err := r.checkAndDecrement(ctx, pdb) - if err != nil { - return nil, err - } - - if !ok { - return &unversioned.Status{ - Status: unversioned.StatusFailure, - // TODO(mml): Include some more details about why the eviction is disallowed. - // Ideally any such text is generated by the DisruptionController (offline). - Message: "Cannot evict pod as it would violate the pod's disruption budget.", - Code: 429, - // TODO(mml): Add a Retry-After header. Once there are time-based - // budgets, we can sometimes compute a sensible suggested value. But - // even without that, we can give a suggestion (10 minutes?) that - // prevents well-behaved clients from hammering us. 
- }, nil - } - } - - // At this point there was either no PDB or we succeded in decrementing - - // Try the delete - _, err = r.store.Delete(ctx, eviction.Name, eviction.DeleteOptions) - if err != nil { - return nil, err - } - - // Success! - return &unversioned.Status{Status: unversioned.StatusSuccess}, nil -} - -// UpdatedObjectInfo is a simple interface for attempting updates to -// runtime.Objects. EvictionREST implements it directly. -var _ = rest.UpdatedObjectInfo(&EvictionREST{}) - -// Preconditions returns any preconditions required prior to updating the -// PDB. None currently. -func (r *EvictionREST) Preconditions() *api.Preconditions { - return nil -} - -// UpdatedObject returns the updated PDB if it is able to update -// PodDisruptionAllowed from true->false. -func (r *EvictionREST) UpdatedObject(ctx api.Context, oldObj runtime.Object) (newObj runtime.Object, err error) { - copy, err := api.Scheme.DeepCopy(oldObj) - if err != nil { - return - } - newObj = copy.(runtime.Object) - pdb := oldObj.(*policy.PodDisruptionBudget) - if !pdb.Status.PodDisruptionAllowed { - return nil, fmt.Errorf("PodDisruptionAllowed is already false") - } - pdb.Status.PodDisruptionAllowed = false - - return -} - -func (r *EvictionREST) checkAndDecrement(ctx api.Context, pdb policy.PodDisruptionBudget) (ok bool, err error) { - if !pdb.Status.PodDisruptionAllowed { - return false, nil - } - newObj, _, err := r.PodDisruptionBudgetUpdater.Update(ctx, pdb.Name, r) - if err != nil { - return false, err - } - - newPdb := newObj.(*policy.PodDisruptionBudget) - if newPdb.Status.PodDisruptionAllowed { - return false, fmt.Errorf("update did not succeed") - } - - return true, nil -} - -// Returns any PDBs that match the pod. -// err is set if there's an error. 
-func (r *EvictionREST) getPodDisruptionBudgets(ctx api.Context, pod *api.Pod) (pdbs []policy.PodDisruptionBudget, err error) { - if len(pod.Labels) == 0 { - return - } - - l, err := r.PodDisruptionBudgetLister.List(ctx, nil) - if err != nil { - return - } - - pdbList := l.(*policy.PodDisruptionBudgetList) - - for _, pdb := range pdbList.Items { - if pdb.Namespace != pod.Namespace { - continue - } - selector, err := unversioned.LabelSelectorAsSelector(pdb.Spec.Selector) - if err != nil { - continue - } - // If a PDB with a nil or empty selector creeps in, it should match nothing, not everything. - if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) { - continue - } - - pdbs = append(pdbs, pdb) - } - - return pdbs, nil -} - // BindingREST implements the REST endpoint for binding pods to nodes when etcd is in use. type BindingREST struct { store *registry.Store diff --git a/pkg/registry/core/pod/etcd/etcd_test.go b/pkg/registry/core/pod/etcd/etcd_test.go index 2c7e8cc4272..a89bb93a923 100644 --- a/pkg/registry/core/pod/etcd/etcd_test.go +++ b/pkg/registry/core/pod/etcd/etcd_test.go @@ -40,7 +40,7 @@ import ( func newStorage(t *testing.T) (*REST, *BindingREST, *StatusREST, *etcdtesting.EtcdTestServer) { etcdStorage, server := registrytest.NewEtcdStorage(t, "") restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 3} - storage := NewStorage(restOptions, nil, nil) + storage := NewStorage(restOptions, nil, nil, nil) return storage.Pod, storage.Binding, storage.Status, server } @@ -148,7 +148,7 @@ func (f FailDeletionStorage) Delete(ctx context.Context, key string, out runtime func newFailDeleteStorage(t *testing.T, called *bool) (*REST, *etcdtesting.EtcdTestServer) { etcdStorage, server := registrytest.NewEtcdStorage(t, "") restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 3} - storage := 
NewStorage(restOptions, nil, nil) + storage := NewStorage(restOptions, nil, nil, nil) storage.Pod.Store.Storage = FailDeletionStorage{storage.Pod.Store.Storage, called} return storage.Pod, server } diff --git a/pkg/registry/core/pod/etcd/eviction.go b/pkg/registry/core/pod/etcd/eviction.go new file mode 100644 index 00000000000..6127f5c1af8 --- /dev/null +++ b/pkg/registry/core/pod/etcd/eviction.go @@ -0,0 +1,153 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package etcd + +import ( + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/rest" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/apis/policy" + policyclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/policy/unversioned" + "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/pkg/registry/generic/registry" + "k8s.io/kubernetes/pkg/runtime" +) + +func newEvictionStorage(store *registry.Store, podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter) *EvictionREST { + return &EvictionREST{store: store, podDisruptionBudgetClient: podDisruptionBudgetClient} +} + +// EvictionREST implements the REST endpoint for evicting pods from nodes when etcd is in use. 
+type EvictionREST struct { + store *registry.Store + podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter +} + +var _ = rest.Creater(&EvictionREST{}) + +// New creates a new eviction resource +func (r *EvictionREST) New() runtime.Object { + return &policy.Eviction{} +} + +// Create attempts to create a new eviction. That is, it tries to evict a pod. +func (r *EvictionREST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) { + eviction := obj.(*policy.Eviction) + + obj, err := r.store.Get(ctx, eviction.Name) + if err != nil { + return nil, err + } + pod := obj.(*api.Pod) + pdbs, err := r.getPodDisruptionBudgets(ctx, pod) + if err != nil { + return nil, err + } + + if len(pdbs) > 1 { + return &unversioned.Status{ + Status: unversioned.StatusFailure, + Message: "This pod has more than one PodDisruptionBudget, which the eviction subresource does not support.", + Code: 500, + }, nil + } else if len(pdbs) == 1 { + pdb := pdbs[0] + // Try to verify-and-decrement + + // If it was false already, or if it becomes false during the course of our retries, + // raise an error marked as a 429. + ok, err := r.checkAndDecrement(pod.Namespace, pdb) + if err != nil { + return nil, err + } + + if !ok { + return &unversioned.Status{ + Status: unversioned.StatusFailure, + // TODO(mml): Include some more details about why the eviction is disallowed. + // Ideally any such text is generated by the DisruptionController (offline). + Message: "Cannot evict pod as it would violate the pod's disruption budget.", + Code: 429, + // TODO(mml): Add a Retry-After header. Once there are time-based + // budgets, we can sometimes compute a sensible suggested value. But + // even without that, we can give a suggestion (10 minutes?) that + // prevents well-behaved clients from hammering us. 
+ }, nil + } + } + + // At this point there was either no PDB or we succeeded in decrementing + + // Try the delete + _, err = r.store.Delete(ctx, eviction.Name, eviction.DeleteOptions) + if err != nil { + return nil, err + } + + // Success! + return &unversioned.Status{Status: unversioned.StatusSuccess}, nil +} + +func (r *EvictionREST) checkAndDecrement(namespace string, pdb policy.PodDisruptionBudget) (ok bool, err error) { + if !pdb.Status.PodDisruptionAllowed { + return false, nil + } + + copied, err := api.Scheme.DeepCopy(pdb) + if err != nil { + return false, err + } + newPDB := copied.(policy.PodDisruptionBudget) + newPDB.Status.PodDisruptionAllowed = false + + if _, err := r.podDisruptionBudgetClient.PodDisruptionBudgets(namespace).Update(&newPDB); err != nil { + return false, err + } + + return true, nil +} + +// Returns any PDBs that match the pod. +// err is set if there's an error. +func (r *EvictionREST) getPodDisruptionBudgets(ctx api.Context, pod *api.Pod) (pdbs []policy.PodDisruptionBudget, err error) { + if len(pod.Labels) == 0 { + return + } + + pdbList, err := r.podDisruptionBudgetClient.PodDisruptionBudgets(pod.Namespace).List(api.ListOptions{}) + if err != nil { + return + } + + for _, pdb := range pdbList.Items { + if pdb.Namespace != pod.Namespace { + continue + } + selector, err := unversioned.LabelSelectorAsSelector(pdb.Spec.Selector) + if err != nil { + continue + } + // If a PDB with a nil or empty selector creeps in, it should match nothing, not everything. 
+ if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) { + continue + } + + pdbs = append(pdbs, pdb) + } + + return pdbs, nil +} diff --git a/pkg/registry/core/rest/storage_core.go b/pkg/registry/core/rest/storage_core.go index 7cfaee03984..ba3d7ee7aff 100644 --- a/pkg/registry/core/rest/storage_core.go +++ b/pkg/registry/core/rest/storage_core.go @@ -27,6 +27,8 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apimachinery/registered" "k8s.io/kubernetes/pkg/apiserver" + policyclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/policy/unversioned" + "k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/genericapiserver" kubeletclient "k8s.io/kubernetes/pkg/kubelet/client" "k8s.io/kubernetes/pkg/registry/core/componentstatus" @@ -72,6 +74,8 @@ type LegacyRESTStorageProvider struct { // ComponentStatusServerFunc is a func used to locate servers to back component status ComponentStatusServerFunc ComponentStatusServerFunc + + LoopbackClientConfig *restclient.Config } type ComponentStatusServerFunc func() map[string]apiserver.Server @@ -101,8 +105,16 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generi if autoscalingGroupVersion := (unversioned.GroupVersion{Group: "autoscaling", Version: "v1"}); registered.IsEnabledVersion(autoscalingGroupVersion) { apiGroupInfo.SubresourceGroupVersionKind["replicationcontrollers/scale"] = autoscalingGroupVersion.WithKind("Scale") } + + var podDisruptionClient policyclient.PodDisruptionBudgetsGetter if policyGroupVersion := (unversioned.GroupVersion{Group: "policy", Version: "v1alpha1"}); registered.IsEnabledVersion(policyGroupVersion) { apiGroupInfo.SubresourceGroupVersionKind["pods/eviction"] = policyGroupVersion.WithKind("Eviction") + + var err error + podDisruptionClient, err = policyclient.NewForConfig(c.LoopbackClientConfig) + if err != nil { + return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err + } } 
restStorage := LegacyRESTStorage{} @@ -131,6 +143,7 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generi restOptionsGetter(api.Resource("pods")), kubeletclient.ConnectionInfoGetter(nodeStorage.Node), c.ProxyTransport, + podDisruptionClient, ) serviceRESTStorage, serviceStatusStorage := serviceetcd.NewREST(restOptionsGetter(api.Resource("services")))