diff --git a/pkg/apis/core/validation/validation.go b/pkg/apis/core/validation/validation.go index d5c66ce84c4..696563223b8 100644 --- a/pkg/apis/core/validation/validation.go +++ b/pkg/apis/core/validation/validation.go @@ -45,6 +45,7 @@ import ( schedulinghelper "k8s.io/component-helpers/scheduling/corev1" apiservice "k8s.io/kubernetes/pkg/api/service" "k8s.io/kubernetes/pkg/apis/core" + api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/apis/core/helper" podshelper "k8s.io/kubernetes/pkg/apis/core/pods" corev1 "k8s.io/kubernetes/pkg/apis/core/v1" @@ -4528,6 +4529,18 @@ func validateServiceExternalTrafficFieldsValue(service *core.Service) field.Erro return allErrs } +func validateServiceExternalTrafficFieldsUpdate(before, after *api.Service) field.ErrorList { + allErrs := field.ErrorList{} + + if apiservice.NeedsHealthCheck(before) && apiservice.NeedsHealthCheck(after) { + if after.Spec.HealthCheckNodePort != before.Spec.HealthCheckNodePort { + allErrs = append(allErrs, field.Forbidden(field.NewPath("spec", "healthCheckNodePort"), "field is immutable")) + } + } + + return allErrs +} + // validateServiceInternalTrafficFieldsValue validates InternalTraffic related // spec have legal value. func validateServiceInternalTrafficFieldsValue(service *core.Service) field.ErrorList { @@ -4592,6 +4605,8 @@ func ValidateServiceUpdate(service, oldService *core.Service) field.ErrorList { upgradeDowngradeLoadBalancerClassErrs := validateLoadBalancerClassField(oldService, service) allErrs = append(allErrs, upgradeDowngradeLoadBalancerClassErrs...) + allErrs = append(allErrs, validateServiceExternalTrafficFieldsUpdate(oldService, service)...) + return append(allErrs, ValidateService(service)...) } diff --git a/pkg/registry/core/service/storage/rest.go b/pkg/registry/core/service/storage/rest.go index 06304661746..1e2730ccc4a 100644 --- a/pkg/registry/core/service/storage/rest.go +++ b/pkg/registry/core/service/storage/rest.go @@ -35,7 +35,6 @@ import ( "k8s.io/apimachinery/pkg/watch" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/rest" - "k8s.io/apiserver/pkg/util/dryrun" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/klog/v2" apiservice "k8s.io/kubernetes/pkg/api/service" @@ -253,7 +252,6 @@ func (al *RESTAllocStuff) healthCheckNodePortUpdate(oldService, service *api.Ser oldHealthCheckNodePort := oldService.Spec.HealthCheckNodePort needsHealthCheckNodePort := apiservice.NeedsHealthCheck(service) - newHealthCheckNodePort := service.Spec.HealthCheckNodePort switch { // Case 1: Transition from don't need HealthCheckNodePort to needs HealthCheckNodePort. @@ -271,130 +269,88 @@ func (al *RESTAllocStuff) healthCheckNodePortUpdate(oldService, service *api.Ser klog.Infof("Transition to non LoadBalancer type service or LoadBalancer type service with ExternalTrafficPolicy=Global") klog.V(4).Infof("Releasing healthCheckNodePort: %d", oldHealthCheckNodePort) nodePortOp.ReleaseDeferred(int(oldHealthCheckNodePort)) - - // Case 3: Remain in needs HealthCheckNodePort. - // Reject changing the value of the HealthCheckNodePort field. - case neededHealthCheckNodePort && needsHealthCheckNodePort: - if oldHealthCheckNodePort != newHealthCheckNodePort { - //FIXME: Let validation do this. - klog.Warningf("Attempt to change value of health check node port DENIED") - fldPath := field.NewPath("spec", "healthCheckNodePort") - el := field.ErrorList{field.Invalid(fldPath, newHealthCheckNodePort, - "cannot change healthCheckNodePort on loadBalancer service with externalTraffic=Local during update")} - return false, errors.NewInvalid(api.Kind("Service"), service.Name, el) - } } return true, nil } func (rs *REST) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) { - oldObj, err := rs.services.Get(ctx, name, &metav1.GetOptions{}) - if err != nil { - // Support create on update, if forced to. - if forceAllowCreate { - obj, err := objInfo.UpdatedObject(ctx, nil) - if err != nil { - return nil, false, err - } - createdObj, err := rs.Create(ctx, obj, createValidation, &metav1.CreateOptions{DryRun: options.DryRun}) - if err != nil { - return nil, false, err - } - return createdObj, true, nil - } - return nil, false, err - } - oldService := oldObj.(*api.Service) - obj, err := objInfo.UpdatedObject(ctx, oldService) - if err != nil { - return nil, false, err + return rs.services.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options) +} + +func (al *RESTAllocStuff) allocateUpdate(service, oldService *api.Service, dryRun bool) (transaction, error) { + result := metaTransaction{} + + // Ensure IP family fields are correctly initialized. We do it here, since + // we want this to be visible even when dryRun == true. + if err := al.initIPFamilyFields(oldService, service); err != nil { + return nil, err } - service := obj.(*api.Service) - - if !rest.ValidNamespace(ctx, &service.ObjectMeta) { - return nil, false, errors.NewConflict(api.Resource("services"), service.Namespace, fmt.Errorf("Service.Namespace does not match the provided context")) - } - - // Copy over non-user fields - if err := rest.BeforeUpdate(rs.strategy, ctx, service, oldService); err != nil { - return nil, false, err - } - - var allocated map[api.IPFamily]string - var toReleaseIPs map[api.IPFamily]string - - performRelease := false // when set, any clusterIP that should be released will be released - // cleanup - // on failure: Any allocated ip must be released back - // on failure: any ip that should be released, will *not* be released - // on success: any ip that should be released, will be released - defer func() { - //FIXME: plumb dryRun down here - // release the allocated, this is expected to be cleared if the entire function ran to success - if allocated_released, err := rs.alloc.releaseClusterIPs(allocated); err != nil { - klog.V(4).Infof("service %v/%v failed to clean up after failed service update error:%v. Allocated/Released:%v/%v", service.Namespace, service.Name, err, allocated, allocated_released) - - } - // performRelease is set when the enture function ran to success - if performRelease { - //FIXME: plumb dryRun down here - if toReleaseIPs_released, err := rs.alloc.releaseClusterIPs(toReleaseIPs); err != nil { - klog.V(4).Infof("service %v/%v failed to clean up after failed service update error:%v. ShouldRelease/Released:%v/%v", service.Namespace, service.Name, err, toReleaseIPs, toReleaseIPs_released) - } - } - }() - - nodePortOp := portallocator.StartOperation(rs.alloc.serviceNodePorts, dryrun.IsDryRun(options.DryRun)) - defer nodePortOp.Finish() - - // try set ip families (for missing ip families) - if err := rs.alloc.initIPFamilyFields(oldService, service); err != nil { - return nil, false, err - } - - if !dryrun.IsDryRun(options.DryRun) { - allocated, toReleaseIPs, err = rs.alloc.handleClusterIPsForUpdatedService(oldService, service) - if err != nil { - return nil, false, err + // Allocate ClusterIPs + //FIXME: we need to put values in, even if dry run - else validation should + //not pass. It does but that should be fixed. Plumb dryRun thru update + //logic. + // xref: https://groups.google.com/g/kubernetes-sig-api-machinery/c/_-5TKHXHcXE/m/RfKj7CtzAQAJ + if !dryRun { + if txn, err := al.allocUpdateServiceClusterIPsNew(service, oldService); err != nil { + result.Revert() + return nil, err + } else { + result = append(result, txn) } } + + // Allocate ports + if txn, err := al.allocUpdateServiceNodePortsNew(service, oldService, dryRun); err != nil { + result.Revert() + return nil, err + } else { + result = append(result, txn) + } + + return result, nil +} + +//FIXME: rename and merge with updateNodePorts? +func (al *RESTAllocStuff) allocUpdateServiceNodePortsNew(service, oldService *api.Service, dryRun bool) (transaction, error) { + // The allocator tracks dry-run-ness internally. + nodePortOp := portallocator.StartOperation(al.serviceNodePorts, dryRun) + + txn := callbackTransaction{ + commit: func() { + nodePortOp.Commit() + // We don't NEED to call Finish() here, but for that package says + // to, so for future-safety, we will. + nodePortOp.Finish() + }, + revert: func() { + // Weirdly named but this will revert if commit wasn't called + nodePortOp.Finish() + }, + } + // Update service from NodePort or LoadBalancer to ExternalName or ClusterIP, should release NodePort if exists. if (oldService.Spec.Type == api.ServiceTypeNodePort || oldService.Spec.Type == api.ServiceTypeLoadBalancer) && (service.Spec.Type == api.ServiceTypeExternalName || service.Spec.Type == api.ServiceTypeClusterIP) { releaseNodePorts(oldService, nodePortOp) } + // Update service from any type to NodePort or LoadBalancer, should update NodePort. if service.Spec.Type == api.ServiceTypeNodePort || service.Spec.Type == api.ServiceTypeLoadBalancer { if err := updateNodePorts(oldService, service, nodePortOp); err != nil { - return nil, false, err + txn.Revert() + return nil, err } } - // Update service from LoadBalancer to non-LoadBalancer, should remove any LoadBalancerStatus. - if service.Spec.Type != api.ServiceTypeLoadBalancer { - // Although loadbalancer delete is actually asynchronous, we don't need to expose the user to that complexity. - service.Status.LoadBalancer = api.LoadBalancerStatus{} - } // Handle ExternalTraffic related updates. - success, err := rs.alloc.healthCheckNodePortUpdate(oldService, service, nodePortOp) + success, err := al.healthCheckNodePortUpdate(oldService, service, nodePortOp) if !success || err != nil { - return nil, false, err + txn.Revert() + return nil, err } - out, created, err := rs.services.Update(ctx, service.Name, rest.DefaultUpdatedObjectInfo(service), createValidation, updateValidation, forceAllowCreate, options) - if err == nil { - el := nodePortOp.Commit() - if el != nil { - // problems should be fixed by an eventual reconciliation / restart - utilruntime.HandleError(fmt.Errorf("error(s) committing NodePorts changes: %v", el)) - } - } - // all good - allocated = nil // if something was allocated, keep it allocated - performRelease = true // if something that should be released then go ahead and release it - - return out, created, err + return txn, nil } // GetResetFields implements rest.ResetFieldsStrategy @@ -635,6 +591,35 @@ func (al *RESTAllocStuff) allocServiceClusterIPs(service *api.Service, dryRun bo return allocated, err } +//FIXME: rename and merge with handleClusterIPsForUpdatedService +func (al *RESTAllocStuff) allocUpdateServiceClusterIPsNew(service *api.Service, oldService *api.Service) (transaction, error) { + allocated, released, err := al.handleClusterIPsForUpdatedService(oldService, service) + if err != nil { + return nil, err + } + + // on failure: Any newly allocated IP must be released back + // on failure: Any previously allocated IP that would have been released, + // must *not* be released + // on success: Any previously allocated IP that should be released, will be + // released + txn := callbackTransaction{ + commit: func() { + if actuallyReleased, err := al.releaseClusterIPs(released); err != nil { + klog.V(4).Infof("service %v/%v failed to clean up after failed service update error:%v. ShouldRelease/Released:%v/%v", + service.Namespace, service.Name, err, released, actuallyReleased) + } + }, + revert: func() { + if actuallyReleased, err := al.releaseClusterIPs(allocated); err != nil { + klog.V(4).Infof("service %v/%v failed to clean up after failed service update error:%v. Allocated/Released:%v/%v", + service.Namespace, service.Name, err, allocated, actuallyReleased) + } + }, + } + return txn, nil +} + // handles type change/upgrade/downgrade change type for an update service // this func does not perform actual release of clusterIPs. it returns // a map[family]ip for the caller to release when everything else has diff --git a/pkg/registry/core/service/storage/rest_test.go b/pkg/registry/core/service/storage/rest_test.go index 0b70abeb2df..838e08483f7 100644 --- a/pkg/registry/core/service/storage/rest_test.go +++ b/pkg/registry/core/service/storage/rest_test.go @@ -133,14 +133,17 @@ func (s *serviceStorage) Create(ctx context.Context, obj runtime.Object, createV } func (s *serviceStorage) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) { - obj, err := objInfo.UpdatedObject(ctx, nil) + ret, created, err := s.inner.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options) if err != nil { - return nil, false, err + return ret, created, err } - if !dryrun.IsDryRun(options.DryRun) { - s.saveService(obj.(*api.Service)) + if dryrun.IsDryRun(options.DryRun) { + return ret.DeepCopyObject(), created, err } - return obj, false, nil + svc := ret.(*api.Service) + s.saveService(svc) + + return s.Services[name].DeepCopy(), created, nil } func (s *serviceStorage) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) { @@ -716,12 +719,13 @@ func TestServiceRegistryUpdateLoadBalancerService(t *testing.T) { svc2 := obj.(*api.Service).DeepCopy() svc2.Spec.Type = api.ServiceTypeLoadBalancer svc2.Spec.AllocateLoadBalancerNodePorts = utilpointer.BoolPtr(true) - if _, _, err := storage.Update(ctx, svc2.Name, rest.DefaultUpdatedObjectInfo(svc2), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{}); err != nil { + obj, _, err = storage.Update(ctx, svc2.Name, rest.DefaultUpdatedObjectInfo(svc2), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{}) + if err != nil { t.Fatalf("Unexpected error: %v", err) } // Change port. - svc3 := svc2.DeepCopy() + svc3 := obj.(*api.Service).DeepCopy() svc3.Spec.Ports[0].Port = 6504 if _, _, err := storage.Update(ctx, svc3.Name, rest.DefaultUpdatedObjectInfo(svc3), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{}); err != nil { t.Fatalf("Unexpected error: %v", err) @@ -971,7 +975,7 @@ func TestServiceRegistryIPUpdate(t *testing.T) { } } - update = createdService.DeepCopy() + update = updatedService.DeepCopy() update.Spec.Ports[0].Port = 6503 update.Spec.ClusterIP = testIP update.Spec.ClusterIPs[0] = testIP diff --git a/pkg/registry/core/service/storage/storage.go b/pkg/registry/core/service/storage/storage.go index 6e18ad27d97..a385fc7a96e 100644 --- a/pkg/registry/core/service/storage/storage.go +++ b/pkg/registry/core/service/storage/storage.go @@ -326,15 +326,27 @@ func (r *GenericREST) beginUpdate(ctx context.Context, obj, oldObj runtime.Objec newSvc := obj.(*api.Service) oldSvc := oldObj.(*api.Service) + // Fix up allocated values that the client may have not specified (for + // idempotence). + svcreg.PatchAllocatedValues(newSvc, oldSvc) + // Make sure ClusterIP and ClusterIPs are in sync. This has to happen // early, before anyone looks at them. // NOTE: the args are (old, new) svcreg.NormalizeClusterIPs(oldSvc, newSvc) + // Allocate and initialize fields. + txn, err := r.alloc.allocateUpdate(newSvc, oldSvc, dryrun.IsDryRun(options.DryRun)) + if err != nil { + return nil, err + } + // Our cleanup callback finish := func(_ context.Context, success bool) { if success { + txn.Commit() } else { + txn.Revert() } } diff --git a/pkg/registry/core/service/storage/storage_test.go b/pkg/registry/core/service/storage/storage_test.go index 2acb9dceace..ab31df2e5a1 100644 --- a/pkg/registry/core/service/storage/storage_test.go +++ b/pkg/registry/core/service/storage/storage_test.go @@ -5837,3 +5837,5 @@ func TestDeleteDryRun(t *testing.T) { }) } } + +//FIXME: Tests for update() diff --git a/pkg/registry/core/service/strategy.go b/pkg/registry/core/service/strategy.go index 3439b423924..0a34b59876c 100644 --- a/pkg/registry/core/service/strategy.go +++ b/pkg/registry/core/service/strategy.go @@ -120,7 +120,6 @@ func (strategy svcStrategy) PrepareForUpdate(ctx context.Context, obj, old runti oldService := old.(*api.Service) newService.Status = oldService.Status - patchAllocatedValues(newService, oldService) //FIXME: Normalize is now called from BeginUpdate in pkg/registry/core/service/storage NormalizeClusterIPs(oldService, newService) dropServiceDisabledFields(newService, oldService) @@ -305,11 +304,12 @@ func (serviceStatusStrategy) WarningsOnUpdate(ctx context.Context, obj, old runt return nil } -// patchAllocatedValues allows clients to avoid a read-modify-write cycle while +// PatchAllocatedValues allows clients to avoid a read-modify-write cycle while // preserving values that we allocated on their behalf. For example, they // might create a Service without specifying the ClusterIP, in which case we // allocate one. If they resubmit that same YAML, we want it to succeed. -func patchAllocatedValues(newSvc, oldSvc *api.Service) { +//FIXME: move this to pkg/registry/core/service/storage +func PatchAllocatedValues(newSvc, oldSvc *api.Service) { if needsClusterIP(oldSvc) && needsClusterIP(newSvc) { if newSvc.Spec.ClusterIP == "" { newSvc.Spec.ClusterIP = oldSvc.Spec.ClusterIP @@ -521,6 +521,13 @@ func dropTypeDependentFields(newSvc *api.Service, oldSvc *api.Service) { // NOTE: there are other fields like `selector` which we could wipe. // Historically we did not wipe them and they are not allocated from // finite pools, so we are (currently) choosing to leave them alone. + + // Clear the load-balancer status if it is no longer appropriate. Although + // LB de-provisioning is actually asynchronous, we don't need to expose the + // user to that complexity. + if newSvc.Spec.Type != api.ServiceTypeLoadBalancer { + newSvc.Status.LoadBalancer = api.LoadBalancerStatus{} + } } func needsClusterIP(svc *api.Service) bool { diff --git a/pkg/registry/core/service/strategy_test.go b/pkg/registry/core/service/strategy_test.go index bc5759788d9..c405c334941 100644 --- a/pkg/registry/core/service/strategy_test.go +++ b/pkg/registry/core/service/strategy_test.go @@ -22,7 +22,6 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/diff" "k8s.io/apimachinery/pkg/util/intstr" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" @@ -124,75 +123,6 @@ func makeServiceWithClusterIp(clusterIP string, clusterIPs []string) *api.Servic } } -// TODO: This should be done on types that are not part of our API -func TestBeforeUpdate(t *testing.T) { - testCases := []struct { - name string - tweakSvc func(oldSvc, newSvc *api.Service) // given basic valid services, each test case can customize them - expectErr bool - }{ - { - name: "no change", - tweakSvc: func(oldSvc, newSvc *api.Service) { - // nothing - }, - expectErr: false, - }, - { - name: "change port", - tweakSvc: func(oldSvc, newSvc *api.Service) { - newSvc.Spec.Ports[0].Port++ - }, - expectErr: false, - }, - { - name: "bad namespace", - tweakSvc: func(oldSvc, newSvc *api.Service) { - newSvc.Namespace = "#$%%invalid" - }, - expectErr: true, - }, - { - name: "change name", - tweakSvc: func(oldSvc, newSvc *api.Service) { - newSvc.Name += "2" - }, - expectErr: true, - }, - { - name: "change ClusterIP", - tweakSvc: func(oldSvc, newSvc *api.Service) { - oldSvc.Spec.ClusterIPs = []string{"1.2.3.4"} - newSvc.Spec.ClusterIPs = []string{"4.3.2.1"} - }, - expectErr: true, - }, - { - name: "change selector", - tweakSvc: func(oldSvc, newSvc *api.Service) { - newSvc.Spec.Selector = map[string]string{"newkey": "newvalue"} - }, - expectErr: false, - }, - } - - for _, tc := range testCases { - strategy, _ := newStrategy("172.30.0.0/16", false) - - oldSvc := makeValidService() - newSvc := makeValidService() - tc.tweakSvc(oldSvc, newSvc) - ctx := genericapirequest.NewDefaultContext() - err := rest.BeforeUpdate(strategy, ctx, runtime.Object(oldSvc), runtime.Object(newSvc)) - if tc.expectErr && err == nil { - t.Errorf("unexpected non-error for %q", tc.name) - } - if !tc.expectErr && err != nil { - t.Errorf("unexpected error for %q: %v", tc.name, err) - } - } -} - func TestServiceStatusStrategy(t *testing.T) { _, testStatusStrategy := newStrategy("10.0.0.0/16", false) ctx := genericapirequest.NewDefaultContext()