fix pod eviction storage
parent 0ad50d2033
commit 28358e4775
@@ -81,7 +81,6 @@ import (
	storagerest "k8s.io/kubernetes/pkg/registry/storage/rest"

	// direct etcd registry dependencies
-	podetcd "k8s.io/kubernetes/pkg/registry/core/pod/etcd"
	"k8s.io/kubernetes/pkg/registry/extensions/thirdpartyresourcedata"
	thirdpartyresourcedataetcd "k8s.io/kubernetes/pkg/registry/extensions/thirdpartyresourcedata/etcd"
)
@@ -220,6 +219,7 @@ func (c completedConfig) New() (*Master, error) {
			ServiceClusterIPRange:     c.GenericConfig.ServiceClusterIPRange,
			ServiceNodePortRange:      c.GenericConfig.ServiceNodePortRange,
			ComponentStatusServerFunc: func() map[string]apiserver.Server { return getServersToValidate(c.StorageFactory) },
+			LoopbackClientConfig:      c.GenericConfig.LoopbackClientConfig,
		},
	}

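Editor's note: the LoopbackClientConfig added above carries the configuration the apiserver uses to call its own REST API from in-process components. A minimal sketch of building a policy client from such a config, reusing only types that appear in this diff; the host address and variable names are assumptions, not part of the commit:

// --- sketch (not part of the commit) ---
loopback := &restclient.Config{
	Host:          "https://127.0.0.1:6443", // hypothetical loopback address
	APIPath:       "/api",
	ContentConfig: restclient.ContentConfig{NegotiatedSerializer: api.Codecs},
}
// policyclient.NewForConfig is the constructor this commit calls in the
// legacy REST storage provider to obtain a PodDisruptionBudgetsGetter.
policyClient, err := policyclient.NewForConfig(loopback)
if err != nil {
	// the commit aborts storage construction on this error
}
_ = policyClient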
@@ -328,19 +328,6 @@ func (m *Master) InstallAPIs(c *Config) {
		}
	}

-	// This is here so that, if the policy group is present, the eviction
-	// subresource handler will be able to find poddisruptionbudgets
-	// TODO(lavalamp) find a better way for groups to discover and interact
-	// with each other
-	if group == "policy" {
-		storage := apiGroupsInfo[0].VersionedResourcesStorageMap["v1"]["pods/eviction"]
-		evictionStorage := storage.(*podetcd.EvictionREST)
-
-		storage = apiGroupInfo.VersionedResourcesStorageMap["v1alpha1"]["poddisruptionbudgets"]
-		evictionStorage.PodDisruptionBudgetLister = storage.(rest.Lister)
-		evictionStorage.PodDisruptionBudgetUpdater = storage.(rest.Updater)
-	}
-
	apiGroupsInfo = append(apiGroupsInfo, apiGroupInfo)
}

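Editor's note: the deleted block reached across API groups by type-asserting another group's storage out of the versioned storage map at install time. The rest of this commit replaces that coupling with a typed client built from the loopback config and injected into pod storage. A condensed sketch of the new wiring, using only calls from this diff; the local variable names are assumptions:

// --- sketch (not part of the commit) ---
// Build a policy client from the server's own loopback config...
podDisruptionClient, err := policyclient.NewForConfig(loopbackClientConfig)
if err != nil {
	return err
}
// ...and hand it to pod storage; the eviction subresource now reaches
// poddisruptionbudgets through the API instead of through shared storage.
podStorage := podetcd.NewStorage(restOptions, connectionInfoGetter, proxyTransport, podDisruptionClient)
_ = podStorage.Eviction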
@@ -44,6 +44,7 @@ import (
	"k8s.io/kubernetes/pkg/apis/extensions"
	extensionsapiv1beta1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
	"k8s.io/kubernetes/pkg/apis/rbac"
+	"k8s.io/kubernetes/pkg/client/restclient"
	"k8s.io/kubernetes/pkg/generated/openapi"
	"k8s.io/kubernetes/pkg/genericapiserver"
	"k8s.io/kubernetes/pkg/kubelet/client"
@@ -95,6 +96,7 @@ func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert.
	config.GenericConfig.ProxyTLSClientConfig = &tls.Config{}
	config.GenericConfig.RequestContextMapper = api.NewRequestContextMapper()
	config.GenericConfig.EnableVersion = true
+	config.GenericConfig.LoopbackClientConfig = &restclient.Config{APIPath: "/api", ContentConfig: restclient.ContentConfig{NegotiatedSerializer: api.Codecs}}
	config.EnableCoreControllers = false

	// TODO: this is kind of hacky. The trouble is that the sync loop
@@ -27,9 +27,8 @@ import (
	"k8s.io/kubernetes/pkg/api/rest"
	"k8s.io/kubernetes/pkg/api/unversioned"
	"k8s.io/kubernetes/pkg/api/validation"
-	"k8s.io/kubernetes/pkg/apis/policy"
+	policyclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/policy/unversioned"
	"k8s.io/kubernetes/pkg/kubelet/client"
-	"k8s.io/kubernetes/pkg/labels"
	"k8s.io/kubernetes/pkg/registry/cachesize"
	"k8s.io/kubernetes/pkg/registry/core/pod"
	podrest "k8s.io/kubernetes/pkg/registry/core/pod/rest"
@@ -59,7 +58,7 @@ type REST struct {
}

// NewStorage returns a RESTStorage object that will work against pods.
-func NewStorage(opts generic.RESTOptions, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper) PodStorage {
+func NewStorage(opts generic.RESTOptions, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper, podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter) PodStorage {
	prefix := "/" + opts.ResourcePrefix

	newListFunc := func() runtime.Object { return &api.PodList{} }
@@ -105,7 +104,7 @@ func NewStorage(opts generic.RESTOptions, k client.ConnectionInfoGetter, proxyTr
	return PodStorage{
		Pod:      &REST{store, proxyTransport},
		Binding:  &BindingREST{store: store},
-		Eviction: &EvictionREST{store: store},
+		Eviction: newEvictionStorage(store, podDisruptionBudgetClient),
		Status:   &StatusREST{store: &statusStore},
		Log:      &podrest.LogREST{Store: store, KubeletConn: k},
		Proxy:    &podrest.ProxyREST{Store: store, ProxyTransport: proxyTransport},
@@ -123,155 +122,6 @@ func (r *REST) ResourceLocation(ctx api.Context, name string) (*url.URL, http.Ro
	return pod.ResourceLocation(r, r.proxyTransport, ctx, name)
}

-// EvictionREST implements the REST endpoint for evicting pods from nodes when etcd is in use.
-type EvictionREST struct {
-	store                      *registry.Store
-	PodDisruptionBudgetLister  rest.Lister
-	PodDisruptionBudgetUpdater rest.Updater
-}
-
-var _ = rest.Creater(&EvictionREST{})
-
-// New creates a new eviction resource
-func (r *EvictionREST) New() runtime.Object {
-	return &policy.Eviction{}
-}
-
-// Create attempts to create a new eviction. That is, it tries to evict a pod.
-func (r *EvictionREST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) {
-	eviction := obj.(*policy.Eviction)
-
-	obj, err := r.store.Get(ctx, eviction.Name)
-	if err != nil {
-		return nil, err
-	}
-	pod := obj.(*api.Pod)
-	pdbs, err := r.getPodDisruptionBudgets(ctx, pod)
-	if err != nil {
-		return nil, err
-	}
-
-	if len(pdbs) > 1 {
-		return &unversioned.Status{
-			Status:  unversioned.StatusFailure,
-			Message: "This pod has more than one PodDisruptionBudget, which the eviction subresource does not support.",
-			Code:    500,
-		}, nil
-	} else if len(pdbs) == 1 {
-		pdb := pdbs[0]
-		// Try to verify-and-decrement
-
-		// If it was false already, or if it becomes false during the course of our retries,
-		// raise an error marked as a 429.
-		ok, err := r.checkAndDecrement(ctx, pdb)
-		if err != nil {
-			return nil, err
-		}
-
-		if !ok {
-			return &unversioned.Status{
-				Status: unversioned.StatusFailure,
-				// TODO(mml): Include some more details about why the eviction is disallowed.
-				// Ideally any such text is generated by the DisruptionController (offline).
-				Message: "Cannot evict pod as it would violate the pod's disruption budget.",
-				Code:    429,
-				// TODO(mml): Add a Retry-After header. Once there are time-based
-				// budgets, we can sometimes compute a sensible suggested value. But
-				// even without that, we can give a suggestion (10 minutes?) that
-				// prevents well-behaved clients from hammering us.
-			}, nil
-		}
-	}
-
-	// At this point there was either no PDB or we succeeded in decrementing
-
-	// Try the delete
-	_, err = r.store.Delete(ctx, eviction.Name, eviction.DeleteOptions)
-	if err != nil {
-		return nil, err
-	}
-
-	// Success!
-	return &unversioned.Status{Status: unversioned.StatusSuccess}, nil
-}
-
-// UpdatedObjectInfo is a simple interface for attempting updates to
-// runtime.Objects. EvictionREST implements it directly.
-var _ = rest.UpdatedObjectInfo(&EvictionREST{})
-
-// Preconditions returns any preconditions required prior to updating the
-// PDB. None currently.
-func (r *EvictionREST) Preconditions() *api.Preconditions {
-	return nil
-}
-
-// UpdatedObject returns the updated PDB if it is able to update
-// PodDisruptionAllowed from true->false.
-func (r *EvictionREST) UpdatedObject(ctx api.Context, oldObj runtime.Object) (newObj runtime.Object, err error) {
-	copy, err := api.Scheme.DeepCopy(oldObj)
-	if err != nil {
-		return
-	}
-	newObj = copy.(runtime.Object)
-	pdb := oldObj.(*policy.PodDisruptionBudget)
-	if !pdb.Status.PodDisruptionAllowed {
-		return nil, fmt.Errorf("PodDisruptionAllowed is already false")
-	}
-	pdb.Status.PodDisruptionAllowed = false
-
-	return
-}
-
-func (r *EvictionREST) checkAndDecrement(ctx api.Context, pdb policy.PodDisruptionBudget) (ok bool, err error) {
-	if !pdb.Status.PodDisruptionAllowed {
-		return false, nil
-	}
-	newObj, _, err := r.PodDisruptionBudgetUpdater.Update(ctx, pdb.Name, r)
-	if err != nil {
-		return false, err
-	}
-
-	newPdb := newObj.(*policy.PodDisruptionBudget)
-	if newPdb.Status.PodDisruptionAllowed {
-		return false, fmt.Errorf("update did not succeed")
-	}
-
-	return true, nil
-}
-
-// Returns any PDBs that match the pod.
-// err is set if there's an error.
-func (r *EvictionREST) getPodDisruptionBudgets(ctx api.Context, pod *api.Pod) (pdbs []policy.PodDisruptionBudget, err error) {
-	if len(pod.Labels) == 0 {
-		return
-	}
-
-	l, err := r.PodDisruptionBudgetLister.List(ctx, nil)
-	if err != nil {
-		return
-	}
-
-	pdbList := l.(*policy.PodDisruptionBudgetList)
-
-	for _, pdb := range pdbList.Items {
-		if pdb.Namespace != pod.Namespace {
-			continue
-		}
-		selector, err := unversioned.LabelSelectorAsSelector(pdb.Spec.Selector)
-		if err != nil {
-			continue
-		}
-		// If a PDB with a nil or empty selector creeps in, it should match nothing, not everything.
-		if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
-			continue
-		}
-
-		pdbs = append(pdbs, pdb)
-	}
-
-	return pdbs, nil
-}
-
// BindingREST implements the REST endpoint for binding pods to nodes when etcd is in use.
type BindingREST struct {
	store *registry.Store
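Editor's note: the handler removed above reappears, reworked, in the new eviction.go later in this diff; the HTTP contract is unchanged: POST a policy.Eviction to the pods/eviction subresource, get StatusSuccess on eviction, 429 when the budget disallows it, 500 when multiple PDBs overlap. A hedged client-side sketch of that call; the clientset variable is an assumption and the builder method names vary slightly across client versions:

// --- sketch (not part of the commit) ---
eviction := &policy.Eviction{
	ObjectMeta: api.ObjectMeta{Name: "my-pod", Namespace: "default"}, // hypothetical pod
}
err := clientset.Core().RESTClient().Post().
	Namespace("default").
	Resource("pods").
	Name("my-pod").
	SubResource("eviction").
	Body(eviction).
	Do().Error()
if err != nil {
	// A 429 means the pod's disruption budget currently disallows
	// eviction; back off and retry later.
}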
@@ -40,7 +40,7 @@ import (
func newStorage(t *testing.T) (*REST, *BindingREST, *StatusREST, *etcdtesting.EtcdTestServer) {
	etcdStorage, server := registrytest.NewEtcdStorage(t, "")
	restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 3}
-	storage := NewStorage(restOptions, nil, nil)
+	storage := NewStorage(restOptions, nil, nil, nil)
	return storage.Pod, storage.Binding, storage.Status, server
}

@@ -148,7 +148,7 @@ func (f FailDeletionStorage) Delete(ctx context.Context, key string, out runtime
func newFailDeleteStorage(t *testing.T, called *bool) (*REST, *etcdtesting.EtcdTestServer) {
	etcdStorage, server := registrytest.NewEtcdStorage(t, "")
	restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 3}
-	storage := NewStorage(restOptions, nil, nil)
+	storage := NewStorage(restOptions, nil, nil, nil)
	storage.Pod.Store.Storage = FailDeletionStorage{storage.Pod.Store.Storage, called}
	return storage.Pod, server
}

pkg/registry/core/pod/etcd/eviction.go (new file, 153 lines)
@@ -0,0 +1,153 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package etcd
+
+import (
+	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/api/rest"
+	"k8s.io/kubernetes/pkg/api/unversioned"
+	"k8s.io/kubernetes/pkg/apis/policy"
+	policyclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/policy/unversioned"
+	"k8s.io/kubernetes/pkg/labels"
+	"k8s.io/kubernetes/pkg/registry/generic/registry"
+	"k8s.io/kubernetes/pkg/runtime"
+)
+
+func newEvictionStorage(store *registry.Store, podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter) *EvictionREST {
+	return &EvictionREST{store: store, podDisruptionBudgetClient: podDisruptionBudgetClient}
+}
+
+// EvictionREST implements the REST endpoint for evicting pods from nodes when etcd is in use.
+type EvictionREST struct {
+	store                     *registry.Store
+	podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter
+}
+
+var _ = rest.Creater(&EvictionREST{})
+
+// New creates a new eviction resource
+func (r *EvictionREST) New() runtime.Object {
+	return &policy.Eviction{}
+}
+
+// Create attempts to create a new eviction. That is, it tries to evict a pod.
+func (r *EvictionREST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) {
+	eviction := obj.(*policy.Eviction)
+
+	obj, err := r.store.Get(ctx, eviction.Name)
+	if err != nil {
+		return nil, err
+	}
+	pod := obj.(*api.Pod)
+	pdbs, err := r.getPodDisruptionBudgets(ctx, pod)
+	if err != nil {
+		return nil, err
+	}
+
+	if len(pdbs) > 1 {
+		return &unversioned.Status{
+			Status:  unversioned.StatusFailure,
+			Message: "This pod has more than one PodDisruptionBudget, which the eviction subresource does not support.",
+			Code:    500,
+		}, nil
+	} else if len(pdbs) == 1 {
+		pdb := pdbs[0]
+		// Try to verify-and-decrement
+
+		// If it was false already, or if it becomes false during the course of our retries,
+		// raise an error marked as a 429.
+		ok, err := r.checkAndDecrement(pod.Namespace, pdb)
+		if err != nil {
+			return nil, err
+		}
+
+		if !ok {
+			return &unversioned.Status{
+				Status: unversioned.StatusFailure,
+				// TODO(mml): Include some more details about why the eviction is disallowed.
+				// Ideally any such text is generated by the DisruptionController (offline).
+				Message: "Cannot evict pod as it would violate the pod's disruption budget.",
+				Code:    429,
+				// TODO(mml): Add a Retry-After header. Once there are time-based
+				// budgets, we can sometimes compute a sensible suggested value. But
+				// even without that, we can give a suggestion (10 minutes?) that
+				// prevents well-behaved clients from hammering us.
+			}, nil
+		}
+	}
+
+	// At this point there was either no PDB or we succeeded in decrementing
+
+	// Try the delete
+	_, err = r.store.Delete(ctx, eviction.Name, eviction.DeleteOptions)
+	if err != nil {
+		return nil, err
+	}
+
+	// Success!
+	return &unversioned.Status{Status: unversioned.StatusSuccess}, nil
+}
+
+func (r *EvictionREST) checkAndDecrement(namespace string, pdb policy.PodDisruptionBudget) (ok bool, err error) {
+	if !pdb.Status.PodDisruptionAllowed {
+		return false, nil
+	}
+
+	copied, err := api.Scheme.DeepCopy(pdb)
+	if err != nil {
+		return false, err
+	}
+	newPDB := copied.(policy.PodDisruptionBudget)
+	newPDB.Status.PodDisruptionAllowed = false
+
+	if _, err := r.podDisruptionBudgetClient.PodDisruptionBudgets(namespace).Update(&newPDB); err != nil {
+		return false, err
+	}
+
+	return true, nil
+}
+
+// Returns any PDBs that match the pod.
+// err is set if there's an error.
+func (r *EvictionREST) getPodDisruptionBudgets(ctx api.Context, pod *api.Pod) (pdbs []policy.PodDisruptionBudget, err error) {
+	if len(pod.Labels) == 0 {
+		return
+	}
+
+	pdbList, err := r.podDisruptionBudgetClient.PodDisruptionBudgets(pod.Namespace).List(api.ListOptions{})
+	if err != nil {
+		return
+	}
+
+	for _, pdb := range pdbList.Items {
+		if pdb.Namespace != pod.Namespace {
+			continue
+		}
+		selector, err := unversioned.LabelSelectorAsSelector(pdb.Spec.Selector)
+		if err != nil {
+			continue
+		}
+		// If a PDB with a nil or empty selector creeps in, it should match nothing, not everything.
+		if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
+			continue
+		}
+
+		pdbs = append(pdbs, pdb)
+	}
+
+	return pdbs, nil
+}
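Editor's note: getPodDisruptionBudgets deliberately treats a nil or empty selector as matching nothing, the opposite of the usual "empty selector matches everything" convention, so a malformed PDB cannot veto every eviction in a namespace. A small sketch distilling that matching step into a hypothetical helper, using only the packages the new file imports:

// --- sketch (not part of the commit) ---
// pdbCovers reports whether a PDB's selector covers the given pod,
// with the same nothing-on-empty semantics as the loop above.
func pdbCovers(pdb *policy.PodDisruptionBudget, pod *api.Pod) bool {
	if pdb.Namespace != pod.Namespace {
		return false
	}
	selector, err := unversioned.LabelSelectorAsSelector(pdb.Spec.Selector)
	if err != nil {
		return false
	}
	// Nil/empty selectors match nothing here, not everything.
	return !selector.Empty() && selector.Matches(labels.Set(pod.Labels))
}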
@@ -27,6 +27,8 @@ import (
	"k8s.io/kubernetes/pkg/api/unversioned"
	"k8s.io/kubernetes/pkg/apimachinery/registered"
	"k8s.io/kubernetes/pkg/apiserver"
+	policyclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/policy/unversioned"
+	"k8s.io/kubernetes/pkg/client/restclient"
	"k8s.io/kubernetes/pkg/genericapiserver"
	kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
	"k8s.io/kubernetes/pkg/registry/core/componentstatus"
@@ -72,6 +74,8 @@ type LegacyRESTStorageProvider struct {

	// ComponentStatusServerFunc is a func used to locate servers to back component status
	ComponentStatusServerFunc ComponentStatusServerFunc
+
+	LoopbackClientConfig *restclient.Config
}

type ComponentStatusServerFunc func() map[string]apiserver.Server
@@ -101,8 +105,16 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generi
	if autoscalingGroupVersion := (unversioned.GroupVersion{Group: "autoscaling", Version: "v1"}); registered.IsEnabledVersion(autoscalingGroupVersion) {
		apiGroupInfo.SubresourceGroupVersionKind["replicationcontrollers/scale"] = autoscalingGroupVersion.WithKind("Scale")
	}
+
+	var podDisruptionClient policyclient.PodDisruptionBudgetsGetter
	if policyGroupVersion := (unversioned.GroupVersion{Group: "policy", Version: "v1alpha1"}); registered.IsEnabledVersion(policyGroupVersion) {
		apiGroupInfo.SubresourceGroupVersionKind["pods/eviction"] = policyGroupVersion.WithKind("Eviction")
+
+		var err error
+		podDisruptionClient, err = policyclient.NewForConfig(c.LoopbackClientConfig)
+		if err != nil {
+			return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
+		}
	}
	restStorage := LegacyRESTStorage{}

@@ -131,6 +143,7 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generi
		restOptionsGetter(api.Resource("pods")),
		kubeletclient.ConnectionInfoGetter(nodeStorage.Node),
		c.ProxyTransport,
+		podDisruptionClient,
	)

	serviceRESTStorage, serviceStatusStorage := serviceetcd.NewREST(restOptionsGetter(api.Resource("services")))
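Editor's note: with the loopback-derived client threaded through NewStorage, the eviction handler (and anything else holding a PodDisruptionBudgetsGetter) reaches policy objects through the API rather than through another group's storage. A hypothetical direct use of the injected client, mirroring what EvictionREST does internally:

// --- sketch (not part of the commit) ---
pdbs, err := podDisruptionClient.PodDisruptionBudgets("default").List(api.ListOptions{})
if err != nil {
	return err
}
for _, pdb := range pdbs.Items {
	// PodDisruptionAllowed is the flag checkAndDecrement flips to false
	// before the eviction is allowed to proceed.
	fmt.Println(pdb.Name, pdb.Status.PodDisruptionAllowed)
}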