diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index 4b67b3688d9..b3dfaaaae89 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -185,17 +185,18 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st // Create a master and install handlers into mux. m := master.New(&master.Config{ - EtcdHelper: helper, - KubeletClient: fakeKubeletClient{}, - EnableLogsSupport: false, - EnableProfiling: true, - APIPrefix: "/api", - Authorizer: apiserver.NewAlwaysAllowAuthorizer(), - AdmissionControl: admit.NewAlwaysAdmit(), - ReadWritePort: portNumber, - ReadOnlyPort: portNumber, - PublicAddress: publicAddress, - CacheTimeout: 2 * time.Second, + EtcdHelper: helper, + KubeletClient: fakeKubeletClient{}, + EnableCoreControllers: true, + EnableLogsSupport: false, + EnableProfiling: true, + APIPrefix: "/api", + Authorizer: apiserver.NewAlwaysAllowAuthorizer(), + AdmissionControl: admit.NewAlwaysAdmit(), + ReadWritePort: portNumber, + ReadOnlyPort: portNumber, + PublicAddress: publicAddress, + CacheTimeout: 2 * time.Second, }) handler.delegate = m.Handler diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index 3676c304565..5cf1d9a578f 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -309,6 +309,7 @@ func (s *APIServer) Run(_ []string) error { EventTTL: s.EventTTL, KubeletClient: kubeletClient, PortalNet: &n, + EnableCoreControllers: true, EnableLogsSupport: s.EnableLogsSupport, EnableUISupport: true, EnableSwaggerSupport: true, diff --git a/cmd/kubernetes/kubernetes.go b/cmd/kubernetes/kubernetes.go index 87c7cdf4ebf..985f2092108 100644 --- a/cmd/kubernetes/kubernetes.go +++ b/cmd/kubernetes/kubernetes.go @@ -94,11 +94,12 @@ func runApiServer(etcdClient tools.EtcdClient, addr net.IP, port int, masterServ Client: http.DefaultClient, Port: 10250, }, - EnableLogsSupport: false, - EnableSwaggerSupport: true, - EnableProfiling: *enableProfiling, - APIPrefix: "/api", - Authorizer: apiserver.NewAlwaysAllowAuthorizer(), + EnableCoreControllers: true, + EnableLogsSupport: false, + EnableSwaggerSupport: true, + EnableProfiling: *enableProfiling, + APIPrefix: "/api", + Authorizer: apiserver.NewAlwaysAllowAuthorizer(), ReadWritePort: port, ReadOnlyPort: port, diff --git a/docs/services.md b/docs/services.md index d4db060eefd..7f21dc53d2d 100644 --- a/docs/services.md +++ b/docs/services.md @@ -316,6 +316,15 @@ In order to allow users to choose a port number for their `Services`, we must ensure that no two `Services` can collide. We do that by allocating each `Service` its own IP address. +To ensure each service receives a unique IP, an internal allocator atomically +updates a global allocation map in etcd prior to each service. The map object +must exist in the registry for services to get IPs, otherwise creations will +fail with a message indicating an IP could not be allocated. A background +controller is responsible for creating that map (to migrate from older versions +of Kubernetes that used in memory locking) as well as checking for invalid +assignments due to administrator intervention and cleaning up any any IPs +that were allocated but which no service currently uses. + ### IPs and Portals Unlike `Pod` IP addresses, which actually route to a fixed destination, diff --git a/pkg/api/register.go b/pkg/api/register.go index cb816fc138e..176b8852e9d 100644 --- a/pkg/api/register.go +++ b/pkg/api/register.go @@ -65,6 +65,7 @@ func init() { &ComponentStatus{}, &ComponentStatusList{}, &SerializedReference{}, + &RangeAllocation{}, ) // Legacy names are supported Scheme.AddKnownTypeWithName("", "Minion", &Node{}) @@ -111,3 +112,4 @@ func (*PodProxyOptions) IsAnAPIObject() {} func (*ComponentStatus) IsAnAPIObject() {} func (*ComponentStatusList) IsAnAPIObject() {} func (*SerializedReference) IsAnAPIObject() {} +func (*RangeAllocation) IsAnAPIObject() {} diff --git a/pkg/api/types.go b/pkg/api/types.go index ed4f75bf133..39c3e178f85 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -1915,3 +1915,24 @@ type SELinuxOptions struct { // SELinux level label. Level string `json:"level,omitempty" description:"the level label to apply to the container"` } + +// RangeAllocation is an opaque API object (not exposed to end users) that can be persisted to record +// the global allocation state of the cluster. The schema of Range and Data generic, in that Range +// should be a string representation of the inputs to a range (for instance, for IP allocation it +// might be a CIDR) and Data is an opaque blob understood by an allocator which is typically a +// binary range. Consumers should use annotations to record additional information (schema version, +// data encoding hints). A range allocation should *ALWAYS* be recreatable at any time by observation +// of the cluster, thus the object is less strongly typed than most. +type RangeAllocation struct { + TypeMeta `json:",inline"` + ObjectMeta `json:"metadata,omitempty"` + // A string representing a unique label for a range of resources, such as a CIDR "10.0.0.0/8" or + // port range "10000-30000". Range is not strongly schema'd here. The Range is expected to define + // a start and end unless there is an implicit end. + Range string `json:"range"` + // A byte array representing the serialized state of a range allocation. Additional clarifiers on + // the type or format of data should be represented with annotations. For IP allocations, this is + // represented as a bit array starting at the base IP of the CIDR in Range, with each bit representing + // a single allocated address (the fifth bit on CIDR 10.0.0.0/8 is 10.0.0.4). + Data []byte `json:"data"` +} diff --git a/pkg/api/v1/register.go b/pkg/api/v1/register.go index e4219018187..c2f971320f4 100644 --- a/pkg/api/v1/register.go +++ b/pkg/api/v1/register.go @@ -64,6 +64,7 @@ func init() { &ComponentStatus{}, &ComponentStatusList{}, &SerializedReference{}, + &RangeAllocation{}, ) // Legacy names are supported api.Scheme.AddKnownTypeWithName("v1", "Minion", &Node{}) @@ -108,3 +109,4 @@ func (*PodProxyOptions) IsAnAPIObject() {} func (*ComponentStatus) IsAnAPIObject() {} func (*ComponentStatusList) IsAnAPIObject() {} func (*SerializedReference) IsAnAPIObject() {} +func (*RangeAllocation) IsAnAPIObject() {} diff --git a/pkg/api/v1/types.go b/pkg/api/v1/types.go index baa8b431653..50f2e4a4397 100644 --- a/pkg/api/v1/types.go +++ b/pkg/api/v1/types.go @@ -1780,3 +1780,12 @@ type SELinuxOptions struct { // SELinux level label. Level string `json:"level,omitempty" description:"the level label to apply to the container"` } + +// RangeAllocation is not a public type +type RangeAllocation struct { + TypeMeta `json:",inline"` + ObjectMeta `json:"metadata,omitempty" description:"standard list metadata; see http://docs.k8s.io/api-conventions.md#metadata"` + + Range string `json:"range" description:"a range string that identifies the range represented by 'data'; required"` + Data []byte `json:"data" description:"a bit array containing all allocated addresses in the previous segment"` +} diff --git a/pkg/api/v1beta1/register.go b/pkg/api/v1beta1/register.go index dfae88c705e..369db60ba58 100644 --- a/pkg/api/v1beta1/register.go +++ b/pkg/api/v1beta1/register.go @@ -71,6 +71,7 @@ func init() { &ComponentStatus{}, &ComponentStatusList{}, &SerializedReference{}, + &RangeAllocation{}, ) // Future names are supported api.Scheme.AddKnownTypeWithName("v1beta1", "Node", &Minion{}) @@ -116,3 +117,4 @@ func (*PodProxyOptions) IsAnAPIObject() {} func (*ComponentStatus) IsAnAPIObject() {} func (*ComponentStatusList) IsAnAPIObject() {} func (*SerializedReference) IsAnAPIObject() {} +func (*RangeAllocation) IsAnAPIObject() {} diff --git a/pkg/api/v1beta1/types.go b/pkg/api/v1beta1/types.go index de8ada33a15..9dc4811ffd0 100644 --- a/pkg/api/v1beta1/types.go +++ b/pkg/api/v1beta1/types.go @@ -1700,3 +1700,11 @@ type SELinuxOptions struct { // SELinux level label. Level string `json:"level,omitempty" description:"the level label to apply to the container"` } + +// RangeAllocation is not a public type +type RangeAllocation struct { + TypeMeta `json:",inline"` + + Range string `json:"range" description:"a range string that identifies the range represented by 'data'; required"` + Data []byte `json:"data" description:"a bit array containing all allocated addresses in the previous segment"` +} diff --git a/pkg/api/v1beta2/register.go b/pkg/api/v1beta2/register.go index ab270fdbf7c..337015150cd 100644 --- a/pkg/api/v1beta2/register.go +++ b/pkg/api/v1beta2/register.go @@ -71,6 +71,7 @@ func init() { &ComponentStatus{}, &ComponentStatusList{}, &SerializedReference{}, + &RangeAllocation{}, ) // Future names are supported api.Scheme.AddKnownTypeWithName("v1beta2", "Node", &Minion{}) @@ -116,3 +117,4 @@ func (*PodProxyOptions) IsAnAPIObject() {} func (*ComponentStatus) IsAnAPIObject() {} func (*ComponentStatusList) IsAnAPIObject() {} func (*SerializedReference) IsAnAPIObject() {} +func (*RangeAllocation) IsAnAPIObject() {} diff --git a/pkg/api/v1beta2/types.go b/pkg/api/v1beta2/types.go index 6275b6784f2..574c3c30467 100644 --- a/pkg/api/v1beta2/types.go +++ b/pkg/api/v1beta2/types.go @@ -1762,3 +1762,11 @@ type SELinuxOptions struct { // SELinux level label. Level string `json:"level,omitempty" description:"the level label to apply to the container"` } + +// RangeAllocation is not a public type +type RangeAllocation struct { + TypeMeta `json:",inline"` + + Range string `json:"range" description:"a range string that identifies the range represented by 'data'; required"` + Data []byte `json:"data" description:"a bit array containing all allocated addresses in the previous segment"` +} diff --git a/pkg/api/v1beta3/register.go b/pkg/api/v1beta3/register.go index 781ae90f67c..11dbda95740 100644 --- a/pkg/api/v1beta3/register.go +++ b/pkg/api/v1beta3/register.go @@ -64,6 +64,7 @@ func init() { &ComponentStatus{}, &ComponentStatusList{}, &SerializedReference{}, + &RangeAllocation{}, ) // Legacy names are supported api.Scheme.AddKnownTypeWithName("v1beta3", "Minion", &Node{}) @@ -108,3 +109,4 @@ func (*PodProxyOptions) IsAnAPIObject() {} func (*ComponentStatus) IsAnAPIObject() {} func (*ComponentStatusList) IsAnAPIObject() {} func (*SerializedReference) IsAnAPIObject() {} +func (*RangeAllocation) IsAnAPIObject() {} diff --git a/pkg/api/v1beta3/types.go b/pkg/api/v1beta3/types.go index 4482931d485..40bcdabd63d 100644 --- a/pkg/api/v1beta3/types.go +++ b/pkg/api/v1beta3/types.go @@ -1780,3 +1780,12 @@ type SELinuxOptions struct { // SELinux level label. Level string `json:"level,omitempty" description:"the level label to apply to the container"` } + +// RangeAllocation is not a public type +type RangeAllocation struct { + TypeMeta `json:",inline"` + ObjectMeta `json:"metadata,omitempty" description:"standard list metadata; see http://docs.k8s.io/api-conventions.md#metadata"` + + Range string `json:"range" description:"a range string that identifies the range represented by 'data'; required"` + Data []byte `json:"data" description:"a bit array containing all allocated addresses in the previous segment"` +} diff --git a/pkg/master/master.go b/pkg/master/master.go index 1ddf2d25e32..d72de2b791d 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -63,6 +63,8 @@ import ( resourcequotaetcd "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/resourcequota/etcd" secretetcd "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/secret/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service" + ipallocator "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/allocator" + etcdipallocator "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/allocator/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/ui" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" @@ -78,13 +80,15 @@ const ( // Config is a structure used to configure a Master. type Config struct { - EtcdHelper tools.EtcdHelper - EventTTL time.Duration - MinionRegexp string - KubeletClient client.KubeletClient - PortalNet *net.IPNet - EnableLogsSupport bool - EnableUISupport bool + EtcdHelper tools.EtcdHelper + EventTTL time.Duration + MinionRegexp string + KubeletClient client.KubeletClient + PortalNet *net.IPNet + // allow downstream consumers to disable the core controller loops + EnableCoreControllers bool + EnableLogsSupport bool + EnableUISupport bool // allow downstream consumers to disable swagger EnableSwaggerSupport bool // allow v1beta3 to be conditionally disabled @@ -144,6 +148,7 @@ type Master struct { muxHelper *apiserver.MuxHelper handlerContainer *restful.Container rootWebService *restful.WebService + enableCoreControllers bool enableLogsSupport bool enableUISupport bool enableSwaggerSupport bool @@ -180,6 +185,7 @@ type Master struct { namespaceRegistry namespace.Registry serviceRegistry service.Registry endpointRegistry endpoint.Registry + portalAllocator service.IPRegistry // "Outputs" Handler http.Handler @@ -208,6 +214,9 @@ func setDefaults(c *Config) { if err != nil { glog.Fatalf("Unable to parse CIDR: %v", err) } + if size := ipallocator.RangeSize(portalNet); size < 8 { + glog.Fatalf("The portal net range must be at least %d IP addresses", 8) + } c.PortalNet = portalNet } if c.MasterCount == 0 { @@ -271,11 +280,11 @@ func New(c *Config) *Master { } // Select the first two valid IPs from portalNet to use as the master service portalIPs - serviceReadOnlyIP, err := service.GetIndexedIP(c.PortalNet, 1) + serviceReadOnlyIP, err := ipallocator.GetIndexedIP(c.PortalNet, 1) if err != nil { glog.Fatalf("Failed to generate service read-only IP for master service: %v", err) } - serviceReadWriteIP, err := service.GetIndexedIP(c.PortalNet, 2) + serviceReadWriteIP, err := ipallocator.GetIndexedIP(c.PortalNet, 2) if err != nil { glog.Fatalf("Failed to generate service read-write IP for master service: %v", err) } @@ -284,6 +293,7 @@ func New(c *Config) *Master { m := &Master{ portalNet: c.PortalNet, rootWebService: new(restful.WebService), + enableCoreControllers: c.EnableCoreControllers, enableLogsSupport: c.EnableLogsSupport, enableUISupport: c.EnableUISupport, enableSwaggerSupport: c.EnableSwaggerSupport, @@ -324,7 +334,6 @@ func New(c *Config) *Master { m.handlerContainer.Router(restful.CurlyRouter{}) m.muxHelper = &apiserver.MuxHelper{m.mux, []string{}} - m.masterServices = util.NewRunner(m.serviceWriterLoop, m.roServiceWriterLoop) m.init(c) return m } @@ -405,6 +414,10 @@ func (m *Master) init(c *Config) { registry := etcd.NewRegistry(c.EtcdHelper, podRegistry, m.endpointRegistry) m.serviceRegistry = registry + ipAllocator := ipallocator.NewCIDRRange(m.portalNet) + portalAllocator := etcdipallocator.NewEtcd(ipAllocator, c.EtcdHelper) + m.portalAllocator = portalAllocator + controllerStorage := controlleretcd.NewREST(c.EtcdHelper) // TODO: Factor out the core API registration @@ -421,7 +434,7 @@ func (m *Master) init(c *Config) { "podTemplates": podTemplateStorage, "replicationControllers": controllerStorage, - "services": service.NewStorage(m.serviceRegistry, m.nodeRegistry, m.endpointRegistry, m.portalNet, c.ClusterName), + "services": service.NewStorage(m.serviceRegistry, m.nodeRegistry, m.endpointRegistry, portalAllocator, c.ClusterName), "endpoints": endpointsStorage, "minions": nodeStorage, "minions/status": nodeStatusStorage, @@ -544,7 +557,9 @@ func (m *Master) init(c *Config) { } // TODO: Attempt clean shutdown? - m.masterServices.Start() + if m.enableCoreControllers { + m.StartCoreControllers() + } } // InstallSwaggerAPI installs the /swaggerapi/ endpoint to allow schema discovery diff --git a/pkg/master/publish.go b/pkg/master/publish.go index 14c7d0034ef..dadf4cb6173 100644 --- a/pkg/master/publish.go +++ b/pkg/master/publish.go @@ -24,11 +24,28 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api/endpoints" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest" + servicecontroller "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/allocator/controller" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/golang/glog" ) -func (m *Master) serviceWriterLoop(stop chan struct{}) { +// StartCoreControllers begins the core controller loops that must exist for bootstrapping +// a cluster. +func (m *Master) StartCoreControllers() { + if m.masterServices != nil { + return + } + repair := servicecontroller.NewRepair(3*time.Minute, m.serviceRegistry, m.portalNet, m.portalAllocator) + if err := repair.RunOnce(); err != nil { + glog.Errorf("Unable to perform initial IP allocation check: %v", err) + } + m.masterServices = util.NewRunner(m.ServiceWriterLoop, m.ROServiceWriterLoop, repair.RunUntil) + m.masterServices.Start() +} + +// ServiceWriterLoop is exposed for downstream consumers of master +func (m *Master) ServiceWriterLoop(stop chan struct{}) { t := time.NewTicker(10 * time.Second) defer t.Stop() for { @@ -36,14 +53,14 @@ func (m *Master) serviceWriterLoop(stop chan struct{}) { // TODO: when it becomes possible to change this stuff, // stop polling and start watching. // TODO: add endpoints of all replicas, not just the elected master. - if err := m.createMasterNamespaceIfNeeded(api.NamespaceDefault); err != nil { + if err := m.CreateMasterNamespaceIfNeeded(api.NamespaceDefault); err != nil { glog.Errorf("Can't create master namespace: %v", err) } if m.serviceReadWriteIP != nil { - if err := m.createMasterServiceIfNeeded("kubernetes", m.serviceReadWriteIP, m.serviceReadWritePort); err != nil && !errors.IsAlreadyExists(err) { + if err := m.CreateMasterServiceIfNeeded("kubernetes", m.serviceReadWriteIP, m.serviceReadWritePort); err != nil && !errors.IsAlreadyExists(err) { glog.Errorf("Can't create rw service: %v", err) } - if err := m.setEndpoints("kubernetes", m.clusterIP, m.publicReadWritePort); err != nil { + if err := m.SetEndpoints("kubernetes", m.clusterIP, m.publicReadWritePort); err != nil { glog.Errorf("Can't create rw endpoints: %v", err) } } @@ -56,21 +73,22 @@ func (m *Master) serviceWriterLoop(stop chan struct{}) { } } -func (m *Master) roServiceWriterLoop(stop chan struct{}) { +// ROServiceWriterLoop is exposed for downstream consumers of master +func (m *Master) ROServiceWriterLoop(stop chan struct{}) { t := time.NewTicker(10 * time.Second) defer t.Stop() for { // Update service & endpoint records. // TODO: when it becomes possible to change this stuff, // stop polling and start watching. - if err := m.createMasterNamespaceIfNeeded(api.NamespaceDefault); err != nil { + if err := m.CreateMasterNamespaceIfNeeded(api.NamespaceDefault); err != nil { glog.Errorf("Can't create master namespace: %v", err) } if m.serviceReadOnlyIP != nil { - if err := m.createMasterServiceIfNeeded("kubernetes-ro", m.serviceReadOnlyIP, m.serviceReadOnlyPort); err != nil && !errors.IsAlreadyExists(err) { + if err := m.CreateMasterServiceIfNeeded("kubernetes-ro", m.serviceReadOnlyIP, m.serviceReadOnlyPort); err != nil && !errors.IsAlreadyExists(err) { glog.Errorf("Can't create ro service: %v", err) } - if err := m.setEndpoints("kubernetes-ro", m.clusterIP, m.publicReadOnlyPort); err != nil { + if err := m.SetEndpoints("kubernetes-ro", m.clusterIP, m.publicReadOnlyPort); err != nil { glog.Errorf("Can't create ro endpoints: %v", err) } } @@ -83,8 +101,8 @@ func (m *Master) roServiceWriterLoop(stop chan struct{}) { } } -// createMasterNamespaceIfNeeded will create the namespace that contains the master services if it doesn't already exist -func (m *Master) createMasterNamespaceIfNeeded(ns string) error { +// CreateMasterNamespaceIfNeeded will create the namespace that contains the master services if it doesn't already exist +func (m *Master) CreateMasterNamespaceIfNeeded(ns string) error { ctx := api.NewContext() if _, err := m.namespaceRegistry.GetNamespace(ctx, api.NamespaceDefault); err == nil { // the namespace already exists @@ -103,9 +121,9 @@ func (m *Master) createMasterNamespaceIfNeeded(ns string) error { return err } -// createMasterServiceIfNeeded will create the specified service if it +// CreateMasterServiceIfNeeded will create the specified service if it // doesn't already exist. -func (m *Master) createMasterServiceIfNeeded(serviceName string, serviceIP net.IP, servicePort int) error { +func (m *Master) CreateMasterServiceIfNeeded(serviceName string, serviceIP net.IP, servicePort int) error { ctx := api.NewDefaultContext() if _, err := m.serviceRegistry.GetService(ctx, serviceName); err == nil { // The service already exists. @@ -132,20 +150,20 @@ func (m *Master) createMasterServiceIfNeeded(serviceName string, serviceIP net.I return err } -// setEndpoints sets the endpoints for the given apiserver service (ro or rw). -// setEndpoints expects that the endpoints objects it manages will all be -// managed only by setEndpoints; therefore, to understand this, you need only +// SetEndpoints sets the endpoints for the given apiserver service (ro or rw). +// SetEndpoints expects that the endpoints objects it manages will all be +// managed only by SetEndpoints; therefore, to understand this, you need only // understand the requirements and the body of this function. // // Requirements: // * All apiservers MUST use the same ports for their {rw, ro} services. -// * All apiservers MUST use setEndpoints and only setEndpoints to manage the +// * All apiservers MUST use SetEndpoints and only SetEndpoints to manage the // endpoints for their {rw, ro} services. // * All apiservers MUST know and agree on the number of apiservers expected // to be running (m.masterCount). -// * setEndpoints is called periodically from all apiservers. +// * SetEndpoints is called periodically from all apiservers. // -func (m *Master) setEndpoints(serviceName string, ip net.IP, port int) error { +func (m *Master) SetEndpoints(serviceName string, ip net.IP, port int) error { ctx := api.NewDefaultContext() e, err := m.endpointRegistry.GetEndpoints(ctx, serviceName) if err != nil { diff --git a/pkg/registry/service/ip_allocator.go b/pkg/registry/service/ip_allocator.go deleted file mode 100644 index abb9b7f68dc..00000000000 --- a/pkg/registry/service/ip_allocator.go +++ /dev/null @@ -1,250 +0,0 @@ -/* -Copyright 2014 The Kubernetes Authors All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package service - -import ( - "fmt" - math_rand "math/rand" - "net" - "sync" - "time" - - "github.com/golang/glog" -) - -type ipAllocator struct { - lock sync.Mutex // protects 'used' - - subnet net.IPNet - ipSpaceSize int64 // Size of subnet, or -1 if it does not fit in an int64 - used ipAddrSet - randomAttempts int - - random *math_rand.Rand -} - -type ipAddrSet struct { - // We are pretty severely restricted in the types of things we can use as a key - ips map[string]bool -} - -func (s *ipAddrSet) Init() { - s.ips = map[string]bool{} -} - -// Gets the number of IPs in the set -func (s *ipAddrSet) Size() int { - return len(s.ips) -} - -// Tests whether the set holds a given IP -func (s *ipAddrSet) Contains(ip net.IP) bool { - key := ip.String() - exists := s.ips[key] - return exists -} - -// Adds to the ipAddrSet; returns true iff it was added (was not already in set) -func (s *ipAddrSet) Add(ip net.IP) bool { - key := ip.String() - exists := s.ips[key] - if exists { - return false - } - s.ips[key] = true - return true -} - -// Removes from the ipAddrSet; returns true iff it was removed (was already in set) -func (s *ipAddrSet) Remove(ip net.IP) bool { - key := ip.String() - exists := s.ips[key] - if !exists { - return false - } - delete(s.ips, key) - // TODO: We probably should add this IP to an 'embargo' list for a limited amount of time - - return true -} - -// The smallest number of IPs we accept. -const minIPSpace = 8 - -// newIPAllocator creates and intializes a new ipAllocator object. -func newIPAllocator(subnet *net.IPNet) *ipAllocator { - if subnet == nil || subnet.IP == nil || subnet.Mask == nil { - return nil - } - - seed := time.Now().UTC().UnixNano() - r := math_rand.New(math_rand.NewSource(seed)) - - ipSpaceSize := int64(-1) - ones, bits := subnet.Mask.Size() - if (bits - ones) < 63 { - ipSpaceSize = int64(1) << uint(bits-ones) - - if ipSpaceSize < minIPSpace { - glog.Errorf("IPAllocator requires at least %d IPs", minIPSpace) - return nil - } - } - - ipa := &ipAllocator{ - subnet: *subnet, - ipSpaceSize: ipSpaceSize, - random: r, - randomAttempts: 1000, - } - ipa.used.Init() - - network := make(net.IP, len(subnet.IP), len(subnet.IP)) - for i := 0; i < len(subnet.IP); i++ { - network[i] = subnet.IP[i] & subnet.Mask[i] - } - ipa.used.Add(network) // block the network addr - - broadcast := make(net.IP, len(subnet.IP), len(subnet.IP)) - for i := 0; i < len(subnet.IP); i++ { - broadcast[i] = subnet.IP[i] | ^subnet.Mask[i] - } - ipa.used.Add(broadcast) // block the broadcast addr - - return ipa -} - -// Allocate allocates a specific IP. This is useful when recovering saved state. -func (ipa *ipAllocator) Allocate(ip net.IP) error { - ipa.lock.Lock() - defer ipa.lock.Unlock() - - if !ipa.subnet.Contains(ip) { - return fmt.Errorf("IP %s does not fall within subnet %s", ip, ipa.subnet) - } - - if !ipa.used.Add(ip) { - return fmt.Errorf("IP %s is already allocated", ip) - } - - return nil -} - -// AllocateNext allocates and returns a new IP. -func (ipa *ipAllocator) AllocateNext() (net.IP, error) { - ipa.lock.Lock() - defer ipa.lock.Unlock() - - if int64(ipa.used.Size()) == ipa.ipSpaceSize { - return nil, fmt.Errorf("can't find a free IP in %s", ipa.subnet) - } - - // Try randomly first - for i := 0; i < ipa.randomAttempts; i++ { - ip := ipa.createRandomIp() - - if ipa.used.Add(ip) { - return ip, nil - } - } - - // If that doesn't work, try a linear search - ip := copyIP(ipa.subnet.IP) - for ipa.subnet.Contains(ip) { - ip = ipAdd(ip, 1) - if ipa.used.Add(ip) { - return ip, nil - } - } - - return nil, fmt.Errorf("can't find a free IP in %s", ipa.subnet) -} - -// Returns the index-th IP from the specified subnet range. -// For example, subnet "10.0.0.0/24" with index "2" will return the IP "10.0.0.2". -// TODO(saad-ali): Move this (and any other functions that are independent of ipAllocator) to some -// place more generic. -func GetIndexedIP(subnet *net.IPNet, index int) (net.IP, error) { - ip := ipAdd(subnet.IP, index /* offset */) - if !subnet.Contains(ip) { - return nil, fmt.Errorf("can't generate IP with index %d from subnet. subnet too small. subnet: %q", index, subnet) - } - return ip, nil -} - -func (ipa *ipAllocator) createRandomIp() net.IP { - ip := ipa.subnet.IP - mask := ipa.subnet.Mask - n := len(ip) - - randomIp := make(net.IP, n, n) - - for i := 0; i < n; i++ { - if mask[i] == 0xff { - randomIp[i] = ipa.subnet.IP[i] - } else { - b := byte(ipa.random.Intn(256)) - randomIp[i] = (ipa.subnet.IP[i] & mask[i]) | (b &^ mask[i]) - } - } - - return randomIp -} - -// Add an offset to an IP address - used for joining network addr and host addr parts. -func ipAdd(ip net.IP, offset int) net.IP { - out := copyIP(simplifyIP(ip)) - // Loop from least-significant to most. - for i := len(out) - 1; i >= 0 && offset > 0; i-- { - add := offset % 256 - result := int(out[i]) + add - out[i] = byte(result % 256) - offset >>= 8 - offset += result / 256 // carry - } - return out -} - -// Get the optimal slice for an IP. IPv4 addresses will come back in a 4 byte slice. IPv6 -// addresses will come back in a 16 byte slice. Non-IP arguments will produce nil. -func simplifyIP(in net.IP) net.IP { - if ip4 := in.To4(); ip4 != nil { - return ip4 - } - return in.To16() -} - -// Make a copy of a net.IP. It appears to be a value type, but it is actually defined as a -// slice, so value assignment is shallow. Why does a poor dumb user like me need to know -// this sort of implementation detail? -func copyIP(in net.IP) net.IP { - out := make(net.IP, len(in)) - copy(out, in) - return out -} - -// Release de-allocates an IP. -func (ipa *ipAllocator) Release(ip net.IP) error { - ipa.lock.Lock() - defer ipa.lock.Unlock() - - if !ipa.subnet.Contains(ip) { - return fmt.Errorf("IP %s does not fall within subnet %s", ip, ipa.subnet) - } - ipa.used.Remove(ip) - return nil -} diff --git a/pkg/registry/service/ip_allocator_test.go b/pkg/registry/service/ip_allocator_test.go deleted file mode 100644 index a775443c6c5..00000000000 --- a/pkg/registry/service/ip_allocator_test.go +++ /dev/null @@ -1,313 +0,0 @@ -/* -Copyright 2014 The Kubernetes Authors All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package service - -import ( - "net" - "testing" -) - -func TestNew(t *testing.T) { - if newIPAllocator(nil) != nil { - t.Errorf("expected nil") - } - if newIPAllocator(&net.IPNet{}) != nil { - t.Errorf("expected nil") - } - _, ipnet, err := net.ParseCIDR("93.76.0.0/22") - if err != nil { - t.Error(err) - } - ipa := newIPAllocator(ipnet) - if ipa == nil { - t.Errorf("expected non-nil") - } - if ipa.ipSpaceSize != 1024 { - t.Errorf("wrong size for ipa.ipSpaceSize") - } - if ipa.used.Size() != 2 { - t.Errorf("wrong size() for ipa.used") - } - if !ipa.used.Contains(net.ParseIP("93.76.0.0")) { - t.Errorf("network address was not reserved") - } - if !ipa.used.Contains(net.ParseIP("93.76.3.255")) { - t.Errorf("broadcast address was not reserved") - } -} - -func TestAllocate(t *testing.T) { - _, ipnet, _ := net.ParseCIDR("93.76.0.0/22") - ipa := newIPAllocator(ipnet) - - if err := ipa.Allocate(net.ParseIP("93.76.0.0")); err == nil { - t.Errorf("expected failure") - } - - if err := ipa.Allocate(net.ParseIP("93.76.3.255")); err == nil { - t.Errorf("expected failure") - } - - if err := ipa.Allocate(net.ParseIP("93.76.0.1")); err != nil { - t.Errorf("expected success, got %s", err) - } - - if ipa.Allocate(net.ParseIP("93.76.0.1")) == nil { - t.Errorf("expected failure") - } - - if ipa.Allocate(net.ParseIP("1.2.3.4")) == nil { - t.Errorf("expected failure") - } -} - -func TestAllocateNext(t *testing.T) { - _, ipnet, _ := net.ParseCIDR("93.76.0.0/22") - ipa := newIPAllocator(ipnet) - - // Turn off random allocation attempts, so we just allocate in sequence - ipa.randomAttempts = 0 - - ip1, err := ipa.AllocateNext() - if err != nil { - t.Error(err) - } - if !ip1.Equal(net.ParseIP("93.76.0.1")) { - t.Errorf("expected 93.76.0.1, got %s", ip1) - } - - ip2, err := ipa.AllocateNext() - if err != nil { - t.Error(err) - } - if !ip2.Equal(net.ParseIP("93.76.0.2")) { - t.Errorf("expected 93.76.0.2, got %s", ip2) - } - - // Burn a bunch of addresses. - for i := 3; i < 256; i++ { - ipa.AllocateNext() - } - - ip256, err := ipa.AllocateNext() - if err != nil { - t.Error(err) - } - if !ip256.Equal(net.ParseIP("93.76.1.0")) { - t.Errorf("expected 93.76.1.0, got %s", ip256) - } - - ip257, err := ipa.AllocateNext() - if err != nil { - t.Error(err) - } - if !ip257.Equal(net.ParseIP("93.76.1.1")) { - t.Errorf("expected 93.76.1.1, got %s", ip257) - } - - // Burn a bunch of addresses. - for i := 258; i < 1022; i++ { - ipa.AllocateNext() - } - - ip1022, err := ipa.AllocateNext() - if err != nil { - t.Error(err) - } - if !ip1022.Equal(net.ParseIP("93.76.3.254")) { - t.Errorf("expected 93.76.3.254, got %s", ip1022) - } - - _, err = ipa.AllocateNext() - if err == nil { - t.Errorf("Expected nil - allocator is full") - } -} - -func TestRelease(t *testing.T) { - _, ipnet, _ := net.ParseCIDR("93.76.0.0/24") - ipa := newIPAllocator(ipnet) - - ipa.randomAttempts = 0 - - err := ipa.Release(net.ParseIP("1.2.3.4")) - if err == nil { - t.Errorf("Expected an error") - } - - ip1, err := ipa.AllocateNext() - if err != nil { - t.Error(err) - } - ip2, err := ipa.AllocateNext() - if err != nil { - t.Error(err) - } - _, err = ipa.AllocateNext() - if err != nil { - t.Error(err) - } - - err = ipa.Release(ip2) - if err != nil { - t.Error(err) - } - - ip4, err := ipa.AllocateNext() - if !ip4.Equal(ip2) { - t.Errorf("Expected %s, got %s", ip2, ip4) - } - - // Burn a bunch of addresses. - for i := 4; i < 255; i++ { - ipa.AllocateNext() - } - _, err = ipa.AllocateNext() - if err == nil { - t.Errorf("Expected an error") - } - ipa.Release(ip1) - - ip5, err := ipa.AllocateNext() - if !ip5.Equal(ip1) { - t.Errorf("Expected %s, got %s", ip1, ip5) - } -} - -func TestIPAdd(t *testing.T) { - testCases := []struct { - ip string - offset int - expected string - }{ - {"1.2.3.0", 0, "1.2.3.0"}, - {"1.2.3.0", 1, "1.2.3.1"}, - {"1.2.3.0", 255, "1.2.3.255"}, - {"1.2.3.1", 255, "1.2.4.0"}, - {"1.2.3.2", 255, "1.2.4.1"}, - {"1.2.3.0", 256, "1.2.4.0"}, - {"1.2.3.0", 257, "1.2.4.1"}, - {"1.2.3.0", 65536, "1.3.3.0"}, - {"1.2.3.4", 1, "1.2.3.5"}, - {"255.255.255.255", 1, "0.0.0.0"}, - {"255.255.255.255", 2, "0.0.0.1"}, - } - for _, tc := range testCases { - r := ipAdd(net.ParseIP(tc.ip), tc.offset) - if !r.Equal(net.ParseIP(tc.expected)) { - t.Errorf("Expected %s, got %s", tc.expected, r) - } - } -} - -func TestGenerateFirstTwoIPsFromSubnet(t *testing.T) { - // Arrange - testCases := []struct { - subnet string - expected1stIP string - expected2ndIP string - }{ - {"10.0.0.0/24", "10.0.0.1", "10.0.0.2"}, - {"10.10.10.10/24", "10.10.10.1", "10.10.10.2"}, - {"10.10.10.10/16", "10.10.0.1", "10.10.0.2"}, - {"10.10.10.10/8", "10.0.0.1", "10.0.0.2"}, - {"10.10.10.10/0", "0.0.0.1", "0.0.0.2"}, - {"192.168.100.1/16", "192.168.0.1", "192.168.0.2"}, - {"153.15.250.5/23", "153.15.250.1", "153.15.250.2"}, - {"2001:db8::/48", "2001:db8::1", "2001:db8::2"}, - {"2001:db8:123:255::/48", "2001:db8:123::1", "2001:db8:123::2"}, - {"12.12.0.0/30", "12.12.0.1", "12.12.0.2"}, - } - - // Act & Assert - for _, testCase := range testCases { - _, subnet, _ := net.ParseCIDR(testCase.subnet) - firstIP, err := GetIndexedIP(subnet, 1) - if err != nil { - t.Errorf("Unexpected error: %v", err) - } - secondIP, err := GetIndexedIP(subnet, 2) - if err != nil { - t.Errorf("Unexpected error: %v", err) - } - if firstIP.String() != testCase.expected1stIP { - t.Errorf("Unexpected first IP: Expected <%q> Actual <%q>", testCase.expected1stIP, firstIP.String()) - } - if secondIP.String() != testCase.expected2ndIP { - t.Errorf("Unexpected second IP: Expected <%q> Actual <%q>", testCase.expected2ndIP, secondIP.String()) - } - } -} - -func TestGetIndexedIPSubnetTooSmall(t *testing.T) { - // Arrange - testCases := []struct { - subnet string - }{ - {"12.12.0.0/32"}, - {"12.12.0.0/31"}, - } - - // Act & Assert - for _, testCase := range testCases { - _, subnet, _ := net.ParseCIDR(testCase.subnet) - secondIP, err := GetIndexedIP(subnet, 2) - if err == nil { - t.Errorf("Expected error but no error occured for subnet: %s", testCase.subnet) - } - thirdIP, err := GetIndexedIP(subnet, 3) - if err == nil { - t.Errorf("Expected error but no error occured for subnet: %s", testCase.subnet) - } - if secondIP != nil { - t.Errorf("Unexpected second IP: Expected nil Actual <%q>", thirdIP.String()) - } - if thirdIP != nil { - t.Errorf("Unexpected third IP: Expected nil Actual <%q>", secondIP.String()) - } - - } -} - -func TestCopyIP(t *testing.T) { - ip1 := net.ParseIP("1.2.3.4") - ip2 := copyIP(ip1) - ip2[0]++ - if ip1[0] == ip2[0] { - t.Errorf("copyIP did not copy") - } -} - -func TestSimplifyIP(t *testing.T) { - ip4 := net.ParseIP("1.2.3.4") - if len(ip4) != 16 { - t.Errorf("expected 16 bytes") - } - if len(simplifyIP(ip4)) != 4 { - t.Errorf("expected 4 bytes") - } - ip6 := net.ParseIP("::1.2.3.4") - if len(ip6) != 16 { - t.Errorf("expected 16 bytes") - } - if len(simplifyIP(ip6)) != 16 { - t.Errorf("expected 16 bytes") - } - if simplifyIP([]byte{0, 0}) != nil { - t.Errorf("expected nil") - } -} diff --git a/pkg/registry/service/ipallocator/allocator.go b/pkg/registry/service/ipallocator/allocator.go new file mode 100644 index 00000000000..47623d1c881 --- /dev/null +++ b/pkg/registry/service/ipallocator/allocator.go @@ -0,0 +1,334 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ipallocator + +import ( + "errors" + "fmt" + "math/big" + "math/rand" + "net" + "sync" +) + +// Interface manages the allocation of IP addresses out of a range. Interface +// should be threadsafe. +type Interface interface { + Allocate(net.IP) error + AllocateNext() (net.IP, error) + Release(net.IP) error +} + +// Snapshottable is an Interface that can be snapshotted and restored. Snapshottable +// should be threadsafe. +type Snapshottable interface { + Interface + Snapshot() (*net.IPNet, []byte) + Restore(*net.IPNet, []byte) error +} + +var ( + ErrFull = errors.New("range is full") + ErrNotInRange = errors.New("provided IP is not in the valid range") + ErrAllocated = errors.New("provided IP is already allocated") + ErrMismatchedNetwork = errors.New("the provided network does not match the current range") + ErrAllocationDisabled = errors.New("IP addresses cannot be allocated at this time") +) + +// Range is a contiguous block of IPs that can be allocated atomically. +// +// The internal structure of the range is: +// +// For CIDR 10.0.0.0/24 +// 254 addresses usable out of 256 total (minus base and broadcast IPs) +// The number of usable addresses is r.max +// +// CIDR base IP CIDR broadcast IP +// 10.0.0.0 10.0.0.255 +// | | +// 0 1 2 3 4 5 ... ... 253 254 255 +// | | +// r.base r.base + r.max +// | | +// first bit of r.allocated last bit of r.allocated +// +// If an address is taken, the bit at offset: +// +// bit offset := IP - r.base +// +// is set to one. r.count is always equal to the number of set bits and +// can be recalculated at any time by counting the set bits in r.allocated. +// +// TODO: use RLE and compact the allocator to minimize space. +type Range struct { + net *net.IPNet + // base is a cached version of the start IP in the CIDR range as a *big.Int + base *big.Int + // strategy is the strategy for choosing the next available IP out of the range + strategy allocateStrategy + // max is the maximum size of the usable addresses in the range + max int + + // lock guards the following members + lock sync.Mutex + // count is the number of currently allocated elements in the range + count int + // allocated is a bit array of the allocated ips in the range + allocated *big.Int +} + +// allocateStrategy is a search strategy in the allocation map for a valid IP. +type allocateStrategy func(allocated *big.Int, max, count int) (int, error) + +// NewCIDRRange creates a Range over a net.IPNet. +func NewCIDRRange(cidr *net.IPNet) *Range { + max := RangeSize(cidr) + base := bigForIP(cidr.IP) + r := Range{ + net: cidr, + strategy: randomScanStrategy, + base: base.Add(base, big.NewInt(1)), // don't use the network base + max: maximum(0, int(max-2)), // don't use the network broadcast + + allocated: big.NewInt(0), + count: 0, + } + return &r +} + +func maximum(a, b int) int { + if a > b { + return a + } + return b +} + +// Free returns the count of IP addresses left in the range. +func (r *Range) Free() int { + r.lock.Lock() + defer r.lock.Unlock() + return r.max - r.count +} + +// Allocate attempts to reserve the provided IP. ErrNotInRange or +// ErrAllocated will be returned if the IP is not valid for this range +// or has already been reserved. ErrFull will be returned if there +// are no addresses left. +func (r *Range) Allocate(ip net.IP) error { + ok, offset := r.contains(ip) + if !ok { + return ErrNotInRange + } + + r.lock.Lock() + defer r.lock.Unlock() + + if r.allocated.Bit(offset) == 1 { + return ErrAllocated + } + r.allocated = r.allocated.SetBit(r.allocated, offset, 1) + r.count++ + return nil +} + +// AllocateNext reserves one of the IPs from the pool. ErrFull may +// be returned if there are no addresses left. +func (r *Range) AllocateNext() (net.IP, error) { + r.lock.Lock() + defer r.lock.Unlock() + + next, err := r.strategy(r.allocated, r.max, r.count) + if err != nil { + return nil, err + } + r.count++ + r.allocated = r.allocated.SetBit(r.allocated, next, 1) + return addIPOffset(r.base, next), nil +} + +// Release releases the IP back to the pool. Releasing an +// unallocated IP or an IP out of the range is a no-op and +// returns no error. +func (r *Range) Release(ip net.IP) error { + ok, offset := r.contains(ip) + if !ok { + return nil + } + + r.lock.Lock() + defer r.lock.Unlock() + + if r.allocated.Bit(offset) == 0 { + return nil + } + + r.allocated = r.allocated.SetBit(r.allocated, offset, 0) + r.count-- + return nil +} + +// Has returns true if the provided IP is already allocated and a call +// to Allocate(ip) would fail with ErrAllocated. +func (r *Range) Has(ip net.IP) bool { + ok, offset := r.contains(ip) + if !ok { + return false + } + + r.lock.Lock() + defer r.lock.Unlock() + + return r.allocated.Bit(offset) == 1 +} + +// Snapshot saves the current state of the pool. +func (r *Range) Snapshot() (*net.IPNet, []byte) { + r.lock.Lock() + defer r.lock.Unlock() + + return r.net, r.allocated.Bytes() +} + +// Restore restores the pool to the previously captured state. ErrMismatchedNetwork +// is returned if the provided IPNet range doesn't exactly match the previous range. +func (r *Range) Restore(net *net.IPNet, data []byte) error { + r.lock.Lock() + defer r.lock.Unlock() + + if !net.IP.Equal(r.net.IP) || net.Mask.String() != r.net.Mask.String() { + return ErrMismatchedNetwork + } + r.allocated = big.NewInt(0).SetBytes(data) + r.count = countBits(r.allocated) + return nil +} + +// contains returns true and the offset if the ip is in the range, and false +// and nil otherwise. The first and last addresses of the CIDR are omitted. +func (r *Range) contains(ip net.IP) (bool, int) { + if !r.net.Contains(ip) { + return false, 0 + } + + offset := calculateIPOffset(r.base, ip) + if offset < 0 || offset >= r.max { + return false, 0 + } + return true, offset +} + +// bigForIP creates a big.Int based on the provided net.IP +func bigForIP(ip net.IP) *big.Int { + b := ip.To4() + if b == nil { + b = ip.To16() + } + return big.NewInt(0).SetBytes(b) +} + +// addIPOffset adds the provided integer offset to a base big.Int representing a +// net.IP +func addIPOffset(base *big.Int, offset int) net.IP { + return net.IP(big.NewInt(0).Add(base, big.NewInt(int64(offset))).Bytes()) +} + +// calculateIPOffset calculates the integer offset of ip from base such that +// base + offset = ip. It requires ip >= base. +func calculateIPOffset(base *big.Int, ip net.IP) int { + return int(big.NewInt(0).Sub(bigForIP(ip), base).Int64()) +} + +// randomScanStrategy chooses a random address from the provided big.Int, and then +// scans forward looking for the next available address (it will wrap the range if +// necessary). +func randomScanStrategy(allocated *big.Int, max, count int) (int, error) { + if count >= max { + return 0, ErrFull + } + offset := rand.Intn(max) + for i := 0; i < max; i++ { + at := (offset + i) % max + if allocated.Bit(at) == 0 { + return at, nil + } + } + return 0, ErrFull +} + +// countBits returns the number of set bits in n +func countBits(n *big.Int) int { + var count int = 0 + for _, b := range n.Bytes() { + count += int(bitCounts[b]) + } + return count +} + +// bitCounts is all of the bits counted for each number between 0-255 +var bitCounts = []int8{ + 0, 1, 1, 2, 1, 2, 2, 3, + 1, 2, 2, 3, 2, 3, 3, 4, + 1, 2, 2, 3, 2, 3, 3, 4, + 2, 3, 3, 4, 3, 4, 4, 5, + 1, 2, 2, 3, 2, 3, 3, 4, + 2, 3, 3, 4, 3, 4, 4, 5, + 2, 3, 3, 4, 3, 4, 4, 5, + 3, 4, 4, 5, 4, 5, 5, 6, + 1, 2, 2, 3, 2, 3, 3, 4, + 2, 3, 3, 4, 3, 4, 4, 5, + 2, 3, 3, 4, 3, 4, 4, 5, + 3, 4, 4, 5, 4, 5, 5, 6, + 2, 3, 3, 4, 3, 4, 4, 5, + 3, 4, 4, 5, 4, 5, 5, 6, + 3, 4, 4, 5, 4, 5, 5, 6, + 4, 5, 5, 6, 5, 6, 6, 7, + 1, 2, 2, 3, 2, 3, 3, 4, + 2, 3, 3, 4, 3, 4, 4, 5, + 2, 3, 3, 4, 3, 4, 4, 5, + 3, 4, 4, 5, 4, 5, 5, 6, + 2, 3, 3, 4, 3, 4, 4, 5, + 3, 4, 4, 5, 4, 5, 5, 6, + 3, 4, 4, 5, 4, 5, 5, 6, + 4, 5, 5, 6, 5, 6, 6, 7, + 2, 3, 3, 4, 3, 4, 4, 5, + 3, 4, 4, 5, 4, 5, 5, 6, + 3, 4, 4, 5, 4, 5, 5, 6, + 4, 5, 5, 6, 5, 6, 6, 7, + 3, 4, 4, 5, 4, 5, 5, 6, + 4, 5, 5, 6, 5, 6, 6, 7, + 4, 5, 5, 6, 5, 6, 6, 7, + 5, 6, 6, 7, 6, 7, 7, 8, +} + +// RangeSize returns the size of a range in valid addresses. +func RangeSize(subnet *net.IPNet) int64 { + ones, bits := subnet.Mask.Size() + if (bits - ones) >= 31 { + panic("masks greater than 31 bits are not supported") + } + max := int64(1) << uint(bits-ones) + return max +} + +// GetIndexedIP returns a net.IP that is subnet.IP + index in the contiguous IP space. +func GetIndexedIP(subnet *net.IPNet, index int) (net.IP, error) { + ip := addIPOffset(bigForIP(subnet.IP), index) + if !subnet.Contains(ip) { + return nil, fmt.Errorf("can't generate IP with index %d from subnet. subnet too small. subnet: %q", index, subnet) + } + return ip, nil +} diff --git a/pkg/registry/service/ipallocator/allocator_test.go b/pkg/registry/service/ipallocator/allocator_test.go new file mode 100644 index 00000000000..b83903432bd --- /dev/null +++ b/pkg/registry/service/ipallocator/allocator_test.go @@ -0,0 +1,224 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ipallocator + +import ( + "net" + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" +) + +func TestAllocate(t *testing.T) { + _, cidr, err := net.ParseCIDR("192.168.1.0/24") + if err != nil { + t.Fatal(err) + } + r := NewCIDRRange(cidr) + t.Logf("base: %v", r.base.Bytes()) + if f := r.Free(); f != 254 { + t.Errorf("unexpected free %d", f) + } + found := util.NewStringSet() + count := 0 + for r.Free() > 0 { + ip, err := r.AllocateNext() + if err != nil { + t.Fatalf("error @ %d: %v", count, err) + } + count++ + if !cidr.Contains(ip) { + t.Fatalf("allocated %s which is outside of %s", ip, cidr) + } + if found.Has(ip.String()) { + t.Fatalf("allocated %s twice @ %d", ip, count) + } + found.Insert(ip.String()) + } + if _, err := r.AllocateNext(); err != ErrFull { + t.Fatal(err) + } + + released := net.ParseIP("192.168.1.5") + if err := r.Release(released); err != nil { + t.Fatal(err) + } + if f := r.Free(); f != 1 { + t.Errorf("unexpected free %d", f) + } + ip, err := r.AllocateNext() + if err != nil { + t.Fatal(err) + } + if !released.Equal(ip) { + t.Errorf("unexpected %s : %s", ip, released) + } + + if err := r.Release(released); err != nil { + t.Fatal(err) + } + if err := r.Allocate(net.ParseIP("192.168.0.1")); err != ErrNotInRange { + t.Fatal(err) + } + if err := r.Allocate(net.ParseIP("192.168.1.1")); err != ErrAllocated { + t.Fatal(err) + } + if err := r.Allocate(net.ParseIP("192.168.1.0")); err != ErrNotInRange { + t.Fatal(err) + } + if err := r.Allocate(net.ParseIP("192.168.1.255")); err != ErrNotInRange { + t.Fatal(err) + } + if f := r.Free(); f != 1 { + t.Errorf("unexpected free %d", f) + } + if err := r.Allocate(released); err != nil { + t.Fatal(err) + } + if f := r.Free(); f != 0 { + t.Errorf("unexpected free %d", f) + } +} + +func TestAllocateTiny(t *testing.T) { + _, cidr, err := net.ParseCIDR("192.168.1.0/32") + if err != nil { + t.Fatal(err) + } + r := NewCIDRRange(cidr) + if f := r.Free(); f != 0 { + t.Errorf("free: %d", f) + } + if _, err := r.AllocateNext(); err != ErrFull { + t.Error(err) + } +} + +func TestAllocateSmall(t *testing.T) { + _, cidr, err := net.ParseCIDR("192.168.1.240/30") + if err != nil { + t.Fatal(err) + } + r := NewCIDRRange(cidr) + if f := r.Free(); f != 2 { + t.Errorf("free: %d", f) + } + found := util.NewStringSet() + for i := 0; i < 2; i++ { + ip, err := r.AllocateNext() + if err != nil { + t.Fatal(err) + } + if found.Has(ip.String()) { + t.Fatalf("already reserved: %s", ip) + } + found.Insert(ip.String()) + } + for s := range found { + if !r.Has(net.ParseIP(s)) { + t.Fatalf("missing: %s", s) + } + if err := r.Allocate(net.ParseIP(s)); err != ErrAllocated { + t.Fatal(err) + } + } + for i := 0; i < 100; i++ { + if _, err := r.AllocateNext(); err != ErrFull { + t.Fatalf("suddenly became not-full: %#v", r) + } + } + + if r.count != 2 && r.Free() != 0 && r.max != 2 { + t.Fatalf("unexpected range: %v", r) + } + + t.Logf("allocated: %v", found) +} + +func TestBitCount(t *testing.T) { + for i, c := range bitCounts { + actual := 0 + for j := 0; j < 8; j++ { + if ((1 << uint(j)) & i) != 0 { + actual++ + } + } + if actual != int(c) { + t.Errorf("%d should have %d bits but recorded as %d", i, actual, c) + } + } +} + +func TestRangeSize(t *testing.T) { + testCases := map[string]int64{ + "192.168.1.0/24": 256, + "192.168.1.0/32": 1, + "192.168.1.0/31": 2, + } + for k, v := range testCases { + _, cidr, err := net.ParseCIDR(k) + if err != nil { + t.Fatal(err) + } + if size := RangeSize(cidr); size != v { + t.Errorf("%s should have a range size of %d, got %d", k, v, size) + } + } +} + +func TestSnapshot(t *testing.T) { + _, cidr, err := net.ParseCIDR("192.168.1.0/24") + if err != nil { + t.Fatal(err) + } + r := NewCIDRRange(cidr) + ip := []net.IP{} + for i := 0; i < 10; i++ { + n, err := r.AllocateNext() + if err != nil { + t.Fatal(err) + } + ip = append(ip, n) + } + + network, data := r.Snapshot() + if !network.IP.Equal(cidr.IP) || network.Mask.String() != cidr.Mask.String() { + t.Fatalf("mismatched networks: %s : %s", network, cidr) + } + + _, otherCidr, err := net.ParseCIDR("192.168.2.0/24") + if err != nil { + t.Fatal(err) + } + other := NewCIDRRange(otherCidr) + if err := r.Restore(otherCidr, data); err != ErrMismatchedNetwork { + t.Fatal(err) + } + other = NewCIDRRange(network) + if err := other.Restore(network, data); err != nil { + t.Fatal(err) + } + + for _, n := range ip { + if !other.Has(n) { + t.Errorf("restored range does not have %s", n) + } + } + if other.Free() != r.Free() { + t.Errorf("counts do not match: %d", other.Free()) + } +} diff --git a/pkg/registry/service/ipallocator/controller/repair.go b/pkg/registry/service/ipallocator/controller/repair.go new file mode 100644 index 00000000000..29ef7e7ff22 --- /dev/null +++ b/pkg/registry/service/ipallocator/controller/repair.go @@ -0,0 +1,120 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "fmt" + "net" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/ipallocator" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" +) + +// Repair is a controller loop that periodically examines all service PortalIP allocations +// and logs any errors, and then sets the compacted and accurate list of all allocated IPs. +// +// Handles: +// * Duplicate PortalIP assignments caused by operator action or undetected race conditions +// * PortalIPs that do not match the current portal network +// * Allocations to services that were not actually created due to a crash or powerloss +// * Migrates old versions of Kubernetes services into the atomic ipallocator model automatically +// +// Can be run at infrequent intervals, and is best performed on startup of the master. +// Is level driven and idempotent - all valid PortalIPs will be updated into the ipallocator +// map at the end of a single execution loop if no race is encountered. +// +// TODO: allocate new IPs if necessary +// TODO: perform repair? +type Repair struct { + interval time.Duration + registry service.Registry + network *net.IPNet + alloc service.IPRegistry +} + +// NewRepair creates a controller that periodically ensures that all portalIPs are uniquely allocated across the cluster +// and generates informational warnings for a cluster that is not in sync. +func NewRepair(interval time.Duration, registry service.Registry, network *net.IPNet, alloc service.IPRegistry) *Repair { + return &Repair{ + interval: interval, + registry: registry, + network: network, + alloc: alloc, + } +} + +// RunUntil starts the controller until the provided ch is closed. +func (c *Repair) RunUntil(ch chan struct{}) { + util.Until(func() { + if err := c.RunOnce(); err != nil { + util.HandleError(err) + } + }, c.interval, ch) +} + +// RunOnce verifies the state of the portal IP allocations and returns an error if an unrecoverable problem occurs. +func (c *Repair) RunOnce() error { + latest, err := c.alloc.Get() + if err != nil { + return fmt.Errorf("unable to refresh the service IP block: %v", err) + } + + ctx := api.WithNamespace(api.NewDefaultContext(), api.NamespaceAll) + list, err := c.registry.ListServices(ctx) + if err != nil { + return fmt.Errorf("unable to refresh the service IP block: %v", err) + } + + r := ipallocator.NewCIDRRange(c.network) + for _, svc := range list.Items { + if !api.IsServiceIPSet(&svc) { + continue + } + ip := net.ParseIP(svc.Spec.PortalIP) + if ip == nil { + // portal IP is broken, reallocate + util.HandleError(fmt.Errorf("the portal IP %s for service %s/%s is not a valid IP; please recreate", svc.Spec.PortalIP, svc.Name, svc.Namespace)) + continue + } + switch err := r.Allocate(ip); err { + case nil: + case ipallocator.ErrAllocated: + // TODO: send event + // portal IP is broken, reallocate + util.HandleError(fmt.Errorf("the portal IP %s for service %s/%s was assigned to multiple services; please recreate", ip, svc.Name, svc.Namespace)) + case ipallocator.ErrNotInRange: + // TODO: send event + // portal IP is broken, reallocate + util.HandleError(fmt.Errorf("the portal IP %s for service %s/%s is not within the service CIDR %s; please recreate", ip, svc.Name, svc.Namespace, c.network)) + case ipallocator.ErrFull: + // TODO: send event + return fmt.Errorf("the service CIDR %s is full; you must widen the CIDR in order to create new services") + default: + return fmt.Errorf("unable to allocate portal IP %s for service %s/%s due to an unknown error, exiting: %v", ip, svc.Name, svc.Namespace, err) + } + } + + service.SnapshotRange(latest, r) + + if err := c.alloc.CreateOrUpdate(latest); err != nil { + return fmt.Errorf("unable to persist the updated service IP allocations: %v", err) + } + return nil +} diff --git a/pkg/registry/service/ipallocator/controller/repair_test.go b/pkg/registry/service/ipallocator/controller/repair_test.go new file mode 100644 index 00000000000..8a59e802ff8 --- /dev/null +++ b/pkg/registry/service/ipallocator/controller/repair_test.go @@ -0,0 +1,156 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "fmt" + "net" + "strings" + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/ipallocator" +) + +type mockIPRegistry struct { + getCalled bool + item *api.RangeAllocation + err error + + updateCalled bool + updated *api.RangeAllocation + updateErr error +} + +func (r *mockIPRegistry) Get() (*api.RangeAllocation, error) { + r.getCalled = true + return r.item, r.err +} + +func (r *mockIPRegistry) CreateOrUpdate(alloc *api.RangeAllocation) error { + r.updateCalled = true + r.updated = alloc + return r.updateErr +} + +func TestRepair(t *testing.T) { + registry := registrytest.NewServiceRegistry() + _, cidr, _ := net.ParseCIDR("192.168.1.0/24") + ipregistry := &mockIPRegistry{ + item: &api.RangeAllocation{}, + } + r := NewRepair(0, registry, cidr, ipregistry) + + if err := r.RunOnce(); err != nil { + t.Fatal(err) + } + if !ipregistry.updateCalled || ipregistry.updated == nil || ipregistry.updated.Range != cidr.String() || ipregistry.updated != ipregistry.item { + t.Errorf("unexpected ipregistry: %#v", ipregistry) + } + + ipregistry = &mockIPRegistry{ + item: &api.RangeAllocation{}, + updateErr: fmt.Errorf("test error"), + } + r = NewRepair(0, registry, cidr, ipregistry) + if err := r.RunOnce(); !strings.Contains(err.Error(), ": test error") { + t.Fatal(err) + } +} + +func TestRepairEmpty(t *testing.T) { + _, cidr, _ := net.ParseCIDR("192.168.1.0/24") + previous := ipallocator.NewCIDRRange(cidr) + previous.Allocate(net.ParseIP("192.168.1.10")) + network, data := previous.Snapshot() + + registry := registrytest.NewServiceRegistry() + ipregistry := &mockIPRegistry{ + item: &api.RangeAllocation{ + ObjectMeta: api.ObjectMeta{ + ResourceVersion: "1", + }, + Range: network.String(), + Data: data, + }, + } + r := NewRepair(0, registry, cidr, ipregistry) + if err := r.RunOnce(); err != nil { + t.Fatal(err) + } + after := ipallocator.NewCIDRRange(cidr) + if err := after.Restore(cidr, ipregistry.updated.Data); err != nil { + t.Fatal(err) + } + if after.Has(net.ParseIP("192.168.1.10")) { + t.Errorf("unexpected ipallocator state: %#v", after) + } +} + +func TestRepairWithExisting(t *testing.T) { + _, cidr, _ := net.ParseCIDR("192.168.1.0/24") + previous := ipallocator.NewCIDRRange(cidr) + network, data := previous.Snapshot() + registry := registrytest.NewServiceRegistry() + registry.List = api.ServiceList{ + Items: []api.Service{ + { + Spec: api.ServiceSpec{PortalIP: "192.168.1.1"}, + }, + { + Spec: api.ServiceSpec{PortalIP: "192.168.1.100"}, + }, + { // outside CIDR, will be dropped + Spec: api.ServiceSpec{PortalIP: "192.168.0.1"}, + }, + { // empty, ignored + Spec: api.ServiceSpec{PortalIP: ""}, + }, + { // duplicate, dropped + Spec: api.ServiceSpec{PortalIP: "192.168.1.1"}, + }, + { // headless + Spec: api.ServiceSpec{PortalIP: "None"}, + }, + }, + } + + ipregistry := &mockIPRegistry{ + item: &api.RangeAllocation{ + ObjectMeta: api.ObjectMeta{ + ResourceVersion: "1", + }, + Range: network.String(), + Data: data, + }, + } + r := NewRepair(0, registry, cidr, ipregistry) + if err := r.RunOnce(); err != nil { + t.Fatal(err) + } + after := ipallocator.NewCIDRRange(cidr) + if err := after.Restore(cidr, ipregistry.updated.Data); err != nil { + t.Fatal(err) + } + if !after.Has(net.ParseIP("192.168.1.1")) || !after.Has(net.ParseIP("192.168.1.100")) { + t.Errorf("unexpected ipallocator state: %#v", after) + } + if after.Free() != 252 { + t.Errorf("unexpected ipallocator state: %#v", after) + } +} diff --git a/pkg/registry/service/ipallocator/etcd/etcd.go b/pkg/registry/service/ipallocator/etcd/etcd.go new file mode 100644 index 00000000000..a423f8bbd45 --- /dev/null +++ b/pkg/registry/service/ipallocator/etcd/etcd.go @@ -0,0 +1,194 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package etcd + +import ( + "fmt" + "net" + "sync" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" + etcderr "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors/etcd" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/ipallocator" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" +) + +// Etcd exposes a service.Allocator that is backed by etcd. +// TODO: allow multiple allocations to be tried at once +// TODO: subdivide the keyspace to reduce conflicts +// TODO: investigate issuing a CAS without reading first +type Etcd struct { + lock sync.Mutex + + alloc ipallocator.Snapshottable + helper tools.EtcdHelper + last string +} + +// Etcd implements ipallocator.Interface and service.IPRegistry +var _ ipallocator.Interface = &Etcd{} +var _ service.IPRegistry = &Etcd{} + +const baseKey = "/ranges/serviceips" + +// NewEtcd returns a service PortalIP ipallocator that is backed by Etcd and can manage +// persisting the snapshot state of allocation after each allocation is made. +func NewEtcd(alloc ipallocator.Snapshottable, helper tools.EtcdHelper) *Etcd { + return &Etcd{ + alloc: alloc, + helper: helper, + } +} + +// Allocate attempts to allocate the IP locally and then in etcd. +func (e *Etcd) Allocate(ip net.IP) error { + e.lock.Lock() + defer e.lock.Unlock() + + if err := e.alloc.Allocate(ip); err != nil { + return err + } + + return e.tryUpdate(func() error { + return e.alloc.Allocate(ip) + }) +} + +// AllocateNext attempts to allocate the next IP locally and then in etcd. +func (e *Etcd) AllocateNext() (net.IP, error) { + e.lock.Lock() + defer e.lock.Unlock() + + ip, err := e.alloc.AllocateNext() + if err != nil { + return nil, err + } + + err = e.tryUpdate(func() error { + if err := e.alloc.Allocate(ip); err != nil { + if err != ipallocator.ErrAllocated { + return err + } + // update the ip here + ip, err = e.alloc.AllocateNext() + if err != nil { + return err + } + } + return nil + }) + return ip, err +} + +// Release attempts to release the provided IP locally and then in etcd. +func (e *Etcd) Release(ip net.IP) error { + e.lock.Lock() + defer e.lock.Unlock() + + if err := e.alloc.Release(ip); err != nil { + return err + } + + return e.tryUpdate(func() error { + return e.alloc.Release(ip) + }) +} + +// tryUpdate performs a read-update to persist the latest snapshot state of allocation. +func (e *Etcd) tryUpdate(fn func() error) error { + err := e.helper.GuaranteedUpdate(baseKey, &api.RangeAllocation{}, true, + func(input runtime.Object) (output runtime.Object, ttl uint64, err error) { + existing := input.(*api.RangeAllocation) + if len(existing.ResourceVersion) == 0 { + return nil, 0, ipallocator.ErrAllocationDisabled + } + if existing.ResourceVersion != e.last { + if err := service.RestoreRange(e.alloc, existing); err != nil { + return nil, 0, err + } + if err := fn(); err != nil { + return nil, 0, err + } + } + e.last = existing.ResourceVersion + service.SnapshotRange(existing, e.alloc) + return existing, 0, nil + }, + ) + return etcderr.InterpretUpdateError(err, "serviceipallocation", "") +} + +// Refresh reloads the ipallocator from etcd. +func (e *Etcd) Refresh() error { + e.lock.Lock() + defer e.lock.Unlock() + + existing := &api.RangeAllocation{} + if err := e.helper.ExtractObj(baseKey, existing, false); err != nil { + if tools.IsEtcdNotFound(err) { + return ipallocator.ErrAllocationDisabled + } + return etcderr.InterpretGetError(err, "serviceipallocation", "") + } + + return service.RestoreRange(e.alloc, existing) +} + +// Get returns an api.RangeAllocation that represents the current state in +// etcd. If the key does not exist, the object will have an empty ResourceVersion. +func (e *Etcd) Get() (*api.RangeAllocation, error) { + existing := &api.RangeAllocation{} + if err := e.helper.ExtractObj(baseKey, existing, true); err != nil { + return nil, etcderr.InterpretGetError(err, "serviceipallocation", "") + } + return existing, nil +} + +// CreateOrUpdate attempts to update the current etcd state with the provided +// allocation. +func (e *Etcd) CreateOrUpdate(snapshot *api.RangeAllocation) error { + e.lock.Lock() + defer e.lock.Unlock() + + last := "" + err := e.helper.GuaranteedUpdate(baseKey, &api.RangeAllocation{}, true, + func(input runtime.Object) (output runtime.Object, ttl uint64, err error) { + existing := input.(*api.RangeAllocation) + switch { + case len(snapshot.ResourceVersion) != 0 && len(existing.ResourceVersion) != 0: + if snapshot.ResourceVersion != existing.ResourceVersion { + return nil, 0, errors.NewConflict("serviceipallocation", "", fmt.Errorf("the provided resource version does not match")) + } + case len(existing.ResourceVersion) != 0: + return nil, 0, errors.NewConflict("serviceipallocation", "", fmt.Errorf("another caller has already initialized the resource")) + } + last = snapshot.ResourceVersion + return snapshot, 0, nil + }, + ) + if err != nil { + return etcderr.InterpretUpdateError(err, "serviceipallocation", "") + } + err = service.RestoreRange(e.alloc, snapshot) + if err == nil { + e.last = last + } + return err +} diff --git a/pkg/registry/service/ipallocator/etcd/etcd_test.go b/pkg/registry/service/ipallocator/etcd/etcd_test.go new file mode 100644 index 00000000000..1721ef041f7 --- /dev/null +++ b/pkg/registry/service/ipallocator/etcd/etcd_test.go @@ -0,0 +1,133 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package etcd + +import ( + "net" + "testing" + + "github.com/coreos/go-etcd/etcd" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/ipallocator" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/tools/etcdtest" +) + +func newHelper(t *testing.T) (*tools.FakeEtcdClient, tools.EtcdHelper) { + fakeEtcdClient := tools.NewFakeEtcdClient(t) + fakeEtcdClient.TestIndex = true + helper := tools.NewEtcdHelper(fakeEtcdClient, testapi.Codec(), etcdtest.PathPrefix()) + return fakeEtcdClient, helper +} + +func newStorage(t *testing.T) (ipallocator.Interface, *ipallocator.Range, *tools.FakeEtcdClient) { + fakeEtcdClient, h := newHelper(t) + _, cidr, err := net.ParseCIDR("192.168.1.0/24") + if err != nil { + t.Fatal(err) + } + r := ipallocator.NewCIDRRange(cidr) + storage := NewEtcd(r, h) + return storage, r, fakeEtcdClient +} + +func key() string { + s := "/ranges/serviceips" + return etcdtest.AddPrefix(s) +} + +func TestEmpty(t *testing.T) { + storage, _, ecli := newStorage(t) + ecli.ExpectNotFoundGet(key()) + if err := storage.Allocate(net.ParseIP("192.168.1.2")); err != ipallocator.ErrAllocationDisabled { + t.Fatal(err) + } +} + +func TestErrors(t *testing.T) { + storage, _, _ := newStorage(t) + if err := storage.Allocate(net.ParseIP("192.168.0.0")); err != ipallocator.ErrNotInRange { + t.Fatal(err) + } +} + +func initialObject(ecli *tools.FakeEtcdClient) { + _, cidr, _ := net.ParseCIDR("192.168.1.0/24") + ecli.Data[key()] = tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + CreatedIndex: 1, + ModifiedIndex: 2, + Value: runtime.EncodeOrDie(testapi.Codec(), &api.RangeAllocation{ + Range: cidr.String(), + }), + }, + }, + E: nil, + } +} + +func TestStore(t *testing.T) { + _, cidr, _ := net.ParseCIDR("192.168.1.0/24") + + storage, r, ecli := newStorage(t) + initialObject(ecli) + + if err := storage.Allocate(net.ParseIP("192.168.1.2")); err != nil { + t.Fatal(err) + } + if err := r.Allocate(net.ParseIP("192.168.1.2")); err != ipallocator.ErrAllocated { + t.Fatal(err) + } + if err := storage.Allocate(net.ParseIP("192.168.1.2")); err != ipallocator.ErrAllocated { + t.Fatal(err) + } + + obj := ecli.Data[key()] + if obj.R == nil || obj.R.Node == nil { + t.Fatalf("%s is empty: %#v", key(), obj) + } + t.Logf("data: %#v", obj.R.Node) + + other := ipallocator.NewCIDRRange(cidr) + + allocation := &api.RangeAllocation{} + if err := storage.(*Etcd).helper.ExtractObj(key(), allocation, false); err != nil { + t.Fatal(err) + } + if allocation.ResourceVersion != "1" { + t.Fatalf("%#v", allocation) + } + if allocation.Range != "192.168.1.0/24" { + t.Errorf("unexpected stored Range: %s", allocation.Range) + } + if err := other.Restore(cidr, allocation.Data); err != nil { + t.Fatal(err) + } + if !other.Has(net.ParseIP("192.168.1.2")) { + t.Fatalf("could not restore allocated IP: %#v", other) + } + + other = ipallocator.NewCIDRRange(cidr) + otherStorage := NewEtcd(other, storage.(*Etcd).helper) + if err := otherStorage.Allocate(net.ParseIP("192.168.1.2")); err != ipallocator.ErrAllocated { + t.Fatal(err) + } +} diff --git a/pkg/registry/service/registry.go b/pkg/registry/service/registry.go index 5b4e50f911a..7404d5ecf45 100644 --- a/pkg/registry/service/registry.go +++ b/pkg/registry/service/registry.go @@ -17,9 +17,12 @@ limitations under the License. package service import ( + "net" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/ipallocator" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) @@ -32,3 +35,29 @@ type Registry interface { UpdateService(ctx api.Context, svc *api.Service) (*api.Service, error) WatchServices(ctx api.Context, labels labels.Selector, fields fields.Selector, resourceVersion string) (watch.Interface, error) } + +// IPRegistry is a registry that can retrieve or persist a RangeAllocation object. +type IPRegistry interface { + // Get returns the latest allocation, an empty object if no allocation has been made, + // or an error if the allocation could not be retrieved. + Get() (*api.RangeAllocation, error) + // CreateOrUpdate should create or update the provide allocation, unless a conflict + // has occured since the item was last created. + CreateOrUpdate(*api.RangeAllocation) error +} + +// RestoreRange updates a snapshottable ipallocator from a RangeAllocation +func RestoreRange(dst ipallocator.Snapshottable, src *api.RangeAllocation) error { + _, network, err := net.ParseCIDR(src.Range) + if err != nil { + return err + } + return dst.Restore(network, src.Data) +} + +// SnapshotRange updates a RangeAllocation to match a snapshottable ipallocator +func SnapshotRange(dst *api.RangeAllocation, src ipallocator.Snapshottable) { + network, data := src.Snapshot() + dst.Range = network.String() + dst.Data = data +} diff --git a/pkg/registry/service/rest.go b/pkg/registry/service/rest.go index 5e1fe10aac5..512978215e1 100644 --- a/pkg/registry/service/rest.go +++ b/pkg/registry/service/rest.go @@ -32,11 +32,11 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/ipallocator" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/fielderrors" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" - "github.com/golang/glog" ) // REST adapts a service registry into apiserver's RESTStorage model. @@ -44,49 +44,22 @@ type REST struct { registry Registry machines minion.Registry endpoints endpoint.Registry - portalMgr *ipAllocator + portals ipallocator.Interface clusterName string } // NewStorage returns a new REST. -func NewStorage(registry Registry, machines minion.Registry, endpoints endpoint.Registry, portalNet *net.IPNet, +func NewStorage(registry Registry, machines minion.Registry, endpoints endpoint.Registry, portals ipallocator.Interface, clusterName string) *REST { - // TODO: Before we can replicate masters, this has to be synced (e.g. lives in etcd) - ipa := newIPAllocator(portalNet) - if ipa == nil { - glog.Fatalf("Failed to create an IP allocator. Is subnet '%v' valid?", portalNet) - } - reloadIPsFromStorage(ipa, registry) - return &REST{ registry: registry, machines: machines, endpoints: endpoints, - portalMgr: ipa, + portals: portals, clusterName: clusterName, } } -// Helper: mark all previously allocated IPs in the allocator. -func reloadIPsFromStorage(ipa *ipAllocator, registry Registry) { - services, err := registry.ListServices(api.NewContext()) - if err != nil { - // This is really bad. - glog.Errorf("can't list services to init service REST: %v", err) - return - } - for i := range services.Items { - service := &services.Items[i] - if !api.IsServiceIPSet(service) { - continue - } - if err := ipa.Allocate(net.ParseIP(service.Spec.PortalIP)); err != nil { - // This is really bad. - glog.Errorf("service %q PortalIP %s could not be allocated: %v", service.Name, service.Spec.PortalIP, err) - } - } -} - func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) { service := obj.(*api.Service) @@ -98,22 +71,23 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, err defer func() { if releaseServiceIP { if api.IsServiceIPSet(service) { - rs.portalMgr.Release(net.ParseIP(service.Spec.PortalIP)) + rs.portals.Release(net.ParseIP(service.Spec.PortalIP)) } } }() if api.IsServiceIPRequested(service) { // Allocate next available. - ip, err := rs.portalMgr.AllocateNext() + ip, err := rs.portals.AllocateNext() if err != nil { - return nil, err + el := fielderrors.ValidationErrorList{fielderrors.NewFieldInvalid("spec.portalIP", service.Spec.PortalIP, err.Error())} + return nil, errors.NewInvalid("Service", service.Name, el) } service.Spec.PortalIP = ip.String() releaseServiceIP = true } else if api.IsServiceIPSet(service) { // Try to respect the requested IP. - if err := rs.portalMgr.Allocate(net.ParseIP(service.Spec.PortalIP)); err != nil { + if err := rs.portals.Allocate(net.ParseIP(service.Spec.PortalIP)); err != nil { el := fielderrors.ValidationErrorList{fielderrors.NewFieldInvalid("spec.portalIP", service.Spec.PortalIP, err.Error())} return nil, errors.NewInvalid("Service", service.Name, el) } @@ -138,7 +112,7 @@ func (rs *REST) Delete(ctx api.Context, id string) (runtime.Object, error) { return nil, err } if api.IsServiceIPSet(service) { - rs.portalMgr.Release(net.ParseIP(service.Spec.PortalIP)) + rs.portals.Release(net.ParseIP(service.Spec.PortalIP)) } return &api.Status{Status: api.StatusSuccess}, rs.registry.DeleteService(ctx, id) } diff --git a/pkg/registry/service/rest_test.go b/pkg/registry/service/rest_test.go index 8478c180eb3..425533b61ab 100644 --- a/pkg/registry/service/rest_test.go +++ b/pkg/registry/service/rest_test.go @@ -30,6 +30,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/ipallocator" ) func NewTestREST(t *testing.T, endpoints *api.EndpointsList) (*REST, *registrytest.ServiceRegistry) { @@ -39,7 +40,8 @@ func NewTestREST(t *testing.T, endpoints *api.EndpointsList) (*REST, *registryte Endpoints: endpoints, } nodeRegistry := registrytest.NewMinionRegistry(machines, api.NodeResources{}) - storage := NewStorage(registry, nodeRegistry, endpointRegistry, makeIPNet(t), "kubernetes") + r := ipallocator.NewCIDRRange(makeIPNet(t)) + storage := NewStorage(registry, nodeRegistry, endpointRegistry, r, "kubernetes") return storage, registry } @@ -63,7 +65,6 @@ func deepCloneService(svc *api.Service) *api.Service { func TestServiceRegistryCreate(t *testing.T) { storage, registry := NewTestREST(t, nil) - storage.portalMgr.randomAttempts = 0 svc := &api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo"}, @@ -91,7 +92,7 @@ func TestServiceRegistryCreate(t *testing.T) { if created_service.CreationTimestamp.IsZero() { t.Errorf("Expected timestamp to be set, got: %v", created_service.CreationTimestamp) } - if created_service.Spec.PortalIP != "1.2.3.1" { + if !makeIPNet(t).Contains(net.ParseIP(created_service.Spec.PortalIP)) { t.Errorf("Unexpected PortalIP: %s", created_service.Spec.PortalIP) } srv, err := registry.GetService(ctx, svc.Name) @@ -487,7 +488,6 @@ func TestServiceRegistryList(t *testing.T) { func TestServiceRegistryIPAllocation(t *testing.T) { rest, _ := NewTestREST(t, nil) - rest.portalMgr.randomAttempts = 0 svc1 := &api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo"}, @@ -506,7 +506,7 @@ func TestServiceRegistryIPAllocation(t *testing.T) { if created_service_1.Name != "foo" { t.Errorf("Expected foo, but got %v", created_service_1.Name) } - if created_service_1.Spec.PortalIP != "1.2.3.1" { + if !makeIPNet(t).Contains(net.ParseIP(created_service_1.Spec.PortalIP)) { t.Errorf("Unexpected PortalIP: %s", created_service_1.Spec.PortalIP) } @@ -526,7 +526,7 @@ func TestServiceRegistryIPAllocation(t *testing.T) { if created_service_2.Name != "bar" { t.Errorf("Expected bar, but got %v", created_service_2.Name) } - if created_service_2.Spec.PortalIP != "1.2.3.2" { // new IP + if !makeIPNet(t).Contains(net.ParseIP(created_service_2.Spec.PortalIP)) { t.Errorf("Unexpected PortalIP: %s", created_service_2.Spec.PortalIP) } @@ -543,7 +543,10 @@ func TestServiceRegistryIPAllocation(t *testing.T) { }, } ctx = api.NewDefaultContext() - created_svc3, _ := rest.Create(ctx, svc3) + created_svc3, err := rest.Create(ctx, svc3) + if err != nil { + t.Fatal(err) + } created_service_3 := created_svc3.(*api.Service) if created_service_3.Spec.PortalIP != "1.2.3.93" { // specific IP t.Errorf("Unexpected PortalIP: %s", created_service_3.Spec.PortalIP) @@ -552,7 +555,6 @@ func TestServiceRegistryIPAllocation(t *testing.T) { func TestServiceRegistryIPReallocation(t *testing.T) { rest, _ := NewTestREST(t, nil) - rest.portalMgr.randomAttempts = 0 svc1 := &api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo"}, @@ -571,7 +573,7 @@ func TestServiceRegistryIPReallocation(t *testing.T) { if created_service_1.Name != "foo" { t.Errorf("Expected foo, but got %v", created_service_1.Name) } - if created_service_1.Spec.PortalIP != "1.2.3.1" { + if !makeIPNet(t).Contains(net.ParseIP(created_service_1.Spec.PortalIP)) { t.Errorf("Unexpected PortalIP: %s", created_service_1.Spec.PortalIP) } @@ -594,14 +596,13 @@ func TestServiceRegistryIPReallocation(t *testing.T) { if created_service_2.Name != "bar" { t.Errorf("Expected bar, but got %v", created_service_2.Name) } - if created_service_2.Spec.PortalIP != "1.2.3.1" { // same IP as before + if !makeIPNet(t).Contains(net.ParseIP(created_service_2.Spec.PortalIP)) { t.Errorf("Unexpected PortalIP: %s", created_service_2.Spec.PortalIP) } } func TestServiceRegistryIPUpdate(t *testing.T) { rest, _ := NewTestREST(t, nil) - rest.portalMgr.randomAttempts = 0 svc := &api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", ResourceVersion: "1"}, @@ -620,7 +621,7 @@ func TestServiceRegistryIPUpdate(t *testing.T) { if created_service.Spec.Ports[0].Port != 6502 { t.Errorf("Expected port 6502, but got %v", created_service.Spec.Ports[0].Port) } - if created_service.Spec.PortalIP != "1.2.3.1" { + if !makeIPNet(t).Contains(net.ParseIP(created_service.Spec.PortalIP)) { t.Errorf("Unexpected PortalIP: %s", created_service.Spec.PortalIP) } @@ -645,7 +646,6 @@ func TestServiceRegistryIPUpdate(t *testing.T) { func TestServiceRegistryIPExternalLoadBalancer(t *testing.T) { rest, _ := NewTestREST(t, nil) - rest.portalMgr.randomAttempts = 0 svc := &api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", ResourceVersion: "1"}, @@ -665,7 +665,7 @@ func TestServiceRegistryIPExternalLoadBalancer(t *testing.T) { if created_service.Spec.Ports[0].Port != 6502 { t.Errorf("Expected port 6502, but got %v", created_service.Spec.Ports[0].Port) } - if created_service.Spec.PortalIP != "1.2.3.1" { + if !makeIPNet(t).Contains(net.ParseIP(created_service.Spec.PortalIP)) { t.Errorf("Unexpected PortalIP: %s", created_service.Spec.PortalIP) } @@ -677,63 +677,6 @@ func TestServiceRegistryIPExternalLoadBalancer(t *testing.T) { } } -func TestServiceRegistryIPReloadFromStorage(t *testing.T) { - registry := registrytest.NewServiceRegistry() - machines := []string{"foo", "bar", "baz"} - nodeRegistry := registrytest.NewMinionRegistry(machines, api.NodeResources{}) - endpoints := ®istrytest.EndpointRegistry{} - rest1 := NewStorage(registry, nodeRegistry, endpoints, makeIPNet(t), "kubernetes") - rest1.portalMgr.randomAttempts = 0 - - svc := &api.Service{ - ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault}, - Spec: api.ServiceSpec{ - Selector: map[string]string{"bar": "baz"}, - SessionAffinity: api.AffinityTypeNone, - Ports: []api.ServicePort{{ - Port: 6502, - Protocol: api.ProtocolTCP, - }}, - }, - } - ctx := api.NewDefaultContext() - rest1.Create(ctx, svc) - svc = &api.Service{ - ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault}, - Spec: api.ServiceSpec{ - Selector: map[string]string{"bar": "baz"}, - SessionAffinity: api.AffinityTypeNone, - Ports: []api.ServicePort{{ - Port: 6502, - Protocol: api.ProtocolTCP, - }}, - }, - } - rest1.Create(ctx, svc) - - // This will reload from storage, finding the previous 2 - nodeRegistry = registrytest.NewMinionRegistry(machines, api.NodeResources{}) - rest2 := NewStorage(registry, nodeRegistry, endpoints, makeIPNet(t), "kubernetes") - rest2.portalMgr.randomAttempts = 0 - - svc = &api.Service{ - ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault}, - Spec: api.ServiceSpec{ - Selector: map[string]string{"bar": "baz"}, - SessionAffinity: api.AffinityTypeNone, - Ports: []api.ServicePort{{ - Port: 6502, - Protocol: api.ProtocolTCP, - }}, - }, - } - created_svc, _ := rest2.Create(ctx, svc) - created_service := created_svc.(*api.Service) - if created_service.Spec.PortalIP != "1.2.3.3" { - t.Errorf("Unexpected PortalIP: %s", created_service.Spec.PortalIP) - } -} - // TODO: remove, covered by TestCreate func TestCreateServiceWithConflictingNamespace(t *testing.T) { storage := REST{} @@ -773,7 +716,6 @@ func TestUpdateServiceWithConflictingNamespace(t *testing.T) { func TestCreate(t *testing.T) { rest, registry := NewTestREST(t, nil) - rest.portalMgr.randomAttempts = 0 test := resttest.New(t, rest, registry.SetError) test.TestCreate( diff --git a/test/integration/auth_test.go b/test/integration/auth_test.go index 75c5ffcccae..56304d90d6c 100644 --- a/test/integration/auth_test.go +++ b/test/integration/auth_test.go @@ -391,14 +391,15 @@ func TestAuthModeAlwaysAllow(t *testing.T) { defer s.Close() m = master.New(&master.Config{ - EtcdHelper: helper, - KubeletClient: client.FakeKubeletClient{}, - EnableLogsSupport: false, - EnableUISupport: false, - EnableIndex: true, - APIPrefix: "/api", - Authorizer: apiserver.NewAlwaysAllowAuthorizer(), - AdmissionControl: admit.NewAlwaysAdmit(), + EtcdHelper: helper, + KubeletClient: client.FakeKubeletClient{}, + EnableCoreControllers: true, + EnableLogsSupport: false, + EnableUISupport: false, + EnableIndex: true, + APIPrefix: "/api", + Authorizer: apiserver.NewAlwaysAllowAuthorizer(), + AdmissionControl: admit.NewAlwaysAdmit(), }) transport := http.DefaultTransport @@ -530,14 +531,15 @@ func TestAuthModeAlwaysDeny(t *testing.T) { defer s.Close() m = master.New(&master.Config{ - EtcdHelper: helper, - KubeletClient: client.FakeKubeletClient{}, - EnableLogsSupport: false, - EnableUISupport: false, - EnableIndex: true, - APIPrefix: "/api", - Authorizer: apiserver.NewAlwaysDenyAuthorizer(), - AdmissionControl: admit.NewAlwaysAdmit(), + EtcdHelper: helper, + KubeletClient: client.FakeKubeletClient{}, + EnableCoreControllers: true, + EnableLogsSupport: false, + EnableUISupport: false, + EnableIndex: true, + APIPrefix: "/api", + Authorizer: apiserver.NewAlwaysDenyAuthorizer(), + AdmissionControl: admit.NewAlwaysAdmit(), }) transport := http.DefaultTransport @@ -596,15 +598,16 @@ func TestAliceNotForbiddenOrUnauthorized(t *testing.T) { defer s.Close() m = master.New(&master.Config{ - EtcdHelper: helper, - KubeletClient: client.FakeKubeletClient{}, - EnableLogsSupport: false, - EnableUISupport: false, - EnableIndex: true, - APIPrefix: "/api", - Authenticator: getTestTokenAuth(), - Authorizer: allowAliceAuthorizer{}, - AdmissionControl: admit.NewAlwaysAdmit(), + EtcdHelper: helper, + KubeletClient: client.FakeKubeletClient{}, + EnableCoreControllers: true, + EnableLogsSupport: false, + EnableUISupport: false, + EnableIndex: true, + APIPrefix: "/api", + Authenticator: getTestTokenAuth(), + Authorizer: allowAliceAuthorizer{}, + AdmissionControl: admit.NewAlwaysAdmit(), }) previousResourceVersion := make(map[string]float64) @@ -684,15 +687,16 @@ func TestBobIsForbidden(t *testing.T) { defer s.Close() m = master.New(&master.Config{ - EtcdHelper: helper, - KubeletClient: client.FakeKubeletClient{}, - EnableLogsSupport: false, - EnableUISupport: false, - EnableIndex: true, - APIPrefix: "/api", - Authenticator: getTestTokenAuth(), - Authorizer: allowAliceAuthorizer{}, - AdmissionControl: admit.NewAlwaysAdmit(), + EtcdHelper: helper, + KubeletClient: client.FakeKubeletClient{}, + EnableCoreControllers: true, + EnableLogsSupport: false, + EnableUISupport: false, + EnableIndex: true, + APIPrefix: "/api", + Authenticator: getTestTokenAuth(), + Authorizer: allowAliceAuthorizer{}, + AdmissionControl: admit.NewAlwaysAdmit(), }) transport := http.DefaultTransport @@ -744,15 +748,16 @@ func TestUnknownUserIsUnauthorized(t *testing.T) { defer s.Close() m = master.New(&master.Config{ - EtcdHelper: helper, - KubeletClient: client.FakeKubeletClient{}, - EnableLogsSupport: false, - EnableUISupport: false, - EnableIndex: true, - APIPrefix: "/api", - Authenticator: getTestTokenAuth(), - Authorizer: allowAliceAuthorizer{}, - AdmissionControl: admit.NewAlwaysAdmit(), + EtcdHelper: helper, + KubeletClient: client.FakeKubeletClient{}, + EnableCoreControllers: true, + EnableLogsSupport: false, + EnableUISupport: false, + EnableIndex: true, + APIPrefix: "/api", + Authenticator: getTestTokenAuth(), + Authorizer: allowAliceAuthorizer{}, + AdmissionControl: admit.NewAlwaysAdmit(), }) transport := http.DefaultTransport @@ -823,15 +828,16 @@ func TestNamespaceAuthorization(t *testing.T) { defer s.Close() m = master.New(&master.Config{ - EtcdHelper: helper, - KubeletClient: client.FakeKubeletClient{}, - EnableLogsSupport: false, - EnableUISupport: false, - EnableIndex: true, - APIPrefix: "/api", - Authenticator: getTestTokenAuth(), - Authorizer: a, - AdmissionControl: admit.NewAlwaysAdmit(), + EtcdHelper: helper, + KubeletClient: client.FakeKubeletClient{}, + EnableCoreControllers: true, + EnableLogsSupport: false, + EnableUISupport: false, + EnableIndex: true, + APIPrefix: "/api", + Authenticator: getTestTokenAuth(), + Authorizer: a, + AdmissionControl: admit.NewAlwaysAdmit(), }) previousResourceVersion := make(map[string]float64) @@ -937,15 +943,16 @@ func TestKindAuthorization(t *testing.T) { defer s.Close() m = master.New(&master.Config{ - EtcdHelper: helper, - KubeletClient: client.FakeKubeletClient{}, - EnableLogsSupport: false, - EnableUISupport: false, - EnableIndex: true, - APIPrefix: "/api", - Authenticator: getTestTokenAuth(), - Authorizer: a, - AdmissionControl: admit.NewAlwaysAdmit(), + EtcdHelper: helper, + KubeletClient: client.FakeKubeletClient{}, + EnableCoreControllers: true, + EnableLogsSupport: false, + EnableUISupport: false, + EnableIndex: true, + APIPrefix: "/api", + Authenticator: getTestTokenAuth(), + Authorizer: a, + AdmissionControl: admit.NewAlwaysAdmit(), }) previousResourceVersion := make(map[string]float64) @@ -1038,15 +1045,16 @@ func TestReadOnlyAuthorization(t *testing.T) { defer s.Close() m = master.New(&master.Config{ - EtcdHelper: helper, - KubeletClient: client.FakeKubeletClient{}, - EnableLogsSupport: false, - EnableUISupport: false, - EnableIndex: true, - APIPrefix: "/api", - Authenticator: getTestTokenAuth(), - Authorizer: a, - AdmissionControl: admit.NewAlwaysAdmit(), + EtcdHelper: helper, + KubeletClient: client.FakeKubeletClient{}, + EnableCoreControllers: true, + EnableLogsSupport: false, + EnableUISupport: false, + EnableIndex: true, + APIPrefix: "/api", + Authenticator: getTestTokenAuth(), + Authorizer: a, + AdmissionControl: admit.NewAlwaysAdmit(), }) transport := http.DefaultTransport diff --git a/test/integration/scheduler_test.go b/test/integration/scheduler_test.go index 4ce9efe4341..e5be559eeec 100644 --- a/test/integration/scheduler_test.go +++ b/test/integration/scheduler_test.go @@ -69,14 +69,15 @@ func TestUnschedulableNodes(t *testing.T) { defer s.Close() m = master.New(&master.Config{ - EtcdHelper: helper, - KubeletClient: client.FakeKubeletClient{}, - EnableLogsSupport: false, - EnableUISupport: false, - EnableIndex: true, - APIPrefix: "/api", - Authorizer: apiserver.NewAlwaysAllowAuthorizer(), - AdmissionControl: admit.NewAlwaysAdmit(), + EtcdHelper: helper, + KubeletClient: client.FakeKubeletClient{}, + EnableCoreControllers: true, + EnableLogsSupport: false, + EnableUISupport: false, + EnableIndex: true, + APIPrefix: "/api", + Authorizer: apiserver.NewAlwaysAllowAuthorizer(), + AdmissionControl: admit.NewAlwaysAdmit(), }) restClient := client.NewOrDie(&client.Config{Host: s.URL, Version: testapi.Version()}) diff --git a/test/integration/secret_test.go b/test/integration/secret_test.go index ada8c2684c2..02983a3450a 100644 --- a/test/integration/secret_test.go +++ b/test/integration/secret_test.go @@ -63,14 +63,15 @@ func TestSecrets(t *testing.T) { defer s.Close() m = master.New(&master.Config{ - EtcdHelper: helper, - KubeletClient: client.FakeKubeletClient{}, - EnableLogsSupport: false, - EnableUISupport: false, - EnableIndex: true, - APIPrefix: "/api", - Authorizer: apiserver.NewAlwaysAllowAuthorizer(), - AdmissionControl: admit.NewAlwaysAdmit(), + EtcdHelper: helper, + KubeletClient: client.FakeKubeletClient{}, + EnableCoreControllers: true, + EnableLogsSupport: false, + EnableUISupport: false, + EnableIndex: true, + APIPrefix: "/api", + Authorizer: apiserver.NewAlwaysAllowAuthorizer(), + AdmissionControl: admit.NewAlwaysAdmit(), }) deleteAllEtcdKeys() diff --git a/test/integration/utils.go b/test/integration/utils.go index 62b715d11fc..d8e5187b485 100644 --- a/test/integration/utils.go +++ b/test/integration/utils.go @@ -73,14 +73,15 @@ func runAMaster(t *testing.T) (*master.Master, *httptest.Server) { } m := master.New(&master.Config{ - EtcdHelper: helper, - KubeletClient: client.FakeKubeletClient{}, - EnableLogsSupport: false, - EnableProfiling: true, - EnableUISupport: false, - APIPrefix: "/api", - Authorizer: apiserver.NewAlwaysAllowAuthorizer(), - AdmissionControl: admit.NewAlwaysAdmit(), + EtcdHelper: helper, + KubeletClient: client.FakeKubeletClient{}, + EnableCoreControllers: true, + EnableLogsSupport: false, + EnableProfiling: true, + EnableUISupport: false, + APIPrefix: "/api", + Authorizer: apiserver.NewAlwaysAllowAuthorizer(), + AdmissionControl: admit.NewAlwaysAdmit(), }) s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {