Create port allocator, based on IP allocator mechanism

Including some refactoring of IP allocator
This commit is contained in:
Justin Santa Barbara 2015-05-22 18:28:48 -04:00
parent e49ad95462
commit 3bb2fe2425
23 changed files with 1313 additions and 465 deletions

View File

@ -86,6 +86,7 @@ type APIServer struct {
CorsAllowedOriginList util.StringList
AllowPrivileged bool
PortalNet util.IPNet // TODO: make this a list
ServiceNodePorts util.PortRange
EnableLogsSupport bool
MasterServiceNamespace string
RuntimeConfig util.ConfigurationMap
@ -183,6 +184,7 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) {
fs.Var(&s.CorsAllowedOriginList, "cors-allowed-origins", "List of allowed origins for CORS, comma separated. An allowed origin can be a regular expression to support subdomain matching. If this list is empty CORS will not be enabled.")
fs.BoolVar(&s.AllowPrivileged, "allow-privileged", s.AllowPrivileged, "If true, allow privileged containers.")
fs.Var(&s.PortalNet, "portal-net", "A CIDR notation IP range from which to assign portal IPs. This must not overlap with any IP ranges assigned to nodes for pods.")
fs.Var(&s.ServiceNodePorts, "service-node-ports", "A port range to reserve for services with NodePort visibility. Example: '30000-32767'. Inclusive at both ends of the range.")
fs.StringVar(&s.MasterServiceNamespace, "master-service-namespace", s.MasterServiceNamespace, "The namespace from which the kubernetes master services should be injected into pods")
fs.Var(&s.RuntimeConfig, "runtime-config", "A set of key=value pairs that describe runtime configuration that may be passed to the apiserver. api/<version> key can be used to turn on/off specific api versions. api/all and api/legacy are special keys to control all and legacy api versions respectively.")
client.BindKubeletClientConfigFlags(fs, &s.KubeletConfig)

View File

@ -854,18 +854,6 @@ func addConversionFuncs() {
return err
}
typeIn := in.Type
if typeIn == "" {
if in.CreateExternalLoadBalancer {
typeIn = ServiceTypeLoadBalancer
} else {
typeIn = ServiceTypeClusterIP
}
}
if err := s.Convert(&typeIn, &out.Spec.Type, 0); err != nil {
return err
}
return nil
},

View File

@ -776,18 +776,6 @@ func addConversionFuncs() {
return err
}
typeIn := in.Type
if typeIn == "" {
if in.CreateExternalLoadBalancer {
typeIn = ServiceTypeLoadBalancer
} else {
typeIn = ServiceTypeClusterIP
}
}
if err := s.Convert(&typeIn, &out.Spec.Type, 0); err != nil {
return err
}
return nil
},

View File

@ -28,6 +28,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/namespace"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service"
servicecontroller "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/ipallocator/controller"
portallocatorcontroller "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/portallocator/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog"
@ -39,12 +40,16 @@ import (
type Controller struct {
NamespaceRegistry namespace.Registry
ServiceRegistry service.Registry
ServiceIPRegistry service.IPRegistry
ServiceIPRegistry service.RangeRegistry
EndpointRegistry endpoint.Registry
PortalNet *net.IPNet
// TODO: MasterCount is yucky
MasterCount int
ServiceNodePortRegistry service.RangeRegistry
ServiceNodePortInterval time.Duration
ServiceNodePorts util.PortRange
PortalIPInterval time.Duration
EndpointInterval time.Duration
@ -68,12 +73,16 @@ func (c *Controller) Start() {
return
}
repair := servicecontroller.NewRepair(c.PortalIPInterval, c.ServiceRegistry, c.PortalNet, c.ServiceIPRegistry)
repairPortals := servicecontroller.NewRepair(c.PortalIPInterval, c.ServiceRegistry, c.PortalNet, c.ServiceIPRegistry)
repairNodePorts := portallocatorcontroller.NewRepair(c.ServiceNodePortInterval, c.ServiceRegistry, c.ServiceNodePorts, c.ServiceNodePortRegistry)
// run all of the controllers once prior to returning from Start.
if err := repair.RunOnce(); err != nil {
if err := repairPortals.RunOnce(); err != nil {
glog.Errorf("Unable to perform initial IP allocation check: %v", err)
}
if err := repairNodePorts.RunOnce(); err != nil {
glog.Errorf("Unable to perform initial service nodePort check: %v", err)
}
if err := c.UpdateKubernetesService(); err != nil {
glog.Errorf("Unable to perform initial Kubernetes service initialization: %v", err)
}
@ -81,7 +90,7 @@ func (c *Controller) Start() {
glog.Errorf("Unable to perform initial Kubernetes RO service initialization: %v", err)
}
c.runner = util.NewRunner(c.RunKubernetesService, c.RunKubernetesROService, repair.RunUntil)
c.runner = util.NewRunner(c.RunKubernetesService, c.RunKubernetesROService, repairPortals.RunUntil, repairNodePorts.RunUntil)
c.runner.Start()
}

View File

@ -63,13 +63,15 @@ import (
resourcequotaetcd "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/resourcequota/etcd"
secretetcd "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/secret/etcd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service"
etcdallocator "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/allocator/etcd"
ipallocator "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/ipallocator"
etcdipallocator "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/ipallocator/etcd"
serviceaccountetcd "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/serviceaccount/etcd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/ui"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/allocator"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/portallocator"
"github.com/emicklei/go-restful"
"github.com/emicklei/go-restful/swagger"
"github.com/golang/glog"
@ -141,13 +143,17 @@ type Config struct {
// The name of the cluster.
ClusterName string
// The range of ports to be assigned to services with visibility=NodePort or greater
ServiceNodePorts util.PortRange
}
// Master contains state for a Kubernetes cluster master/api server.
type Master struct {
// "Inputs", Copied from Config
portalNet *net.IPNet
cacheTimeout time.Duration
portalNet *net.IPNet
serviceNodePorts util.PortRange
cacheTimeout time.Duration
mux apiserver.Mux
muxHelper *apiserver.MuxHelper
@ -188,11 +194,12 @@ type Master struct {
// registries are internal client APIs for accessing the storage layer
// TODO: define the internal typed interface in a way that clients can
// also be replaced
nodeRegistry minion.Registry
namespaceRegistry namespace.Registry
serviceRegistry service.Registry
endpointRegistry endpoint.Registry
portalAllocator service.IPRegistry
nodeRegistry minion.Registry
namespaceRegistry namespace.Registry
serviceRegistry service.Registry
endpointRegistry endpoint.Registry
portalAllocator service.RangeRegistry
serviceNodePortAllocator service.RangeRegistry
// "Outputs"
Handler http.Handler
@ -226,6 +233,15 @@ func setDefaults(c *Config) {
}
c.PortalNet = portalNet
}
if c.ServiceNodePorts.Size == 0 {
// TODO: Currently no way to specify an empty range (do we need to allow this?)
// We should probably allow this for clouds that don't require NodePort to do load-balancing (GCE)
// but then that breaks the strict nestedness of visibility.
// Review post-v1
defaultServiceNodePorts := util.PortRange{Base: 30000, Size: 2767}
c.ServiceNodePorts = defaultServiceNodePorts
glog.Infof("Node port range unspecified. Defaulting to %v.", c.ServiceNodePorts)
}
if c.MasterCount == 0 {
// Clearly, there will be at least one master.
c.MasterCount = 1
@ -260,6 +276,7 @@ func setDefaults(c *Config) {
// Certain config fields will be set to a default value if unset,
// including:
// PortalNet
// ServiceNodePorts
// MasterCount
// ReadOnlyPort
// ReadWritePort
@ -299,6 +316,7 @@ func New(c *Config) *Master {
m := &Master{
portalNet: c.PortalNet,
serviceNodePorts: c.ServiceNodePorts,
rootWebService: new(restful.WebService),
enableCoreControllers: c.EnableCoreControllers,
enableLogsSupport: c.EnableLogsSupport,
@ -424,9 +442,23 @@ func (m *Master) init(c *Config) {
registry := etcd.NewRegistry(c.EtcdHelper, podRegistry, m.endpointRegistry)
m.serviceRegistry = registry
ipAllocator := ipallocator.NewCIDRRange(m.portalNet)
portalAllocator := etcdipallocator.NewEtcd(ipAllocator, c.EtcdHelper)
m.portalAllocator = portalAllocator
var portalRangeRegistry service.RangeRegistry
portalAllocator := ipallocator.NewAllocatorCIDRRange(m.portalNet, func(max int, rangeSpec string) allocator.Interface {
mem := allocator.NewAllocationMap(max, rangeSpec)
etcd := etcdallocator.NewEtcd(mem, "/ranges/serviceips", "serviceipallocation", c.EtcdHelper)
portalRangeRegistry = etcd
return etcd
})
m.portalAllocator = portalRangeRegistry
var serviceNodePortRegistry service.RangeRegistry
serviceNodePortAllocator := portallocator.NewPortAllocatorCustom(m.serviceNodePorts, func(max int, rangeSpec string) allocator.Interface {
mem := allocator.NewAllocationMap(max, rangeSpec)
etcd := etcdallocator.NewEtcd(mem, "/ranges/servicenodeports", "servicenodeportallocation", c.EtcdHelper)
serviceNodePortRegistry = etcd
return etcd
})
m.serviceNodePortAllocator = serviceNodePortRegistry
controllerStorage := controlleretcd.NewREST(c.EtcdHelper)
@ -444,7 +476,7 @@ func (m *Master) init(c *Config) {
"podTemplates": podTemplateStorage,
"replicationControllers": controllerStorage,
"services": service.NewStorage(m.serviceRegistry, m.nodeRegistry, m.endpointRegistry, portalAllocator, c.ClusterName),
"services": service.NewStorage(m.serviceRegistry, m.nodeRegistry, m.endpointRegistry, portalAllocator, serviceNodePortAllocator, c.ClusterName),
"endpoints": endpointsStorage,
"minions": nodeStorage,
"minions/status": nodeStatusStorage,
@ -589,8 +621,12 @@ func (m *Master) NewBootstrapController() *Controller {
PortalNet: m.portalNet,
MasterCount: m.masterCount,
PortalIPInterval: 3 * time.Minute,
EndpointInterval: 10 * time.Second,
ServiceNodePortRegistry: m.serviceNodePortAllocator,
ServiceNodePorts: m.serviceNodePorts,
ServiceNodePortInterval: 3 * time.Minute,
PortalIPInterval: 3 * time.Minute,
EndpointInterval: 10 * time.Second,
PublicIP: m.clusterIP,

View File

@ -0,0 +1,168 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package allocator
import (
"errors"
"math/big"
"math/rand"
"sync"
)
// AllocationBitmap is a contiguous block of resources that can be allocated atomically.
//
// Each resource has an offset. The internal structure is a bitmap, with a bit for each offset.
//
// If a resource is taken, the bit at that offset is set to one.
// r.count is always equal to the number of set bits and can be recalculated at any time
// by counting the set bits in r.allocated.
//
// TODO: use RLE and compact the allocator to minimize space.
type AllocationBitmap struct {
// strategy is the strategy for choosing the next available item out of the range
strategy allocateStrategy
// max is the maximum size of the usable items in the range
max int
// rangeSpec is the range specifier, matching RangeAllocation.Range
rangeSpec string
// lock guards the following members
lock sync.Mutex
// count is the number of currently allocated elements in the range
count int
// allocated is a bit array of the allocated items in the range
allocated *big.Int
}
// AllocationBitmap implements Interface and Snapshottable
var _ Interface = &AllocationBitmap{}
var _ Snapshottable = &AllocationBitmap{}
// allocateStrategy is a search strategy in the allocation map for a valid item.
type allocateStrategy func(allocated *big.Int, max, count int) (int, bool)
func NewAllocationMap(max int, rangeSpec string) *AllocationBitmap {
a := AllocationBitmap{
strategy: randomScanStrategy,
allocated: big.NewInt(0),
count: 0,
max: max,
rangeSpec: rangeSpec,
}
return &a
}
// Allocate attempts to reserve the provided item.
// Returns true if it was allocated, false if it was already in use
func (r *AllocationBitmap) Allocate(offset int) (bool, error) {
r.lock.Lock()
defer r.lock.Unlock()
if r.allocated.Bit(offset) == 1 {
return false, nil
}
r.allocated = r.allocated.SetBit(r.allocated, offset, 1)
r.count++
return true, nil
}
// AllocateNext reserves one of the items from the pool.
// (0, false, nil) may be returned if there are no items left.
func (r *AllocationBitmap) AllocateNext() (int, bool, error) {
r.lock.Lock()
defer r.lock.Unlock()
next, ok := r.strategy(r.allocated, r.max, r.count)
if !ok {
return 0, false, nil
}
r.count++
r.allocated = r.allocated.SetBit(r.allocated, next, 1)
return next, true, nil
}
// Release releases the item back to the pool. Releasing an
// unallocated item or an item out of the range is a no-op and
// returns no error.
func (r *AllocationBitmap) Release(offset int) error {
r.lock.Lock()
defer r.lock.Unlock()
if r.allocated.Bit(offset) == 0 {
return nil
}
r.allocated = r.allocated.SetBit(r.allocated, offset, 0)
r.count--
return nil
}
// Has returns true if the provided item is already allocated and a call
// to Allocate(offset) would fail.
func (r *AllocationBitmap) Has(offset int) bool {
r.lock.Lock()
defer r.lock.Unlock()
return r.allocated.Bit(offset) == 1
}
// Free returns the count of items left in the range.
func (r *AllocationBitmap) Free() int {
r.lock.Lock()
defer r.lock.Unlock()
return r.max - r.count
}
// Snapshot saves the current state of the pool.
func (r *AllocationBitmap) Snapshot() (string, []byte) {
r.lock.Lock()
defer r.lock.Unlock()
return r.rangeSpec, r.allocated.Bytes()
}
// Restore restores the pool to the previously captured state.
func (r *AllocationBitmap) Restore(rangeSpec string, data []byte) error {
r.lock.Lock()
defer r.lock.Unlock()
if r.rangeSpec != rangeSpec {
return errors.New("the provided range does not match the current range")
}
r.allocated = big.NewInt(0).SetBytes(data)
r.count = countBits(r.allocated)
return nil
}
// randomScanStrategy chooses a random address from the provided big.Int, and then
// scans forward looking for the next available address (it will wrap the range if
// necessary).
func randomScanStrategy(allocated *big.Int, max, count int) (int, bool) {
if count >= max {
return 0, false
}
offset := rand.Intn(max)
for i := 0; i < max; i++ {
at := (offset + i) % max
if allocated.Bit(at) == 0 {
return at, true
}
}
return 0, false
}

View File

@ -0,0 +1,239 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd
import (
"errors"
"fmt"
"sync"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
k8serr "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
etcderr "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors/etcd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/allocator"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
)
var (
errorUnableToAllocate = errors.New("unable to allocate")
)
// Etcd exposes a service.Allocator that is backed by etcd.
// TODO: allow multiple allocations to be tried at once
// TODO: subdivide the keyspace to reduce conflicts
// TODO: investigate issuing a CAS without reading first
type Etcd struct {
lock sync.Mutex
alloc allocator.Snapshottable
helper tools.EtcdHelper
last string
baseKey string
kind string
}
// Etcd implements allocator.Interface and service.RangeRegistry
var _ allocator.Interface = &Etcd{}
var _ service.RangeRegistry = &Etcd{}
// NewEtcd returns an allocator that is backed by Etcd and can manage
// persisting the snapshot state of allocation after each allocation is made.
func NewEtcd(alloc allocator.Snapshottable, baseKey string, kind string, helper tools.EtcdHelper) *Etcd {
return &Etcd{
alloc: alloc,
helper: helper,
baseKey: baseKey,
kind: kind,
}
}
// Allocate attempts to allocate the item locally and then in etcd.
func (e *Etcd) Allocate(offset int) (bool, error) {
e.lock.Lock()
defer e.lock.Unlock()
ok, err := e.alloc.Allocate(offset)
if !ok || err != nil {
return ok, err
}
err = e.tryUpdate(func() error {
ok, err := e.alloc.Allocate(offset)
if err != nil {
return err
}
if !ok {
return errorUnableToAllocate
}
return nil
})
if err != nil {
if err == errorUnableToAllocate {
return false, nil
}
return false, err
}
return true, nil
}
// AllocateNext attempts to allocate the next item locally and then in etcd.
func (e *Etcd) AllocateNext() (int, bool, error) {
e.lock.Lock()
defer e.lock.Unlock()
offset, ok, err := e.alloc.AllocateNext()
if !ok || err != nil {
return offset, ok, err
}
err = e.tryUpdate(func() error {
ok, err := e.alloc.Allocate(offset)
if err != nil {
return err
}
if !ok {
// update the offset here
offset, ok, err = e.alloc.AllocateNext()
if err != nil {
return err
}
if !ok {
return errorUnableToAllocate
}
return nil
}
return nil
})
return offset, ok, err
}
// Release attempts to release the provided item locally and then in etcd.
func (e *Etcd) Release(item int) error {
e.lock.Lock()
defer e.lock.Unlock()
if err := e.alloc.Release(item); err != nil {
return err
}
return e.tryUpdate(func() error {
return e.alloc.Release(item)
})
}
// tryUpdate performs a read-update to persist the latest snapshot state of allocation.
func (e *Etcd) tryUpdate(fn func() error) error {
err := e.helper.GuaranteedUpdate(e.baseKey, &api.RangeAllocation{}, true,
func(input runtime.Object) (output runtime.Object, ttl uint64, err error) {
existing := input.(*api.RangeAllocation)
if len(existing.ResourceVersion) == 0 {
return nil, 0, fmt.Errorf("cannot allocate resources of type %s at this time", e.kind)
}
if existing.ResourceVersion != e.last {
if err := e.alloc.Restore(existing.Range, existing.Data); err != nil {
return nil, 0, err
}
if err := fn(); err != nil {
return nil, 0, err
}
}
e.last = existing.ResourceVersion
rangeSpec, data := e.alloc.Snapshot()
existing.Range = rangeSpec
existing.Data = data
return existing, 0, nil
},
)
return etcderr.InterpretUpdateError(err, e.kind, "")
}
// Refresh reloads the RangeAllocation from etcd.
func (e *Etcd) Refresh() (*api.RangeAllocation, error) {
e.lock.Lock()
defer e.lock.Unlock()
existing := &api.RangeAllocation{}
if err := e.helper.ExtractObj(e.baseKey, existing, false); err != nil {
if tools.IsEtcdNotFound(err) {
return nil, nil
}
return nil, etcderr.InterpretGetError(err, e.kind, "")
}
return existing, nil
}
// Get returns an api.RangeAllocation that represents the current state in
// etcd. If the key does not exist, the object will have an empty ResourceVersion.
func (e *Etcd) Get() (*api.RangeAllocation, error) {
existing := &api.RangeAllocation{}
if err := e.helper.ExtractObj(e.baseKey, existing, true); err != nil {
return nil, etcderr.InterpretGetError(err, e.kind, "")
}
return existing, nil
}
// CreateOrUpdate attempts to update the current etcd state with the provided
// allocation.
func (e *Etcd) CreateOrUpdate(snapshot *api.RangeAllocation) error {
e.lock.Lock()
defer e.lock.Unlock()
last := ""
err := e.helper.GuaranteedUpdate(e.baseKey, &api.RangeAllocation{}, true,
func(input runtime.Object) (output runtime.Object, ttl uint64, err error) {
existing := input.(*api.RangeAllocation)
switch {
case len(snapshot.ResourceVersion) != 0 && len(existing.ResourceVersion) != 0:
if snapshot.ResourceVersion != existing.ResourceVersion {
return nil, 0, k8serr.NewConflict(e.kind, "", fmt.Errorf("the provided resource version does not match"))
}
case len(existing.ResourceVersion) != 0:
return nil, 0, k8serr.NewConflict(e.kind, "", fmt.Errorf("another caller has already initialized the resource"))
}
last = snapshot.ResourceVersion
return snapshot, 0, nil
},
)
if err != nil {
return etcderr.InterpretUpdateError(err, e.kind, "")
}
err = e.alloc.Restore(snapshot.Range, snapshot.Data)
if err == nil {
e.last = last
}
return err
}
// Implements allocator.Interface::Has
func (e *Etcd) Has(item int) bool {
e.lock.Lock()
defer e.lock.Unlock()
return e.alloc.Has(item)
}
// Implements allocator.Interface::Free
func (e *Etcd) Free() int {
e.lock.Lock()
defer e.lock.Unlock()
return e.alloc.Free()
}

View File

@ -0,0 +1,125 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd
import (
"testing"
"github.com/coreos/go-etcd/etcd"
"fmt"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/allocator"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools/etcdtest"
)
func newHelper(t *testing.T) (*tools.FakeEtcdClient, tools.EtcdHelper) {
fakeEtcdClient := tools.NewFakeEtcdClient(t)
fakeEtcdClient.TestIndex = true
helper := tools.NewEtcdHelper(fakeEtcdClient, testapi.Codec(), etcdtest.PathPrefix())
return fakeEtcdClient, helper
}
func newStorage(t *testing.T) (*Etcd, allocator.Interface, *tools.FakeEtcdClient) {
fakeEtcdClient, h := newHelper(t)
mem := allocator.NewAllocationMap(100, "rangeSpecValue")
etcd := NewEtcd(mem, "/ranges/serviceips", "serviceipallocation", h)
return etcd, mem, fakeEtcdClient
}
func key() string {
s := "/ranges/serviceips"
return etcdtest.AddPrefix(s)
}
func TestEmpty(t *testing.T) {
storage, _, ecli := newStorage(t)
ecli.ExpectNotFoundGet(key())
if _, err := storage.Allocate(1); fmt.Sprintf("%v", err) != "cannot allocate resources of type serviceipallocation at this time" {
t.Fatal(err)
}
}
func initialObject(ecli *tools.FakeEtcdClient) {
ecli.Data[key()] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
CreatedIndex: 1,
ModifiedIndex: 2,
Value: runtime.EncodeOrDie(testapi.Codec(), &api.RangeAllocation{
Range: "rangeSpecValue",
}),
},
},
E: nil,
}
}
func TestStore(t *testing.T) {
storage, backing, ecli := newStorage(t)
initialObject(ecli)
if _, err := storage.Allocate(2); err != nil {
t.Fatal(err)
}
ok, err := backing.Allocate(2)
if err != nil {
t.Fatal(err)
}
if ok {
t.Fatal("Expected backing allocation to fail")
}
if ok, err := storage.Allocate(2); ok || err != nil {
t.Fatal("Expected allocation to fail")
}
obj := ecli.Data[key()]
if obj.R == nil || obj.R.Node == nil {
t.Fatalf("%s is empty: %#v", key(), obj)
}
t.Logf("data: %#v", obj.R.Node)
other := allocator.NewAllocationMap(100, "rangeSpecValue")
allocation := &api.RangeAllocation{}
if err := storage.helper.ExtractObj(key(), allocation, false); err != nil {
t.Fatal(err)
}
if allocation.ResourceVersion != "1" {
t.Fatalf("%#v", allocation)
}
if allocation.Range != "rangeSpecValue" {
t.Errorf("unexpected stored Range: %s", allocation.Range)
}
if err := other.Restore("rangeSpecValue", allocation.Data); err != nil {
t.Fatal(err)
}
if !other.Has(2) {
t.Fatalf("could not restore allocated IP: %#v", other)
}
other = allocator.NewAllocationMap(100, "rangeSpecValue")
otherStorage := NewEtcd(other, "/ranges/serviceips", "serviceipallocation", storage.helper)
if ok, err := otherStorage.Allocate(2); ok || err != nil {
t.Fatal(err)
}
}

View File

@ -0,0 +1,41 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package allocator
// Interface manages the allocation of items out of a range. Interface
// should be threadsafe.
type Interface interface {
Allocate(int) (bool, error)
AllocateNext() (int, bool, error)
Release(int) error
// For testing
Has(int) bool
// For testing
Free() int
}
// Snapshottable is an Interface that can be snapshotted and restored. Snapshottable
// should be threadsafe.
type Snapshottable interface {
Interface
Snapshot() (string, []byte)
Restore(string, []byte) error
}
type AllocatorFactory func(max int, rangeSpec string) Interface

View File

@ -0,0 +1,64 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package allocator
import "math/big"
// countBits returns the number of set bits in n
func countBits(n *big.Int) int {
var count int = 0
for _, b := range n.Bytes() {
count += int(bitCounts[b])
}
return count
}
// bitCounts is all of the bits counted for each number between 0-255
var bitCounts = []int8{
0, 1, 1, 2, 1, 2, 2, 3,
1, 2, 2, 3, 2, 3, 3, 4,
1, 2, 2, 3, 2, 3, 3, 4,
2, 3, 3, 4, 3, 4, 4, 5,
1, 2, 2, 3, 2, 3, 3, 4,
2, 3, 3, 4, 3, 4, 4, 5,
2, 3, 3, 4, 3, 4, 4, 5,
3, 4, 4, 5, 4, 5, 5, 6,
1, 2, 2, 3, 2, 3, 3, 4,
2, 3, 3, 4, 3, 4, 4, 5,
2, 3, 3, 4, 3, 4, 4, 5,
3, 4, 4, 5, 4, 5, 5, 6,
2, 3, 3, 4, 3, 4, 4, 5,
3, 4, 4, 5, 4, 5, 5, 6,
3, 4, 4, 5, 4, 5, 5, 6,
4, 5, 5, 6, 5, 6, 6, 7,
1, 2, 2, 3, 2, 3, 3, 4,
2, 3, 3, 4, 3, 4, 4, 5,
2, 3, 3, 4, 3, 4, 4, 5,
3, 4, 4, 5, 4, 5, 5, 6,
2, 3, 3, 4, 3, 4, 4, 5,
3, 4, 4, 5, 4, 5, 5, 6,
3, 4, 4, 5, 4, 5, 5, 6,
4, 5, 5, 6, 5, 6, 6, 7,
2, 3, 3, 4, 3, 4, 4, 5,
3, 4, 4, 5, 4, 5, 5, 6,
3, 4, 4, 5, 4, 5, 5, 6,
4, 5, 5, 6, 5, 6, 6, 7,
3, 4, 4, 5, 4, 5, 5, 6,
4, 5, 5, 6, 5, 6, 6, 7,
4, 5, 5, 6, 5, 6, 6, 7,
5, 6, 6, 7, 6, 7, 7, 8,
}

View File

@ -0,0 +1,33 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package allocator
import "testing"
func TestBitCount(t *testing.T) {
for i, c := range bitCounts {
actual := 0
for j := 0; j < 8; j++ {
if ((1 << uint(j)) & i) != 0 {
actual++
}
}
if actual != int(c) {
t.Errorf("%d should have %d bits but recorded as %d", i, actual, c)
}
}
}

View File

@ -19,10 +19,10 @@ package ipallocator
import (
"errors"
"fmt"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/allocator"
"math/big"
"math/rand"
"net"
"sync"
)
// Interface manages the allocation of IP addresses out of a range. Interface
@ -33,20 +33,11 @@ type Interface interface {
Release(net.IP) error
}
// Snapshottable is an Interface that can be snapshotted and restored. Snapshottable
// should be threadsafe.
type Snapshottable interface {
Interface
Snapshot() (*net.IPNet, []byte)
Restore(*net.IPNet, []byte) error
}
var (
ErrFull = errors.New("range is full")
ErrNotInRange = errors.New("provided IP is not in the valid range")
ErrAllocated = errors.New("provided IP is already allocated")
ErrMismatchedNetwork = errors.New("the provided network does not match the current range")
ErrAllocationDisabled = errors.New("IP addresses cannot be allocated at this time")
ErrFull = errors.New("range is full")
ErrNotInRange = errors.New("provided IP is not in the valid range")
ErrAllocated = errors.New("provided IP is already allocated")
ErrMismatchedNetwork = errors.New("the provided network does not match the current range")
)
// Range is a contiguous block of IPs that can be allocated atomically.
@ -64,52 +55,39 @@ var (
// | |
// r.base r.base + r.max
// | |
// first bit of r.allocated last bit of r.allocated
//
// If an address is taken, the bit at offset:
//
// bit offset := IP - r.base
//
// is set to one. r.count is always equal to the number of set bits and
// can be recalculated at any time by counting the set bits in r.allocated.
//
// TODO: use RLE and compact the allocator to minimize space.
// offset #0 of r.allocated last offset of r.allocated
type Range struct {
net *net.IPNet
// base is a cached version of the start IP in the CIDR range as a *big.Int
base *big.Int
// strategy is the strategy for choosing the next available IP out of the range
strategy allocateStrategy
// max is the maximum size of the usable addresses in the range
max int
// lock guards the following members
lock sync.Mutex
// count is the number of currently allocated elements in the range
count int
// allocated is a bit array of the allocated ips in the range
allocated *big.Int
alloc allocator.Interface
}
// allocateStrategy is a search strategy in the allocation map for a valid IP.
type allocateStrategy func(allocated *big.Int, max, count int) (int, error)
// NewCIDRRange creates a Range over a net.IPNet.
func NewCIDRRange(cidr *net.IPNet) *Range {
// NewAllocatorCIDRRange creates a Range over a net.IPNet, calling allocatorFactory to construct the backing store.
func NewAllocatorCIDRRange(cidr *net.IPNet, allocatorFactory allocator.AllocatorFactory) *Range {
max := RangeSize(cidr)
base := bigForIP(cidr.IP)
r := Range{
net: cidr,
strategy: randomScanStrategy,
base: base.Add(base, big.NewInt(1)), // don't use the network base
max: maximum(0, int(max-2)), // don't use the network broadcast
rangeSpec := cidr.String()
allocated: big.NewInt(0),
count: 0,
r := Range{
net: cidr,
base: base.Add(base, big.NewInt(1)), // don't use the network base
max: maximum(0, int(max-2)), // don't use the network broadcast,
}
r.alloc = allocatorFactory(r.max, rangeSpec)
return &r
}
// Helper that wraps NewAllocatorCIDRRange, for creating a range backed by an in-memory store.
func NewCIDRRange(cidr *net.IPNet) *Range {
return NewAllocatorCIDRRange(cidr, func(max int, rangeSpec string) allocator.Interface {
return allocator.NewAllocationMap(max, rangeSpec)
})
}
func maximum(a, b int) int {
if a > b {
return a
@ -119,9 +97,7 @@ func maximum(a, b int) int {
// Free returns the count of IP addresses left in the range.
func (r *Range) Free() int {
r.lock.Lock()
defer r.lock.Unlock()
return r.max - r.count
return r.alloc.Free()
}
// Allocate attempts to reserve the provided IP. ErrNotInRange or
@ -134,30 +110,27 @@ func (r *Range) Allocate(ip net.IP) error {
return ErrNotInRange
}
r.lock.Lock()
defer r.lock.Unlock()
if r.allocated.Bit(offset) == 1 {
allocated, err := r.alloc.Allocate(offset)
if err != nil {
return err
}
if !allocated {
return ErrAllocated
}
r.allocated = r.allocated.SetBit(r.allocated, offset, 1)
r.count++
return nil
}
// AllocateNext reserves one of the IPs from the pool. ErrFull may
// be returned if there are no addresses left.
func (r *Range) AllocateNext() (net.IP, error) {
r.lock.Lock()
defer r.lock.Unlock()
next, err := r.strategy(r.allocated, r.max, r.count)
offset, ok, err := r.alloc.AllocateNext()
if err != nil {
return nil, err
}
r.count++
r.allocated = r.allocated.SetBit(r.allocated, next, 1)
return addIPOffset(r.base, next), nil
if !ok {
return nil, ErrFull
}
return addIPOffset(r.base, offset), nil
}
// Release releases the IP back to the pool. Releasing an
@ -169,16 +142,7 @@ func (r *Range) Release(ip net.IP) error {
return nil
}
r.lock.Lock()
defer r.lock.Unlock()
if r.allocated.Bit(offset) == 0 {
return nil
}
r.allocated = r.allocated.SetBit(r.allocated, offset, 0)
r.count--
return nil
return r.alloc.Release(offset)
}
// Has returns true if the provided IP is already allocated and a call
@ -189,31 +153,32 @@ func (r *Range) Has(ip net.IP) bool {
return false
}
r.lock.Lock()
defer r.lock.Unlock()
return r.allocated.Bit(offset) == 1
return r.alloc.Has(offset)
}
// Snapshot saves the current state of the pool.
func (r *Range) Snapshot() (*net.IPNet, []byte) {
r.lock.Lock()
defer r.lock.Unlock()
return r.net, r.allocated.Bytes()
func (r *Range) Snapshot(dst *api.RangeAllocation) error {
snapshottable, ok := r.alloc.(allocator.Snapshottable)
if !ok {
return fmt.Errorf("not a snapshottable allocator")
}
rangeString, data := snapshottable.Snapshot()
dst.Range = rangeString
dst.Data = data
return nil
}
// Restore restores the pool to the previously captured state. ErrMismatchedNetwork
// is returned if the provided IPNet range doesn't exactly match the previous range.
func (r *Range) Restore(net *net.IPNet, data []byte) error {
r.lock.Lock()
defer r.lock.Unlock()
if !net.IP.Equal(r.net.IP) || net.Mask.String() != r.net.Mask.String() {
return ErrMismatchedNetwork
}
r.allocated = big.NewInt(0).SetBytes(data)
r.count = countBits(r.allocated)
snapshottable, ok := r.alloc.(allocator.Snapshottable)
if !ok {
return fmt.Errorf("not a snapshottable allocator")
}
snapshottable.Restore(net.String(), data)
return nil
}
@ -252,68 +217,6 @@ func calculateIPOffset(base *big.Int, ip net.IP) int {
return int(big.NewInt(0).Sub(bigForIP(ip), base).Int64())
}
// randomScanStrategy chooses a random address from the provided big.Int, and then
// scans forward looking for the next available address (it will wrap the range if
// necessary).
func randomScanStrategy(allocated *big.Int, max, count int) (int, error) {
if count >= max {
return 0, ErrFull
}
offset := rand.Intn(max)
for i := 0; i < max; i++ {
at := (offset + i) % max
if allocated.Bit(at) == 0 {
return at, nil
}
}
return 0, ErrFull
}
// countBits returns the number of set bits in n
func countBits(n *big.Int) int {
var count int = 0
for _, b := range n.Bytes() {
count += int(bitCounts[b])
}
return count
}
// bitCounts is all of the bits counted for each number between 0-255
var bitCounts = []int8{
0, 1, 1, 2, 1, 2, 2, 3,
1, 2, 2, 3, 2, 3, 3, 4,
1, 2, 2, 3, 2, 3, 3, 4,
2, 3, 3, 4, 3, 4, 4, 5,
1, 2, 2, 3, 2, 3, 3, 4,
2, 3, 3, 4, 3, 4, 4, 5,
2, 3, 3, 4, 3, 4, 4, 5,
3, 4, 4, 5, 4, 5, 5, 6,
1, 2, 2, 3, 2, 3, 3, 4,
2, 3, 3, 4, 3, 4, 4, 5,
2, 3, 3, 4, 3, 4, 4, 5,
3, 4, 4, 5, 4, 5, 5, 6,
2, 3, 3, 4, 3, 4, 4, 5,
3, 4, 4, 5, 4, 5, 5, 6,
3, 4, 4, 5, 4, 5, 5, 6,
4, 5, 5, 6, 5, 6, 6, 7,
1, 2, 2, 3, 2, 3, 3, 4,
2, 3, 3, 4, 3, 4, 4, 5,
2, 3, 3, 4, 3, 4, 4, 5,
3, 4, 4, 5, 4, 5, 5, 6,
2, 3, 3, 4, 3, 4, 4, 5,
3, 4, 4, 5, 4, 5, 5, 6,
3, 4, 4, 5, 4, 5, 5, 6,
4, 5, 5, 6, 5, 6, 6, 7,
2, 3, 3, 4, 3, 4, 4, 5,
3, 4, 4, 5, 4, 5, 5, 6,
3, 4, 4, 5, 4, 5, 5, 6,
4, 5, 5, 6, 5, 6, 6, 7,
3, 4, 4, 5, 4, 5, 5, 6,
4, 5, 5, 6, 5, 6, 6, 7,
4, 5, 5, 6, 5, 6, 6, 7,
5, 6, 6, 7, 6, 7, 7, 8,
}
// RangeSize returns the size of a range in valid addresses.
func RangeSize(subnet *net.IPNet) int64 {
ones, bits := subnet.Mask.Size()

View File

@ -20,6 +20,7 @@ import (
"net"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
@ -142,27 +143,13 @@ func TestAllocateSmall(t *testing.T) {
}
}
if r.count != 2 && r.Free() != 0 && r.max != 2 {
if r.Free() != 0 && r.max != 2 {
t.Fatalf("unexpected range: %v", r)
}
t.Logf("allocated: %v", found)
}
func TestBitCount(t *testing.T) {
for i, c := range bitCounts {
actual := 0
for j := 0; j < 8; j++ {
if ((1 << uint(j)) & i) != 0 {
actual++
}
}
if actual != int(c) {
t.Errorf("%d should have %d bits but recorded as %d", i, actual, c)
}
}
}
func TestRangeSize(t *testing.T) {
testCases := map[string]int64{
"192.168.1.0/24": 256,
@ -195,7 +182,17 @@ func TestSnapshot(t *testing.T) {
ip = append(ip, n)
}
network, data := r.Snapshot()
var dst api.RangeAllocation
err = r.Snapshot(&dst)
if err != nil {
t.Fatal(err)
}
_, network, err := net.ParseCIDR(dst.Range)
if err != nil {
t.Fatal(err)
}
if !network.IP.Equal(cidr.IP) || network.Mask.String() != cidr.Mask.String() {
t.Fatalf("mismatched networks: %s : %s", network, cidr)
}
@ -205,11 +202,11 @@ func TestSnapshot(t *testing.T) {
t.Fatal(err)
}
other := NewCIDRRange(otherCidr)
if err := r.Restore(otherCidr, data); err != ErrMismatchedNetwork {
if err := r.Restore(otherCidr, dst.Data); err != ErrMismatchedNetwork {
t.Fatal(err)
}
other = NewCIDRRange(network)
if err := other.Restore(network, data); err != nil {
if err := other.Restore(network, dst.Data); err != nil {
t.Fatal(err)
}

View File

@ -46,12 +46,12 @@ type Repair struct {
interval time.Duration
registry service.Registry
network *net.IPNet
alloc service.IPRegistry
alloc service.RangeRegistry
}
// NewRepair creates a controller that periodically ensures that all portalIPs are uniquely allocated across the cluster
// and generates informational warnings for a cluster that is not in sync.
func NewRepair(interval time.Duration, registry service.Registry, network *net.IPNet, alloc service.IPRegistry) *Repair {
func NewRepair(interval time.Duration, registry service.Registry, network *net.IPNet, alloc service.RangeRegistry) *Repair {
return &Repair{
interval: interval,
registry: registry,
@ -71,6 +71,13 @@ func (c *Repair) RunUntil(ch chan struct{}) {
// RunOnce verifies the state of the portal IP allocations and returns an error if an unrecoverable problem occurs.
func (c *Repair) RunOnce() error {
// TODO: (per smarterclayton) if Get() or ListServices() is a weak consistency read,
// or if they are executed against different leaders,
// the ordering guarantee required to ensure no IP is allocated twice is violated.
// ListServices must return a ResourceVersion higher than the etcd index Get triggers,
// and the release code must not release services that have had IPs allocated but not yet been created
// See #8295
latest, err := c.alloc.Get()
if err != nil {
return fmt.Errorf("unable to refresh the service IP block: %v", err)
@ -111,7 +118,10 @@ func (c *Repair) RunOnce() error {
}
}
service.SnapshotRange(latest, r)
err = r.Snapshot(latest)
if err != nil {
return fmt.Errorf("unable to persist the updated service IP allocations: %v", err)
}
if err := c.alloc.CreateOrUpdate(latest); err != nil {
return fmt.Errorf("unable to persist the updated service IP allocations: %v", err)

View File

@ -27,7 +27,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/ipallocator"
)
type mockIPRegistry struct {
type mockRangeRegistry struct {
getCalled bool
item *api.RangeAllocation
err error
@ -37,12 +37,12 @@ type mockIPRegistry struct {
updateErr error
}
func (r *mockIPRegistry) Get() (*api.RangeAllocation, error) {
func (r *mockRangeRegistry) Get() (*api.RangeAllocation, error) {
r.getCalled = true
return r.item, r.err
}
func (r *mockIPRegistry) CreateOrUpdate(alloc *api.RangeAllocation) error {
func (r *mockRangeRegistry) CreateOrUpdate(alloc *api.RangeAllocation) error {
r.updateCalled = true
r.updated = alloc
return r.updateErr
@ -51,7 +51,7 @@ func (r *mockIPRegistry) CreateOrUpdate(alloc *api.RangeAllocation) error {
func TestRepair(t *testing.T) {
registry := registrytest.NewServiceRegistry()
_, cidr, _ := net.ParseCIDR("192.168.1.0/24")
ipregistry := &mockIPRegistry{
ipregistry := &mockRangeRegistry{
item: &api.RangeAllocation{},
}
r := NewRepair(0, registry, cidr, ipregistry)
@ -63,7 +63,7 @@ func TestRepair(t *testing.T) {
t.Errorf("unexpected ipregistry: %#v", ipregistry)
}
ipregistry = &mockIPRegistry{
ipregistry = &mockRangeRegistry{
item: &api.RangeAllocation{},
updateErr: fmt.Errorf("test error"),
}
@ -77,16 +77,21 @@ func TestRepairEmpty(t *testing.T) {
_, cidr, _ := net.ParseCIDR("192.168.1.0/24")
previous := ipallocator.NewCIDRRange(cidr)
previous.Allocate(net.ParseIP("192.168.1.10"))
network, data := previous.Snapshot()
var dst api.RangeAllocation
err := previous.Snapshot(&dst)
if err != nil {
t.Fatal(err)
}
registry := registrytest.NewServiceRegistry()
ipregistry := &mockIPRegistry{
ipregistry := &mockRangeRegistry{
item: &api.RangeAllocation{
ObjectMeta: api.ObjectMeta{
ResourceVersion: "1",
},
Range: network.String(),
Data: data,
Range: dst.Range,
Data: dst.Data,
},
}
r := NewRepair(0, registry, cidr, ipregistry)
@ -105,7 +110,13 @@ func TestRepairEmpty(t *testing.T) {
func TestRepairWithExisting(t *testing.T) {
_, cidr, _ := net.ParseCIDR("192.168.1.0/24")
previous := ipallocator.NewCIDRRange(cidr)
network, data := previous.Snapshot()
var dst api.RangeAllocation
err := previous.Snapshot(&dst)
if err != nil {
t.Fatal(err)
}
registry := registrytest.NewServiceRegistry()
registry.List = api.ServiceList{
Items: []api.Service{
@ -130,13 +141,13 @@ func TestRepairWithExisting(t *testing.T) {
},
}
ipregistry := &mockIPRegistry{
ipregistry := &mockRangeRegistry{
item: &api.RangeAllocation{
ObjectMeta: api.ObjectMeta{
ResourceVersion: "1",
},
Range: network.String(),
Data: data,
Range: dst.Range,
Data: dst.Data,
},
}
r := NewRepair(0, registry, cidr, ipregistry)

View File

@ -16,179 +16,4 @@ limitations under the License.
package etcd
import (
"fmt"
"net"
"sync"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
etcderr "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors/etcd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/ipallocator"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
)
// Etcd exposes a service.Allocator that is backed by etcd.
// TODO: allow multiple allocations to be tried at once
// TODO: subdivide the keyspace to reduce conflicts
// TODO: investigate issuing a CAS without reading first
type Etcd struct {
lock sync.Mutex
alloc ipallocator.Snapshottable
helper tools.EtcdHelper
last string
}
// Etcd implements ipallocator.Interface and service.IPRegistry
var _ ipallocator.Interface = &Etcd{}
var _ service.IPRegistry = &Etcd{}
const baseKey = "/ranges/serviceips"
// NewEtcd returns a service PortalIP ipallocator that is backed by Etcd and can manage
// persisting the snapshot state of allocation after each allocation is made.
func NewEtcd(alloc ipallocator.Snapshottable, helper tools.EtcdHelper) *Etcd {
return &Etcd{
alloc: alloc,
helper: helper,
}
}
// Allocate attempts to allocate the IP locally and then in etcd.
func (e *Etcd) Allocate(ip net.IP) error {
e.lock.Lock()
defer e.lock.Unlock()
if err := e.alloc.Allocate(ip); err != nil {
return err
}
return e.tryUpdate(func() error {
return e.alloc.Allocate(ip)
})
}
// AllocateNext attempts to allocate the next IP locally and then in etcd.
func (e *Etcd) AllocateNext() (net.IP, error) {
e.lock.Lock()
defer e.lock.Unlock()
ip, err := e.alloc.AllocateNext()
if err != nil {
return nil, err
}
err = e.tryUpdate(func() error {
if err := e.alloc.Allocate(ip); err != nil {
if err != ipallocator.ErrAllocated {
return err
}
// update the ip here
ip, err = e.alloc.AllocateNext()
if err != nil {
return err
}
}
return nil
})
return ip, err
}
// Release attempts to release the provided IP locally and then in etcd.
func (e *Etcd) Release(ip net.IP) error {
e.lock.Lock()
defer e.lock.Unlock()
if err := e.alloc.Release(ip); err != nil {
return err
}
return e.tryUpdate(func() error {
return e.alloc.Release(ip)
})
}
// tryUpdate performs a read-update to persist the latest snapshot state of allocation.
func (e *Etcd) tryUpdate(fn func() error) error {
err := e.helper.GuaranteedUpdate(baseKey, &api.RangeAllocation{}, true,
func(input runtime.Object) (output runtime.Object, ttl uint64, err error) {
existing := input.(*api.RangeAllocation)
if len(existing.ResourceVersion) == 0 {
return nil, 0, ipallocator.ErrAllocationDisabled
}
if existing.ResourceVersion != e.last {
if err := service.RestoreRange(e.alloc, existing); err != nil {
return nil, 0, err
}
if err := fn(); err != nil {
return nil, 0, err
}
}
e.last = existing.ResourceVersion
service.SnapshotRange(existing, e.alloc)
return existing, 0, nil
},
)
return etcderr.InterpretUpdateError(err, "serviceipallocation", "")
}
// Refresh reloads the ipallocator from etcd.
func (e *Etcd) Refresh() error {
e.lock.Lock()
defer e.lock.Unlock()
existing := &api.RangeAllocation{}
if err := e.helper.ExtractObj(baseKey, existing, false); err != nil {
if tools.IsEtcdNotFound(err) {
return ipallocator.ErrAllocationDisabled
}
return etcderr.InterpretGetError(err, "serviceipallocation", "")
}
return service.RestoreRange(e.alloc, existing)
}
// Get returns an api.RangeAllocation that represents the current state in
// etcd. If the key does not exist, the object will have an empty ResourceVersion.
func (e *Etcd) Get() (*api.RangeAllocation, error) {
existing := &api.RangeAllocation{}
if err := e.helper.ExtractObj(baseKey, existing, true); err != nil {
return nil, etcderr.InterpretGetError(err, "serviceipallocation", "")
}
return existing, nil
}
// CreateOrUpdate attempts to update the current etcd state with the provided
// allocation.
func (e *Etcd) CreateOrUpdate(snapshot *api.RangeAllocation) error {
e.lock.Lock()
defer e.lock.Unlock()
last := ""
err := e.helper.GuaranteedUpdate(baseKey, &api.RangeAllocation{}, true,
func(input runtime.Object) (output runtime.Object, ttl uint64, err error) {
existing := input.(*api.RangeAllocation)
switch {
case len(snapshot.ResourceVersion) != 0 && len(existing.ResourceVersion) != 0:
if snapshot.ResourceVersion != existing.ResourceVersion {
return nil, 0, errors.NewConflict("serviceipallocation", "", fmt.Errorf("the provided resource version does not match"))
}
case len(existing.ResourceVersion) != 0:
return nil, 0, errors.NewConflict("serviceipallocation", "", fmt.Errorf("another caller has already initialized the resource"))
}
last = snapshot.ResourceVersion
return snapshot, 0, nil
},
)
if err != nil {
return etcderr.InterpretUpdateError(err, "serviceipallocation", "")
}
err = service.RestoreRange(e.alloc, snapshot)
if err == nil {
e.last = last
}
return err
}
// Keep CI happy; it is unhappy if a directory only contains tests

View File

@ -22,8 +22,11 @@ import (
"github.com/coreos/go-etcd/etcd"
"fmt"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/allocator"
allocator_etcd "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/allocator/etcd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/ipallocator"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
@ -37,15 +40,22 @@ func newHelper(t *testing.T) (*tools.FakeEtcdClient, tools.EtcdHelper) {
return fakeEtcdClient, helper
}
func newStorage(t *testing.T) (ipallocator.Interface, *ipallocator.Range, *tools.FakeEtcdClient) {
func newStorage(t *testing.T) (ipallocator.Interface, allocator.Interface, *tools.FakeEtcdClient) {
fakeEtcdClient, h := newHelper(t)
_, cidr, err := net.ParseCIDR("192.168.1.0/24")
if err != nil {
t.Fatal(err)
}
r := ipallocator.NewCIDRRange(cidr)
storage := NewEtcd(r, h)
return storage, r, fakeEtcdClient
var backing allocator.Interface
storage := ipallocator.NewAllocatorCIDRRange(cidr, func(max int, rangeSpec string) allocator.Interface {
mem := allocator.NewAllocationMap(max, rangeSpec)
backing = mem
etcd := allocator_etcd.NewEtcd(mem, "/ranges/serviceips", "serviceipallocation", h)
return etcd
})
return storage, backing, fakeEtcdClient
}
func key() string {
@ -56,7 +66,7 @@ func key() string {
func TestEmpty(t *testing.T) {
storage, _, ecli := newStorage(t)
ecli.ExpectNotFoundGet(key())
if err := storage.Allocate(net.ParseIP("192.168.1.2")); err != ipallocator.ErrAllocationDisabled {
if err := storage.Allocate(net.ParseIP("192.168.1.2")); fmt.Sprintf("%v", err) != "cannot allocate resources of type serviceipallocation at this time" {
t.Fatal(err)
}
}
@ -85,17 +95,19 @@ func initialObject(ecli *tools.FakeEtcdClient) {
}
func TestStore(t *testing.T) {
_, cidr, _ := net.ParseCIDR("192.168.1.0/24")
storage, r, ecli := newStorage(t)
initialObject(ecli)
if err := storage.Allocate(net.ParseIP("192.168.1.2")); err != nil {
t.Fatal(err)
}
if err := r.Allocate(net.ParseIP("192.168.1.2")); err != ipallocator.ErrAllocated {
ok, err := r.Allocate(1)
if err != nil {
t.Fatal(err)
}
if ok {
t.Fatal("Expected allocation to fail")
}
if err := storage.Allocate(net.ParseIP("192.168.1.2")); err != ipallocator.ErrAllocated {
t.Fatal(err)
}
@ -105,29 +117,4 @@ func TestStore(t *testing.T) {
t.Fatalf("%s is empty: %#v", key(), obj)
}
t.Logf("data: %#v", obj.R.Node)
other := ipallocator.NewCIDRRange(cidr)
allocation := &api.RangeAllocation{}
if err := storage.(*Etcd).helper.ExtractObj(key(), allocation, false); err != nil {
t.Fatal(err)
}
if allocation.ResourceVersion != "1" {
t.Fatalf("%#v", allocation)
}
if allocation.Range != "192.168.1.0/24" {
t.Errorf("unexpected stored Range: %s", allocation.Range)
}
if err := other.Restore(cidr, allocation.Data); err != nil {
t.Fatal(err)
}
if !other.Has(net.ParseIP("192.168.1.2")) {
t.Fatalf("could not restore allocated IP: %#v", other)
}
other = ipallocator.NewCIDRRange(cidr)
otherStorage := NewEtcd(other, storage.(*Etcd).helper)
if err := otherStorage.Allocate(net.ParseIP("192.168.1.2")); err != ipallocator.ErrAllocated {
t.Fatal(err)
}
}

View File

@ -0,0 +1,167 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package portallocator
import (
"errors"
"fmt"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/allocator"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
// Interface manages the allocation of ports out of a range. Interface
// should be threadsafe.
type Interface interface {
Allocate(int) error
AllocateNext() (int, error)
Release(int) error
}
var (
ErrFull = errors.New("range is full")
ErrNotInRange = errors.New("provided port is not in the valid range")
ErrAllocated = errors.New("provided port is already allocated")
ErrMismatchedNetwork = errors.New("the provided port range does not match the current port range")
)
type PortAllocator struct {
portRange util.PortRange
alloc allocator.Interface
}
// PortAllocator implements Interface and Snapshottable
var _ Interface = &PortAllocator{}
// NewPortAllocatorCustom creates a PortAllocator over a util.PortRange, calling allocatorFactory to construct the backing store.
func NewPortAllocatorCustom(pr util.PortRange, allocatorFactory allocator.AllocatorFactory) *PortAllocator {
max := pr.Size
rangeSpec := pr.String()
a := &PortAllocator{
portRange: pr,
}
a.alloc = allocatorFactory(max, rangeSpec)
return a
}
// Helper that wraps NewAllocatorCIDRRange, for creating a range backed by an in-memory store.
func NewPortAllocator(pr util.PortRange) *PortAllocator {
return NewPortAllocatorCustom(pr, func(max int, rangeSpec string) allocator.Interface {
return allocator.NewAllocationMap(max, rangeSpec)
})
}
// Free returns the count of port left in the range.
func (r *PortAllocator) Free() int {
return r.alloc.Free()
}
// Allocate attempts to reserve the provided port. ErrNotInRange or
// ErrAllocated will be returned if the port is not valid for this range
// or has already been reserved. ErrFull will be returned if there
// are no ports left.
func (r *PortAllocator) Allocate(port int) error {
ok, offset := r.contains(port)
if !ok {
return ErrNotInRange
}
allocated, err := r.alloc.Allocate(offset)
if err != nil {
return err
}
if !allocated {
return ErrAllocated
}
return nil
}
// AllocateNext reserves one of the ports from the pool. ErrFull may
// be returned if there are no ports left.
func (r *PortAllocator) AllocateNext() (int, error) {
offset, ok, err := r.alloc.AllocateNext()
if err != nil {
return 0, err
}
if !ok {
return 0, ErrFull
}
return r.portRange.Base + offset, nil
}
// Release releases the port back to the pool. Releasing an
// unallocated port or a port out of the range is a no-op and
// returns no error.
func (r *PortAllocator) Release(port int) error {
ok, offset := r.contains(port)
if !ok {
// TODO: log a warning
return nil
}
return r.alloc.Release(offset)
}
// Has returns true if the provided port is already allocated and a call
// to Allocate(port) would fail with ErrAllocated.
func (r *PortAllocator) Has(port int) bool {
ok, offset := r.contains(port)
if !ok {
return false
}
return r.alloc.Has(offset)
}
// Snapshot saves the current state of the pool.
func (r *PortAllocator) Snapshot(dst *api.RangeAllocation) error {
snapshottable, ok := r.alloc.(allocator.Snapshottable)
if !ok {
return fmt.Errorf("not a snapshottable allocator")
}
rangeString, data := snapshottable.Snapshot()
dst.Range = rangeString
dst.Data = data
return nil
}
// Restore restores the pool to the previously captured state. ErrMismatchedNetwork
// is returned if the provided port range doesn't exactly match the previous range.
func (r *PortAllocator) Restore(pr util.PortRange, data []byte) error {
if pr.String() != r.portRange.String() {
return ErrMismatchedNetwork
}
snapshottable, ok := r.alloc.(allocator.Snapshottable)
if !ok {
return fmt.Errorf("not a snapshottable allocator")
}
snapshottable.Restore(pr.String(), data)
return nil
}
// contains returns true and the offset if the port is in the range, and false
// and nil otherwise.
func (r *PortAllocator) contains(port int) (bool, int) {
if !r.portRange.Contains(port) {
return false, 0
}
offset := port - r.portRange.Base
return true, offset
}

View File

@ -0,0 +1,148 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package portallocator
import (
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"strconv"
)
func TestAllocate(t *testing.T) {
pr, err := util.ParsePortRange("10000-10200")
if err != nil {
t.Fatal(err)
}
r := NewPortAllocator(*pr)
if f := r.Free(); f != 201 {
t.Errorf("unexpected free %d", f)
}
found := util.NewStringSet()
count := 0
for r.Free() > 0 {
p, err := r.AllocateNext()
if err != nil {
t.Fatalf("error @ %d: %v", count, err)
}
count++
if !pr.Contains(p) {
t.Fatalf("allocated %s which is outside of %s", p, pr)
}
if found.Has(strconv.Itoa(p)) {
t.Fatalf("allocated %s twice @ %d", p, count)
}
found.Insert(strconv.Itoa(p))
}
if _, err := r.AllocateNext(); err != ErrFull {
t.Fatal(err)
}
released := 10005
if err := r.Release(released); err != nil {
t.Fatal(err)
}
if f := r.Free(); f != 1 {
t.Errorf("unexpected free %d", f)
}
p, err := r.AllocateNext()
if err != nil {
t.Fatal(err)
}
if released != p {
t.Errorf("unexpected %s : %s", p, released)
}
if err := r.Release(released); err != nil {
t.Fatal(err)
}
if err := r.Allocate(1); err != ErrNotInRange {
t.Fatal(err)
}
if err := r.Allocate(10001); err != ErrAllocated {
t.Fatal(err)
}
if err := r.Allocate(20000); err != ErrNotInRange {
t.Fatal(err)
}
if err := r.Allocate(10201); err != ErrNotInRange {
t.Fatal(err)
}
if f := r.Free(); f != 1 {
t.Errorf("unexpected free %d", f)
}
if err := r.Allocate(released); err != nil {
t.Fatal(err)
}
if f := r.Free(); f != 0 {
t.Errorf("unexpected free %d", f)
}
}
func TestSnapshot(t *testing.T) {
pr, err := util.ParsePortRange("10000-10200")
if err != nil {
t.Fatal(err)
}
r := NewPortAllocator(*pr)
ports := []int{}
for i := 0; i < 10; i++ {
port, err := r.AllocateNext()
if err != nil {
t.Fatal(err)
}
ports = append(ports, port)
}
var dst api.RangeAllocation
err = r.Snapshot(&dst)
if err != nil {
t.Fatal(err)
}
pr2, err := util.ParsePortRange(dst.Range)
if err != nil {
t.Fatal(err)
}
if pr.String() != pr2.String() {
t.Fatalf("mismatched networks: %s : %s", pr, pr2)
}
otherPr, err := util.ParsePortRange("200-300")
if err != nil {
t.Fatal(err)
}
other := NewPortAllocator(*otherPr)
if err := r.Restore(*otherPr, dst.Data); err != ErrMismatchedNetwork {
t.Fatal(err)
}
other = NewPortAllocator(*pr2)
if err := other.Restore(*pr2, dst.Data); err != nil {
t.Fatal(err)
}
for _, n := range ports {
if !other.Has(n) {
t.Errorf("restored range does not have %s", n)
}
}
if other.Free() != r.Free() {
t.Errorf("counts do not match: %d", other.Free())
}
}

View File

@ -0,0 +1,115 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"fmt"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/portallocator"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
// See ipallocator/controller/repair.go; this is a copy for ports.
type Repair struct {
interval time.Duration
registry service.Registry
portRange util.PortRange
alloc service.RangeRegistry
}
// NewRepair creates a controller that periodically ensures that all ports are uniquely allocated across the cluster
// and generates informational warnings for a cluster that is not in sync.
func NewRepair(interval time.Duration, registry service.Registry, portRange util.PortRange, alloc service.RangeRegistry) *Repair {
return &Repair{
interval: interval,
registry: registry,
portRange: portRange,
alloc: alloc,
}
}
// RunUntil starts the controller until the provided ch is closed.
func (c *Repair) RunUntil(ch chan struct{}) {
util.Until(func() {
if err := c.RunOnce(); err != nil {
util.HandleError(err)
}
}, c.interval, ch)
}
// RunOnce verifies the state of the port allocations and returns an error if an unrecoverable problem occurs.
func (c *Repair) RunOnce() error {
// TODO: (per smarterclayton) if Get() or ListServices() is a weak consistency read,
// or if they are executed against different leaders,
// the ordering guarantee required to ensure no port is allocated twice is violated.
// ListServices must return a ResourceVersion higher than the etcd index Get triggers,
// and the release code must not release services that have had ports allocated but not yet been created
// See #8295
latest, err := c.alloc.Get()
if err != nil {
return fmt.Errorf("unable to refresh the port block: %v", err)
}
ctx := api.WithNamespace(api.NewDefaultContext(), api.NamespaceAll)
list, err := c.registry.ListServices(ctx)
if err != nil {
return fmt.Errorf("unable to refresh the port block: %v", err)
}
r := portallocator.NewPortAllocator(c.portRange)
for _, svc := range list.Items {
ports := []int{}
// TODO(justinsb): Collect NodePorts
if len(ports) == 0 {
continue
}
for _, port := range ports {
switch err := r.Allocate(port); err {
case nil:
case portallocator.ErrAllocated:
// TODO: send event
// port is broken, reallocate
util.HandleError(fmt.Errorf("the port %d for service %s/%s was assigned to multiple services; please recreate", port, svc.Name, svc.Namespace))
case portallocator.ErrNotInRange:
// TODO: send event
// port is broken, reallocate
util.HandleError(fmt.Errorf("the port %d for service %s/%s is not within the port range %v; please recreate", port, svc.Name, svc.Namespace, c.portRange))
case portallocator.ErrFull:
// TODO: send event
return fmt.Errorf("the port range %v is full; you must widen the port range in order to create new services", c.portRange)
default:
return fmt.Errorf("unable to allocate port %d for service %s/%s due to an unknown error, exiting: %v", port, svc.Name, svc.Namespace, err)
}
}
}
err = r.Snapshot(latest)
if err != nil {
return fmt.Errorf("unable to persist the updated port allocations: %v", err)
}
if err := c.alloc.CreateOrUpdate(latest); err != nil {
return fmt.Errorf("unable to persist the updated port allocations: %v", err)
}
return nil
}

View File

@ -17,12 +17,9 @@ limitations under the License.
package service
import (
"net"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/ipallocator"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
@ -36,8 +33,9 @@ type Registry interface {
WatchServices(ctx api.Context, labels labels.Selector, fields fields.Selector, resourceVersion string) (watch.Interface, error)
}
// IPRegistry is a registry that can retrieve or persist a RangeAllocation object.
type IPRegistry interface {
// TODO: Move to a general location (as other components may need allocation in future; it's not service specific)
// RangeRegistry is a registry that can retrieve or persist a RangeAllocation object.
type RangeRegistry interface {
// Get returns the latest allocation, an empty object if no allocation has been made,
// or an error if the allocation could not be retrieved.
Get() (*api.RangeAllocation, error)
@ -45,19 +43,3 @@ type IPRegistry interface {
// has occured since the item was last created.
CreateOrUpdate(*api.RangeAllocation) error
}
// RestoreRange updates a snapshottable ipallocator from a RangeAllocation
func RestoreRange(dst ipallocator.Snapshottable, src *api.RangeAllocation) error {
_, network, err := net.ParseCIDR(src.Range)
if err != nil {
return err
}
return dst.Restore(network, src.Data)
}
// SnapshotRange updates a RangeAllocation to match a snapshottable ipallocator
func SnapshotRange(dst *api.RangeAllocation, src ipallocator.Snapshottable) {
network, data := src.Snapshot()
dst.Range = network.String()
dst.Data = data
}

View File

@ -33,6 +33,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/ipallocator"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/portallocator"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/fielderrors"
@ -41,22 +42,24 @@ import (
// REST adapts a service registry into apiserver's RESTStorage model.
type REST struct {
registry Registry
machines minion.Registry
endpoints endpoint.Registry
portals ipallocator.Interface
clusterName string
registry Registry
machines minion.Registry
endpoints endpoint.Registry
portals ipallocator.Interface
serviceNodePorts portallocator.Interface
clusterName string
}
// NewStorage returns a new REST.
func NewStorage(registry Registry, machines minion.Registry, endpoints endpoint.Registry, portals ipallocator.Interface,
clusterName string) *REST {
serviceNodePorts portallocator.Interface, clusterName string) *REST {
return &REST{
registry: registry,
machines: machines,
endpoints: endpoints,
portals: portals,
clusterName: clusterName,
registry: registry,
machines: machines,
endpoints: endpoints,
portals: portals,
serviceNodePorts: serviceNodePorts,
clusterName: clusterName,
}
}

View File

@ -30,6 +30,8 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/ipallocator"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/portallocator"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
func NewTestREST(t *testing.T, endpoints *api.EndpointsList) (*REST, *registrytest.ServiceRegistry) {
@ -40,7 +42,12 @@ func NewTestREST(t *testing.T, endpoints *api.EndpointsList) (*REST, *registryte
}
nodeRegistry := registrytest.NewMinionRegistry(machines, api.NodeResources{})
r := ipallocator.NewCIDRRange(makeIPNet(t))
storage := NewStorage(registry, nodeRegistry, endpointRegistry, r, "kubernetes")
portRange := util.PortRange{Base: 30000, Size: 1000}
portAllocator := portallocator.NewPortAllocator(portRange)
storage := NewStorage(registry, nodeRegistry, endpointRegistry, r, portAllocator, "kubernetes")
return storage, registry
}