From 5bcb96dae126f8efe3a1cefbd03f095bee74d245 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Mon, 4 May 2015 15:11:19 -0400 Subject: [PATCH] Separate out the master's control loops These are "Bootstrap Controllers" as distinct from the controllers in the controller-manager binary - they are necessary for the cluster to start running. --- pkg/master/controller.go | 277 ++++++++++++++++++ .../{publish_test.go => controller_test.go} | 6 +- pkg/master/master.go | 31 +- pkg/master/publish.go | 236 --------------- test/integration/openshift_test.go | 35 +++ 5 files changed, 343 insertions(+), 242 deletions(-) create mode 100644 pkg/master/controller.go rename pkg/master/{publish_test.go => controller_test.go} (97%) delete mode 100644 pkg/master/publish.go create mode 100644 test/integration/openshift_test.go diff --git a/pkg/master/controller.go b/pkg/master/controller.go new file mode 100644 index 00000000000..462d6b0f99a --- /dev/null +++ b/pkg/master/controller.go @@ -0,0 +1,277 @@ +/* +Copyright 2014 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package master + +import ( + "fmt" + "net" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/endpoints" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/namespace" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service" + servicecontroller "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/ipallocator/controller" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + + "github.com/golang/glog" +) + +// Controller is the controller manager for the core bootstrap Kubernetes controller +// loops, which manage creating the "kubernetes" and "kubernetes-ro" services, the "default" +// namespace, and provide the IP repair check on service PortalIPs +type Controller struct { + NamespaceRegistry namespace.Registry + ServiceRegistry service.Registry + ServiceIPRegistry service.IPRegistry + EndpointRegistry endpoint.Registry + PortalNet *net.IPNet + // TODO: MasterCount is yucky + MasterCount int + + PortalIPInterval time.Duration + EndpointInterval time.Duration + + PublicIP net.IP + + ServiceIP net.IP + ServicePort int + PublicServicePort int + + ReadOnlyServiceIP net.IP + ReadOnlyServicePort int + PublicReadOnlyServicePort int + + runner *util.Runner +} + +// Start begins the core controller loops that must exist for bootstrapping +// a cluster. +func (c *Controller) Start() { + if c.runner != nil { + return + } + + repair := servicecontroller.NewRepair(c.PortalIPInterval, c.ServiceRegistry, c.PortalNet, c.ServiceIPRegistry) + + // run all of the controllers once prior to returning from Start. 
+ if err := repair.RunOnce(); err != nil { + glog.Errorf("Unable to perform initial IP allocation check: %v", err) + } + if err := c.UpdateKubernetesService(); err != nil { + glog.Errorf("Unable to perform initial Kubernetes service initialization: %v", err) + } + if err := c.UpdateKubernetesROService(); err != nil { + glog.Errorf("Unable to perform initial Kubernetes RO service initialization: %v", err) + } + + c.runner = util.NewRunner(c.RunKubernetesService, c.RunKubernetesROService, repair.RunUntil) + c.runner.Start() +} + +// RunKubernetesService periodically updates the kubernetes service +func (c *Controller) RunKubernetesService(ch chan struct{}) { + util.Until(func() { + if err := c.UpdateKubernetesService(); err != nil { + util.HandleError(fmt.Errorf("unable to sync kubernetes service: %v", err)) + } + }, c.EndpointInterval, ch) +} + +// UpdateKubernetesService attempts to update the default Kube service. +func (c *Controller) UpdateKubernetesService() error { + // Update service & endpoint records. + // TODO: when it becomes possible to change this stuff, + // stop polling and start watching. + // TODO: add endpoints of all replicas, not just the elected master. + if err := c.CreateNamespaceIfNeeded(api.NamespaceDefault); err != nil { + return err + } + if c.ServiceIP != nil { + if err := c.CreateMasterServiceIfNeeded("kubernetes", c.ServiceIP, c.ServicePort); err != nil { + return err + } + if err := c.SetEndpoints("kubernetes", c.PublicIP, c.PublicServicePort); err != nil { + return err + } + } + return nil +} + +// RunKubernetesROService periodically updates the kubernetes RO service +func (c *Controller) RunKubernetesROService(ch chan struct{}) { + util.Until(func() { + if err := c.UpdateKubernetesROService(); err != nil { + util.HandleError(fmt.Errorf("unable to sync kubernetes RO service: %v", err)) + } + }, c.EndpointInterval, ch) +} + +// UpdateKubernetesROService attempts to update the default Kube read-only service. 
+func (c *Controller) UpdateKubernetesROService() error { + // Update service & endpoint records. + // TODO: when it becomes possible to change this stuff, + // stop polling and start watching. + if err := c.CreateNamespaceIfNeeded(api.NamespaceDefault); err != nil { + return err + } + if c.ReadOnlyServiceIP != nil { + if err := c.CreateMasterServiceIfNeeded("kubernetes-ro", c.ReadOnlyServiceIP, c.ReadOnlyServicePort); err != nil { + return err + } + if err := c.SetEndpoints("kubernetes-ro", c.PublicIP, c.PublicReadOnlyServicePort); err != nil { + return err + } + } + return nil +} + +// CreateNamespaceIfNeeded creates the namespace ns if a lookup for it fails; an AlreadyExists error from the create is treated as success. +func (c *Controller) CreateNamespaceIfNeeded(ns string) error { + ctx := api.NewContext() + if _, err := c.NamespaceRegistry.GetNamespace(ctx, ns); err == nil { + // the requested namespace already exists + return nil + } + newNs := &api.Namespace{ + ObjectMeta: api.ObjectMeta{ + Name: ns, + Namespace: "", + }, + } + err := c.NamespaceRegistry.CreateNamespace(ctx, newNs) + if err != nil && errors.IsAlreadyExists(err) { + err = nil + } + return err +} + +// CreateMasterServiceIfNeeded will create the specified service if it +// doesn't already exist. +func (c *Controller) CreateMasterServiceIfNeeded(serviceName string, serviceIP net.IP, servicePort int) error { + ctx := api.NewDefaultContext() + if _, err := c.ServiceRegistry.GetService(ctx, serviceName); err == nil { + // The service already exists. 
+ return nil + } + svc := &api.Service{ + ObjectMeta: api.ObjectMeta{ + Name: serviceName, + Namespace: api.NamespaceDefault, + Labels: map[string]string{"provider": "kubernetes", "component": "apiserver"}, + }, + Spec: api.ServiceSpec{ + Ports: []api.ServicePort{{Port: servicePort, Protocol: api.ProtocolTCP}}, + // maintained by this code, not by the pod selector + Selector: nil, + PortalIP: serviceIP.String(), + SessionAffinity: api.AffinityTypeNone, + }, + } + _, err := c.ServiceRegistry.CreateService(ctx, svc) + if err != nil && errors.IsAlreadyExists(err) { + err = nil + } + return err +} + +// SetEndpoints sets the endpoints for the given apiserver service (ro or rw). +// SetEndpoints expects that the endpoints objects it manages will all be +// managed only by SetEndpoints; therefore, to understand this, you need only +// understand the requirements and the body of this function. +// +// Requirements: +// * All apiservers MUST use the same ports for their {rw, ro} services. +// * All apiservers MUST use SetEndpoints and only SetEndpoints to manage the +// endpoints for their {rw, ro} services. +// * All apiservers MUST know and agree on the number of apiservers expected +// to be running (c.MasterCount). +// * SetEndpoints is called periodically from all apiservers. +// +func (c *Controller) SetEndpoints(serviceName string, ip net.IP, port int) error { + ctx := api.NewDefaultContext() + e, err := c.EndpointRegistry.GetEndpoints(ctx, serviceName) + if err != nil { + e = &api.Endpoints{ + ObjectMeta: api.ObjectMeta{ + Name: serviceName, + Namespace: api.NamespaceDefault, + }, + } + } + + // First, determine if the endpoint is in the format we expect (one + // subset, one port, N IP addresses). + formatCorrect, ipCorrect := checkEndpointSubsetFormat(e, ip.String(), port, c.MasterCount) + if !formatCorrect { + // Something is egregiously wrong, just re-make the endpoints record. 
+ e.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: ip.String()}}, + Ports: []api.EndpointPort{{Port: port, Protocol: api.ProtocolTCP}}, + }} + glog.Warningf("Resetting endpoints for master service %q to %v", serviceName, e) + return c.EndpointRegistry.UpdateEndpoints(ctx, e) + } else if !ipCorrect { + // We *always* add our own IP address; if there are too many IP + // addresses, we remove the ones lexicographically after our + // own IP address. Given the requirements stated at the top of + // this function, this should cause the list of IP addresses to + // become eventually correct. + e.Subsets[0].Addresses = append(e.Subsets[0].Addresses, api.EndpointAddress{IP: ip.String()}) + e.Subsets = endpoints.RepackSubsets(e.Subsets) + if addrs := &e.Subsets[0].Addresses; len(*addrs) > c.MasterCount { + // addrs is a pointer because we're going to mutate it. + for i, addr := range *addrs { + if addr.IP == ip.String() { + for len(*addrs) > c.MasterCount { + remove := (i + 1) % len(*addrs) + *addrs = append((*addrs)[:remove], (*addrs)[remove+1:]...) + } + break + } + } + } + return c.EndpointRegistry.UpdateEndpoints(ctx, e) + } + // We didn't make any changes, no need to actually call update. + return nil +} + +// checkEndpointSubsetFormat reports whether the endpoint is in the format SetEndpoints expects (one subset, +// one TCP port equal to port, N IP addresses); and whether the specified IP address is present and +// exactly count IP addresses are found. 
+func checkEndpointSubsetFormat(e *api.Endpoints, ip string, port int, count int) (formatCorrect, ipCorrect bool) { + if len(e.Subsets) != 1 { + return false, false + } + sub := &e.Subsets[0] + if len(sub.Ports) != 1 { + return false, false + } + p := &sub.Ports[0] + if p.Port != port || p.Protocol != api.ProtocolTCP { + return false, false + } + for _, addr := range sub.Addresses { + if addr.IP == ip { + return true, len(sub.Addresses) == count + } + } + return true, false +} diff --git a/pkg/master/publish_test.go b/pkg/master/controller_test.go similarity index 97% rename from pkg/master/publish_test.go rename to pkg/master/controller_test.go index bdda8642833..47fa082416b 100644 --- a/pkg/master/publish_test.go +++ b/pkg/master/controller_test.go @@ -248,12 +248,12 @@ func TestSetEndpoints(t *testing.T) { }, } for _, test := range tests { - master := Master{masterCount: test.additionalMasters + 1} + master := Controller{MasterCount: test.additionalMasters + 1} registry := &registrytest.EndpointRegistry{ Endpoints: test.endpoints, } - master.endpointRegistry = registry - err := master.setEndpoints(test.serviceName, net.ParseIP(test.ip), test.port) + master.EndpointRegistry = registry + err := master.SetEndpoints(test.serviceName, net.ParseIP(test.ip), test.port) if err != nil { t.Errorf("case %q: unexpected error: %v", test.testName, err) } diff --git a/pkg/master/master.go b/pkg/master/master.go index d72de2b791d..5fa1734ce7d 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -63,8 +63,8 @@ import ( resourcequotaetcd "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/resourcequota/etcd" secretetcd "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/secret/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service" - ipallocator "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/allocator" - etcdipallocator "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/allocator/etcd" + ipallocator 
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/ipallocator" + etcdipallocator "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/ipallocator/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/ui" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" @@ -558,7 +558,32 @@ func (m *Master) init(c *Config) { // TODO: Attempt clean shutdown? if m.enableCoreControllers { - m.StartCoreControllers() + m.NewBootstrapController().Start() + } +} + +// NewBootstrapController returns a Controller populated from this master's registries, portal network, service IPs and ports; the caller must call Start on it. +func (m *Master) NewBootstrapController() *Controller { + return &Controller{ + NamespaceRegistry: m.namespaceRegistry, + ServiceRegistry: m.serviceRegistry, + ServiceIPRegistry: m.portalAllocator, + EndpointRegistry: m.endpointRegistry, + PortalNet: m.portalNet, + MasterCount: m.masterCount, + + PortalIPInterval: 3 * time.Minute, + EndpointInterval: 10 * time.Second, + + PublicIP: m.clusterIP, + + ServiceIP: m.serviceReadWriteIP, + ServicePort: m.serviceReadWritePort, + PublicServicePort: m.publicReadWritePort, + + ReadOnlyServiceIP: m.serviceReadOnlyIP, + ReadOnlyServicePort: m.serviceReadOnlyPort, + PublicReadOnlyServicePort: m.publicReadOnlyPort, } } diff --git a/pkg/master/publish.go b/pkg/master/publish.go deleted file mode 100644 index dadf4cb6173..00000000000 --- a/pkg/master/publish.go +++ /dev/null @@ -1,236 +0,0 @@ -/* -Copyright 2014 The Kubernetes Authors All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-See the License for the specific language governing permissions and -limitations under the License. -*/ - -package master - -import ( - "net" - "time" - - "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api/endpoints" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest" - servicecontroller "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/allocator/controller" - "github.com/GoogleCloudPlatform/kubernetes/pkg/util" - - "github.com/golang/glog" -) - -// StartCoreControllers begins the core controller loops that must exist for bootstrapping -// a cluster. -func (m *Master) StartCoreControllers() { - if m.masterServices != nil { - return - } - repair := servicecontroller.NewRepair(3*time.Minute, m.serviceRegistry, m.portalNet, m.portalAllocator) - if err := repair.RunOnce(); err != nil { - glog.Errorf("Unable to perform initial IP allocation check: %v", err) - } - m.masterServices = util.NewRunner(m.ServiceWriterLoop, m.ROServiceWriterLoop, repair.RunUntil) - m.masterServices.Start() -} - -// ServiceWriterLoop is exposed for downstream consumers of master -func (m *Master) ServiceWriterLoop(stop chan struct{}) { - t := time.NewTicker(10 * time.Second) - defer t.Stop() - for { - // Update service & endpoint records. - // TODO: when it becomes possible to change this stuff, - // stop polling and start watching. - // TODO: add endpoints of all replicas, not just the elected master. 
- if err := m.CreateMasterNamespaceIfNeeded(api.NamespaceDefault); err != nil { - glog.Errorf("Can't create master namespace: %v", err) - } - if m.serviceReadWriteIP != nil { - if err := m.CreateMasterServiceIfNeeded("kubernetes", m.serviceReadWriteIP, m.serviceReadWritePort); err != nil && !errors.IsAlreadyExists(err) { - glog.Errorf("Can't create rw service: %v", err) - } - if err := m.SetEndpoints("kubernetes", m.clusterIP, m.publicReadWritePort); err != nil { - glog.Errorf("Can't create rw endpoints: %v", err) - } - } - - select { - case <-stop: - return - case <-t.C: - } - } -} - -// ROServiceWriterLoop is exposed for downstream consumers of master -func (m *Master) ROServiceWriterLoop(stop chan struct{}) { - t := time.NewTicker(10 * time.Second) - defer t.Stop() - for { - // Update service & endpoint records. - // TODO: when it becomes possible to change this stuff, - // stop polling and start watching. - if err := m.CreateMasterNamespaceIfNeeded(api.NamespaceDefault); err != nil { - glog.Errorf("Can't create master namespace: %v", err) - } - if m.serviceReadOnlyIP != nil { - if err := m.CreateMasterServiceIfNeeded("kubernetes-ro", m.serviceReadOnlyIP, m.serviceReadOnlyPort); err != nil && !errors.IsAlreadyExists(err) { - glog.Errorf("Can't create ro service: %v", err) - } - if err := m.SetEndpoints("kubernetes-ro", m.clusterIP, m.publicReadOnlyPort); err != nil { - glog.Errorf("Can't create ro endpoints: %v", err) - } - } - - select { - case <-stop: - return - case <-t.C: - } - } -} - -// CreateMasterNamespaceIfNeeded will create the namespace that contains the master services if it doesn't already exist -func (m *Master) CreateMasterNamespaceIfNeeded(ns string) error { - ctx := api.NewContext() - if _, err := m.namespaceRegistry.GetNamespace(ctx, api.NamespaceDefault); err == nil { - // the namespace already exists - return nil - } - namespace := &api.Namespace{ - ObjectMeta: api.ObjectMeta{ - Name: ns, - Namespace: "", - }, - } - _, err := 
m.storage["namespaces"].(rest.Creater).Create(ctx, namespace) - if err != nil && errors.IsAlreadyExists(err) { - err = nil - } - return err -} - -// CreateMasterServiceIfNeeded will create the specified service if it -// doesn't already exist. -func (m *Master) CreateMasterServiceIfNeeded(serviceName string, serviceIP net.IP, servicePort int) error { - ctx := api.NewDefaultContext() - if _, err := m.serviceRegistry.GetService(ctx, serviceName); err == nil { - // The service already exists. - return nil - } - svc := &api.Service{ - ObjectMeta: api.ObjectMeta{ - Name: serviceName, - Namespace: api.NamespaceDefault, - Labels: map[string]string{"provider": "kubernetes", "component": "apiserver"}, - }, - Spec: api.ServiceSpec{ - Ports: []api.ServicePort{{Port: servicePort, Protocol: api.ProtocolTCP}}, - // maintained by this code, not by the pod selector - Selector: nil, - PortalIP: serviceIP.String(), - SessionAffinity: api.AffinityTypeNone, - }, - } - _, err := m.storage["services"].(rest.Creater).Create(ctx, svc) - if err != nil && errors.IsAlreadyExists(err) { - err = nil - } - return err -} - -// SetEndpoints sets the endpoints for the given apiserver service (ro or rw). -// SetEndpoints expects that the endpoints objects it manages will all be -// managed only by SetEndpoints; therefore, to understand this, you need only -// understand the requirements and the body of this function. -// -// Requirements: -// * All apiservers MUST use the same ports for their {rw, ro} services. -// * All apiservers MUST use SetEndpoints and only SetEndpoints to manage the -// endpoints for their {rw, ro} services. -// * All apiservers MUST know and agree on the number of apiservers expected -// to be running (m.masterCount). -// * SetEndpoints is called periodically from all apiservers. 
-// -func (m *Master) SetEndpoints(serviceName string, ip net.IP, port int) error { - ctx := api.NewDefaultContext() - e, err := m.endpointRegistry.GetEndpoints(ctx, serviceName) - if err != nil { - e = &api.Endpoints{ - ObjectMeta: api.ObjectMeta{ - Name: serviceName, - Namespace: api.NamespaceDefault, - }, - } - } - - // First, determine if the endpoint is in the format we expect (one - // subset, one port, N IP addresses). - formatCorrect, ipCorrect := m.checkEndpointSubsetFormat(e, ip.String(), port) - if !formatCorrect { - // Something is egregiously wrong, just re-make the endpoints record. - e.Subsets = []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: ip.String()}}, - Ports: []api.EndpointPort{{Port: port, Protocol: api.ProtocolTCP}}, - }} - glog.Warningf("Resetting endpoints for master service %q to %v", serviceName, e) - return m.endpointRegistry.UpdateEndpoints(ctx, e) - } else if !ipCorrect { - // We *always* add our own IP address; if there are too many IP - // addresses, we remove the ones lexicographically after our - // own IP address. Given the requirements stated at the top of - // this function, this should cause the list of IP addresses to - // become eventually correct. - e.Subsets[0].Addresses = append(e.Subsets[0].Addresses, api.EndpointAddress{IP: ip.String()}) - e.Subsets = endpoints.RepackSubsets(e.Subsets) - if addrs := &e.Subsets[0].Addresses; len(*addrs) > m.masterCount { - // addrs is a pointer because we're going to mutate it. - for i, addr := range *addrs { - if addr.IP == ip.String() { - for len(*addrs) > m.masterCount { - remove := (i + 1) % len(*addrs) - *addrs = append((*addrs)[:remove], (*addrs)[remove+1:]...) - } - break - } - } - } - return m.endpointRegistry.UpdateEndpoints(ctx, e) - } - // We didn't make any changes, no need to actually call update. 
- return nil -} - -// Determine if the endpoint is in the format setEndpoints expect (one subset, -// one port, N IP addresses); and if the specified IP address is present and -// the correct number of ip addresses are found. -func (m *Master) checkEndpointSubsetFormat(e *api.Endpoints, ip string, port int) (formatCorrect, ipCorrect bool) { - if len(e.Subsets) != 1 { - return false, false - } - sub := &e.Subsets[0] - if len(sub.Ports) != 1 { - return false, false - } - p := &sub.Ports[0] - if p.Port != port || p.Protocol != api.ProtocolTCP { - return false, false - } - for _, addr := range sub.Addresses { - if addr.IP == ip { - return true, len(sub.Addresses) == m.masterCount - } - } - return true, false -} diff --git a/test/integration/openshift_test.go b/test/integration/openshift_test.go new file mode 100644 index 00000000000..42d761857bf --- /dev/null +++ b/test/integration/openshift_test.go @@ -0,0 +1,35 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package integration + +import ( + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/master" +) + +// This test references methods that OpenShift uses to customize the master on startup, that +// are not referenced directly by a master. 
+func TestMasterExportsSymbols(t *testing.T) { + _ = &master.Config{ + EnableCoreControllers: false, + EnableUISupport: false, + EnableSwaggerSupport: false, + RestfulContainer: nil, + } + _ = (&master.Master{}).NewBootstrapController() +}