Svc REST: Move allocations in Create into funcs

All the logic remains unchanged, just reorganized.  The functions are
imperfect but emphasize the change being made and can be cleaned up
subsequently.

This makes the following steps easier to comprehend.
Tim Hockin 2021-06-26 11:55:30 -07:00
parent 960b36b124
commit f3c7e846f1
2 changed files with 111 additions and 52 deletions
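
Note for readers of the diff below: the new helpers return a transaction and build on metaTransaction and callbackTransaction, none of which are defined in the two files changed here. As a rough sketch only (shapes inferred from how the diff uses them, not the verbatim source), they look like:

// transaction represents a change that can be finalized or undone.
type transaction interface {
    Commit()
    Revert()
}

// metaTransaction composes several transactions and commits or reverts them as a group.
type metaTransaction []transaction

func (mt metaTransaction) Commit() {
    for _, t := range mt {
        t.Commit()
    }
}

func (mt metaTransaction) Revert() {
    for _, t := range mt {
        t.Revert()
    }
}

// callbackTransaction adapts a pair of callbacks; nil callbacks are no-ops.
type callbackTransaction struct {
    commit func()
    revert func()
}

func (cb callbackTransaction) Commit() {
    if cb.commit != nil {
        cb.commit()
    }
}

func (cb callbackTransaction) Revert() {
    if cb.revert != nil {
        cb.revert()
    }
}

In the diff, Create appends one such transaction per allocation step, reverts the whole bundle via a deferred Revert() on failure, and commits once the storage write succeeds.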


@@ -182,53 +182,24 @@ func (rs *REST) Create(ctx context.Context, obj runtime.Object, createValidation
      service := obj.(*api.Service)
-     // bag of clusterIPs allocated in the process of creation
-     // failed allocation will automatically trigger release
-     var toReleaseClusterIPs map[api.IPFamily]string
      if err := rest.BeforeCreate(rs.strategy, ctx, obj); err != nil {
          return nil, err
      }
      // TODO: this should probably move to strategy.PrepareForCreate()
-     defer func() {
-         released, err := rs.alloc.releaseClusterIPs(toReleaseClusterIPs)
-         if err != nil {
-             klog.Warningf("failed to release clusterIPs for failed new service:%v allocated:%v released:%v error:%v",
-                 service.Name, toReleaseClusterIPs, released, err)
-         }
-     }()
-     // try set ip families (for missing ip families)
-     // we do it here, since we want this to be visible
-     // even when dryRun == true
-     if err := rs.alloc.tryDefaultValidateServiceClusterIPFields(nil, service); err != nil {
+     // Allocate IPs and ports. If we had a transactional store, this would just
+     // be part of the larger transaction. We don't have that, so we have to do
+     // it manually. This has to happen here and not in any earlier hooks (e.g.
+     // defaulting) because it needs to be aware of flags and be able to access
+     // API storage.
+     txn, err := rs.alloc.allocateCreate(service, dryrun.IsDryRun(options.DryRun))
+     if err != nil {
          return nil, err
      }
-     var err error
-     if !dryrun.IsDryRun(options.DryRun) {
-         toReleaseClusterIPs, err = rs.alloc.allocServiceClusterIPs(service)
-         if err != nil {
-             return nil, err
+     defer func() {
+         if txn != nil {
+             txn.Revert()
          }
-     }
-     nodePortOp := portallocator.StartOperation(rs.alloc.serviceNodePorts, dryrun.IsDryRun(options.DryRun))
-     defer nodePortOp.Finish()
-     if service.Spec.Type == api.ServiceTypeNodePort || service.Spec.Type == api.ServiceTypeLoadBalancer {
-         if err := initNodePorts(service, nodePortOp); err != nil {
-             return nil, err
-         }
-     }
-     // Handle ExternalTraffic related fields during service creation.
-     if apiservice.NeedsHealthCheck(service) {
-         if err := allocateHealthCheckNodePort(service, nodePortOp); err != nil {
-             return nil, errors.NewInternalError(err)
-         }
-     }
+     }()
      out, err := rs.services.Create(ctx, service, createValidation, options)
      if err != nil {
@@ -236,19 +207,45 @@ func (rs *REST) Create(ctx context.Context, obj runtime.Object, createValidation
      }
      if err == nil {
-         el := nodePortOp.Commit()
-         if el != nil {
-             // these should be caught by an eventual reconciliation / restart
-             utilruntime.HandleError(fmt.Errorf("error(s) committing service node-ports changes: %v", el))
-         }
-         // no clusterips to release
-         toReleaseClusterIPs = nil
+         txn.Commit()
+         txn = nil
      }
      return out, err
  }
+
+ func (al *RESTAllocStuff) allocateCreate(service *api.Service, dryRun bool) (transaction, error) {
+     result := metaTransaction{}
+     // Ensure IP family fields are correctly initialized. We do it here, since
+     // we want this to be visible even when dryRun == true.
+     if err := al.tryDefaultValidateServiceClusterIPFields(nil, service); err != nil {
+         return nil, err
+     }
+     // Allocate ClusterIPs
+     // FIXME: we need to put values in even if this is a dry run, else
+     // validation should not pass. It currently does, but that should be fixed.
+     if !dryRun {
+         if txn, err := al.allocServiceClusterIPsNew(service); err != nil {
+             result.Revert()
+             return nil, err
+         } else {
+             result = append(result, txn)
+         }
+     }
+     // Allocate ports
+     if txn, err := al.allocServiceNodePortsNew(service, dryRun); err != nil {
+         result.Revert()
+         return nil, err
+     } else {
+         result = append(result, txn)
+     }
+     return result, nil
+ }

  func (rs *REST) Delete(ctx context.Context, id string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
      // TODO: handle graceful
      obj, _, err := rs.services.Delete(ctx, id, deleteValidation, options)
@@ -631,6 +628,27 @@ func (al *RESTAllocStuff) allocServiceClusterIP(service *api.Service) (map[api.I
      return allocated, err
  }
+
+ //FIXME: merge into allocServiceClusterIPs rather than call it
+ func (al *RESTAllocStuff) allocServiceClusterIPsNew(service *api.Service) (transaction, error) {
+     // clusterIPs that were allocated may need to be released in case of
+     // failure at a higher level.
+     toReleaseClusterIPs, err := al.allocServiceClusterIPs(service)
+     if err != nil {
+         return nil, err
+     }
+     txn := callbackTransaction{
+         revert: func() {
+             released, err := al.releaseClusterIPs(toReleaseClusterIPs)
+             if err != nil {
+                 klog.Warningf("failed to release clusterIPs for failed new service:%v allocated:%v released:%v error:%v",
+                     service.Name, toReleaseClusterIPs, released, err)
+             }
+         },
+     }
+     return txn, nil
+ }

  // allocates ClusterIPs for a service
  func (al *RESTAllocStuff) allocServiceClusterIPs(service *api.Service) (map[api.IPFamily]string, error) {
      // ExternalName services don't get ClusterIPs
@@ -1135,6 +1153,43 @@ func allocateHealthCheckNodePort(service *api.Service, nodePortOp *portallocator
      return nil
  }
+
+ //FIXME: rename and merge with initNodePorts
+ func (al *RESTAllocStuff) allocServiceNodePortsNew(service *api.Service, dryRun bool) (transaction, error) {
+     // The allocator tracks dry-run-ness internally.
+     nodePortOp := portallocator.StartOperation(al.serviceNodePorts, dryRun)
+     txn := callbackTransaction{
+         commit: func() {
+             nodePortOp.Commit()
+             // We don't NEED to call Finish() here, but that package says to,
+             // so for future-safety, we will.
+             nodePortOp.Finish()
+         },
+         revert: func() {
+             // Weirdly named but this will revert if commit wasn't called
+             nodePortOp.Finish()
+         },
+     }
+     // Allocate NodePorts, if needed.
+     if service.Spec.Type == api.ServiceTypeNodePort || service.Spec.Type == api.ServiceTypeLoadBalancer {
+         if err := initNodePorts(service, nodePortOp); err != nil {
+             txn.Revert()
+             return nil, err
+         }
+     }
+     // Handle ExternalTraffic related fields during service creation.
+     if apiservice.NeedsHealthCheck(service) {
+         if err := allocateHealthCheckNodePort(service, nodePortOp); err != nil {
+             txn.Revert()
+             return nil, errors.NewInternalError(err)
+         }
+     }
+     return txn, nil
+ }

  func initNodePorts(service *api.Service, nodePortOp *portallocator.PortAllocationOperation) error {
      svcPortToNodePort := map[int]int{}
      for i := range service.Spec.Ports {


@@ -1089,18 +1089,22 @@ func TestAllocateLoadBalancerNodePorts(t *testing.T) {
              if tc.expectError {
                  return
              }
-             t.Errorf("%s; Failed to create service: %#v", tc.name, err)
+             t.Errorf("failed to create service: %#v", err)
          }
          srv, err := getService(storage, ctx, tc.svc.Name, &metav1.GetOptions{})
          if err != nil {
-             t.Errorf("%s; Unexpected error: %v", tc.name, err)
+             t.Errorf("unexpected error: %v", err)
          }
          if srv == nil {
-             t.Fatalf("%s; Failed to find service: %s", tc.name, tc.svc.Name)
+             t.Fatalf("failed to find service: %s", tc.svc.Name)
          }
          serviceNodePorts := collectServiceNodePorts(srv)
          if (len(serviceNodePorts) != 0) != tc.expectNodePorts {
-             t.Errorf("%s; Allocated NodePorts not as expected", tc.name)
+             exp := "0"
+             if tc.expectNodePorts {
+                 exp = ">0"
+             }
+             t.Errorf("allocated NodePorts not as expected: expected %v, got %v", exp, len(serviceNodePorts))
          }
      })
  }