Make ResourceQuota admission and controller work generically

This commit is contained in:
derekwaynecarr 2016-02-22 11:15:09 -05:00
parent 553c4701af
commit af85fb57c3
9 changed files with 1174 additions and 1001 deletions

View File

@ -60,6 +60,7 @@ import (
servicecontroller "k8s.io/kubernetes/pkg/controller/service"
serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount"
"k8s.io/kubernetes/pkg/healthz"
quotainstall "k8s.io/kubernetes/pkg/quota/install"
"k8s.io/kubernetes/pkg/serviceaccount"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/wait"
@ -220,9 +221,23 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
glog.Infof("allocate-node-cidrs set to %v, node controller not creating routes", s.AllocateNodeCIDRs)
}
go resourcequotacontroller.NewResourceQuotaController(
clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "resourcequota-controller")),
controller.StaticResyncPeriodFunc(s.ResourceQuotaSyncPeriod.Duration)).Run(s.ConcurrentResourceQuotaSyncs, wait.NeverStop)
resourceQuotaControllerClient := clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "resourcequota-controller"))
resourceQuotaRegistry := quotainstall.NewRegistry(resourceQuotaControllerClient)
groupKindsToReplenish := []unversioned.GroupKind{
api.Kind("Pod"),
api.Kind("Service"),
api.Kind("ReplicationController"),
api.Kind("PersistentVolumeClaim"),
api.Kind("Secret"),
}
resourceQuotaControllerOptions := &resourcequotacontroller.ResourceQuotaControllerOptions{
KubeClient: resourceQuotaControllerClient,
ResyncPeriod: controller.StaticResyncPeriodFunc(s.ResourceQuotaSyncPeriod.Duration),
Registry: resourceQuotaRegistry,
GroupKindsToReplenish: groupKindsToReplenish,
ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactory(resourceQuotaControllerClient),
}
go resourcequotacontroller.NewResourceQuotaController(resourceQuotaControllerOptions).Run(s.ConcurrentResourceQuotaSyncs, wait.NeverStop)
// If apiserver is not running we should wait for some time and fail only then. This is particularly
// important when we start apiserver and controller manager at the same time.

View File

@ -28,6 +28,7 @@ import (
kubecontrollermanager "k8s.io/kubernetes/cmd/kube-controller-manager/app"
"k8s.io/kubernetes/cmd/kube-controller-manager/app/options"
"k8s.io/kubernetes/contrib/mesos/pkg/node"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
client "k8s.io/kubernetes/pkg/client/unversioned"
@ -53,6 +54,7 @@ import (
servicecontroller "k8s.io/kubernetes/pkg/controller/service"
serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount"
"k8s.io/kubernetes/pkg/healthz"
quotainstall "k8s.io/kubernetes/pkg/quota/install"
"k8s.io/kubernetes/pkg/serviceaccount"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/wait"
@ -173,8 +175,23 @@ func (s *CMServer) Run(_ []string) error {
routeController.Run(s.NodeSyncPeriod.Duration)
}
go resourcequotacontroller.NewResourceQuotaController(
clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "resource-quota-controller")), controller.StaticResyncPeriodFunc(s.ResourceQuotaSyncPeriod.Duration)).Run(s.ConcurrentResourceQuotaSyncs, wait.NeverStop)
resourceQuotaControllerClient := clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "resource-quota-controller"))
resourceQuotaRegistry := quotainstall.NewRegistry(resourceQuotaControllerClient)
groupKindsToReplenish := []unversioned.GroupKind{
api.Kind("Pod"),
api.Kind("Service"),
api.Kind("ReplicationController"),
api.Kind("PersistentVolumeClaim"),
api.Kind("Secret"),
}
resourceQuotaControllerOptions := &resourcequotacontroller.ResourceQuotaControllerOptions{
KubeClient: resourceQuotaControllerClient,
ResyncPeriod: controller.StaticResyncPeriodFunc(s.ResourceQuotaSyncPeriod.Duration),
Registry: resourceQuotaRegistry,
GroupKindsToReplenish: groupKindsToReplenish,
ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactory(resourceQuotaControllerClient),
}
go resourcequotacontroller.NewResourceQuotaController(resourceQuotaControllerOptions).Run(s.ConcurrentResourceQuotaSyncs, wait.NeverStop)
// If apiserver is not running we should wait for some time and fail only then. This is particularly
// important when we start apiserver and controller manager at the same time.

View File

@ -22,7 +22,6 @@ import (
"github.com/stretchr/testify/assert"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/controller/resourcequota"
)
type resources struct {
@ -81,7 +80,6 @@ func TestResources(tst *testing.T) {
}
tst.Logf("Testing resource computation for %v => request=%v limit=%v", t, pod.Spec.Containers[0].Resources.Requests, pod.Spec.Containers[0].Resources.Limits)
tst.Logf("hasRequests: cpu => %v, mem => %v", resourcequota.PodHasRequests(pod, api.ResourceCPU), resourcequota.PodHasRequests(pod, api.ResourceMemory))
beforeCpuR, beforeCpuL, _, err := LimitedCPUForPod(pod, DefaultDefaultContainerCPULimit)
assert.NoError(err, "CPUForPod should not return an error")

View File

@ -0,0 +1,194 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package resourcequota
import (
"fmt"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/quota/evaluator/core"
"k8s.io/kubernetes/pkg/runtime"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/watch"
)
// ReplenishmentFunc is a function that is invoked when the controller sees a change
// that may require a quota to be replenished (e.g. an object deletion, or an object moving to a terminal state)
type ReplenishmentFunc func(groupKind unversioned.GroupKind, namespace string, object runtime.Object)
// ReplenishmentControllerOptions is an options struct that tells a factory
// how to configure a controller that can inform the quota system it should
// replenish quota
type ReplenishmentControllerOptions struct {
// The kind monitored for replenishment
GroupKind unversioned.GroupKind
// The period that should be used to re-sync the monitored resource
ResyncPeriod controller.ResyncPeriodFunc
// The function to invoke when a change is observed that should trigger
// replenishment
ReplenishmentFunc ReplenishmentFunc
}
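
A minimal sketch, not part of this commit, of how a caller might wire ReplenishmentControllerOptions with its own ReplenishmentFunc; the import paths are assumed from the main tree, and the replenish function here only logs, whereas the quota controller's real function enqueues the affected ResourceQuota objects.

package main

import (
	"github.com/golang/glog"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/api/unversioned"
	"k8s.io/kubernetes/pkg/controller"
	resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota"
	"k8s.io/kubernetes/pkg/runtime"
)

func main() {
	// illustrative replenishment callback; the real controller enqueues quotas instead of logging
	replenish := func(groupKind unversioned.GroupKind, namespace string, object runtime.Object) {
		glog.Infof("replenishment signal for %s in namespace %s", groupKind, namespace)
	}
	options := &resourcequotacontroller.ReplenishmentControllerOptions{
		GroupKind:         api.Kind("Pod"),
		ResyncPeriod:      controller.NoResyncPeriodFunc,
		ReplenishmentFunc: replenish,
	}
	// a ReplenishmentControllerFactory would turn these options into a running informer
	_ = options
}
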
// PodReplenishmentUpdateFunc will replenish if the old pod was quota tracked but the new is not
func PodReplenishmentUpdateFunc(options *ReplenishmentControllerOptions) func(oldObj, newObj interface{}) {
return func(oldObj, newObj interface{}) {
oldPod := oldObj.(*api.Pod)
newPod := newObj.(*api.Pod)
if core.QuotaPod(oldPod) && !core.QuotaPod(newPod) {
options.ReplenishmentFunc(options.GroupKind, newPod.Namespace, newPod)
}
}
}
// ObjectReplenishmentDeleteFunc will replenish on every delete
func ObjectReplenishmentDeleteFunc(options *ReplenishmentControllerOptions) func(obj interface{}) {
return func(obj interface{}) {
metaObject, err := meta.Accessor(obj)
if err != nil {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("replenishment controller could not get object from tombstone %+v, could take up to %v before quota is replenished", obj, options.ResyncPeriod())
utilruntime.HandleError(err)
return
}
metaObject, err = meta.Accessor(tombstone.Obj)
if err != nil {
glog.Errorf("replenishment controller tombstone contained object that is not a meta %+v, could take up to %v before quota is replenished", tombstone.Obj, options.ResyncPeriod())
utilruntime.HandleError(err)
return
}
}
options.ReplenishmentFunc(options.GroupKind, metaObject.GetNamespace(), nil)
}
}
// ReplenishmentControllerFactory knows how to build replenishment controllers
type ReplenishmentControllerFactory interface {
// NewController returns a controller configured with the specified options
NewController(options *ReplenishmentControllerOptions) (*framework.Controller, error)
}
// replenishmentControllerFactory implements ReplenishmentControllerFactory
type replenishmentControllerFactory struct {
kubeClient clientset.Interface
}
// NewReplenishmentControllerFactory returns a factory that knows how to build controllers
// to replenish resources when updated or deleted
func NewReplenishmentControllerFactory(kubeClient clientset.Interface) ReplenishmentControllerFactory {
return &replenishmentControllerFactory{
kubeClient: kubeClient,
}
}
func (r *replenishmentControllerFactory) NewController(options *ReplenishmentControllerOptions) (*framework.Controller, error) {
var result *framework.Controller
switch options.GroupKind {
case api.Kind("Pod"):
_, result = framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return r.kubeClient.Core().Pods(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return r.kubeClient.Core().Pods(api.NamespaceAll).Watch(options)
},
},
&api.Pod{},
options.ResyncPeriod(),
framework.ResourceEventHandlerFuncs{
UpdateFunc: PodReplenishmentUpdateFunc(options),
DeleteFunc: ObjectReplenishmentDeleteFunc(options),
},
)
case api.Kind("Service"):
_, result = framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return r.kubeClient.Core().Services(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return r.kubeClient.Core().Services(api.NamespaceAll).Watch(options)
},
},
&api.Service{},
options.ResyncPeriod(),
framework.ResourceEventHandlerFuncs{
DeleteFunc: ObjectReplenishmentDeleteFunc(options),
},
)
case api.Kind("ReplicationController"):
_, result = framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return r.kubeClient.Core().ReplicationControllers(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return r.kubeClient.Core().ReplicationControllers(api.NamespaceAll).Watch(options)
},
},
&api.ReplicationController{},
options.ResyncPeriod(),
framework.ResourceEventHandlerFuncs{
DeleteFunc: ObjectReplenishmentDeleteFunc(options),
},
)
case api.Kind("PersistentVolumeClaim"):
_, result = framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return r.kubeClient.Core().PersistentVolumeClaims(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return r.kubeClient.Core().PersistentVolumeClaims(api.NamespaceAll).Watch(options)
},
},
&api.PersistentVolumeClaim{},
options.ResyncPeriod(),
framework.ResourceEventHandlerFuncs{
DeleteFunc: ObjectReplenishmentDeleteFunc(options),
},
)
case api.Kind("Secret"):
_, result = framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return r.kubeClient.Core().Secrets(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return r.kubeClient.Core().Secrets(api.NamespaceAll).Watch(options)
},
},
&api.Secret{},
options.ResyncPeriod(),
framework.ResourceEventHandlerFuncs{
DeleteFunc: ObjectReplenishmentDeleteFunc(options),
},
)
default:
return nil, fmt.Errorf("no replenishment controller available for %s", options.GroupKind)
}
return result, nil
}
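
A hedged sketch, not in this commit: a hypothetical no-op factory that satisfies ReplenishmentControllerFactory by refusing every kind. Wired into ResourceQuotaControllerOptions, it makes the quota controller log a warning and fall back to full resyncs only, which can be convenient in unit tests that do not exercise informers.

package resourcequota

import (
	"fmt"

	"k8s.io/kubernetes/pkg/controller/framework"
)

// noOpReplenishmentControllerFactory (hypothetical) never builds a controller,
// so callers rely on the periodic full recalculation instead of replenishment events.
type noOpReplenishmentControllerFactory struct{}

func (noOpReplenishmentControllerFactory) NewController(options *ReplenishmentControllerOptions) (*framework.Controller, error) {
	return nil, fmt.Errorf("replenishment disabled for %s", options.GroupKind)
}
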

View File

@ -0,0 +1,84 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package resourcequota
import (
"testing"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/runtime"
)
// testReplenishment lets us verify that replenishment functions are invoked
type testReplenishment struct {
groupKind unversioned.GroupKind
namespace string
}
// mock function that holds onto the last kind that was replenished
func (t *testReplenishment) Replenish(groupKind unversioned.GroupKind, namespace string, object runtime.Object) {
t.groupKind = groupKind
t.namespace = namespace
}
func TestPodReplenishmentUpdateFunc(t *testing.T) {
mockReplenish := &testReplenishment{}
options := ReplenishmentControllerOptions{
GroupKind: api.Kind("Pod"),
ReplenishmentFunc: mockReplenish.Replenish,
ResyncPeriod: controller.NoResyncPeriodFunc,
}
oldPod := &api.Pod{
ObjectMeta: api.ObjectMeta{Namespace: "test", Name: "pod"},
Status: api.PodStatus{Phase: api.PodRunning},
}
newPod := &api.Pod{
ObjectMeta: api.ObjectMeta{Namespace: "test", Name: "pod"},
Status: api.PodStatus{Phase: api.PodFailed},
}
updateFunc := PodReplenishmentUpdateFunc(&options)
updateFunc(oldPod, newPod)
if mockReplenish.groupKind != api.Kind("Pod") {
t.Errorf("Unexpected group kind %v", mockReplenish.groupKind)
}
if mockReplenish.namespace != oldPod.Namespace {
t.Errorf("Unexpected namespace %v", mockReplenish.namespace)
}
}
func TestObjectReplenishmentDeleteFunc(t *testing.T) {
mockReplenish := &testReplenishment{}
options := ReplenishmentControllerOptions{
GroupKind: api.Kind("Pod"),
ReplenishmentFunc: mockReplenish.Replenish,
ResyncPeriod: controller.NoResyncPeriodFunc,
}
oldPod := &api.Pod{
ObjectMeta: api.ObjectMeta{Namespace: "test", Name: "pod"},
Status: api.PodStatus{Phase: api.PodRunning},
}
deleteFunc := ObjectReplenishmentDeleteFunc(&options)
deleteFunc(oldPod)
if mockReplenish.groupKind != api.Kind("Pod") {
t.Errorf("Unexpected group kind %v", mockReplenish.groupKind)
}
if mockReplenish.namespace != oldPod.Namespace {
t.Errorf("Unexpected namespace %v", mockReplenish.namespace)
}
}

View File

@ -17,16 +17,17 @@ limitations under the License.
package resourcequota
import (
"fmt"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/quota"
"k8s.io/kubernetes/pkg/runtime"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait"
@ -34,6 +35,21 @@ import (
"k8s.io/kubernetes/pkg/watch"
)
// ResourceQuotaControllerOptions holds options for creating a quota controller
type ResourceQuotaControllerOptions struct {
// Must have authority to list all quotas, and update quota status
KubeClient clientset.Interface
// Controls full recalculation of quota usage
ResyncPeriod controller.ResyncPeriodFunc
// Knows how to calculate usage
Registry quota.Registry
// Knows how to build controllers that notify replenishment events
ControllerFactory ReplenishmentControllerFactory
// List of GroupKind objects that should be monitored for replenishment at
// a faster frequency than the quota controller recalculation interval
GroupKindsToReplenish []unversioned.GroupKind
}
// ResourceQuotaController is responsible for tracking quota usage status in the system
type ResourceQuotaController struct {
// Must have authority to list all resources in the system, and update quota status
@ -42,27 +58,32 @@ type ResourceQuotaController struct {
rqIndexer cache.Indexer
// Watches changes to all resource quota
rqController *framework.Controller
// A store of pods, populated by the podController
podStore cache.StoreToPodLister
// Watches changes to all pods (so we can optimize release of compute resources)
podController *framework.Controller
// ResourceQuota objects that need to be synchronized
queue *workqueue.Type
// To allow injection of syncUsage for testing.
syncHandler func(key string) error
// function that controls full recalculation of quota usage
resyncPeriod controller.ResyncPeriodFunc
// knows how to calculate usage
registry quota.Registry
// controllers that monitor objects and notify the quota controller of replenishment events
replenishmentControllers []*framework.Controller
}
// NewResourceQuotaController creates a new ResourceQuotaController
func NewResourceQuotaController(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) *ResourceQuotaController {
func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *ResourceQuotaController {
// build the resource quota controller
rq := &ResourceQuotaController{
kubeClient: kubeClient,
queue: workqueue.New(),
resyncPeriod: resyncPeriod,
kubeClient: options.KubeClient,
queue: workqueue.New(),
resyncPeriod: options.ResyncPeriod,
registry: options.Registry,
replenishmentControllers: []*framework.Controller{},
}
// set the synchronization handler
rq.syncHandler = rq.syncResourceQuotaFromKey
// build the controller that observes quota
rq.rqIndexer, rq.rqController = framework.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
@ -73,7 +94,7 @@ func NewResourceQuotaController(kubeClient clientset.Interface, resyncPeriod con
},
},
&api.ResourceQuota{},
resyncPeriod(),
rq.resyncPeriod(),
framework.ResourceEventHandlerFuncs{
AddFunc: rq.enqueueResourceQuota,
UpdateFunc: func(old, cur interface{}) {
@ -87,10 +108,9 @@ func NewResourceQuotaController(kubeClient clientset.Interface, resyncPeriod con
// responsible for enqueue of all resource quotas when doing a full resync (enqueueAll)
oldResourceQuota := old.(*api.ResourceQuota)
curResourceQuota := cur.(*api.ResourceQuota)
if api.Semantic.DeepEqual(oldResourceQuota.Spec.Hard, curResourceQuota.Status.Hard) {
if quota.Equals(curResourceQuota.Spec.Hard, oldResourceQuota.Spec.Hard) {
return
}
glog.V(4).Infof("Observed updated quota spec for %v/%v", curResourceQuota.Namespace, curResourceQuota.Name)
rq.enqueueResourceQuota(curResourceQuota)
},
// This will enter the sync loop and no-op, because the controller has been deleted from the store.
@ -101,26 +121,19 @@ func NewResourceQuotaController(kubeClient clientset.Interface, resyncPeriod con
cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc},
)
// We use this pod controller to rapidly observe when a pod deletion occurs in order to
// release compute resources from any associated quota.
rq.podStore.Store, rq.podController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return rq.kubeClient.Core().Pods(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return rq.kubeClient.Core().Pods(api.NamespaceAll).Watch(options)
},
},
&api.Pod{},
resyncPeriod(),
framework.ResourceEventHandlerFuncs{
DeleteFunc: rq.deletePod,
},
)
// set the synchronization handler
rq.syncHandler = rq.syncResourceQuotaFromKey
for _, groupKindToReplenish := range options.GroupKindsToReplenish {
controllerOptions := &ReplenishmentControllerOptions{
GroupKind: groupKindToReplenish,
ResyncPeriod: options.ResyncPeriod,
ReplenishmentFunc: rq.replenishQuota,
}
replenishmentController, err := options.ControllerFactory.NewController(controllerOptions)
if err != nil {
glog.Warningf("quota controller unable to replenish %s due to %v, changes only accounted during full resync", groupKindToReplenish, err)
} else {
rq.replenishmentControllers = append(rq.replenishmentControllers, replenishmentController)
}
}
return rq
}
@ -155,6 +168,7 @@ func (rq *ResourceQuotaController) worker() {
err := rq.syncHandler(key.(string))
if err != nil {
utilruntime.HandleError(err)
rq.queue.Add(key)
}
}()
}
@ -164,45 +178,21 @@ func (rq *ResourceQuotaController) worker() {
func (rq *ResourceQuotaController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
go rq.rqController.Run(stopCh)
go rq.podController.Run(stopCh)
// the controllers that replenish other resources to respond rapidly to state changes
for _, replenishmentController := range rq.replenishmentControllers {
go replenishmentController.Run(stopCh)
}
// the workers that chug through the quota calculation backlog
for i := 0; i < workers; i++ {
go wait.Until(rq.worker, time.Second, stopCh)
}
// the timer for how often we do a full recalculation across all quotas
go wait.Until(func() { rq.enqueueAll() }, rq.resyncPeriod(), stopCh)
<-stopCh
glog.Infof("Shutting down ResourceQuotaController")
rq.queue.ShutDown()
}
// FilterQuotaPods eliminates pods that no longer have a cost against the quota
// pods that have a restart policy of always are always returned
// pods that are in a failed state, but have a restart policy of on failure are always returned
// pods that are not in a success state or a failure state are included in quota
func FilterQuotaPods(pods []api.Pod) []*api.Pod {
var result []*api.Pod
for i := range pods {
value := &pods[i]
// a pod that has a restart policy always no matter its state counts against usage
if value.Spec.RestartPolicy == api.RestartPolicyAlways {
result = append(result, value)
continue
}
// a failed pod with a restart policy of on failure will count against usage
if api.PodFailed == value.Status.Phase &&
value.Spec.RestartPolicy == api.RestartPolicyOnFailure {
result = append(result, value)
continue
}
// if the pod is not succeeded or failed, then we count it against quota
if api.PodSucceeded != value.Status.Phase &&
api.PodFailed != value.Status.Phase {
result = append(result, value)
continue
}
}
return result
}
// syncResourceQuotaFromKey syncs a quota key
func (rq *ResourceQuotaController) syncResourceQuotaFromKey(key string) (err error) {
startTime := time.Now()
@ -224,115 +214,68 @@ func (rq *ResourceQuotaController) syncResourceQuotaFromKey(key string) (err err
return rq.syncResourceQuota(quota)
}
// syncResourceQuota runs a complete sync of current status
func (rq *ResourceQuotaController) syncResourceQuota(quota api.ResourceQuota) (err error) {
// syncResourceQuota runs a complete sync of resource quota status across all known kinds
func (rq *ResourceQuotaController) syncResourceQuota(resourceQuota api.ResourceQuota) (err error) {
// quota is dirty if any part of spec hard limits differs from the status hard limits
dirty := !api.Semantic.DeepEqual(quota.Spec.Hard, quota.Status.Hard)
dirty := !api.Semantic.DeepEqual(resourceQuota.Spec.Hard, resourceQuota.Status.Hard)
// dirty tracks if the usage status differs from the previous sync,
// if so, we send a new usage with latest status
// if this is our first sync, it will be dirty by default, since we need to track usage
dirty = dirty || (quota.Status.Hard == nil || quota.Status.Used == nil)
dirty = dirty || (resourceQuota.Status.Hard == nil || resourceQuota.Status.Used == nil)
// Create a usage object that is based on the quota resource version
// Create a usage object that is based on the quota resource version that will handle updates
// by default, we preserve the past usage observation, and set hard to the current spec
previousUsed := api.ResourceList{}
if resourceQuota.Status.Used != nil {
previousUsed = quota.Add(api.ResourceList{}, resourceQuota.Status.Used)
}
usage := api.ResourceQuota{
ObjectMeta: api.ObjectMeta{
Name: quota.Name,
Namespace: quota.Namespace,
ResourceVersion: quota.ResourceVersion,
Labels: quota.Labels,
Annotations: quota.Annotations},
Name: resourceQuota.Name,
Namespace: resourceQuota.Namespace,
ResourceVersion: resourceQuota.ResourceVersion,
Labels: resourceQuota.Labels,
Annotations: resourceQuota.Annotations},
Status: api.ResourceQuotaStatus{
Hard: api.ResourceList{},
Used: api.ResourceList{},
Hard: quota.Add(api.ResourceList{}, resourceQuota.Spec.Hard),
Used: previousUsed,
},
}
// set the hard values supported on the quota
for k, v := range quota.Spec.Hard {
usage.Status.Hard[k] = *v.Copy()
}
// set any last known observed status values for usage
for k, v := range quota.Status.Used {
usage.Status.Used[k] = *v.Copy()
// find the intersection between the hard resources on the quota
// and the resources this controller can track, so we know which
// resources need updated usage measurements
hardResources := quota.ResourceNames(usage.Status.Hard)
potentialResources := []api.ResourceName{}
evaluators := rq.registry.Evaluators()
for _, evaluator := range evaluators {
potentialResources = append(potentialResources, evaluator.MatchesResources()...)
}
matchedResources := quota.Intersection(hardResources, potentialResources)
set := map[api.ResourceName]bool{}
for k := range usage.Status.Hard {
set[k] = true
}
pods := &api.PodList{}
if set[api.ResourcePods] || set[api.ResourceMemory] || set[api.ResourceCPU] {
pods, err = rq.kubeClient.Core().Pods(usage.Namespace).List(api.ListOptions{})
// sum the observed usage from each evaluator
newUsage := api.ResourceList{}
usageStatsOptions := quota.UsageStatsOptions{Namespace: resourceQuota.Namespace, Scopes: resourceQuota.Spec.Scopes}
for _, evaluator := range evaluators {
stats, err := evaluator.UsageStats(usageStatsOptions)
if err != nil {
return err
}
newUsage = quota.Add(newUsage, stats.Used)
}
filteredPods := FilterQuotaPods(pods.Items)
// iterate over each resource, and update observation
for k := range usage.Status.Hard {
// look if there is a used value, if none, we are definitely dirty
prevQuantity, found := usage.Status.Used[k]
if !found {
dirty = true
}
var value *resource.Quantity
switch k {
case api.ResourcePods:
value = resource.NewQuantity(int64(len(filteredPods)), resource.DecimalSI)
case api.ResourceServices:
items, err := rq.kubeClient.Core().Services(usage.Namespace).List(api.ListOptions{})
if err != nil {
return err
}
value = resource.NewQuantity(int64(len(items.Items)), resource.DecimalSI)
case api.ResourceReplicationControllers:
items, err := rq.kubeClient.Core().ReplicationControllers(usage.Namespace).List(api.ListOptions{})
if err != nil {
return err
}
value = resource.NewQuantity(int64(len(items.Items)), resource.DecimalSI)
case api.ResourceQuotas:
items, err := rq.kubeClient.Core().ResourceQuotas(usage.Namespace).List(api.ListOptions{})
if err != nil {
return err
}
value = resource.NewQuantity(int64(len(items.Items)), resource.DecimalSI)
case api.ResourceSecrets:
items, err := rq.kubeClient.Core().Secrets(usage.Namespace).List(api.ListOptions{})
if err != nil {
return err
}
value = resource.NewQuantity(int64(len(items.Items)), resource.DecimalSI)
case api.ResourcePersistentVolumeClaims:
items, err := rq.kubeClient.Core().PersistentVolumeClaims(usage.Namespace).List(api.ListOptions{})
if err != nil {
return err
}
value = resource.NewQuantity(int64(len(items.Items)), resource.DecimalSI)
case api.ResourceMemory:
value = PodsRequests(filteredPods, api.ResourceMemory)
case api.ResourceCPU:
value = PodsRequests(filteredPods, api.ResourceCPU)
}
// ignore fields we do not understand (assume another controller is tracking it)
if value != nil {
// see if the value has changed
dirty = dirty || (value.Value() != prevQuantity.Value())
// just update the value
usage.Status.Used[k] = *value
}
// mask the observed usage to only the set of resources tracked by this quota
// merge our observed usage with the quota usage status
// if the new usage is different than the last usage, we will need to do an update
newUsage = quota.Mask(newUsage, matchedResources)
for key, value := range newUsage {
usage.Status.Used[key] = value
}
// update the usage only if it changed
dirty = dirty || !quota.Equals(usage.Status.Used, resourceQuota.Status.Used)
// there was a change observed by this controller that requires we update quota
if dirty {
_, err = rq.kubeClient.Core().ResourceQuotas(usage.Namespace).UpdateStatus(&usage)
return err
@ -340,102 +283,20 @@ func (rq *ResourceQuotaController) syncResourceQuota(quota api.ResourceQuota) (e
return nil
}
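
The sync path above leans on the generic resource-list helpers in pkg/quota (ResourceNames, Intersection, Mask, Add, Equals), all of which appear in the function body. A small standalone sketch with made-up values, just to show how the masking and merging behave:

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/api/resource"
	"k8s.io/kubernetes/pkg/quota"
)

func main() {
	hard := api.ResourceList{
		api.ResourceCPU:  resource.MustParse("3"),
		api.ResourcePods: resource.MustParse("5"),
	}
	observed := api.ResourceList{
		api.ResourceCPU:    resource.MustParse("1"),
		api.ResourceMemory: resource.MustParse("512Mi"),
		api.ResourcePods:   resource.MustParse("2"),
	}
	// keep only the resources this quota actually constrains, as syncResourceQuota does
	tracked := quota.Intersection(quota.ResourceNames(hard), quota.ResourceNames(observed))
	used := quota.Mask(observed, tracked) // drops memory, keeps cpu and pods
	merged := quota.Add(api.ResourceList{}, used)
	fmt.Println(quota.Equals(used, merged)) // true: no change means no status update
}
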
// PodsRequests returns sum of each resource request for each pod in list
// If a given pod in the list does not have a request for the named resource, we log the error
// but still attempt to get the most representative count
func PodsRequests(pods []*api.Pod, resourceName api.ResourceName) *resource.Quantity {
var sum *resource.Quantity
for i := range pods {
pod := pods[i]
podQuantity, err := PodRequests(pod, resourceName)
if err != nil {
// log the error, but try to keep the most accurate count possible in log
// rationale here is that you may have had pods in a namespace that did not have
// explicit requests prior to adding the quota
glog.Infof("No explicit request for resource, pod %s/%s, %s", pod.Namespace, pod.Name, resourceName)
} else {
if sum == nil {
sum = podQuantity
} else {
sum.Add(*podQuantity)
}
}
}
// if list is empty
if sum == nil {
q := resource.MustParse("0")
sum = &q
}
return sum
}
// PodRequests returns sum of each resource request across all containers in pod
func PodRequests(pod *api.Pod, resourceName api.ResourceName) (*resource.Quantity, error) {
if !PodHasRequests(pod, resourceName) {
return nil, fmt.Errorf("Each container in pod %s/%s does not have an explicit request for resource %s.", pod.Namespace, pod.Name, resourceName)
}
var sum *resource.Quantity
for j := range pod.Spec.Containers {
value, _ := pod.Spec.Containers[j].Resources.Requests[resourceName]
if sum == nil {
sum = value.Copy()
} else {
err := sum.Add(value)
if err != nil {
return sum, err
}
}
}
// if list is empty
if sum == nil {
q := resource.MustParse("0")
sum = &q
}
return sum, nil
}
// PodHasRequests verifies that each container in the pod has an explicit request that is non-zero for a named resource
func PodHasRequests(pod *api.Pod, resourceName api.ResourceName) bool {
for j := range pod.Spec.Containers {
value, valueSet := pod.Spec.Containers[j].Resources.Requests[resourceName]
if !valueSet || value.Value() == int64(0) {
return false
}
}
return true
}
// When a pod is deleted, enqueue the quota that manages the pod and update its expectations.
// obj could be an *api.Pod, or a DeletionFinalStateUnknown marker item.
func (rq *ResourceQuotaController) deletePod(obj interface{}) {
pod, ok := obj.(*api.Pod)
// When a delete is dropped, the relist will notice a pod in the store not
// in the list, leading to the insertion of a tombstone object which contains
// the deleted key/value. Note that this value might be stale. If the pod
// changed labels the new rc will not be woken up till the periodic resync.
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("Couldn't get object from tombstone %+v, could take up to %v before a quota records the deletion", obj, rq.resyncPeriod())
return
}
pod, ok = tombstone.Obj.(*api.Pod)
if !ok {
glog.Errorf("Tombstone contained object that is not a pod %+v, could take up to %v before quota records the deletion", obj, rq.resyncPeriod())
return
}
}
quotas, err := rq.rqIndexer.Index("namespace", pod)
// replenishQuota is a replenishment function invoked by a controller to notify that a quota should be recalculated
func (rq *ResourceQuotaController) replenishQuota(groupKind unversioned.GroupKind, namespace string, object runtime.Object) {
// TODO: make this support targeted replenishment to a specific kind; right now it does a full replenish
indexKey := &api.ResourceQuota{}
indexKey.Namespace = namespace
resourceQuotas, err := rq.rqIndexer.Index("namespace", indexKey)
if err != nil {
glog.Errorf("Couldn't find resource quota associated with pod %+v, could take up to %v before a quota records the deletion", obj, rq.resyncPeriod())
glog.Errorf("quota controller could not find ResourceQuota associated with namespace: %s, could take up to %v before a quota replenishes", namespace, rq.resyncPeriod())
}
if len(quotas) == 0 {
glog.V(4).Infof("No resource quota associated with namespace %q", pod.Namespace)
if len(resourceQuotas) == 0 {
return
}
for i := range quotas {
quota := quotas[i].(*api.ResourceQuota)
rq.enqueueResourceQuota(quota)
for i := range resourceQuotas {
resourceQuota := resourceQuotas[i].(*api.ResourceQuota)
rq.enqueueResourceQuota(resourceQuota)
}
}

View File

@ -1,5 +1,5 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -17,15 +17,16 @@ limitations under the License.
package resourcequota
import (
"strconv"
"strings"
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
"k8s.io/kubernetes/pkg/client/unversioned/testclient"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/quota/install"
"k8s.io/kubernetes/pkg/util/sets"
)
@ -47,85 +48,11 @@ func getResourceRequirements(requests, limits api.ResourceList) api.ResourceRequ
return res
}
func validPod(name string, numContainers int, resources api.ResourceRequirements) *api.Pod {
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{Name: name, Namespace: "test"},
Spec: api.PodSpec{},
}
pod.Spec.Containers = make([]api.Container, 0, numContainers)
for i := 0; i < numContainers; i++ {
pod.Spec.Containers = append(pod.Spec.Containers, api.Container{
Image: "foo:V" + strconv.Itoa(i),
Resources: resources,
})
}
return pod
}
func TestFilterQuotaPods(t *testing.T) {
pods := []api.Pod{
{
ObjectMeta: api.ObjectMeta{Name: "pod-running"},
Status: api.PodStatus{Phase: api.PodRunning},
},
{
ObjectMeta: api.ObjectMeta{Name: "pod-pending"},
Status: api.PodStatus{Phase: api.PodPending},
},
{
ObjectMeta: api.ObjectMeta{Name: "pod-succeeded"},
Status: api.PodStatus{Phase: api.PodSucceeded},
},
{
ObjectMeta: api.ObjectMeta{Name: "pod-unknown"},
Status: api.PodStatus{Phase: api.PodUnknown},
},
{
ObjectMeta: api.ObjectMeta{Name: "pod-failed"},
Status: api.PodStatus{Phase: api.PodFailed},
},
{
ObjectMeta: api.ObjectMeta{Name: "pod-failed-with-restart-always"},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicyAlways,
},
Status: api.PodStatus{Phase: api.PodFailed},
},
{
ObjectMeta: api.ObjectMeta{Name: "pod-failed-with-restart-on-failure"},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicyOnFailure,
},
Status: api.PodStatus{Phase: api.PodFailed},
},
{
ObjectMeta: api.ObjectMeta{Name: "pod-failed-with-restart-never"},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicyNever,
},
Status: api.PodStatus{Phase: api.PodFailed},
},
}
expectedResults := sets.NewString("pod-running",
"pod-pending", "pod-unknown", "pod-failed-with-restart-always",
"pod-failed-with-restart-on-failure")
actualResults := sets.String{}
result := FilterQuotaPods(pods)
for i := range result {
actualResults.Insert(result[i].Name)
}
if len(expectedResults) != len(actualResults) || !actualResults.HasAll(expectedResults.List()...) {
t.Errorf("Expected results %v, Actual results %v", expectedResults, actualResults)
}
}
func TestSyncResourceQuota(t *testing.T) {
podList := api.PodList{
Items: []api.Pod{
{
ObjectMeta: api.ObjectMeta{Name: "pod-running"},
ObjectMeta: api.ObjectMeta{Name: "pod-running", Namespace: "testing"},
Status: api.PodStatus{Phase: api.PodRunning},
Spec: api.PodSpec{
Volumes: []api.Volume{{Name: "vol"}},
@ -133,7 +60,7 @@ func TestSyncResourceQuota(t *testing.T) {
},
},
{
ObjectMeta: api.ObjectMeta{Name: "pod-running-2"},
ObjectMeta: api.ObjectMeta{Name: "pod-running-2", Namespace: "testing"},
Status: api.PodStatus{Phase: api.PodRunning},
Spec: api.PodSpec{
Volumes: []api.Volume{{Name: "vol"}},
@ -141,7 +68,7 @@ func TestSyncResourceQuota(t *testing.T) {
},
},
{
ObjectMeta: api.ObjectMeta{Name: "pod-failed"},
ObjectMeta: api.ObjectMeta{Name: "pod-failed", Namespace: "testing"},
Status: api.PodStatus{Phase: api.PodFailed},
Spec: api.PodSpec{
Volumes: []api.Volume{{Name: "vol"}},
@ -150,7 +77,8 @@ func TestSyncResourceQuota(t *testing.T) {
},
},
}
quota := api.ResourceQuota{
resourceQuota := api.ResourceQuota{
ObjectMeta: api.ObjectMeta{Name: "quota", Namespace: "testing"},
Spec: api.ResourceQuotaSpec{
Hard: api.ResourceList{
api.ResourceCPU: resource.MustParse("3"),
@ -174,15 +102,43 @@ func TestSyncResourceQuota(t *testing.T) {
},
}
kubeClient := fake.NewSimpleClientset(&podList, &quota)
ResourceQuotaController := NewResourceQuotaController(kubeClient, controller.StaticResyncPeriodFunc(time.Second))
err := ResourceQuotaController.syncResourceQuota(quota)
kubeClient := fake.NewSimpleClientset(&podList, &resourceQuota)
resourceQuotaControllerOptions := &ResourceQuotaControllerOptions{
KubeClient: kubeClient,
ResyncPeriod: controller.NoResyncPeriodFunc,
Registry: install.NewRegistry(kubeClient),
GroupKindsToReplenish: []unversioned.GroupKind{
api.Kind("Pod"),
api.Kind("Service"),
api.Kind("ReplicationController"),
api.Kind("PersistentVolumeClaim"),
},
ControllerFactory: NewReplenishmentControllerFactory(kubeClient),
}
quotaController := NewResourceQuotaController(resourceQuotaControllerOptions)
err := quotaController.syncResourceQuota(resourceQuota)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
expectedActionSet := sets.NewString(
strings.Join([]string{"list", "replicationcontrollers", ""}, "-"),
strings.Join([]string{"list", "services", ""}, "-"),
strings.Join([]string{"list", "pods", ""}, "-"),
strings.Join([]string{"list", "resourcequotas", ""}, "-"),
strings.Join([]string{"list", "secrets", ""}, "-"),
strings.Join([]string{"list", "persistentvolumeclaims", ""}, "-"),
strings.Join([]string{"update", "resourcequotas", "status"}, "-"),
)
actionSet := sets.NewString()
for _, action := range kubeClient.Actions() {
actionSet.Insert(strings.Join([]string{action.GetVerb(), action.GetResource(), action.GetSubresource()}, "-"))
}
if !actionSet.HasAll(expectedActionSet.List()...) {
t.Errorf("Expected actions:\n%v\n but got:\n%v\nDifference:\n%v", expectedActionSet, actionSet, expectedActionSet.Difference(actionSet))
}
usage := kubeClient.Actions()[1].(testclient.UpdateAction).GetObject().(*api.ResourceQuota)
lastActionIndex := len(kubeClient.Actions()) - 1
usage := kubeClient.Actions()[lastActionIndex].(testclient.UpdateAction).GetObject().(*api.ResourceQuota)
// ensure hard and used limits are what we expected
for k, v := range expectedUsage.Status.Hard {
@ -204,7 +160,7 @@ func TestSyncResourceQuota(t *testing.T) {
}
func TestSyncResourceQuotaSpecChange(t *testing.T) {
quota := api.ResourceQuota{
resourceQuota := api.ResourceQuota{
Spec: api.ResourceQuotaSpec{
Hard: api.ResourceList{
api.ResourceCPU: resource.MustParse("4"),
@ -231,15 +187,44 @@ func TestSyncResourceQuotaSpecChange(t *testing.T) {
},
}
kubeClient := fake.NewSimpleClientset(&quota)
ResourceQuotaController := NewResourceQuotaController(kubeClient, controller.StaticResyncPeriodFunc(time.Second))
err := ResourceQuotaController.syncResourceQuota(quota)
kubeClient := fake.NewSimpleClientset(&resourceQuota)
resourceQuotaControllerOptions := &ResourceQuotaControllerOptions{
KubeClient: kubeClient,
ResyncPeriod: controller.NoResyncPeriodFunc,
Registry: install.NewRegistry(kubeClient),
GroupKindsToReplenish: []unversioned.GroupKind{
api.Kind("Pod"),
api.Kind("Service"),
api.Kind("ReplicationController"),
api.Kind("PersistentVolumeClaim"),
},
ControllerFactory: NewReplenishmentControllerFactory(kubeClient),
}
quotaController := NewResourceQuotaController(resourceQuotaControllerOptions)
err := quotaController.syncResourceQuota(resourceQuota)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
usage := kubeClient.Actions()[1].(testclient.UpdateAction).GetObject().(*api.ResourceQuota)
expectedActionSet := sets.NewString(
strings.Join([]string{"list", "replicationcontrollers", ""}, "-"),
strings.Join([]string{"list", "services", ""}, "-"),
strings.Join([]string{"list", "pods", ""}, "-"),
strings.Join([]string{"list", "resourcequotas", ""}, "-"),
strings.Join([]string{"list", "secrets", ""}, "-"),
strings.Join([]string{"list", "persistentvolumeclaims", ""}, "-"),
strings.Join([]string{"update", "resourcequotas", "status"}, "-"),
)
actionSet := sets.NewString()
for _, action := range kubeClient.Actions() {
actionSet.Insert(strings.Join([]string{action.GetVerb(), action.GetResource(), action.GetSubresource()}, "-"))
}
if !actionSet.HasAll(expectedActionSet.List()...) {
t.Errorf("Expected actions:\n%v\n but got:\n%v\nDifference:\n%v", expectedActionSet, actionSet, expectedActionSet.Difference(actionSet))
}
lastActionIndex := len(kubeClient.Actions()) - 1
usage := kubeClient.Actions()[lastActionIndex].(testclient.UpdateAction).GetObject().(*api.ResourceQuota)
// ensure hard and used limits are what we expected
for k, v := range expectedUsage.Status.Hard {
@ -262,7 +247,7 @@ func TestSyncResourceQuotaSpecChange(t *testing.T) {
}
func TestSyncResourceQuotaNoChange(t *testing.T) {
quota := api.ResourceQuota{
resourceQuota := api.ResourceQuota{
Spec: api.ResourceQuotaSpec{
Hard: api.ResourceList{
api.ResourceCPU: resource.MustParse("4"),
@ -278,165 +263,37 @@ func TestSyncResourceQuotaNoChange(t *testing.T) {
},
}
kubeClient := fake.NewSimpleClientset(&api.PodList{}, &quota)
ResourceQuotaController := NewResourceQuotaController(kubeClient, controller.StaticResyncPeriodFunc(time.Second))
err := ResourceQuotaController.syncResourceQuota(quota)
kubeClient := fake.NewSimpleClientset(&api.PodList{}, &resourceQuota)
resourceQuotaControllerOptions := &ResourceQuotaControllerOptions{
KubeClient: kubeClient,
ResyncPeriod: controller.NoResyncPeriodFunc,
Registry: install.NewRegistry(kubeClient),
GroupKindsToReplenish: []unversioned.GroupKind{
api.Kind("Pod"),
api.Kind("Service"),
api.Kind("ReplicationController"),
api.Kind("PersistentVolumeClaim"),
},
ControllerFactory: NewReplenishmentControllerFactory(kubeClient),
}
quotaController := NewResourceQuotaController(resourceQuotaControllerOptions)
err := quotaController.syncResourceQuota(resourceQuota)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
actions := kubeClient.Actions()
if len(actions) != 1 && !actions[0].Matches("list", "pods") {
t.Errorf("SyncResourceQuota made an unexpected client action when state was not dirty: %v", kubeClient.Actions)
}
}
func TestPodHasRequests(t *testing.T) {
type testCase struct {
pod *api.Pod
resourceName api.ResourceName
expectedResult bool
}
testCases := []testCase{
{
pod: validPod("request-cpu", 2, getResourceRequirements(getResourceList("100m", ""), getResourceList("", ""))),
resourceName: api.ResourceCPU,
expectedResult: true,
},
{
pod: validPod("no-request-cpu", 2, getResourceRequirements(getResourceList("", ""), getResourceList("", ""))),
resourceName: api.ResourceCPU,
expectedResult: false,
},
{
pod: validPod("request-zero-cpu", 2, getResourceRequirements(getResourceList("0", ""), getResourceList("", ""))),
resourceName: api.ResourceCPU,
expectedResult: false,
},
{
pod: validPod("request-memory", 2, getResourceRequirements(getResourceList("", "2Mi"), getResourceList("", ""))),
resourceName: api.ResourceMemory,
expectedResult: true,
},
{
pod: validPod("no-request-memory", 2, getResourceRequirements(getResourceList("", ""), getResourceList("", ""))),
resourceName: api.ResourceMemory,
expectedResult: false,
},
{
pod: validPod("request-zero-memory", 2, getResourceRequirements(getResourceList("", "0"), getResourceList("", ""))),
resourceName: api.ResourceMemory,
expectedResult: false,
},
}
for _, item := range testCases {
if actual := PodHasRequests(item.pod, item.resourceName); item.expectedResult != actual {
t.Errorf("Pod %s for resource %s expected %v actual %v", item.pod.Name, item.resourceName, item.expectedResult, actual)
}
}
}
func TestPodRequests(t *testing.T) {
type testCase struct {
pod *api.Pod
resourceName api.ResourceName
expectedResult string
expectedError bool
}
testCases := []testCase{
{
pod: validPod("request-cpu", 2, getResourceRequirements(getResourceList("100m", ""), getResourceList("", ""))),
resourceName: api.ResourceCPU,
expectedResult: "200m",
expectedError: false,
},
{
pod: validPod("no-request-cpu", 2, getResourceRequirements(getResourceList("", ""), getResourceList("", ""))),
resourceName: api.ResourceCPU,
expectedResult: "",
expectedError: true,
},
{
pod: validPod("request-zero-cpu", 2, getResourceRequirements(getResourceList("0", ""), getResourceList("", ""))),
resourceName: api.ResourceCPU,
expectedResult: "",
expectedError: true,
},
{
pod: validPod("request-memory", 2, getResourceRequirements(getResourceList("", "500Mi"), getResourceList("", ""))),
resourceName: api.ResourceMemory,
expectedResult: "1000Mi",
expectedError: false,
},
{
pod: validPod("no-request-memory", 2, getResourceRequirements(getResourceList("", ""), getResourceList("", ""))),
resourceName: api.ResourceMemory,
expectedResult: "",
expectedError: true,
},
{
pod: validPod("request-zero-memory", 2, getResourceRequirements(getResourceList("", "0"), getResourceList("", ""))),
resourceName: api.ResourceMemory,
expectedResult: "",
expectedError: true,
},
}
for _, item := range testCases {
actual, err := PodRequests(item.pod, item.resourceName)
if item.expectedError != (err != nil) {
t.Errorf("Unexpected error result for pod %s for resource %s expected error %v got %v", item.pod.Name, item.resourceName, item.expectedError, err)
}
if item.expectedResult != "" && (item.expectedResult != actual.String()) {
t.Errorf("Expected %s, Actual %s, pod %s for resource %s", item.expectedResult, actual.String(), item.pod.Name, item.resourceName)
}
}
}
func TestPodsRequests(t *testing.T) {
type testCase struct {
pods []*api.Pod
resourceName api.ResourceName
expectedResult string
}
testCases := []testCase{
{
pods: []*api.Pod{
validPod("request-cpu-1", 1, getResourceRequirements(getResourceList("100m", ""), getResourceList("", ""))),
validPod("request-cpu-2", 1, getResourceRequirements(getResourceList("1", ""), getResourceList("", ""))),
},
resourceName: api.ResourceCPU,
expectedResult: "1100m",
},
{
pods: []*api.Pod{
validPod("no-request-cpu-1", 1, getResourceRequirements(getResourceList("", ""), getResourceList("", ""))),
validPod("no-request-cpu-2", 1, getResourceRequirements(getResourceList("", ""), getResourceList("", ""))),
},
resourceName: api.ResourceCPU,
expectedResult: "",
},
{
pods: []*api.Pod{
validPod("request-zero-cpu-1", 1, getResourceRequirements(getResourceList("0", ""), getResourceList("", ""))),
validPod("request-zero-cpu-1", 1, getResourceRequirements(getResourceList("0", ""), getResourceList("", ""))),
},
resourceName: api.ResourceCPU,
expectedResult: "",
},
{
pods: []*api.Pod{
validPod("request-memory-1", 1, getResourceRequirements(getResourceList("", "500Mi"), getResourceList("", ""))),
validPod("request-memory-2", 1, getResourceRequirements(getResourceList("", "1Gi"), getResourceList("", ""))),
},
resourceName: api.ResourceMemory,
expectedResult: "1524Mi",
},
}
for _, item := range testCases {
actual := PodsRequests(item.pods, item.resourceName)
if item.expectedResult != "" && (item.expectedResult != actual.String()) {
t.Errorf("Expected %s, Actual %s, pod %s for resource %s", item.expectedResult, actual.String(), item.pods[0].Name, item.resourceName)
}
expectedActionSet := sets.NewString(
strings.Join([]string{"list", "replicationcontrollers", ""}, "-"),
strings.Join([]string{"list", "services", ""}, "-"),
strings.Join([]string{"list", "pods", ""}, "-"),
strings.Join([]string{"list", "resourcequotas", ""}, "-"),
strings.Join([]string{"list", "secrets", ""}, "-"),
strings.Join([]string{"list", "persistentvolumeclaims", ""}, "-"),
)
actionSet := sets.NewString()
for _, action := range kubeClient.Actions() {
actionSet.Insert(strings.Join([]string{action.GetVerb(), action.GetResource(), action.GetSubresource()}, "-"))
}
if !actionSet.HasAll(expectedActionSet.List()...) {
t.Errorf("Expected actions:\n%v\n but got:\n%v\nDifference:\n%v", expectedActionSet, actionSet, expectedActionSet.Difference(actionSet))
}
}

View File

@ -20,35 +20,61 @@ import (
"fmt"
"io"
"math/rand"
"strings"
"time"
"github.com/hashicorp/golang-lru"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/admission"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/client/cache"
resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota"
"k8s.io/kubernetes/pkg/quota"
"k8s.io/kubernetes/pkg/quota/install"
"k8s.io/kubernetes/pkg/runtime"
utilerrors "k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/watch"
)
func init() {
admission.RegisterPlugin("ResourceQuota", func(client clientset.Interface, config io.Reader) (admission.Interface, error) {
return NewResourceQuota(client), nil
})
admission.RegisterPlugin("ResourceQuota",
func(client clientset.Interface, config io.Reader) (admission.Interface, error) {
registry := install.NewRegistry(client)
return NewResourceQuota(client, registry)
})
}
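
A hedged sketch, not part of this commit, of constructing the admission controller directly against a fake clientset the way a unit test might; the plugin import path is assumed to be plugin/pkg/admission/resourcequota.

package main

import (
	"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
	"k8s.io/kubernetes/pkg/quota/install"
	// assumed import path for this admission plugin
	resourcequotaadmission "k8s.io/kubernetes/plugin/pkg/admission/resourcequota"
)

func main() {
	kubeClient := fake.NewSimpleClientset()
	registry := install.NewRegistry(kubeClient)
	// handler implements admission.Interface; a server would call handler.Admit(attributes)
	handler, err := resourcequotaadmission.NewResourceQuota(kubeClient, registry)
	if err != nil {
		panic(err)
	}
	_ = handler
}
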
type quota struct {
// quotaAdmission implements an admission controller that can enforce quota constraints
type quotaAdmission struct {
*admission.Handler
client clientset.Interface
// must be able to read/write ResourceQuota
client clientset.Interface
// indexer that holds quota objects by namespace
indexer cache.Indexer
// registry that knows how to measure usage for objects
registry quota.Registry
// liveLookupCache holds the last few live lookups we've done to help amortize cost on repeated lookup failures.
// This lets us handle the case of latent caches by looking up actual results for a namespace on a cache miss or empty result.
// We track the lookup result here so that for repeated requests, we don't look it up very often.
liveLookupCache *lru.Cache
liveTTL time.Duration
}
// NewResourceQuota creates a new resource quota admission control handler
func NewResourceQuota(client clientset.Interface) admission.Interface {
type liveLookupEntry struct {
expiry time.Time
items []*api.ResourceQuota
}
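
A standalone sketch of the live-lookup pattern these fields describe, using only the golang-lru calls already imported here (New, Get, Add); the key, TTL, and payload are illustrative. On a cache miss or an expired entry, Admit below does a live List and re-adds the result.

package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/golang-lru"
)

type cachedQuotaNames struct {
	expiry time.Time
	items  []string
}

func main() {
	cache, err := lru.New(100)
	if err != nil {
		panic(err)
	}
	// prime the cache with an entry that expires 30 seconds from now
	cache.Add("my-namespace", cachedQuotaNames{expiry: time.Now().Add(30 * time.Second), items: []string{"quota-a"}})

	obj, ok := cache.Get("my-namespace")
	if !ok || obj.(cachedQuotaNames).expiry.Before(time.Now()) {
		fmt.Println("cache miss or stale entry: do a live List and Add the fresh result")
		return
	}
	fmt.Println("serving from cache:", obj.(cachedQuotaNames).items)
}
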
// NewResourceQuota configures an admission controller that can enforce quota constraints
// using the provided registry. The registry must have the capability to handle group/kinds that
// are persisted by the server this admission controller is intercepting
func NewResourceQuota(client clientset.Interface, registry quota.Registry) (admission.Interface, error) {
liveLookupCache, err := lru.New(100)
if err != nil {
return nil, err
}
lw := &cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return client.Core().ResourceQuotas(api.NamespaceAll).List(options)
@ -59,213 +85,214 @@ func NewResourceQuota(client clientset.Interface) admission.Interface {
}
indexer, reflector := cache.NewNamespaceKeyedIndexerAndReflector(lw, &api.ResourceQuota{}, 0)
reflector.Run()
return createResourceQuota(client, indexer)
return &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
client: client,
indexer: indexer,
registry: registry,
liveLookupCache: liveLookupCache,
liveTTL: time.Duration(30 * time.Second),
}, nil
}
func createResourceQuota(client clientset.Interface, indexer cache.Indexer) admission.Interface {
return &quota{
Handler: admission.NewHandler(admission.Create, admission.Update),
client: client,
indexer: indexer,
}
}
var resourceToResourceName = map[unversioned.GroupResource]api.ResourceName{
api.Resource("pods"): api.ResourcePods,
api.Resource("services"): api.ResourceServices,
api.Resource("replicationcontrollers"): api.ResourceReplicationControllers,
api.Resource("resourcequotas"): api.ResourceQuotas,
api.Resource("secrets"): api.ResourceSecrets,
api.Resource("persistentvolumeclaims"): api.ResourcePersistentVolumeClaims,
}
func (q *quota) Admit(a admission.Attributes) (err error) {
// Admit makes admission decisions while enforcing quota
func (q *quotaAdmission) Admit(a admission.Attributes) (err error) {
// ignore all operations that correspond to sub-resource actions
if a.GetSubresource() != "" {
return nil
}
if a.GetOperation() == "DELETE" {
// if we do not know how to evaluate usage for this kind, just ignore
evaluators := q.registry.Evaluators()
evaluator, found := evaluators[a.GetKind()]
if !found {
return nil
}
key := &api.ResourceQuota{
ObjectMeta: api.ObjectMeta{
Namespace: a.GetNamespace(),
Name: "",
},
// for this kind, check if the operation could mutate any quota resources
// if no resources tracked by quota are impacted, then just return
op := a.GetOperation()
operationResources := evaluator.OperationResources(op)
if len(operationResources) == 0 {
return nil
}
// concurrent operations that modify quota tracked resources can cause a conflict when incrementing usage
// as a result, we will attempt to increment quota usage per request up to numRetries limit
// we fuzz each retry with an interval period to attempt to improve end-user experience during concurrent operations
numRetries := 10
interval := time.Duration(rand.Int63n(90)+int64(10)) * time.Millisecond
items, err := q.indexer.Index("namespace", key)
// determine if there are any quotas in this namespace
// if there are no quotas, we don't need to do anything
namespace, name := a.GetNamespace(), a.GetName()
items, err := q.indexer.Index("namespace", &api.ResourceQuota{ObjectMeta: api.ObjectMeta{Namespace: namespace, Name: ""}})
if err != nil {
return admission.NewForbidden(a, fmt.Errorf("unable to %s %s at this time because there was an error enforcing quota", a.GetOperation(), a.GetResource()))
return admission.NewForbidden(a, fmt.Errorf("Error resolving quota."))
}
// if there are no items held in our indexer, check our live-lookup LRU, if that misses, do the live lookup to prime it.
if len(items) == 0 {
lruItemObj, ok := q.liveLookupCache.Get(a.GetNamespace())
if !ok || lruItemObj.(liveLookupEntry).expiry.Before(time.Now()) {
liveList, err := q.client.Core().ResourceQuotas(namespace).List(api.ListOptions{})
if err != nil {
return admission.NewForbidden(a, err)
}
newEntry := liveLookupEntry{expiry: time.Now().Add(q.liveTTL)}
for i := range liveList.Items {
newEntry.items = append(newEntry.items, &liveList.Items[i])
}
q.liveLookupCache.Add(a.GetNamespace(), newEntry)
lruItemObj = newEntry
}
lruEntry := lruItemObj.(liveLookupEntry)
for i := range lruEntry.items {
items = append(items, lruEntry.items[i])
}
}
// if there are still no items, we can return
if len(items) == 0 {
return nil
}
// find the set of quotas that are pertinent to this request
// reject if we match the quota, but usage is not calculated yet
// reject if the input object does not satisfy quota constraints
// if there are no pertinent quotas, we can just return
inputObject := a.GetObject()
resourceQuotas := []*api.ResourceQuota{}
for i := range items {
resourceQuota := items[i].(*api.ResourceQuota)
match := evaluator.Matches(resourceQuota, inputObject)
if !match {
continue
}
hardResources := quota.ResourceNames(resourceQuota.Status.Hard)
evaluatorResources := evaluator.MatchesResources()
requiredResources := quota.Intersection(hardResources, evaluatorResources)
err := evaluator.Constraints(requiredResources, inputObject)
if err != nil {
return admission.NewForbidden(a, fmt.Errorf("Failed quota: %s: %v", resourceQuota.Name, err))
}
if !hasUsageStats(resourceQuota) {
return admission.NewForbidden(a, fmt.Errorf("Status unknown for quota: %s", resourceQuota.Name))
}
resourceQuotas = append(resourceQuotas, resourceQuota)
}
if len(resourceQuotas) == 0 {
return nil
}
quota := items[i].(*api.ResourceQuota)
// there is at least one quota that definitely matches our object
// as a result, we need to measure the usage of this object for quota
// on updates, we need to subtract the previous measured usage
// if usage shows no change, just return since it has no impact on quota
deltaUsage := evaluator.Usage(inputObject)
if admission.Update == op {
prevItem, err := evaluator.Get(namespace, name)
if err != nil {
return admission.NewForbidden(a, fmt.Errorf("Unable to get previous: %v", err))
}
prevUsage := evaluator.Usage(prevItem)
deltaUsage = quota.Subtract(deltaUsage, prevUsage)
}
if quota.IsZero(deltaUsage) {
return nil
}
// TODO: Move to a bucketing work queue
// If we guaranteed that we processed the request in order it was received to server, we would reduce quota conflicts.
// Until we have the bucketing work queue, we jitter requests and retry on conflict.
numRetries := 10
interval := time.Duration(rand.Int63n(90)+int64(10)) * time.Millisecond
// seed the retry loop with the initial set of quotas to process (should reduce each iteration)
resourceQuotasToProcess := resourceQuotas
for retry := 1; retry <= numRetries; retry++ {
// the list of quotas we will try again if there is a version conflict
tryAgain := []*api.ResourceQuota{}
// check that we pass all remaining quotas so we do not prematurely charge
// for each quota, mask the usage to the set of resources tracked by the quota
// if used + requested exceeds hard, return an error describing the failure
updatedUsage := map[string]api.ResourceList{}
for _, resourceQuota := range resourceQuotasToProcess {
hardResources := quota.ResourceNames(resourceQuota.Status.Hard)
requestedUsage := quota.Mask(deltaUsage, hardResources)
newUsage := quota.Add(resourceQuota.Status.Used, requestedUsage)
if allowed, exceeded := quota.LessThanOrEqual(newUsage, resourceQuota.Status.Hard); !allowed {
failedRequestedUsage := quota.Mask(requestedUsage, exceeded)
failedUsed := quota.Mask(resourceQuota.Status.Used, exceeded)
failedHard := quota.Mask(resourceQuota.Status.Hard, exceeded)
return admission.NewForbidden(a,
fmt.Errorf("Exceeded quota: %s, requested: %s, used: %s, limited: %s",
resourceQuota.Name,
prettyPrint(failedRequestedUsage),
prettyPrint(failedUsed),
prettyPrint(failedHard)))
}
updatedUsage[resourceQuota.Name] = newUsage
}
// update the status for each quota with its new usage
// if we get a conflict, get updated quota, and enqueue
for _, resourceQuota := range resourceQuotasToProcess {
newUsage := updatedUsage[resourceQuota.Name]
quotaToUpdate := &api.ResourceQuota{
ObjectMeta: api.ObjectMeta{
Name: resourceQuota.Name,
Namespace: resourceQuota.Namespace,
ResourceVersion: resourceQuota.ResourceVersion,
},
Status: api.ResourceQuotaStatus{
Hard: quota.Add(api.ResourceList{}, resourceQuota.Status.Hard),
Used: newUsage,
},
}
_, err = q.client.Core().ResourceQuotas(quotaToUpdate.Namespace).UpdateStatus(quotaToUpdate)
if err != nil {
if !errors.IsConflict(err) {
return admission.NewForbidden(a, fmt.Errorf("Unable to update quota status: %s %v", resourceQuota.Name, err))
}
// on a version conflict, fetch the latest quota and queue it to try again next iteration
latestQuota, err := q.client.Core().ResourceQuotas(namespace).Get(resourceQuota.Name)
if err != nil {
return admission.NewForbidden(a, fmt.Errorf("Unable to get quota: %s %v", resourceQuota.Name, err))
}
tryAgain = append(tryAgain, latestQuota)
}
}
// all quotas were updated, so we can return
if len(tryAgain) == 0 {
return nil
}
// we have concurrent requests to update quota, so look to retry if needed
// next iteration, we need to process the items that have to try again
// pause the specified interval to encourage jitter
if retry == numRetries {
names := []string{}
for _, quota := range tryAgain {
names = append(names, quota.Name)
}
return admission.NewForbidden(a, fmt.Errorf("Unable to update status for quota: %s, ", strings.Join(names, ",")))
}
resourceQuotasToProcess = tryAgain
time.Sleep(interval)
}
return nil
}
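// NOTE (editorial sketch, not part of the original change): the helper below isolates the
// optimistic-concurrency pattern Admit relies on above: attempt a status update, and on a
// version conflict refetch and retry a bounded number of times with a jittered pause.
// The name retryOnConflict and its parameters are hypothetical; only the "time" package,
// which this file already imports, is assumed.
func retryOnConflict(numRetries int, interval time.Duration, update func() error, isConflict func(error) bool) error {
var err error
for retry := 1; retry <= numRetries; retry++ {
// attempt the update; success ends the loop immediately
if err = update(); err == nil {
return nil
}
// any error other than a version conflict is terminal
if !isConflict(err) {
return err
}
// pause before the next attempt so concurrent writers spread out
if retry < numRetries {
time.Sleep(interval)
}
}
return err
}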
// prettyPrint formats a resource list for usage in errors
func prettyPrint(item api.ResourceList) string {
parts := []string{}
for key, value := range item {
constraint := string(key) + "=" + value.String()
parts = append(parts, constraint)
}
return strings.Join(parts, ",")
}
// hasUsageStats returns true if for each hard constraint there is a value for its current usage
func hasUsageStats(resourceQuota *api.ResourceQuota) bool {
for resourceName := range resourceQuota.Status.Hard {
if _, found := resourceQuota.Status.Used[resourceName]; !found {
return false
}
}
return true
}
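// NOTE (editorial, illustrative only): given Status.Hard = {cpu: 3, memory: 100Gi} and
// Status.Used = {cpu: 1}, hasUsageStats returns false because memory has a hard limit with
// no recorded usage, while prettyPrint(Status.Hard) would render "cpu=3,memory=100Gi"
// (map iteration order is not guaranteed, so the key order may vary).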


@@ -18,16 +18,20 @@ package resourcequota
import (
"strconv"
"strings"
"testing"
"time"
"github.com/hashicorp/golang-lru"
"k8s.io/kubernetes/pkg/admission"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/client/unversioned/testclient"
"k8s.io/kubernetes/pkg/quota/install"
"k8s.io/kubernetes/pkg/util/sets"
)
func getResourceList(cpu, memory string) api.ResourceList {
@@ -63,386 +67,502 @@ func validPod(name string, numContainers int, resources api.ResourceRequirements
return pod
}
// TestAdmissionIgnoresDelete verifies that the admission controller ignores delete operations
func TestAdmissionIgnoresDelete(t *testing.T) {
kubeClient := fake.NewSimpleClientset()
handler, err := NewResourceQuota(kubeClient, install.NewRegistry(kubeClient))
if err != nil {
t.Errorf("Unexpected error %v", err)
}
namespace := "default"
err = handler.Admit(admission.NewAttributesRecord(nil, api.Kind("Pod"), namespace, "name", api.Resource("pods"), "", admission.Delete, nil))
if err != nil {
t.Errorf("ResourceQuota should admit all deletes: %v", err)
}
}
// TestAdmissionIgnoresSubresources verifies that the admission controller ignores subresources
// It verifies that creation of a pod that would have exceeded quota is properly failed
// It verifies that create operations to a subresource that would have exceeded quota would succeed
func TestAdmissionIgnoresSubresources(t *testing.T) {
resourceQuota := &api.ResourceQuota{}
resourceQuota.Name = "quota"
resourceQuota.Namespace = "test"
resourceQuota.Status = api.ResourceQuotaStatus{
Hard: api.ResourceList{},
Used: api.ResourceList{},
}
resourceQuota.Status.Hard[api.ResourceMemory] = resource.MustParse("2Gi")
resourceQuota.Status.Used[api.ResourceMemory] = resource.MustParse("1Gi")
kubeClient := fake.NewSimpleClientset(resourceQuota)
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
client: kubeClient,
indexer: indexer,
registry: install.NewRegistry(kubeClient),
}
handler.indexer.Add(resourceQuota)
newPod := validPod("123", 1, getResourceRequirements(getResourceList("100m", "2Gi"), getResourceList("", "")))
err := handler.Admit(admission.NewAttributesRecord(newPod, api.Kind("Pod"), newPod.Namespace, newPod.Name, api.Resource("pods"), "", admission.Create, nil))
if err == nil {
t.Errorf("Expected an error because the pod exceeded allowed quota")
}
err = handler.Admit(admission.NewAttributesRecord(newPod, api.Kind("Pod"), newPod.Namespace, newPod.Name, api.Resource("pods"), "subresource", admission.Create, nil))
if err != nil {
t.Errorf("Did not expect an error because the action went to a subresource: %v", err)
}
}
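// NOTE (editorial): the second Admit call above succeeds because the attributes carry a
// non-empty subresource, and the plugin returns early for subresource requests before any
// quota evaluation happens, even though the same pod was rejected as a top-level create.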
// TestAdmitBelowQuotaLimit verifies that a pod when created has its usage reflected on the quota
func TestAdmitBelowQuotaLimit(t *testing.T) {
resourceQuota := &api.ResourceQuota{
ObjectMeta: api.ObjectMeta{Name: "quota", Namespace: "test", ResourceVersion: "124"},
Status: api.ResourceQuotaStatus{
Hard: api.ResourceList{
api.ResourceCPU: resource.MustParse("3"),
api.ResourceMemory: resource.MustParse("100Gi"),
api.ResourcePods: resource.MustParse("5"),
},
Used: api.ResourceList{
api.ResourceCPU: resource.MustParse("1"),
api.ResourceMemory: resource.MustParse("50Gi"),
api.ResourcePods: resource.MustParse("3"),
},
},
}
kubeClient := fake.NewSimpleClientset(resourceQuota)
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
client: kubeClient,
indexer: indexer,
registry: install.NewRegistry(kubeClient),
}
handler.indexer.Add(resourceQuota)
newPod := validPod("allowed-pod", 1, getResourceRequirements(getResourceList("100m", "2Gi"), getResourceList("", "")))
err := handler.Admit(admission.NewAttributesRecord(newPod, api.Kind("Pod"), newPod.Namespace, newPod.Name, api.Resource("pods"), "", admission.Create, nil))
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
if len(kubeClient.Actions()) == 0 {
t.Errorf("Expected a client action")
}
expectedActionSet := sets.NewString(
strings.Join([]string{"update", "resourcequotas", "status"}, "-"),
)
actionSet := sets.NewString()
for _, action := range kubeClient.Actions() {
actionSet.Insert(strings.Join([]string{action.GetVerb(), action.GetResource(), action.GetSubresource()}, "-"))
}
if !actionSet.HasAll(expectedActionSet.List()...) {
t.Errorf("Expected actions:\n%v\n but got:\n%v\nDifference:\n%v", expectedActionSet, actionSet, expectedActionSet.Difference(actionSet))
}
lastActionIndex := len(kubeClient.Actions()) - 1
usage := kubeClient.Actions()[lastActionIndex].(testclient.UpdateAction).GetObject().(*api.ResourceQuota)
expectedUsage := api.ResourceQuota{
Status: api.ResourceQuotaStatus{
Hard: api.ResourceList{
api.ResourceCPU: resource.MustParse("3"),
api.ResourceMemory: resource.MustParse("100Gi"),
api.ResourcePods: resource.MustParse("5"),
},
Used: api.ResourceList{
api.ResourceCPU: resource.MustParse("1100m"),
api.ResourceMemory: resource.MustParse("52Gi"),
api.ResourcePods: resource.MustParse("4"),
},
},
}
for k, v := range expectedUsage.Status.Used {
actual := usage.Status.Used[k]
actualValue := actual.String()
expectedValue := v.String()
if expectedValue != actualValue {
t.Errorf("Usage Used: Key: %v, Expected: %v, Actual: %v", k, expectedValue, actualValue)
}
}
}
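// NOTE (editorial): the expected Used values above follow directly from the fixture:
// cpu 1 + the pod's 100m request = 1100m, memory 50Gi + 2Gi = 52Gi, and pods 3 + 1 = 4,
// while Hard is carried over unchanged by the admission plugin.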
// TestAdmitExceedQuotaLimit verifies that if a pod exceeded allowed usage that its rejected during admission.
func TestAdmitExceedQuotaLimit(t *testing.T) {
resourceQuota := &api.ResourceQuota{
ObjectMeta: api.ObjectMeta{Name: "quota", Namespace: "test", ResourceVersion: "124"},
Status: api.ResourceQuotaStatus{
Hard: api.ResourceList{
api.ResourceCPU: resource.MustParse("3"),
api.ResourceMemory: resource.MustParse("100Gi"),
api.ResourcePods: resource.MustParse("5"),
},
Used: api.ResourceList{
api.ResourceCPU: resource.MustParse("1"),
api.ResourceMemory: resource.MustParse("50Gi"),
api.ResourcePods: resource.MustParse("3"),
},
},
}
kubeClient := fake.NewSimpleClientset(resourceQuota)
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
client: kubeClient,
indexer: indexer,
registry: install.NewRegistry(kubeClient),
}
handler.indexer.Add(resourceQuota)
newPod := validPod("not-allowed-pod", 1, getResourceRequirements(getResourceList("3", "2Gi"), getResourceList("", "")))
err := handler.Admit(admission.NewAttributesRecord(newPod, api.Kind("Pod"), newPod.Namespace, newPod.Name, api.Resource("pods"), "", admission.Create, nil))
if err == nil {
t.Errorf("Expected an error exceeding quota")
}
}
// TestAdmitEnforceQuotaConstraints verifies that if a quota tracks a particular resource, that resource must be
// specified on the incoming pod. In this case, we create a quota that tracks cpu request, memory request, and memory limit.
// We ensure that a pod that does not specify a memory limit fails admission.
func TestAdmitEnforceQuotaConstraints(t *testing.T) {
resourceQuota := &api.ResourceQuota{
ObjectMeta: api.ObjectMeta{Name: "quota", Namespace: "test", ResourceVersion: "124"},
Status: api.ResourceQuotaStatus{
Hard: api.ResourceList{
api.ResourceCPU: resource.MustParse("3"),
api.ResourceMemory: resource.MustParse("100Gi"),
api.ResourceLimitsMemory: resource.MustParse("200Gi"),
api.ResourcePods: resource.MustParse("5"),
},
Used: api.ResourceList{
api.ResourceCPU: resource.MustParse("1"),
api.ResourceMemory: resource.MustParse("50Gi"),
api.ResourceLimitsMemory: resource.MustParse("100Gi"),
api.ResourcePods: resource.MustParse("3"),
},
},
}
kubeClient := fake.NewSimpleClientset(resourceQuota)
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
client: kubeClient,
indexer: indexer,
registry: install.NewRegistry(kubeClient),
}
handler.indexer.Add(resourceQuota)
newPod := validPod("not-allowed-pod", 1, getResourceRequirements(getResourceList("100m", "2Gi"), getResourceList("200m", "")))
err := handler.Admit(admission.NewAttributesRecord(newPod, api.Kind("Pod"), newPod.Namespace, newPod.Name, api.Resource("pods"), "", admission.Create, nil))
if err == nil {
t.Errorf("Expected an error because the pod does not specify a memory limit")
}
}
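// NOTE (editorial): the quota above tracks limits.memory, but the pod only sets a cpu limit,
// so the evaluator's Constraints check rejects it before any usage arithmetic is attempted.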
// TestAdmitPodInNamespaceWithoutQuota ensures that a pod can be admitted in a namespace that has no quota
func TestAdmitPodInNamespaceWithoutQuota(t *testing.T) {
resourceQuota := &api.ResourceQuota{
ObjectMeta: api.ObjectMeta{Name: "quota", Namespace: "other", ResourceVersion: "124"},
Status: api.ResourceQuotaStatus{
Hard: api.ResourceList{
api.ResourceCPU: resource.MustParse("3"),
api.ResourceMemory: resource.MustParse("100Gi"),
api.ResourceLimitsMemory: resource.MustParse("200Gi"),
api.ResourcePods: resource.MustParse("5"),
},
Used: api.ResourceList{
api.ResourceCPU: resource.MustParse("1"),
api.ResourceMemory: resource.MustParse("50Gi"),
api.ResourceLimitsMemory: resource.MustParse("100Gi"),
api.ResourcePods: resource.MustParse("3"),
},
},
}
kubeClient := fake.NewSimpleClientset(resourceQuota)
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
liveLookupCache, err := lru.New(100)
if err != nil {
t.Fatal(err)
}
handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
client: kubeClient,
indexer: indexer,
registry: install.NewRegistry(kubeClient),
liveLookupCache: liveLookupCache,
}
// Add to the index
handler.indexer.Add(resourceQuota)
newPod := validPod("not-allowed-pod", 1, getResourceRequirements(getResourceList("100m", "2Gi"), getResourceList("200m", "")))
// Add to the lru cache so we do not do a live client lookup
liveLookupCache.Add(newPod.Namespace, liveLookupEntry{expiry: time.Now().Add(time.Duration(30 * time.Second)), items: []*api.ResourceQuota{}})
err = handler.Admit(admission.NewAttributesRecord(newPod, api.Kind("Pod"), newPod.Namespace, newPod.Name, api.Resource("pods"), "", admission.Create, nil))
if err != nil {
t.Errorf("Did not expect an error because the pod is in a different namespace than the quota")
}
}
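// NOTE (editorial): the quota fixture above lives in namespace "other" while the pod is created
// in a different namespace; priming liveLookupCache with an empty, unexpired entry for the pod's
// namespace is what lets the plugin skip the live List call and admit the pod without contacting
// the server.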
// TestAdmitBelowTerminatingQuotaLimit ensures that terminating pods are charged to the right quota.
// It creates a terminating and non-terminating quota, and creates a terminating pod.
// It ensures that the terminating quota is incremented, and the non-terminating quota is not.
func TestAdmitBelowTerminatingQuotaLimit(t *testing.T) {
resourceQuotaNonTerminating := &api.ResourceQuota{
ObjectMeta: api.ObjectMeta{Name: "quota-non-terminating", Namespace: "test", ResourceVersion: "124"},
Spec: api.ResourceQuotaSpec{
Scopes: []api.ResourceQuotaScope{api.ResourceQuotaScopeNotTerminating},
},
Status: api.ResourceQuotaStatus{
Hard: api.ResourceList{
api.ResourceCPU: resource.MustParse("3"),
api.ResourceMemory: resource.MustParse("100Gi"),
api.ResourcePods: resource.MustParse("5"),
},
Used: api.ResourceList{
api.ResourceCPU: resource.MustParse("1"),
api.ResourceMemory: resource.MustParse("50Gi"),
api.ResourcePods: resource.MustParse("3"),
},
},
}
resourceQuotaTerminating := &api.ResourceQuota{
ObjectMeta: api.ObjectMeta{Name: "quota-terminating", Namespace: "test", ResourceVersion: "124"},
Spec: api.ResourceQuotaSpec{
Scopes: []api.ResourceQuotaScope{api.ResourceQuotaScopeTerminating},
},
Status: api.ResourceQuotaStatus{
Hard: api.ResourceList{
api.ResourceCPU: resource.MustParse("3"),
api.ResourceMemory: resource.MustParse("100Gi"),
api.ResourcePods: resource.MustParse("5"),
},
Used: api.ResourceList{
api.ResourceCPU: resource.MustParse("1"),
api.ResourceMemory: resource.MustParse("50Gi"),
api.ResourcePods: resource.MustParse("3"),
},
},
}
kubeClient := fake.NewSimpleClientset(resourceQuotaTerminating, resourceQuotaNonTerminating)
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
client: kubeClient,
indexer: indexer,
registry: install.NewRegistry(kubeClient),
}
handler.indexer.Add(resourceQuotaNonTerminating)
handler.indexer.Add(resourceQuotaTerminating)
// create a pod that has an active deadline
newPod := validPod("allowed-pod", 1, getResourceRequirements(getResourceList("100m", "2Gi"), getResourceList("", "")))
activeDeadlineSeconds := int64(30)
newPod.Spec.ActiveDeadlineSeconds = &activeDeadlineSeconds
err := handler.Admit(admission.NewAttributesRecord(newPod, api.Kind("Pod"), newPod.Namespace, newPod.Name, api.Resource("pods"), "", admission.Create, nil))
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
if len(kubeClient.Actions()) == 0 {
t.Errorf("Expected a client action")
}
expectedActionSet := sets.NewString(
strings.Join([]string{"update", "resourcequotas", "status"}, "-"),
)
actionSet := sets.NewString()
for _, action := range kubeClient.Actions() {
actionSet.Insert(strings.Join([]string{action.GetVerb(), action.GetResource(), action.GetSubresource()}, "-"))
}
if !actionSet.HasAll(expectedActionSet.List()...) {
t.Errorf("Expected actions:\n%v\n but got:\n%v\nDifference:\n%v", expectedActionSet, actionSet, expectedActionSet.Difference(actionSet))
}
lastActionIndex := len(kubeClient.Actions()) - 1
usage := kubeClient.Actions()[lastActionIndex].(testclient.UpdateAction).GetObject().(*api.ResourceQuota)
// ensure only the quota-terminating was updated
if usage.Name != resourceQuotaTerminating.Name {
t.Errorf("Incremented the wrong quota, expected %v, actual %v", resourceQuotaTerminating.Name, usage.Name)
}
expectedUsage := api.ResourceQuota{
Status: api.ResourceQuotaStatus{
Hard: api.ResourceList{
api.ResourceCPU: resource.MustParse("3"),
api.ResourceMemory: resource.MustParse("100Gi"),
api.ResourcePods: resource.MustParse("5"),
},
Used: api.ResourceList{
api.ResourceCPU: resource.MustParse("1100m"),
api.ResourceMemory: resource.MustParse("52Gi"),
api.ResourcePods: resource.MustParse("4"),
},
},
}
for k, v := range expectedUsage.Status.Used {
actual := usage.Status.Used[k]
actualValue := actual.String()
expectedValue := v.String()
if expectedValue != actualValue {
t.Errorf("Usage Used: Key: %v, Expected: %v, Actual: %v", k, expectedValue, actualValue)
}
}
}
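// NOTE (editorial): setting ActiveDeadlineSeconds marks the pod as terminating, so it only
// matches the quota scoped to Terminating; that is why a single status update is expected and
// the NotTerminating quota's usage is left untouched.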
// TestAdmitBelowBestEffortQuotaLimit creates a best effort and non-best effort quota.
// It verifies that best effort pods are properly scoped to the best effort quota document.
func TestAdmitBelowBestEffortQuotaLimit(t *testing.T) {
resourceQuotaBestEffort := &api.ResourceQuota{
ObjectMeta: api.ObjectMeta{Name: "quota-besteffort", Namespace: "test", ResourceVersion: "124"},
Spec: api.ResourceQuotaSpec{
Scopes: []api.ResourceQuotaScope{api.ResourceQuotaScopeBestEffort},
},
Status: api.ResourceQuotaStatus{
Hard: api.ResourceList{
api.ResourcePods: resource.MustParse("5"),
},
Used: api.ResourceList{
api.ResourcePods: resource.MustParse("3"),
},
},
}
resourceQuotaNotBestEffort := &api.ResourceQuota{
ObjectMeta: api.ObjectMeta{Name: "quota-not-besteffort", Namespace: "test", ResourceVersion: "124"},
Spec: api.ResourceQuotaSpec{
Scopes: []api.ResourceQuotaScope{api.ResourceQuotaScopeNotBestEffort},
},
Status: api.ResourceQuotaStatus{
Hard: api.ResourceList{
api.ResourcePods: resource.MustParse("5"),
},
Used: api.ResourceList{
api.ResourcePods: resource.MustParse("3"),
},
},
}
kubeClient := fake.NewSimpleClientset(resourceQuotaBestEffort, resourceQuotaNotBestEffort)
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
client: kubeClient,
indexer: indexer,
registry: install.NewRegistry(kubeClient),
}
handler.indexer.Add(resourceQuotaBestEffort)
handler.indexer.Add(resourceQuotaNotBestEffort)
// create a pod that is best effort because it does not make a request for anything
newPod := validPod("allowed-pod", 1, getResourceRequirements(getResourceList("", ""), getResourceList("", "")))
err := handler.Admit(admission.NewAttributesRecord(newPod, api.Kind("Pod"), newPod.Namespace, newPod.Name, api.Resource("pods"), "", admission.Create, nil))
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
expectedActionSet := sets.NewString(
strings.Join([]string{"update", "resourcequotas", "status"}, "-"),
)
actionSet := sets.NewString()
for _, action := range kubeClient.Actions() {
actionSet.Insert(strings.Join([]string{action.GetVerb(), action.GetResource(), action.GetSubresource()}, "-"))
}
if !actionSet.HasAll(expectedActionSet.List()...) {
t.Errorf("Expected actions:\n%v\n but got:\n%v\nDifference:\n%v", expectedActionSet, actionSet, expectedActionSet.Difference(actionSet))
}
lastActionIndex := len(kubeClient.Actions()) - 1
usage := kubeClient.Actions()[lastActionIndex].(testclient.UpdateAction).GetObject().(*api.ResourceQuota)
if usage.Name != resourceQuotaBestEffort.Name {
t.Errorf("Incremented the wrong quota, expected %v, actual %v", resourceQuotaBestEffort.Name, usage.Name)
}
expectedUsage := api.ResourceQuota{
Status: api.ResourceQuotaStatus{
Hard: api.ResourceList{
api.ResourcePods: resource.MustParse("5"),
},
Used: api.ResourceList{
api.ResourcePods: resource.MustParse("4"),
},
},
}
for k, v := range expectedUsage.Status.Used {
actual := usage.Status.Used[k]
actualValue := actual.String()
expectedValue := v.String()
if expectedValue != actualValue {
t.Errorf("Usage Used: Key: %v, Expected: %v, Actual: %v", k, expectedValue, actualValue)
}
}
}
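// NOTE (editorial): the admitted pod requests no cpu or memory at all, which is what classifies
// it as best effort; only the quota scoped to BestEffort is charged, so exactly one
// resourcequotas status update is expected and the NotBestEffort quota is unchanged.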
// TestAdmitBestEffortQuotaLimitIgnoresBurstable validates that a besteffort quota does not match a burstable pod
func TestAdmitBestEffortQuotaLimitIgnoresBurstable(t *testing.T) {
resourceQuota := &api.ResourceQuota{
ObjectMeta: api.ObjectMeta{Name: "quota-besteffort", Namespace: "test", ResourceVersion: "124"},
Spec: api.ResourceQuotaSpec{
Scopes: []api.ResourceQuotaScope{api.ResourceQuotaScopeBestEffort},
},
Status: api.ResourceQuotaStatus{
Hard: api.ResourceList{
api.ResourcePods: resource.MustParse("5"),
},
Used: api.ResourceList{
api.ResourcePods: resource.MustParse("3"),
},
},
}
kubeClient := fake.NewSimpleClientset(resourceQuota)
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
handler := &quotaAdmission{
Handler: admission.NewHandler(admission.Create, admission.Update),
client: kubeClient,
indexer: indexer,
registry: install.NewRegistry(kubeClient),
}
handler.indexer.Add(resourceQuota)
newPod := validPod("allowed-pod", 1, getResourceRequirements(getResourceList("100m", "1Gi"), getResourceList("", "")))
err := handler.Admit(admission.NewAttributesRecord(newPod, api.Kind("Pod"), newPod.Namespace, newPod.Name, api.Resource("pods"), "", admission.Create, nil))
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
if len(kubeClient.Actions()) != 0 {
t.Errorf("Expected no client actions because the incoming pod did not match best effort quota")
}
}
// TestHasUsageStats verifies that hasUsageStats requires a used value for every hard constraint
func TestHasUsageStats(t *testing.T) {
testCases := map[string]struct {
a api.ResourceQuota
expected bool
}{
"empty": {
a: api.ResourceQuota{Status: api.ResourceQuotaStatus{Hard: api.ResourceList{}}},
expected: true,
},
"hard-only": {
a: api.ResourceQuota{
Status: api.ResourceQuotaStatus{
Hard: api.ResourceList{
api.ResourceMemory: resource.MustParse("1Gi"),
},
Used: api.ResourceList{},
},
},
expected: false,
},
"hard-used": {
a: api.ResourceQuota{
Status: api.ResourceQuotaStatus{
Hard: api.ResourceList{
api.ResourceMemory: resource.MustParse("1Gi"),
},
Used: api.ResourceList{
api.ResourceMemory: resource.MustParse("500Mi"),
},
},
},
expected: true,
},
}
for testName, testCase := range testCases {
if result := hasUsageStats(&testCase.a); result != testCase.expected {
t.Errorf("%s expected: %v, actual: %v", testName, testCase.expected, result)
}
}
}