Use storage directly for scale subresources

This commit is contained in:
Jordan Liggitt 2018-07-10 17:23:29 -04:00
parent 16c5ba4114
commit 3cb771a866
No known key found for this signature in database
GPG Key ID: 39928704103C7229
9 changed files with 97 additions and 62 deletions

View File

@ -70,7 +70,6 @@ import (
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client" kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
"k8s.io/kubernetes/pkg/master/reconcilers" "k8s.io/kubernetes/pkg/master/reconcilers"
"k8s.io/kubernetes/pkg/master/tunneler" "k8s.io/kubernetes/pkg/master/tunneler"
"k8s.io/kubernetes/pkg/registry/core/endpoint"
endpointsstorage "k8s.io/kubernetes/pkg/registry/core/endpoint/storage" endpointsstorage "k8s.io/kubernetes/pkg/registry/core/endpoint/storage"
"k8s.io/kubernetes/pkg/routes" "k8s.io/kubernetes/pkg/routes"
"k8s.io/kubernetes/pkg/serviceaccount" "k8s.io/kubernetes/pkg/serviceaccount"
@ -228,9 +227,8 @@ func (c *Config) createLeaseReconciler() reconcilers.EndpointReconciler {
DeleteCollectionWorkers: 0, DeleteCollectionWorkers: 0,
ResourcePrefix: c.ExtraConfig.StorageFactory.ResourcePrefix(api.Resource("endpoints")), ResourcePrefix: c.ExtraConfig.StorageFactory.ResourcePrefix(api.Resource("endpoints")),
}) })
endpointRegistry := endpoint.NewRegistry(endpointsStorage)
masterLeases := reconcilers.NewLeases(leaseStorage, "/masterleases/", ttl) masterLeases := reconcilers.NewLeases(leaseStorage, "/masterleases/", ttl)
return reconcilers.NewLeaseEndpointReconciler(endpointRegistry, masterLeases) return reconcilers.NewLeaseEndpointReconciler(endpointsStorage.Store, masterLeases)
} }
func (c *Config) createEndpointReconciler() reconcilers.EndpointReconciler { func (c *Config) createEndpointReconciler() reconcilers.EndpointReconciler {

View File

@ -29,6 +29,7 @@ import (
"time" "time"
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kruntime "k8s.io/apimachinery/pkg/runtime" kruntime "k8s.io/apimachinery/pkg/runtime"
@ -37,7 +38,6 @@ import (
"k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage"
"k8s.io/kubernetes/pkg/api/endpoints" "k8s.io/kubernetes/pkg/api/endpoints"
api "k8s.io/kubernetes/pkg/apis/core" api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/registry/core/endpoint"
) )
// Leases is an interface which assists in managing the set of active masters // Leases is an interface which assists in managing the set of active masters
@ -119,16 +119,16 @@ func NewLeases(storage storage.Interface, baseKey string, leaseTime time.Duratio
} }
type leaseEndpointReconciler struct { type leaseEndpointReconciler struct {
endpointRegistry endpoint.Registry endpointStorage rest.StandardStorage
masterLeases Leases masterLeases Leases
stopReconcilingCalled bool stopReconcilingCalled bool
reconcilingLock sync.Mutex reconcilingLock sync.Mutex
} }
// NewLeaseEndpointReconciler creates a new LeaseEndpoint reconciler // NewLeaseEndpointReconciler creates a new LeaseEndpoint reconciler
func NewLeaseEndpointReconciler(endpointRegistry endpoint.Registry, masterLeases Leases) EndpointReconciler { func NewLeaseEndpointReconciler(endpointStorage rest.StandardStorage, masterLeases Leases) EndpointReconciler {
return &leaseEndpointReconciler{ return &leaseEndpointReconciler{
endpointRegistry: endpointRegistry, endpointStorage: endpointStorage,
masterLeases: masterLeases, masterLeases: masterLeases,
stopReconcilingCalled: false, stopReconcilingCalled: false,
} }
@ -163,7 +163,8 @@ func (r *leaseEndpointReconciler) doReconcile(serviceName string, endpointPorts
ctx := apirequest.NewDefaultContext() ctx := apirequest.NewDefaultContext()
// Retrieve the current list of endpoints... // Retrieve the current list of endpoints...
e, err := r.endpointRegistry.GetEndpoints(ctx, serviceName, &metav1.GetOptions{}) var e *api.Endpoints
obj, err := r.endpointStorage.Get(ctx, serviceName, &metav1.GetOptions{})
if err != nil { if err != nil {
if !errors.IsNotFound(err) { if !errors.IsNotFound(err) {
return err return err
@ -175,6 +176,8 @@ func (r *leaseEndpointReconciler) doReconcile(serviceName string, endpointPorts
Namespace: api.NamespaceDefault, Namespace: api.NamespaceDefault,
}, },
} }
} else {
e = obj.(*api.Endpoints)
} }
// ... and the list of master IP keys from etcd // ... and the list of master IP keys from etcd
@ -221,7 +224,8 @@ func (r *leaseEndpointReconciler) doReconcile(serviceName string, endpointPorts
} }
glog.Warningf("Resetting endpoints for master service %q to %v", serviceName, masterIPs) glog.Warningf("Resetting endpoints for master service %q to %v", serviceName, masterIPs)
return r.endpointRegistry.UpdateEndpoints(ctx, e, rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, &metav1.UpdateOptions{}) _, _, err = r.endpointStorage.Update(ctx, e.Name, rest.DefaultUpdatedObjectInfo(e), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
return err
} }
// checkEndpointSubsetFormatWithLease determines if the endpoint is in the // checkEndpointSubsetFormatWithLease determines if the endpoint is in the

View File

@ -55,12 +55,11 @@ type DeploymentStorage struct {
func NewStorage(optsGetter generic.RESTOptionsGetter) DeploymentStorage { func NewStorage(optsGetter generic.RESTOptionsGetter) DeploymentStorage {
deploymentRest, deploymentStatusRest, deploymentRollbackRest := NewREST(optsGetter) deploymentRest, deploymentStatusRest, deploymentRollbackRest := NewREST(optsGetter)
deploymentRegistry := deployment.NewRegistry(deploymentRest)
return DeploymentStorage{ return DeploymentStorage{
Deployment: deploymentRest, Deployment: deploymentRest,
Status: deploymentStatusRest, Status: deploymentStatusRest,
Scale: &ScaleREST{registry: deploymentRegistry}, Scale: &ScaleREST{store: deploymentRest.Store},
Rollback: deploymentRollbackRest, Rollback: deploymentRollbackRest,
} }
} }
@ -220,7 +219,7 @@ func (r *RollbackREST) setDeploymentRollback(ctx context.Context, deploymentID s
} }
type ScaleREST struct { type ScaleREST struct {
registry deployment.Registry store *genericregistry.Store
} }
// ScaleREST implements Patcher // ScaleREST implements Patcher
@ -246,10 +245,11 @@ func (r *ScaleREST) New() runtime.Object {
} }
func (r *ScaleREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { func (r *ScaleREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
deployment, err := r.registry.GetDeployment(ctx, name, options) obj, err := r.store.Get(ctx, name, options)
if err != nil { if err != nil {
return nil, errors.NewNotFound(extensions.Resource("deployments/scale"), name) return nil, errors.NewNotFound(extensions.Resource("deployments/scale"), name)
} }
deployment := obj.(*extensions.Deployment)
scale, err := scaleFromDeployment(deployment) scale, err := scaleFromDeployment(deployment)
if err != nil { if err != nil {
return nil, errors.NewBadRequest(fmt.Sprintf("%v", err)) return nil, errors.NewBadRequest(fmt.Sprintf("%v", err))
@ -258,17 +258,18 @@ func (r *ScaleREST) Get(ctx context.Context, name string, options *metav1.GetOpt
} }
func (r *ScaleREST) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) { func (r *ScaleREST) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
deployment, err := r.registry.GetDeployment(ctx, name, &metav1.GetOptions{}) obj, err := r.store.Get(ctx, name, &metav1.GetOptions{})
if err != nil { if err != nil {
return nil, false, errors.NewNotFound(extensions.Resource("deployments/scale"), name) return nil, false, errors.NewNotFound(extensions.Resource("deployments/scale"), name)
} }
deployment := obj.(*extensions.Deployment)
oldScale, err := scaleFromDeployment(deployment) oldScale, err := scaleFromDeployment(deployment)
if err != nil { if err != nil {
return nil, false, err return nil, false, err
} }
obj, err := objInfo.UpdatedObject(ctx, oldScale) obj, err = objInfo.UpdatedObject(ctx, oldScale)
if err != nil { if err != nil {
return nil, false, err return nil, false, err
} }
@ -286,10 +287,11 @@ func (r *ScaleREST) Update(ctx context.Context, name string, objInfo rest.Update
deployment.Spec.Replicas = scale.Spec.Replicas deployment.Spec.Replicas = scale.Spec.Replicas
deployment.ResourceVersion = scale.ResourceVersion deployment.ResourceVersion = scale.ResourceVersion
deployment, err = r.registry.UpdateDeployment(ctx, deployment, createValidation, updateValidation, options) obj, _, err = r.store.Update(ctx, deployment.Name, rest.DefaultUpdatedObjectInfo(deployment), createValidation, updateValidation, false, options)
if err != nil { if err != nil {
return nil, false, err return nil, false, err
} }
deployment = obj.(*extensions.Deployment)
newScale, err := scaleFromDeployment(deployment) newScale, err := scaleFromDeployment(deployment)
if err != nil { if err != nil {
return nil, false, errors.NewBadRequest(fmt.Sprintf("%v", err)) return nil, false, errors.NewBadRequest(fmt.Sprintf("%v", err))

View File

@ -51,12 +51,11 @@ type ReplicaSetStorage struct {
func NewStorage(optsGetter generic.RESTOptionsGetter) ReplicaSetStorage { func NewStorage(optsGetter generic.RESTOptionsGetter) ReplicaSetStorage {
replicaSetRest, replicaSetStatusRest := NewREST(optsGetter) replicaSetRest, replicaSetStatusRest := NewREST(optsGetter)
replicaSetRegistry := replicaset.NewRegistry(replicaSetRest)
return ReplicaSetStorage{ return ReplicaSetStorage{
ReplicaSet: replicaSetRest, ReplicaSet: replicaSetRest,
Status: replicaSetStatusRest, Status: replicaSetStatusRest,
Scale: &ScaleREST{registry: replicaSetRegistry}, Scale: &ScaleREST{store: replicaSetRest.Store},
} }
} }
@ -133,7 +132,7 @@ func (r *StatusREST) Update(ctx context.Context, name string, objInfo rest.Updat
} }
type ScaleREST struct { type ScaleREST struct {
registry replicaset.Registry store *genericregistry.Store
} }
// ScaleREST implements Patcher // ScaleREST implements Patcher
@ -159,10 +158,11 @@ func (r *ScaleREST) New() runtime.Object {
} }
func (r *ScaleREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { func (r *ScaleREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
rs, err := r.registry.GetReplicaSet(ctx, name, options) obj, err := r.store.Get(ctx, name, options)
if err != nil { if err != nil {
return nil, errors.NewNotFound(extensions.Resource("replicasets/scale"), name) return nil, errors.NewNotFound(extensions.Resource("replicasets/scale"), name)
} }
rs := obj.(*extensions.ReplicaSet)
scale, err := scaleFromReplicaSet(rs) scale, err := scaleFromReplicaSet(rs)
if err != nil { if err != nil {
return nil, errors.NewBadRequest(fmt.Sprintf("%v", err)) return nil, errors.NewBadRequest(fmt.Sprintf("%v", err))
@ -171,10 +171,11 @@ func (r *ScaleREST) Get(ctx context.Context, name string, options *metav1.GetOpt
} }
func (r *ScaleREST) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) { func (r *ScaleREST) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
rs, err := r.registry.GetReplicaSet(ctx, name, &metav1.GetOptions{}) obj, err := r.store.Get(ctx, name, &metav1.GetOptions{})
if err != nil { if err != nil {
return nil, false, errors.NewNotFound(extensions.Resource("replicasets/scale"), name) return nil, false, errors.NewNotFound(extensions.Resource("replicasets/scale"), name)
} }
rs := obj.(*extensions.ReplicaSet)
oldScale, err := scaleFromReplicaSet(rs) oldScale, err := scaleFromReplicaSet(rs)
if err != nil { if err != nil {
@ -182,7 +183,7 @@ func (r *ScaleREST) Update(ctx context.Context, name string, objInfo rest.Update
} }
// TODO: should this pass admission? // TODO: should this pass admission?
obj, err := objInfo.UpdatedObject(ctx, oldScale) obj, err = objInfo.UpdatedObject(ctx, oldScale)
if err != nil { if err != nil {
return nil, false, err return nil, false, err
} }
@ -200,10 +201,11 @@ func (r *ScaleREST) Update(ctx context.Context, name string, objInfo rest.Update
rs.Spec.Replicas = scale.Spec.Replicas rs.Spec.Replicas = scale.Spec.Replicas
rs.ResourceVersion = scale.ResourceVersion rs.ResourceVersion = scale.ResourceVersion
rs, err = r.registry.UpdateReplicaSet(ctx, rs, createValidation, updateValidation, options) obj, _, err = r.store.Update(ctx, rs.Name, rest.DefaultUpdatedObjectInfo(rs), createValidation, updateValidation, false, options)
if err != nil { if err != nil {
return nil, false, err return nil, false, err
} }
rs = obj.(*extensions.ReplicaSet)
newScale, err := scaleFromReplicaSet(rs) newScale, err := scaleFromReplicaSet(rs)
if err != nil { if err != nil {
return nil, false, errors.NewBadRequest(fmt.Sprintf("%v", err)) return nil, false, errors.NewBadRequest(fmt.Sprintf("%v", err))

View File

@ -49,12 +49,11 @@ type StatefulSetStorage struct {
func NewStorage(optsGetter generic.RESTOptionsGetter) StatefulSetStorage { func NewStorage(optsGetter generic.RESTOptionsGetter) StatefulSetStorage {
statefulSetRest, statefulSetStatusRest := NewREST(optsGetter) statefulSetRest, statefulSetStatusRest := NewREST(optsGetter)
statefulSetRegistry := statefulset.NewRegistry(statefulSetRest)
return StatefulSetStorage{ return StatefulSetStorage{
StatefulSet: statefulSetRest, StatefulSet: statefulSetRest,
Status: statefulSetStatusRest, Status: statefulSetStatusRest,
Scale: &ScaleREST{registry: statefulSetRegistry}, Scale: &ScaleREST{store: statefulSetRest.Store},
} }
} }
@ -124,7 +123,7 @@ func (r *REST) ShortNames() []string {
} }
type ScaleREST struct { type ScaleREST struct {
registry statefulset.Registry store *genericregistry.Store
} }
// ScaleREST implements Patcher // ScaleREST implements Patcher
@ -148,10 +147,11 @@ func (r *ScaleREST) New() runtime.Object {
} }
func (r *ScaleREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { func (r *ScaleREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
ss, err := r.registry.GetStatefulSet(ctx, name, options) obj, err := r.store.Get(ctx, name, options)
if err != nil { if err != nil {
return nil, err return nil, err
} }
ss := obj.(*apps.StatefulSet)
scale, err := scaleFromStatefulSet(ss) scale, err := scaleFromStatefulSet(ss)
if err != nil { if err != nil {
return nil, errors.NewBadRequest(fmt.Sprintf("%v", err)) return nil, errors.NewBadRequest(fmt.Sprintf("%v", err))
@ -160,17 +160,18 @@ func (r *ScaleREST) Get(ctx context.Context, name string, options *metav1.GetOpt
} }
func (r *ScaleREST) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) { func (r *ScaleREST) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
ss, err := r.registry.GetStatefulSet(ctx, name, &metav1.GetOptions{}) obj, err := r.store.Get(ctx, name, &metav1.GetOptions{})
if err != nil { if err != nil {
return nil, false, err return nil, false, err
} }
ss := obj.(*apps.StatefulSet)
oldScale, err := scaleFromStatefulSet(ss) oldScale, err := scaleFromStatefulSet(ss)
if err != nil { if err != nil {
return nil, false, err return nil, false, err
} }
obj, err := objInfo.UpdatedObject(ctx, oldScale) obj, err = objInfo.UpdatedObject(ctx, oldScale)
if err != nil { if err != nil {
return nil, false, err return nil, false, err
} }
@ -188,10 +189,11 @@ func (r *ScaleREST) Update(ctx context.Context, name string, objInfo rest.Update
ss.Spec.Replicas = scale.Spec.Replicas ss.Spec.Replicas = scale.Spec.Replicas
ss.ResourceVersion = scale.ResourceVersion ss.ResourceVersion = scale.ResourceVersion
ss, err = r.registry.UpdateStatefulSet(ctx, ss, createValidation, updateValidation, options) obj, _, err = r.store.Update(ctx, ss.Name, rest.DefaultUpdatedObjectInfo(ss), createValidation, updateValidation, false, options)
if err != nil { if err != nil {
return nil, false, err return nil, false, err
} }
ss = obj.(*apps.StatefulSet)
newScale, err := scaleFromStatefulSet(ss) newScale, err := scaleFromStatefulSet(ss)
if err != nil { if err != nil {
return nil, false, errors.NewBadRequest(fmt.Sprintf("%v", err)) return nil, false, errors.NewBadRequest(fmt.Sprintf("%v", err))

View File

@ -50,12 +50,11 @@ type ControllerStorage struct {
func NewStorage(optsGetter generic.RESTOptionsGetter) ControllerStorage { func NewStorage(optsGetter generic.RESTOptionsGetter) ControllerStorage {
controllerREST, statusREST := NewREST(optsGetter) controllerREST, statusREST := NewREST(optsGetter)
controllerRegistry := replicationcontroller.NewRegistry(controllerREST)
return ControllerStorage{ return ControllerStorage{
Controller: controllerREST, Controller: controllerREST,
Status: statusREST, Status: statusREST,
Scale: &ScaleREST{registry: controllerRegistry}, Scale: &ScaleREST{store: controllerREST.Store},
} }
} }
@ -126,7 +125,7 @@ func (r *StatusREST) Update(ctx context.Context, name string, objInfo rest.Updat
} }
type ScaleREST struct { type ScaleREST struct {
registry replicationcontroller.Registry store *genericregistry.Store
} }
// ScaleREST implements Patcher // ScaleREST implements Patcher
@ -148,22 +147,24 @@ func (r *ScaleREST) New() runtime.Object {
} }
func (r *ScaleREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { func (r *ScaleREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
rc, err := r.registry.GetController(ctx, name, options) obj, err := r.store.Get(ctx, name, options)
if err != nil { if err != nil {
return nil, errors.NewNotFound(autoscaling.Resource("replicationcontrollers/scale"), name) return nil, errors.NewNotFound(autoscaling.Resource("replicationcontrollers/scale"), name)
} }
rc := obj.(*api.ReplicationController)
return scaleFromRC(rc), nil return scaleFromRC(rc), nil
} }
func (r *ScaleREST) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) { func (r *ScaleREST) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
rc, err := r.registry.GetController(ctx, name, &metav1.GetOptions{}) obj, err := r.store.Get(ctx, name, &metav1.GetOptions{})
if err != nil { if err != nil {
return nil, false, errors.NewNotFound(autoscaling.Resource("replicationcontrollers/scale"), name) return nil, false, errors.NewNotFound(autoscaling.Resource("replicationcontrollers/scale"), name)
} }
rc := obj.(*api.ReplicationController)
oldScale := scaleFromRC(rc) oldScale := scaleFromRC(rc)
// TODO: should this pass validation? // TODO: should this pass validation?
obj, err := objInfo.UpdatedObject(ctx, oldScale) obj, err = objInfo.UpdatedObject(ctx, oldScale)
if err != nil { if err != nil {
return nil, false, err return nil, false, err
} }
@ -182,10 +183,11 @@ func (r *ScaleREST) Update(ctx context.Context, name string, objInfo rest.Update
rc.Spec.Replicas = scale.Spec.Replicas rc.Spec.Replicas = scale.Spec.Replicas
rc.ResourceVersion = scale.ResourceVersion rc.ResourceVersion = scale.ResourceVersion
rc, err = r.registry.UpdateController(ctx, rc, createValidation, updateValidation, options) obj, _, err = r.store.Update(ctx, rc.Name, rest.DefaultUpdatedObjectInfo(rc), createValidation, updateValidation, false, options)
if err != nil { if err != nil {
return nil, false, err return nil, false, err
} }
rc = obj.(*api.ReplicationController)
return scaleFromRC(rc), false, nil return scaleFromRC(rc), false, nil
} }

View File

@ -25,12 +25,12 @@ import (
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/registry/generic" "k8s.io/apiserver/pkg/registry/generic"
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
"k8s.io/apiserver/pkg/registry/rest" "k8s.io/apiserver/pkg/registry/rest"
"k8s.io/kubernetes/pkg/apis/autoscaling" "k8s.io/kubernetes/pkg/apis/autoscaling"
autoscalingvalidation "k8s.io/kubernetes/pkg/apis/autoscaling/validation" autoscalingvalidation "k8s.io/kubernetes/pkg/apis/autoscaling/validation"
api "k8s.io/kubernetes/pkg/apis/core" api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/registry/core/replicationcontroller"
controllerstore "k8s.io/kubernetes/pkg/registry/core/replicationcontroller/storage" controllerstore "k8s.io/kubernetes/pkg/registry/core/replicationcontroller/storage"
) )
@ -43,16 +43,15 @@ type ContainerStorage struct {
func NewStorage(optsGetter generic.RESTOptionsGetter) ContainerStorage { func NewStorage(optsGetter generic.RESTOptionsGetter) ContainerStorage {
// scale does not set status, only updates spec so we ignore the status // scale does not set status, only updates spec so we ignore the status
controllerREST, _ := controllerstore.NewREST(optsGetter) controllerREST, _ := controllerstore.NewREST(optsGetter)
rcRegistry := replicationcontroller.NewRegistry(controllerREST)
return ContainerStorage{ return ContainerStorage{
ReplicationController: &RcREST{}, ReplicationController: &RcREST{},
Scale: &ScaleREST{registry: &rcRegistry}, Scale: &ScaleREST{store: controllerREST.Store},
} }
} }
type ScaleREST struct { type ScaleREST struct {
registry *replicationcontroller.Registry store *genericregistry.Store
} }
// ScaleREST implements Patcher // ScaleREST implements Patcher
@ -64,21 +63,23 @@ func (r *ScaleREST) New() runtime.Object {
} }
func (r *ScaleREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { func (r *ScaleREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
rc, err := (*r.registry).GetController(ctx, name, options) obj, err := r.store.Get(ctx, name, options)
if err != nil { if err != nil {
return nil, errors.NewNotFound(extensions.Resource("replicationcontrollers/scale"), name) return nil, errors.NewNotFound(extensions.Resource("replicationcontrollers/scale"), name)
} }
rc := obj.(*api.ReplicationController)
return scaleFromRC(rc), nil return scaleFromRC(rc), nil
} }
func (r *ScaleREST) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) { func (r *ScaleREST) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
rc, err := (*r.registry).GetController(ctx, name, &metav1.GetOptions{}) obj, err := r.store.Get(ctx, name, &metav1.GetOptions{})
if err != nil { if err != nil {
return nil, false, errors.NewNotFound(extensions.Resource("replicationcontrollers/scale"), name) return nil, false, errors.NewNotFound(extensions.Resource("replicationcontrollers/scale"), name)
} }
rc := obj.(*api.ReplicationController)
oldScale := scaleFromRC(rc) oldScale := scaleFromRC(rc)
obj, err := objInfo.UpdatedObject(ctx, oldScale) obj, err = objInfo.UpdatedObject(ctx, oldScale)
if obj == nil { if obj == nil {
return nil, false, errors.NewBadRequest(fmt.Sprintf("nil update passed to Scale")) return nil, false, errors.NewBadRequest(fmt.Sprintf("nil update passed to Scale"))
@ -94,10 +95,11 @@ func (r *ScaleREST) Update(ctx context.Context, name string, objInfo rest.Update
rc.Spec.Replicas = scale.Spec.Replicas rc.Spec.Replicas = scale.Spec.Replicas
rc.ResourceVersion = scale.ResourceVersion rc.ResourceVersion = scale.ResourceVersion
rc, err = (*r.registry).UpdateController(ctx, rc, createValidation, updateValidation, options) obj, _, err = r.store.Update(ctx, rc.Name, rest.DefaultUpdatedObjectInfo(rc), createValidation, updateValidation, false, options)
if err != nil { if err != nil {
return nil, false, errors.NewConflict(extensions.Resource("replicationcontrollers/scale"), scale.Name, err) return nil, false, errors.NewConflict(extensions.Resource("replicationcontrollers/scale"), scale.Name, err)
} }
rc = obj.(*api.ReplicationController)
return scaleFromRC(rc), false, nil return scaleFromRC(rc), false, nil
} }

View File

@ -24,6 +24,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
runtime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/registry/rest" "k8s.io/apiserver/pkg/registry/rest"
api "k8s.io/kubernetes/pkg/apis/core" api "k8s.io/kubernetes/pkg/apis/core"
@ -38,7 +39,7 @@ type EndpointRegistry struct {
lock sync.Mutex lock sync.Mutex
} }
func (e *EndpointRegistry) ListEndpoints(ctx context.Context, options *metainternalversion.ListOptions) (*api.EndpointsList, error) { func (e *EndpointRegistry) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) {
// TODO: support namespaces in this mock // TODO: support namespaces in this mock
e.lock.Lock() e.lock.Lock()
defer e.lock.Unlock() defer e.lock.Unlock()
@ -46,7 +47,14 @@ func (e *EndpointRegistry) ListEndpoints(ctx context.Context, options *metainter
return e.Endpoints, e.Err return e.Endpoints, e.Err
} }
func (e *EndpointRegistry) GetEndpoints(ctx context.Context, name string, options *metav1.GetOptions) (*api.Endpoints, error) { func (e *EndpointRegistry) New() runtime.Object {
return &api.Endpoints{}
}
func (e *EndpointRegistry) NewList() runtime.Object {
return &api.EndpointsList{}
}
func (e *EndpointRegistry) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
// TODO: support namespaces in this mock // TODO: support namespaces in this mock
e.lock.Lock() e.lock.Lock()
defer e.lock.Unlock() defer e.lock.Unlock()
@ -63,11 +71,20 @@ func (e *EndpointRegistry) GetEndpoints(ctx context.Context, name string, option
return nil, errors.NewNotFound(api.Resource("endpoints"), name) return nil, errors.NewNotFound(api.Resource("endpoints"), name)
} }
func (e *EndpointRegistry) WatchEndpoints(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error) { func (e *EndpointRegistry) Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error) {
return nil, fmt.Errorf("unimplemented!") return nil, fmt.Errorf("unimplemented!")
} }
func (e *EndpointRegistry) UpdateEndpoints(ctx context.Context, endpoints *api.Endpoints, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, options *metav1.UpdateOptions) error { func (e *EndpointRegistry) Create(ctx context.Context, endpoints runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
return nil, fmt.Errorf("unimplemented!")
}
func (e *EndpointRegistry) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreateOnUpdate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
obj, err := objInfo.UpdatedObject(ctx, nil)
if err != nil {
return nil, false, err
}
endpoints := obj.(*api.Endpoints)
// TODO: support namespaces in this mock // TODO: support namespaces in this mock
e.lock.Lock() e.lock.Lock()
defer e.lock.Unlock() defer e.lock.Unlock()
@ -75,7 +92,7 @@ func (e *EndpointRegistry) UpdateEndpoints(ctx context.Context, endpoints *api.E
e.Updates = append(e.Updates, *endpoints) e.Updates = append(e.Updates, *endpoints)
if e.Err != nil { if e.Err != nil {
return e.Err return nil, false, e.Err
} }
if e.Endpoints == nil { if e.Endpoints == nil {
e.Endpoints = &api.EndpointsList{ e.Endpoints = &api.EndpointsList{
@ -83,7 +100,7 @@ func (e *EndpointRegistry) UpdateEndpoints(ctx context.Context, endpoints *api.E
*endpoints, *endpoints,
}, },
} }
return nil return endpoints, false, nil
} }
for ix := range e.Endpoints.Items { for ix := range e.Endpoints.Items {
if e.Endpoints.Items[ix].Name == endpoints.Name { if e.Endpoints.Items[ix].Name == endpoints.Name {
@ -91,15 +108,15 @@ func (e *EndpointRegistry) UpdateEndpoints(ctx context.Context, endpoints *api.E
} }
} }
e.Endpoints.Items = append(e.Endpoints.Items, *endpoints) e.Endpoints.Items = append(e.Endpoints.Items, *endpoints)
return nil return endpoints, false, nil
} }
func (e *EndpointRegistry) DeleteEndpoints(ctx context.Context, name string) error { func (e *EndpointRegistry) Delete(ctx context.Context, name string, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
// TODO: support namespaces in this mock // TODO: support namespaces in this mock
e.lock.Lock() e.lock.Lock()
defer e.lock.Unlock() defer e.lock.Unlock()
if e.Err != nil { if e.Err != nil {
return e.Err return nil, false, e.Err
} }
if e.Endpoints != nil { if e.Endpoints != nil {
var newList []api.Endpoints var newList []api.Endpoints
@ -110,5 +127,9 @@ func (e *EndpointRegistry) DeleteEndpoints(ctx context.Context, name string) err
} }
e.Endpoints.Items = newList e.Endpoints.Items = newList
} }
return nil return nil, true, nil
}
func (e *EndpointRegistry) DeleteCollection(ctx context.Context, _ *metav1.DeleteOptions, _ *metainternalversion.ListOptions) (runtime.Object, error) {
return nil, fmt.Errorf("unimplemented!")
} }

View File

@ -42,7 +42,6 @@ type CustomResourceStorage struct {
func NewStorage(resource schema.GroupResource, listKind schema.GroupVersionKind, strategy customResourceStrategy, optsGetter generic.RESTOptionsGetter, categories []string, tableConvertor rest.TableConvertor) CustomResourceStorage { func NewStorage(resource schema.GroupResource, listKind schema.GroupVersionKind, strategy customResourceStrategy, optsGetter generic.RESTOptionsGetter, categories []string, tableConvertor rest.TableConvertor) CustomResourceStorage {
customResourceREST, customResourceStatusREST := newREST(resource, listKind, strategy, optsGetter, categories, tableConvertor) customResourceREST, customResourceStatusREST := newREST(resource, listKind, strategy, optsGetter, categories, tableConvertor)
customResourceRegistry := NewRegistry(customResourceREST)
s := CustomResourceStorage{ s := CustomResourceStorage{
CustomResource: customResourceREST, CustomResource: customResourceREST,
@ -59,7 +58,7 @@ func NewStorage(resource schema.GroupResource, listKind schema.GroupVersionKind,
} }
s.Scale = &ScaleREST{ s.Scale = &ScaleREST{
registry: customResourceRegistry, store: customResourceREST.Store,
specReplicasPath: scale.SpecReplicasPath, specReplicasPath: scale.SpecReplicasPath,
statusReplicasPath: scale.StatusReplicasPath, statusReplicasPath: scale.StatusReplicasPath,
labelSelectorPath: labelSelectorPath, labelSelectorPath: labelSelectorPath,
@ -189,7 +188,7 @@ func (r *StatusREST) Update(ctx context.Context, name string, objInfo rest.Updat
} }
type ScaleREST struct { type ScaleREST struct {
registry Registry store *genericregistry.Store
specReplicasPath string specReplicasPath string
statusReplicasPath string statusReplicasPath string
labelSelectorPath string labelSelectorPath string
@ -209,10 +208,11 @@ func (r *ScaleREST) New() runtime.Object {
} }
func (r *ScaleREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { func (r *ScaleREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
cr, err := r.registry.GetCustomResource(ctx, name, options) obj, err := r.store.Get(ctx, name, options)
if err != nil { if err != nil {
return nil, err return nil, err
} }
cr := obj.(*unstructured.Unstructured)
scaleObject, replicasFound, err := scaleFromCustomResource(cr, r.specReplicasPath, r.statusReplicasPath, r.labelSelectorPath) scaleObject, replicasFound, err := scaleFromCustomResource(cr, r.specReplicasPath, r.statusReplicasPath, r.labelSelectorPath)
if err != nil { if err != nil {
@ -225,10 +225,11 @@ func (r *ScaleREST) Get(ctx context.Context, name string, options *metav1.GetOpt
} }
func (r *ScaleREST) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) { func (r *ScaleREST) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
cr, err := r.registry.GetCustomResource(ctx, name, &metav1.GetOptions{}) obj, err := r.store.Get(ctx, name, &metav1.GetOptions{})
if err != nil { if err != nil {
return nil, false, err return nil, false, err
} }
cr := obj.(*unstructured.Unstructured)
const invalidSpecReplicas = -2147483648 // smallest int32 const invalidSpecReplicas = -2147483648 // smallest int32
oldScale, replicasFound, err := scaleFromCustomResource(cr, r.specReplicasPath, r.statusReplicasPath, r.labelSelectorPath) oldScale, replicasFound, err := scaleFromCustomResource(cr, r.specReplicasPath, r.statusReplicasPath, r.labelSelectorPath)
@ -239,7 +240,7 @@ func (r *ScaleREST) Update(ctx context.Context, name string, objInfo rest.Update
oldScale.Spec.Replicas = invalidSpecReplicas // signal that this was not set before oldScale.Spec.Replicas = invalidSpecReplicas // signal that this was not set before
} }
obj, err := objInfo.UpdatedObject(ctx, oldScale) obj, err = objInfo.UpdatedObject(ctx, oldScale)
if err != nil { if err != nil {
return nil, false, err return nil, false, err
} }
@ -262,10 +263,11 @@ func (r *ScaleREST) Update(ctx context.Context, name string, objInfo rest.Update
} }
cr.SetResourceVersion(scale.ResourceVersion) cr.SetResourceVersion(scale.ResourceVersion)
cr, err = r.registry.UpdateCustomResource(ctx, cr, createValidation, updateValidation, options) obj, _, err = r.store.Update(ctx, cr.GetName(), rest.DefaultUpdatedObjectInfo(cr), createValidation, updateValidation, false, options)
if err != nil { if err != nil {
return nil, false, err return nil, false, err
} }
cr = obj.(*unstructured.Unstructured)
newScale, _, err := scaleFromCustomResource(cr, r.specReplicasPath, r.statusReplicasPath, r.labelSelectorPath) newScale, _, err := scaleFromCustomResource(cr, r.specReplicasPath, r.statusReplicasPath, r.labelSelectorPath)
if err != nil { if err != nil {