diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index a70ca1ded1a..7b878d5acfb 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -86,6 +86,7 @@ type APIServer struct { CorsAllowedOriginList util.StringList AllowPrivileged bool PortalNet util.IPNet // TODO: make this a list + ServiceNodePorts util.PortRange EnableLogsSupport bool MasterServiceNamespace string RuntimeConfig util.ConfigurationMap @@ -183,6 +184,7 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) { fs.Var(&s.CorsAllowedOriginList, "cors-allowed-origins", "List of allowed origins for CORS, comma separated. An allowed origin can be a regular expression to support subdomain matching. If this list is empty CORS will not be enabled.") fs.BoolVar(&s.AllowPrivileged, "allow-privileged", s.AllowPrivileged, "If true, allow privileged containers.") fs.Var(&s.PortalNet, "portal-net", "A CIDR notation IP range from which to assign portal IPs. This must not overlap with any IP ranges assigned to nodes for pods.") + fs.Var(&s.ServiceNodePorts, "service-node-ports", "A port range to reserve for services with NodePort visibility. Example: '30000-32767'. Inclusive at both ends of the range.") fs.StringVar(&s.MasterServiceNamespace, "master-service-namespace", s.MasterServiceNamespace, "The namespace from which the kubernetes master services should be injected into pods") fs.Var(&s.RuntimeConfig, "runtime-config", "A set of key=value pairs that describe runtime configuration that may be passed to the apiserver. api/ key can be used to turn on/off specific api versions. 
api/all and api/legacy are special keys to control all and legacy api versions respectively.") client.BindKubeletClientConfigFlags(fs, &s.KubeletConfig) diff --git a/pkg/api/v1beta1/conversion.go b/pkg/api/v1beta1/conversion.go index ab151c95520..134ec005876 100644 --- a/pkg/api/v1beta1/conversion.go +++ b/pkg/api/v1beta1/conversion.go @@ -854,18 +854,6 @@ func addConversionFuncs() { return err } - typeIn := in.Type - if typeIn == "" { - if in.CreateExternalLoadBalancer { - typeIn = ServiceTypeLoadBalancer - } else { - typeIn = ServiceTypeClusterIP - } - } - if err := s.Convert(&typeIn, &out.Spec.Type, 0); err != nil { - return err - } - return nil }, diff --git a/pkg/api/v1beta2/conversion.go b/pkg/api/v1beta2/conversion.go index 8e02bcbf6ba..d3f063de097 100644 --- a/pkg/api/v1beta2/conversion.go +++ b/pkg/api/v1beta2/conversion.go @@ -776,18 +776,6 @@ func addConversionFuncs() { return err } - typeIn := in.Type - if typeIn == "" { - if in.CreateExternalLoadBalancer { - typeIn = ServiceTypeLoadBalancer - } else { - typeIn = ServiceTypeClusterIP - } - } - if err := s.Convert(&typeIn, &out.Spec.Type, 0); err != nil { - return err - } - return nil }, diff --git a/pkg/master/controller.go b/pkg/master/controller.go index cf2b1bf7090..b1232bf2751 100644 --- a/pkg/master/controller.go +++ b/pkg/master/controller.go @@ -28,6 +28,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/namespace" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service" servicecontroller "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/ipallocator/controller" + portallocatorcontroller "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/portallocator/controller" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/golang/glog" @@ -39,12 +40,16 @@ import ( type Controller struct { NamespaceRegistry namespace.Registry ServiceRegistry service.Registry - ServiceIPRegistry service.IPRegistry + ServiceIPRegistry service.RangeRegistry 
EndpointRegistry endpoint.Registry PortalNet *net.IPNet // TODO: MasterCount is yucky MasterCount int + ServiceNodePortRegistry service.RangeRegistry + ServiceNodePortInterval time.Duration + ServiceNodePorts util.PortRange + PortalIPInterval time.Duration EndpointInterval time.Duration @@ -68,12 +73,16 @@ func (c *Controller) Start() { return } - repair := servicecontroller.NewRepair(c.PortalIPInterval, c.ServiceRegistry, c.PortalNet, c.ServiceIPRegistry) + repairPortals := servicecontroller.NewRepair(c.PortalIPInterval, c.ServiceRegistry, c.PortalNet, c.ServiceIPRegistry) + repairNodePorts := portallocatorcontroller.NewRepair(c.ServiceNodePortInterval, c.ServiceRegistry, c.ServiceNodePorts, c.ServiceNodePortRegistry) // run all of the controllers once prior to returning from Start. - if err := repair.RunOnce(); err != nil { + if err := repairPortals.RunOnce(); err != nil { glog.Errorf("Unable to perform initial IP allocation check: %v", err) } + if err := repairNodePorts.RunOnce(); err != nil { + glog.Errorf("Unable to perform initial service nodePort check: %v", err) + } if err := c.UpdateKubernetesService(); err != nil { glog.Errorf("Unable to perform initial Kubernetes service initialization: %v", err) } @@ -81,7 +90,7 @@ func (c *Controller) Start() { glog.Errorf("Unable to perform initial Kubernetes RO service initialization: %v", err) } - c.runner = util.NewRunner(c.RunKubernetesService, c.RunKubernetesROService, repair.RunUntil) + c.runner = util.NewRunner(c.RunKubernetesService, c.RunKubernetesROService, repairPortals.RunUntil, repairNodePorts.RunUntil) c.runner.Start() } diff --git a/pkg/master/master.go b/pkg/master/master.go index 4af27c1191e..9920cd2d4ff 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -63,13 +63,15 @@ import ( resourcequotaetcd "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/resourcequota/etcd" secretetcd "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/secret/etcd" 
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service" + etcdallocator "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/allocator/etcd" ipallocator "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/ipallocator" - etcdipallocator "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/ipallocator/etcd" serviceaccountetcd "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/serviceaccount/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/ui" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/allocator" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/portallocator" "github.com/emicklei/go-restful" "github.com/emicklei/go-restful/swagger" "github.com/golang/glog" @@ -141,13 +143,17 @@ type Config struct { // The name of the cluster. ClusterName string + + // The range of ports to be assigned to services with visibility=NodePort or greater + ServiceNodePorts util.PortRange } // Master contains state for a Kubernetes cluster master/api server. 
type Master struct { // "Inputs", Copied from Config - portalNet *net.IPNet - cacheTimeout time.Duration + portalNet *net.IPNet + serviceNodePorts util.PortRange + cacheTimeout time.Duration mux apiserver.Mux muxHelper *apiserver.MuxHelper @@ -188,11 +194,12 @@ type Master struct { // registries are internal client APIs for accessing the storage layer // TODO: define the internal typed interface in a way that clients can // also be replaced - nodeRegistry minion.Registry - namespaceRegistry namespace.Registry - serviceRegistry service.Registry - endpointRegistry endpoint.Registry - portalAllocator service.IPRegistry + nodeRegistry minion.Registry + namespaceRegistry namespace.Registry + serviceRegistry service.Registry + endpointRegistry endpoint.Registry + portalAllocator service.RangeRegistry + serviceNodePortAllocator service.RangeRegistry // "Outputs" Handler http.Handler @@ -226,6 +233,15 @@ func setDefaults(c *Config) { } c.PortalNet = portalNet } + if c.ServiceNodePorts.Size == 0 { + // TODO: Currently no way to specify an empty range (do we need to allow this?) + // We should probably allow this for clouds that don't require NodePort to do load-balancing (GCE) + // but then that breaks the strict nestedness of visibility. + // Review post-v1 + defaultServiceNodePorts := util.PortRange{Base: 30000, Size: 2768} // 2768 ports = 30000-32767 inclusive, matching the --service-node-ports example; assumes Size is a port count starting at Base (confirm in util.PortRange) + c.ServiceNodePorts = defaultServiceNodePorts + glog.Infof("Node port range unspecified. Defaulting to %v.", c.ServiceNodePorts) + } if c.MasterCount == 0 { // Clearly, there will be at least one master.
c.MasterCount = 1 @@ -260,6 +276,7 @@ func setDefaults(c *Config) { // Certain config fields will be set to a default value if unset, // including: // PortalNet +// ServiceNodePorts // MasterCount // ReadOnlyPort // ReadWritePort @@ -299,6 +316,7 @@ func New(c *Config) *Master { m := &Master{ portalNet: c.PortalNet, + serviceNodePorts: c.ServiceNodePorts, rootWebService: new(restful.WebService), enableCoreControllers: c.EnableCoreControllers, enableLogsSupport: c.EnableLogsSupport, @@ -424,9 +442,23 @@ func (m *Master) init(c *Config) { registry := etcd.NewRegistry(c.EtcdHelper, podRegistry, m.endpointRegistry) m.serviceRegistry = registry - ipAllocator := ipallocator.NewCIDRRange(m.portalNet) - portalAllocator := etcdipallocator.NewEtcd(ipAllocator, c.EtcdHelper) - m.portalAllocator = portalAllocator + var portalRangeRegistry service.RangeRegistry + portalAllocator := ipallocator.NewAllocatorCIDRRange(m.portalNet, func(max int, rangeSpec string) allocator.Interface { + mem := allocator.NewAllocationMap(max, rangeSpec) + etcd := etcdallocator.NewEtcd(mem, "/ranges/serviceips", "serviceipallocation", c.EtcdHelper) + portalRangeRegistry = etcd + return etcd + }) + m.portalAllocator = portalRangeRegistry + + var serviceNodePortRegistry service.RangeRegistry + serviceNodePortAllocator := portallocator.NewPortAllocatorCustom(m.serviceNodePorts, func(max int, rangeSpec string) allocator.Interface { + mem := allocator.NewAllocationMap(max, rangeSpec) + etcd := etcdallocator.NewEtcd(mem, "/ranges/servicenodeports", "servicenodeportallocation", c.EtcdHelper) + serviceNodePortRegistry = etcd + return etcd + }) + m.serviceNodePortAllocator = serviceNodePortRegistry controllerStorage := controlleretcd.NewREST(c.EtcdHelper) @@ -444,7 +476,7 @@ func (m *Master) init(c *Config) { "podTemplates": podTemplateStorage, "replicationControllers": controllerStorage, - "services": service.NewStorage(m.serviceRegistry, m.nodeRegistry, m.endpointRegistry, portalAllocator, c.ClusterName), + 
"services": service.NewStorage(m.serviceRegistry, m.nodeRegistry, m.endpointRegistry, portalAllocator, serviceNodePortAllocator, c.ClusterName), "endpoints": endpointsStorage, "minions": nodeStorage, "minions/status": nodeStatusStorage, @@ -589,8 +621,12 @@ func (m *Master) NewBootstrapController() *Controller { PortalNet: m.portalNet, MasterCount: m.masterCount, - PortalIPInterval: 3 * time.Minute, - EndpointInterval: 10 * time.Second, + ServiceNodePortRegistry: m.serviceNodePortAllocator, + ServiceNodePorts: m.serviceNodePorts, + + ServiceNodePortInterval: 3 * time.Minute, + PortalIPInterval: 3 * time.Minute, + EndpointInterval: 10 * time.Second, PublicIP: m.clusterIP, diff --git a/pkg/registry/service/allocator/bitmap.go b/pkg/registry/service/allocator/bitmap.go new file mode 100644 index 00000000000..f5a50363bfc --- /dev/null +++ b/pkg/registry/service/allocator/bitmap.go @@ -0,0 +1,168 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package allocator + +import ( + "errors" + "math/big" + "math/rand" + "sync" +) + +// AllocationBitmap is a contiguous block of resources that can be allocated atomically. +// +// Each resource has an offset. The internal structure is a bitmap, with a bit for each offset. +// +// If a resource is taken, the bit at that offset is set to one. +// r.count is always equal to the number of set bits and can be recalculated at any time +// by counting the set bits in r.allocated. 
+// +// TODO: use RLE and compact the allocator to minimize space. +type AllocationBitmap struct { + // strategy is the strategy for choosing the next available item out of the range + strategy allocateStrategy + // max is the maximum size of the usable items in the range + max int + // rangeSpec is the range specifier, matching RangeAllocation.Range + rangeSpec string + + // lock guards the following members + lock sync.Mutex + // count is the number of currently allocated elements in the range + count int + // allocated is a bit array of the allocated items in the range + allocated *big.Int +} + +// AllocationBitmap implements Interface and Snapshottable +var _ Interface = &AllocationBitmap{} +var _ Snapshottable = &AllocationBitmap{} + +// allocateStrategy is a search strategy in the allocation map for a valid item. +type allocateStrategy func(allocated *big.Int, max, count int) (int, bool) + +func NewAllocationMap(max int, rangeSpec string) *AllocationBitmap { + a := AllocationBitmap{ + strategy: randomScanStrategy, + allocated: big.NewInt(0), + count: 0, + max: max, + rangeSpec: rangeSpec, + } + return &a +} + +// Allocate attempts to reserve the provided item. +// Returns true if it was allocated, false if it was already in use +func (r *AllocationBitmap) Allocate(offset int) (bool, error) { + r.lock.Lock() + defer r.lock.Unlock() + + if r.allocated.Bit(offset) == 1 { + return false, nil + } + r.allocated = r.allocated.SetBit(r.allocated, offset, 1) + r.count++ + return true, nil +} + +// AllocateNext reserves one of the items from the pool. +// (0, false, nil) may be returned if there are no items left. +func (r *AllocationBitmap) AllocateNext() (int, bool, error) { + r.lock.Lock() + defer r.lock.Unlock() + + next, ok := r.strategy(r.allocated, r.max, r.count) + if !ok { + return 0, false, nil + } + r.count++ + r.allocated = r.allocated.SetBit(r.allocated, next, 1) + return next, true, nil +} + +// Release releases the item back to the pool. 
Releasing an +// unallocated item or an item out of the range is a no-op and +// returns no error. +func (r *AllocationBitmap) Release(offset int) error { + r.lock.Lock() + defer r.lock.Unlock() + + if offset < 0 || r.allocated.Bit(offset) == 0 { // big.Int.Bit panics on a negative index, so treat it as out of range (no-op) + return nil + } + + r.allocated = r.allocated.SetBit(r.allocated, offset, 0) + r.count-- + return nil +} + +// Has returns true if the provided item is already allocated and a call +// to Allocate(offset) would fail. +func (r *AllocationBitmap) Has(offset int) bool { + r.lock.Lock() + defer r.lock.Unlock() + + return r.allocated.Bit(offset) == 1 +} + +// Free returns the count of items left in the range. +func (r *AllocationBitmap) Free() int { + r.lock.Lock() + defer r.lock.Unlock() + return r.max - r.count +} + +// Snapshot saves the current state of the pool. +func (r *AllocationBitmap) Snapshot() (string, []byte) { + r.lock.Lock() + defer r.lock.Unlock() + + return r.rangeSpec, r.allocated.Bytes() +} + +// Restore restores the pool to the previously captured state. +func (r *AllocationBitmap) Restore(rangeSpec string, data []byte) error { + r.lock.Lock() + defer r.lock.Unlock() + + if r.rangeSpec != rangeSpec { + return errors.New("the provided range does not match the current range") + } + + r.allocated = big.NewInt(0).SetBytes(data) + r.count = countBits(r.allocated) + + return nil +} + +// randomScanStrategy chooses a random address from the provided big.Int, and then +// scans forward looking for the next available address (it will wrap the range if +// necessary).
+func randomScanStrategy(allocated *big.Int, max, count int) (int, bool) { + if count >= max { + return 0, false + } + offset := rand.Intn(max) + for i := 0; i < max; i++ { + at := (offset + i) % max + if allocated.Bit(at) == 0 { + return at, true + } + } + return 0, false +} diff --git a/pkg/registry/service/allocator/etcd/etcd.go b/pkg/registry/service/allocator/etcd/etcd.go new file mode 100644 index 00000000000..c2460405d64 --- /dev/null +++ b/pkg/registry/service/allocator/etcd/etcd.go @@ -0,0 +1,239 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package etcd + +import ( + "errors" + "fmt" + "sync" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + k8serr "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" + etcderr "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors/etcd" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/allocator" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" +) + +var ( + errorUnableToAllocate = errors.New("unable to allocate") +) + +// Etcd exposes a service.Allocator that is backed by etcd. 
+// TODO: allow multiple allocations to be tried at once +// TODO: subdivide the keyspace to reduce conflicts +// TODO: investigate issuing a CAS without reading first +type Etcd struct { + lock sync.Mutex + + alloc allocator.Snapshottable + helper tools.EtcdHelper + last string + + baseKey string + kind string +} + +// Etcd implements allocator.Interface and service.RangeRegistry +var _ allocator.Interface = &Etcd{} +var _ service.RangeRegistry = &Etcd{} + +// NewEtcd returns an allocator that is backed by Etcd and can manage +// persisting the snapshot state of allocation after each allocation is made. +func NewEtcd(alloc allocator.Snapshottable, baseKey string, kind string, helper tools.EtcdHelper) *Etcd { + return &Etcd{ + alloc: alloc, + helper: helper, + baseKey: baseKey, + kind: kind, + } +} + +// Allocate attempts to allocate the item locally and then in etcd. +func (e *Etcd) Allocate(offset int) (bool, error) { + e.lock.Lock() + defer e.lock.Unlock() + + ok, err := e.alloc.Allocate(offset) + if !ok || err != nil { + return ok, err + } + + err = e.tryUpdate(func() error { + ok, err := e.alloc.Allocate(offset) + if err != nil { + return err + } + if !ok { + return errorUnableToAllocate + } + return nil + }) + if err != nil { + if err == errorUnableToAllocate { + return false, nil + } + return false, err + } + return true, nil +} + +// AllocateNext attempts to allocate the next item locally and then in etcd. 
+func (e *Etcd) AllocateNext() (int, bool, error) { + e.lock.Lock() + defer e.lock.Unlock() + + offset, ok, err := e.alloc.AllocateNext() + if !ok || err != nil { + return offset, ok, err + } + + err = e.tryUpdate(func() error { + ok, err := e.alloc.Allocate(offset) + if err != nil { + return err + } + if !ok { + // update the offset here + offset, ok, err = e.alloc.AllocateNext() + if err != nil { + return err + } + if !ok { + return errorUnableToAllocate + } + return nil + } + return nil + }) + return offset, ok, err +} + +// Release attempts to release the provided item locally and then in etcd. +func (e *Etcd) Release(item int) error { + e.lock.Lock() + defer e.lock.Unlock() + + if err := e.alloc.Release(item); err != nil { + return err + } + + return e.tryUpdate(func() error { + return e.alloc.Release(item) + }) +} + +// tryUpdate performs a read-update to persist the latest snapshot state of allocation. +func (e *Etcd) tryUpdate(fn func() error) error { + err := e.helper.GuaranteedUpdate(e.baseKey, &api.RangeAllocation{}, true, + func(input runtime.Object) (output runtime.Object, ttl uint64, err error) { + existing := input.(*api.RangeAllocation) + if len(existing.ResourceVersion) == 0 { + return nil, 0, fmt.Errorf("cannot allocate resources of type %s at this time", e.kind) + } + if existing.ResourceVersion != e.last { + if err := e.alloc.Restore(existing.Range, existing.Data); err != nil { + return nil, 0, err + } + if err := fn(); err != nil { + return nil, 0, err + } + } + e.last = existing.ResourceVersion + rangeSpec, data := e.alloc.Snapshot() + existing.Range = rangeSpec + existing.Data = data + return existing, 0, nil + }, + ) + return etcderr.InterpretUpdateError(err, e.kind, "") +} + +// Refresh reloads the RangeAllocation from etcd. 
+func (e *Etcd) Refresh() (*api.RangeAllocation, error) { + e.lock.Lock() + defer e.lock.Unlock() + + existing := &api.RangeAllocation{} + if err := e.helper.ExtractObj(e.baseKey, existing, false); err != nil { + if tools.IsEtcdNotFound(err) { + return nil, nil + } + return nil, etcderr.InterpretGetError(err, e.kind, "") + } + + return existing, nil +} + +// Get returns an api.RangeAllocation that represents the current state in +// etcd. If the key does not exist, the object will have an empty ResourceVersion. +func (e *Etcd) Get() (*api.RangeAllocation, error) { + existing := &api.RangeAllocation{} + if err := e.helper.ExtractObj(e.baseKey, existing, true); err != nil { + return nil, etcderr.InterpretGetError(err, e.kind, "") + } + return existing, nil +} + +// CreateOrUpdate attempts to update the current etcd state with the provided +// allocation. +func (e *Etcd) CreateOrUpdate(snapshot *api.RangeAllocation) error { + e.lock.Lock() + defer e.lock.Unlock() + + last := "" + err := e.helper.GuaranteedUpdate(e.baseKey, &api.RangeAllocation{}, true, + func(input runtime.Object) (output runtime.Object, ttl uint64, err error) { + existing := input.(*api.RangeAllocation) + switch { + case len(snapshot.ResourceVersion) != 0 && len(existing.ResourceVersion) != 0: + if snapshot.ResourceVersion != existing.ResourceVersion { + return nil, 0, k8serr.NewConflict(e.kind, "", fmt.Errorf("the provided resource version does not match")) + } + case len(existing.ResourceVersion) != 0: + return nil, 0, k8serr.NewConflict(e.kind, "", fmt.Errorf("another caller has already initialized the resource")) + } + last = snapshot.ResourceVersion + return snapshot, 0, nil + }, + ) + if err != nil { + return etcderr.InterpretUpdateError(err, e.kind, "") + } + err = e.alloc.Restore(snapshot.Range, snapshot.Data) + if err == nil { + e.last = last + } + return err +} + +// Implements allocator.Interface::Has +func (e *Etcd) Has(item int) bool { + e.lock.Lock() + defer e.lock.Unlock() + + return 
e.alloc.Has(item) +} + +// Implements allocator.Interface::Free +func (e *Etcd) Free() int { + e.lock.Lock() + defer e.lock.Unlock() + + return e.alloc.Free() +} diff --git a/pkg/registry/service/allocator/etcd/etcd_test.go b/pkg/registry/service/allocator/etcd/etcd_test.go new file mode 100644 index 00000000000..03612f647c9 --- /dev/null +++ b/pkg/registry/service/allocator/etcd/etcd_test.go @@ -0,0 +1,125 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package etcd + +import ( + "testing" + + "github.com/coreos/go-etcd/etcd" + + "fmt" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/allocator" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/tools/etcdtest" +) + +func newHelper(t *testing.T) (*tools.FakeEtcdClient, tools.EtcdHelper) { + fakeEtcdClient := tools.NewFakeEtcdClient(t) + fakeEtcdClient.TestIndex = true + helper := tools.NewEtcdHelper(fakeEtcdClient, testapi.Codec(), etcdtest.PathPrefix()) + return fakeEtcdClient, helper +} + +func newStorage(t *testing.T) (*Etcd, allocator.Interface, *tools.FakeEtcdClient) { + fakeEtcdClient, h := newHelper(t) + + mem := allocator.NewAllocationMap(100, "rangeSpecValue") + etcd := NewEtcd(mem, "/ranges/serviceips", "serviceipallocation", h) + + return etcd, mem, fakeEtcdClient +} + +func key() string { + s := "/ranges/serviceips" + return etcdtest.AddPrefix(s) +} + +func TestEmpty(t *testing.T) { + storage, _, ecli := newStorage(t) + ecli.ExpectNotFoundGet(key()) + if _, err := storage.Allocate(1); fmt.Sprintf("%v", err) != "cannot allocate resources of type serviceipallocation at this time" { + t.Fatal(err) + } +} + +func initialObject(ecli *tools.FakeEtcdClient) { + ecli.Data[key()] = tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + CreatedIndex: 1, + ModifiedIndex: 2, + Value: runtime.EncodeOrDie(testapi.Codec(), &api.RangeAllocation{ + Range: "rangeSpecValue", + }), + }, + }, + E: nil, + } +} + +func TestStore(t *testing.T) { + storage, backing, ecli := newStorage(t) + initialObject(ecli) + + if _, err := storage.Allocate(2); err != nil { + t.Fatal(err) + } + ok, err := backing.Allocate(2) + if err != nil { + t.Fatal(err) + } + if ok { + t.Fatal("Expected backing allocation to fail") + } + if 
ok, err := storage.Allocate(2); ok || err != nil { + t.Fatal("Expected allocation to fail") + } + + obj := ecli.Data[key()] + if obj.R == nil || obj.R.Node == nil { + t.Fatalf("%s is empty: %#v", key(), obj) + } + t.Logf("data: %#v", obj.R.Node) + + other := allocator.NewAllocationMap(100, "rangeSpecValue") + + allocation := &api.RangeAllocation{} + if err := storage.helper.ExtractObj(key(), allocation, false); err != nil { + t.Fatal(err) + } + if allocation.ResourceVersion != "1" { + t.Fatalf("%#v", allocation) + } + if allocation.Range != "rangeSpecValue" { + t.Errorf("unexpected stored Range: %s", allocation.Range) + } + if err := other.Restore("rangeSpecValue", allocation.Data); err != nil { + t.Fatal(err) + } + if !other.Has(2) { + t.Fatalf("could not restore allocated IP: %#v", other) + } + + other = allocator.NewAllocationMap(100, "rangeSpecValue") + otherStorage := NewEtcd(other, "/ranges/serviceips", "serviceipallocation", storage.helper) + if ok, err := otherStorage.Allocate(2); ok || err != nil { + t.Fatal(err) + } +} diff --git a/pkg/registry/service/allocator/interfaces.go b/pkg/registry/service/allocator/interfaces.go new file mode 100644 index 00000000000..9d44090989b --- /dev/null +++ b/pkg/registry/service/allocator/interfaces.go @@ -0,0 +1,41 @@ +/* +Copyright 2014 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package allocator + +// Interface manages the allocation of items out of a range. 
Interface +// should be threadsafe. +type Interface interface { + Allocate(int) (bool, error) + AllocateNext() (int, bool, error) + Release(int) error + + // For testing + Has(int) bool + + // For testing + Free() int +} + +// Snapshottable is an Interface that can be snapshotted and restored. Snapshottable +// should be threadsafe. +type Snapshottable interface { + Interface + Snapshot() (string, []byte) + Restore(string, []byte) error +} + +type AllocatorFactory func(max int, rangeSpec string) Interface diff --git a/pkg/registry/service/allocator/utils.go b/pkg/registry/service/allocator/utils.go new file mode 100644 index 00000000000..fc7cff70e2b --- /dev/null +++ b/pkg/registry/service/allocator/utils.go @@ -0,0 +1,64 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package allocator + +import "math/big" + +// countBits returns the number of set bits in n +func countBits(n *big.Int) int { + var count int = 0 + for _, b := range n.Bytes() { + count += int(bitCounts[b]) + } + return count +} + +// bitCounts is all of the bits counted for each number between 0-255 +var bitCounts = []int8{ + 0, 1, 1, 2, 1, 2, 2, 3, + 1, 2, 2, 3, 2, 3, 3, 4, + 1, 2, 2, 3, 2, 3, 3, 4, + 2, 3, 3, 4, 3, 4, 4, 5, + 1, 2, 2, 3, 2, 3, 3, 4, + 2, 3, 3, 4, 3, 4, 4, 5, + 2, 3, 3, 4, 3, 4, 4, 5, + 3, 4, 4, 5, 4, 5, 5, 6, + 1, 2, 2, 3, 2, 3, 3, 4, + 2, 3, 3, 4, 3, 4, 4, 5, + 2, 3, 3, 4, 3, 4, 4, 5, + 3, 4, 4, 5, 4, 5, 5, 6, + 2, 3, 3, 4, 3, 4, 4, 5, + 3, 4, 4, 5, 4, 5, 5, 6, + 3, 4, 4, 5, 4, 5, 5, 6, + 4, 5, 5, 6, 5, 6, 6, 7, + 1, 2, 2, 3, 2, 3, 3, 4, + 2, 3, 3, 4, 3, 4, 4, 5, + 2, 3, 3, 4, 3, 4, 4, 5, + 3, 4, 4, 5, 4, 5, 5, 6, + 2, 3, 3, 4, 3, 4, 4, 5, + 3, 4, 4, 5, 4, 5, 5, 6, + 3, 4, 4, 5, 4, 5, 5, 6, + 4, 5, 5, 6, 5, 6, 6, 7, + 2, 3, 3, 4, 3, 4, 4, 5, + 3, 4, 4, 5, 4, 5, 5, 6, + 3, 4, 4, 5, 4, 5, 5, 6, + 4, 5, 5, 6, 5, 6, 6, 7, + 3, 4, 4, 5, 4, 5, 5, 6, + 4, 5, 5, 6, 5, 6, 6, 7, + 4, 5, 5, 6, 5, 6, 6, 7, + 5, 6, 6, 7, 6, 7, 7, 8, +} diff --git a/pkg/registry/service/allocator/utils_test.go b/pkg/registry/service/allocator/utils_test.go new file mode 100644 index 00000000000..fee824f0340 --- /dev/null +++ b/pkg/registry/service/allocator/utils_test.go @@ -0,0 +1,33 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package allocator + +import "testing" + +func TestBitCount(t *testing.T) { + for i, c := range bitCounts { + actual := 0 + for j := 0; j < 8; j++ { + if ((1 << uint(j)) & i) != 0 { + actual++ + } + } + if actual != int(c) { + t.Errorf("%d should have %d bits but recorded as %d", i, actual, c) + } + } +} diff --git a/pkg/registry/service/ipallocator/allocator.go b/pkg/registry/service/ipallocator/allocator.go index 47623d1c881..40f12575da5 100644 --- a/pkg/registry/service/ipallocator/allocator.go +++ b/pkg/registry/service/ipallocator/allocator.go @@ -19,10 +19,10 @@ package ipallocator import ( "errors" "fmt" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/allocator" "math/big" - "math/rand" "net" - "sync" ) // Interface manages the allocation of IP addresses out of a range. Interface @@ -33,20 +33,11 @@ type Interface interface { Release(net.IP) error } -// Snapshottable is an Interface that can be snapshotted and restored. Snapshottable -// should be threadsafe. -type Snapshottable interface { - Interface - Snapshot() (*net.IPNet, []byte) - Restore(*net.IPNet, []byte) error -} - var ( - ErrFull = errors.New("range is full") - ErrNotInRange = errors.New("provided IP is not in the valid range") - ErrAllocated = errors.New("provided IP is already allocated") - ErrMismatchedNetwork = errors.New("the provided network does not match the current range") - ErrAllocationDisabled = errors.New("IP addresses cannot be allocated at this time") + ErrFull = errors.New("range is full") + ErrNotInRange = errors.New("provided IP is not in the valid range") + ErrAllocated = errors.New("provided IP is already allocated") + ErrMismatchedNetwork = errors.New("the provided network does not match the current range") ) // Range is a contiguous block of IPs that can be allocated atomically. 
@@ -64,52 +55,39 @@ var ( // | | // r.base r.base + r.max // | | -// first bit of r.allocated last bit of r.allocated -// -// If an address is taken, the bit at offset: -// -// bit offset := IP - r.base -// -// is set to one. r.count is always equal to the number of set bits and -// can be recalculated at any time by counting the set bits in r.allocated. -// -// TODO: use RLE and compact the allocator to minimize space. +// offset #0 of r.allocated last offset of r.allocated type Range struct { net *net.IPNet // base is a cached version of the start IP in the CIDR range as a *big.Int base *big.Int - // strategy is the strategy for choosing the next available IP out of the range - strategy allocateStrategy // max is the maximum size of the usable addresses in the range max int - // lock guards the following members - lock sync.Mutex - // count is the number of currently allocated elements in the range - count int - // allocated is a bit array of the allocated ips in the range - allocated *big.Int + alloc allocator.Interface } -// allocateStrategy is a search strategy in the allocation map for a valid IP. -type allocateStrategy func(allocated *big.Int, max, count int) (int, error) - -// NewCIDRRange creates a Range over a net.IPNet. -func NewCIDRRange(cidr *net.IPNet) *Range { +// NewAllocatorCIDRRange creates a Range over a net.IPNet, calling allocatorFactory to construct the backing store. 
+func NewAllocatorCIDRRange(cidr *net.IPNet, allocatorFactory allocator.AllocatorFactory) *Range { max := RangeSize(cidr) base := bigForIP(cidr.IP) - r := Range{ - net: cidr, - strategy: randomScanStrategy, - base: base.Add(base, big.NewInt(1)), // don't use the network base - max: maximum(0, int(max-2)), // don't use the network broadcast + rangeSpec := cidr.String() - allocated: big.NewInt(0), - count: 0, + r := Range{ + net: cidr, + base: base.Add(base, big.NewInt(1)), // don't use the network base + max: maximum(0, int(max-2)), // don't use the network broadcast, } + r.alloc = allocatorFactory(r.max, rangeSpec) return &r } +// Helper that wraps NewAllocatorCIDRRange, for creating a range backed by an in-memory store. +func NewCIDRRange(cidr *net.IPNet) *Range { + return NewAllocatorCIDRRange(cidr, func(max int, rangeSpec string) allocator.Interface { + return allocator.NewAllocationMap(max, rangeSpec) + }) +} + func maximum(a, b int) int { if a > b { return a @@ -119,9 +97,7 @@ func maximum(a, b int) int { // Free returns the count of IP addresses left in the range. func (r *Range) Free() int { - r.lock.Lock() - defer r.lock.Unlock() - return r.max - r.count + return r.alloc.Free() } // Allocate attempts to reserve the provided IP. ErrNotInRange or @@ -134,30 +110,27 @@ func (r *Range) Allocate(ip net.IP) error { return ErrNotInRange } - r.lock.Lock() - defer r.lock.Unlock() - - if r.allocated.Bit(offset) == 1 { + allocated, err := r.alloc.Allocate(offset) + if err != nil { + return err + } + if !allocated { return ErrAllocated } - r.allocated = r.allocated.SetBit(r.allocated, offset, 1) - r.count++ return nil } // AllocateNext reserves one of the IPs from the pool. ErrFull may // be returned if there are no addresses left. 
func (r *Range) AllocateNext() (net.IP, error) { - r.lock.Lock() - defer r.lock.Unlock() - - next, err := r.strategy(r.allocated, r.max, r.count) + offset, ok, err := r.alloc.AllocateNext() if err != nil { return nil, err } - r.count++ - r.allocated = r.allocated.SetBit(r.allocated, next, 1) - return addIPOffset(r.base, next), nil + if !ok { + return nil, ErrFull + } + return addIPOffset(r.base, offset), nil } // Release releases the IP back to the pool. Releasing an @@ -169,16 +142,7 @@ func (r *Range) Release(ip net.IP) error { return nil } - r.lock.Lock() - defer r.lock.Unlock() - - if r.allocated.Bit(offset) == 0 { - return nil - } - - r.allocated = r.allocated.SetBit(r.allocated, offset, 0) - r.count-- - return nil + return r.alloc.Release(offset) } // Has returns true if the provided IP is already allocated and a call @@ -189,31 +153,32 @@ func (r *Range) Has(ip net.IP) bool { return false } - r.lock.Lock() - defer r.lock.Unlock() - - return r.allocated.Bit(offset) == 1 + return r.alloc.Has(offset) } // Snapshot saves the current state of the pool. -func (r *Range) Snapshot() (*net.IPNet, []byte) { - r.lock.Lock() - defer r.lock.Unlock() - - return r.net, r.allocated.Bytes() +func (r *Range) Snapshot(dst *api.RangeAllocation) error { + snapshottable, ok := r.alloc.(allocator.Snapshottable) + if !ok { + return fmt.Errorf("not a snapshottable allocator") + } + rangeString, data := snapshottable.Snapshot() + dst.Range = rangeString + dst.Data = data + return nil } // Restore restores the pool to the previously captured state. ErrMismatchedNetwork // is returned if the provided IPNet range doesn't exactly match the previous range. 
func (r *Range) Restore(net *net.IPNet, data []byte) error { - r.lock.Lock() - defer r.lock.Unlock() - if !net.IP.Equal(r.net.IP) || net.Mask.String() != r.net.Mask.String() { return ErrMismatchedNetwork } - r.allocated = big.NewInt(0).SetBytes(data) - r.count = countBits(r.allocated) + snapshottable, ok := r.alloc.(allocator.Snapshottable) + if !ok { + return fmt.Errorf("not a snapshottable allocator") + } + snapshottable.Restore(net.String(), data) return nil } @@ -252,68 +217,6 @@ func calculateIPOffset(base *big.Int, ip net.IP) int { return int(big.NewInt(0).Sub(bigForIP(ip), base).Int64()) } -// randomScanStrategy chooses a random address from the provided big.Int, and then -// scans forward looking for the next available address (it will wrap the range if -// necessary). -func randomScanStrategy(allocated *big.Int, max, count int) (int, error) { - if count >= max { - return 0, ErrFull - } - offset := rand.Intn(max) - for i := 0; i < max; i++ { - at := (offset + i) % max - if allocated.Bit(at) == 0 { - return at, nil - } - } - return 0, ErrFull -} - -// countBits returns the number of set bits in n -func countBits(n *big.Int) int { - var count int = 0 - for _, b := range n.Bytes() { - count += int(bitCounts[b]) - } - return count -} - -// bitCounts is all of the bits counted for each number between 0-255 -var bitCounts = []int8{ - 0, 1, 1, 2, 1, 2, 2, 3, - 1, 2, 2, 3, 2, 3, 3, 4, - 1, 2, 2, 3, 2, 3, 3, 4, - 2, 3, 3, 4, 3, 4, 4, 5, - 1, 2, 2, 3, 2, 3, 3, 4, - 2, 3, 3, 4, 3, 4, 4, 5, - 2, 3, 3, 4, 3, 4, 4, 5, - 3, 4, 4, 5, 4, 5, 5, 6, - 1, 2, 2, 3, 2, 3, 3, 4, - 2, 3, 3, 4, 3, 4, 4, 5, - 2, 3, 3, 4, 3, 4, 4, 5, - 3, 4, 4, 5, 4, 5, 5, 6, - 2, 3, 3, 4, 3, 4, 4, 5, - 3, 4, 4, 5, 4, 5, 5, 6, - 3, 4, 4, 5, 4, 5, 5, 6, - 4, 5, 5, 6, 5, 6, 6, 7, - 1, 2, 2, 3, 2, 3, 3, 4, - 2, 3, 3, 4, 3, 4, 4, 5, - 2, 3, 3, 4, 3, 4, 4, 5, - 3, 4, 4, 5, 4, 5, 5, 6, - 2, 3, 3, 4, 3, 4, 4, 5, - 3, 4, 4, 5, 4, 5, 5, 6, - 3, 4, 4, 5, 4, 5, 5, 6, - 4, 5, 5, 6, 5, 6, 6, 7, - 2, 3, 3, 4, 3, 4, 
4, 5, - 3, 4, 4, 5, 4, 5, 5, 6, - 3, 4, 4, 5, 4, 5, 5, 6, - 4, 5, 5, 6, 5, 6, 6, 7, - 3, 4, 4, 5, 4, 5, 5, 6, - 4, 5, 5, 6, 5, 6, 6, 7, - 4, 5, 5, 6, 5, 6, 6, 7, - 5, 6, 6, 7, 6, 7, 7, 8, -} - // RangeSize returns the size of a range in valid addresses. func RangeSize(subnet *net.IPNet) int64 { ones, bits := subnet.Mask.Size() diff --git a/pkg/registry/service/ipallocator/allocator_test.go b/pkg/registry/service/ipallocator/allocator_test.go index b83903432bd..39c54f5744b 100644 --- a/pkg/registry/service/ipallocator/allocator_test.go +++ b/pkg/registry/service/ipallocator/allocator_test.go @@ -20,6 +20,7 @@ import ( "net" "testing" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) @@ -142,27 +143,13 @@ func TestAllocateSmall(t *testing.T) { } } - if r.count != 2 && r.Free() != 0 && r.max != 2 { + if r.Free() != 0 && r.max != 2 { t.Fatalf("unexpected range: %v", r) } t.Logf("allocated: %v", found) } -func TestBitCount(t *testing.T) { - for i, c := range bitCounts { - actual := 0 - for j := 0; j < 8; j++ { - if ((1 << uint(j)) & i) != 0 { - actual++ - } - } - if actual != int(c) { - t.Errorf("%d should have %d bits but recorded as %d", i, actual, c) - } - } -} - func TestRangeSize(t *testing.T) { testCases := map[string]int64{ "192.168.1.0/24": 256, @@ -195,7 +182,17 @@ func TestSnapshot(t *testing.T) { ip = append(ip, n) } - network, data := r.Snapshot() + var dst api.RangeAllocation + err = r.Snapshot(&dst) + if err != nil { + t.Fatal(err) + } + + _, network, err := net.ParseCIDR(dst.Range) + if err != nil { + t.Fatal(err) + } + if !network.IP.Equal(cidr.IP) || network.Mask.String() != cidr.Mask.String() { t.Fatalf("mismatched networks: %s : %s", network, cidr) } @@ -205,11 +202,11 @@ func TestSnapshot(t *testing.T) { t.Fatal(err) } other := NewCIDRRange(otherCidr) - if err := r.Restore(otherCidr, data); err != ErrMismatchedNetwork { + if err := r.Restore(otherCidr, dst.Data); err != ErrMismatchedNetwork { 
t.Fatal(err) } other = NewCIDRRange(network) - if err := other.Restore(network, data); err != nil { + if err := other.Restore(network, dst.Data); err != nil { t.Fatal(err) } diff --git a/pkg/registry/service/ipallocator/controller/repair.go b/pkg/registry/service/ipallocator/controller/repair.go index 29ef7e7ff22..451f3f32d69 100644 --- a/pkg/registry/service/ipallocator/controller/repair.go +++ b/pkg/registry/service/ipallocator/controller/repair.go @@ -46,12 +46,12 @@ type Repair struct { interval time.Duration registry service.Registry network *net.IPNet - alloc service.IPRegistry + alloc service.RangeRegistry } // NewRepair creates a controller that periodically ensures that all portalIPs are uniquely allocated across the cluster // and generates informational warnings for a cluster that is not in sync. -func NewRepair(interval time.Duration, registry service.Registry, network *net.IPNet, alloc service.IPRegistry) *Repair { +func NewRepair(interval time.Duration, registry service.Registry, network *net.IPNet, alloc service.RangeRegistry) *Repair { return &Repair{ interval: interval, registry: registry, @@ -71,6 +71,13 @@ func (c *Repair) RunUntil(ch chan struct{}) { // RunOnce verifies the state of the portal IP allocations and returns an error if an unrecoverable problem occurs. func (c *Repair) RunOnce() error { + // TODO: (per smarterclayton) if Get() or ListServices() is a weak consistency read, + // or if they are executed against different leaders, + // the ordering guarantee required to ensure no IP is allocated twice is violated. 
+ // ListServices must return a ResourceVersion higher than the etcd index Get triggers, + // and the release code must not release services that have had IPs allocated but not yet been created + // See #8295 + latest, err := c.alloc.Get() if err != nil { return fmt.Errorf("unable to refresh the service IP block: %v", err) @@ -111,7 +118,10 @@ func (c *Repair) RunOnce() error { } } - service.SnapshotRange(latest, r) + err = r.Snapshot(latest) + if err != nil { + return fmt.Errorf("unable to persist the updated service IP allocations: %v", err) + } if err := c.alloc.CreateOrUpdate(latest); err != nil { return fmt.Errorf("unable to persist the updated service IP allocations: %v", err) diff --git a/pkg/registry/service/ipallocator/controller/repair_test.go b/pkg/registry/service/ipallocator/controller/repair_test.go index 8a59e802ff8..ff1ea660c57 100644 --- a/pkg/registry/service/ipallocator/controller/repair_test.go +++ b/pkg/registry/service/ipallocator/controller/repair_test.go @@ -27,7 +27,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/ipallocator" ) -type mockIPRegistry struct { +type mockRangeRegistry struct { getCalled bool item *api.RangeAllocation err error @@ -37,12 +37,12 @@ type mockIPRegistry struct { updateErr error } -func (r *mockIPRegistry) Get() (*api.RangeAllocation, error) { +func (r *mockRangeRegistry) Get() (*api.RangeAllocation, error) { r.getCalled = true return r.item, r.err } -func (r *mockIPRegistry) CreateOrUpdate(alloc *api.RangeAllocation) error { +func (r *mockRangeRegistry) CreateOrUpdate(alloc *api.RangeAllocation) error { r.updateCalled = true r.updated = alloc return r.updateErr @@ -51,7 +51,7 @@ func (r *mockIPRegistry) CreateOrUpdate(alloc *api.RangeAllocation) error { func TestRepair(t *testing.T) { registry := registrytest.NewServiceRegistry() _, cidr, _ := net.ParseCIDR("192.168.1.0/24") - ipregistry := &mockIPRegistry{ + ipregistry := &mockRangeRegistry{ item: &api.RangeAllocation{}, } r := 
NewRepair(0, registry, cidr, ipregistry) @@ -63,7 +63,7 @@ func TestRepair(t *testing.T) { t.Errorf("unexpected ipregistry: %#v", ipregistry) } - ipregistry = &mockIPRegistry{ + ipregistry = &mockRangeRegistry{ item: &api.RangeAllocation{}, updateErr: fmt.Errorf("test error"), } @@ -77,16 +77,21 @@ func TestRepairEmpty(t *testing.T) { _, cidr, _ := net.ParseCIDR("192.168.1.0/24") previous := ipallocator.NewCIDRRange(cidr) previous.Allocate(net.ParseIP("192.168.1.10")) - network, data := previous.Snapshot() + + var dst api.RangeAllocation + err := previous.Snapshot(&dst) + if err != nil { + t.Fatal(err) + } registry := registrytest.NewServiceRegistry() - ipregistry := &mockIPRegistry{ + ipregistry := &mockRangeRegistry{ item: &api.RangeAllocation{ ObjectMeta: api.ObjectMeta{ ResourceVersion: "1", }, - Range: network.String(), - Data: data, + Range: dst.Range, + Data: dst.Data, }, } r := NewRepair(0, registry, cidr, ipregistry) @@ -105,7 +110,13 @@ func TestRepairEmpty(t *testing.T) { func TestRepairWithExisting(t *testing.T) { _, cidr, _ := net.ParseCIDR("192.168.1.0/24") previous := ipallocator.NewCIDRRange(cidr) - network, data := previous.Snapshot() + + var dst api.RangeAllocation + err := previous.Snapshot(&dst) + if err != nil { + t.Fatal(err) + } + registry := registrytest.NewServiceRegistry() registry.List = api.ServiceList{ Items: []api.Service{ @@ -130,13 +141,13 @@ func TestRepairWithExisting(t *testing.T) { }, } - ipregistry := &mockIPRegistry{ + ipregistry := &mockRangeRegistry{ item: &api.RangeAllocation{ ObjectMeta: api.ObjectMeta{ ResourceVersion: "1", }, - Range: network.String(), - Data: data, + Range: dst.Range, + Data: dst.Data, }, } r := NewRepair(0, registry, cidr, ipregistry) diff --git a/pkg/registry/service/ipallocator/etcd/etcd.go b/pkg/registry/service/ipallocator/etcd/etcd.go index a423f8bbd45..35118afbdf0 100644 --- a/pkg/registry/service/ipallocator/etcd/etcd.go +++ b/pkg/registry/service/ipallocator/etcd/etcd.go @@ -16,179 +16,4 @@ 
limitations under the License. package etcd -import ( - "fmt" - "net" - "sync" - - "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" - etcderr "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors/etcd" - "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service" - "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/ipallocator" - "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" - "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" -) - -// Etcd exposes a service.Allocator that is backed by etcd. -// TODO: allow multiple allocations to be tried at once -// TODO: subdivide the keyspace to reduce conflicts -// TODO: investigate issuing a CAS without reading first -type Etcd struct { - lock sync.Mutex - - alloc ipallocator.Snapshottable - helper tools.EtcdHelper - last string -} - -// Etcd implements ipallocator.Interface and service.IPRegistry -var _ ipallocator.Interface = &Etcd{} -var _ service.IPRegistry = &Etcd{} - -const baseKey = "/ranges/serviceips" - -// NewEtcd returns a service PortalIP ipallocator that is backed by Etcd and can manage -// persisting the snapshot state of allocation after each allocation is made. -func NewEtcd(alloc ipallocator.Snapshottable, helper tools.EtcdHelper) *Etcd { - return &Etcd{ - alloc: alloc, - helper: helper, - } -} - -// Allocate attempts to allocate the IP locally and then in etcd. -func (e *Etcd) Allocate(ip net.IP) error { - e.lock.Lock() - defer e.lock.Unlock() - - if err := e.alloc.Allocate(ip); err != nil { - return err - } - - return e.tryUpdate(func() error { - return e.alloc.Allocate(ip) - }) -} - -// AllocateNext attempts to allocate the next IP locally and then in etcd. 
-func (e *Etcd) AllocateNext() (net.IP, error) { - e.lock.Lock() - defer e.lock.Unlock() - - ip, err := e.alloc.AllocateNext() - if err != nil { - return nil, err - } - - err = e.tryUpdate(func() error { - if err := e.alloc.Allocate(ip); err != nil { - if err != ipallocator.ErrAllocated { - return err - } - // update the ip here - ip, err = e.alloc.AllocateNext() - if err != nil { - return err - } - } - return nil - }) - return ip, err -} - -// Release attempts to release the provided IP locally and then in etcd. -func (e *Etcd) Release(ip net.IP) error { - e.lock.Lock() - defer e.lock.Unlock() - - if err := e.alloc.Release(ip); err != nil { - return err - } - - return e.tryUpdate(func() error { - return e.alloc.Release(ip) - }) -} - -// tryUpdate performs a read-update to persist the latest snapshot state of allocation. -func (e *Etcd) tryUpdate(fn func() error) error { - err := e.helper.GuaranteedUpdate(baseKey, &api.RangeAllocation{}, true, - func(input runtime.Object) (output runtime.Object, ttl uint64, err error) { - existing := input.(*api.RangeAllocation) - if len(existing.ResourceVersion) == 0 { - return nil, 0, ipallocator.ErrAllocationDisabled - } - if existing.ResourceVersion != e.last { - if err := service.RestoreRange(e.alloc, existing); err != nil { - return nil, 0, err - } - if err := fn(); err != nil { - return nil, 0, err - } - } - e.last = existing.ResourceVersion - service.SnapshotRange(existing, e.alloc) - return existing, 0, nil - }, - ) - return etcderr.InterpretUpdateError(err, "serviceipallocation", "") -} - -// Refresh reloads the ipallocator from etcd. 
-func (e *Etcd) Refresh() error { - e.lock.Lock() - defer e.lock.Unlock() - - existing := &api.RangeAllocation{} - if err := e.helper.ExtractObj(baseKey, existing, false); err != nil { - if tools.IsEtcdNotFound(err) { - return ipallocator.ErrAllocationDisabled - } - return etcderr.InterpretGetError(err, "serviceipallocation", "") - } - - return service.RestoreRange(e.alloc, existing) -} - -// Get returns an api.RangeAllocation that represents the current state in -// etcd. If the key does not exist, the object will have an empty ResourceVersion. -func (e *Etcd) Get() (*api.RangeAllocation, error) { - existing := &api.RangeAllocation{} - if err := e.helper.ExtractObj(baseKey, existing, true); err != nil { - return nil, etcderr.InterpretGetError(err, "serviceipallocation", "") - } - return existing, nil -} - -// CreateOrUpdate attempts to update the current etcd state with the provided -// allocation. -func (e *Etcd) CreateOrUpdate(snapshot *api.RangeAllocation) error { - e.lock.Lock() - defer e.lock.Unlock() - - last := "" - err := e.helper.GuaranteedUpdate(baseKey, &api.RangeAllocation{}, true, - func(input runtime.Object) (output runtime.Object, ttl uint64, err error) { - existing := input.(*api.RangeAllocation) - switch { - case len(snapshot.ResourceVersion) != 0 && len(existing.ResourceVersion) != 0: - if snapshot.ResourceVersion != existing.ResourceVersion { - return nil, 0, errors.NewConflict("serviceipallocation", "", fmt.Errorf("the provided resource version does not match")) - } - case len(existing.ResourceVersion) != 0: - return nil, 0, errors.NewConflict("serviceipallocation", "", fmt.Errorf("another caller has already initialized the resource")) - } - last = snapshot.ResourceVersion - return snapshot, 0, nil - }, - ) - if err != nil { - return etcderr.InterpretUpdateError(err, "serviceipallocation", "") - } - err = service.RestoreRange(e.alloc, snapshot) - if err == nil { - e.last = last - } - return err -} +// Keep CI happy; it is unhappy if a directory 
only contains tests diff --git a/pkg/registry/service/ipallocator/etcd/etcd_test.go b/pkg/registry/service/ipallocator/etcd/etcd_test.go index 1721ef041f7..7c9b1427bc5 100644 --- a/pkg/registry/service/ipallocator/etcd/etcd_test.go +++ b/pkg/registry/service/ipallocator/etcd/etcd_test.go @@ -22,8 +22,11 @@ import ( "github.com/coreos/go-etcd/etcd" + "fmt" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/allocator" + allocator_etcd "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/allocator/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/ipallocator" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" @@ -37,15 +40,22 @@ func newHelper(t *testing.T) (*tools.FakeEtcdClient, tools.EtcdHelper) { return fakeEtcdClient, helper } -func newStorage(t *testing.T) (ipallocator.Interface, *ipallocator.Range, *tools.FakeEtcdClient) { +func newStorage(t *testing.T) (ipallocator.Interface, allocator.Interface, *tools.FakeEtcdClient) { fakeEtcdClient, h := newHelper(t) _, cidr, err := net.ParseCIDR("192.168.1.0/24") if err != nil { t.Fatal(err) } - r := ipallocator.NewCIDRRange(cidr) - storage := NewEtcd(r, h) - return storage, r, fakeEtcdClient + + var backing allocator.Interface + storage := ipallocator.NewAllocatorCIDRRange(cidr, func(max int, rangeSpec string) allocator.Interface { + mem := allocator.NewAllocationMap(max, rangeSpec) + backing = mem + etcd := allocator_etcd.NewEtcd(mem, "/ranges/serviceips", "serviceipallocation", h) + return etcd + }) + + return storage, backing, fakeEtcdClient } func key() string { @@ -56,7 +66,7 @@ func key() string { func TestEmpty(t *testing.T) { storage, _, ecli := newStorage(t) ecli.ExpectNotFoundGet(key()) - if err := storage.Allocate(net.ParseIP("192.168.1.2")); err != ipallocator.ErrAllocationDisabled { + if err := 
storage.Allocate(net.ParseIP("192.168.1.2")); fmt.Sprintf("%v", err) != "cannot allocate resources of type serviceipallocation at this time" { t.Fatal(err) } } @@ -85,17 +95,19 @@ func initialObject(ecli *tools.FakeEtcdClient) { } func TestStore(t *testing.T) { - _, cidr, _ := net.ParseCIDR("192.168.1.0/24") - storage, r, ecli := newStorage(t) initialObject(ecli) if err := storage.Allocate(net.ParseIP("192.168.1.2")); err != nil { t.Fatal(err) } - if err := r.Allocate(net.ParseIP("192.168.1.2")); err != ipallocator.ErrAllocated { + ok, err := r.Allocate(1) + if err != nil { t.Fatal(err) } + if ok { + t.Fatal("Expected allocation to fail") + } if err := storage.Allocate(net.ParseIP("192.168.1.2")); err != ipallocator.ErrAllocated { t.Fatal(err) } @@ -105,29 +117,4 @@ func TestStore(t *testing.T) { t.Fatalf("%s is empty: %#v", key(), obj) } t.Logf("data: %#v", obj.R.Node) - - other := ipallocator.NewCIDRRange(cidr) - - allocation := &api.RangeAllocation{} - if err := storage.(*Etcd).helper.ExtractObj(key(), allocation, false); err != nil { - t.Fatal(err) - } - if allocation.ResourceVersion != "1" { - t.Fatalf("%#v", allocation) - } - if allocation.Range != "192.168.1.0/24" { - t.Errorf("unexpected stored Range: %s", allocation.Range) - } - if err := other.Restore(cidr, allocation.Data); err != nil { - t.Fatal(err) - } - if !other.Has(net.ParseIP("192.168.1.2")) { - t.Fatalf("could not restore allocated IP: %#v", other) - } - - other = ipallocator.NewCIDRRange(cidr) - otherStorage := NewEtcd(other, storage.(*Etcd).helper) - if err := otherStorage.Allocate(net.ParseIP("192.168.1.2")); err != ipallocator.ErrAllocated { - t.Fatal(err) - } } diff --git a/pkg/registry/service/portallocator/allocator.go b/pkg/registry/service/portallocator/allocator.go new file mode 100644 index 00000000000..72b34d4123e --- /dev/null +++ b/pkg/registry/service/portallocator/allocator.go @@ -0,0 +1,167 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. 
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package portallocator
+
+import (
+	"errors"
+	"fmt"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/allocator"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
+)
+
+// Interface manages the allocation of ports out of a range. Interface
+// should be threadsafe.
+type Interface interface {
+	Allocate(int) error
+	AllocateNext() (int, error)
+	Release(int) error
+}
+
+var (
+	ErrFull              = errors.New("range is full")
+	ErrNotInRange        = errors.New("provided port is not in the valid range")
+	ErrAllocated         = errors.New("provided port is already allocated")
+	ErrMismatchedNetwork = errors.New("the provided port range does not match the current port range")
+)
+
+type PortAllocator struct {
+	portRange util.PortRange
+
+	alloc allocator.Interface
+}
+
+// Compile-time check that *PortAllocator satisfies Interface.
+var _ Interface = &PortAllocator{}
+
+// NewPortAllocatorCustom creates a PortAllocator over a util.PortRange, calling allocatorFactory to construct the backing store.
+func NewPortAllocatorCustom(pr util.PortRange, allocatorFactory allocator.AllocatorFactory) *PortAllocator {
+	max := pr.Size
+	rangeSpec := pr.String()
+
+	a := &PortAllocator{
+		portRange: pr,
+	}
+	a.alloc = allocatorFactory(max, rangeSpec)
+	return a
+}
+
+// NewPortAllocator wraps NewPortAllocatorCustom, for creating a port allocator backed by an in-memory store.
+func NewPortAllocator(pr util.PortRange) *PortAllocator {
+	return NewPortAllocatorCustom(pr, func(max int, rangeSpec string) allocator.Interface {
+		return allocator.NewAllocationMap(max, rangeSpec)
+	})
+}
+
+// Free returns the count of ports left in the range.
+func (r *PortAllocator) Free() int {
+	return r.alloc.Free()
+}
+
+// Allocate attempts to reserve the provided port. ErrNotInRange or
+// ErrAllocated will be returned if the port is not valid for this range
+// or has already been reserved. Any error reported by the backing
+// allocator is returned unchanged.
+func (r *PortAllocator) Allocate(port int) error {
+	ok, offset := r.contains(port)
+	if !ok {
+		return ErrNotInRange
+	}
+
+	allocated, err := r.alloc.Allocate(offset)
+	if err != nil {
+		return err
+	}
+	if !allocated {
+		return ErrAllocated
+	}
+	return nil
+}
+
+// AllocateNext reserves one of the ports from the pool. ErrFull is
+// returned if the backing allocator reports no ports left.
+func (r *PortAllocator) AllocateNext() (int, error) {
+	offset, ok, err := r.alloc.AllocateNext()
+	if err != nil {
+		return 0, err
+	}
+	if !ok {
+		return 0, ErrFull
+	}
+	return r.portRange.Base + offset, nil
+}
+
+// Release releases the port back to the pool. Releasing a port out of
+// the range is a no-op and returns no error; in-range ports are passed
+// to the backing allocator, whose result is returned.
+func (r *PortAllocator) Release(port int) error {
+	ok, offset := r.contains(port)
+	if !ok {
+		// TODO: log a warning
+		return nil
+	}
+
+	return r.alloc.Release(offset)
+}
+
+// Has returns true if the provided port is already allocated and a call
+// to Allocate(port) would fail with ErrAllocated.
+func (r *PortAllocator) Has(port int) bool {
+	ok, offset := r.contains(port)
+	if !ok {
+		return false
+	}
+
+	return r.alloc.Has(offset)
+}
+
+// Snapshot saves the current state of the pool into dst.
+func (r *PortAllocator) Snapshot(dst *api.RangeAllocation) error {
+	snapshottable, ok := r.alloc.(allocator.Snapshottable)
+	if !ok {
+		return fmt.Errorf("not a snapshottable allocator")
+	}
+	rangeString, data := snapshottable.Snapshot()
+	dst.Range = rangeString
+	dst.Data = data
+	return nil
+}
+
+// Restore restores the pool to the previously captured state. ErrMismatchedNetwork
+// is returned if the provided port range doesn't exactly match the previous range.
+func (r *PortAllocator) Restore(pr util.PortRange, data []byte) error {
+	if pr.String() != r.portRange.String() {
+		return ErrMismatchedNetwork
+	}
+	snapshottable, ok := r.alloc.(allocator.Snapshottable)
+	if !ok {
+		return fmt.Errorf("not a snapshottable allocator")
+	}
+	// Surface a failed restore from the backing store instead of reporting success.
+	return snapshottable.Restore(pr.String(), data)
+}
+
+// contains returns true and the offset if the port is in the range, and false
+// and 0 otherwise.
+func (r *PortAllocator) contains(port int) (bool, int) {
+	if !r.portRange.Contains(port) {
+		return false, 0
+	}
+
+	offset := port - r.portRange.Base
+	return true, offset
+}
diff --git a/pkg/registry/service/portallocator/allocator_test.go b/pkg/registry/service/portallocator/allocator_test.go
new file mode 100644
index 00000000000..6a34024c286
--- /dev/null
+++ b/pkg/registry/service/portallocator/allocator_test.go
@@ -0,0 +1,148 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package portallocator
+
+import (
+	"testing"
+
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
+	"strconv"
+)
+
+func TestAllocate(t *testing.T) {
+	pr, err := util.ParsePortRange("10000-10200")
+	if err != nil {
+		t.Fatal(err)
+	}
+	r := NewPortAllocator(*pr)
+	if f := r.Free(); f != 201 {
+		t.Errorf("unexpected free %d", f)
+	}
+	found := util.NewStringSet()
+	count := 0
+	for r.Free() > 0 {
+		p, err := r.AllocateNext()
+		if err != nil {
+			t.Fatalf("error @ %d: %v", count, err)
+		}
+		count++
+		if !pr.Contains(p) {
+			t.Fatalf("allocated %d which is outside of %s", p, pr)
+		}
+		if found.Has(strconv.Itoa(p)) {
+			t.Fatalf("allocated %d twice @ %d", p, count)
+		}
+		found.Insert(strconv.Itoa(p))
+	}
+	if _, err := r.AllocateNext(); err != ErrFull {
+		t.Fatal(err)
+	}
+
+	released := 10005
+	if err := r.Release(released); err != nil {
+		t.Fatal(err)
+	}
+	if f := r.Free(); f != 1 {
+		t.Errorf("unexpected free %d", f)
+	}
+	p, err := r.AllocateNext()
+	if err != nil {
+		t.Fatal(err)
+	}
+	if released != p {
+		t.Errorf("unexpected %d : %d", p, released)
+	}
+
+	if err := r.Release(released); err != nil {
+		t.Fatal(err)
+	}
+	if err := r.Allocate(1); err != ErrNotInRange {
+		t.Fatal(err)
+	}
+	if err := r.Allocate(10001); err != ErrAllocated {
+		t.Fatal(err)
+	}
+	if err := r.Allocate(20000); err != ErrNotInRange {
+		t.Fatal(err)
+	}
+	if err := r.Allocate(10201); err != ErrNotInRange {
+		t.Fatal(err)
+	}
+	if f := r.Free(); f != 1 {
+		t.Errorf("unexpected free %d", f)
+	}
+	if err := r.Allocate(released); err != nil {
+		t.Fatal(err)
+	}
+	if f := r.Free(); f != 0 {
+		t.Errorf("unexpected free %d", f)
+	}
+}
+
+func TestSnapshot(t *testing.T) {
+	pr, err := util.ParsePortRange("10000-10200")
+	if err != nil {
+		t.Fatal(err)
+	}
+	r := NewPortAllocator(*pr)
+	ports := []int{}
+	for i := 0; i < 10; i++ {
+		port, err := r.AllocateNext()
+		if err != nil {
+			t.Fatal(err)
+		}
+		ports = append(ports, port)
+	}
+
+	var dst api.RangeAllocation
+	err = r.Snapshot(&dst)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	pr2, err := util.ParsePortRange(dst.Range)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if pr.String() != pr2.String() {
+		t.Fatalf("mismatched networks: %s : %s", pr, pr2)
+	}
+
+	otherPr, err := util.ParsePortRange("200-300")
+	if err != nil {
+		t.Fatal(err)
+	}
+	other := NewPortAllocator(*otherPr)
+	if err := r.Restore(*otherPr, dst.Data); err != ErrMismatchedNetwork {
+		t.Fatal(err)
+	}
+	other = NewPortAllocator(*pr2)
+	if err := other.Restore(*pr2, dst.Data); err != nil {
+		t.Fatal(err)
+	}
+
+	for _, n := range ports {
+		if !other.Has(n) {
+			t.Errorf("restored range does not have %d", n)
+		}
+	}
+	if other.Free() != r.Free() {
+		t.Errorf("counts do not match: %d", other.Free())
+	}
+}
diff --git a/pkg/registry/service/portallocator/controller/repair.go b/pkg/registry/service/portallocator/controller/repair.go
new file mode 100644
index 00000000000..ea760f6bfc0
--- /dev/null
+++ b/pkg/registry/service/portallocator/controller/repair.go
@@ -0,0 +1,115 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package controller
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/portallocator"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
+)
+
+// Repair rebuilds the stored service port allocation; it is the port counterpart of ipallocator/controller/repair.go.
+type Repair struct {
+	interval  time.Duration
+	registry  service.Registry
+	portRange util.PortRange
+	alloc     service.RangeRegistry
+}
+
+// NewRepair creates a controller that periodically ensures that all ports are uniquely allocated across the cluster
+// and generates informational warnings for a cluster that is not in sync.
+func NewRepair(interval time.Duration, registry service.Registry, portRange util.PortRange, alloc service.RangeRegistry) *Repair {
+	return &Repair{
+		interval:  interval,
+		registry:  registry,
+		portRange: portRange,
+		alloc:     alloc,
+	}
+}
+
+// RunUntil keeps invoking RunOnce until ch is closed, handing any returned error to util.HandleError.
+func (c *Repair) RunUntil(ch chan struct{}) {
+	util.Until(func() {
+		if err := c.RunOnce(); err != nil {
+			util.HandleError(err)
+		}
+	}, c.interval, ch)
+}
+
+// RunOnce rebuilds the port allocation from the listed services, persists it, and returns an error if an unrecoverable problem occurs.
+func (c *Repair) RunOnce() error {
+	// TODO: (per smarterclayton) if Get() or ListServices() is a weak consistency read,
+	// or if they are executed against different leaders,
+	// the ordering guarantee required to ensure no port is allocated twice is violated.
+	// ListServices must return a ResourceVersion higher than the etcd index Get triggers,
+	// and the release code must not release services that have had ports allocated but not yet been created
+	// See #8295
+
+	latest, err := c.alloc.Get()
+	if err != nil {
+		return fmt.Errorf("unable to refresh the port block: %v", err)
+	}
+
+	ctx := api.WithNamespace(api.NewDefaultContext(), api.NamespaceAll)
+	list, err := c.registry.ListServices(ctx)
+	if err != nil {
+		return fmt.Errorf("unable to list services: %v", err)
+	}
+
+	r := portallocator.NewPortAllocator(c.portRange)
+	for _, svc := range list.Items {
+		var ports []int
+
+		// TODO(justinsb): Collect NodePorts
+		if len(ports) == 0 { // always taken for now: nothing fills ports until the TODO above is done
+			continue
+		}
+
+		for _, port := range ports {
+			switch err := r.Allocate(port); err {
+			case nil: // port recorded; nothing to report
+			case portallocator.ErrAllocated:
+				// TODO: send event
+				// port is broken, reallocate
+				util.HandleError(fmt.Errorf("the port %d for service %s/%s was assigned to multiple services; please recreate", port, svc.Namespace, svc.Name))
+			case portallocator.ErrNotInRange:
+				// TODO: send event
+				// port is broken, reallocate
+				util.HandleError(fmt.Errorf("the port %d for service %s/%s is not within the port range %v; please recreate", port, svc.Namespace, svc.Name, c.portRange))
+			case portallocator.ErrFull:
+				// TODO: send event
+				return fmt.Errorf("the port range %v is full; you must widen the port range in order to create new services", c.portRange)
+			default:
+				return fmt.Errorf("unable to allocate port %d for service %s/%s due to an unknown error, exiting: %v", port, svc.Namespace, svc.Name, err)
+			}
+		}
+	}
+
+	err = r.Snapshot(latest) // overwrite the fetched allocation with the freshly rebuilt state
+	if err != nil {
+		return fmt.Errorf("unable to snapshot the updated port allocations: %v", err)
+	}
+
+	if err := c.alloc.CreateOrUpdate(latest); err != nil {
+		return fmt.Errorf("unable to persist the updated port allocations: %v", err)
+	}
+	return nil
+}
diff --git a/pkg/registry/service/registry.go b/pkg/registry/service/registry.go
index 7404d5ecf45..239fe47308a 100644
--- a/pkg/registry/service/registry.go
+++ b/pkg/registry/service/registry.go
@@ -17,12 +17,9 @@ limitations under the License.
 package service
 
 import (
-	"net"
-
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
-	"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/ipallocator"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
 )
 
@@ -36,8 +33,9 @@ type Registry interface {
 	WatchServices(ctx api.Context, labels labels.Selector, fields fields.Selector, resourceVersion string) (watch.Interface, error)
 }
 
-// IPRegistry is a registry that can retrieve or persist a RangeAllocation object.
-type IPRegistry interface {
+// RangeRegistry is a registry that can retrieve or persist a RangeAllocation object.
+// TODO: Move to a general location (as other components may need allocation in future; it's not service specific)
+type RangeRegistry interface {
 	// Get returns the latest allocation, an empty object if no allocation has been made,
 	// or an error if the allocation could not be retrieved.
 	Get() (*api.RangeAllocation, error)
@@ -45,19 +43,3 @@ type IPRegistry interface {
 	// has occured since the item was last created.
 	CreateOrUpdate(*api.RangeAllocation) error
 }
-
-// RestoreRange updates a snapshottable ipallocator from a RangeAllocation
-func RestoreRange(dst ipallocator.Snapshottable, src *api.RangeAllocation) error {
-	_, network, err := net.ParseCIDR(src.Range)
-	if err != nil {
-		return err
-	}
-	return dst.Restore(network, src.Data)
-}
-
-// SnapshotRange updates a RangeAllocation to match a snapshottable ipallocator
-func SnapshotRange(dst *api.RangeAllocation, src ipallocator.Snapshottable) {
-	network, data := src.Snapshot()
-	dst.Range = network.String()
-	dst.Data = data
-}
diff --git a/pkg/registry/service/rest.go b/pkg/registry/service/rest.go
index 512978215e1..83da560ce9a 100644
--- a/pkg/registry/service/rest.go
+++ b/pkg/registry/service/rest.go
@@ -33,6 +33,7 @@ import (
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/ipallocator"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/portallocator"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/util/fielderrors"
@@ -41,22 +42,24 @@ import (
 
 // REST adapts a service registry into apiserver's RESTStorage model.
 type REST struct {
-	registry    Registry
-	machines    minion.Registry
-	endpoints   endpoint.Registry
-	portals     ipallocator.Interface
-	clusterName string
+	registry         Registry
+	machines         minion.Registry
+	endpoints        endpoint.Registry
+	portals          ipallocator.Interface
+	serviceNodePorts portallocator.Interface // NOTE(review): presumably hands out NodePort numbers for services — confirm against its users
+	clusterName      string
 }
 
 // NewStorage returns a new REST.
 func NewStorage(registry Registry, machines minion.Registry, endpoints endpoint.Registry, portals ipallocator.Interface,
-	clusterName string) *REST {
+	serviceNodePorts portallocator.Interface, clusterName string) *REST {
 	return &REST{
-		registry:    registry,
-		machines:    machines,
-		endpoints:   endpoints,
-		portals:     portals,
-		clusterName: clusterName,
+		registry:         registry,
+		machines:         machines,
+		endpoints:        endpoints,
+		portals:          portals,
+		serviceNodePorts: serviceNodePorts,
+		clusterName:      clusterName,
 	}
 }
 
diff --git a/pkg/registry/service/rest_test.go b/pkg/registry/service/rest_test.go
index 137809ef8c7..4ab1310b994 100644
--- a/pkg/registry/service/rest_test.go
+++ b/pkg/registry/service/rest_test.go
@@ -30,6 +30,8 @@ import (
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/ipallocator"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/portallocator"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
 )
 
 func NewTestREST(t *testing.T, endpoints *api.EndpointsList) (*REST, *registrytest.ServiceRegistry) {
@@ -40,7 +42,12 @@ func NewTestREST(t *testing.T, endpoints *api.EndpointsList) (*REST, *registryte
 	}
 	nodeRegistry := registrytest.NewMinionRegistry(machines, api.NodeResources{})
 	r := ipallocator.NewCIDRRange(makeIPNet(t))
-	storage := NewStorage(registry, nodeRegistry, endpointRegistry, r, "kubernetes")
+
+	portRange := util.PortRange{Base: 30000, Size: 1000} // NOTE(review): presumably ports 30000-30999 — confirm util.PortRange semantics
+	portAllocator := portallocator.NewPortAllocator(portRange)
+
+	storage := NewStorage(registry, nodeRegistry, endpointRegistry, r, portAllocator, "kubernetes")
+
 	return storage, registry
 }