diff --git a/pkg/registry/service/portallocator/operation.go b/pkg/registry/service/portallocator/operation.go
new file mode 100644
index 00000000000..a4350104349
--- /dev/null
+++ b/pkg/registry/service/portallocator/operation.go
@@ -0,0 +1,117 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package portallocator
+
+// Encapsulates the semantics of a port allocation 'transaction':
+// It is better to leak ports than to double-allocate them,
+// so we allocate immediately, but defer release.
+// On commit we best-effort release the deferred releases.
+// On rollback we best-effort release any allocations we did.
+//
+// Pattern for use:
+//   op := StartOperation(...)
+//   defer op.Finish()
+//   ...
+//   write(updatedOwner)
+//   op.Commit()
+type portAllocationOperation struct {
+	pa              Interface
+	allocated       []int
+	releaseDeferred []int
+	shouldRollback  bool
+}
+
+// Creates a portAllocationOperation, tracking a set of allocations & releases
+func StartOperation(pa Interface) *portAllocationOperation {
+	op := &portAllocationOperation{}
+	op.pa = pa
+	op.allocated = []int{}
+	op.releaseDeferred = []int{}
+	op.shouldRollback = true
+	return op
+}
+
+// Will rollback unless marked as shouldRollback = false by a Commit().  Call from a defer block
+func (op *portAllocationOperation) Finish() {
+	if op.shouldRollback {
+		op.Rollback()
+	}
+}
+
+// (Try to) undo any operations we did
+func (op *portAllocationOperation) Rollback() []error {
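+	// Try to release every port we allocated, collecting errors as we go
+	// rather than aborting on the first failure.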
+	errors := []error{}
+
+	for _, allocated := range op.allocated {
+		err := op.pa.Release(allocated)
+		if err != nil {
+			errors = append(errors, err)
+		}
+	}
+
+	if len(errors) == 0 {
+		return nil
+	}
+	return errors
+}
+
+// (Try to) perform any deferred operations.
+// Note that even if this fails, we don't rollback; we always want to err on the side of over-allocation,
+// and Commit should be called _after_ the owner is written
+func (op *portAllocationOperation) Commit() []error {
+	errors := []error{}
+
+	for _, release := range op.releaseDeferred {
+		err := op.pa.Release(release)
+		if err != nil {
+			errors = append(errors, err)
+		}
+	}
+
+	// Even on error, we don't rollback
+	// Problems should be fixed by an eventual reconciliation / restart
+	op.shouldRollback = false
+
+	if len(errors) == 0 {
+		return nil
+	}
+
+	return errors
+}
+
+// Allocates the given port, and records it for future rollback
+func (op *portAllocationOperation) Allocate(port int) error {
+	err := op.pa.Allocate(port)
+	if err == nil {
+		op.allocated = append(op.allocated, port)
+	}
+	return err
+}
+
+// Allocates the next available port, and records it for future rollback
+func (op *portAllocationOperation) AllocateNext() (int, error) {
+	port, err := op.pa.AllocateNext()
+	if err == nil {
+		op.allocated = append(op.allocated, port)
+	}
+	return port, err
+}
+
+// Marks a port so that it will be released if this operation Commits
+func (op *portAllocationOperation) ReleaseDeferred(port int) {
+	op.releaseDeferred = append(op.releaseDeferred, port)
+}
diff --git a/pkg/registry/service/rest.go b/pkg/registry/service/rest.go
index 83da560ce9a..8e7c23982bf 100644
--- a/pkg/registry/service/rest.go
+++ b/pkg/registry/service/rest.go
@@ -38,6 +38,7 @@ import (
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/util/fielderrors"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
+	"github.com/golang/glog"
 )
 
 // REST adapts a service registry into apiserver's RESTStorage model.
@@ -79,6 +80,9 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, err
 		}
 	}()
 
+	nodePortOp := portallocator.StartOperation(rs.serviceNodePorts)
+	defer nodePortOp.Finish()
+
 	if api.IsServiceIPRequested(service) {
 		// Allocate next available.
 		ip, err := rs.portals.AllocateNext()
@@ -97,12 +101,37 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, err
 		releaseServiceIP = true
 	}
 
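+	// Honor any node port requested explicitly in the spec; otherwise auto-assign
+	// one when the service type calls for it.  If we return before Commit, the
+	// deferred nodePortOp.Finish() rolls these allocations back.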
+	assignNodePorts := shouldAssignNodePorts(service)
+	for i := range service.Spec.Ports {
+		servicePort := &service.Spec.Ports[i]
+		if servicePort.NodePort != 0 {
+			err := nodePortOp.Allocate(servicePort.NodePort)
+			if err != nil {
+				el := fielderrors.ValidationErrorList{fielderrors.NewFieldInvalid("nodePort", servicePort.NodePort, err.Error())}.PrefixIndex(i).Prefix("spec.ports")
+				return nil, errors.NewInvalid("Service", service.Name, el)
+			}
+		} else if assignNodePorts {
+			nodePort, err := nodePortOp.AllocateNext()
+			if err != nil {
+				el := fielderrors.ValidationErrorList{fielderrors.NewFieldInvalid("nodePort", servicePort.NodePort, err.Error())}.PrefixIndex(i).Prefix("spec.ports")
+				return nil, errors.NewInvalid("Service", service.Name, el)
+			}
+			servicePort.NodePort = nodePort
+		}
+	}
+
 	out, err := rs.registry.CreateService(ctx, service)
 	if err != nil {
 		err = rest.CheckGeneratedNameError(rest.Services, err, service)
 	}
 
 	if err == nil {
+		el := nodePortOp.Commit()
+		if el != nil {
+			// these should be caught by an eventual reconciliation / restart
+			glog.Errorf("error(s) committing service node-ports changes: %v", el)
+		}
+
 		releaseServiceIP = false
 	}
 
@@ -114,10 +143,25 @@ func (rs *REST) Delete(ctx api.Context, id string) (runtime.Object, error) {
 	if err != nil {
 		return nil, err
 	}
+
+	err = rs.registry.DeleteService(ctx, id)
+	if err != nil {
+		return nil, err
+	}
+
 	if api.IsServiceIPSet(service) {
 		rs.portals.Release(net.ParseIP(service.Spec.PortalIP))
 	}
-	return &api.Status{Status: api.StatusSuccess}, rs.registry.DeleteService(ctx, id)
+
+	for _, nodePort := range collectServiceNodePorts(service) {
+		err := rs.serviceNodePorts.Release(nodePort)
+		if err != nil {
+			// these should be caught by an eventual reconciliation / restart
+			glog.Errorf("Error releasing service %s node port %d: %v", service.Name, nodePort, err)
+		}
+	}
+
+	return &api.Status{Status: api.StatusSuccess}, nil
 }
 
 func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) {
@@ -173,7 +217,64 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, boo
 	if errs := validation.ValidateServiceUpdate(oldService, service); len(errs) > 0 {
 		return nil, false, errors.NewInvalid("service", service.Name, errs)
 	}
+
+	nodePortOp := portallocator.StartOperation(rs.serviceNodePorts)
+	defer nodePortOp.Finish()
+
+	assignNodePorts := shouldAssignNodePorts(service)
+
+	oldNodePorts := collectServiceNodePorts(oldService)
+
+	newNodePorts := []int{}
+	if assignNodePorts {
+		for i := range service.Spec.Ports {
+			servicePort := &service.Spec.Ports[i]
+			nodePort := servicePort.NodePort
+			if nodePort != 0 {
+				if !contains(oldNodePorts, nodePort) {
+					err := nodePortOp.Allocate(nodePort)
+					if err != nil {
+						el := fielderrors.ValidationErrorList{fielderrors.NewFieldInvalid("nodePort", nodePort, err.Error())}.PrefixIndex(i).Prefix("spec.ports")
+						return nil, false, errors.NewInvalid("Service", service.Name, el)
+					}
+				}
+			} else {
+				nodePort, err = nodePortOp.AllocateNext()
+				if err != nil {
+					el := fielderrors.ValidationErrorList{fielderrors.NewFieldInvalid("nodePort", nodePort, err.Error())}.PrefixIndex(i).Prefix("spec.ports")
+					return nil, false, errors.NewInvalid("Service", service.Name, el)
+				}
+				servicePort.NodePort = nodePort
+			}
+			// Detect duplicate node ports; this should have been caught by validation, so we panic
+			if contains(newNodePorts, nodePort) {
+				panic("duplicate node port")
+			}
+			newNodePorts = append(newNodePorts, nodePort)
+		}
+	} else {
+		// Validate should have validated that nodePort == 0
+	}
+
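+	// Only release node ports that the update actually drops, and only release them
+	// once the updated service has been written (on Commit); a failed write therefore
+	// never frees a port that is still recorded against the old service.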
+	// The comparison loops are O(N^2), but we don't expect N to be huge
+	// (there's a hard-limit at 2^16, because they're ports; and even 4 ports would be a lot)
+	for _, oldNodePort := range oldNodePorts {
+		if contains(newNodePorts, oldNodePort) {
+			continue
+		}
+		nodePortOp.ReleaseDeferred(oldNodePort)
+	}
+
 	out, err := rs.registry.UpdateService(ctx, service)
 
+	if err == nil {
+		el := nodePortOp.Commit()
+		if el != nil {
+			// problems should be fixed by an eventual reconciliation / restart
+			glog.Errorf("error(s) committing NodePorts changes: %v", el)
+		}
+	}
+
 	return out, false, err
 }
 
@@ -215,3 +316,41 @@ func (rs *REST) ResourceLocation(ctx api.Context, id string) (*url.URL, http.Rou
 	}
 	return nil, nil, fmt.Errorf("no endpoints available for %q", id)
 }
+
+// This is O(N), but we expect haystack to be small;
+// so small that we expect a linear search to be faster than building a set
+func contains(haystack []int, needle int) bool {
+	for _, v := range haystack {
+		if v == needle {
+			return true
+		}
+	}
+	return false
+}
+
+func collectServiceNodePorts(service *api.Service) []int {
+	servicePorts := []int{}
+	for i := range service.Spec.Ports {
+		servicePort := &service.Spec.Ports[i]
+		if servicePort.NodePort != 0 {
+			servicePorts = append(servicePorts, servicePort.NodePort)
+		}
+	}
+	return servicePorts
+}
+
+func shouldAssignNodePorts(service *api.Service) bool {
+	// TODO(justinsb): Switch on service.Spec.Type
+	// switch service.Spec.Type {
+	// case api.ServiceVisibilityLoadBalancer:
+	// 	return true
+	// case api.ServiceVisibilityNodePort:
+	// 	return true
+	// case api.ServiceVisibilityCluster:
+	// 	return false
+	// default:
+	// 	glog.Errorf("Unknown visibility value: %v", service.Spec.Visibility)
+	// 	return false
+	// }
+	return false
+}