mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-30 15:05:27 +00:00
Wire up the portallocator to service NodePorts
We still don't actually enable this though!
This commit is contained in:
parent
3bb2fe2425
commit
03cdc077c3
117
pkg/registry/service/portallocator/operation.go
Normal file
117
pkg/registry/service/portallocator/operation.go
Normal file
@ -0,0 +1,117 @@
|
||||
/*
|
||||
Copyright 2015 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package portallocator
|
||||
|
||||
// Encapsulates the semantics of a port allocation 'transaction':
|
||||
// It is better to leak ports than to double-allocate them,
|
||||
// so we allocate immediately, but defer release.
|
||||
// On commit we best-effort release the deferred releases.
|
||||
// On rollback we best-effort release any allocations we did.
|
||||
//
|
||||
// Pattern for use:
|
||||
// op := StartPortAllocationOperation(...)
|
||||
// defer op.Finish
|
||||
// ...
|
||||
// write(updatedOwner)
|
||||
/// op.Commit()
|
||||
type portAllocationOperation struct {
|
||||
pa Interface
|
||||
allocated []int
|
||||
releaseDeferred []int
|
||||
shouldRollback bool
|
||||
}
|
||||
|
||||
// Creates a portAllocationOperation, tracking a set of allocations & releases
|
||||
func StartOperation(pa Interface) *portAllocationOperation {
|
||||
op := &portAllocationOperation{}
|
||||
op.pa = pa
|
||||
op.allocated = []int{}
|
||||
op.releaseDeferred = []int{}
|
||||
op.shouldRollback = true
|
||||
return op
|
||||
}
|
||||
|
||||
// Will rollback unless marked as shouldRollback = false by a Commit(). Call from a defer block
|
||||
func (op *portAllocationOperation) Finish() {
|
||||
if op.shouldRollback {
|
||||
op.Rollback()
|
||||
}
|
||||
}
|
||||
|
||||
// (Try to) undo any operations we did
|
||||
func (op *portAllocationOperation) Rollback() []error {
|
||||
errors := []error{}
|
||||
|
||||
for _, allocated := range op.allocated {
|
||||
err := op.pa.Release(allocated)
|
||||
if err != nil {
|
||||
errors = append(errors, err)
|
||||
}
|
||||
}
|
||||
|
||||
if len(errors) == 0 {
|
||||
return nil
|
||||
}
|
||||
return errors
|
||||
}
|
||||
|
||||
// (Try to) perform any deferred operations.
|
||||
// Note that even if this fails, we don't rollback; we always want to err on the side of over-allocation,
|
||||
// and Commit should be called _after_ the owner is written
|
||||
func (op *portAllocationOperation) Commit() []error {
|
||||
errors := []error{}
|
||||
|
||||
for _, release := range op.releaseDeferred {
|
||||
err := op.pa.Release(release)
|
||||
if err != nil {
|
||||
errors = append(errors, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Even on error, we don't rollback
|
||||
// Problems should be fixed by an eventual reconciliation / restart
|
||||
op.shouldRollback = false
|
||||
|
||||
if len(errors) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
return errors
|
||||
}
|
||||
|
||||
// Allocates a port, and record it for future rollback
|
||||
func (op *portAllocationOperation) Allocate(port int) error {
|
||||
err := op.pa.Allocate(port)
|
||||
if err == nil {
|
||||
op.allocated = append(op.allocated, port)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Allocates a port, and record it for future rollback
|
||||
func (op *portAllocationOperation) AllocateNext() (int, error) {
|
||||
port, err := op.pa.AllocateNext()
|
||||
if err == nil {
|
||||
op.allocated = append(op.allocated, port)
|
||||
}
|
||||
return port, err
|
||||
}
|
||||
|
||||
// Marks a port so that it will be released if this operation Commits
|
||||
func (op *portAllocationOperation) ReleaseDeferred(port int) {
|
||||
op.releaseDeferred = append(op.releaseDeferred, port)
|
||||
}
|
@ -38,6 +38,7 @@ import (
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/fielderrors"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
// REST adapts a service registry into apiserver's RESTStorage model.
|
||||
@ -79,6 +80,9 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, err
|
||||
}
|
||||
}()
|
||||
|
||||
nodePortOp := portallocator.StartOperation(rs.serviceNodePorts)
|
||||
defer nodePortOp.Finish()
|
||||
|
||||
if api.IsServiceIPRequested(service) {
|
||||
// Allocate next available.
|
||||
ip, err := rs.portals.AllocateNext()
|
||||
@ -97,12 +101,37 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, err
|
||||
releaseServiceIP = true
|
||||
}
|
||||
|
||||
assignNodePorts := shouldAssignNodePorts(service)
|
||||
for i := range service.Spec.Ports {
|
||||
servicePort := &service.Spec.Ports[i]
|
||||
if servicePort.NodePort != 0 {
|
||||
err := nodePortOp.Allocate(servicePort.NodePort)
|
||||
if err != nil {
|
||||
el := fielderrors.ValidationErrorList{fielderrors.NewFieldInvalid("nodePort", servicePort.NodePort, err.Error())}.PrefixIndex(i).Prefix("spec.ports")
|
||||
return nil, errors.NewInvalid("Service", service.Name, el)
|
||||
}
|
||||
} else if assignNodePorts {
|
||||
nodePort, err := nodePortOp.AllocateNext()
|
||||
if err != nil {
|
||||
el := fielderrors.ValidationErrorList{fielderrors.NewFieldInvalid("nodePort", servicePort.NodePort, err.Error())}.PrefixIndex(i).Prefix("spec.ports")
|
||||
return nil, errors.NewInvalid("Service", service.Name, el)
|
||||
}
|
||||
servicePort.NodePort = nodePort
|
||||
}
|
||||
}
|
||||
|
||||
out, err := rs.registry.CreateService(ctx, service)
|
||||
if err != nil {
|
||||
err = rest.CheckGeneratedNameError(rest.Services, err, service)
|
||||
}
|
||||
|
||||
if err == nil {
|
||||
el := nodePortOp.Commit()
|
||||
if el != nil {
|
||||
// these should be caught by an eventual reconciliation / restart
|
||||
glog.Errorf("error(s) committing service node-ports changes: %v", el)
|
||||
}
|
||||
|
||||
releaseServiceIP = false
|
||||
}
|
||||
|
||||
@ -114,10 +143,25 @@ func (rs *REST) Delete(ctx api.Context, id string) (runtime.Object, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = rs.registry.DeleteService(ctx, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if api.IsServiceIPSet(service) {
|
||||
rs.portals.Release(net.ParseIP(service.Spec.PortalIP))
|
||||
}
|
||||
return &api.Status{Status: api.StatusSuccess}, rs.registry.DeleteService(ctx, id)
|
||||
|
||||
for _, nodePort := range collectServiceNodePorts(service) {
|
||||
err := rs.serviceNodePorts.Release(nodePort)
|
||||
if err != nil {
|
||||
// these should be caught by an eventual reconciliation / restart
|
||||
glog.Errorf("Error releasing service %s node port %d: %v", service.Name, nodePort, err)
|
||||
}
|
||||
}
|
||||
|
||||
return &api.Status{Status: api.StatusSuccess}, nil
|
||||
}
|
||||
|
||||
func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) {
|
||||
@ -173,7 +217,64 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, boo
|
||||
if errs := validation.ValidateServiceUpdate(oldService, service); len(errs) > 0 {
|
||||
return nil, false, errors.NewInvalid("service", service.Name, errs)
|
||||
}
|
||||
|
||||
nodePortOp := portallocator.StartOperation(rs.serviceNodePorts)
|
||||
defer nodePortOp.Finish()
|
||||
|
||||
assignNodePorts := shouldAssignNodePorts(service)
|
||||
|
||||
oldNodePorts := collectServiceNodePorts(oldService)
|
||||
|
||||
newNodePorts := []int{}
|
||||
if assignNodePorts {
|
||||
for i := range service.Spec.Ports {
|
||||
servicePort := &service.Spec.Ports[i]
|
||||
nodePort := servicePort.NodePort
|
||||
if nodePort != 0 {
|
||||
if !contains(oldNodePorts, nodePort) {
|
||||
err := nodePortOp.Allocate(nodePort)
|
||||
if err != nil {
|
||||
el := fielderrors.ValidationErrorList{fielderrors.NewFieldInvalid("nodePort", nodePort, err.Error())}.PrefixIndex(i).Prefix("spec.ports")
|
||||
return nil, false, errors.NewInvalid("Service", service.Name, el)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
nodePort, err = nodePortOp.AllocateNext()
|
||||
if err != nil {
|
||||
el := fielderrors.ValidationErrorList{fielderrors.NewFieldInvalid("nodePort", nodePort, err.Error())}.PrefixIndex(i).Prefix("spec.ports")
|
||||
return nil, false, errors.NewInvalid("Service", service.Name, el)
|
||||
}
|
||||
servicePort.NodePort = nodePort
|
||||
}
|
||||
// Detect duplicate node ports; this should have been caught by validation, so we panic
|
||||
if contains(newNodePorts, nodePort) {
|
||||
panic("duplicate node port")
|
||||
}
|
||||
newNodePorts = append(newNodePorts, nodePort)
|
||||
}
|
||||
} else {
|
||||
// Validate should have validated that nodePort == 0
|
||||
}
|
||||
|
||||
// The comparison loops are O(N^2), but we don't expect N to be huge
|
||||
// (there's a hard-limit at 2^16, because they're ports; and even 4 ports would be a lot)
|
||||
for _, oldNodePort := range oldNodePorts {
|
||||
if !contains(newNodePorts, oldNodePort) {
|
||||
continue
|
||||
}
|
||||
nodePortOp.ReleaseDeferred(oldNodePort)
|
||||
}
|
||||
|
||||
out, err := rs.registry.UpdateService(ctx, service)
|
||||
|
||||
if err == nil {
|
||||
el := nodePortOp.Commit()
|
||||
if el != nil {
|
||||
// problems should be fixed by an eventual reconciliation / restart
|
||||
glog.Errorf("error(s) committing NodePorts changes: %v", el)
|
||||
}
|
||||
}
|
||||
|
||||
return out, false, err
|
||||
}
|
||||
|
||||
@ -215,3 +316,41 @@ func (rs *REST) ResourceLocation(ctx api.Context, id string) (*url.URL, http.Rou
|
||||
}
|
||||
return nil, nil, fmt.Errorf("no endpoints available for %q", id)
|
||||
}
|
||||
|
||||
// This is O(N), but we expect haystack to be small;
|
||||
// so small that we expect a linear search to be faster
|
||||
func contains(haystack []int, needle int) bool {
|
||||
for _, v := range haystack {
|
||||
if v == needle {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func collectServiceNodePorts(service *api.Service) []int {
|
||||
servicePorts := []int{}
|
||||
for i := range service.Spec.Ports {
|
||||
servicePort := &service.Spec.Ports[i]
|
||||
if servicePort.NodePort != 0 {
|
||||
servicePorts = append(servicePorts, servicePort.NodePort)
|
||||
}
|
||||
}
|
||||
return servicePorts
|
||||
}
|
||||
|
||||
func shouldAssignNodePorts(service *api.Service) bool {
|
||||
// TODO(justinsb): Switch on service.Spec.Type
|
||||
// switch service.Spec.Type {
|
||||
// case api.ServiceVisibilityLoadBalancer:
|
||||
// return true
|
||||
// case api.ServiceVisibilityNodePort:
|
||||
// return true
|
||||
// case api.ServiceVisibilityCluster:
|
||||
// return false
|
||||
// default:
|
||||
// glog.Errorf("Unknown visibility value: %v", service.Spec.Visibility)
|
||||
// return false
|
||||
// }
|
||||
return false
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user