diff --git a/pkg/registry/core/service/storage/rest.go b/pkg/registry/core/service/storage/rest.go
index 57d2573b2c2..04d04057646 100644
--- a/pkg/registry/core/service/storage/rest.go
+++ b/pkg/registry/core/service/storage/rest.go
@@ -182,53 +182,24 @@ func (rs *REST) Create(ctx context.Context, obj runtime.Object, createValidation
 	service := obj.(*api.Service)
 
-	// bag of clusterIPs allocated in the process of creation
-	// failed allocation will automatically trigger release
-	var toReleaseClusterIPs map[api.IPFamily]string
-
 	if err := rest.BeforeCreate(rs.strategy, ctx, obj); err != nil {
 		return nil, err
 	}
 
-	// TODO: this should probably move to strategy.PrepareForCreate()
-	defer func() {
-		released, err := rs.alloc.releaseClusterIPs(toReleaseClusterIPs)
-		if err != nil {
-			klog.Warningf("failed to release clusterIPs for failed new service:%v allocated:%v released:%v error:%v",
-				service.Name, toReleaseClusterIPs, released, err)
-		}
-	}()
-
-	// try set ip families (for missing ip families)
-	// we do it here, since we want this to be visible
-	// even when dryRun == true
-	if err := rs.alloc.tryDefaultValidateServiceClusterIPFields(nil, service); err != nil {
+	// Allocate IPs and ports. If we had a transactional store, this would just
+	// be part of the larger transaction. We don't have that, so we have to do
+	// it manually. This has to happen here and not in any earlier hooks (e.g.
+	// defaulting) because it needs to be aware of flags and be able to access
+	// API storage.
+	txn, err := rs.alloc.allocateCreate(service, dryrun.IsDryRun(options.DryRun))
+	if err != nil {
 		return nil, err
 	}
-
-	var err error
-	if !dryrun.IsDryRun(options.DryRun) {
-		toReleaseClusterIPs, err = rs.alloc.allocServiceClusterIPs(service)
-		if err != nil {
-			return nil, err
+	defer func() {
+		if txn != nil {
+			txn.Revert()
 		}
-	}
-
-	nodePortOp := portallocator.StartOperation(rs.alloc.serviceNodePorts, dryrun.IsDryRun(options.DryRun))
-	defer nodePortOp.Finish()
-
-	if service.Spec.Type == api.ServiceTypeNodePort || service.Spec.Type == api.ServiceTypeLoadBalancer {
-		if err := initNodePorts(service, nodePortOp); err != nil {
-			return nil, err
-		}
-	}
-
-	// Handle ExternalTraffic related fields during service creation.
-	if apiservice.NeedsHealthCheck(service) {
-		if err := allocateHealthCheckNodePort(service, nodePortOp); err != nil {
-			return nil, errors.NewInternalError(err)
-		}
-	}
+	}()
 
 	out, err := rs.services.Create(ctx, service, createValidation, options)
 	if err != nil {
@@ -236,19 +207,45 @@ func (rs *REST) Create(ctx context.Context, obj runtime.Object, createValidation
 	}
 
 	if err == nil {
-		el := nodePortOp.Commit()
-		if el != nil {
-			// these should be caught by an eventual reconciliation / restart
-			utilruntime.HandleError(fmt.Errorf("error(s) committing service node-ports changes: %v", el))
-		}
-
-		// no clusterips to release
-		toReleaseClusterIPs = nil
+		txn.Commit()
+		txn = nil
 	}
 
 	return out, err
 }
 
+func (al *RESTAllocStuff) allocateCreate(service *api.Service, dryRun bool) (transaction, error) {
+	result := metaTransaction{}
+
+	// Ensure IP family fields are correctly initialized. We do it here, since
+	// we want this to be visible even when dryRun == true.
+	if err := al.tryDefaultValidateServiceClusterIPFields(nil, service); err != nil {
+		return nil, err
+	}
+
+	// Allocate ClusterIPs
+	//FIXME: we need to put values in, even if dry run - else validation should
+	//not pass. It does, but that should be fixed.
+	if !dryRun {
+		if txn, err := al.allocServiceClusterIPsNew(service); err != nil {
+			result.Revert()
+			return nil, err
+		} else {
+			result = append(result, txn)
+		}
+	}
+
+	// Allocate ports
+	if txn, err := al.allocServiceNodePortsNew(service, dryRun); err != nil {
+		result.Revert()
+		return nil, err
+	} else {
+		result = append(result, txn)
+	}
+
+	return result, nil
+}
+
 func (rs *REST) Delete(ctx context.Context, id string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
 	// TODO: handle graceful
 	obj, _, err := rs.services.Delete(ctx, id, deleteValidation, options)
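The `transaction`, `metaTransaction`, and `callbackTransaction` types that `allocateCreate` builds on are not shown in this diff. As a rough sketch of the contract the code above relies on (commit on success, revert on failure, fan-out over sub-transactions); the actual definitions in the tree may differ in details:

```go
package storage

// transaction is something that must be finalized on success (Commit) or
// undone on failure (Revert) of a larger operation. Sketch only.
type transaction interface {
	Commit()
	Revert()
}

// metaTransaction composes sub-transactions; Commit and Revert fan out to
// every member, in order.
type metaTransaction []transaction

func (mt metaTransaction) Commit() {
	for _, t := range mt {
		t.Commit()
	}
}

func (mt metaTransaction) Revert() {
	for _, t := range mt {
		t.Revert()
	}
}

// callbackTransaction adapts arbitrary commit/revert funcs, either of which
// may be nil.
type callbackTransaction struct {
	commit func()
	revert func()
}

func (cb callbackTransaction) Commit() {
	if cb.commit != nil {
		cb.commit()
	}
}

func (cb callbackTransaction) Revert() {
	if cb.revert != nil {
		cb.revert()
	}
}
```

Under this reading, the `txn = nil` dance in `Create` makes sense: the deferred function reverts whatever is non-nil, and a successful commit nils the variable so the defer becomes a no-op.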
@@ -631,6 +628,27 @@ func (al *RESTAllocStuff) allocServiceClusterIP(service *api.Service) (map[api.I
 	return allocated, err
 }
 
+//FIXME: merge into allocServiceClusterIPs rather than call it
+func (al *RESTAllocStuff) allocServiceClusterIPsNew(service *api.Service) (transaction, error) {
+	// clusterIPs that were allocated may need to be released in case of
+	// failure at a higher level.
+	toReleaseClusterIPs, err := al.allocServiceClusterIPs(service)
+	if err != nil {
+		return nil, err
+	}
+
+	txn := callbackTransaction{
+		revert: func() {
+			released, err := al.releaseClusterIPs(toReleaseClusterIPs)
+			if err != nil {
+				klog.Warningf("failed to release clusterIPs for failed new service:%v allocated:%v released:%v error:%v",
+					service.Name, toReleaseClusterIPs, released, err)
+			}
+		},
+	}
+	return txn, nil
+}
+
 // allocates ClusterIPs for a service
 func (al *RESTAllocStuff) allocServiceClusterIPs(service *api.Service) (map[api.IPFamily]string, error) {
 	// external name don't get ClusterIPs
@@ -1135,6 +1153,43 @@ func allocateHealthCheckNodePort(service *api.Service, nodePortOp *portallocator
 	return nil
 }
 
+//FIXME: rename and merge with initNodePorts
+func (al *RESTAllocStuff) allocServiceNodePortsNew(service *api.Service, dryRun bool) (transaction, error) {
+	// The allocator tracks dry-run-ness internally.
+	nodePortOp := portallocator.StartOperation(al.serviceNodePorts, dryRun)
+
+	txn := callbackTransaction{
+		commit: func() {
+			nodePortOp.Commit()
+			// We don't NEED to call Finish() here, but the docs for that
+			// package say to, so for future-safety, we will.
+			nodePortOp.Finish()
+		},
+		revert: func() {
+			// Weirdly named, but this reverts the operation if Commit() was
+			// not called first.
+			nodePortOp.Finish()
+		},
+	}
+
+	// Allocate NodePorts, if needed.
+	if service.Spec.Type == api.ServiceTypeNodePort || service.Spec.Type == api.ServiceTypeLoadBalancer {
+		if err := initNodePorts(service, nodePortOp); err != nil {
+			txn.Revert()
+			return nil, err
+		}
+	}
+
+	// Handle ExternalTraffic related fields during service creation.
+	if apiservice.NeedsHealthCheck(service) {
+		if err := allocateHealthCheckNodePort(service, nodePortOp); err != nil {
+			txn.Revert()
+			return nil, errors.NewInternalError(err)
+		}
+	}
+
+	return txn, nil
+}
+
 func initNodePorts(service *api.Service, nodePortOp *portallocator.PortAllocationOperation) error {
 	svcPortToNodePort := map[int]int{}
 	for i := range service.Spec.Ports {
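On the `Finish()`/`Commit()` comments above: `portallocator.PortAllocationOperation` defers its allocations and rolls them back on `Finish()` unless `Commit()` was called first. A simplified, hypothetical sketch of that contract (the type and field names here are invented for illustration, not the real implementation):

```go
package example

// operation is a stripped-down analogue of a deferred port-allocation
// operation, shown only to illustrate the Finish/Commit contract that
// callbackTransaction wraps above.
type operation struct {
	allocated      []int          // ports handed out so far
	shouldRollback bool           // true until Commit() is called
	release        func(port int) // returns a port to the underlying pool
}

// Commit finalizes pending allocations; after this, Finish is a no-op.
func (op *operation) Commit() {
	op.shouldRollback = false
}

// Finish releases pending allocations unless Commit was called first. That
// is why calling it from the revert callback undoes the allocation, while
// calling it right after Commit is harmless.
func (op *operation) Finish() {
	if op.shouldRollback {
		for _, port := range op.allocated {
			op.release(port)
		}
		op.allocated = nil
	}
}
```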
diff --git a/pkg/registry/core/service/storage/rest_test.go b/pkg/registry/core/service/storage/rest_test.go
index 43eb8ef56fa..096b0557809 100644
--- a/pkg/registry/core/service/storage/rest_test.go
+++ b/pkg/registry/core/service/storage/rest_test.go
@@ -1089,18 +1089,22 @@ func TestAllocateLoadBalancerNodePorts(t *testing.T) {
 				if tc.expectError {
 					return
 				}
-				t.Errorf("%s; Failed to create service: %#v", tc.name, err)
+				t.Errorf("failed to create service: %#v", err)
 			}
 			srv, err := getService(storage, ctx, tc.svc.Name, &metav1.GetOptions{})
 			if err != nil {
-				t.Errorf("%s; Unexpected error: %v", tc.name, err)
+				t.Errorf("unexpected error: %v", err)
 			}
 			if srv == nil {
-				t.Fatalf("%s; Failed to find service: %s", tc.name, tc.svc.Name)
+				t.Fatalf("failed to find service: %s", tc.svc.Name)
 			}
 			serviceNodePorts := collectServiceNodePorts(srv)
 			if (len(serviceNodePorts) != 0) != tc.expectNodePorts {
-				t.Errorf("%s; Allocated NodePorts not as expected", tc.name)
+				exp := "0"
+				if tc.expectNodePorts {
+					exp = ">0"
+				}
+				t.Errorf("allocated NodePorts not as expected: expected %v, got %v", exp, len(serviceNodePorts))
 			}
 		})
 }
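A note on the test changes: the `tc.name` prefixes can be dropped from the failure messages because these assertions already run inside `t.Run(tc.name, ...)` subtests, which attach the case name to every failure. A minimal standalone illustration (hypothetical test, not from this package):

```go
package example

import "testing"

func TestSubtestNames(t *testing.T) {
	cases := []struct{ name string }{
		{name: "single-stack"},
		{name: "dual-stack"},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// Failures here are reported as e.g.
			// "TestSubtestNames/dual-stack", so the message itself does not
			// need to repeat tc.name.
			t.Logf("running %s", t.Name())
		})
	}
}
```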