Svc REST: De-layer Update

This is the last layered method.  All allocator logic is moved to the
beginUpdate() path.  Removing the now-useless layer will happen in a
subsequent commit.
Tim Hockin 2020-12-05 15:56:37 -08:00
parent 89a9ca52bc
commit ccf3376570
7 changed files with 138 additions and 183 deletions

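For readers of the diff below: the update path composes a small transaction
abstraction (transaction, metaTransaction, callbackTransaction) that this
commit uses but does not define here. A minimal sketch of those helpers,
reconstructed from how the diff uses them; the names match the diff, the
bodies are assumptions:

    // transaction is anything that can be finalized or rolled back.
    type transaction interface {
        Commit()
        Revert()
    }

    // metaTransaction aggregates sub-transactions; Commit and Revert
    // simply fan out to every member.
    type metaTransaction []transaction

    func (mt metaTransaction) Commit() {
        for _, t := range mt {
            t.Commit()
        }
    }

    func (mt metaTransaction) Revert() {
        for _, t := range mt {
            t.Revert()
        }
    }

    // callbackTransaction adapts a pair of closures to the interface.
    type callbackTransaction struct {
        commit func()
        revert func()
    }

    func (cb callbackTransaction) Commit() {
        if cb.commit != nil {
            cb.commit()
        }
    }

    func (cb callbackTransaction) Revert() {
        if cb.revert != nil {
            cb.revert()
        }
    }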
View File

@@ -45,6 +45,7 @@ import (
schedulinghelper "k8s.io/component-helpers/scheduling/corev1"
apiservice "k8s.io/kubernetes/pkg/api/service"
"k8s.io/kubernetes/pkg/apis/core"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/apis/core/helper"
podshelper "k8s.io/kubernetes/pkg/apis/core/pods"
corev1 "k8s.io/kubernetes/pkg/apis/core/v1"
@@ -4528,6 +4529,18 @@ func validateServiceExternalTrafficFieldsValue(service *core.Service) field.Erro
return allErrs
}
func validateServiceExternalTrafficFieldsUpdate(before, after *api.Service) field.ErrorList {
allErrs := field.ErrorList{}
if apiservice.NeedsHealthCheck(before) && apiservice.NeedsHealthCheck(after) {
if after.Spec.HealthCheckNodePort != before.Spec.HealthCheckNodePort {
allErrs = append(allErrs, field.Forbidden(field.NewPath("spec", "healthCheckNodePort"), "field is immutable"))
}
}
return allErrs
}
// validateServiceInternalTrafficFieldsValue validates that the InternalTraffic-related
// spec fields have legal values.
func validateServiceInternalTrafficFieldsValue(service *core.Service) field.ErrorList {
@@ -4592,6 +4605,8 @@ func ValidateServiceUpdate(service, oldService *core.Service) field.ErrorList {
upgradeDowngradeLoadBalancerClassErrs := validateLoadBalancerClassField(oldService, service)
allErrs = append(allErrs, upgradeDowngradeLoadBalancerClassErrs...)
allErrs = append(allErrs, validateServiceExternalTrafficFieldsUpdate(oldService, service)...)
return append(allErrs, ValidateService(service)...)
}

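The effect of the new validateServiceExternalTrafficFieldsUpdate rule: once a
Service needs a health-check node port both before and after the update
(Type=LoadBalancer with ExternalTrafficPolicy=Local), the port value itself is
immutable. A test-style sketch of the behavior; makeLocalLBService is a
hypothetical helper, not part of this commit:

    // Both old and new services need a health check, so changing
    // spec.healthCheckNodePort must come back Forbidden.
    before := makeLocalLBService() // hypothetical: LB + ExternalTrafficPolicy=Local
    before.Spec.HealthCheckNodePort = 30000

    after := before.DeepCopy()
    after.Spec.HealthCheckNodePort = 30001 // attempt to change the allocated port

    if errs := validateServiceExternalTrafficFieldsUpdate(before, after); len(errs) == 0 {
        t.Errorf("expected a Forbidden error on spec.healthCheckNodePort")
    }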
View File

@@ -35,7 +35,6 @@ import (
"k8s.io/apimachinery/pkg/watch"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/util/dryrun"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2"
apiservice "k8s.io/kubernetes/pkg/api/service"
@@ -253,7 +252,6 @@ func (al *RESTAllocStuff) healthCheckNodePortUpdate(oldService, service *api.Ser
oldHealthCheckNodePort := oldService.Spec.HealthCheckNodePort
needsHealthCheckNodePort := apiservice.NeedsHealthCheck(service)
newHealthCheckNodePort := service.Spec.HealthCheckNodePort
switch {
// Case 1: Transition from don't need HealthCheckNodePort to needs HealthCheckNodePort.
@@ -271,130 +269,88 @@ func (al *RESTAllocStuff) healthCheckNodePortUpdate(oldService, service *api.Ser
klog.Infof("Transition to non LoadBalancer type service or LoadBalancer type service with ExternalTrafficPolicy=Global")
klog.V(4).Infof("Releasing healthCheckNodePort: %d", oldHealthCheckNodePort)
nodePortOp.ReleaseDeferred(int(oldHealthCheckNodePort))
// Case 3: Remain in needs HealthCheckNodePort.
// Reject changing the value of the HealthCheckNodePort field.
case neededHealthCheckNodePort && needsHealthCheckNodePort:
if oldHealthCheckNodePort != newHealthCheckNodePort {
//FIXME: Let validation do this.
klog.Warningf("Attempt to change value of health check node port DENIED")
fldPath := field.NewPath("spec", "healthCheckNodePort")
el := field.ErrorList{field.Invalid(fldPath, newHealthCheckNodePort,
"cannot change healthCheckNodePort on loadBalancer service with externalTraffic=Local during update")}
return false, errors.NewInvalid(api.Kind("Service"), service.Name, el)
}
}
return true, nil
}
func (rs *REST) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
oldObj, err := rs.services.Get(ctx, name, &metav1.GetOptions{})
if err != nil {
// Support create on update, if forced to.
if forceAllowCreate {
obj, err := objInfo.UpdatedObject(ctx, nil)
if err != nil {
return nil, false, err
}
createdObj, err := rs.Create(ctx, obj, createValidation, &metav1.CreateOptions{DryRun: options.DryRun})
if err != nil {
return nil, false, err
}
return createdObj, true, nil
}
return nil, false, err
}
oldService := oldObj.(*api.Service)
obj, err := objInfo.UpdatedObject(ctx, oldService)
if err != nil {
return nil, false, err
return rs.services.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
}
func (al *RESTAllocStuff) allocateUpdate(service, oldService *api.Service, dryRun bool) (transaction, error) {
result := metaTransaction{}
// Ensure IP family fields are correctly initialized. We do it here, since
// we want this to be visible even when dryRun == true.
if err := al.initIPFamilyFields(oldService, service); err != nil {
return nil, err
}
service := obj.(*api.Service)
if !rest.ValidNamespace(ctx, &service.ObjectMeta) {
return nil, false, errors.NewConflict(api.Resource("services"), service.Namespace, fmt.Errorf("Service.Namespace does not match the provided context"))
}
// Copy over non-user fields
if err := rest.BeforeUpdate(rs.strategy, ctx, service, oldService); err != nil {
return nil, false, err
}
var allocated map[api.IPFamily]string
var toReleaseIPs map[api.IPFamily]string
performRelease := false // when set, any clusterIP that should be released will be released
// cleanup
// on failure: Any allocated ip must be released back
// on failure: any ip that should be released, will *not* be released
// on success: any ip that should be released, will be released
defer func() {
//FIXME: plumb dryRun down here
// release the allocated IPs; the allocated map is expected to be cleared if the entire function ran to success
if allocated_released, err := rs.alloc.releaseClusterIPs(allocated); err != nil {
klog.V(4).Infof("service %v/%v failed to clean up after failed service update error:%v. Allocated/Released:%v/%v", service.Namespace, service.Name, err, allocated, allocated_released)
}
// performRelease is set when the entire function ran to success
if performRelease {
//FIXME: plumb dryRun down here
if toReleaseIPs_released, err := rs.alloc.releaseClusterIPs(toReleaseIPs); err != nil {
klog.V(4).Infof("service %v/%v failed to clean up after failed service update error:%v. ShouldRelease/Released:%v/%v", service.Namespace, service.Name, err, toReleaseIPs, toReleaseIPs_released)
}
}
}()
nodePortOp := portallocator.StartOperation(rs.alloc.serviceNodePorts, dryrun.IsDryRun(options.DryRun))
defer nodePortOp.Finish()
// try set ip families (for missing ip families)
if err := rs.alloc.initIPFamilyFields(oldService, service); err != nil {
return nil, false, err
}
if !dryrun.IsDryRun(options.DryRun) {
allocated, toReleaseIPs, err = rs.alloc.handleClusterIPsForUpdatedService(oldService, service)
if err != nil {
return nil, false, err
// Allocate ClusterIPs
//FIXME: we need to put values in, even if dry run - else validation should
//not pass. It does but that should be fixed. Plumb dryRun thru update
//logic.
// xref: https://groups.google.com/g/kubernetes-sig-api-machinery/c/_-5TKHXHcXE/m/RfKj7CtzAQAJ
if !dryRun {
if txn, err := al.allocUpdateServiceClusterIPsNew(service, oldService); err != nil {
result.Revert()
return nil, err
} else {
result = append(result, txn)
}
}
// Allocate ports
if txn, err := al.allocUpdateServiceNodePortsNew(service, oldService, dryRun); err != nil {
result.Revert()
return nil, err
} else {
result = append(result, txn)
}
return result, nil
}
//FIXME: rename and merge with updateNodePorts?
func (al *RESTAllocStuff) allocUpdateServiceNodePortsNew(service, oldService *api.Service, dryRun bool) (transaction, error) {
// The allocator tracks dry-run-ness internally.
nodePortOp := portallocator.StartOperation(al.serviceNodePorts, dryRun)
txn := callbackTransaction{
commit: func() {
nodePortOp.Commit()
// We don't NEED to call Finish() here, but the docs for that package say
// to, so for future-safety, we will.
nodePortOp.Finish()
},
revert: func() {
// Weirdly named but this will revert if commit wasn't called
nodePortOp.Finish()
},
}
// Update service from NodePort or LoadBalancer to ExternalName or ClusterIP, should release NodePort if exists.
if (oldService.Spec.Type == api.ServiceTypeNodePort || oldService.Spec.Type == api.ServiceTypeLoadBalancer) &&
(service.Spec.Type == api.ServiceTypeExternalName || service.Spec.Type == api.ServiceTypeClusterIP) {
releaseNodePorts(oldService, nodePortOp)
}
// Update service from any type to NodePort or LoadBalancer, should update NodePort.
if service.Spec.Type == api.ServiceTypeNodePort || service.Spec.Type == api.ServiceTypeLoadBalancer {
if err := updateNodePorts(oldService, service, nodePortOp); err != nil {
return nil, false, err
txn.Revert()
return nil, err
}
}
// Update service from LoadBalancer to non-LoadBalancer, should remove any LoadBalancerStatus.
if service.Spec.Type != api.ServiceTypeLoadBalancer {
// Although loadbalancer delete is actually asynchronous, we don't need to expose the user to that complexity.
service.Status.LoadBalancer = api.LoadBalancerStatus{}
}
// Handle ExternalTraffic related updates.
success, err := rs.alloc.healthCheckNodePortUpdate(oldService, service, nodePortOp)
success, err := al.healthCheckNodePortUpdate(oldService, service, nodePortOp)
if !success || err != nil {
return nil, false, err
txn.Revert()
return nil, err
}
out, created, err := rs.services.Update(ctx, service.Name, rest.DefaultUpdatedObjectInfo(service), createValidation, updateValidation, forceAllowCreate, options)
if err == nil {
el := nodePortOp.Commit()
if el != nil {
// problems should be fixed by an eventual reconciliation / restart
utilruntime.HandleError(fmt.Errorf("error(s) committing NodePorts changes: %v", el))
}
}
// all good
allocated = nil // if something was allocated, keep it allocated
performRelease = true // if something that should be released then go ahead and release it
return out, created, err
return txn, nil
}
// GetResetFields implements rest.ResetFieldsStrategy
@@ -635,6 +591,35 @@ func (al *RESTAllocStuff) allocServiceClusterIPs(service *api.Service, dryRun bo
return allocated, err
}
//FIXME: rename and merge with handleClusterIPsForUpdatedService
func (al *RESTAllocStuff) allocUpdateServiceClusterIPsNew(service *api.Service, oldService *api.Service) (transaction, error) {
allocated, released, err := al.handleClusterIPsForUpdatedService(oldService, service)
if err != nil {
return nil, err
}
// on failure: Any newly allocated IP must be released back
// on failure: Any previously allocated IP that would have been released,
// must *not* be released
// on success: Any previously allocated IP that should be released, will be
// released
txn := callbackTransaction{
commit: func() {
if actuallyReleased, err := al.releaseClusterIPs(released); err != nil {
klog.V(4).Infof("service %v/%v failed to clean up after failed service update error:%v. ShouldRelease/Released:%v/%v",
service.Namespace, service.Name, err, released, actuallyReleased)
}
},
revert: func() {
if actuallyReleased, err := al.releaseClusterIPs(allocated); err != nil {
klog.V(4).Infof("service %v/%v failed to clean up after failed service update error:%v. Allocated/Released:%v/%v",
service.Namespace, service.Name, err, allocated, actuallyReleased)
}
},
}
return txn, nil
}
// handles type change/upgrade/downgrade for a service update
// this func does not perform actual release of clusterIPs. it returns
// a map[family]ip for the caller to release when everything else has

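Net effect of allocateUpdate: it returns one composite transaction, and the
commit/revert split encodes the cleanup rules spelled out in the comments
above. Commit releases the resources the update freed; Revert releases only
what was newly allocated. A rough caller sketch under those assumptions
(persist is a hypothetical storage write, not this package's API):

    txn, err := al.allocateUpdate(newSvc, oldSvc, dryRun)
    if err != nil {
        return err // nothing left allocated; failed sub-transactions were reverted
    }
    if err := persist(newSvc); err != nil { // hypothetical persistence step
        txn.Revert() // give back the newly allocated ClusterIPs and NodePorts
        return err
    }
    txn.Commit() // release the IPs/ports this update no longer needs
    return nil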
View File

@@ -133,14 +133,17 @@ func (s *serviceStorage) Create(ctx context.Context, obj runtime.Object, createV
}
func (s *serviceStorage) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
obj, err := objInfo.UpdatedObject(ctx, nil)
ret, created, err := s.inner.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
if err != nil {
return nil, false, err
return ret, created, err
}
if !dryrun.IsDryRun(options.DryRun) {
s.saveService(obj.(*api.Service))
if dryrun.IsDryRun(options.DryRun) {
return ret.DeepCopyObject(), created, err
}
return obj, false, nil
svc := ret.(*api.Service)
s.saveService(svc)
return s.Services[name].DeepCopy(), created, nil
}
func (s *serviceStorage) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
@@ -716,12 +719,13 @@ func TestServiceRegistryUpdateLoadBalancerService(t *testing.T) {
svc2 := obj.(*api.Service).DeepCopy()
svc2.Spec.Type = api.ServiceTypeLoadBalancer
svc2.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true)
if _, _, err := storage.Update(ctx, svc2.Name, rest.DefaultUpdatedObjectInfo(svc2), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{}); err != nil {
obj, _, err = storage.Update(ctx, svc2.Name, rest.DefaultUpdatedObjectInfo(svc2), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Change port.
svc3 := svc2.DeepCopy()
svc3 := obj.(*api.Service).DeepCopy()
svc3.Spec.Ports[0].Port = 6504
if _, _, err := storage.Update(ctx, svc3.Name, rest.DefaultUpdatedObjectInfo(svc3), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{}); err != nil {
t.Fatalf("Unexpected error: %v", err)
@@ -971,7 +975,7 @@ func TestServiceRegistryIPUpdate(t *testing.T) {
}
}
update = createdService.DeepCopy()
update = updatedService.DeepCopy()
update.Spec.Ports[0].Port = 6503
update.Spec.ClusterIP = testIP
update.Spec.ClusterIPs[0] = testIP

View File

@@ -326,15 +326,27 @@ func (r *GenericREST) beginUpdate(ctx context.Context, obj, oldObj runtime.Objec
newSvc := obj.(*api.Service)
oldSvc := oldObj.(*api.Service)
// Fix up allocated values that the client may not have specified (for
// idempotence).
svcreg.PatchAllocatedValues(newSvc, oldSvc)
// Make sure ClusterIP and ClusterIPs are in sync. This has to happen
// early, before anyone looks at them.
// NOTE: the args are (old, new)
svcreg.NormalizeClusterIPs(oldSvc, newSvc)
// Allocate and initialize fields.
txn, err := r.alloc.allocateUpdate(newSvc, oldSvc, dryrun.IsDryRun(options.DryRun))
if err != nil {
return nil, err
}
// Our cleanup callback
finish := func(_ context.Context, success bool) {
if success {
txn.Commit()
} else {
txn.Revert()
}
}

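This finish callback follows the generic registry's begin/finish hook shape:
the store calls the begin hook before persisting, then invokes the returned
function with a success flag, which is what routes into txn.Commit() or
txn.Revert() above. An approximate sketch of that contract, from memory of
k8s.io/apiserver's generic registry rather than from this commit:

    // Approximate shapes (see k8s.io/apiserver/pkg/registry/generic/registry):
    //   type FinishFunc func(ctx context.Context, success bool)
    //   type BeginUpdateFunc func(ctx context.Context, obj, old runtime.Object,
    //       options *metav1.UpdateOptions) (FinishFunc, error)
    //
    // The store drives the hook roughly like this:
    finish, err := beginUpdate(ctx, obj, old, options)
    if err != nil {
        return nil, err
    }
    out, persistErr := writeToStorage(ctx, obj) // hypothetical persistence step
    finish(ctx, persistErr == nil)              // commit on success, revert on failure
    return out, persistErr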
View File

@@ -5837,3 +5837,5 @@ func TestDeleteDryRun(t *testing.T) {
})
}
}
//FIXME: Tests for update()

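A skeleton of what the missing update() coverage might look like, modeled on
the table-driven style used elsewhere in these tests; the helpers and cases
here are hypothetical:

    func TestUpdateSkeleton(t *testing.T) {
        testCases := []struct {
            name      string
            tweak     func(oldSvc, newSvc *api.Service)
            expectErr bool
        }{
            {
                name:  "no change",
                tweak: func(oldSvc, newSvc *api.Service) {},
            },
            {
                name: "change ClusterIP",
                tweak: func(oldSvc, newSvc *api.Service) {
                    newSvc.Spec.ClusterIP = "10.0.0.99" // immutable once set
                },
                expectErr: true,
            },
        }
        for _, tc := range testCases {
            t.Run(tc.name, func(t *testing.T) {
                // Build old/new services, run storage.Update, and compare
                // the error result against tc.expectErr (helpers omitted).
            })
        }
    }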
View File

@@ -120,7 +120,6 @@ func (strategy svcStrategy) PrepareForUpdate(ctx context.Context, obj, old runti
oldService := old.(*api.Service)
newService.Status = oldService.Status
patchAllocatedValues(newService, oldService)
//FIXME: Normalize is now called from BeginUpdate in pkg/registry/core/service/storage
NormalizeClusterIPs(oldService, newService)
dropServiceDisabledFields(newService, oldService)
@@ -305,11 +304,12 @@ func (serviceStatusStrategy) WarningsOnUpdate(ctx context.Context, obj, old runt
return nil
}
// patchAllocatedValues allows clients to avoid a read-modify-write cycle while
// PatchAllocatedValues allows clients to avoid a read-modify-write cycle while
// preserving values that we allocated on their behalf. For example, they
// might create a Service without specifying the ClusterIP, in which case we
// allocate one. If they resubmit that same YAML, we want it to succeed.
func patchAllocatedValues(newSvc, oldSvc *api.Service) {
//FIXME: move this to pkg/registry/core/service/storage
func PatchAllocatedValues(newSvc, oldSvc *api.Service) {
if needsClusterIP(oldSvc) && needsClusterIP(newSvc) {
if newSvc.Spec.ClusterIP == "" {
newSvc.Spec.ClusterIP = oldSvc.Spec.ClusterIP
@@ -521,6 +521,13 @@ func dropTypeDependentFields(newSvc *api.Service, oldSvc *api.Service) {
// NOTE: there are other fields like `selector` which we could wipe.
// Historically we did not wipe them and they are not allocated from
// finite pools, so we are (currently) choosing to leave them alone.
// Clear the load-balancer status if it is no longer appropriate. Although
// LB de-provisioning is actually asynchronous, we don't need to expose the
// user to that complexity.
if newSvc.Spec.Type != api.ServiceTypeLoadBalancer {
newSvc.Status.LoadBalancer = api.LoadBalancerStatus{}
}
}
func needsClusterIP(svc *api.Service) bool {

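What PatchAllocatedValues buys in concrete terms: re-applying a spec that
omits server-allocated values is not treated as a request to clear them. A
small illustration; svc stands for any existing *api.Service and the IP is
made up:

    old := svc.DeepCopy()
    old.Spec.ClusterIP = "10.0.0.42" // allocated by the apiserver earlier

    resubmitted := svc.DeepCopy()
    resubmitted.Spec.ClusterIP = "" // client re-applied YAML without the IP

    PatchAllocatedValues(resubmitted, old)
    // resubmitted.Spec.ClusterIP is "10.0.0.42" again, so the update is a
    // no-op rather than an attempted (and invalid) ClusterIP change.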
View File

@@ -22,7 +22,6 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/util/intstr"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
@@ -124,75 +123,6 @@ func makeServiceWithClusterIp(clusterIP string, clusterIPs []string) *api.Servic
}
}
// TODO: This should be done on types that are not part of our API
func TestBeforeUpdate(t *testing.T) {
testCases := []struct {
name string
tweakSvc func(oldSvc, newSvc *api.Service) // given basic valid services, each test case can customize them
expectErr bool
}{
{
name: "no change",
tweakSvc: func(oldSvc, newSvc *api.Service) {
// nothing
},
expectErr: false,
},
{
name: "change port",
tweakSvc: func(oldSvc, newSvc *api.Service) {
newSvc.Spec.Ports[0].Port++
},
expectErr: false,
},
{
name: "bad namespace",
tweakSvc: func(oldSvc, newSvc *api.Service) {
newSvc.Namespace = "#$%%invalid"
},
expectErr: true,
},
{
name: "change name",
tweakSvc: func(oldSvc, newSvc *api.Service) {
newSvc.Name += "2"
},
expectErr: true,
},
{
name: "change ClusterIP",
tweakSvc: func(oldSvc, newSvc *api.Service) {
oldSvc.Spec.ClusterIPs = []string{"1.2.3.4"}
newSvc.Spec.ClusterIPs = []string{"4.3.2.1"}
},
expectErr: true,
},
{
name: "change selector",
tweakSvc: func(oldSvc, newSvc *api.Service) {
newSvc.Spec.Selector = map[string]string{"newkey": "newvalue"}
},
expectErr: false,
},
}
for _, tc := range testCases {
strategy, _ := newStrategy("172.30.0.0/16", false)
oldSvc := makeValidService()
newSvc := makeValidService()
tc.tweakSvc(oldSvc, newSvc)
ctx := genericapirequest.NewDefaultContext()
err := rest.BeforeUpdate(strategy, ctx, runtime.Object(oldSvc), runtime.Object(newSvc))
if tc.expectErr && err == nil {
t.Errorf("unexpected non-error for %q", tc.name)
}
if !tc.expectErr && err != nil {
t.Errorf("unexpected error for %q: %v", tc.name, err)
}
}
}
func TestServiceStatusStrategy(t *testing.T) {
_, testStatusStrategy := newStrategy("10.0.0.0/16", false)
ctx := genericapirequest.NewDefaultContext()